From a4a64e00aca20706dd1a5a0935486829f3a26e10 Mon Sep 17 00:00:00 2001 From: Fabian Fulga Date: Wed, 25 Mar 2026 16:59:27 +0200 Subject: [PATCH] Add clustered migration sync for shared disks (SYNCING barrier) Introduce TASK_STATUS_SYNCING and TASK_TYPES_REQUIRING_CLUSTER_SYNC (currently only REPLICATE_DISKS) so that, on multi-instance transfers with base_transfer_action.clustered=True, replicate tasks for instances that share a disk wait in SYNCING until the shared disk's owner task completes. - Add clustered boolean on base_transfer_action (DB migration 024) - Plumb clustered through create_instances_transfer, the REST transfers API samples/docs, and deployment creation (inherits from parent transfer) - On task_completed: release SYNCING peers; each waiter inherits the owner's shared-disk volumes_info entries with replicate_disk_data=False, then is rescheduled (if it still has non-shared disks) or auto-completed - ReplicateDisksTask: skip provider replicate_disks for volumes with replicate_disk_data=False and merge results back in export disk order - _cancel_tasks_execution treats SYNCING like PENDING/STARTING so waiting peer tasks are aborted on execution cancellation/error Volumes schema already allows extra properties; replicate_disk_data is consumed by replication only (default True preserves behavior). 
--- .../openstack-transfer-create-resp.json | 3 +- .../transfer/openstack-transfer-get-resp.json | 1 + .../transfer/transfer-list-resp.json | 3 +- .../transfer/transfer-update-resp.json | 3 +- coriolis/api-refs/source/parameters.yaml | 8 + coriolis/api-refs/source/transfer.inc | 3 + coriolis/conductor/rpc/client.py | 6 +- coriolis/conductor/rpc/server.py | 285 +++++++++++++++++- coriolis/constants.py | 7 + ...4_add_clustered_to_base_transfer_action.py | 20 ++ coriolis/db/sqlalchemy/models.py | 4 + coriolis/tasks/replica_tasks.py | 28 +- coriolis/tests/conductor/rpc/test_client.py | 1 + coriolis/tests/db/sqlalchemy/test_models.py | 1 + coriolis/tests/tasks/test_replica_tasks.py | 2 + 15 files changed, 362 insertions(+), 13 deletions(-) create mode 100644 coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json index 5baf549a..96ccfd03 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json @@ -49,6 +49,7 @@ } }, "executions": [], - "scenario": "replica" + "scenario": "replica", + "clustered": false } } diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json index 55749ec6..5e48ef5b 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json @@ -42,6 +42,7 @@ "origin_minion_pool_id": null, "destination_minion_pool_id": null, "instance_osmorphing_minion_pool_mappings": {}, + "clustered": false, "executions": [ { "created_at": "2019-07-11T10:06:47.000000", diff --git a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json 
b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json index 14c909b3..c94b6164 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json @@ -74,7 +74,8 @@ "instances": {} }, "id": "0460aa4d-6b16-4c98-bd56-27ee186e4a22", - "scenario": "replica" + "scenario": "replica", + "clustered": false } ] } diff --git a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json index 8bcdbbf0..8332a19f 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json @@ -133,7 +133,8 @@ "ubuntu-xenial": "echo 'anything you need'" } }, - "scenario": "replica" + "scenario": "replica", + "clustered": false } } } diff --git a/coriolis/api-refs/source/parameters.yaml b/coriolis/api-refs/source/parameters.yaml index 9fef155a..933d1349 100644 --- a/coriolis/api-refs/source/parameters.yaml +++ b/coriolis/api-refs/source/parameters.yaml @@ -123,6 +123,14 @@ connection_info_schema: in: body type: object required: false +clustered: + description: | + Read-only in API responses. ``true`` when the transfer includes more + than one instance, ``false`` otherwise. Stored on the transfer action + in the database; defaults to whether multiple instances were given. + in: body + type: boolean + required: false deployment_cancel: description: | Object containing information about the type of deployment cancellation. 
diff --git a/coriolis/api-refs/source/transfer.inc b/coriolis/api-refs/source/transfer.inc index fa17913b..e9c86d21 100644 --- a/coriolis/api-refs/source/transfer.inc +++ b/coriolis/api-refs/source/transfer.inc @@ -51,6 +51,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer List Response** @@ -111,6 +112,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Show Response** @@ -183,6 +185,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Create Response** diff --git a/coriolis/conductor/rpc/client.py b/coriolis/conductor/rpc/client.py index dabf2bbe..e579f7e4 100644 --- a/coriolis/conductor/rpc/client.py +++ b/coriolis/conductor/rpc/client.py @@ -169,7 +169,8 @@ def create_instances_transfer(self, ctxt, source_environment, destination_environment, instances, network_map, storage_mappings, notes=None, user_scripts=None, - clone_disks=True, skip_os_morphing=False): + clone_disks=True, skip_os_morphing=False, + clustered=None): return self._call( ctxt, 'create_instances_transfer', transfer_scenario=transfer_scenario, @@ -187,7 +188,8 @@ def create_instances_transfer(self, ctxt, source_environment=source_environment, user_scripts=user_scripts, clone_disks=clone_disks, - skip_os_morphing=skip_os_morphing) + skip_os_morphing=skip_os_morphing, + clustered=clustered) def get_transfers(self, ctxt, include_tasks_executions=False, include_task_info=False): diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index fa70e524..8d848457 100644 --- a/coriolis/conductor/rpc/server.py +++ 
b/coriolis/conductor/rpc/server.py @@ -1293,7 +1293,10 @@ def create_instances_transfer(self, ctxt, transfer_scenario, destination_environment, instances, network_map, storage_mappings, notes=None, user_scripts=None, clone_disks=True, - skip_os_morphing=False): + skip_os_morphing=False, clustered=None): + if clustered is None: + clustered = len(instances) > 1 + clustered = bool(clustered) supported_scenarios = [ constants.TRANSFER_SCENARIO_REPLICA, constants.TRANSFER_SCENARIO_LIVE_MIGRATION] @@ -1330,6 +1333,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, transfer.user_scripts = user_scripts or {} transfer.clone_disks = clone_disks transfer.skip_os_morphing = skip_os_morphing + transfer.clustered = clustered self._check_minion_pools_for_action(ctxt, transfer) @@ -1738,6 +1742,7 @@ def deploy_transfer_instances( deployment.user_scripts = user_scripts deployment.clone_disks = clone_disks deployment.skip_os_morphing = skip_os_morphing + deployment.clustered = bool(getattr(transfer, 'clustered', False)) deployment.deployer_id = wait_for_execution deployment.trust_id = trust_id deployment.last_execution_status = init_status @@ -2091,7 +2096,8 @@ def _cancel_tasks_execution( "of the parent tasks execution.")) elif task.status in ( constants.TASK_STATUS_PENDING, - constants.TASK_STATUS_STARTING): + constants.TASK_STATUS_STARTING, + constants.TASK_STATUS_SYNCING): # any PENDING/STARTING tasks means that they did not have a # host assigned to them yet, and presuming the host does not # start executing the task until it marks itself as the runner, @@ -2536,6 +2542,99 @@ def _advance_execution_state( started_tasks = [] + def _normalize_identity(raw_value): + if raw_value is None: + return None + normalized = str(raw_value).strip() + if not normalized: + return None + return normalized.lower() + + def _get_disk_identity(disk_info): + disk_info = disk_info or {} + for key in ("native_id", "uuid", "path", "name", "disk_id", "id"): + ident = 
_normalize_identity(disk_info.get(key)) + if ident: + if key in ("disk_id", "id") and ident.isdigit(): + continue + return ident + return None + + def _get_all_disk_identities_for_instance(instance_id): + task_info = action.info.get(instance_id, {}) + export_info = task_info.get("export_info", {}) + disks = export_info.get("devices", {}).get("disks", []) + identities = [] + for disk in disks: + ident = _get_disk_identity(disk) + if ident: + identities.append({ + "identity": ident, + "shareable": bool(disk.get("shareable"))}) + return identities + + def _get_shared_disk_identities(): + identity_count = {} + identity_marked_shareable = {} + for instance_id in action.instances: + for entry in _get_all_disk_identities_for_instance( + instance_id): + ident = entry["identity"] + identity_count[ident] = identity_count.get(ident, 0) + 1 + if entry["shareable"]: + identity_marked_shareable[ident] = True + + return { + ident for ident, count in identity_count.items() + if count > 1 or identity_marked_shareable.get(ident, False)} + + def _get_shared_disk_owner_by_identity(): + shared_identities = _get_shared_disk_identities() + owners = {} + for instance_id in action.instances: + for entry in _get_all_disk_identities_for_instance( + instance_id): + ident = entry["identity"] + if ident not in shared_identities: + continue + if ident not in owners: + owners[ident] = instance_id + return owners + + def _should_sync_replicate_task(task): + if task.task_type not in ( + constants.TASK_TYPES_REQUIRING_CLUSTER_SYNC): + return False + if not bool(getattr(action, "clustered", False)): + return False + + owners = _get_shared_disk_owner_by_identity() + if not owners: + return False + + instance_id = task.instance + instance_identities = [ + entry["identity"] + for entry in _get_all_disk_identities_for_instance(instance_id) + if entry["identity"] in owners] + if not instance_identities: + return False + + task_by_instance = { + t.instance: t for t in execution.tasks + if t.task_type in 
constants.TASK_TYPES_REQUIRING_CLUSTER_SYNC} + for ident in instance_identities: + owner = owners.get(ident) + if not owner or owner == instance_id: + continue + owner_task = task_by_instance.get(owner) + if ( + owner_task + and owner_task.status != + constants.TASK_STATUS_COMPLETED): + return True + return False + def _start_task(task): task_info = None if task.instance not in action.info: @@ -2601,6 +2700,15 @@ def _start_task(task): for task in sorted(tasks_to_process, key=lambda t: t.index): if task_statuses[task.id] == constants.TASK_STATUS_SCHEDULED: + if _should_sync_replicate_task(task): + LOG.info( + "Task '%s' for instance '%s' entering SYNCING while " + "waiting for shared-disk owner task to complete.", + task.id, task.instance) + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_SYNCING) + task_statuses[task.id] = constants.TASK_STATUS_SYNCING + continue # immediately start depency-less tasks (on-error or otherwise) if not task_deps[task.id]: @@ -2760,6 +2868,174 @@ def _start_task(task): return started_tasks + def _release_cluster_synced_replicate_tasks( + self, ctxt, execution, action_id, completed_task): + """Unblocks SYNCING replicate tasks after owner completion. + + This is a task-level barrier used for clustered transfers with + shared disks. Once the owner replicate task completes, any + waiting (SYNCING) replicate tasks are moved back to SCHEDULED + so they can continue their normal flow. 
+ """ + if completed_task.task_type not in ( + constants.TASK_TYPES_REQUIRING_CLUSTER_SYNC): + return [] + action = db_api.get_action(ctxt, action_id, include_task_info=True) + if not bool(getattr(action, "clustered", False)): + return [] + + def _normalize_identity(raw_value): + if raw_value is None: + return None + normalized = str(raw_value).strip() + if not normalized: + return None + return normalized.lower() + + def _get_disk_identity(disk_info): + disk_info = disk_info or {} + for key in ("native_id", "uuid", "path", "name", "disk_id", "id"): + ident = _normalize_identity(disk_info.get(key)) + if ident: + if key in ("disk_id", "id") and ident.isdigit(): + continue + return ident + return None + + def _get_instance_export_disks(instance_id): + info = action.info.get(instance_id, {}) + return ( + info.get("export_info", {}) + .get("devices", {}) + .get("disks", []) + ) + + def _get_instance_disk_identity_to_disk_id(instance_id): + mapping = {} + for disk in _get_instance_export_disks(instance_id): + ident = _get_disk_identity(disk) + disk_id = disk.get("id") + if ident and disk_id and ident not in mapping: + mapping[ident] = disk_id + return mapping + + def _get_shared_identities(): + identity_counts = {} + explicitly_shareable = {} + for instance_id in action.instances: + for disk in _get_instance_export_disks(instance_id): + ident = _get_disk_identity(disk) + if not ident: + continue + identity_counts[ident] = identity_counts.get(ident, 0) + 1 + if disk.get("shareable"): + explicitly_shareable[ident] = True + return { + ident for ident, count in identity_counts.items() + if count > 1 or explicitly_shareable.get(ident, False)} + + shared_identities = _get_shared_identities() + if not shared_identities: + return [] + + owner_instance = completed_task.instance + owner_identity_to_disk_id = _get_instance_disk_identity_to_disk_id( + owner_instance) + owner_disk_id_to_volume = { + vol.get("disk_id"): copy.deepcopy(vol) + for vol in action.info.get(owner_instance, 
{}).get( + "volumes_info", []) + if vol.get("disk_id")} + owner_updated_disk_ids = set() + + released_instances = [] + for task in execution.tasks: + if task.task_type not in ( + constants.TASK_TYPES_REQUIRING_CLUSTER_SYNC): + continue + if task.status != constants.TASK_STATUS_SYNCING: + continue + + waiter_identity_to_disk_id = ( + _get_instance_disk_identity_to_disk_id(task.instance) + ) + waiter_info = copy.deepcopy(action.info.get(task.instance, {})) + waiter_volumes = waiter_info.get("volumes_info", []) or [] + waiter_disk_id_to_volume = { + vol.get("disk_id"): copy.deepcopy(vol) + for vol in waiter_volumes if vol.get("disk_id")} + + inherited_count = 0 + for identity in shared_identities: + owner_disk_id = owner_identity_to_disk_id.get(identity) + waiter_disk_id = waiter_identity_to_disk_id.get(identity) + owner_volume = owner_disk_id_to_volume.get(owner_disk_id) + if not owner_disk_id or not waiter_disk_id or not owner_volume: + continue + owner_volume[constants.VOLUME_INFO_REPLICATE_DISK_DATA] = False + if "shareable" not in owner_volume: + owner_volume["shareable"] = True + owner_updated_disk_ids.add(owner_disk_id) + inherited_volume = copy.deepcopy(owner_volume) + inherited_volume["disk_id"] = waiter_disk_id + inherited_volume[constants.VOLUME_INFO_REPLICATE_DISK_DATA] = ( + False) + waiter_disk_id_to_volume[waiter_disk_id] = inherited_volume + inherited_count += 1 + + if owner_updated_disk_ids: + owner_current_info = copy.deepcopy( + action.info.get(owner_instance, {})) + owner_current_volumes = ( + owner_current_info.get("volumes_info", []) or []) + owner_updated_volumes = [] + for owner_existing in owner_current_volumes: + owner_disk_id = owner_existing.get("disk_id") + if owner_disk_id in owner_updated_disk_ids: + owner_updated_volumes.append( + copy.deepcopy( + owner_disk_id_to_volume[owner_disk_id])) + else: + owner_updated_volumes.append(owner_existing) + db_api.update_transfer_action_info_for_instance( + ctxt, action_id, owner_instance, + 
{"volumes_info": owner_updated_volumes}) + + if inherited_count: + merged_waiter_volumes = list(waiter_disk_id_to_volume.values()) + db_api.update_transfer_action_info_for_instance( + ctxt, action_id, task.instance, + {"volumes_info": merged_waiter_volumes}) + LOG.info( + "Inherited %d shared disk volume mappings for waiter " + "instance '%s' from owner instance '%s'.", + inherited_count, task.instance, owner_instance) + + waiter_all_identities = set(waiter_identity_to_disk_id.keys()) + waiter_non_shared_identities = { + ident for ident in waiter_all_identities + if ident not in shared_identities} + if waiter_non_shared_identities: + LOG.info( + "Releasing SYNCING replicate task '%s' for instance '%s' " + "back to SCHEDULED after inheriting shared mappings. " + "Task still has %d non-shared disk(s) to replicate.", + task.id, task.instance, len(waiter_non_shared_identities)) + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_SCHEDULED) + else: + LOG.info( + "Auto-completing SYNCING replicate task '%s' for instance " + "'%s' after completion of owner task '%s'.", + task.id, task.instance, completed_task.id) + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_COMPLETED, + exception_details=( + "Task auto-completed after inheriting shared-disk " + "replication results from the owner task.")) + released_instances.append(task.instance) + return released_instances + @staticmethod def _update_transfer_volumes_info(ctxt, transfer_id, instance, updated_task_info): @@ -3103,6 +3379,11 @@ def task_completed(self, ctxt, task_id, task_result): newly_started_tasks = self._advance_execution_state( ctxt, execution, instance=task.instance, requery=False) + released_instances = self._release_cluster_synced_replicate_tasks( + ctxt, execution, action_id, task) + for released_instance in released_instances: + self._advance_execution_state( + ctxt, execution, instance=released_instance, requery=True) if newly_started_tasks: LOG.info( "The following tasks were 
started for execution '%s' " diff --git a/coriolis/constants.py b/coriolis/constants.py index 71e55343..2bf7f825 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -50,11 +50,13 @@ TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK" TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY" TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE" +TASK_STATUS_SYNCING = "SYNCING" ACTIVE_TASK_STATUSES = [ TASK_STATUS_PENDING, TASK_STATUS_STARTING, TASK_STATUS_RUNNING, + TASK_STATUS_SYNCING, TASK_STATUS_CANCELLING, TASK_STATUS_CANCELLING_AFTER_COMPLETION ] @@ -161,6 +163,10 @@ TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION" TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION" +TASK_TYPES_REQUIRING_CLUSTER_SYNC = [ + TASK_TYPE_REPLICATE_DISKS, +] + MINION_POOL_OPERATIONS_TASKS = [ TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS, @@ -218,6 +224,7 @@ DISK_FORMAT_QCOW2 = 'qcow2' DISK_FORMAT_VHD = 'vhd' DISK_FORMAT_VHDX = 'vhdx' +VOLUME_INFO_REPLICATE_DISK_DATA = "replicate_disk_data" DISK_ALLOCATION_TYPE_STATIC = "static" DISK_ALLOCATION_TYPE_DYNAMIC = "dynamic" diff --git a/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py new file mode 100644 index 00000000..9e834e85 --- /dev/null +++ b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py @@ -0,0 +1,20 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData() + meta.bind = migrate_engine + + base_transfer = sqlalchemy.Table( + 'base_transfer_action', meta, autoload=True) + if 'clustered' in base_transfer.c: + return + # server_default so existing rows get a value when the column is added + # (MySQL stores booleans as TINYINT). 
+ clustered = sqlalchemy.Column( + 'clustered', sqlalchemy.Boolean, nullable=False, + server_default=sqlalchemy.text('0')) + base_transfer.create_column(clustered) diff --git a/coriolis/db/sqlalchemy/models.py b/coriolis/db/sqlalchemy/models.py index d4377999..0dba8b6c 100644 --- a/coriolis/db/sqlalchemy/models.py +++ b/coriolis/db/sqlalchemy/models.py @@ -285,6 +285,9 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase, sqlalchemy.Boolean, nullable=False, default=True) skip_os_morphing = sqlalchemy.Column( sqlalchemy.Boolean, nullable=False, default=False) + # Multi-instance transfer hint; must be set on INSERT (MySQL NOT NULL). + clustered = sqlalchemy.Column( + sqlalchemy.Boolean, nullable=False, default=False) __mapper_args__ = { 'polymorphic_identity': 'base_transfer_action', @@ -320,6 +323,7 @@ def to_dict(self, include_task_info=True, include_executions=True): "user_scripts": self.user_scripts, "clone_disks": self.clone_disks, "skip_os_morphing": self.skip_os_morphing, + "clustered": bool(self.clustered), } if include_executions: for ex in self.executions: diff --git a/coriolis/tasks/replica_tasks.py b/coriolis/tasks/replica_tasks.py index 0407e8bc..a89a952d 100644 --- a/coriolis/tasks/replica_tasks.py +++ b/coriolis/tasks/replica_tasks.py @@ -244,12 +244,28 @@ def _run(self, ctxt, instance, origin, destination, task_info, source_environment = task_info['source_environment'] source_resources = task_info.get('source_resources', {}) - volumes_info = provider.replicate_disks( - ctxt, connection_info, source_environment, instance, - source_resources, migr_source_conn_info, migr_target_conn_info, - volumes_info, incremental) - schemas.validate_value( - volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + + volumes_to_replicate = [ + vol for vol in volumes_info + if vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)] + pre_replicated_volumes = [ + vol for vol in volumes_info + if not vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, 
True)] + + if volumes_to_replicate: + replicated_volumes = provider.replicate_disks( + ctxt, connection_info, source_environment, instance, + source_resources, migr_source_conn_info, migr_target_conn_info, + volumes_to_replicate, incremental) + schemas.validate_value( + replicated_volumes, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + else: + LOG.info( + "No disks marked for replication for instance '%s'. " + "Using pre-provisioned volumes_info.", instance) + replicated_volumes = [] + + volumes_info = pre_replicated_volumes + replicated_volumes volumes_info = _check_ensure_volumes_info_ordering( export_info, volumes_info) diff --git a/coriolis/tests/conductor/rpc/test_client.py b/coriolis/tests/conductor/rpc/test_client.py index 2fc6084b..1eac552d 100644 --- a/coriolis/tests/conductor/rpc/test_client.py +++ b/coriolis/tests/conductor/rpc/test_client.py @@ -197,6 +197,7 @@ def test_create_instances_transfer(self): "user_scripts": None, "clone_disks": True, "skip_os_morphing": False, + "clustered": None, } args.update(new_args) self._test(self.client.create_instances_transfer, args) diff --git a/coriolis/tests/db/sqlalchemy/test_models.py b/coriolis/tests/db/sqlalchemy/test_models.py index 7b0c7610..4b1f2412 100644 --- a/coriolis/tests/db/sqlalchemy/test_models.py +++ b/coriolis/tests/db/sqlalchemy/test_models.py @@ -313,6 +313,7 @@ def test_to_dict(self): "user_scripts": mock.sentinel.user_scripts, "info": mock.sentinel.info, "clone_disks": True, + "clustered": False, "skip_os_morphing": False, } diff --git a/coriolis/tests/tasks/test_replica_tasks.py b/coriolis/tests/tasks/test_replica_tasks.py index 1e3207d3..dcfc955a 100644 --- a/coriolis/tests/tasks/test_replica_tasks.py +++ b/coriolis/tests/tasks/test_replica_tasks.py @@ -145,6 +145,8 @@ def test__run(self, mock_unmarshal, mock_check_vol_info, mock_get_vol_info, task_info.get.side_effect = [ task_info['incremental'], task_info['source_resources']] prov_fun = mock_get_provider.return_value.replicate_disks + 
mock_get_vol_info.return_value = [{"disk_id": "disk_id1"}] + prov_fun.return_value = [{"disk_id": "disk_id1"}] expected_result = {"volumes_info": mock_check_vol_info.return_value} expected_validation_calls = [ mock.call.mock_validate_value(