diff --git a/haupt/haupt/db/managers/agents.py b/haupt/haupt/db/managers/agents.py index dbdd19c750..54b2ce7417 100644 --- a/haupt/haupt/db/managers/agents.py +++ b/haupt/haupt/db/managers/agents.py @@ -128,7 +128,6 @@ def get_deleting_runs( live_state=LiveState.DELETION_PROGRESSING, status__in=LifeCycle.DONE_VALUES, pending__isnull=True, - managed_by=managed_by, deleted_at__isnull=True, updated_at__lte=now().replace(second=0, microsecond=0) - timedelta(seconds=dj_settings.MIN_ARTIFACTS_DELETION_TIMEDELTA), @@ -179,7 +178,6 @@ def get_deleting_runs( }, live_state=LiveState.DELETION_PROGRESSING, pending__isnull=True, - managed_by=managed_by, ) .exclude(status__in=LifeCycle.DONE_VALUES) .prefetch_related("project") diff --git a/haupt/haupt/db/managers/live_state.py b/haupt/haupt/db/managers/live_state.py index 6edb17c37e..ce4ab7892c 100644 --- a/haupt/haupt/db/managers/live_state.py +++ b/haupt/haupt/db/managers/live_state.py @@ -82,7 +82,12 @@ def confirm_delete_run(run: BaseRun): def confirm_delete_runs(runs: QuerySet, run_ids: List[int] = None): run_ids = run_ids or list(runs.values_list("id", flat=True)) + if not run_ids: + return runs.update(live_state=LiveState.DELETION_PROGRESSING, deleted_at=now()) + Models.Run.all.filter(id__in=run_ids).exclude( + status__in=LifeCycle.DONE_VALUES + ).update(status=V1Statuses.STOPPED) queryset = Models.Run.all.filter( Q(pipeline_id__in=run_ids) | Q(controller_id__in=run_ids) ).exclude(live_state=LiveState.DELETION_PROGRESSING, deleted_at__isnull=False) diff --git a/haupt/haupt/orchestration/crons/deletion.py b/haupt/haupt/orchestration/crons/deletion.py index d253f9106c..f001effa6d 100644 --- a/haupt/haupt/orchestration/crons/deletion.py +++ b/haupt/haupt/orchestration/crons/deletion.py @@ -11,7 +11,7 @@ ) from haupt.db.defs import Models from haupt.db.managers.live_state import confirm_delete_runs -from polyaxon.schemas import LifeCycle, LiveState, V1RunKind +from polyaxon.schemas import LifeCycle, LiveState, V1RunKind, V1Statuses class CronsDeletionManager: @@ -76,6 +76,12 @@ def delete_in_progress_runs(): live_state=LiveState.DELETION_PROGRESSING, deleted_at__lte=last_date ).delete() + # Stop all pipelines in deletion progress + Models.Run.all.filter( + kind__in={V1RunKind.DAG, V1RunKind.MATRIX, V1RunKind.SCHEDULE}, + live_state=LiveState.DELETION_PROGRESSING, + ).exclude(status__in=LifeCycle.DONE_VALUES).update(status=V1Statuses.STOPPED) + @staticmethod def delete_archived_runs(): last_date = get_datetime_from_now(days=conf.get(CLEANING_INTERVALS_ARCHIVES)) diff --git a/haupt/haupt/orchestration/executor/handlers.py b/haupt/haupt/orchestration/executor/handlers.py index bb0977336e..742891e7ef 100644 --- a/haupt/haupt/orchestration/executor/handlers.py +++ b/haupt/haupt/orchestration/executor/handlers.py @@ -158,7 +158,9 @@ def handle_new_artifacts( @classmethod def handle_run_deleted(cls, workers_backend, event: "Event") -> None: # noqa: F821 - run = cls.MANAGER.get_run(run_id=event.instance_id, run=event.instance) + run = cls.MANAGER.get_run( + run_id=event.instance_id, run=event.instance, use_all=True + ) if not run: return