Skip to content

Commit

Permalink
Merge pull request #15816 from sanjaysrikakulam/galaxy_jwd
Browse files Browse the repository at this point in the history
Add clean up job working directory as celery task
  • Loading branch information
mvdbeek authored Nov 18, 2024
2 parents 8c30a87 + 429cddc commit c069907
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import json
import shutil
from concurrent.futures import TimeoutError
from functools import lru_cache
from pathlib import Path
Expand All @@ -21,6 +23,7 @@
from galaxy.config import GalaxyAppConfiguration
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry as DatatypesRegistry
from galaxy.exceptions import ObjectNotFound
from galaxy.jobs import MinimalJobWrapper
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.datasets import (
Expand Down Expand Up @@ -506,3 +509,35 @@ def dispatch_pending_notifications(notification_manager: NotificationManager):
count = notification_manager.dispatch_pending_notifications_via_channels()
if count:
log.info(f"Successfully dispatched {count} notifications.")


@galaxy_task(action="clean up job working directories")
def cleanup_jwds(sa_session: galaxy_scoped_session, object_store: BaseObjectStore, days: int = 5):
"""Cleanup job working directories for failed jobs that are older than X days"""

def get_failed_jobs():
return sa_session.query(model.Job.id).filter(
model.Job.state == "error",
model.Job.update_time < datetime.datetime.now() - datetime.timedelta(days=days),
model.Job.object_store_id.isnot(None),
)

def delete_jwd(job):
try:
# Get job working directory from object store
path = object_store.get_filename(job, base_dir="job_work", dir_only=True, obj_dir=True)
shutil.rmtree(path)
except ObjectNotFound:
# job working directory already deleted
pass
except OSError as e:
log.error(f"Error deleting job working directory: {path} : {e.strerror}")

failed_jobs = get_failed_jobs()

if not failed_jobs:
log.info("No failed jobs found within the last %s days", days)

for job in failed_jobs:
delete_jwd(job)
log.info("Deleted job working directory for job %s", job.id)

0 comments on commit c069907

Please sign in to comment.