From 6167a3cf7e8a37db95558289e6a1fdaeffc260e1 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Thu, 28 May 2026 13:01:16 +0000 Subject: [PATCH 1/2] integration: Move destination device initialization to the provider Currently, we create a destination device in the test base. However, the import provider is supposed to create the disks for transfered as required. Moving this part into the test provider will make it easier for us to swap in other providers later on. --- coriolis/tests/integration/base.py | 45 +++++++++++++++++-- .../tests/integration/test_provider/imp.py | 37 ++++++--------- 2 files changed, 56 insertions(+), 26 deletions(-) diff --git a/coriolis/tests/integration/base.py b/coriolis/tests/integration/base.py index f9c00e04..be340464 100644 --- a/coriolis/tests/integration/base.py +++ b/coriolis/tests/integration/base.py @@ -289,8 +289,6 @@ def setUp(self): self._src_device = test_utils.add_scsi_debug_device() self.addCleanup(test_utils.remove_scsi_debug_device) - self._dst_device = test_utils.add_scsi_debug_device() - self.addCleanup(test_utils.remove_scsi_debug_device) # Write a test pattern on the src device. # Incremental transfer tests update the second chunk (offset=4096). @@ -308,8 +306,11 @@ def setUp(self): instances=[self._instance_name], destination_minion_pool_id=self._pool_id, source_environment={"block_device_path": self._src_device}, - destination_environment={"devices": [self._dst_device]}, ) + # Safety-net cleanup for destination devices allocated by the provider. + # Must be registered after the transfer, so it runs (LIFO) before the + # transfer delete, while the volumes_info is still in the DB. + self.addCleanup(self._cleanup_provider_dst_devices) # mock a few commands that are going to be ran through ssh; they won't # pass anyway. @@ -325,6 +326,44 @@ def setUp(self): mocker.start() self.addCleanup(mocker.stop) + @property + def _dst_device(self): + """First destination dev path from the transfer's volumes_info.""" + ctxt = self._get_db_context() + + transfer = db_api.get_transfer( + ctxt, self._transfer.id, include_task_info=True) + info = transfer.get("info", {}).get(self._instance_name, {}) + for vol in info.get("volumes_info", []): + if vol.get("volume_dev"): + return vol["volume_dev"] + + return None + + def _cleanup_provider_dst_devices(self): + """Remove any devices the provider allocated for this test. + + Acts as a safety net for cases where delete_replica_disks was not + called (e.g. the execution never reached that task, or the test did + not explicitly trigger a delete-disks execution). Uses + os.path.exists so it is a no-op when the provider already cleaned up. + """ + ctxt = self._get_db_context() + + try: + transfer = db_api.get_transfer( + ctxt, self._transfer.id, include_task_info=True) + volumes_info = transfer.get("info", {}).get( + self._instance_name, {}).get('volumes_info', []) + except Exception as ex: + LOG.warn("Could not get volumes info for cleanup. Ex: %s", ex) + return + + for vol in volumes_info: + device = vol.get('volume_dev') + if device and os.path.exists(device): + test_utils.remove_scsi_debug_device() + def _execute_and_wait(self, transfer_id, timeout=300): """Trigger one execution of *transfer_id* and wait for completion.""" execution = self._client.transfer_executions.create( diff --git a/coriolis/tests/integration/test_provider/imp.py b/coriolis/tests/integration/test_provider/imp.py index b37d4c78..f677841b 100644 --- a/coriolis/tests/integration/test_provider/imp.py +++ b/coriolis/tests/integration/test_provider/imp.py @@ -55,7 +55,6 @@ class TestImportProvider( ``target_environment`` (per-transfer destination settings) has the form:: { - "devices": ["/dev/sdY", ...], # pre-allocated destination devs } """ @@ -86,12 +85,7 @@ def validate_connection(self, ctxt, connection_info): def get_target_environment_schema(self): return { "type": "object", - "properties": { - "devices": { - "type": "array", - "items": {"type": "string"}, - }, - }, + "properties": {}, "required": [], } @@ -131,25 +125,14 @@ def check_update_destination_environment_params( def deploy_replica_disks( self, ctxt, connection_info, target_environment, instance_name, export_info, volumes_info): - """Map each source disk in export_info to a destination device. - - Returns a volumes_info list where each entry has ``disk_id`` (from - the source) and ``volume_dev`` (the destination block device path). - """ - dest_devices = list(target_environment["devices"]) + """Allocate disks and return volumes_info.""" src_disks = export_info.get("devices", {}).get("disks", []) - if len(src_disks) > len(dest_devices): - raise ValueError( - "Not enough destination devices (%d) for %d source disks" - % (len(dest_devices), len(src_disks)) - ) - result = [] for i, disk in enumerate(src_disks): result.append({ "disk_id": disk["id"], - "volume_dev": dest_devices[i], + "volume_dev": test_utils.add_scsi_debug_device(), }) return result @@ -221,7 +204,8 @@ def delete_replica_target_resources( def delete_replica_disks( self, ctxt, connection_info, target_environment, volumes_info): - # scsi_debug devices are managed externally; nothing to delete here. + for _ in (volumes_info or []): + test_utils.remove_scsi_debug_device() return volumes_info def create_replica_disk_snapshots( @@ -240,7 +224,14 @@ def restore_replica_disk_snapshots( def deploy_replica_instance( self, ctxt, connection_info, target_environment, instance_name, export_info, volumes_info, clone_disks): - return {"instance_deployment_info": {}} + devices = [ + vol["volume_dev"] for vol in volumes_info if vol.get("volume_dev") + ] + return { + "instance_deployment_info": { + "devices": devices, + }, + } def finalize_replica_instance_deployment( self, ctxt, connection_info, target_environment, @@ -277,7 +268,7 @@ def get_os_morphing_tools(self, os_type, osmorphing_info): def deploy_os_morphing_resources( self, ctxt, connection_info, target_environment, instance_deployment_info): - devices = list(target_environment.get("devices", [])) + devices = list(instance_deployment_info.get("devices", [])) # lsblk inside the container sees all the host block devices because # Docker containers share the host kernel's sysfs (/sys/block/). From ab7ec169497d7ef529a4ba4386ca2820507598ea Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Thu, 28 May 2026 08:15:22 +0000 Subject: [PATCH 2/2] integration: Add ImportProviderHarness abstraction Introduces an `ImportProviderHarness` ABC that encapsulates all import-provider-specific logic that cannot be expressed through static configuration: - `check_prerequisites()`: gate the test suite on required infrastructure (e.g. the data-minion Docker image). - `prepare_transfer()`: allocate per-transfer destination resources (cloud volume, etc.) and return the matching destination_environment dict. - `get_connection_info()`: for endpoint configuration. - `check_no_resources_leaked()`: resource leak checks. - `cleanup_deployed_instance(instance_name)`: removes VMs from completed deployments. Adds an implementation for it, TestImportHarness (test_provider/harness.py). --- coriolis/tests/integration/base.py | 29 +++--- .../deployments/test_osmorphing.py | 10 ++ coriolis/tests/integration/harness.py | 34 +++++-- .../tests/integration/provider_harness.py | 81 +++++++++++++++ .../integration/test_provider/harness.py | 99 +++++++++++++++++++ .../integration/transfers/test_transfer.py | 27 ++--- 6 files changed, 246 insertions(+), 34 deletions(-) create mode 100644 coriolis/tests/integration/provider_harness.py create mode 100644 coriolis/tests/integration/test_provider/harness.py diff --git a/coriolis/tests/integration/base.py b/coriolis/tests/integration/base.py index be340464..022101ed 100644 --- a/coriolis/tests/integration/base.py +++ b/coriolis/tests/integration/base.py @@ -13,7 +13,6 @@ """ import os -import subprocess import time import unittest from unittest import mock @@ -239,17 +238,7 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase): @classmethod def setUpClass(cls): - result = subprocess.run( - ["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if result.returncode != 0: - raise unittest.SkipTest( - "Docker image not found; build it with: " - "docker build -t %s " - "coriolis/tests/integration/dockerfiles/data-minion/" - % test_utils.DATA_MINION_IMAGE) + harness._IntegrationHarness.get().imp_harness.check_prerequisites() super().setUpClass() @@ -300,12 +289,14 @@ def setUp(self): # Use basename as instance name; real VM names do not contain slashes, # and some providers use the name as is in resource indentifiers. self._instance_name = os.path.basename(self._src_device) + dst_environment = self._harness.imp_harness.prepare_transfer(self) self._transfer = self._create_transfer( self._src_endpoint.id, self._dst_endpoint.id, instances=[self._instance_name], destination_minion_pool_id=self._pool_id, source_environment={"block_device_path": self._src_device}, + destination_environment=dst_environment, ) # Safety-net cleanup for destination devices allocated by the provider. # Must be registered after the transfer, so it runs (LIFO) before the @@ -465,6 +456,9 @@ def _cleanup_deployment(self, deployment_id): that occurs when a deployment is still in-flight at cleanup time, which can happen with slow providers when a test fails or times out before the deployment completes. + + Calls ``imp_harness.cleanup_deployed_instance`` for every deployment + instance, so that finalized VMs at the destination are destroyed. """ ctxt = self._get_db_context() deployment = db_api.get_deployment(ctxt, deployment_id) @@ -473,6 +467,8 @@ def _cleanup_deployment(self, deployment_id): "Deployment '%s' not found. Skip cleanup.", deployment_id) return + instances = list(deployment.instances or []) + if deployment.last_execution_status in ( constants.ACTIVE_EXECUTION_STATUSES): self._client.deployments.cancel(deployment_id) @@ -480,6 +476,15 @@ def _cleanup_deployment(self, deployment_id): self._client.deployments.delete(deployment_id) + for instance_name in instances: + try: + self._harness.imp_harness.cleanup_deployed_instance( + instance_name) + except Exception as ex: + LOG.warning( + "Could not clean up deployed instance '%s': %s", + instance_name, ex) + def wait_for_deployment(self, deployment_id, timeout=300, desired_statuses=None): """Block until *deployment_id* reaches any terminal state. diff --git a/coriolis/tests/integration/deployments/test_osmorphing.py b/coriolis/tests/integration/deployments/test_osmorphing.py index ab49d341..3a0894a3 100644 --- a/coriolis/tests/integration/deployments/test_osmorphing.py +++ b/coriolis/tests/integration/deployments/test_osmorphing.py @@ -7,9 +7,11 @@ installation in the target OS. """ +import unittest import uuid from coriolis.tests.integration import base as integration_base +from coriolis.tests.integration import harness as integration_harness from coriolis.tests.integration import utils as test_utils @@ -19,6 +21,14 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase): # any new packages to be added during OS morphing. _SCSI_DEBUG_SIZE_MB = 256 + @classmethod + def setUpClass(cls): + harness = integration_harness._IntegrationHarness.get() + if not harness.imp_harness.are_disks_local: + raise unittest.SkipTest( + "OS morphing tests require local disk access") + super().setUpClass() + def setUp(self): super().setUp() test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04") diff --git a/coriolis/tests/integration/harness.py b/coriolis/tests/integration/harness.py index b96d4a58..665cf0fc 100644 --- a/coriolis/tests/integration/harness.py +++ b/coriolis/tests/integration/harness.py @@ -24,6 +24,7 @@ import shutil import socket import subprocess +import sys import tempfile import uuid @@ -55,6 +56,7 @@ from coriolis import service from coriolis.taskflow import runner as taskflow_runner from coriolis.tasks import factory as task_runners_factory +from coriolis.tests.integration.test_provider import harness as test_harness from coriolis.tests.integration import utils as test_utils from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server from coriolis import utils as coriolis_utils @@ -68,9 +70,6 @@ _TEST_EXPORT_PROVIDER = ( "coriolis.tests.integration.test_provider.exp.TestExportProvider" ) -_TEST_IMPORT_PROVIDER = ( - "coriolis.tests.integration.test_provider.imp.TestImportProvider" -) # Fixed project used for all test requests. _TEST_PROJECT_ID = 'integration-project' @@ -276,8 +275,13 @@ def __init__(self): cfg.CONF([], project='coriolis', version='1.0.0', default_config_files=[], default_config_dirs=[]) cfg.CONF.set_override('messaging_transport_url', 'fake://') + + imp_harness_cls = test_harness.TestImportHarness + imp_cls = imp_harness_cls.provider_class + imp_provider = "%s.%s" % (imp_cls.__module__, imp_cls.__qualname__) + cfg.CONF.set_override( - 'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER]) + 'providers', [_TEST_EXPORT_PROVIDER, imp_provider]) db_url = ('mysql+pymysql://%(user)s:%(password)s' '@localhost:13306/%(database)s') % { "user": self._mysql_username, @@ -302,19 +306,18 @@ def __init__(self): # Policy enforcer: reset so it re-reads the new CONF (no policy file). policy_module.reset() + # Init exporter. self.exp_provider_class = _get_provider(_TEST_EXPORT_PROVIDER) self.exp_provider_platform = self.exp_provider_class.platform self.exp_conn_info = { "pkey_path": self.ssh_key_path, "role": "source", } - - self.imp_provider_class = _get_provider(_TEST_IMPORT_PROVIDER) + # Init importer. + self.imp_provider_class = imp_cls self.imp_provider_platform = self.imp_provider_class.platform - self.imp_conn_info = { - "pkey_path": self.ssh_key_path, - "role": "destination", - } + self.imp_harness = imp_harness_cls(self) + self.imp_conn_info = self.imp_harness.get_connection_info() self._wsgi_server = None self._wsgi_server_thread = None @@ -332,7 +335,9 @@ def __init__(self): sqlalchemy_api._facade = None rpc_module._TRANSPORT = None + atexit.register(self._check_harness_errors) atexit.register(self._teardown) + atexit.register(self.imp_harness.teardown) self._start_db_container() @@ -472,6 +477,15 @@ def _start_coriolis_services(self): daemon=True, ) + def _check_harness_errors(self): + errors = self.imp_harness._deferred_errors + if not errors: + return + + for err in errors: + LOG.error("Harness teardown failure: %s", err) + sys.exit(1) + def _teardown(self): LOG.info("Teardown initiated.") diff --git a/coriolis/tests/integration/provider_harness.py b/coriolis/tests/integration/provider_harness.py new file mode 100644 index 00000000..b5e6d4be --- /dev/null +++ b/coriolis/tests/integration/provider_harness.py @@ -0,0 +1,81 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +""" +Abstract base class for provider harnesses. + +A harness encapsulates all provider-specific logic that cannot be expressed +through static YAML configuration: per-transfer resource allocation, +end-of-session leak assertions, VM finalization, etc. +""" + +import abc + +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + + +class ImportProviderHarness(abc.ABC): + # Subclasses must set this to the import provider class they wrap. + # The framework derives the dotted provider path from it automatically. + provider_class = None + + def __init__(self, harness): + self._deferred_errors = [] + + def _record_error(self, exc): + """Stash *exc* for deferred reporting after all teardown completes.""" + self._deferred_errors.append(exc) + + def teardown(self): + """One-time teardown called at atexit.""" + + def check_prerequisites(self): + """Raise ``unittest.SkipTest`` if required infrastructure is absent.""" + + @abc.abstractmethod + def get_connection_info(self) -> dict: + """Return ``connection_info`` for the destination endpoint.""" + + def prepare_transfer(self, test_instance) -> dict: + """Allocate per-transfer destination resources. + + Returns a dict merged into ``destination_environment`` for this + transfer. Must register ``addCleanup()`` on *test_instance* for + every resource created, so it is released even on test failure. + """ + return {} + + @property + def are_disks_local(self) -> bool: + """True if destination disks are local block devices on the test host. + + When True, tests may directly read / mount the destination device to + verify contents. + """ + return False + + def check_no_resources_leaked(self): + """Check for resources left behind by this test session. + + Subclasses record each leak via ``_record_error()``, and log errors + rather than raising, so that all checks and teardowns run regardless of + earlier failures. + """ + + def cleanup_deployed_instance(self, instance_name: str): + """Destroy the VM created at the destination by a completed deployment. + + Called during integration test cleanup after each deployment test, so + that finalized VMs do not accumulate across runs and cause failures in + later tests (e.g. name collisions, resource exhaustion). + + The default is a no-op; harnesses that create real VMs must override + this method. Implementations should be idempotent: if the VM is + already gone the method must return without raising. + """ + + def get_minion_pool_config(self) -> dict | None: + """Return minion-pool creation kwargs, or None.""" + return None diff --git a/coriolis/tests/integration/test_provider/harness.py b/coriolis/tests/integration/test_provider/harness.py new file mode 100644 index 00000000..fb545480 --- /dev/null +++ b/coriolis/tests/integration/test_provider/harness.py @@ -0,0 +1,99 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +""" +Import harness for the test Docker provider. +""" + +import shutil +import subprocess +import tempfile +import unittest + +from oslo_log import log as logging + +from coriolis.tests.integration import provider_harness +from coriolis.tests.integration.test_provider import imp +from coriolis.tests.integration import utils as test_utils + +LOG = logging.getLogger(__name__) + +# Name prefixes used by TestImportProvider._create_minion callers. +_CONTAINER_PREFIXES = ( + "coriolis-writer-", + "coriolis-osmorphing-", + "coriolis-pool-minion-", +) + + +class TestImportHarness(provider_harness.ImportProviderHarness): + """Import harness for the test Docker provider.""" + + provider_class = imp.TestImportProvider + + def __init__(self, harness): + super().__init__(harness) + self._initial_containers = self._list_test_containers() + self._workdir = tempfile.mkdtemp(prefix="coriolis-test-imp-") + self._ssh_key_path = "%s/id_rsa" % self._workdir + + subprocess.run( + ["ssh-keygen", "-t", "rsa", "-b", "2048", + "-f", self._ssh_key_path, "-N", ""], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + def teardown(self): + self.check_no_resources_leaked() + + if self._workdir: + shutil.rmtree(self._workdir, ignore_errors=True) + + for name in self._list_test_containers() - self._initial_containers: + test_utils.remove_container(name) + + def check_prerequisites(self): + result = subprocess.run( + ["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if result.returncode != 0: + raise unittest.SkipTest( + "Docker image '%s' not found; build it with: " + "docker build -t %s " + "coriolis/tests/integration/dockerfiles/data-minion/" + % (test_utils.DATA_MINION_IMAGE, test_utils.DATA_MINION_IMAGE) + ) + + def get_connection_info(self) -> dict: + return { + "pkey_path": self._ssh_key_path, + "role": "destination", + } + + def check_no_resources_leaked(self): + leaked = self._list_test_containers() - self._initial_containers + if leaked: + msg = "Docker containers leaked by integration tests: %s" % leaked + LOG.error(msg) + self._record_error(AssertionError(msg)) + + @property + def are_disks_local(self) -> bool: + return True + + def _list_test_containers(self) -> set: + result = subprocess.run( + ["docker", "ps", "-a", "--format", "{{.Names}}"], + capture_output=True, + text=True, + ) + + return { + name for name in result.stdout.splitlines() + if any(name.startswith(p) for p in _CONTAINER_PREFIXES) + } diff --git a/coriolis/tests/integration/transfers/test_transfer.py b/coriolis/tests/integration/transfers/test_transfer.py index 23e8625b..5f00fe38 100644 --- a/coriolis/tests/integration/transfers/test_transfer.py +++ b/coriolis/tests/integration/transfers/test_transfer.py @@ -50,10 +50,11 @@ def test_incremental_replica_transfer(self): # First run: full transfer self._execute_and_wait(self._transfer.id) - self.assertTrue( - test_utils.devices_match(self._src_device, self._dst_device), - "Devices do not match after initial full transfer", - ) + if self._harness.imp_harness.are_disks_local: + self.assertTrue( + test_utils.devices_match(self._src_device, self._dst_device), + "Devices do not match after initial full transfer", + ) # Mutate source: write a different pattern at the second chunk test_utils.write_bytes_at_offset( @@ -61,18 +62,20 @@ def test_incremental_replica_transfer(self): offset=4096, data=b"\xff\xfe\xfd\xfc" * 1024, ) - self.assertFalse( - test_utils.devices_match(self._src_device, self._dst_device), - "Devices should differ after mutating the source", - ) + if self._harness.imp_harness.are_disks_local: + self.assertFalse( + test_utils.devices_match(self._src_device, self._dst_device), + "Devices should differ after mutating the source", + ) # Second run: incremental self._execute_and_wait(self._transfer.id) - self.assertTrue( - test_utils.devices_match(self._src_device, self._dst_device), - "Destination does not match source after incremental transfer", - ) + if self._harness.imp_harness.are_disks_local: + self.assertTrue( + test_utils.devices_match(self._src_device, self._dst_device), + "Destination does not match source after incremental transfer", + ) class MinionPoolTransferTest(