From f65277605920a3da0753e74b595a4447c0c7ef90 Mon Sep 17 00:00:00 2001 From: Daniel Vincze Date: Thu, 31 Oct 2024 14:53:32 +0200 Subject: [PATCH] WiP - Refactor conductor layer Refactors include mostly renames of Replica into Transfer, and Migration into Deployment. They also include some constants refactoring (renaming and removing deprecated ones). --- coriolis/api/v1/replica_schedules.py | 2 +- coriolis/api/v1/replicas.py | 6 +- coriolis/cmd/replica_cron.py | 4 +- coriolis/conductor/rpc/server.py | 1129 ++++++++--------- coriolis/constants.py | 103 +- coriolis/db/sqlalchemy/models.py | 2 +- coriolis/diagnostics/api.py | 4 +- coriolis/exception.py | 12 +- coriolis/minion_manager/rpc/client.py | 4 +- coriolis/minion_manager/rpc/server.py | 8 +- coriolis/minion_manager/rpc/tasks.py | 4 +- coriolis/scheduler/scheduler_utils.py | 6 +- coriolis/tasks/factory.py | 58 +- coriolis/tasks/replica_tasks.py | 2 +- .../tests/api/v1/test_replica_schedules.py | 2 +- coriolis/tests/cmd/test_replica_cron.py | 4 +- coriolis/tests/conductor/rpc/test_server.py | 196 +-- .../tests/minion_manager/rpc/test_client.py | 4 +- .../tests/minion_manager/rpc/test_tasks.py | 2 +- .../tests/replica_cron/rpc/test_client.py | 4 +- .../tests/replica_cron/rpc/test_server.py | 4 +- coriolis/tests/replica_cron/test_api.py | 2 +- coriolis/tests/tasks/test_replica_tasks.py | 2 +- .../__init__.py | 0 .../{replica_cron => transfer_cron}/api.py | 0 .../rpc/__init__.py | 0 .../rpc/client.py | 6 +- .../rpc/server.py | 4 +- 28 files changed, 754 insertions(+), 820 deletions(-) rename coriolis/{replica_cron => transfer_cron}/__init__.py (100%) rename coriolis/{replica_cron => transfer_cron}/api.py (100%) rename coriolis/{replica_cron => transfer_cron}/rpc/__init__.py (100%) rename coriolis/{replica_cron => transfer_cron}/rpc/client.py (75%) rename coriolis/{replica_cron => transfer_cron}/rpc/server.py (97%) diff --git a/coriolis/api/v1/replica_schedules.py b/coriolis/api/v1/replica_schedules.py index 32a3e1d02..ce8cb01d3 100644 --- a/coriolis/api/v1/replica_schedules.py +++ b/coriolis/api/v1/replica_schedules.py @@ -5,7 +5,7 @@ from coriolis.api import wsgi as api_wsgi from coriolis import exception from coriolis.policies import replica_schedules as schedules_policies -from coriolis.replica_cron import api +from coriolis.transfer_cron import api from coriolis import schemas import jsonschema diff --git a/coriolis/api/v1/replicas.py b/coriolis/api/v1/replicas.py index 1d1aac9c1..197d80e84 100644 --- a/coriolis/api/v1/replicas.py +++ b/coriolis/api/v1/replicas.py @@ -27,8 +27,8 @@ LOG = logging.getLogger(__name__) SUPPORTED_REPLICA_SCENARIOS = [ - constants.REPLICA_SCENARIO_REPLICA, - constants.REPLICA_SCENARIO_LIVE_MIGRATION] + constants.TRANSFER_SCENARIO_REPLICA, + constants.TRANSFER_SCENARIO_LIVE_MIGRATION] class ReplicaController(api_wsgi.Controller): @@ -79,7 +79,7 @@ def _validate_create_body(self, context, body): f"'{scenario}', must be one of: " f"{SUPPORTED_REPLICA_SCENARIOS}") else: - scenario = constants.REPLICA_SCENARIO_REPLICA + scenario = constants.TRANSFER_SCENARIO_REPLICA LOG.warn( "No Replica 'scenario' field set in Replica body, " f"defaulting to: '{scenario}'") diff --git a/coriolis/cmd/replica_cron.py b/coriolis/cmd/replica_cron.py index 2484ccde6..1f7d09631 100644 --- a/coriolis/cmd/replica_cron.py +++ b/coriolis/cmd/replica_cron.py @@ -6,7 +6,7 @@ from oslo_config import cfg from coriolis import constants -from coriolis.replica_cron.rpc import server as rpc_server +from coriolis.transfer_cron.rpc import server as rpc_server from coriolis import service from coriolis import utils @@ -19,7 +19,7 @@ def main(): utils.setup_logging() server = service.MessagingService( - constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC, + constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC, [rpc_server.ReplicaCronServerEndpoint()], rpc_server.VERSION, worker_count=1) launcher = service.service.launch( diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index 1d9bad4e3..69904ebca 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -18,7 +18,7 @@ from coriolis import keystone from coriolis.licensing import client as licensing_client from coriolis.minion_manager.rpc import client as rpc_minion_manager_client -from coriolis.replica_cron.rpc import client as rpc_cron_client +from coriolis.transfer_cron.rpc import client as rpc_cron_client from coriolis.scheduler.rpc import client as rpc_scheduler_client from coriolis import schemas from coriolis.tasks import factory as tasks_factory @@ -46,9 +46,9 @@ "Please review the Conductor logs and contact support for assistance.") SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP = { - constants.REPLICA_SCENARIO_REPLICA: + constants.TRANSFER_SCENARIO_REPLICA: licensing_client.RESERVATION_TYPE_REPLICA, - constants.REPLICA_SCENARIO_LIVE_MIGRATION: + constants.TRANSFER_SCENARIO_LIVE_MIGRATION: licensing_client.RESERVATION_TYPE_MIGRATION } @@ -65,26 +65,26 @@ def inner(): return wrapper -def replica_synchronized(func): +def transfer_synchronized(func): @functools.wraps(func) - def wrapper(self, ctxt, replica_id, *args, **kwargs): + def wrapper(self, ctxt, transfer_id, *args, **kwargs): @lockutils.synchronized( - constants.REPLICA_LOCK_NAME_FORMAT % replica_id, + constants.TRANSFER_LOCK_NAME_FORMAT % transfer_id, external=True) def inner(): - return func(self, ctxt, replica_id, *args, **kwargs) + return func(self, ctxt, transfer_id, *args, **kwargs) return inner() return wrapper def schedule_synchronized(func): @functools.wraps(func) - def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs): + def wrapper(self, ctxt, transfer_id, schedule_id, *args, **kwargs): @lockutils.synchronized( constants.SCHEDULE_LOCK_NAME_FORMAT % schedule_id, external=True) def inner(): - return func(self, ctxt, replica_id, schedule_id, *args, **kwargs) + return func(self, ctxt, transfer_id, schedule_id, *args, **kwargs) return inner() return wrapper @@ -118,18 +118,6 @@ def inner(): return wrapper -def migration_synchronized(func): - @functools.wraps(func) - def wrapper(self, ctxt, migration_id, *args, **kwargs): - @lockutils.synchronized( - constants.MIGRATION_LOCK_NAME_FORMAT % migration_id, - external=True) - def inner(): - return func(self, ctxt, migration_id, *args, **kwargs) - return inner() - return wrapper - - def deployment_synchronized(func): @functools.wraps(func) def wrapper(self, ctxt, deployment_id, *args, **kwargs): @@ -144,12 +132,12 @@ def inner(): def tasks_execution_synchronized(func): @functools.wraps(func) - def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs): + def wrapper(self, ctxt, transfer_id, execution_id, *args, **kwargs): @lockutils.synchronized( constants.EXECUTION_LOCK_NAME_FORMAT % execution_id, external=True) def inner(): - return func(self, ctxt, replica_id, execution_id, *args, **kwargs) + return func(self, ctxt, transfer_id, execution_id, *args, **kwargs) return inner() return wrapper @@ -183,7 +171,7 @@ def __init__(self): self._licensing_client = licensing_client.LicensingClient.from_env() self._worker_client_instance = None self._scheduler_client_instance = None - self._replica_cron_client_instance = None + self._transfer_cron_client_instance = None self._minion_manager_client_instance = None # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated @@ -206,11 +194,11 @@ def _scheduler_client(self): return self._scheduler_client_instance @property - def _replica_cron_client(self): - if not self._replica_cron_client_instance: - self._replica_cron_client_instance = ( - rpc_cron_client.ReplicaCronClient()) - return self._replica_cron_client_instance + def _transfer_cron_client(self): + if not self._transfer_cron_client_instance: + self._transfer_cron_client_instance = ( + rpc_cron_client.TransferCronClient()) + return self._transfer_cron_client_instance @property def _minion_manager_client(self): @@ -222,7 +210,7 @@ def _minion_manager_client(self): def get_all_diagnostics(self, ctxt): client_objects = { "conductor": self, - "replica_cron": self._replica_cron_client, + "transfer_cron": self._transfer_cron_client, "minion_manager": self._minion_manager_client, "scheduler": self._scheduler_client} @@ -297,22 +285,22 @@ def _check_delete_reservation_for_transfer(self, transfer_action): "action with ID '%s'. Skipping. Exception\n%s", reservation_id, action_id, utils.get_exception_details()) - def _create_reservation_for_replica(self, replica): - action_id = replica.base_id - scenario = replica.scenario + def _create_reservation_for_transfer(self, transfer): + action_id = transfer.base_id + scenario = transfer.scenario reservation_type = SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP.get( scenario, None) if not reservation_type: raise exception.LicensingException( - message="Could not determine reservation type for replica " - f"'{action_id}' with scenario '{replica.scenario}'.") + message="Could not determine reservation type for transfer " + f"'{action_id}' with scenario '{transfer.scenario}'.") if not self._licensing_client: LOG.warn( "Licensing client not instantiated. Skipping creation of " "reservation for transfer action '%s'", action_id) return - ninstances = len(replica.instances) + ninstances = len(transfer.instances) LOG.debug( "Attempting to create '%s' reservation for %d instances for " "transfer action with ID '%s'.", @@ -323,7 +311,7 @@ def _create_reservation_for_replica(self, replica): LOG.info( f"Sucessfully created licensing reservation for transfer " f"with ID '{action_id}' with properties: {reservation}") - replica.reservation_id = reservation['id'] + transfer.reservation_id = reservation['id'] return reservation @@ -374,23 +362,24 @@ def _check_mark_reservation_fulfilled( f"Successfully marked reservation with ID '{reservation_id}' " f"for transfer action '{action_id}' as fulfilled") - def _check_reservation_for_replica(self, replica): - scenario = replica.scenario + def _check_reservation_for_tranfer(self, transfer): + scenario = transfer.scenario reservation_type = SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP.get( scenario, None) if not reservation_type: raise exception.LicensingException( - message="Could not determine reservation type for replica " - f"'{replica.id}' with scenario '{replica.scenario}'.") + message="Could not determine reservation type for transfer " + f"'{transfer.id}' with scenario " + f"'{transfer.scenario}'.") - action_id = replica.base_id + action_id = transfer.base_id if not self._licensing_client: LOG.warn( "Licensing client not instantiated. Skipping checking of " "reservation for transfer action '%s'", action_id) return - reservation_id = replica.reservation_id + reservation_id = transfer.reservation_id if reservation_id: LOG.debug( "Attempting to check reservation with ID '%s' for transfer " @@ -400,13 +389,13 @@ def _check_reservation_for_replica(self, replica): reservation_id) fulfilled_at = reservation.get("fulfilled_at", None) - if scenario == constants.REPLICA_SCENARIO_LIVE_MIGRATION and ( + if scenario == constants.TRANSFER_SCENARIO_LIVE_MIGRATION and ( fulfilled_at): raise exception.MigrationLicenceFulfilledException( - action_id=replica.id, reservation_id=reservation_id, + action_id=transfer.id, reservation_id=reservation_id, fulfilled_at=fulfilled_at) - replica.reservation_id = ( + transfer.reservation_id = ( self._licensing_client.check_refresh_reservation( reservation_id)['id']) except Exception as ex: @@ -427,14 +416,14 @@ def _check_reservation_for_replica(self, replica): "reservation. Trace was: %s", reservation_id, action_id, utils.get_exception_details()) - self._create_reservation_for_replica(replica) + self._create_reservation_for_transfer(transfer) else: raise ex else: LOG.info( f"Transfer action '{action_id}' has no reservation ID set, " f"attempting to create a new one for it") - self._create_reservation_for_replica(replica) + self._create_reservation_for_transfer(transfer) def create_endpoint(self, ctxt, name, endpoint_type, description, connection_info, mapped_regions=None): @@ -487,11 +476,11 @@ def get_endpoint(self, ctxt, endpoint_id): @endpoint_synchronized def delete_endpoint(self, ctxt, endpoint_id): - q_replicas_count = db_api.get_endpoint_transfers_count( + q_transfers_count = db_api.get_endpoint_transfers_count( ctxt, endpoint_id) - if q_replicas_count != 0: - raise exception.NotAuthorized("%s replicas would be orphaned!" % - q_replicas_count) + if q_transfers_count != 0: + raise exception.NotAuthorized("%s transfers would be orphaned!" % + q_transfers_count) db_api.delete_endpoint(ctxt, endpoint_id) def get_endpoint_instances(self, ctxt, endpoint_id, source_environment, @@ -904,39 +893,40 @@ def _check_task_cls_param_requirements(task, instance_task_info_keys): "for ordering or state conflicts.", execution.id, execution.type) - @replica_synchronized - def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): - replica = self._get_replica(ctxt, replica_id, include_task_info=True) - self._check_replica_running_executions(ctxt, replica) - self._check_minion_pools_for_action(ctxt, replica) - self._check_reservation_for_replica(replica) + @transfer_synchronized + def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances): + transfer = self._get_transfer( + ctxt, transfer_id, include_task_info=True) + self._check_transfer_running_executions(ctxt, transfer) + self._check_minion_pools_for_action(ctxt, transfer) + self._check_reservation_for_tranfer(transfer) execution = models.TasksExecution() execution.id = str(uuid.uuid4()) - execution.action = replica + execution.action = transfer execution.status = constants.EXECUTION_STATUS_UNEXECUTED - execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION + execution.type = constants.EXECUTION_TYPE_TRANSFER_EXECUTION # TODO(aznashwan): have these passed separately to the relevant # provider methods. They're currently passed directly inside # dest-env by the API service when accepting the call, but we - # re-overwrite them here in case of Replica updates. - dest_env = copy.deepcopy(replica.destination_environment) - dest_env['network_map'] = replica.network_map - dest_env['storage_mappings'] = replica.storage_mappings + # re-overwrite them here in case of Transfer updates. + dest_env = copy.deepcopy(transfer.destination_environment) + dest_env['network_map'] = transfer.network_map + dest_env['storage_mappings'] = transfer.storage_mappings for instance in execution.action.instances: # NOTE: we default/convert the volumes info to an empty list # to preserve backwards-compatibility with older versions # of Coriolis dating before the scheduling overhaul (PR##114) - if instance not in replica.info: - replica.info[instance] = {'volumes_info': []} - elif replica.info[instance].get('volumes_info') is None: - replica.info[instance]['volumes_info'] = [] + if instance not in transfer.info: + transfer.info[instance] = {'volumes_info': []} + elif transfer.info[instance].get('volumes_info') is None: + transfer.info[instance]['volumes_info'] = [] # NOTE: we update all of the param values before triggering an # execution to ensure that the latest parameters are used: - replica.info[instance].update({ - "source_environment": replica.source_environment, + transfer.info[instance].update({ + "source_environment": transfer.source_environment, "target_environment": dest_env}) # TODO(aznashwan): have these passed separately to the relevant # provider methods (they're currently passed directly inside @@ -944,9 +934,9 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): # "network_map": network_map, # "storage_mappings": storage_mappings, - validate_replica_source_inputs_task = self._create_task( + validate_transfer_source_inputs_task = self._create_task( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS, + constants.TASK_TYPE_VALIDATE_TRANSFER_SOURCE_INPUTS, execution) get_instance_info_task = self._create_task( @@ -954,20 +944,20 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): constants.TASK_TYPE_GET_INSTANCE_INFO, execution) - validate_replica_destination_inputs_task = self._create_task( + validate_transfer_destination_inputs_task = self._create_task( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS, + constants.TASK_TYPE_VALIDATE_TRANSFER_DESTINATION_INPUTS, execution, depends_on=[get_instance_info_task.id]) disk_deployment_depends_on = [] validate_origin_minion_task = None - if replica.origin_minion_pool_id: + if transfer.origin_minion_pool_id: # NOTE: these values are required for the # _check_execution_tasks_sanity call but # will be populated later when the pool # allocations actually happen: - replica.info[instance].update({ + transfer.info[instance].update({ "origin_minion_machine_id": None, "origin_minion_provider_properties": None, "origin_minion_connection_info": None}) @@ -977,20 +967,20 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): execution, depends_on=[ get_instance_info_task.id, - validate_replica_source_inputs_task.id]) + validate_transfer_source_inputs_task.id]) disk_deployment_depends_on.append( validate_origin_minion_task.id) else: disk_deployment_depends_on.append( - validate_replica_source_inputs_task.id) + validate_transfer_source_inputs_task.id) validate_destination_minion_task = None - if replica.destination_minion_pool_id: + if transfer.destination_minion_pool_id: # NOTE: these values are required for the # _check_execution_tasks_sanity call but # will be populated later when the pool # allocations actually happen: - replica.info[instance].update({ + transfer.info[instance].update({ "destination_minion_machine_id": None, "destination_minion_provider_properties": None, "destination_minion_connection_info": None, @@ -1000,42 +990,42 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY, # noqa: E501 execution, depends_on=[ - validate_replica_destination_inputs_task.id]) + validate_transfer_destination_inputs_task.id]) disk_deployment_depends_on.append( validate_destination_minion_task.id) else: disk_deployment_depends_on.append( - validate_replica_destination_inputs_task.id) + validate_transfer_destination_inputs_task.id) - deploy_replica_disks_task = self._create_task( - instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS, + deploy_transfer_disks_task = self._create_task( + instance, constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS, execution, depends_on=disk_deployment_depends_on) shutdown_deps = [] - deploy_replica_source_resources_task = None - if not replica.origin_minion_pool_id: - deploy_replica_source_resources_task = self._create_task( + deploy_transfer_source_resources_task = None + if not transfer.origin_minion_pool_id: + deploy_transfer_source_resources_task = self._create_task( instance, - constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES, + constants.TASK_TYPE_DEPLOY_TRANSFER_SOURCE_RESOURCES, execution, depends_on=[ - deploy_replica_disks_task.id]) - shutdown_deps.append(deploy_replica_source_resources_task) + deploy_transfer_disks_task.id]) + shutdown_deps.append(deploy_transfer_source_resources_task) attach_destination_minion_disks_task = None - deploy_replica_target_resources_task = None - if replica.destination_minion_pool_id: + deploy_transfer_target_resources_task = None + if transfer.destination_minion_pool_id: ttyp = constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION attach_destination_minion_disks_task = self._create_task( instance, ttyp, execution, depends_on=[ - deploy_replica_disks_task.id]) + deploy_transfer_disks_task.id]) shutdown_deps.append(attach_destination_minion_disks_task) else: - deploy_replica_target_resources_task = self._create_task( + deploy_transfer_target_resources_task = self._create_task( instance, - constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES, + constants.TASK_TYPE_DEPLOY_TRANSFER_TARGET_RESOURCES, execution, depends_on=[ - deploy_replica_disks_task.id]) - shutdown_deps.append(deploy_replica_target_resources_task) + deploy_transfer_disks_task.id]) + shutdown_deps.append(deploy_transfer_target_resources_task) depends_on = [t.id for t in shutdown_deps] if shutdown_instances: @@ -1048,7 +1038,7 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): instance, constants.TASK_TYPE_REPLICATE_DISKS, execution, depends_on=depends_on) - if replica.origin_minion_pool_id: + if transfer.origin_minion_pool_id: self._create_task( instance, constants.TASK_TYPE_RELEASE_SOURCE_MINION, @@ -1060,14 +1050,14 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): else: self._create_task( instance, - constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES, + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_RESOURCES, execution, depends_on=[ - deploy_replica_source_resources_task.id, + deploy_transfer_source_resources_task.id, replicate_disks_task.id], on_error=True) - if replica.destination_minion_pool_id: + if transfer.destination_minion_pool_id: detach_volumes_from_minion_task = self._create_task( instance, constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION, @@ -1088,162 +1078,162 @@ def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances): else: self._create_task( instance, - constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES, + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_RESOURCES, execution, depends_on=[ - deploy_replica_target_resources_task.id, + deploy_transfer_target_resources_task.id, replicate_disks_task.id], on_error=True) - self._check_execution_tasks_sanity(execution, replica.info) + self._check_execution_tasks_sanity(execution, transfer.info) - # update the action info for all of the Replicas: + # update the action info for all of the Transfers: for instance in execution.action.instances: db_api.update_transfer_action_info_for_instance( - ctxt, replica.id, instance, replica.info[instance]) + ctxt, transfer.id, instance, transfer.info[instance]) # add new execution to DB: db_api.add_transfer_tasks_execution(ctxt, execution) - LOG.info("Replica tasks execution added to DB: %s", execution.id) + LOG.info("Transfer tasks execution added to DB: %s", execution.id) uses_minion_pools = any([ - replica.origin_minion_pool_id, - replica.destination_minion_pool_id]) + transfer.origin_minion_pool_id, + transfer.destination_minion_pool_id]) if uses_minion_pools: - self._minion_manager_client.allocate_minion_machines_for_replica( - ctxt, replica) + self._minion_manager_client.allocate_minion_machines_for_transfer( + ctxt, transfer) self._set_tasks_execution_status( ctxt, execution, constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) else: - self._begin_tasks(ctxt, replica, execution) + self._begin_tasks(ctxt, transfer, execution) - return self.get_replica_tasks_execution( - ctxt, replica_id, execution.id) + return self.get_transfer_tasks_execution( + ctxt, transfer_id, execution.id) - @replica_synchronized - def get_replica_tasks_executions(self, ctxt, replica_id, - include_tasks=False, - include_task_info=False): + @transfer_synchronized + def get_transfer_tasks_executions(self, ctxt, transfer_id, + include_tasks=False, + include_task_info=False): return db_api.get_transfer_tasks_executions( - ctxt, replica_id, include_tasks, + ctxt, transfer_id, include_tasks, include_task_info=include_task_info, to_dict=True) @tasks_execution_synchronized - def get_replica_tasks_execution(self, ctxt, replica_id, execution_id, - include_task_info=False): - return self._get_replica_tasks_execution( - ctxt, replica_id, execution_id, + def get_transfer_tasks_execution(self, ctxt, transfer_id, execution_id, + include_task_info=False): + return self._get_transfer_tasks_execution( + ctxt, transfer_id, execution_id, include_task_info=include_task_info, to_dict=True) @tasks_execution_synchronized - def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id): - execution = self._get_replica_tasks_execution( - ctxt, replica_id, execution_id) + def delete_transfer_tasks_execution(self, ctxt, transfer_id, execution_id): + execution = self._get_transfer_tasks_execution( + ctxt, transfer_id, execution_id) if execution.status in constants.ACTIVE_EXECUTION_STATUSES: - raise exception.InvalidMigrationState( - "Cannot delete execution '%s' for Replica '%s' as it is " + raise exception.InvalidTasksExecutionState( + "Cannot delete execution '%s' for Transfer '%s' as it is " "currently in '%s' state." % ( - execution_id, replica_id, execution.status)) + execution_id, transfer_id, execution.status)) db_api.delete_transfer_tasks_execution(ctxt, execution_id) @tasks_execution_synchronized - def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id, - force): - execution = self._get_replica_tasks_execution( - ctxt, replica_id, execution_id) + def cancel_transfer_tasks_execution(self, ctxt, transfer_id, execution_id, + force): + execution = self._get_transfer_tasks_execution( + ctxt, transfer_id, execution_id) if execution.status not in constants.ACTIVE_EXECUTION_STATUSES: - raise exception.InvalidReplicaState( - "Replica '%s' has no running execution to cancel." % ( - replica_id)) + raise exception.InvalidTransferState( + "Transfer '%s' has no running execution to cancel." % ( + transfer_id)) if execution.status == constants.EXECUTION_STATUS_CANCELLING and ( not force): - raise exception.InvalidReplicaState( - "Replica '%s' is already being cancelled. Please use the " + raise exception.InvalidTransferState( + "Transfer '%s' is already being cancelled. Please use the " "force option if you'd like to force-cancel it." % ( - replica_id)) + transfer_id)) self._cancel_tasks_execution(ctxt, execution, force=force) - def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id, - include_task_info=False, to_dict=False): + def _get_transfer_tasks_execution(self, ctxt, transfer_id, execution_id, + include_task_info=False, to_dict=False): execution = db_api.get_transfer_tasks_execution( - ctxt, replica_id, execution_id, + ctxt, transfer_id, execution_id, include_task_info=include_task_info, to_dict=to_dict) if not execution: raise exception.NotFound( - "Execution with ID '%s' for Replica '%s' not found." % ( - execution_id, replica_id)) + "Execution with ID '%s' for Transfer '%s' not found." % ( + execution_id, transfer_id)) return execution - def get_replicas(self, ctxt, include_tasks_executions=False, - include_task_info=False): + def get_transfers(self, ctxt, include_tasks_executions=False, + include_task_info=False): return db_api.get_transfers( ctxt, include_tasks_executions, include_task_info=include_task_info, to_dict=True) - @replica_synchronized - def get_replica(self, ctxt, replica_id, include_task_info=False): - return self._get_replica( - ctxt, replica_id, + @transfer_synchronized + def get_transfer(self, ctxt, transfer_id, include_task_info=False): + return self._get_transfer( + ctxt, transfer_id, include_task_info=include_task_info, to_dict=True) - @replica_synchronized - def delete_replica(self, ctxt, replica_id): - replica = self._get_replica(ctxt, replica_id) - self._check_replica_running_executions(ctxt, replica) - self._check_delete_reservation_for_transfer(replica) - db_api.delete_transfer(ctxt, replica_id) + @transfer_synchronized + def delete_transfer(self, ctxt, transfer_id): + transfer = self._get_transfer(ctxt, transfer_id) + self._check_transfer_running_executions(ctxt, transfer) + self._check_delete_reservation_for_transfer(transfer) + db_api.delete_transfer(ctxt, transfer_id) - @replica_synchronized - def delete_replica_disks(self, ctxt, replica_id): - replica = self._get_replica(ctxt, replica_id, include_task_info=True) - self._check_replica_running_executions(ctxt, replica) + @transfer_synchronized + def delete_transfer_disks(self, ctxt, transfer_id): + transfer = self._get_transfer(ctxt, transfer_id, include_task_info=True) + self._check_transfer_running_executions(ctxt, transfer) execution = models.TasksExecution() execution.id = str(uuid.uuid4()) execution.status = constants.EXECUTION_STATUS_UNEXECUTED - execution.action = replica - execution.type = constants.EXECUTION_TYPE_REPLICA_DISKS_DELETE + execution.action = transfer + execution.type = constants.EXECUTION_TYPE_TRANSFER_DISKS_DELETE has_tasks = False - for instance in replica.instances: - if (instance in replica.info and ( - replica.info[instance].get('volumes_info'))): + for instance in transfer.instances: + if (instance in transfer.info and ( + transfer.info[instance].get('volumes_info'))): source_del_task = self._create_task( instance, - constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS, + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS, execution) self._create_task( - instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS, + instance, constants.TASK_TYPE_DELETE_TRANSFER_DISKS, execution, depends_on=[source_del_task.id]) has_tasks = True if not has_tasks: - raise exception.InvalidReplicaState( - "Replica '%s' does not have volumes information for any " - "instances. Ensure that the replica has been executed " - "successfully priorly" % replica_id) + raise exception.InvalidTransferState( + "Transfer '%s' does not have volumes information for any " + "instances. Ensure that the transfer has been executed " + "successfully priorly" % transfer_id) # ensure we're passing the updated target-env options on the - # parent Replica itself in case of a Replica update: - dest_env = copy.deepcopy(replica.destination_environment) - dest_env['network_map'] = replica.network_map - dest_env['storage_mappings'] = replica.storage_mappings - for instance in replica.instances: - replica.info[instance].update({ + # parent Transfer itself in case of a Transfer update: + dest_env = copy.deepcopy(transfer.destination_environment) + dest_env['network_map'] = transfer.network_map + dest_env['storage_mappings'] = transfer.storage_mappings + for instance in transfer.instances: + transfer.info[instance].update({ "target_environment": dest_env}) - self._check_execution_tasks_sanity(execution, replica.info) + self._check_execution_tasks_sanity(execution, transfer.info) - # update the action info for all of the Replicas' instances: - for instance in replica.instances: + # update the action info for all of the Transfers' instances: + for instance in transfer.instances: db_api.update_transfer_action_info_for_instance( - ctxt, replica.id, instance, replica.info[instance]) + ctxt, transfer.id, instance, transfer.info[instance]) db_api.add_transfer_tasks_execution(ctxt, execution) - LOG.info("Replica tasks execution created: %s", execution.id) + LOG.info("Transfer tasks execution created: %s", execution.id) - self._begin_tasks(ctxt, replica, execution) - return self.get_replica_tasks_execution( - ctxt, replica_id, execution.id) + self._begin_tasks(ctxt, transfer, execution) + return self.get_transfer_tasks_execution( + ctxt, transfer_id, execution.id) @staticmethod def _check_endpoints(ctxt, origin_endpoint, destination_endpoint): @@ -1258,74 +1248,68 @@ def _check_endpoints(ctxt, origin_endpoint, destination_endpoint): destination_endpoint.connection_info)): raise exception.SameDestination() - def create_instances_replica(self, ctxt, replica_scenario, - origin_endpoint_id, - destination_endpoint_id, - origin_minion_pool_id, - destination_minion_pool_id, - instance_osmorphing_minion_pool_mappings, - source_environment, - destination_environment, instances, - network_map, storage_mappings, notes=None, - user_scripts=None): + def create_instances_transfer(self, ctxt, transfer_scenario, + origin_endpoint_id, + destination_endpoint_id, + origin_minion_pool_id, + destination_minion_pool_id, + instance_osmorphing_minion_pool_mappings, + source_environment, + destination_environment, instances, + network_map, storage_mappings, notes=None, + user_scripts=None): supported_scenarios = [ - constants.REPLICA_SCENARIO_REPLICA, - constants.REPLICA_SCENARIO_LIVE_MIGRATION] - if replica_scenario not in supported_scenarios: + constants.TRANSFER_SCENARIO_REPLICA, + constants.TRANSFER_SCENARIO_LIVE_MIGRATION] + if transfer_scenario not in supported_scenarios: raise exception.InvalidInput( - message=f"Unsupported Replica scenario '{replica_scenario}'. " - f"Must be one of: {supported_scenarios}") + message=f"Unsupported Transfer scenario '{transfer_scenario}'." + f" Must be one of: {supported_scenarios}") origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id) destination_endpoint = self.get_endpoint( ctxt, destination_endpoint_id) self._check_endpoints(ctxt, origin_endpoint, destination_endpoint) - replica = models.Transfer() - replica.id = str(uuid.uuid4()) - replica.base_id = replica.id - replica.scenario = replica_scenario - replica.origin_endpoint_id = origin_endpoint_id - replica.origin_minion_pool_id = origin_minion_pool_id - replica.destination_endpoint_id = destination_endpoint_id - replica.destination_minion_pool_id = destination_minion_pool_id - replica.destination_environment = destination_environment - replica.source_environment = source_environment - replica.last_execution_status = constants.EXECUTION_STATUS_UNEXECUTED - replica.instances = instances - replica.executions = [] - replica.info = {instance: { + transfer = models.Transfer() + transfer.id = str(uuid.uuid4()) + transfer.base_id = transfer.id + transfer.scenario = transfer_scenario + transfer.origin_endpoint_id = origin_endpoint_id + transfer.origin_minion_pool_id = origin_minion_pool_id + transfer.destination_endpoint_id = destination_endpoint_id + transfer.destination_minion_pool_id = destination_minion_pool_id + transfer.destination_environment = destination_environment + transfer.source_environment = source_environment + transfer.last_execution_status = constants.EXECUTION_STATUS_UNEXECUTED + transfer.instances = instances + transfer.executions = [] + transfer.info = {instance: { 'volumes_info': []} for instance in instances} - replica.notes = notes - replica.network_map = network_map - replica.storage_mappings = storage_mappings - replica.instance_osmorphing_minion_pool_mappings = ( + transfer.notes = notes + transfer.network_map = network_map + transfer.storage_mappings = storage_mappings + transfer.instance_osmorphing_minion_pool_mappings = ( instance_osmorphing_minion_pool_mappings) - replica.user_scripts = user_scripts or {} + transfer.user_scripts = user_scripts or {} - self._check_minion_pools_for_action(ctxt, replica) + self._check_minion_pools_for_action(ctxt, transfer) - self._create_reservation_for_replica(replica) + self._create_reservation_for_transfer(transfer) - db_api.add_transfer(ctxt, replica) - LOG.info("Replica created: %s", replica.id) - return self.get_replica(ctxt, replica.id) + db_api.add_transfer(ctxt, transfer) + LOG.info("Transfer created: %s", transfer.id) + return self.get_transfer(ctxt, transfer.id) - def _get_replica(self, ctxt, replica_id, include_task_info=False, - to_dict=False): - replica = db_api.get_transfer( - ctxt, replica_id, include_task_info=include_task_info, + def _get_transfer(self, ctxt, transfer_id, include_task_info=False, + to_dict=False): + transfer = db_api.get_transfer( + ctxt, transfer_id, include_task_info=include_task_info, to_dict=to_dict) - if not replica: + if not transfer: raise exception.NotFound( - "Replica with ID '%s' not found." % replica_id) - return replica - - @migration_synchronized - def get_migration(self, ctxt, migration_id, include_task_info=False): - return self._get_migration( - ctxt, migration_id, include_task_info=include_task_info, - to_dict=True) + "Transfer with ID '%s' not found." % transfer_id) + return transfer def get_deployments(self, ctxt, include_tasks, include_task_info=False): @@ -1336,17 +1320,17 @@ def get_deployments(self, ctxt, include_tasks, @deployment_synchronized def get_deployment(self, ctxt, deployment_id, include_task_info=False): - return self._get_migration( + return self._get_deployment( ctxt, deployment_id, include_task_info=include_task_info, to_dict=True) @staticmethod - def _check_running_replica_migrations(ctxt, replica_id): - migrations = db_api.get_transfer_deployments(ctxt, replica_id) - if [m.id for m in migrations if m.executions[0].status in ( + def _check_running_transfer_deployments(ctxt, transfer_id): + deployments = db_api.get_transfer_deployments(ctxt, transfer_id) + if [m.id for m in deployments if m.executions[0].status in ( constants.ACTIVE_EXECUTION_STATUSES)]: - raise exception.InvalidReplicaState( - "Transfer '%s' is currently being deployed" % replica_id) + raise exception.InvalidTransferState( + "Transfer '%s' is currently being deployed" % transfer_id) @staticmethod def _check_running_executions(action): @@ -1358,25 +1342,25 @@ def _check_running_executions(action): "Another tasks execution is in progress: %s" % ( running_executions)) - def _check_replica_running_executions(self, ctxt, replica): - self._check_running_executions(replica) - self._check_running_replica_migrations(ctxt, replica.id) + def _check_transfer_running_executions(self, ctxt, transfer): + self._check_running_executions(transfer) + self._check_running_transfer_deployments(ctxt, transfer.id) @staticmethod - def _check_valid_replica_tasks_execution(replica, force=False): + def _check_valid_transfer_tasks_execution(transfer, force=False): sorted_executions = sorted( - replica.executions, key=lambda e: e.number, reverse=True) + transfer.executions, key=lambda e: e.number, reverse=True) if not sorted_executions: - raise exception.InvalidReplicaState( - "The Replica has never been executed.") + raise exception.InvalidTransferState( + "The Transfer has never been executed.") if not [e for e in sorted_executions - if e.type == constants.EXECUTION_TYPE_REPLICA_EXECUTION and ( + if e.type == constants.EXECUTION_TYPE_TRANSFER_EXECUTION and ( e.status == constants.EXECUTION_STATUS_COMPLETED)]: if not force: - raise exception.InvalidReplicaState( - "A replica must have been executed successfully at least " - "once in order to be migrated") + raise exception.InvalidTransferState( + "A transfer must have been executed successfully at least " + "once in order to be deployed") def _get_provider_types(self, ctxt, endpoint): provider_types = self.get_available_providers(ctxt).get(endpoint.type) @@ -1385,84 +1369,85 @@ def _get_provider_types(self, ctxt, endpoint): "No provider found for: %s" % endpoint.type) return provider_types["types"] - @replica_synchronized - def deploy_replica_instances( - self, ctxt, replica_id, clone_disks, force, + @transfer_synchronized + def deploy_transfer_instances( + self, ctxt, transfer_id, clone_disks, force, instance_osmorphing_minion_pool_mappings=None, skip_os_morphing=False, user_scripts=None): - replica = self._get_replica(ctxt, replica_id, include_task_info=True) - self._check_replica_running_executions(ctxt, replica) - self._check_valid_replica_tasks_execution(replica, force) - user_scripts = user_scripts or replica.user_scripts + transfer = self._get_transfer( + ctxt, transfer_id, include_task_info=True) + self._check_transfer_running_executions(ctxt, transfer) + self._check_valid_transfer_tasks_execution(transfer, force) + user_scripts = user_scripts or transfer.user_scripts destination_endpoint = self.get_endpoint( - ctxt, replica.destination_endpoint_id) + ctxt, transfer.destination_endpoint_id) destination_provider_types = self._get_provider_types( ctxt, destination_endpoint) - for instance, info in replica.info.items(): + for instance, info in transfer.info.items(): if not info.get("volumes_info"): - raise exception.InvalidReplicaState( - "The replica doesn't contain volumes information for " + raise exception.InvalidTransferState( + "The transfer doesn't contain volumes information for " "instance: %s. If replicated disks are deleted, the " - "replica needs to be executed anew before a migration can " - "occur" % instance) + "transfer needs to be executed anew before a deployment " + "can occur" % instance) - instances = replica.instances + instances = transfer.instances - migration = models.Deployment() - migration.id = str(uuid.uuid4()) - migration.base_id = migration.id - migration.origin_endpoint_id = replica.origin_endpoint_id - migration.destination_endpoint_id = replica.destination_endpoint_id + deployment = models.Deployment() + deployment.id = str(uuid.uuid4()) + deployment.base_id = deployment.id + deployment.origin_endpoint_id = transfer.origin_endpoint_id + deployment.destination_endpoint_id = transfer.destination_endpoint_id # TODO(aznashwan): have these passed separately to the relevant # provider methods instead of through the dest-env: - dest_env = copy.deepcopy(replica.destination_environment) - dest_env['network_map'] = replica.network_map - dest_env['storage_mappings'] = replica.storage_mappings - migration.destination_environment = dest_env - migration.source_environment = replica.source_environment - migration.network_map = replica.network_map - migration.storage_mappings = replica.storage_mappings - migration.instances = instances - migration.replica = replica - migration.info = replica.info - migration.notes = replica.notes - migration.user_scripts = user_scripts - # NOTE: Migrations-from-Replica have no use for the source/target - # pools of the parent Replica so these can be omitted: - migration.origin_minion_pool_id = None - migration.destination_minion_pool_id = None - migration.instance_osmorphing_minion_pool_mappings = ( - replica.instance_osmorphing_minion_pool_mappings) + dest_env = copy.deepcopy(transfer.destination_environment) + dest_env['network_map'] = transfer.network_map + dest_env['storage_mappings'] = transfer.storage_mappings + deployment.destination_environment = dest_env + deployment.source_environment = transfer.source_environment + deployment.network_map = transfer.network_map + deployment.storage_mappings = transfer.storage_mappings + deployment.instances = instances + deployment.transfer = transfer + deployment.info = transfer.info + deployment.notes = transfer.notes + deployment.user_scripts = user_scripts + # NOTE: Deployments have no use for the source/target + # pools of the parent Transfer so these can be omitted: + deployment.origin_minion_pool_id = None + deployment.destination_minion_pool_id = None + deployment.instance_osmorphing_minion_pool_mappings = ( + transfer.instance_osmorphing_minion_pool_mappings) if instance_osmorphing_minion_pool_mappings: - migration.instance_osmorphing_minion_pool_mappings.update( + deployment.instance_osmorphing_minion_pool_mappings.update( instance_osmorphing_minion_pool_mappings) - self._check_minion_pools_for_action(ctxt, migration) - self._check_reservation_for_replica(replica) + self._check_minion_pools_for_action(ctxt, deployment) + self._check_reservation_for_tranfer(transfer) execution = models.TasksExecution() - migration.executions = [execution] + deployment.executions = [execution] execution.status = constants.EXECUTION_STATUS_UNEXECUTED execution.number = 1 - execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY + execution.type = constants.EXECUTION_TYPE_TRANSFER_DEPLOY for instance in instances: - migration.info[instance]["clone_disks"] = clone_disks + deployment.info[instance]["clone_disks"] = clone_disks scripts = self._get_instance_scripts(user_scripts, instance) - migration.info[instance]["user_scripts"] = scripts + deployment.info[instance]["user_scripts"] = scripts # NOTE: we default/convert the volumes info to an empty list # to preserve backwards-compatibility with older versions # of Coriolis dating before the scheduling overhaul (PR##114) - if instance not in migration.info: - migration.info[instance] = {'volumes_info': []} + if instance not in deployment.info: + deployment.info[instance] = {'volumes_info': []} # NOTE: we update all of the param values before triggering an - # execution to ensure that the params on the Replica are used - # in case there was a failed Replica update (where the new values + # execution to ensure that the params on the Transfer are used + # in case there was a failed Transfer update (where the new values # could be in the `.info` field instead of the old ones) - migration.info[instance].update({ - "source_environment": migration.source_environment, + deployment.info[instance].update({ + "source_environment": deployment.source_environment, "target_environment": dest_env}) # TODO(aznashwan): have these passed separately to the relevant # provider methods (they're currently passed directly inside @@ -1470,20 +1455,20 @@ def deploy_replica_instances( # "network_map": network_map, # "storage_mappings": storage_mappings, - validate_replica_deployment_inputs_task = self._create_task( + validate_deployment_inputs_task = self._create_task( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS, + constants.TASK_TYPE_VALIDATE_DEPLOYMENT_INPUTS, execution) validate_osmorphing_minion_task = None - last_validation_task = validate_replica_deployment_inputs_task + last_validation_task = validate_deployment_inputs_task if not skip_os_morphing and instance in ( - migration.instance_osmorphing_minion_pool_mappings): + deployment.instance_osmorphing_minion_pool_mappings): # NOTE: these values are required for the # _check_execution_tasks_sanity call but # will be populated later when the pool # allocations actually happen: - migration.info[instance].update({ + deployment.info[instance].update({ "osmorphing_minion_machine_id": None, "osmorphing_minion_provider_properties": None, "osmorphing_minion_connection_info": None}) @@ -1491,27 +1476,27 @@ def deploy_replica_instances( instance, constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY, # noqa: E501 execution, depends_on=[ - validate_replica_deployment_inputs_task.id]) + validate_deployment_inputs_task.id]) last_validation_task = validate_osmorphing_minion_task create_snapshot_task = self._create_task( - instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS, + instance, constants.TASK_TYPE_CREATE_TRANSFER_DISK_SNAPSHOTS, execution, depends_on=[ last_validation_task.id]) - deploy_replica_task = self._create_task( + deploy_instance_task = self._create_task( instance, - constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES, + constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES, execution, depends_on=[create_snapshot_task.id]) - depends_on = [deploy_replica_task.id] + depends_on = [deploy_instance_task.id] if not skip_os_morphing: task_deploy_os_morphing_resources = None attach_osmorphing_minion_volumes_task = None last_osmorphing_resources_deployment_task = None if instance in ( - migration.instance_osmorphing_minion_pool_mappings): + deployment.instance_osmorphing_minion_pool_mappings): osmorphing_vol_attachment_deps = [ validate_osmorphing_minion_task.id] osmorphing_vol_attachment_deps.extend(depends_on) @@ -1545,7 +1530,7 @@ def deploy_replica_instances( depends_on = [task_osmorphing.id] if instance in ( - migration.instance_osmorphing_minion_pool_mappings): + deployment.instance_osmorphing_minion_pool_mappings): detach_osmorphing_minion_volumes_task = self._create_task( instance, constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION, # noqa: E501 @@ -1581,13 +1566,13 @@ def deploy_replica_instances( finalize_deployment_task = self._create_task( instance, - constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT, + constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT, execution, depends_on=depends_on) self._create_task( instance, - constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS, + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS, execution, depends_on=[ create_snapshot_task.id, finalize_deployment_task.id], @@ -1595,43 +1580,43 @@ def deploy_replica_instances( cleanup_deployment_task = self._create_task( instance, - constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT, + constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT, execution, depends_on=[ - deploy_replica_task.id, + deploy_instance_task.id, finalize_deployment_task.id], on_error_only=True) if not clone_disks: self._create_task( instance, - constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS, + constants.TASK_TYPE_RESTORE_TRANSFER_DISK_SNAPSHOTS, execution, depends_on=[cleanup_deployment_task.id], on_error=True) - self._check_execution_tasks_sanity(execution, migration.info) - db_api.add_deployment(ctxt, migration) - LOG.info("Migration created: %s", migration.id) + self._check_execution_tasks_sanity(execution, deployment.info) + db_api.add_deployment(ctxt, deployment) + LOG.info("Deployment created: %s", deployment.id) if not skip_os_morphing and ( - migration.instance_osmorphing_minion_pool_mappings): - # NOTE: we lock on the migration ID to ensure the minion + deployment.instance_osmorphing_minion_pool_mappings): + # NOTE: we lock on the deployment ID to ensure the minion # allocation confirmations don't come in too early: with lockutils.lock( - constants.MIGRATION_LOCK_NAME_FORMAT % migration.id, + constants.DEPLOYMENT_LOCK_NAME_FORMAT % deployment.id, external=True): (self._minion_manager_client - .allocate_minion_machines_for_migration( - ctxt, migration, include_transfer_minions=False, + .allocate_minion_machines_for_deployment( + ctxt, deployment, include_transfer_minions=False, include_osmorphing_minions=True)) self._set_tasks_execution_status( ctxt, execution, constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) else: - self._begin_tasks(ctxt, migration, execution) + self._begin_tasks(ctxt, deployment, execution) - return self.get_migration(ctxt, migration.id) + return self.get_deployment(ctxt, deployment.id) def _get_instance_scripts(self, user_scripts, instance): user_scripts = user_scripts or {} @@ -1710,162 +1695,163 @@ def _update_task_info_for_minion_allocations( db_api.update_transfer_action_info_for_instance( ctxt, action.id, instance, action.info[instance]) - def _get_last_execution_for_replica(self, ctxt, replica, requery=False): + def _get_last_execution_for_transfer(self, ctxt, transfer, requery=False): if requery: - replica = self._get_replica(ctxt, replica.id) - last_replica_execution = None - if not replica.executions: - raise exception.InvalidReplicaState( - "Replica with ID '%s' has no existing Replica " - "executions." % (replica.id)) - last_replica_execution = sorted( - replica.executions, key=lambda e: e.number)[-1] - return last_replica_execution - - def _get_execution_for_migration(self, ctxt, migration, requery=False): + transfer = self._get_transfer(ctxt, transfer.id) + last_transfer_execution = None + if not transfer.executions: + raise exception.InvalidTransferState( + f"Transfer with ID '{transfer.id}' has no existing " + f"executions.") + last_transfer_execution = sorted( + transfer.executions, key=lambda e: e.number)[-1] + return last_transfer_execution + + def _get_execution_for_deployment(self, ctxt, deployment, requery=False): if requery: - migration = self._get_migration(ctxt, migration.id) - - if not migration.executions: - raise exception.InvalidMigrationState( - "Migration with ID '%s' has no existing executions." % ( - migration.id)) - if len(migration.executions) > 1: - raise exception.InvalidMigrationState( - "Migration with ID '%s' has more than one execution:" - " %s" % (migration.id, [e.id for e in migration.executions])) - return migration.executions[0] - - @replica_synchronized - def confirm_replica_minions_allocation( - self, ctxt, replica_id, minion_machine_allocations): - replica = self._get_replica(ctxt, replica_id, include_task_info=True) + deployment = self._get_deployment(ctxt, deployment.id) + + if not deployment.executions: + raise exception.InvalidDeploymentState( + "Deployment with ID '%s' has no existing executions." % ( + deployment.id)) + if len(deployment.executions) > 1: + raise exception.InvalidDeploymentState( + "Deployment with ID '%s' has more than one execution:" + " %s" % (deployment.id, [e.id for e in deployment.executions])) + return deployment.executions[0] + + @transfer_synchronized + def confirm_transfer_minions_allocation( + self, ctxt, transfer_id, minion_machine_allocations): + transfer = self._get_transfer( + ctxt, transfer_id, include_task_info=True) awaiting_minions_status = ( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) - if replica.last_execution_status != awaiting_minions_status: - raise exception.InvalidReplicaState( - "Replica is in '%s' status instead of the expected '%s' to " + if transfer.last_execution_status != awaiting_minions_status: + raise exception.InvalidTransferState( + "Transfer is in '%s' status instead of the expected '%s' to " "have minion machines allocated for it." % ( - replica.last_execution_status, awaiting_minions_status)) + transfer.last_execution_status, awaiting_minions_status)) - last_replica_execution = self._get_last_execution_for_replica( - ctxt, replica, requery=False) + last_transfer_execution = self._get_last_execution_for_transfer( + ctxt, transfer, requery=False) self._update_task_info_for_minion_allocations( - ctxt, replica, minion_machine_allocations) + ctxt, transfer, minion_machine_allocations) - last_replica_execution = db_api.get_transfer_tasks_execution( - ctxt, replica.id, last_replica_execution.id) + last_transfer_execution = db_api.get_transfer_tasks_execution( + ctxt, transfer.id, last_transfer_execution.id) self._begin_tasks( - ctxt, replica, last_replica_execution) + ctxt, transfer, last_transfer_execution) - @replica_synchronized - def report_replica_minions_allocation_error( - self, ctxt, replica_id, minion_allocation_error_details): - replica = self._get_replica(ctxt, replica_id) + @transfer_synchronized + def report_transfer_minions_allocation_error( + self, ctxt, transfer_id, minion_allocation_error_details): + transfer = self._get_transfer(ctxt, transfer_id) awaiting_minions_status = ( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) - if replica.last_execution_status != awaiting_minions_status: - raise exception.InvalidReplicaState( - "Replica is in '%s' status instead of the expected '%s' to " + if transfer.last_execution_status != awaiting_minions_status: + raise exception.InvalidTransferState( + "Transfer is in '%s' status instead of the expected '%s' to " "have minion machines allocations fail for it." % ( - replica.last_execution_status, awaiting_minions_status)) + transfer.last_execution_status, awaiting_minions_status)) - last_replica_execution = self._get_last_execution_for_replica( - ctxt, replica, requery=False) + last_transfer_execution = self._get_last_execution_for_transfer( + ctxt, transfer, requery=False) LOG.warn( - "Error occured while allocating minion machines for Replica '%s'. " - "Cancelling the current Replica Execution ('%s'). Error was: %s", - replica_id, last_replica_execution.id, + "Error occured while allocating minion machines for Transfer '%s'." + " Cancelling the current Transfer Execution ('%s'). Error was: %s", + transfer_id, last_transfer_execution.id, minion_allocation_error_details) self._cancel_tasks_execution( - ctxt, last_replica_execution, requery=True) + ctxt, last_transfer_execution, requery=True) self._set_tasks_execution_status( - ctxt, last_replica_execution, + ctxt, last_transfer_execution, constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS) - @migration_synchronized - def confirm_migration_minions_allocation( - self, ctxt, migration_id, minion_machine_allocations): - migration = self._get_migration( - ctxt, migration_id, include_task_info=True) + @deployment_synchronized + def confirm_deployment_minions_allocation( + self, ctxt, deployment_id, minion_machine_allocations): + deployment = self._get_deployment( + ctxt, deployment_id, include_task_info=True) awaiting_minions_status = ( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) - if migration.last_execution_status != awaiting_minions_status: - raise exception.InvalidMigrationState( - "Migration is in '%s' status instead of the expected '%s' to " + if deployment.last_execution_status != awaiting_minions_status: + raise exception.InvalidDeploymentState( + "Deployment is in '%s' status instead of the expected '%s' to " "have minion machines allocated for it." % ( - migration.last_execution_status, awaiting_minions_status)) + deployment.last_execution_status, awaiting_minions_status)) - execution = self._get_execution_for_migration( - ctxt, migration, requery=False) + execution = self._get_execution_for_deployment( + ctxt, deployment, requery=False) self._update_task_info_for_minion_allocations( - ctxt, migration, minion_machine_allocations) - self._begin_tasks(ctxt, migration, execution) + ctxt, deployment, minion_machine_allocations) + self._begin_tasks(ctxt, deployment, execution) - @migration_synchronized - def report_migration_minions_allocation_error( - self, ctxt, migration_id, minion_allocation_error_details): - migration = self._get_migration(ctxt, migration_id) + @deployment_synchronized + def report_deployment_minions_allocation_error( + self, ctxt, deployment_id, minion_allocation_error_details): + deployment = self._get_deployment(ctxt, deployment_id) awaiting_minions_status = ( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS) - if migration.last_execution_status != awaiting_minions_status: - raise exception.InvalidMigrationState( - "Migration is in '%s' status instead of the expected '%s' to " + if deployment.last_execution_status != awaiting_minions_status: + raise exception.InvalidDeploymentState( + "Deployment is in '%s' status instead of the expected '%s' to " "have minion machines allocations fail for it." % ( - migration.last_execution_status, awaiting_minions_status)) + deployment.last_execution_status, awaiting_minions_status)) - execution = self._get_execution_for_migration( - ctxt, migration, requery=False) + execution = self._get_execution_for_deployment( + ctxt, deployment, requery=False) LOG.warn( "Error occured while allocating minion machines for " - "Migration '%s'. Cancelling the current Execution ('%s'). " + "Deployment '%s'. Cancelling the current Execution ('%s'). " "Error was: %s", - migration_id, execution.id, minion_allocation_error_details) + deployment_id, execution.id, minion_allocation_error_details) self._cancel_tasks_execution( ctxt, execution, requery=True) self._set_tasks_execution_status( ctxt, execution, constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS) - def _get_migration(self, ctxt, migration_id, include_task_info=False, - to_dict=False): - migration = db_api.get_deployment( - ctxt, migration_id, include_task_info=include_task_info, + def _get_deployment(self, ctxt, deployment_id, include_task_info=False, + to_dict=False): + deployment = db_api.get_deployment( + ctxt, deployment_id, include_task_info=include_task_info, to_dict=to_dict) - if not migration: + if not deployment: raise exception.NotFound( - "Migration with ID '%s' not found." % migration_id) - return migration + "Deployment with ID '%s' not found." % deployment_id) + return deployment - def _delete_migration(self, ctxt, migration_id): - migration = self._get_migration(ctxt, migration_id) - execution = migration.executions[0] + def _delete_deployment(self, ctxt, deployment_id): + deployment = self._get_deployment(ctxt, deployment_id) + execution = deployment.executions[0] if execution.status in constants.ACTIVE_EXECUTION_STATUSES: - raise exception.InvalidMigrationState( - "Cannot delete Migration '%s' as it is currently in " - "'%s' state." % (migration_id, execution.status)) - db_api.delete_deployment(ctxt, migration_id) + raise exception.InvalidDeploymentState( + "Cannot delete Deployment '%s' as it is currently in " + "'%s' state." % (deployment_id, execution.status)) + db_api.delete_deployment(ctxt, deployment_id) @deployment_synchronized def delete_deployment(self, ctxt, deployment_id): - self._delete_migration(ctxt, deployment_id) - - def _cancel_migration(self, ctxt, migration_id, force): - migration = self._get_migration(ctxt, migration_id) - if len(migration.executions) != 1: - raise exception.InvalidMigrationState( - "Migration '%s' has in improper number of tasks " - "executions: %d" % (migration_id, len(migration.executions))) - execution = migration.executions[0] + self._delete_deployment(ctxt, deployment_id) + + def _cancel_deployment(self, ctxt, deployment_id, force): + deployment = self._get_deployment(ctxt, deployment_id) + if len(deployment.executions) != 1: + raise exception.InvalidDeploymentState( + "Deployment '%s' has an improper number of tasks " + "executions: %d" % (deployment_id, len(deployment.executions))) + execution = deployment.executions[0] if execution.status not in constants.ACTIVE_EXECUTION_STATUSES: - raise exception.InvalidMigrationState( - "Migration '%s' is not currently running" % migration_id) + raise exception.InvalidDeploymentState( + "Deployment '%s' is not currently running" % deployment_id) if execution.status == constants.EXECUTION_STATUS_CANCELLING and ( not force): - raise exception.InvalidMigrationState( - "Migration '%s' is already being cancelled. Please use the " + raise exception.InvalidDeploymentState( + "Deployment '%s' is already being cancelled. Please use the " "force option if you'd like to force-cancel it.") with lockutils.lock( @@ -1875,7 +1861,7 @@ def _cancel_migration(self, ctxt, migration_id, force): @deployment_synchronized def cancel_deployment(self, ctxt, deployment_id, force): - self._cancel_migration(ctxt, deployment_id, force) + self._cancel_deployment(ctxt, deployment_id, force) def _cancel_tasks_execution( self, ctxt, execution, requery=True, force=False): @@ -2052,45 +2038,45 @@ def _update_reservation_fulfillment_for_execution(self, ctxt, execution): """ Updates the reservation fulfillment status for the parent transfer action of the given execution based on its type. - Replica transfers are marked as fulfilled as soon as a Replica + Replica transfers are marked as fulfilled as soon as a Transfer Execution is successfully completed. Live migration transfers are marked as fulfilled as soon as they are deployed for the first (and only) time. """ if execution.type not in ( - constants.EXECUTION_TYPE_REPLICA_EXECUTION, - constants.EXECUTION_TYPE_REPLICA_DEPLOY): + constants.EXECUTION_TYPE_TRANSFER_EXECUTION, + constants.EXECUTION_TYPE_DEPLOYMENT): LOG.debug( f"Skipping setting reservation fulfillment for execution " f"'{execution.id}' of type '{execution.type}'.") return if execution.type not in ( - constants.EXECUTION_TYPE_REPLICA_EXECUTION, - constants.EXECUTION_TYPE_REPLICA_DEPLOY): + constants.EXECUTION_TYPE_TRANSFER_EXECUTION, + constants.EXECUTION_TYPE_DEPLOYMENT): LOG.debug( - f"Skipping setting replica fulfillment for execution " + f"Skipping setting transfer fulfillment for execution " f"'{execution.id}' of type '{execution.type}'.") return transfer_action = execution.action transfer_id = transfer_action.base_id - if transfer_action.type == constants.TRANSFER_ACTION_TYPE_MIGRATION: - deployment = self._get_migration(ctxt, transfer_id) + if transfer_action.type == constants.TRANSFER_ACTION_TYPE_DEPLOYMENT: + deployment = self._get_deployment(ctxt, transfer_id) transfer_id = deployment.transfer_id - transfer_action = self._get_replica( + transfer_action = self._get_transfer( ctxt, transfer_id, include_task_info=False) else: - transfer_action = self._get_replica( + transfer_action = self._get_transfer( ctxt, execution.action_id, include_task_info=False) scenario = transfer_action.scenario - if scenario == constants.REPLICA_SCENARIO_REPLICA and ( - execution.type == constants.EXECUTION_TYPE_REPLICA_EXECUTION): + if scenario == constants.TRANSFER_SCENARIO_REPLICA and ( + execution.type == constants.EXECUTION_TYPE_TRANSFER_EXECUTION): self._check_mark_reservation_fulfilled( transfer_action, must_unfulfilled=False) - elif scenario == constants.REPLICA_SCENARIO_LIVE_MIGRATION and ( - execution.type == constants.EXECUTION_TYPE_REPLICA_DEPLOY): + elif scenario == constants.TRANSFER_SCENARIO_LIVE_MIGRATION and ( + execution.type == constants.EXECUTION_TYPE_DEPLOYMENT): self._check_mark_reservation_fulfilled( transfer_action, must_unfulfilled=False) else: @@ -2368,7 +2354,7 @@ def _advance_execution_state( requery=not requery) == ( constants.EXECUTION_STATUS_DEADLOCKED): LOG.error( - "Execution '%s' deadlocked even before Replica state " + "Execution '%s' deadlocked even before Transfer state " "advancement . Cleanup has been perfomed. Returning.", execution.id) return [] @@ -2597,9 +2583,9 @@ def _start_task(task): ctxt, execution, task_statuses=task_statuses) == ( constants.EXECUTION_STATUS_DEADLOCKED): LOG.error( - "Execution '%s' deadlocked after Replica state advancement" - ". Cleanup has been perfomed. Returning early.", - execution.id) + "Execution '%s' deadlocked after Transfer state " + "advancement. Cleanup has been performed. " + "Returning early.", execution.id) return [] LOG.debug( "No new tasks were started for execution '%s'", execution.id) @@ -2623,26 +2609,26 @@ def _start_task(task): return started_tasks - def _update_replica_volumes_info(self, ctxt, replica_id, instance, - updated_task_info): - """ WARN: the lock for the Replica must be pre-acquired. """ + def _update_transfer_volumes_info(self, ctxt, transfer_id, instance, + updated_task_info): + """ WARN: the lock for the Transfer must be pre-acquired. """ db_api.update_transfer_action_info_for_instance( - ctxt, replica_id, instance, + ctxt, transfer_id, instance, updated_task_info) - def _update_volumes_info_for_migration_parent_replica( - self, ctxt, migration_id, instance, updated_task_info): - migration = db_api.get_deployment(ctxt, migration_id) - replica_id = migration.transfer_id + def _update_volumes_info_for_deployment_parent_transfer( + self, ctxt, deployment_id, instance, updated_task_info): + deployment = db_api.get_deployment(ctxt, deployment_id) + transfer_id = deployment.transfer_id with lockutils.lock( - constants.REPLICA_LOCK_NAME_FORMAT % replica_id, + constants.TRANSFER_LOCK_NAME_FORMAT % transfer_id, external=True): LOG.debug( - "Updating volume_info in replica due to snapshot " - "restore during migration. replica id: %s", replica_id) - self._update_replica_volumes_info( - ctxt, replica_id, instance, updated_task_info) + "Updating volume_info in transfer due to snapshot " + "restore during depployment. transfer id: %s", transfer_id) + self._update_transfer_volumes_info( + ctxt, transfer_id, instance, updated_task_info) def _handle_post_task_actions(self, ctxt, task, execution, task_info): task_type = task.task_type @@ -2657,11 +2643,11 @@ def _check_other_tasks_running(execution, current_task): break return still_running - if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS: + if task_type == constants.TASK_TYPE_RESTORE_TRANSFER_DISK_SNAPSHOTS: # When restoring a snapshot in some import providers (OpenStack), # a new volume_id is generated. This needs to be updated in the - # Replica instance as well. + # Transfer instance as well. volumes_info = task_info.get('volumes_info') if not volumes_info: LOG.warn( @@ -2675,30 +2661,28 @@ def _check_other_tasks_running(execution, current_task): task.instance, execution.action_id, task.id, task_type, utils.sanitize_task_info( {'volumes_info': volumes_info})) - self._update_volumes_info_for_migration_parent_replica( + self._update_volumes_info_for_deployment_parent_transfer( ctxt, execution.action_id, task.instance, {"volumes_info": volumes_info}) elif task_type == ( - constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS): + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS): if not task_info.get("clone_disks"): - # The migration completed. If the replica is executed again, - # new volumes need to be deployed in place of the migrated + # The deployment completed. If the transfer is executed again, + # new volumes need to be created in place of the deployed # ones. LOG.info( - "Unsetting 'volumes_info' for instance '%s' in Replica " - "'%s' after completion of Replica task '%s' (type '%s') " + "Unsetting 'volumes_info' for instance '%s' in Transfer " + "'%s' after completion of Transfer task '%s' (type '%s') " "with clone_disks=False.", task.instance, execution.action_id, task.id, task_type) - self._update_volumes_info_for_migration_parent_replica( + self._update_volumes_info_for_deployment_parent_transfer( ctxt, execution.action_id, task.instance, {"volumes_info": []}) - elif task_type in ( - constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT, - constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT): + elif task_type == constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT: # set 'transfer_result' in the 'base_transfer_action' # table if the task returned a result. if "transfer_result" in task_info: @@ -2724,30 +2708,30 @@ def _check_other_tasks_running(execution, current_task): "No 'transfer_result' was returned for task type '%s' " "for transfer action '%s'", task_type, execution.action_id) elif task_type in ( - constants.TASK_TYPE_UPDATE_SOURCE_REPLICA, - constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA): + constants.TASK_TYPE_UPDATE_SOURCE_TRANSFER, + constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER): # NOTE: remember to update the `volumes_info`: # NOTE: considering this method is only called with a lock on the - # `execution.action_id` (in a Replica update tasks' case that's the - # ID of the Replica itself) we can safely call - # `_update_replica_volumes_info` below: - self._update_replica_volumes_info( + # `execution.action_id` (in a Transfer update tasks' case that's + # the ID of the Transfer itself) we can safely call + # `_update_transfer_volumes_info` below: + self._update_transfer_volumes_info( ctxt, execution.action_id, task.instance, {"volumes_info": task_info.get("volumes_info", [])}) - if task_type == constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA: + if task_type == constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER: # check if this was the last task in the update execution: still_running = _check_other_tasks_running(execution, task) if not still_running: # it means this was the last update task in the Execution - # and we may safely update the params of the Replica + # and we may safely update the params of the Transfer # as they are in the DB: LOG.info( - "All tasks of the '%s' Replica update procedure have " + "All tasks of the '%s' Transfer update procedure have " "completed successfully. Setting the updated " - "parameter values on the parent Replica itself.", + "parameter values on the parent Transfer itself.", execution.action_id) - # NOTE: considering all the instances of the Replica get + # NOTE: considering all the instances of the Transfer get # the same params, it doesn't matter which instance's # update task finishes last: db_api.update_transfer( @@ -3077,7 +3061,7 @@ def confirm_task_cancellation(self, ctxt, task_id, cancellation_details): "confirmation of its cancellation.", task.id, task.status, final_status) execution = db_api.get_tasks_execution(ctxt, task.execution_id) - if execution.type == constants.EXECUTION_TYPE_MIGRATION: + if execution.type == constants.EXECUTION_TYPE_DEPLOYMENT: action = db_api.get_action( ctxt, execution.action_id, include_task_info=False) self._check_delete_reservation_for_transfer(action) @@ -3183,7 +3167,7 @@ def set_task_error(self, ctxt, task_id, exception_details): "connection info. Original error was: %s" % ( exception_details))) LOG.warn( - "All subtasks for Migration '%s' have been cancelled " + "All subtasks for Deployment '%s' have been cancelled " "to allow for OSMorphing debugging. The connection " "info for the worker VM is: %s", action_id, action.info.get(task.instance, {}).get( @@ -3239,97 +3223,98 @@ def update_task_progress_update( ctxt, task_id, progress_update_index, new_current_step, new_total_steps=new_total_steps, new_message=new_message) - def _get_replica_schedule(self, ctxt, replica_id, - schedule_id, expired=True): + def _get_transfer_schedule(self, ctxt, transfer_id, + schedule_id, expired=True): schedule = db_api.get_transfer_schedule( - ctxt, replica_id, schedule_id, expired=expired) + ctxt, transfer_id, schedule_id, expired=expired) if not schedule: raise exception.NotFound( - "Schedule with ID '%s' for Replica '%s' not found." % ( - schedule_id, replica_id)) + "Schedule with ID '%s' for Transfer '%s' not found." % ( + schedule_id, transfer_id)) return schedule - def create_replica_schedule(self, ctxt, replica_id, - schedule, enabled, exp_date, - shutdown_instance): + def create_transfer_schedule(self, ctxt, transfer_id, + schedule, enabled, exp_date, + shutdown_instance): keystone.create_trust(ctxt) - replica = self._get_replica(ctxt, replica_id) - replica_schedule = models.TransferSchedule() - replica_schedule.id = str(uuid.uuid4()) - replica_schedule.transfer = replica - replica_schedule.transfer_id = replica_id - replica_schedule.schedule = schedule - replica_schedule.expiration_date = exp_date - replica_schedule.enabled = enabled - replica_schedule.shutdown_instance = shutdown_instance - replica_schedule.trust_id = ctxt.trust_id + transfer = self._get_transfer(ctxt, transfer_id) + transfer_schedule = models.TransferSchedule() + transfer_schedule.id = str(uuid.uuid4()) + transfer_schedule.transfer = transfer + transfer_schedule.transfer_id = transfer_id + transfer_schedule.schedule = schedule + transfer_schedule.expiration_date = exp_date + transfer_schedule.enabled = enabled + transfer_schedule.shutdown_instance = shutdown_instance + transfer_schedule.trust_id = ctxt.trust_id db_api.add_transfer_schedule( - ctxt, replica_schedule, - lambda ctxt, sched: self._replica_cron_client.register( + ctxt, transfer_schedule, + lambda ctxt, sched: self._transfer_cron_client.register( ctxt, sched)) - return self.get_replica_schedule( - ctxt, replica_id, replica_schedule.id) + return self.get_transfer_schedule( + ctxt, transfer_id, transfer_schedule.id) @schedule_synchronized - def update_replica_schedule(self, ctxt, replica_id, schedule_id, - updated_values): + def update_transfer_schedule(self, ctxt, transfer_id, schedule_id, + updated_values): db_api.update_transfer_schedule( - ctxt, replica_id, schedule_id, updated_values, None, - lambda ctxt, sched: self._replica_cron_client.register( + ctxt, transfer_id, schedule_id, updated_values, None, + lambda ctxt, sched: self._transfer_cron_client.register( ctxt, sched)) - return self._get_replica_schedule(ctxt, replica_id, schedule_id) + return self._get_transfer_schedule(ctxt, transfer_id, schedule_id) def _cleanup_schedule_resources(self, ctxt, schedule): - self._replica_cron_client.unregister(ctxt, schedule) + self._transfer_cron_client.unregister(ctxt, schedule) if schedule.trust_id: tmp_trust = context.get_admin_context( trust_id=schedule.trust_id) keystone.delete_trust(tmp_trust) @schedule_synchronized - def delete_replica_schedule(self, ctxt, replica_id, schedule_id): - replica = self._get_replica(ctxt, replica_id) - replica_status = replica.last_execution_status + def delete_transfer_schedule(self, ctxt, transfer_id, schedule_id): + transfer = self._get_transfer(ctxt, transfer_id) + transfer_status = transfer.last_execution_status valid_statuses = list(itertools.chain( constants.FINALIZED_EXECUTION_STATUSES, [constants.EXECUTION_STATUS_UNEXECUTED])) - if replica_status not in valid_statuses: - raise exception.InvalidReplicaState( - 'Replica Schedule cannot be deleted while the Replica is in ' - '%s state. Please wait for the Replica execution to finish' % - (replica_status)) + if transfer_status not in valid_statuses: + raise exception.InvalidTransferState( + 'Transfer Schedule cannot be deleted while the Transfer is in ' + '%s state. Please wait for the Transfer execution to finish' % + transfer_status) db_api.delete_transfer_schedule( - ctxt, replica_id, schedule_id, None, + ctxt, transfer_id, schedule_id, None, lambda ctxt, sched: self._cleanup_schedule_resources( ctxt, sched)) - @replica_synchronized - def get_replica_schedules(self, ctxt, replica_id=None, expired=True): + @transfer_synchronized + def get_transfer_schedules(self, ctxt, transfer_id=None, expired=True): return db_api.get_transfer_schedules( - ctxt, transfer_id=replica_id, expired=expired) + ctxt, transfer_id=transfer_id, expired=expired) @schedule_synchronized - def get_replica_schedule(self, ctxt, replica_id, - schedule_id, expired=True): - return self._get_replica_schedule( - ctxt, replica_id, schedule_id, expired=expired) + def get_transfer_schedule(self, ctxt, transfer_id, + schedule_id, expired=True): + return self._get_transfer_schedule( + ctxt, transfer_id, schedule_id, expired=expired) - @replica_synchronized - def update_replica( - self, ctxt, replica_id, updated_properties): - replica = self._get_replica(ctxt, replica_id, include_task_info=True) + @transfer_synchronized + def update_transfer( + self, ctxt, transfer_id, updated_properties): + transfer = self._get_transfer( + ctxt, transfer_id, include_task_info=True) minion_pool_fields = [ "origin_minion_pool_id", "destination_minion_pool_id", "instance_osmorphing_minion_pool_mappings"] if any([mpf in updated_properties for mpf in minion_pool_fields]): - # NOTE: this is just a dummy Replica model to use for validation: + # NOTE: this is just a dummy Transfer model to use for validation: dummy = models.Transfer() - dummy.id = replica.id - dummy.instances = replica.instances - dummy.origin_endpoint_id = replica.origin_endpoint_id - dummy.destination_endpoint_id = replica.destination_endpoint_id + dummy.id = transfer.id + dummy.instances = transfer.instances + dummy.origin_endpoint_id = transfer.origin_endpoint_id + dummy.destination_endpoint_id = transfer.destination_endpoint_id dummy.origin_minion_pool_id = updated_properties.get( 'origin_minion_pool_id') dummy.destination_minion_pool_id = updated_properties.get( @@ -3339,33 +3324,33 @@ def update_replica( 'instance_osmorphing_minion_pool_mappings')) self._check_minion_pools_for_action(ctxt, dummy) - self._check_replica_running_executions(ctxt, replica) - self._check_valid_replica_tasks_execution(replica, force=True) + self._check_transfer_running_executions(ctxt, transfer) + self._check_valid_transfer_tasks_execution(transfer, force=True) if updated_properties.get('user_scripts'): - replica.user_scripts = updated_properties['user_scripts'] + transfer.user_scripts = updated_properties['user_scripts'] execution = models.TasksExecution() execution.id = str(uuid.uuid4()) execution.status = constants.EXECUTION_STATUS_UNEXECUTED - execution.action = replica - execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE + execution.action = transfer + execution.type = constants.EXECUTION_TYPE_TRANSFER_UPDATE - for instance in replica.instances: + for instance in transfer.instances: LOG.debug( - "Pre-replica-update task_info for instance '%s' of Replica " - "'%s': %s", instance, replica_id, + "Pre-transfer-update task_info for instance '%s' of Transfer " + "'%s': %s", instance, transfer_id, utils.sanitize_task_info( - replica.info[instance])) + transfer.info[instance])) # NOTE: "circular assignment" would lead to a `None` value # so we must operate on a copy: - inst_info_copy = copy.deepcopy(replica.info[instance]) + inst_info_copy = copy.deepcopy(transfer.info[instance]) # NOTE: we update the various values in the task info itself # As a result, the values within the task_info will be the updated # values which will be checked. The old values will be sent to the # tasks through the origin/destination parameters for them to be # compared to the new ones. - # The actual values on the Replica object itself will be set + # The actual values on the Transfer object itself will be set # during _handle_post_task_actions once the final destination-side # update task will be completed. inst_info_copy.update({ @@ -3377,45 +3362,45 @@ def update_replica( if "destination_environment" in updated_properties: inst_info_copy["target_environment"] = updated_properties[ "destination_environment"] - replica.info[instance] = inst_info_copy + transfer.info[instance] = inst_info_copy LOG.debug( - "Updated task_info for instance '%s' of Replica " + "Updated task_info for instance '%s' of Transfer " "'%s' which will be verified during update procedure: %s", - instance, replica_id, utils.sanitize_task_info( - replica.info[instance])) + instance, transfer_id, utils.sanitize_task_info( + transfer.info[instance])) get_instance_info_task = self._create_task( instance, constants.TASK_TYPE_GET_INSTANCE_INFO, execution) - update_source_replica_task = self._create_task( - instance, constants.TASK_TYPE_UPDATE_SOURCE_REPLICA, + update_source_transfer_task = self._create_task( + instance, constants.TASK_TYPE_UPDATE_SOURCE_TRANSFER, execution) self._create_task( - instance, constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA, + instance, constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER, execution, depends_on=[ get_instance_info_task.id, # NOTE: the dest-side update task must be done after # the source-side one as both can potentially modify # the 'volumes_info' together: - update_source_replica_task.id]) + update_source_transfer_task.id]) - self._check_execution_tasks_sanity(execution, replica.info) + self._check_execution_tasks_sanity(execution, transfer.info) - # update the action info for all of the instances in the Replica: + # update the action info for all of the instances in the Transfer: for instance in execution.action.instances: db_api.update_transfer_action_info_for_instance( - ctxt, replica.id, instance, replica.info[instance]) + ctxt, transfer.id, instance, transfer.info[instance]) db_api.add_transfer_tasks_execution(ctxt, execution) - LOG.debug("Execution for Replica update tasks created: %s", + LOG.debug("Execution for Transfer update tasks created: %s", execution.id) - self._begin_tasks(ctxt, replica, execution) + self._begin_tasks(ctxt, transfer, execution) - return self.get_replica_tasks_execution( - ctxt, replica_id, execution.id) + return self.get_transfer_tasks_execution( + ctxt, transfer_id, execution.id) def get_diagnostics(self, ctxt): diagnostics = utils.get_diagnostics_info() diff --git a/coriolis/constants.py b/coriolis/constants.py index 0d8068b92..b9e16c155 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -3,8 +3,8 @@ DEFAULT_CORIOLIS_REGION_NAME = "Default Region" -REPLICA_SCENARIO_REPLICA = "replica" -REPLICA_SCENARIO_LIVE_MIGRATION = "live_migration" +TRANSFER_SCENARIO_REPLICA = "replica" +TRANSFER_SCENARIO_LIVE_MIGRATION = "live_migration" EXECUTION_STATUS_UNEXECUTED = "UNEXECUTED" EXECUTION_STATUS_RUNNING = "RUNNING" @@ -82,62 +82,36 @@ TASK_STATUS_FAILED_TO_CANCEL ] -TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = ( - "DEPLOY_MIGRATION_SOURCE_RESOURCES") -TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = ( - "DEPLOY_MIGRATION_TARGET_RESOURCES") -TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES = ( - "DELETE_MIGRATION_SOURCE_RESOURCES") -TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES = ( - "DELETE_MIGRATION_TARGET_RESOURCES") -TASK_TYPE_DEPLOY_INSTANCE_RESOURCES = "DEPLOY_INSTANCE_RESOURCES" TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT = "FINALIZE_INSTANCE_DEPLOYMENT" -TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT = ( - "CLEANUP_FAILED_INSTANCE_DEPLOYMENT") -TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE = ( - "CLEANUP_INSTANCE_SOURCE_STORAGE") -TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE = ( - "CLEANUP_INSTANCE_TARGET_STORAGE") - -TASK_TYPE_CREATE_INSTANCE_DISKS = "CREATE_INSTANCE_DISKS" - TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES" TASK_TYPE_OS_MORPHING = "OS_MORPHING" TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES" TASK_TYPE_GET_INSTANCE_INFO = "GET_INSTANCE_INFO" -TASK_TYPE_DEPLOY_REPLICA_DISKS = "DEPLOY_REPLICA_DISKS" -TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS = ( - "DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS") -TASK_TYPE_DELETE_REPLICA_DISKS = "DELETE_REPLICA_DISKS" +TASK_TYPE_DEPLOY_TRANSFER_DISKS = "DEPLOY_TRANSFER_DISKS" +TASK_TYPE_DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS = ( + "DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS") +TASK_TYPE_DELETE_TRANSFER_DISKS = "DELETE_TRANSFER_DISKS" TASK_TYPE_REPLICATE_DISKS = "REPLICATE_DISKS" -TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES = "DEPLOY_REPLICA_SOURCE_RESOURCES" -TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES = "DELETE_REPLICA_SOURCE_RESOURCES" -TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES = "DEPLOY_REPLICA_TARGET_RESOURCES" -TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES = "DELETE_REPLICA_TARGET_RESOURCES" +TASK_TYPE_DEPLOY_TRANSFER_SOURCE_RESOURCES = "DEPLOY_TRANSFER_SOURCE_RESOURCES" +TASK_TYPE_DELETE_TRANSFER_SOURCE_RESOURCES = "DELETE_TRANSFER_SOURCE_RESOURCES" +TASK_TYPE_DEPLOY_TRANSFER_TARGET_RESOURCES = "DEPLOY_TRANSFER_TARGET_RESOURCES" +TASK_TYPE_DELETE_TRANSFER_TARGET_RESOURCES = "DELETE_TRANSFER_TARGET_RESOURCES" TASK_TYPE_SHUTDOWN_INSTANCE = "SHUTDOWN_INSTANCE" -TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES = ( - "DEPLOY_REPLICA_INSTANCE_RESOURCES") -TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT = ( - "FINALIZE_REPLICA_INSTANCE_DEPLOYMENT") -TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT = ( - "CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT") -TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS" -TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS = ( - "DELETE_REPLICA_TARGET_DISK_SNAPSHOTS") -TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS = "RESTORE_REPLICA_DISK_SNAPSHOTS" +TASK_TYPE_DEPLOY_INSTANCE_RESOURCES = "DEPLOY_INSTANCE_RESOURCES" +TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT = ( + "CLEANUP_FAILED_INSTANCE_DEPLOYMENT") +TASK_TYPE_CREATE_TRANSFER_DISK_SNAPSHOTS = "CREATE_TRANSFER_DISK_SNAPSHOTS" +TASK_TYPE_DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS = ( + "DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS") +TASK_TYPE_RESTORE_TRANSFER_DISK_SNAPSHOTS = "RESTORE_TRANSFER_DISK_SNAPSHOTS" TASK_TYPE_GET_OPTIMAL_FLAVOR = "GET_OPTIMAL_FLAVOR" -TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS = ( - "VALIDATE_MIGRATION_SOURCE_INPUTS") -TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS = ( - "VALIDATE_MIGRATION_DESTINATION_INPUTS") -TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS = "VALIDATE_REPLICA_SOURCE_INPUTS" -TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS = ( - "VALIDATE_REPLICA_DESTINATION_INPUTS") -TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = ( - "VALIDATE_REPLICA_DEPLOYMENT_INPUTS") -TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA" -TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA" +TASK_TYPE_VALIDATE_TRANSFER_SOURCE_INPUTS = "VALIDATE_TRANSFER_SOURCE_INPUTS" +TASK_TYPE_VALIDATE_TRANSFER_DESTINATION_INPUTS = ( + "VALIDATE_TRANSFER_DESTINATION_INPUTS") +TASK_TYPE_VALIDATE_DEPLOYMENT_INPUTS = "VALIDATE_DEPLOYMENT_INPUTS" +TASK_TYPE_UPDATE_SOURCE_TRANSFER = "UPDATE_SOURCE_TRANSFER" +TASK_TYPE_UPDATE_DESTINATION_TRANSFER = "UPDATE_DESTINATION_TRANSFER" TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS = ( "VALIDATE_SOURCE_MINION_POOL_ENVIRONMENT_OPTIONS") @@ -185,7 +159,6 @@ TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION" TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION" - MINION_POOL_OPERATIONS_TASKS = [ TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS, TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS, @@ -284,14 +257,13 @@ COMPRESSION_FORMAT_ZLIB ] -TRANSFER_ACTION_TYPE_MIGRATION = "migration" -TRANSFER_ACTION_TYPE_REPLICA = "replica" +TRANSFER_ACTION_TYPE_DEPLOYMENT = "deployment" +TRANSFER_ACTION_TYPE_TRANSFER = "transfer" -EXECUTION_TYPE_REPLICA_EXECUTION = "replica_execution" -EXECUTION_TYPE_REPLICA_DISKS_DELETE = "replica_disks_delete" -EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy" -EXECUTION_TYPE_MIGRATION = "migration" -EXECUTION_TYPE_REPLICA_UPDATE = "replica_update" +EXECUTION_TYPE_TRANSFER_EXECUTION = "transfer_execution" +EXECUTION_TYPE_TRANSFER_DISKS_DELETE = "transfer_disks_delete" +EXECUTION_TYPE_DEPLOYMENT = "deployment" +EXECUTION_TYPE_TRANSFER_UPDATE = "transfer_update" EXECUTION_TYPE_MINION_POOL_MAINTENANCE = "minion_pool_maintenance" EXECUTION_TYPE_MINION_POOL_UPDATE = "minion_pool_update" EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES = ( @@ -306,10 +278,8 @@ TASKFLOW_LOCK_NAME_FORMAT = "taskflow-%s" EXECUTION_LOCK_NAME_FORMAT = "execution-%s" ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s" -MIGRATION_LOCK_NAME_FORMAT = "migration-%s" -# NOTE(aznashwan): intentionately left identical to Migration locks. -DEPLOYMENT_LOCK_NAME_FORMAT = "migration-%s" -REPLICA_LOCK_NAME_FORMAT = "replica-%s" +DEPLOYMENT_LOCK_NAME_FORMAT = "deployment-%s" +TRANSFER_LOCK_NAME_FORMAT = "transfer-%s" SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s" REGION_LOCK_NAME_FORMAT = "region-%s" SERVICE_LOCK_NAME_FORMAT = "service-%s" @@ -317,11 +287,10 @@ MINION_MACHINE_LOCK_NAME_FORMAT = "minion-pool-%s-machine-%s" EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = { - EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT, - EXECUTION_TYPE_REPLICA_EXECUTION: REPLICA_LOCK_NAME_FORMAT, - EXECUTION_TYPE_REPLICA_DEPLOY: REPLICA_LOCK_NAME_FORMAT, - EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT, - EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT, + EXECUTION_TYPE_TRANSFER_EXECUTION: TRANSFER_LOCK_NAME_FORMAT, + EXECUTION_TYPE_TRANSFER_UPDATE: TRANSFER_LOCK_NAME_FORMAT, + EXECUTION_TYPE_TRANSFER_DISKS_DELETE: TRANSFER_LOCK_NAME_FORMAT, + EXECUTION_TYPE_DEPLOYMENT: DEPLOYMENT_LOCK_NAME_FORMAT, EXECUTION_TYPE_MINION_POOL_MAINTENANCE: MINION_POOL_LOCK_NAME_FORMAT, EXECUTION_TYPE_MINION_POOL_UPDATE: MINION_POOL_LOCK_NAME_FORMAT, EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES: ( @@ -340,7 +309,7 @@ CONDUCTOR_MAIN_MESSAGING_TOPIC = "coriolis_conductor" WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker" SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler" -REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker" +TRANSFER_CRON_MAIN_MESSAGING_TOPIC = "coriolis_transfer_cron_worker" MINION_MANAGER_MAIN_MESSAGING_TOPIC = "coriolis_minion_manager" MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE = "delete" diff --git a/coriolis/db/sqlalchemy/models.py b/coriolis/db/sqlalchemy/models.py index 2f101be77..638d13d96 100644 --- a/coriolis/db/sqlalchemy/models.py +++ b/coriolis/db/sqlalchemy/models.py @@ -332,7 +332,7 @@ class Transfer(BaseTransferAction): 'base_transfer_action.base_id'), primary_key=True) scenario = sqlalchemy.Column( sqlalchemy.String(255), nullable=False, - default=constants.REPLICA_SCENARIO_REPLICA) + default=constants.TRANSFER_SCENARIO_REPLICA) __mapper_args__ = { 'polymorphic_identity': 'transfer', diff --git a/coriolis/diagnostics/api.py b/coriolis/diagnostics/api.py index 364259303..aaa865a85 100644 --- a/coriolis/diagnostics/api.py +++ b/coriolis/diagnostics/api.py @@ -2,7 +2,7 @@ # All Rights Reserved. from coriolis.conductor.rpc import client as conductor_rpc -from coriolis.replica_cron.rpc import client as cron_rpc +from coriolis.transfer_cron.rpc import client as cron_rpc from coriolis import utils from coriolis.worker.rpc import client as worker_rpc @@ -10,7 +10,7 @@ class API(object): def __init__(self): self._conductor_cli = conductor_rpc.ConductorClient() - self._cron_cli = cron_rpc.ReplicaCronClient() + self._cron_cli = cron_rpc.TransferCronClient() self._worker_cli = worker_rpc.WorkerClient() def get(self, ctxt): diff --git a/coriolis/exception.py b/coriolis/exception.py index 8e0a54d62..fc7f3a661 100644 --- a/coriolis/exception.py +++ b/coriolis/exception.py @@ -235,12 +235,16 @@ class InvalidActionTasksExecutionState(Invalid): message = _("Invalid tasks execution state: %(reason)s") -class InvalidMigrationState(Invalid): - message = _("Invalid migration state: %(reason)s") +class InvalidDeploymentState(Invalid): + message = _("Invalid deployment state: %(reason)s") -class InvalidReplicaState(Invalid): - message = _("Invalid replica state: %(reason)s") +class InvalidTasksExecutionState(Invalid): + message = _("Invalid tasks execution state: %(reason)s") + + +class InvalidTransferState(Invalid): + message = _("Invalid transfer state: %(reason)s") class InvalidInstanceState(Invalid): diff --git a/coriolis/minion_manager/rpc/client.py b/coriolis/minion_manager/rpc/client.py index 4803047d8..1de89e2db 100644 --- a/coriolis/minion_manager/rpc/client.py +++ b/coriolis/minion_manager/rpc/client.py @@ -67,12 +67,12 @@ def validate_minion_pool_selections_for_action(self, ctxt, action): ctxt, 'validate_minion_pool_selections_for_action', action=action) - def allocate_minion_machines_for_replica( + def allocate_minion_machines_for_transfer( self, ctxt, replica): return self._cast( ctxt, 'allocate_minion_machines_for_replica', replica=replica) - def allocate_minion_machines_for_migration( + def allocate_minion_machines_for_deployment( self, ctxt, migration, include_transfer_minions=True, include_osmorphing_minions=True): return self._cast( diff --git a/coriolis/minion_manager/rpc/server.py b/coriolis/minion_manager/rpc/server.py index d639cd7f0..005c96042 100644 --- a/coriolis/minion_manager/rpc/server.py +++ b/coriolis/minion_manager/rpc/server.py @@ -515,7 +515,7 @@ def allocate_minion_machines_for_replica( try: self._run_machine_allocation_subflow_for_action( ctxt, replica, - constants.TRANSFER_ACTION_TYPE_REPLICA, + constants.TRANSFER_ACTION_TYPE_TRANSFER, include_transfer_minions=True, include_osmorphing_minions=False) except Exception as ex: @@ -539,7 +539,7 @@ def allocate_minion_machines_for_migration( try: self._run_machine_allocation_subflow_for_action( ctxt, migration, - constants.TRANSFER_ACTION_TYPE_MIGRATION, + constants.TRANSFER_ACTION_TYPE_DEPLOYMENT, include_transfer_minions=include_transfer_minions, include_osmorphing_minions=include_osmorphing_minions) except Exception as ex: @@ -779,7 +779,7 @@ def _run_machine_allocation_subflow_for_action( machine_action_allocation_subflow_name_format = None allocation_failure_reporting_task_class = None allocation_confirmation_reporting_task_class = None - if action_type == constants.TRANSFER_ACTION_TYPE_MIGRATION: + if action_type == constants.TRANSFER_ACTION_TYPE_DEPLOYMENT: allocation_flow_name_format = ( (minion_mgr_tasks. MINION_POOL_MIGRATION_ALLOCATION_FLOW_NAME_FORMAT)) @@ -793,7 +793,7 @@ def _run_machine_allocation_subflow_for_action( machine_action_allocation_subflow_name_format = ( (minion_mgr_tasks. MINION_POOL_ALLOCATE_MACHINES_FOR_MIGRATION_SUBFLOW_NAME_FORMAT)) # noqa: E501 - elif action_type == constants.TRANSFER_ACTION_TYPE_REPLICA: + elif action_type == constants.TRANSFER_ACTION_TYPE_TRANSFER: allocation_flow_name_format = ( (minion_mgr_tasks. MINION_POOL_REPLICA_ALLOCATION_FLOW_NAME_FORMAT)) diff --git a/coriolis/minion_manager/rpc/tasks.py b/coriolis/minion_manager/rpc/tasks.py index a3daef2ab..67116e089 100644 --- a/coriolis/minion_manager/rpc/tasks.py +++ b/coriolis/minion_manager/rpc/tasks.py @@ -379,8 +379,8 @@ def _check_minion_properties( raise exception.MinionMachineAllocationFailure( msg) from ex except ( - exception.InvalidMigrationState, - exception.InvalidReplicaState) as ex: + exception.InvalidDeploymentState, + exception.InvalidTransferState) as ex: msg = ( "The Conductor has refused minion machine allocations for " "%s with ID '%s' as it is purportedly in an invalid state " diff --git a/coriolis/scheduler/scheduler_utils.py b/coriolis/scheduler/scheduler_utils.py index 00b30a79b..96be58bbe 100644 --- a/coriolis/scheduler/scheduler_utils.py +++ b/coriolis/scheduler/scheduler_utils.py @@ -8,7 +8,7 @@ from coriolis import constants from coriolis.db import api as db_api from coriolis import exception -from coriolis.replica_cron.rpc import client as rpc_cron_client +from coriolis.transfer_cron.rpc import client as rpc_cron_client from coriolis.scheduler.rpc import client as rpc_scheduler_client from coriolis.worker.rpc import client as rpc_worker_client @@ -21,8 +21,8 @@ constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient, constants.SCHEDULER_MAIN_MESSAGING_TOPIC: ( rpc_scheduler_client.SchedulerClient), - constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: ( - rpc_cron_client.ReplicaCronClient) + constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC: ( + rpc_cron_client.TransferCronClient) } diff --git a/coriolis/tasks/factory.py b/coriolis/tasks/factory.py index a3df2e9d6..2953becca 100644 --- a/coriolis/tasks/factory.py +++ b/coriolis/tasks/factory.py @@ -9,32 +9,10 @@ from coriolis.tasks import replica_tasks _TASKS_MAP = { - constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES: - migration_tasks.DeployMigrationSourceResourcesTask, - constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES: - migration_tasks.DeployMigrationTargetResourcesTask, - constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES: - migration_tasks.DeleteMigrationSourceResourcesTask, - constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES: - migration_tasks.DeleteMigrationTargetResourcesTask, - constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES: - migration_tasks.DeployInstanceResourcesTask, constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT: migration_tasks.FinalizeInstanceDeploymentTask, - constants.TASK_TYPE_CREATE_INSTANCE_DISKS: - migration_tasks.CreateInstanceDisksTask, - constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT: - migration_tasks.CleanupFailedInstanceDeploymentTask, - constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE: - migration_tasks.CleanupInstanceTargetStorageTask, - constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE: - migration_tasks.CleanupInstanceSourceStorageTask, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR: migration_tasks.GetOptimalFlavorTask, - constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS: - migration_tasks.ValidateMigrationSourceInputsTask, - constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS: - migration_tasks.ValidateMigrationDestinationInputsTask, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES: osmorphing_tasks.DeployOSMorphingResourcesTask, constants.TASK_TYPE_OS_MORPHING: @@ -47,41 +25,39 @@ replica_tasks.ReplicateDisksTask, constants.TASK_TYPE_SHUTDOWN_INSTANCE: replica_tasks.ShutdownInstanceTask, - constants.TASK_TYPE_DEPLOY_REPLICA_DISKS: + constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS: replica_tasks.DeployReplicaDisksTask, - constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS: + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS: replica_tasks.DeleteReplicaSourceDiskSnapshotsTask, - constants.TASK_TYPE_DELETE_REPLICA_DISKS: + constants.TASK_TYPE_DELETE_TRANSFER_DISKS: replica_tasks.DeleteReplicaDisksTask, - constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES: + constants.TASK_TYPE_DEPLOY_TRANSFER_TARGET_RESOURCES: replica_tasks.DeployReplicaTargetResourcesTask, - constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES: + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_RESOURCES: replica_tasks.DeleteReplicaTargetResourcesTask, - constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES: + constants.TASK_TYPE_DEPLOY_TRANSFER_SOURCE_RESOURCES: replica_tasks.DeployReplicaSourceResourcesTask, - constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES: + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_RESOURCES: replica_tasks.DeleteReplicaSourceResourcesTask, - constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES: + constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES: replica_tasks.DeployReplicaInstanceResourcesTask, - constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT: - replica_tasks.FinalizeReplicaInstanceDeploymentTask, - constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT: + constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT: replica_tasks.CleanupFailedReplicaInstanceDeploymentTask, - constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS: + constants.TASK_TYPE_CREATE_TRANSFER_DISK_SNAPSHOTS: replica_tasks.CreateReplicaDiskSnapshotsTask, - constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS: + constants.TASK_TYPE_DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS: replica_tasks.DeleteReplicaTargetDiskSnapshotsTask, - constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS: + constants.TASK_TYPE_RESTORE_TRANSFER_DISK_SNAPSHOTS: replica_tasks.RestoreReplicaDiskSnapshotsTask, - constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS: + constants.TASK_TYPE_VALIDATE_TRANSFER_SOURCE_INPUTS: replica_tasks.ValidateReplicaExecutionSourceInputsTask, - constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS: + constants.TASK_TYPE_VALIDATE_TRANSFER_DESTINATION_INPUTS: replica_tasks.ValidateReplicaExecutionDestinationInputsTask, - constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS: + constants.TASK_TYPE_VALIDATE_DEPLOYMENT_INPUTS: replica_tasks.ValidateReplicaDeploymentParametersTask, - constants.TASK_TYPE_UPDATE_SOURCE_REPLICA: + constants.TASK_TYPE_UPDATE_SOURCE_TRANSFER: replica_tasks.UpdateSourceReplicaTask, - constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA: + constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER: replica_tasks.UpdateDestinationReplicaTask, constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS: minion_pool_tasks.ValidateSourceMinionPoolOptionsTask, diff --git a/coriolis/tasks/replica_tasks.py b/coriolis/tasks/replica_tasks.py index f2c195dc2..b2c06964d 100644 --- a/coriolis/tasks/replica_tasks.py +++ b/coriolis/tasks/replica_tasks.py @@ -358,7 +358,7 @@ def _run(self, ctxt, instance, origin, destination, task_info, volumes_info = _get_volumes_info(task_info) target_environment = task_info['target_environment'] - volumes_info = provider.delete_replica_disks( + volumes_info = provider.delete_transfer_disks( ctxt, connection_info, target_environment, volumes_info) if volumes_info: LOG.warn( diff --git a/coriolis/tests/api/v1/test_replica_schedules.py b/coriolis/tests/api/v1/test_replica_schedules.py index c32cc792f..fe10c0483 100644 --- a/coriolis/tests/api/v1/test_replica_schedules.py +++ b/coriolis/tests/api/v1/test_replica_schedules.py @@ -10,7 +10,7 @@ from coriolis.api.v1 import replica_schedules from coriolis.api.v1.views import replica_schedule_view from coriolis import exception -from coriolis.replica_cron import api +from coriolis.transfer_cron import api from coriolis import schemas from coriolis.tests import test_base diff --git a/coriolis/tests/cmd/test_replica_cron.py b/coriolis/tests/cmd/test_replica_cron.py index 7b5e49966..ca9bf3b6b 100644 --- a/coriolis/tests/cmd/test_replica_cron.py +++ b/coriolis/tests/cmd/test_replica_cron.py @@ -6,7 +6,7 @@ from coriolis.cmd import replica_cron from coriolis import constants -from coriolis.replica_cron.rpc import server as rpc_server +from coriolis.transfer_cron.rpc import server as rpc_server from coriolis import service from coriolis.tests import test_base from coriolis import utils @@ -37,7 +37,7 @@ def test_main( mock_setup_logging.assert_called_once() mock_ReplicaCronServerEndpoint.assert_called_once() mock_MessagingService.assert_called_once_with( - constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC, + constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC, [mock_ReplicaCronServerEndpoint.return_value], rpc_server.VERSION, worker_count=1) diff --git a/coriolis/tests/conductor/rpc/test_server.py b/coriolis/tests/conductor/rpc/test_server.py index 0582c990e..6b20bbf20 100644 --- a/coriolis/tests/conductor/rpc/test_server.py +++ b/coriolis/tests/conductor/rpc/test_server.py @@ -1095,11 +1095,11 @@ def test_delete_replica_disks_invalid_state( mock_replica.instances = [mock.sentinel.instance] mock_replica.info = {} delete_replica_disks = testutils.get_wrapped_function( - self.server.delete_replica_disks + self.server.delete_transfer_disks ) self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, delete_replica_disks, self.server, mock.sentinel.context, @@ -1246,7 +1246,7 @@ def test_execute_replica_tasks( def call_execute_replica_tasks(): return testutils\ - .get_wrapped_function(self.server.execute_replica_tasks)( + .get_wrapped_function(self.server.execute_transfer_tasks)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1315,7 +1315,7 @@ def create_task_side_effect( mock_create_task.assert_has_calls([ mock.call( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS, + constants.TASK_TYPE_VALIDATE_TRANSFER_SOURCE_INPUTS, mock_tasks_execution.return_value), mock.call( instance, @@ -1323,7 +1323,7 @@ def create_task_side_effect( mock_tasks_execution.return_value), mock.call( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS, + constants.TASK_TYPE_VALIDATE_TRANSFER_DESTINATION_INPUTS, mock_tasks_execution.return_value, depends_on=[constants.TASK_TYPE_GET_INSTANCE_INFO]), ]) @@ -1362,7 +1362,7 @@ def create_task_side_effect( if any([has_origin_minion_pool, has_target_minion_pool]): mock_minion_manager_client\ - .allocate_minion_machines_for_replica.assert_called_once_with( + .allocate_minion_machines_for_transfer.assert_called_once_with( mock.sentinel.context, mock_replica, ) @@ -1388,7 +1388,7 @@ def create_task_side_effect( constants.EXECUTION_STATUS_UNEXECUTED) self.assertEqual( mock_tasks_execution.return_value.type, - constants.EXECUTION_TYPE_REPLICA_EXECUTION) + constants.EXECUTION_TYPE_TRANSFER_EXECUTION) self.assertEqual( result, mock_get_replica_tasks_execution.return_value) @@ -1398,7 +1398,7 @@ def test_get_replica_tasks_executions( mock_get_transfer_tasks_executions ): result = testutils.get_wrapped_function( - self.server.get_replica_tasks_executions)( + self.server.get_transfer_tasks_executions)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1424,7 +1424,7 @@ def test_get_replica_tasks_execution( mock_get_transfer_tasks_execution ): result = testutils.get_wrapped_function( - self.server.get_replica_tasks_execution)( + self.server.get_transfer_tasks_execution)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1456,7 +1456,7 @@ def test_delete_replica_tasks_execution( ): def call_delete_replica_tasks_execution(): return testutils.get_wrapped_function( - self.server.delete_replica_tasks_execution)( + self.server.delete_transfer_tasks_execution)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1476,7 +1476,7 @@ def call_delete_replica_tasks_execution(): constants.EXECUTION_STATUS_RUNNING) self.assertRaises( - exception.InvalidMigrationState, + exception.InvalidDeploymentState, call_delete_replica_tasks_execution) @mock.patch.object(server.ConductorServerEndpoint, @@ -1491,7 +1491,7 @@ def test_cancel_replica_tasks_execution( mock_get_replica_tasks_execution.return_value.status = constants\ .EXECUTION_STATUS_RUNNING testutils.get_wrapped_function( - self.server.cancel_replica_tasks_execution)( + self.server.cancel_transfer_tasks_execution)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1512,7 +1512,7 @@ def test_cancel_replica_tasks_execution( mock_get_replica_tasks_execution.return_value.status = constants\ .EXECUTION_STATUS_CANCELLING testutils.get_wrapped_function( - self.server.cancel_replica_tasks_execution)( + self.server.cancel_transfer_tasks_execution)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1538,9 +1538,9 @@ def test_cancel_replica_tasks_execution_status_not_active( mock_get_replica_tasks_execution ): self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, testutils.get_wrapped_function( - self.server.cancel_replica_tasks_execution), + self.server.cancel_transfer_tasks_execution), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1565,9 +1565,9 @@ def test_cancel_replica_tasks_execution_status_cancelling_no_force( mock_get_replica_tasks_execution.return_value.status = constants\ .EXECUTION_STATUS_CANCELLING self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, testutils.get_wrapped_function( - self.server.cancel_replica_tasks_execution), + self.server.cancel_transfer_tasks_execution), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1585,7 +1585,7 @@ def test__get_replica_tasks_execution( self, mock_get_transfer_tasks_execution ): - result = self.server._get_replica_tasks_execution( + result = self.server._get_transfer_tasks_execution( mock.sentinel.context, mock.sentinel.transfer_id, mock.sentinel.execution_id, @@ -1611,7 +1611,7 @@ def test__get_replica_tasks_execution_no_execution( mock_get_transfer_tasks_execution.return_value = None self.assertRaises( exception.NotFound, - self.server._get_replica_tasks_execution, + self.server._get_transfer_tasks_execution, mock.sentinel.context, mock.sentinel.transfer_id, mock.sentinel.execution_id, @@ -1628,7 +1628,7 @@ def test__get_replica_tasks_execution_no_execution( @mock.patch.object(db_api, 'get_transfers') def test_get_replicas(self, mock_get_transfers): - result = self.server.get_replicas( + result = self.server.get_transfers( mock.sentinel.context, include_tasks_executions=False, include_task_info=False @@ -1647,7 +1647,7 @@ def test_get_replicas(self, mock_get_transfers): @mock.patch.object(server.ConductorServerEndpoint, '_get_replica') def test_get_replica(self, mock_get_replica): - result = testutils.get_wrapped_function(self.server.get_replica)( + result = testutils.get_wrapped_function(self.server.get_transfer)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -1678,7 +1678,7 @@ def test_delete_replica( mock_check_delete_reservation_for_transfer, mock_delete_transfer, ): - testutils.get_wrapped_function(self.server.delete_replica)( + testutils.get_wrapped_function(self.server.delete_transfer)( self.server, mock.sentinel.context, mock.sentinel.transfer_id @@ -1737,7 +1737,7 @@ def test_delete_replica_disks( ): def call_delete_replica_disks(): return testutils.get_wrapped_function( - self.server.delete_replica_disks)( + self.server.delete_transfer_disks)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, # type: ignore @@ -1788,7 +1788,7 @@ def create_task_side_effect( ) self.assertEqual( mock_tasks_execution.return_value.type, - constants.EXECUTION_TYPE_REPLICA_DISKS_DELETE + constants.EXECUTION_TYPE_TRANSFER_DISKS_DELETE ) for instance in instances: @@ -1797,16 +1797,16 @@ def create_task_side_effect( mock_create_task.assert_has_calls([ mock.call( instance, - constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS, + constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS, mock_tasks_execution.return_value, ), mock.call( instance, - constants.TASK_TYPE_DELETE_REPLICA_DISKS, + constants.TASK_TYPE_DELETE_TRANSFER_DISKS, mock_tasks_execution.return_value, depends_on=[ constants - .TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS + .TASK_TYPE_DELETE_TRANSFER_SOURCE_DISK_SNAPSHOTS ], ), ]) @@ -1848,7 +1848,7 @@ def create_task_side_effect( instances[1].get.return_value = None self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, call_delete_replica_disks ) @@ -1858,7 +1858,7 @@ def create_task_side_effect( mock_replica.info = {} self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, call_delete_replica_disks ) @@ -1918,9 +1918,9 @@ def test_create_instances_replica( mock_get_endpoint.side_effect = mock.sentinel.origin_endpoint_id, \ mock.sentinel.destination_endpoint_id mock_transfer.return_value = mock.Mock() - result = self.server.create_instances_replica( + result = self.server.create_instances_transfer( mock.sentinel.context, - constants.REPLICA_SCENARIO_REPLICA, + constants.TRANSFER_SCENARIO_REPLICA, mock.sentinel.origin_endpoint_id, mock.sentinel.destination_endpoint_id, mock.sentinel.origin_minion_pool_id, @@ -1987,7 +1987,7 @@ def test_create_instances_replica( @mock.patch.object(db_api, 'get_transfer') def test__get_replica(self, mock_get_replica): - result = self.server._get_replica( + result = self.server._get_transfer( mock.sentinel.context, mock.sentinel.transfer_id, include_task_info=False, @@ -2009,7 +2009,7 @@ def test__get_replica_not_found(self, mock_get_transfer): mock_get_transfer.return_value = None self.assertRaises( exception.NotFound, - self.server._get_replica, + self.server._get_transfer, mock.sentinel.context, mock.sentinel.transfer_id, include_task_info=False, @@ -2056,7 +2056,7 @@ def test_check_running_replica_migrations( constants.EXECUTION_STATUS_ERROR migrations = [migration_1, migration_2] mock_get_transfer_deployments.return_value = migrations - self.server._check_running_replica_migrations( + self.server._check_running_transfer_deployments( mock.sentinel.context, mock.sentinel.transfer_id, ) @@ -2080,8 +2080,8 @@ def test_check_running_replica_migrations_invalid_replica_state( migrations = [migration_1, migration_2] mock_get_transfer_deployments.return_value = migrations self.assertRaises( - exception.InvalidReplicaState, - self.server._check_running_replica_migrations, + exception.InvalidTransferState, + self.server._check_running_transfer_deployments, mock.sentinel.context, mock.sentinel.transfer_id, ) @@ -2122,7 +2122,7 @@ def test_check_replica_running_executions( mock_check_running_replica_migrations ): replica = mock.Mock() - self.server._check_replica_running_executions( + self.server._check_transfer_running_executions( mock.sentinel.context, replica ) @@ -2136,18 +2136,18 @@ def test_check_replica_running_executions( def test_check_valid_replica_tasks_execution(self): execution1 = mock.Mock( number=1, - type=constants.EXECUTION_TYPE_REPLICA_EXECUTION, + type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION, status=constants.EXECUTION_STATUS_COMPLETED, ) execution2 = mock.Mock( number=2, - type=constants.EXECUTION_TYPE_REPLICA_EXECUTION, + type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION, status=constants.EXECUTION_STATUS_COMPLETED, ) mock_replica = mock.Mock( executions=[execution1, execution2] ) - self.server._check_valid_replica_tasks_execution( + self.server._check_valid_transfer_tasks_execution( mock_replica ) @@ -2156,14 +2156,14 @@ def test_check_valid_replica_tasks_execution(self): execution2.status = constants.EXECUTION_STATUS_UNEXECUTED self.assertRaises( - exception.InvalidReplicaState, - self.server._check_valid_replica_tasks_execution, + exception.InvalidTransferState, + self.server._check_valid_transfer_tasks_execution, mock_replica ) # doesn't raise exception if all executions are incomplete # and is forced - self.server._check_valid_replica_tasks_execution( + self.server._check_valid_transfer_tasks_execution( mock_replica, True ) @@ -2172,15 +2172,15 @@ def test_check_valid_replica_tasks_execution(self): execution1.status = constants.EXECUTION_STATUS_COMPLETED execution2.status = constants.EXECUTION_STATUS_UNEXECUTED - self.server._check_valid_replica_tasks_execution( + self.server._check_valid_transfer_tasks_execution( mock_replica ) mock_replica.executions = [] self.assertRaises( - exception.InvalidReplicaState, - self.server._check_valid_replica_tasks_execution, + exception.InvalidTransferState, + self.server._check_valid_transfer_tasks_execution, mock_replica ) @@ -2335,7 +2335,7 @@ def test_deploy_replica_instance( ) def call_deploy_replica_instance(): - return self.server.deploy_replica_instances( + return self.server.deploy_transfer_instances( mock.sentinel.context, mock.sentinel.transfer_id, clone_disks=clone_disks, @@ -2348,7 +2348,7 @@ def call_deploy_replica_instance(): # One of the instances has no volumes info self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, call_deploy_replica_instance, ) @@ -2416,7 +2416,7 @@ def create_task_side_effect( ) self.assertEqual( mock_tasks_execution.return_value.type, - constants.EXECUTION_TYPE_REPLICA_DEPLOY + constants.EXECUTION_TYPE_TRANSFER_DEPLOY ) for instance in mock_get_replica.return_value.instances: @@ -2426,7 +2426,7 @@ def create_task_side_effect( ) mock_create_task.assert_any_call( instance, - constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS, + constants.TASK_TYPE_VALIDATE_DEPLOYMENT_INPUTS, mock_tasks_execution.return_value, ) @@ -2464,7 +2464,7 @@ def create_task_side_effect( external=True, ) mock_minion_manager_client\ - .allocate_minion_machines_for_migration\ + .allocate_minion_machines_for_deployment\ .assert_called_once_with( mock.sentinel.context, mock_deployment.return_value, @@ -2748,7 +2748,7 @@ def test_get_last_execution_for_replica( execution3 = mock.Mock(id=mock.sentinel.execution_id3, number=2) replica.executions = [execution1, execution2, execution3] mock_get_replica.return_value = replica - result = self.server._get_last_execution_for_replica( + result = self.server._get_last_execution_for_transfer( mock.sentinel.context, replica, requery=False @@ -2760,8 +2760,8 @@ def test_get_last_execution_for_replica( mock_get_replica.assert_not_called() replica.executions = None self.assertRaises( - exception.InvalidReplicaState, - self.server._get_last_execution_for_replica, + exception.InvalidTransferState, + self.server._get_last_execution_for_transfer, mock.sentinel.context, replica, requery=True @@ -2780,7 +2780,7 @@ def test_get_execution_for_migration( execution2 = mock.Mock(id=mock.sentinel.execution_id2) migration.executions = [execution1] mock_get_migration.return_value = migration - result = self.server._get_execution_for_migration( + result = self.server._get_execution_for_deployment( mock.sentinel.context, migration, requery=False @@ -2792,8 +2792,8 @@ def test_get_execution_for_migration( mock_get_migration.assert_not_called() migration.executions = [execution1, execution2] self.assertRaises( - exception.InvalidMigrationState, - self.server._get_execution_for_migration, + exception.InvalidDeploymentState, + self.server._get_execution_for_deployment, mock.sentinel.context, migration, requery=True @@ -2802,8 +2802,8 @@ def test_get_execution_for_migration( mock.sentinel.context, mock.sentinel.id) migration.executions = [] self.assertRaises( - exception.InvalidMigrationState, - self.server._get_execution_for_migration, + exception.InvalidDeploymentState, + self.server._get_execution_for_deployment, mock.sentinel.context, migration, requery=False @@ -2828,7 +2828,7 @@ def test_confirm_replica_minions_allocation( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS testutils.get_wrapped_function( - self.server.confirm_replica_minions_allocation)( + self.server.confirm_transfer_minions_allocation)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -2880,9 +2880,9 @@ def test_confirm_replica_minions_allocation_unexpected_status( constants.EXECUTION_STATUS_CANCELED self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, testutils.get_wrapped_function( - self.server.confirm_replica_minions_allocation), + self.server.confirm_transfer_minions_allocation), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -2917,7 +2917,7 @@ def test_report_replica_minions_allocation_error( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS testutils.get_wrapped_function( - self.server.report_replica_minions_allocation_error)( + self.server.report_transfer_minions_allocation_error)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -2962,9 +2962,9 @@ def test_report_replica_minions_allocation_error_unexpected_status( constants.EXECUTION_STATUS_CANCELED self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, testutils.get_wrapped_function( - self.server.report_replica_minions_allocation_error), + self.server.report_transfer_minions_allocation_error), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -2996,7 +2996,7 @@ def test_confirm_migration_minions_allocation( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS testutils.get_wrapped_function( - self.server.confirm_migration_minions_allocation)( + self.server.confirm_deployment_minions_allocation)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -3041,9 +3041,9 @@ def test_confirm_migration_minions_allocation_unexpected_status( constants.EXECUTION_STATUS_CANCELED self.assertRaises( - exception.InvalidMigrationState, + exception.InvalidDeploymentState, testutils.get_wrapped_function( - self.server.confirm_migration_minions_allocation), + self.server.confirm_deployment_minions_allocation), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -3078,7 +3078,7 @@ def test_report_migration_minions_allocation_error( constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS testutils.get_wrapped_function( - self.server.report_migration_minions_allocation_error)( + self.server.report_deployment_minions_allocation_error)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -3123,9 +3123,9 @@ def test_report_migration_minions_allocation_error_unexpected_status( constants.EXECUTION_STATUS_CANCELED self.assertRaises( - exception.InvalidMigrationState, + exception.InvalidDeploymentState, testutils.get_wrapped_function( - self.server.report_migration_minions_allocation_error), + self.server.report_deployment_minions_allocation_error), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -3251,7 +3251,7 @@ def test__get_migration( self, mock_get_deployment ): - result = self.server._get_migration( + result = self.server._get_deployment( mock.sentinel.context, mock.sentinel.migration_id, include_task_info=False, @@ -3273,7 +3273,7 @@ def test__get_migration( self.assertRaises( exception.NotFound, - self.server._get_migration, + self.server._get_deployment, mock.sentinel.context, mock.sentinel.migration_id, include_task_info=False, @@ -4001,7 +4001,7 @@ def test_update_replica_volumes_info( self, mock_update_transfer_action_info_for_instance ): - self.server._update_replica_volumes_info( + self.server._update_transfer_volumes_info( mock.sentinel.context, mock.sentinel.transfer_id, mock.sentinel.instance, @@ -4028,7 +4028,7 @@ def test_update_volumes_info_for_migration_parent_replica( deployment = mock.Mock() mock_get_deployment.return_value = deployment - self.server._update_volumes_info_for_migration_parent_replica( + self.server._update_volumes_info_for_deployment_parent_transfer( mock.sentinel.context, mock.sentinel.migration_id, mock.sentinel.instance, @@ -4040,7 +4040,7 @@ def test_update_volumes_info_for_migration_parent_replica( mock.sentinel.migration_id ) mock_lock.assert_called_once_with( - constants.REPLICA_LOCK_NAME_FORMAT % + constants.TRANSFER_LOCK_NAME_FORMAT % mock_get_deployment.return_value.transfer_id, external=True ) @@ -4079,7 +4079,7 @@ def test_handle_post_task_actions( ): # TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS task = mock.Mock( - task_type=constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS, + task_type=constants.TASK_TYPE_RESTORE_TRANSFER_DISK_SNAPSHOTS, instance=mock.sentinel.instance, ) execution = mock.Mock( @@ -4119,7 +4119,7 @@ def call_handle_post_task_actions(): # TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS task.task_type = constants\ - .TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS + .TASK_TYPE_DELETE_TRANSFER_TARGET_DISK_SNAPSHOTS call_handle_post_task_actions() # no clone_disks, reset volumes_info mock_update_volumes_info_for_migration_parent_replica\ @@ -4189,8 +4189,8 @@ def call_handle_post_task_actions(): # TASK_TYPE_UPDATE_SOURCE_REPLICA # TASK_TYPE_UPDATE_DESTINATION_REPLICA types = [ - constants.TASK_TYPE_UPDATE_SOURCE_REPLICA, - constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA, + constants.TASK_TYPE_UPDATE_SOURCE_TRANSFER, + constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER, ] execution.tasks = [ mock.Mock( @@ -4210,7 +4210,7 @@ def call_handle_post_task_actions(): ) # execution has active tasks - task.type = constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA + task.type = constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER call_handle_post_task_actions() mock_update_transfer.assert_not_called() @@ -4371,7 +4371,7 @@ def test_task_completed( mock_get_tasks_execution.return_value = mock.Mock( id=mock.sentinel.execution_id, - type=constants.EXECUTION_TYPE_MIGRATION, + type=constants.EXECUTION_TYPE_DEPLOYMENT, action_id=mock.sentinel.action_id, tasks=[ mock.Mock( @@ -4500,7 +4500,7 @@ def test_confirm_task_cancellation( expected_final_status = getattr(constants, expected_final_status) mock_get_task.return_value = task mock_execution = mock.MagicMock() - mock_execution.type = constants.EXECUTION_TYPE_MIGRATION + mock_execution.type = constants.EXECUTION_TYPE_DEPLOYMENT mock_get_tasks_execution.return_value = mock_execution testutils.get_wrapped_function(self.server.confirm_task_cancellation)( @@ -4572,7 +4572,7 @@ def test_set_task_error( ): task_status = config['task_status'] mock_get_tasks_execution.return_value = mock.Mock( - type=constants.EXECUTION_TYPE_MIGRATION, + type=constants.EXECUTION_TYPE_DEPLOYMENT, action_id=mock.sentinel.action_id, tasks=[ mock.Mock( @@ -4737,7 +4737,7 @@ def test__get_replica_schedule( self, mock_get_transfer_schedule ): - result = self.server._get_replica_schedule( + result = self.server._get_transfer_schedule( mock.sentinel.context, mock.sentinel.transfer_id, mock.sentinel.schedule_id, @@ -4760,7 +4760,7 @@ def test__get_replica_schedule( self.assertRaises( exception.NotFound, - self.server._get_replica_schedule, + self.server._get_transfer_schedule, mock.sentinel.context, mock.sentinel.transfer_id, mock.sentinel.schedule_id, @@ -4792,7 +4792,7 @@ def test_create_replica_schedule( context.trust_id = mock.sentinel.trust_id mock_transfer_schedule.return_value = transfer_schedule - result = self.server.create_replica_schedule( + result = self.server.create_transfer_schedule( context, mock.sentinel.transfer_id, mock.sentinel.schedule, @@ -4850,7 +4850,7 @@ def test_update_replica_schedule( mock_get_replica_schedule ): result = testutils.get_wrapped_function( - self.server.update_replica_schedule)( + self.server.update_transfer_schedule)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -4928,7 +4928,7 @@ def test_delete_replica_schedule( replica.last_execution_status = constants.EXECUTION_STATUS_COMPLETED mock_get_replica.return_value = replica - testutils.get_wrapped_function(self.server.delete_replica_schedule)( + testutils.get_wrapped_function(self.server.delete_transfer_schedule)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -4952,9 +4952,9 @@ def test_delete_replica_schedule( replica.last_execution_status = constants.EXECUTION_STATUS_RUNNING self.assertRaises( - exception.InvalidReplicaState, + exception.InvalidTransferState, testutils.get_wrapped_function( - self.server.delete_replica_schedule), + self.server.delete_transfer_schedule), self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -4970,7 +4970,7 @@ def test_delete_replica_schedule( @mock.patch.object(db_api, "get_transfer_schedules") def test_get_replica_schedules(self, mock_get_transfer_schedules): result = testutils.get_wrapped_function( - self.server.get_replica_schedules)( + self.server.get_transfer_schedules)( self.server, mock.sentinel.context, replica_id=None, @@ -4990,7 +4990,7 @@ def test_get_replica_schedules(self, mock_get_transfer_schedules): @mock.patch.object(db_api, "get_transfer_schedule") def test_get_replica_schedule(self, mock_get_transfer_schedule): result = testutils.get_wrapped_function( - self.server.get_replica_schedule)( + self.server.get_transfer_schedule)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -5058,7 +5058,7 @@ def test_update_replica( mock_TasksExecution.return_value = execution updated_properties = config.get("updated_properties", {}) - result = testutils.get_wrapped_function(self.server.update_replica)( + result = testutils.get_wrapped_function(self.server.update_transfer)( self.server, mock.sentinel.context, mock.sentinel.transfer_id, @@ -5126,11 +5126,11 @@ def test_update_replica( execution)) create_task_calls.append(mock.call( instance, - constants.TASK_TYPE_UPDATE_SOURCE_REPLICA, + constants.TASK_TYPE_UPDATE_SOURCE_TRANSFER, execution)) create_task_calls.append(mock.call( instance, - constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA, + constants.TASK_TYPE_UPDATE_DESTINATION_TRANSFER, execution, depends_on=mock.ANY)) update_transfer_action_info_for_instance_calls.append( @@ -5594,7 +5594,7 @@ def test_set_task_error_os_morphing( mock_conf_conductor, ): execution = mock.Mock( - type=constants.EXECUTION_TYPE_REPLICA_UPDATE, + type=constants.EXECUTION_TYPE_TRANSFER_UPDATE, action_id=mock.sentinel.action_id, tasks=[ mock.Mock( @@ -5643,7 +5643,7 @@ def test_set_task_error_os_morphing( # migration execution mock_check_delete_reservation_for_transfer.assert_not_called() - execution.type = constants.EXECUTION_TYPE_MIGRATION + execution.type = constants.EXECUTION_TYPE_DEPLOYMENT self.server.set_task_error( mock.sentinel.context, mock.sentinel.task_id, diff --git a/coriolis/tests/minion_manager/rpc/test_client.py b/coriolis/tests/minion_manager/rpc/test_client.py index c2e56936a..1cb3dedcb 100644 --- a/coriolis/tests/minion_manager/rpc/test_client.py +++ b/coriolis/tests/minion_manager/rpc/test_client.py @@ -125,7 +125,7 @@ def test_validate_minion_pool_selections_for_action(self): def test_allocate_minion_machines_for_replica(self): args = {"replica": "test_replica"} self._test( - self.client.allocate_minion_machines_for_replica, args, + self.client.allocate_minion_machines_for_transfer, args, rpc_op='_cast', ) @@ -136,7 +136,7 @@ def test_allocate_minion_machines_for_migration(self): "include_osmorphing_minions": True } self._test( - self.client.allocate_minion_machines_for_migration, args, + self.client.allocate_minion_machines_for_deployment, args, rpc_op='_cast', ) diff --git a/coriolis/tests/minion_manager/rpc/test_tasks.py b/coriolis/tests/minion_manager/rpc/test_tasks.py index e32e56757..7852abee5 100644 --- a/coriolis/tests/minion_manager/rpc/test_tasks.py +++ b/coriolis/tests/minion_manager/rpc/test_tasks.py @@ -465,7 +465,7 @@ def test_execute_raises_exception_when_invalid_migration_state( mock_get_action_label): mock_get_minion_machine.return_value = self.minion_machine mock_confirm_allocation.side_effect = [ - exception.InvalidReplicaState(reason='Invalid state')] + exception.InvalidTransferState(reason='Invalid state')] self.assertRaises( exception.MinionMachineAllocationFailure, diff --git a/coriolis/tests/replica_cron/rpc/test_client.py b/coriolis/tests/replica_cron/rpc/test_client.py index bcb9157c8..9170323bf 100644 --- a/coriolis/tests/replica_cron/rpc/test_client.py +++ b/coriolis/tests/replica_cron/rpc/test_client.py @@ -3,7 +3,7 @@ from unittest import mock -from coriolis.replica_cron.rpc import client as rpc_client +from coriolis.transfer_cron.rpc import client as rpc_client from coriolis.tests import test_base @@ -12,7 +12,7 @@ class ReplicaCronClientTestCase(test_base.CoriolisBaseTestCase): def setUp(self): super(ReplicaCronClientTestCase, self).setUp() - self.client = rpc_client.ReplicaCronClient() + self.client = rpc_client.TransferCronClient() self.ctxt = mock.MagicMock() def test_register(self): diff --git a/coriolis/tests/replica_cron/rpc/test_server.py b/coriolis/tests/replica_cron/rpc/test_server.py index 1fdb3ca38..59071deff 100644 --- a/coriolis/tests/replica_cron/rpc/test_server.py +++ b/coriolis/tests/replica_cron/rpc/test_server.py @@ -9,7 +9,7 @@ from coriolis.conductor.rpc import client as rpc_client from coriolis import exception -from coriolis.replica_cron.rpc import server +from coriolis.transfer_cron.rpc import server from coriolis.tests import test_base @@ -40,7 +40,7 @@ def test__trigger_replica_invalid_replica_state(self): mock_conductor_client = mock.MagicMock() mock_conductor_client.execute_replica_tasks.side_effect = ( - exception.InvalidReplicaState(reason='test_reason')) + exception.InvalidTransferState(reason='test_reason')) with self.assertLogs('coriolis.replica_cron.rpc.server', level=logging.INFO): diff --git a/coriolis/tests/replica_cron/test_api.py b/coriolis/tests/replica_cron/test_api.py index a770b4e24..4ff06fb3a 100644 --- a/coriolis/tests/replica_cron/test_api.py +++ b/coriolis/tests/replica_cron/test_api.py @@ -3,7 +3,7 @@ from unittest import mock -from coriolis.replica_cron import api as replicas_cron_module +from coriolis.transfer_cron import api as replicas_cron_module from coriolis.tests import test_base diff --git a/coriolis/tests/tasks/test_replica_tasks.py b/coriolis/tests/tasks/test_replica_tasks.py index a5d9679b6..75ebec832 100644 --- a/coriolis/tests/tasks/test_replica_tasks.py +++ b/coriolis/tests/tasks/test_replica_tasks.py @@ -270,7 +270,7 @@ def test__run(self, mock_get_vol_info, mock_get_conn_info, destination = mock.MagicMock() task_info = mock.MagicMock() task_info.get.side_effect = [task_info['volumes_info']] - prov_fun = mock_get_provider.return_value.delete_replica_disks + prov_fun = mock_get_provider.return_value.delete_transfer_disks expected_result = {"volumes_info": []} result = self.task_runner._run( diff --git a/coriolis/replica_cron/__init__.py b/coriolis/transfer_cron/__init__.py similarity index 100% rename from coriolis/replica_cron/__init__.py rename to coriolis/transfer_cron/__init__.py diff --git a/coriolis/replica_cron/api.py b/coriolis/transfer_cron/api.py similarity index 100% rename from coriolis/replica_cron/api.py rename to coriolis/transfer_cron/api.py diff --git a/coriolis/replica_cron/rpc/__init__.py b/coriolis/transfer_cron/rpc/__init__.py similarity index 100% rename from coriolis/replica_cron/rpc/__init__.py rename to coriolis/transfer_cron/rpc/__init__.py diff --git a/coriolis/replica_cron/rpc/client.py b/coriolis/transfer_cron/rpc/client.py similarity index 75% rename from coriolis/replica_cron/rpc/client.py rename to coriolis/transfer_cron/rpc/client.py index 593796301..1483e2393 100644 --- a/coriolis/replica_cron/rpc/client.py +++ b/coriolis/transfer_cron/rpc/client.py @@ -9,11 +9,11 @@ VERSION = "1.0" -class ReplicaCronClient(rpc.BaseRPCClient): - def __init__(self, topic=constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC): +class TransferCronClient(rpc.BaseRPCClient): + def __init__(self, topic=constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC): target = messaging.Target( topic=topic, version=VERSION) - super(ReplicaCronClient, self).__init__(target) + super(TransferCronClient, self).__init__(target) def register(self, ctxt, schedule): self._call(ctxt, 'register', schedule=schedule) diff --git a/coriolis/replica_cron/rpc/server.py b/coriolis/transfer_cron/rpc/server.py similarity index 97% rename from coriolis/replica_cron/rpc/server.py rename to coriolis/transfer_cron/rpc/server.py index 1b2d6a3a3..762b118f3 100644 --- a/coriolis/replica_cron/rpc/server.py +++ b/coriolis/transfer_cron/rpc/server.py @@ -19,12 +19,12 @@ def _trigger_replica(ctxt, conductor_client, replica_id, shutdown_instance): try: - execution = conductor_client.execute_replica_tasks( + execution = conductor_client.execute_transfer_tasks( ctxt, replica_id, shutdown_instance) result_msg = 'Execution %s for Replica %s' % ( execution.get('id'), execution.get('action_id')) return result_msg - except (exception.InvalidReplicaState, + except (exception.InvalidTransferState, exception.InvalidActionTasksExecutionState): LOG.info("A replica or migration already running")