Skip to content

Commit

Permalink
Improve deletion state and cron to handle nested graphs and matrices
Browse files Browse the repository at this point in the history
  • Loading branch information
polyaxon-ci committed Nov 17, 2023
1 parent 69cd418 commit faf0e73
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
2 changes: 0 additions & 2 deletions haupt/haupt/db/managers/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ def get_deleting_runs(
live_state=LiveState.DELETION_PROGRESSING,
status__in=LifeCycle.DONE_VALUES,
pending__isnull=True,
managed_by=managed_by,
deleted_at__isnull=True,
updated_at__lte=now().replace(second=0, microsecond=0)
- timedelta(seconds=dj_settings.MIN_ARTIFACTS_DELETION_TIMEDELTA),
Expand Down Expand Up @@ -179,7 +178,6 @@ def get_deleting_runs(
},
live_state=LiveState.DELETION_PROGRESSING,
pending__isnull=True,
managed_by=managed_by,
)
.exclude(status__in=LifeCycle.DONE_VALUES)
.prefetch_related("project")
Expand Down
5 changes: 5 additions & 0 deletions haupt/haupt/db/managers/live_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ def confirm_delete_run(run: BaseRun):

def confirm_delete_runs(runs: QuerySet, run_ids: List[int] = None):
run_ids = run_ids or list(runs.values_list("id", flat=True))
if not run_ids:
return
runs.update(live_state=LiveState.DELETION_PROGRESSING, deleted_at=now())
Models.Run.all.filter(id__in=run_ids).exclude(
status__in=LifeCycle.DONE_VALUES
).update(status=V1Statuses.STOPPED)
queryset = Models.Run.all.filter(
Q(pipeline_id__in=run_ids) | Q(controller_id__in=run_ids)
).exclude(live_state=LiveState.DELETION_PROGRESSING, deleted_at__isnull=False)
Expand Down
8 changes: 7 additions & 1 deletion haupt/haupt/orchestration/crons/deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from haupt.db.defs import Models
from haupt.db.managers.live_state import confirm_delete_runs
from polyaxon.schemas import LifeCycle, LiveState, V1RunKind
from polyaxon.schemas import LifeCycle, LiveState, V1RunKind, V1Statuses


class CronsDeletionManager:
Expand Down Expand Up @@ -76,6 +76,12 @@ def delete_in_progress_runs():
live_state=LiveState.DELETION_PROGRESSING, deleted_at__lte=last_date
).delete()

# Stop all pipelines in deletion progress
Models.Run.all.filter(
kind__in={V1RunKind.DAG, V1RunKind.MATRIX, V1RunKind.SCHEDULE},
live_state=LiveState.DELETION_PROGRESSING,
).exclude(status__in=LifeCycle.DONE_VALUES).update(status=V1Statuses.STOPPED)

@staticmethod
def delete_archived_runs():
last_date = get_datetime_from_now(days=conf.get(CLEANING_INTERVALS_ARCHIVES))
Expand Down
4 changes: 3 additions & 1 deletion haupt/haupt/orchestration/executor/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ def handle_new_artifacts(

@classmethod
def handle_run_deleted(cls, workers_backend, event: "Event") -> None: # noqa: F821
run = cls.MANAGER.get_run(run_id=event.instance_id, run=event.instance)
run = cls.MANAGER.get_run(
run_id=event.instance_id, run=event.instance, use_all=True
)
if not run:
return

Expand Down

0 comments on commit faf0e73

Please sign in to comment.