Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions coriolis/tests/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""

import os
import subprocess
import time
import unittest
from unittest import mock
Expand Down Expand Up @@ -239,17 +238,7 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):

@classmethod
def setUpClass(cls):
result = subprocess.run(
["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
raise unittest.SkipTest(
"Docker image not found; build it with: "
"docker build -t %s "
"coriolis/tests/integration/dockerfiles/data-minion/"
% test_utils.DATA_MINION_IMAGE)
harness._IntegrationHarness.get().imp_harness.check_prerequisites()

super().setUpClass()

Expand Down Expand Up @@ -289,8 +278,6 @@ def setUp(self):

self._src_device = test_utils.add_scsi_debug_device()
self.addCleanup(test_utils.remove_scsi_debug_device)
self._dst_device = test_utils.add_scsi_debug_device()
self.addCleanup(test_utils.remove_scsi_debug_device)

# Write a test pattern on the src device.
# Incremental transfer tests update the second chunk (offset=4096).
Expand All @@ -302,14 +289,19 @@ def setUp(self):
# Use basename as instance name; real VM names do not contain slashes,
# and some providers use the name as-is in resource identifiers.
self._instance_name = os.path.basename(self._src_device)
dst_environment = self._harness.imp_harness.prepare_transfer(self)
self._transfer = self._create_transfer(
self._src_endpoint.id,
self._dst_endpoint.id,
instances=[self._instance_name],
destination_minion_pool_id=self._pool_id,
source_environment={"block_device_path": self._src_device},
destination_environment={"devices": [self._dst_device]},
destination_environment=dst_environment,
)
# Safety-net cleanup for destination devices allocated by the provider.
# Must be registered after the transfer, so it runs (LIFO) before the
# transfer delete, while the volumes_info is still in the DB.
self.addCleanup(self._cleanup_provider_dst_devices)

# Mock a few commands that are going to be run through ssh; they won't
# pass anyway.
Expand All @@ -325,6 +317,44 @@ def setUp(self):
mocker.start()
self.addCleanup(mocker.stop)

@property
def _dst_device(self):
    """Return the first destination device path recorded for the transfer.

    Fetches the transfer from the DB (task info included) and scans the
    ``volumes_info`` entries stored under this test's instance name.

    :returns: the first truthy ``volume_dev`` value, or ``None`` when no
        entry carries one.
    """
    db_ctxt = self._get_db_context()
    transfer = db_api.get_transfer(
        db_ctxt, self._transfer.id, include_task_info=True)

    instance_info = transfer.get("info", {}).get(self._instance_name, {})
    device_paths = (
        volume["volume_dev"]
        for volume in instance_info.get("volumes_info", [])
        if volume.get("volume_dev"))
    # The generator is lazy, so scanning stops at the first match.
    return next(device_paths, None)

def _cleanup_provider_dst_devices(self):
    """Remove any devices the provider allocated for this test.

    Acts as a safety net for cases where delete_replica_disks was not
    called (e.g. the execution never reached that task, or the test did
    not explicitly trigger a delete-disks execution). Uses
    os.path.exists so it is a no-op when the provider already cleaned up.

    Failures while reading the transfer's ``volumes_info`` from the DB are
    logged as a warning and the cleanup is abandoned; nothing is raised.
    """
    ctxt = self._get_db_context()

    try:
        transfer = db_api.get_transfer(
            ctxt, self._transfer.id, include_task_info=True)
        volumes_info = transfer.get("info", {}).get(
            self._instance_name, {}).get('volumes_info', [])
    except Exception as ex:
        # Best-effort cleanup: a lookup failure must not fail the test.
        # ``warning`` replaces the deprecated ``warn`` alias.
        LOG.warning("Could not get volumes info for cleanup. Ex: %s", ex)
        return

    for vol in volumes_info:
        device = vol.get('volume_dev')
        if device and os.path.exists(device):
            # NOTE(review): remove_scsi_debug_device() is called without
            # the device path, once per still-present device -- confirm it
            # removes the intended device.
            test_utils.remove_scsi_debug_device()

def _execute_and_wait(self, transfer_id, timeout=300):
"""Trigger one execution of *transfer_id* and wait for completion."""
execution = self._client.transfer_executions.create(
Expand Down Expand Up @@ -426,6 +456,9 @@ def _cleanup_deployment(self, deployment_id):
that occurs when a deployment is still in-flight at cleanup time, which
can happen with slow providers when a test fails or times out before
the deployment completes.

Calls ``imp_harness.cleanup_deployed_instance`` for every deployment
instance, so that finalized VMs at the destination are destroyed.
"""
ctxt = self._get_db_context()
deployment = db_api.get_deployment(ctxt, deployment_id)
Expand All @@ -434,13 +467,24 @@ def _cleanup_deployment(self, deployment_id):
"Deployment '%s' not found. Skip cleanup.", deployment_id)
return

instances = list(deployment.instances or [])

if deployment.last_execution_status in (
constants.ACTIVE_EXECUTION_STATUSES):
self._client.deployments.cancel(deployment_id)
self.wait_for_deployment(deployment_id, timeout=60)

self._client.deployments.delete(deployment_id)

for instance_name in instances:
try:
self._harness.imp_harness.cleanup_deployed_instance(
instance_name)
except Exception as ex:
LOG.warning(
"Could not clean up deployed instance '%s': %s",
instance_name, ex)

def wait_for_deployment(self, deployment_id, timeout=300,
desired_statuses=None):
"""Block until *deployment_id* reaches any terminal state.
Expand Down
10 changes: 10 additions & 0 deletions coriolis/tests/integration/deployments/test_osmorphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
installation in the target OS.
"""

import unittest
import uuid

from coriolis.tests.integration import base as integration_base
from coriolis.tests.integration import harness as integration_harness
from coriolis.tests.integration import utils as test_utils


Expand All @@ -19,6 +21,14 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
# any new packages to be added during OS morphing.
_SCSI_DEBUG_SIZE_MB = 256

@classmethod
def setUpClass(cls):
    """Skip the whole class unless the import harness has local disks.

    :raises unittest.SkipTest: when ``are_disks_local`` is falsy on the
        import harness.
    """
    imp_harness = integration_harness._IntegrationHarness.get().imp_harness
    disks_are_local = imp_harness.are_disks_local
    if disks_are_local:
        super().setUpClass()
        return

    raise unittest.SkipTest(
        "OS morphing tests require local disk access")

def setUp(self):
super().setUp()
test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04")
Expand Down
34 changes: 24 additions & 10 deletions coriolis/tests/integration/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import shutil
import socket
import subprocess
import sys
import tempfile
import uuid

Expand Down Expand Up @@ -55,6 +56,7 @@
from coriolis import service
from coriolis.taskflow import runner as taskflow_runner
from coriolis.tasks import factory as task_runners_factory
from coriolis.tests.integration.test_provider import harness as test_harness
from coriolis.tests.integration import utils as test_utils
from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
from coriolis import utils as coriolis_utils
Expand All @@ -68,9 +70,6 @@
_TEST_EXPORT_PROVIDER = (
"coriolis.tests.integration.test_provider.exp.TestExportProvider"
)
_TEST_IMPORT_PROVIDER = (
"coriolis.tests.integration.test_provider.imp.TestImportProvider"
)

# Fixed project used for all test requests.
_TEST_PROJECT_ID = 'integration-project'
Expand Down Expand Up @@ -276,8 +275,13 @@ def __init__(self):
cfg.CONF([], project='coriolis', version='1.0.0',
default_config_files=[], default_config_dirs=[])
cfg.CONF.set_override('messaging_transport_url', 'fake://')

imp_harness_cls = test_harness.TestImportHarness
imp_cls = imp_harness_cls.provider_class
imp_provider = "%s.%s" % (imp_cls.__module__, imp_cls.__qualname__)

cfg.CONF.set_override(
'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER])
'providers', [_TEST_EXPORT_PROVIDER, imp_provider])
db_url = ('mysql+pymysql://%(user)s:%(password)s'
'@localhost:13306/%(database)s') % {
"user": self._mysql_username,
Expand All @@ -302,19 +306,18 @@ def __init__(self):
# Policy enforcer: reset so it re-reads the new CONF (no policy file).
policy_module.reset()

# Init exporter.
self.exp_provider_class = _get_provider(_TEST_EXPORT_PROVIDER)
self.exp_provider_platform = self.exp_provider_class.platform
self.exp_conn_info = {
"pkey_path": self.ssh_key_path,
"role": "source",
}

self.imp_provider_class = _get_provider(_TEST_IMPORT_PROVIDER)
# Init importer.
self.imp_provider_class = imp_cls
self.imp_provider_platform = self.imp_provider_class.platform
self.imp_conn_info = {
"pkey_path": self.ssh_key_path,
"role": "destination",
}
self.imp_harness = imp_harness_cls(self)
self.imp_conn_info = self.imp_harness.get_connection_info()

self._wsgi_server = None
self._wsgi_server_thread = None
Expand All @@ -332,7 +335,9 @@ def __init__(self):
sqlalchemy_api._facade = None
rpc_module._TRANSPORT = None

atexit.register(self._check_harness_errors)
atexit.register(self._teardown)
atexit.register(self.imp_harness.teardown)

self._start_db_container()

Expand Down Expand Up @@ -472,6 +477,15 @@ def _start_coriolis_services(self):
daemon=True,
)

def _check_harness_errors(self):
    """Log every error deferred by the import harness, then exit.

    Returns without doing anything when the import harness recorded no
    deferred errors. Otherwise each error is logged and ``sys.exit(1)``
    is called.
    """
    deferred = self.imp_harness._deferred_errors
    if deferred:
        for failure in deferred:
            LOG.error("Harness teardown failure: %s", failure)
        # NOTE(review): this method is registered with atexit; confirm that
        # raising SystemExit from an atexit callback really changes the
        # process exit status on the supported Python versions.
        sys.exit(1)

def _teardown(self):
LOG.info("Teardown initiated.")

Expand Down
81 changes: 81 additions & 0 deletions coriolis/tests/integration/provider_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2026 Cloudbase Solutions Srl
# All Rights Reserved.

"""
Abstract base class for provider harnesses.

A harness encapsulates all provider-specific logic that cannot be expressed
through static YAML configuration: per-transfer resource allocation,
end-of-session leak assertions, VM finalization, etc.
"""

import abc

from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class ImportProviderHarness(abc.ABC):
# Subclasses must set this to the import provider class they wrap.
# The framework derives the dotted provider path from it automatically.
provider_class = None

def __init__(self, harness):
    """Initialise the list of deferred errors.

    The base class does not use *harness*; it only creates the empty
    ``_deferred_errors`` list that ``_record_error()`` appends to.
    """
    self._deferred_errors = []

def _record_error(self, exc):
    """Stash *exc* for deferred reporting after all teardown completes.

    The value is appended to ``self._deferred_errors``; nothing is raised
    or logged here.
    """
    self._deferred_errors.append(exc)

def teardown(self):
    """One-time teardown called at atexit.

    The base implementation does nothing.
    """

def check_prerequisites(self):
    """Raise ``unittest.SkipTest`` if required infrastructure is absent.

    The base implementation performs no check and never raises.
    """

@abc.abstractmethod
def get_connection_info(self) -> dict:
    """Return ``connection_info`` for the destination endpoint.

    Abstract: concrete harnesses must implement this method.
    """

def prepare_transfer(self, test_instance) -> dict:
    """Allocate per-transfer destination resources.

    Returns a dict merged into ``destination_environment`` for this
    transfer. Must register ``addCleanup()`` on *test_instance* for
    every resource created, so it is released even on test failure.

    The base implementation allocates nothing and returns an empty dict.
    """
    return {}

@property
def are_disks_local(self) -> bool:
"""True if destination disks are local block devices on the test host.
Copy link
Copy Markdown
Member

@petrutlucian94 petrutlucian94 May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this design. It doesn't really matter if the disks are local, provider agnostic tests shouldn't directly touch resources created by the providers, otherwise they're no longer provider agnostic.

We should clearly separate provider agnostic tests from those that rely on the "test provider" and interact with the created resources (e.g. scsi debug devices).

Without a proper design and clear limitations, our tests are going to become a mess and we'll refactor them every other week. We don't want that.

For what it's worth, OpenStack's Tempest tests are completely driver-agnostic. They do have "functional tests" that call directly into the provider, but those are completely separate.

https://github.com/openstack/nova/tree/master/nova/tests/functional/libvirt


When True, tests may directly read / mount the destination device to
verify contents.
"""
return False

def check_no_resources_leaked(self):
    """Check for resources left behind by this test session.

    Subclasses record each leak via ``_record_error()``, and log errors
    rather than raising, so that all checks and teardowns run regardless of
    earlier failures.

    The base implementation performs no checks.
    """

def cleanup_deployed_instance(self, instance_name: str):
    """Destroy the VM created at the destination by a completed deployment.

    Called during integration test cleanup after each deployment test, so
    that finalized VMs do not accumulate across runs and cause failures in
    later tests (e.g. name collisions, resource exhaustion).

    The default is a no-op; harnesses that create real VMs must override
    this method. Implementations should be idempotent: if the VM is
    already gone the method must return without raising.

    :param instance_name: name of the deployed instance to clean up.
    """

def get_minion_pool_config(self) -> dict | None:
    """Return minion-pool creation kwargs, or None.

    The base implementation always returns ``None``.
    """
    return None
Loading
Loading