diff --git a/coriolis/tests/integration/base.py b/coriolis/tests/integration/base.py index f9c00e04..022101ed 100644 --- a/coriolis/tests/integration/base.py +++ b/coriolis/tests/integration/base.py @@ -13,7 +13,6 @@ """ import os -import subprocess import time import unittest from unittest import mock @@ -239,17 +238,7 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase): @classmethod def setUpClass(cls): - result = subprocess.run( - ["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if result.returncode != 0: - raise unittest.SkipTest( - "Docker image not found; build it with: " - "docker build -t %s " - "coriolis/tests/integration/dockerfiles/data-minion/" - % test_utils.DATA_MINION_IMAGE) + harness._IntegrationHarness.get().imp_harness.check_prerequisites() super().setUpClass() @@ -289,8 +278,6 @@ def setUp(self): self._src_device = test_utils.add_scsi_debug_device() self.addCleanup(test_utils.remove_scsi_debug_device) - self._dst_device = test_utils.add_scsi_debug_device() - self.addCleanup(test_utils.remove_scsi_debug_device) # Write a test pattern on the src device. # Incremental transfer tests update the second chunk (offset=4096). @@ -302,14 +289,19 @@ def setUp(self): # Use basename as instance name; real VM names do not contain slashes, # and some providers use the name as is in resource indentifiers. self._instance_name = os.path.basename(self._src_device) + dst_environment = self._harness.imp_harness.prepare_transfer(self) self._transfer = self._create_transfer( self._src_endpoint.id, self._dst_endpoint.id, instances=[self._instance_name], destination_minion_pool_id=self._pool_id, source_environment={"block_device_path": self._src_device}, - destination_environment={"devices": [self._dst_device]}, + destination_environment=dst_environment, ) + # Safety-net cleanup for destination devices allocated by the provider. + # Must be registered after the transfer, so it runs (LIFO) before the + # transfer delete, while the volumes_info is still in the DB. + self.addCleanup(self._cleanup_provider_dst_devices) # mock a few commands that are going to be ran through ssh; they won't # pass anyway. @@ -325,6 +317,44 @@ def setUp(self): mocker.start() self.addCleanup(mocker.stop) + @property + def _dst_device(self): + """First destination dev path from the transfer's volumes_info.""" + ctxt = self._get_db_context() + + transfer = db_api.get_transfer( + ctxt, self._transfer.id, include_task_info=True) + info = transfer.get("info", {}).get(self._instance_name, {}) + for vol in info.get("volumes_info", []): + if vol.get("volume_dev"): + return vol["volume_dev"] + + return None + + def _cleanup_provider_dst_devices(self): + """Remove any devices the provider allocated for this test. + + Acts as a safety net for cases where delete_replica_disks was not + called (e.g. the execution never reached that task, or the test did + not explicitly trigger a delete-disks execution). Uses + os.path.exists so it is a no-op when the provider already cleaned up. + """ + ctxt = self._get_db_context() + + try: + transfer = db_api.get_transfer( + ctxt, self._transfer.id, include_task_info=True) + volumes_info = transfer.get("info", {}).get( + self._instance_name, {}).get('volumes_info', []) + except Exception as ex: + LOG.warn("Could not get volumes info for cleanup. Ex: %s", ex) + return + + for vol in volumes_info: + device = vol.get('volume_dev') + if device and os.path.exists(device): + test_utils.remove_scsi_debug_device() + def _execute_and_wait(self, transfer_id, timeout=300): """Trigger one execution of *transfer_id* and wait for completion.""" execution = self._client.transfer_executions.create( @@ -426,6 +456,9 @@ def _cleanup_deployment(self, deployment_id): that occurs when a deployment is still in-flight at cleanup time, which can happen with slow providers when a test fails or times out before the deployment completes. + + Calls ``imp_harness.cleanup_deployed_instance`` for every deployment + instance, so that finalized VMs at the destination are destroyed. """ ctxt = self._get_db_context() deployment = db_api.get_deployment(ctxt, deployment_id) @@ -434,6 +467,8 @@ def _cleanup_deployment(self, deployment_id): "Deployment '%s' not found. Skip cleanup.", deployment_id) return + instances = list(deployment.instances or []) + if deployment.last_execution_status in ( constants.ACTIVE_EXECUTION_STATUSES): self._client.deployments.cancel(deployment_id) @@ -441,6 +476,15 @@ def _cleanup_deployment(self, deployment_id): self._client.deployments.delete(deployment_id) + for instance_name in instances: + try: + self._harness.imp_harness.cleanup_deployed_instance( + instance_name) + except Exception as ex: + LOG.warning( + "Could not clean up deployed instance '%s': %s", + instance_name, ex) + def wait_for_deployment(self, deployment_id, timeout=300, desired_statuses=None): """Block until *deployment_id* reaches any terminal state. diff --git a/coriolis/tests/integration/deployments/test_osmorphing.py b/coriolis/tests/integration/deployments/test_osmorphing.py index ab49d341..3a0894a3 100644 --- a/coriolis/tests/integration/deployments/test_osmorphing.py +++ b/coriolis/tests/integration/deployments/test_osmorphing.py @@ -7,9 +7,11 @@ installation in the target OS. """ +import unittest import uuid from coriolis.tests.integration import base as integration_base +from coriolis.tests.integration import harness as integration_harness from coriolis.tests.integration import utils as test_utils @@ -19,6 +21,14 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase): # any new packages to be added during OS morphing. _SCSI_DEBUG_SIZE_MB = 256 + @classmethod + def setUpClass(cls): + harness = integration_harness._IntegrationHarness.get() + if not harness.imp_harness.are_disks_local: + raise unittest.SkipTest( + "OS morphing tests require local disk access") + super().setUpClass() + def setUp(self): super().setUp() test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04") diff --git a/coriolis/tests/integration/harness.py b/coriolis/tests/integration/harness.py index b96d4a58..665cf0fc 100644 --- a/coriolis/tests/integration/harness.py +++ b/coriolis/tests/integration/harness.py @@ -24,6 +24,7 @@ import shutil import socket import subprocess +import sys import tempfile import uuid @@ -55,6 +56,7 @@ from coriolis import service from coriolis.taskflow import runner as taskflow_runner from coriolis.tasks import factory as task_runners_factory +from coriolis.tests.integration.test_provider import harness as test_harness from coriolis.tests.integration import utils as test_utils from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server from coriolis import utils as coriolis_utils @@ -68,9 +70,6 @@ _TEST_EXPORT_PROVIDER = ( "coriolis.tests.integration.test_provider.exp.TestExportProvider" ) -_TEST_IMPORT_PROVIDER = ( - "coriolis.tests.integration.test_provider.imp.TestImportProvider" -) # Fixed project used for all test requests. _TEST_PROJECT_ID = 'integration-project' @@ -276,8 +275,13 @@ def __init__(self): cfg.CONF([], project='coriolis', version='1.0.0', default_config_files=[], default_config_dirs=[]) cfg.CONF.set_override('messaging_transport_url', 'fake://') + + imp_harness_cls = test_harness.TestImportHarness + imp_cls = imp_harness_cls.provider_class + imp_provider = "%s.%s" % (imp_cls.__module__, imp_cls.__qualname__) + cfg.CONF.set_override( - 'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER]) + 'providers', [_TEST_EXPORT_PROVIDER, imp_provider]) db_url = ('mysql+pymysql://%(user)s:%(password)s' '@localhost:13306/%(database)s') % { "user": self._mysql_username, @@ -302,19 +306,18 @@ def __init__(self): # Policy enforcer: reset so it re-reads the new CONF (no policy file). policy_module.reset() + # Init exporter. self.exp_provider_class = _get_provider(_TEST_EXPORT_PROVIDER) self.exp_provider_platform = self.exp_provider_class.platform self.exp_conn_info = { "pkey_path": self.ssh_key_path, "role": "source", } - - self.imp_provider_class = _get_provider(_TEST_IMPORT_PROVIDER) + # Init importer. + self.imp_provider_class = imp_cls self.imp_provider_platform = self.imp_provider_class.platform - self.imp_conn_info = { - "pkey_path": self.ssh_key_path, - "role": "destination", - } + self.imp_harness = imp_harness_cls(self) + self.imp_conn_info = self.imp_harness.get_connection_info() self._wsgi_server = None self._wsgi_server_thread = None @@ -332,7 +335,9 @@ def __init__(self): sqlalchemy_api._facade = None rpc_module._TRANSPORT = None + atexit.register(self._check_harness_errors) atexit.register(self._teardown) + atexit.register(self.imp_harness.teardown) self._start_db_container() @@ -472,6 +477,15 @@ def _start_coriolis_services(self): daemon=True, ) + def _check_harness_errors(self): + errors = self.imp_harness._deferred_errors + if not errors: + return + + for err in errors: + LOG.error("Harness teardown failure: %s", err) + sys.exit(1) + def _teardown(self): LOG.info("Teardown initiated.") diff --git a/coriolis/tests/integration/provider_harness.py b/coriolis/tests/integration/provider_harness.py new file mode 100644 index 00000000..b5e6d4be --- /dev/null +++ b/coriolis/tests/integration/provider_harness.py @@ -0,0 +1,81 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +""" +Abstract base class for provider harnesses. + +A harness encapsulates all provider-specific logic that cannot be expressed +through static YAML configuration: per-transfer resource allocation, +end-of-session leak assertions, VM finalization, etc. +""" + +import abc + +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + + +class ImportProviderHarness(abc.ABC): + # Subclasses must set this to the import provider class they wrap. + # The framework derives the dotted provider path from it automatically. + provider_class = None + + def __init__(self, harness): + self._deferred_errors = [] + + def _record_error(self, exc): + """Stash *exc* for deferred reporting after all teardown completes.""" + self._deferred_errors.append(exc) + + def teardown(self): + """One-time teardown called at atexit.""" + + def check_prerequisites(self): + """Raise ``unittest.SkipTest`` if required infrastructure is absent.""" + + @abc.abstractmethod + def get_connection_info(self) -> dict: + """Return ``connection_info`` for the destination endpoint.""" + + def prepare_transfer(self, test_instance) -> dict: + """Allocate per-transfer destination resources. + + Returns a dict merged into ``destination_environment`` for this + transfer. Must register ``addCleanup()`` on *test_instance* for + every resource created, so it is released even on test failure. + """ + return {} + + @property + def are_disks_local(self) -> bool: + """True if destination disks are local block devices on the test host. + + When True, tests may directly read / mount the destination device to + verify contents. + """ + return False + + def check_no_resources_leaked(self): + """Check for resources left behind by this test session. + + Subclasses record each leak via ``_record_error()``, and log errors + rather than raising, so that all checks and teardowns run regardless of + earlier failures. + """ + + def cleanup_deployed_instance(self, instance_name: str): + """Destroy the VM created at the destination by a completed deployment. + + Called during integration test cleanup after each deployment test, so + that finalized VMs do not accumulate across runs and cause failures in + later tests (e.g. name collisions, resource exhaustion). + + The default is a no-op; harnesses that create real VMs must override + this method. Implementations should be idempotent: if the VM is + already gone the method must return without raising. + """ + + def get_minion_pool_config(self) -> dict | None: + """Return minion-pool creation kwargs, or None.""" + return None diff --git a/coriolis/tests/integration/test_provider/harness.py b/coriolis/tests/integration/test_provider/harness.py new file mode 100644 index 00000000..fb545480 --- /dev/null +++ b/coriolis/tests/integration/test_provider/harness.py @@ -0,0 +1,99 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +""" +Import harness for the test Docker provider. +""" + +import shutil +import subprocess +import tempfile +import unittest + +from oslo_log import log as logging + +from coriolis.tests.integration import provider_harness +from coriolis.tests.integration.test_provider import imp +from coriolis.tests.integration import utils as test_utils + +LOG = logging.getLogger(__name__) + +# Name prefixes used by TestImportProvider._create_minion callers. +_CONTAINER_PREFIXES = ( + "coriolis-writer-", + "coriolis-osmorphing-", + "coriolis-pool-minion-", +) + + +class TestImportHarness(provider_harness.ImportProviderHarness): + """Import harness for the test Docker provider.""" + + provider_class = imp.TestImportProvider + + def __init__(self, harness): + super().__init__(harness) + self._initial_containers = self._list_test_containers() + self._workdir = tempfile.mkdtemp(prefix="coriolis-test-imp-") + self._ssh_key_path = "%s/id_rsa" % self._workdir + + subprocess.run( + ["ssh-keygen", "-t", "rsa", "-b", "2048", + "-f", self._ssh_key_path, "-N", ""], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + def teardown(self): + self.check_no_resources_leaked() + + if self._workdir: + shutil.rmtree(self._workdir, ignore_errors=True) + + for name in self._list_test_containers() - self._initial_containers: + test_utils.remove_container(name) + + def check_prerequisites(self): + result = subprocess.run( + ["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if result.returncode != 0: + raise unittest.SkipTest( + "Docker image '%s' not found; build it with: " + "docker build -t %s " + "coriolis/tests/integration/dockerfiles/data-minion/" + % (test_utils.DATA_MINION_IMAGE, test_utils.DATA_MINION_IMAGE) + ) + + def get_connection_info(self) -> dict: + return { + "pkey_path": self._ssh_key_path, + "role": "destination", + } + + def check_no_resources_leaked(self): + leaked = self._list_test_containers() - self._initial_containers + if leaked: + msg = "Docker containers leaked by integration tests: %s" % leaked + LOG.error(msg) + self._record_error(AssertionError(msg)) + + @property + def are_disks_local(self) -> bool: + return True + + def _list_test_containers(self) -> set: + result = subprocess.run( + ["docker", "ps", "-a", "--format", "{{.Names}}"], + capture_output=True, + text=True, + ) + + return { + name for name in result.stdout.splitlines() + if any(name.startswith(p) for p in _CONTAINER_PREFIXES) + } diff --git a/coriolis/tests/integration/test_provider/imp.py b/coriolis/tests/integration/test_provider/imp.py index b37d4c78..f677841b 100644 --- a/coriolis/tests/integration/test_provider/imp.py +++ b/coriolis/tests/integration/test_provider/imp.py @@ -55,7 +55,6 @@ class TestImportProvider( ``target_environment`` (per-transfer destination settings) has the form:: { - "devices": ["/dev/sdY", ...], # pre-allocated destination devs } """ @@ -86,12 +85,7 @@ def validate_connection(self, ctxt, connection_info): def get_target_environment_schema(self): return { "type": "object", - "properties": { - "devices": { - "type": "array", - "items": {"type": "string"}, - }, - }, + "properties": {}, "required": [], } @@ -131,25 +125,14 @@ def check_update_destination_environment_params( def deploy_replica_disks( self, ctxt, connection_info, target_environment, instance_name, export_info, volumes_info): - """Map each source disk in export_info to a destination device. - - Returns a volumes_info list where each entry has ``disk_id`` (from - the source) and ``volume_dev`` (the destination block device path). - """ - dest_devices = list(target_environment["devices"]) + """Allocate disks and return volumes_info.""" src_disks = export_info.get("devices", {}).get("disks", []) - if len(src_disks) > len(dest_devices): - raise ValueError( - "Not enough destination devices (%d) for %d source disks" - % (len(dest_devices), len(src_disks)) - ) - result = [] for i, disk in enumerate(src_disks): result.append({ "disk_id": disk["id"], - "volume_dev": dest_devices[i], + "volume_dev": test_utils.add_scsi_debug_device(), }) return result @@ -221,7 +204,8 @@ def delete_replica_target_resources( def delete_replica_disks( self, ctxt, connection_info, target_environment, volumes_info): - # scsi_debug devices are managed externally; nothing to delete here. + for _ in (volumes_info or []): + test_utils.remove_scsi_debug_device() return volumes_info def create_replica_disk_snapshots( @@ -240,7 +224,14 @@ def restore_replica_disk_snapshots( def deploy_replica_instance( self, ctxt, connection_info, target_environment, instance_name, export_info, volumes_info, clone_disks): - return {"instance_deployment_info": {}} + devices = [ + vol["volume_dev"] for vol in volumes_info if vol.get("volume_dev") + ] + return { + "instance_deployment_info": { + "devices": devices, + }, + } def finalize_replica_instance_deployment( self, ctxt, connection_info, target_environment, @@ -277,7 +268,7 @@ def get_os_morphing_tools(self, os_type, osmorphing_info): def deploy_os_morphing_resources( self, ctxt, connection_info, target_environment, instance_deployment_info): - devices = list(target_environment.get("devices", [])) + devices = list(instance_deployment_info.get("devices", [])) # lsblk inside the container sees all the host block devices because # Docker containers share the host kernel's sysfs (/sys/block/). diff --git a/coriolis/tests/integration/transfers/test_transfer.py b/coriolis/tests/integration/transfers/test_transfer.py index 23e8625b..5f00fe38 100644 --- a/coriolis/tests/integration/transfers/test_transfer.py +++ b/coriolis/tests/integration/transfers/test_transfer.py @@ -50,10 +50,11 @@ def test_incremental_replica_transfer(self): # First run: full transfer self._execute_and_wait(self._transfer.id) - self.assertTrue( - test_utils.devices_match(self._src_device, self._dst_device), - "Devices do not match after initial full transfer", - ) + if self._harness.imp_harness.are_disks_local: + self.assertTrue( + test_utils.devices_match(self._src_device, self._dst_device), + "Devices do not match after initial full transfer", + ) # Mutate source: write a different pattern at the second chunk test_utils.write_bytes_at_offset( @@ -61,18 +62,20 @@ def test_incremental_replica_transfer(self): offset=4096, data=b"\xff\xfe\xfd\xfc" * 1024, ) - self.assertFalse( - test_utils.devices_match(self._src_device, self._dst_device), - "Devices should differ after mutating the source", - ) + if self._harness.imp_harness.are_disks_local: + self.assertFalse( + test_utils.devices_match(self._src_device, self._dst_device), + "Devices should differ after mutating the source", + ) # Second run: incremental self._execute_and_wait(self._transfer.id) - self.assertTrue( - test_utils.devices_match(self._src_device, self._dst_device), - "Destination does not match source after incremental transfer", - ) + if self._harness.imp_harness.are_disks_local: + self.assertTrue( + test_utils.devices_match(self._src_device, self._dst_device), + "Destination does not match source after incremental transfer", + ) class MinionPoolTransferTest(