diff --git a/coriolis/osmorphing/osmount/windows.py b/coriolis/osmorphing/osmount/windows.py index 2d8236e2..d2f03f28 100644 --- a/coriolis/osmorphing/osmount/windows.py +++ b/coriolis/osmorphing/osmount/windows.py @@ -6,6 +6,7 @@ from oslo_log import log as logging +from coriolis import constants from coriolis import exception from coriolis.osmorphing.osmount import base from coriolis import utils @@ -15,6 +16,13 @@ class WindowsMountTools(base.BaseOSMountTools): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # A list of BitLocker encrypted volumes that were unlocked + # by us. We'll use a first-boot script to resume BitLocker. + self._unlocked_volumes: list[str] = [] + def _connect(self): connection_info = self._connection_info @@ -223,9 +231,105 @@ def _set_volumes_drive_letter(self): f"Error was: {utils.get_exception_details()}") self._rebring_disks_online(disk_nums=disk_nums) + def _get_encrypted_volume_ids(self): + out = self._conn.exec_ps_command( + 'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" ' + '-class Win32_EncryptableVolume | % {$_.DeviceID}') + return [x for x in out.replace("\r\n", "\n").split("\n") if x] + + def _unlock_encrypted_volume(self, volume_id: str, recovery_password: str): + self._conn.exec_ps_command( + f'manage-bde -unlock "{volume_id}" ' + f'-RecoveryPassword "{recovery_password}"') + + def _suspend_bitlocker(self, volume_id: str): + """Suspend BitLocker until the next reboot for a given volume. + + It doesn't decrypt the device, it just adds a publicly accessible + BitLocker protector that automatically unlocks the volume. + + When the replica instance boots, the TPM protector will be reconfigured + automatically. Unfortunately the '-RebootCount' parameter isn't + honored, perhaps due to the fact that the disks are attached to a + separate VM. For this reason, we'll use a first-boot script to resume + BitLocker explicitly. + """ + self._conn.exec_ps_command(f'Suspend-BitLocker "{volume_id}"') + + def _unlock_encrypted_volumes(self): + recovery_password = self._osmorphing_info.get( + constants.ENCRYPTED_DISKS_PASS) + if not recovery_password: + LOG.info("No encrypted disk password specified, " + "skipping BitLocker unlock.") + return + + encrypted_volume_ids = self._get_encrypted_volume_ids() + if not encrypted_volume_ids: + LOG.warning("Received encrypted disk password but no " + "BitLocker encrypted volumes found.") + return + + unlocked = False + for encrypted_volume_id in encrypted_volume_ids: + try: + self._unlock_encrypted_volume( + encrypted_volume_id, recovery_password) + LOG.info( + "Successfully unlocked BitLocker encrypted volume: %s", + encrypted_volume_id) + unlocked = True + except Exception: + LOG.info( + "Could not unlock volume %s using the specified " + "recovery password.", + encrypted_volume_id) + continue + + # Suspend BitLocker until the replica boots. + # + # We'll intentionally propagate the failure if we managed to + # unlock the volume but failed to suspend BitLocker. + self._suspend_bitlocker(encrypted_volume_id) + self._unlocked_volumes.append(encrypted_volume_id) + + if not unlocked: + raise exception.CoriolisException( + "Could not unlock any volume using the specified " + "BitLocker recovery password.") + + def install_encryption_firstboot_setup( + self, + os_root_dir, + os_morphing_tools, + ): + if not self._unlocked_volumes: + LOG.info( + "No unlocked BitLocker volumes, skipping first-boot setup.") + return + + # We'll inject a first-boot script to resume BitLocker explicitly. + # Unfortunately the "-RebootCount" parameter of "Suspend-BitLocker" + # isn't honored, perhaps due to the fact that the disks are attached + # to a different VM. + script_content = "" + for encrypted_volume_id in self._unlocked_volumes: + LOG.info( + "Resuming BitLocker after first boot, volume: %s", + encrypted_volume_id) + script_content += f'Resume-BitLocker "{encrypted_volume_id}"\r\n' + + # Resume BitLocker after bringing the disks online, which has a script + # priority of 10. + os_morphing_tools.register_firstboot_script( + script_content, + user_provided=False, + script_filename="11-bitlocker-firstboot.ps1") + def mount_os(self): self._set_basic_disks_rw_mode() self._bring_disks_online() + self._unlock_encrypted_volumes() self._set_volumes_drive_letter() fs_roots = utils.retry_on_error(sleep_seconds=5)(self._get_fs_roots)( fail_if_empty=True) diff --git a/coriolis/tests/osmorphing/osmount/test_windows.py b/coriolis/tests/osmorphing/osmount/test_windows.py index 56dd8343..15810995 100644 --- a/coriolis/tests/osmorphing/osmount/test_windows.py +++ b/coriolis/tests/osmorphing/osmount/test_windows.py @@ -4,6 +4,7 @@ import logging from unittest import mock +from coriolis import constants from coriolis import exception from coriolis.osmorphing.osmount import windows from coriolis.tests import test_base @@ -336,3 +337,172 @@ def test_dismount_os(self): self.tools._conn.exec_ps_command.assert_called_once_with( '(Get-Disk | Where-Object { $_.IsBoot -eq $False }).Number') + + def test_get_encrypted_volume_ids(self): + # Powershell wouldn't mix line endings, we're just ensuring that + # we can properly handle both line ending types. + self.tools._conn.exec_ps_command.return_value = ( + "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\\r\n" + "\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\\r\n" + "\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\\n" + ) + + exp_ret = [ + "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\", + "\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\", + "\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\", + ] + ret = self.tools._get_encrypted_volume_ids() + + self.assertEqual(exp_ret, ret) + self.tools._conn.exec_ps_command.assert_called_once_with( + 'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" ' + '-class Win32_EncryptableVolume | % {$_.DeviceID}') + + def test_unlock_encrypted_volume(self): + vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\" + password = "6010ba47-28e4-4105-8b0a-69eed0a54283" + + self.tools._unlock_encrypted_volume(vol, password) + + exp_cmd = 'manage-bde -unlock "%s" -RecoveryPassword "%s"' % ( + vol, password) + self.tools._conn.exec_ps_command.assert_called_once_with( + exp_cmd) + + def test_suspend_bitlocker(self): + vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\" + + self.tools._suspend_bitlocker(vol) + + exp_cmd = 'Suspend-BitLocker "%s"' % (vol) + self.tools._conn.exec_ps_command.assert_called_once_with( + exp_cmd) + + @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids") + def test_unlock_encrypted_volumes_no_password( + self, + mock_get_encrypted_volume_ids, + ): + self.tools._unlock_encrypted_volumes() + mock_get_encrypted_volume_ids.assert_not_called() + + @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids") + @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume") + def test_unlock_encrypted_volumes_not_encrypted( + self, + mock_unlock_encrypted_volume, + mock_get_encrypted_volume_ids, + ): + fake_pass = "fake-recovery-password" + self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass + + mock_get_encrypted_volume_ids.return_value = [] + + self.tools._unlock_encrypted_volumes() + mock_unlock_encrypted_volume.assert_not_called() + + @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids") + @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume") + @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker") + def test_unlock_encrypted_volumes_all_failed( + self, + mock_suspend_bitlocker, + mock_unlock_encrypted_volume, + mock_get_encrypted_volume_ids, + ): + fake_pass = "fake-recovery-password" + self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass + + mock_get_encrypted_volume_ids.return_value = [ + mock.sentinel.volume0, + mock.sentinel.volume1, + ] + mock_unlock_encrypted_volume.side_effect = ValueError + + self.assertRaises( + exception.CoriolisException, + self.tools._unlock_encrypted_volumes, + ) + + @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids") + @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume") + @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker") + def test_unlock_encrypted_volumes_one_failed( + self, + mock_suspend_bitlocker, + mock_unlock_encrypted_volume, + mock_get_encrypted_volume_ids, + ): + fake_pass = "fake-recovery-password" + self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass + + encrypted_volume_ids = [ + mock.sentinel.volume0, + mock.sentinel.volume1, + ] + mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids + mock_unlock_encrypted_volume.side_effect = [ValueError, None] + + self.tools._unlock_encrypted_volumes() + + mock_unlock_encrypted_volume.assert_has_calls( + [mock.call(vol_id, fake_pass) for vol_id in encrypted_volume_ids]) + mock_suspend_bitlocker.assert_called_once_with( + mock.sentinel.volume1) + + self.assertEqual( + self.tools._unlocked_volumes, [mock.sentinel.volume1]) + + @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids") + @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume") + @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker") + def test_unlock_encrypted_volumes_suspend_failed( + self, + mock_suspend_bitlocker, + mock_unlock_encrypted_volume, + mock_get_encrypted_volume_ids, + ): + fake_pass = "fake-recovery-password" + self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass + + encrypted_volume_ids = [ + mock.sentinel.volume0, + mock.sentinel.volume1, + mock.sentinel.volume2 + ] + mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids + mock_unlock_encrypted_volume.side_effect = [ValueError, None, None] + mock_suspend_bitlocker.side_effect = [IOError, None] + + # It should error out immediately when failing to suspend BitLocker. + self.assertRaises( + IOError, + self.tools._unlock_encrypted_volumes, + ) + mock_unlock_encrypted_volume.assert_has_calls( + [mock.call(mock.sentinel.volume0, fake_pass), + mock.call(mock.sentinel.volume1, fake_pass)] + ) + + def test_install_encryption_firstboot_setup_noop(self): + # No unlocked volumes, nothing to do. + mock_morphing_tools = mock.Mock() + self.tools.install_encryption_firstboot_setup( + mock.sentinel.os_root_dir, + mock_morphing_tools) + mock_morphing_tools.register_firstboot_script.assert_not_called() + + def test_install_encryption_firstboot_setup(self): + self.tools._unlocked_volumes = ["vol1", "vol2"] + mock_morphing_tools = mock.Mock() + self.tools.install_encryption_firstboot_setup( + mock.sentinel.os_root_dir, + mock_morphing_tools) + + expected_script = ( + 'Resume-BitLocker "vol1"\r\nResume-BitLocker "vol2"\r\n') + mock_morphing_tools.register_firstboot_script.assert_called_once_with( + expected_script, + user_provided=False, + script_filename="11-bitlocker-firstboot.ps1") diff --git a/coriolis/tests/osmorphing/test_manager.py b/coriolis/tests/osmorphing/test_manager.py index e85669b6..9b63b76f 100644 --- a/coriolis/tests/osmorphing/test_manager.py +++ b/coriolis/tests/osmorphing/test_manager.py @@ -255,6 +255,7 @@ def test_morph_image( mock_get_os_mount_tools.assert_called_once_with( 'linux', mock.sentinel.connection_info, self.event_manager, [], 60, osmorphing_info=self.osmorphing_info) + mock_EventManager.assert_called_with(self.event_handler) self.os_mount_tools.dismount_os.assert_called_once()