Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions coriolis/osmorphing/osmount/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from oslo_log import log as logging

from coriolis import constants
from coriolis import exception
from coriolis.osmorphing.osmount import base
from coriolis import utils
Expand All @@ -15,6 +16,13 @@


class WindowsMountTools(base.BaseOSMountTools):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# A list of BitLocker encrypted volumes that were unlocked
# by us. We'll use a first-boot script to resume BitLocker.
self._unlocked_volumes: list[str] = []

def _connect(self):
connection_info = self._connection_info

Expand Down Expand Up @@ -223,9 +231,105 @@ def _set_volumes_drive_letter(self):
f"Error was: {utils.get_exception_details()}")
self._rebring_disks_online(disk_nums=disk_nums)

def _get_encrypted_volume_ids(self):
out = self._conn.exec_ps_command(
'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" '
'-class Win32_EncryptableVolume | % {$_.DeviceID}')
return [x for x in out.replace("\r\n", "\n").split("\n") if x]

def _unlock_encrypted_volume(self, volume_id: str, recovery_password: str):
self._conn.exec_ps_command(
f'manage-bde -unlock "{volume_id}" '
f'-RecoveryPassword "{recovery_password}"')

def _suspend_bitlocker(self, volume_id: str):
"""Suspend BitLocker until the next reboot for a given volume.

It doesn't decrypt the device, it just adds a publicly accessible
BitLocker protector that automatically unlocks the volume.

When the replica instance boots, the TPM protector will be reconfigured
automatically. Unfortunately the '-RebootCount' parameter isn't
honored, perhaps due to the fact that the disks are attached to a
separate VM. For this reason, we'll use a first-boot script to resume
BitLocker explicitly.
"""
self._conn.exec_ps_command(f'Suspend-BitLocker "{volume_id}"')

def _unlock_encrypted_volumes(self):
recovery_password = self._osmorphing_info.get(
constants.ENCRYPTED_DISKS_PASS)
if not recovery_password:
LOG.info("No encrypted disk password specified, "
"skipping BitLocker unlock.")
return

encrypted_volume_ids = self._get_encrypted_volume_ids()
if not encrypted_volume_ids:
LOG.warning("Received encrypted disk password but no "
"BitLocker encrypted volumes found.")
return

unlocked = False
for encrypted_volume_id in encrypted_volume_ids:
try:
self._unlock_encrypted_volume(
encrypted_volume_id, recovery_password)
LOG.info(
"Successfully unlocked BitLocker encrypted volume: %s",
encrypted_volume_id)
unlocked = True
except Exception:
LOG.info(
"Could not unlock volume %s using the specified "
"recovery password.",
encrypted_volume_id)
continue

# Suspend BitLocker until the replica boots.
#
# We'll intentionally propagate the failure if we managed to
# unlock the volume but failed to suspend BitLocker.
self._suspend_bitlocker(encrypted_volume_id)
self._unlocked_volumes.append(encrypted_volume_id)

if not unlocked:
raise exception.CoriolisException(
"Could not unlock any volume using the specified "
"BitLocker recovery password.")

def install_encryption_firstboot_setup(
self,
os_root_dir,
os_morphing_tools,
):
if not self._unlocked_volumes:
LOG.info(
"No unlocked BitLocker volumes, skipping first-boot setup.")
return

# We'll inject a first-boot script to resume BitLocker explicitly.
# Unfortunately the "-RebootCount" parameter of "Suspend-BitLocker"
# isn't honored, perhaps due to the fact that the disks are attached
# to a different VM.
script_content = ""
for encrypted_volume_id in self._unlocked_volumes:
LOG.info(
"Resuming BitLocker after first boot, volume: %s",
encrypted_volume_id)
script_content += f'Resume-BitLocker "{encrypted_volume_id}"\r\n'

# Resume BitLocker after bringing the disks online, which has a script
# priority of 10.
os_morphing_tools.register_firstboot_script(
script_content,
user_provided=False,
script_filename="11-bitlocker-firstboot.ps1")

def mount_os(self):
self._set_basic_disks_rw_mode()
self._bring_disks_online()
self._unlock_encrypted_volumes()
self._set_volumes_drive_letter()
fs_roots = utils.retry_on_error(sleep_seconds=5)(self._get_fs_roots)(
fail_if_empty=True)
Expand Down
170 changes: 170 additions & 0 deletions coriolis/tests/osmorphing/osmount/test_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from unittest import mock

from coriolis import constants
from coriolis import exception
from coriolis.osmorphing.osmount import windows
from coriolis.tests import test_base
Expand Down Expand Up @@ -336,3 +337,172 @@ def test_dismount_os(self):

self.tools._conn.exec_ps_command.assert_called_once_with(
'(Get-Disk | Where-Object { $_.IsBoot -eq $False }).Number')

def test_get_encrypted_volume_ids(self):
# Powershell wouldn't mix line endings, we're just ensuring that
# we can properly handle both line ending types.
self.tools._conn.exec_ps_command.return_value = (
"\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\\r\n"
"\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\\r\n"
"\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\\n"
)

exp_ret = [
"\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\",
"\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\",
"\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\",
]
ret = self.tools._get_encrypted_volume_ids()

self.assertEqual(exp_ret, ret)
self.tools._conn.exec_ps_command.assert_called_once_with(
'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" '
'-class Win32_EncryptableVolume | % {$_.DeviceID}')

def test_unlock_encrypted_volume(self):
vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\"
password = "6010ba47-28e4-4105-8b0a-69eed0a54283"

self.tools._unlock_encrypted_volume(vol, password)

exp_cmd = 'manage-bde -unlock "%s" -RecoveryPassword "%s"' % (
vol, password)
self.tools._conn.exec_ps_command.assert_called_once_with(
exp_cmd)

def test_suspend_bitlocker(self):
vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\"

self.tools._suspend_bitlocker(vol)

exp_cmd = 'Suspend-BitLocker "%s"' % (vol)
self.tools._conn.exec_ps_command.assert_called_once_with(
exp_cmd)

@mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
def test_unlock_encrypted_volumes_no_password(
self,
mock_get_encrypted_volume_ids,
):
self.tools._unlock_encrypted_volumes()
mock_get_encrypted_volume_ids.assert_not_called()

@mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
@mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
def test_unlock_encrypted_volumes_not_encrypted(
self,
mock_unlock_encrypted_volume,
mock_get_encrypted_volume_ids,
):
fake_pass = "fake-recovery-password"
self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass

mock_get_encrypted_volume_ids.return_value = []

self.tools._unlock_encrypted_volumes()
mock_unlock_encrypted_volume.assert_not_called()

@mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
@mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
@mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
def test_unlock_encrypted_volumes_all_failed(
self,
mock_suspend_bitlocker,
mock_unlock_encrypted_volume,
mock_get_encrypted_volume_ids,
):
fake_pass = "fake-recovery-password"
self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass

mock_get_encrypted_volume_ids.return_value = [
mock.sentinel.volume0,
mock.sentinel.volume1,
]
mock_unlock_encrypted_volume.side_effect = ValueError

self.assertRaises(
exception.CoriolisException,
self.tools._unlock_encrypted_volumes,
)

@mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
@mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
@mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
def test_unlock_encrypted_volumes_one_failed(
self,
mock_suspend_bitlocker,
mock_unlock_encrypted_volume,
mock_get_encrypted_volume_ids,
):
fake_pass = "fake-recovery-password"
self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass

encrypted_volume_ids = [
mock.sentinel.volume0,
mock.sentinel.volume1,
]
mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids
mock_unlock_encrypted_volume.side_effect = [ValueError, None]

self.tools._unlock_encrypted_volumes()

mock_unlock_encrypted_volume.assert_has_calls(
[mock.call(vol_id, fake_pass) for vol_id in encrypted_volume_ids])
mock_suspend_bitlocker.assert_called_once_with(
mock.sentinel.volume1)

self.assertEqual(
self.tools._unlocked_volumes, [mock.sentinel.volume1])

@mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
@mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
@mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
def test_unlock_encrypted_volumes_suspend_failed(
self,
mock_suspend_bitlocker,
mock_unlock_encrypted_volume,
mock_get_encrypted_volume_ids,
):
fake_pass = "fake-recovery-password"
self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass

encrypted_volume_ids = [
mock.sentinel.volume0,
mock.sentinel.volume1,
mock.sentinel.volume2
]
mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids
mock_unlock_encrypted_volume.side_effect = [ValueError, None, None]
mock_suspend_bitlocker.side_effect = [IOError, None]

# It should error out immediately when failing to suspend BitLocker.
self.assertRaises(
IOError,
self.tools._unlock_encrypted_volumes,
)
mock_unlock_encrypted_volume.assert_has_calls(
[mock.call(mock.sentinel.volume0, fake_pass),
mock.call(mock.sentinel.volume1, fake_pass)]
)

def test_install_encryption_firstboot_setup_noop(self):
# No unlocked volumes, nothing to do.
mock_morphing_tools = mock.Mock()
self.tools.install_encryption_firstboot_setup(
mock.sentinel.os_root_dir,
mock_morphing_tools)
mock_morphing_tools.register_firstboot_script.assert_not_called()

def test_install_encryption_firstboot_setup(self):
self.tools._unlocked_volumes = ["vol1", "vol2"]
mock_morphing_tools = mock.Mock()
self.tools.install_encryption_firstboot_setup(
mock.sentinel.os_root_dir,
mock_morphing_tools)

expected_script = (
'Resume-BitLocker "vol1"\r\nResume-BitLocker "vol2"\r\n')
mock_morphing_tools.register_firstboot_script.assert_called_once_with(
expected_script,
user_provided=False,
script_filename="11-bitlocker-firstboot.ps1")
1 change: 1 addition & 0 deletions coriolis/tests/osmorphing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def test_morph_image(
mock_get_os_mount_tools.assert_called_once_with(
'linux', mock.sentinel.connection_info, self.event_manager, [], 60,
osmorphing_info=self.osmorphing_info)
mock_EventManager.assert_called_with(self.event_handler)

self.os_mount_tools.dismount_os.assert_called_once()

Expand Down
Loading