From 3b1523ee4272f66196b1184c8ca65192bdcc3036 Mon Sep 17 00:00:00 2001
From: nukima <67093353+nukima@users.noreply.github.com>
Date: Wed, 24 Jan 2024 10:36:12 +0700
Subject: [PATCH] Update airflow-db-cleanup.py

This should work with Airflow 2.x (tested with 2.3.4).
---
 db-cleanup/airflow-db-cleanup.py | 521 +++++++++++++++++++------------
 1 file changed, 319 insertions(+), 202 deletions(-)

diff --git a/db-cleanup/airflow-db-cleanup.py b/db-cleanup/airflow-db-cleanup.py
index 36a8b0b..e306fb3 100644
--- a/db-cleanup/airflow-db-cleanup.py
+++ b/db-cleanup/airflow-db-cleanup.py
@@ -3,35 +3,61 @@
 out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
 having too much data in your Airflow MetaStore.
 
-airflow trigger_dag --conf '{"maxDBEntryAgeInDays":30}' airflow-db-cleanup
-
---conf options:
-    maxDBEntryAgeInDays:<INT> - Optional
-
+## Authors
+
+The DAG is a fork of [the teamclairvoyant repository](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup).
+
+## Usage
+
+1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
+   ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values.
+
+2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
+   dictionary in the list features the following parameters:
+    - airflow_db_model: Model imported from airflow.models corresponding to
+      a table in the airflow metadata database
+    - age_check_column: Column in the model/table to use for calculating the
+      max date of data deletion
+    - keep_last: Boolean to specify whether to preserve the last run instance
+    - keep_last_filters: List of filters to preserve data from deletion
+      during clean-up, such as DAG runs where the external trigger is set to 0
+    - keep_last_group_by: Option to specify the column by which to group the
+      database entries and perform aggregate functions
+
+3. Create and set the following Variables in the Airflow Web Server
+   (Admin -> Variables):
+    - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
+      the log files if not already provided in the conf. If this is set to 30,
+      the job will remove those files that are 30 days old or older. The value
+      can also be overridden per run through --conf, as shown in the example
+      below.
+
+4. Put the DAG in your GCS bucket.
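+
+For example, the retention period can be overridden for a single run by
+passing maxDBEntryAgeInDays through --conf (a minimal sketch, assuming the
+stock Airflow 2 CLI is available; adapt the invocation to your environment):
+
+    airflow dags trigger --conf '{"maxDBEntryAgeInDays": 7}' airflow-db-cleanup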
""" +from datetime import timedelta +import logging +import os + import airflow from airflow import settings -from airflow.configuration import conf -from airflow.models import DAG, DagTag, DagModel, DagRun, Log, XCom, SlaMiss, TaskInstance, Variable -try: - from airflow.jobs import BaseJob -except Exception as e: - from airflow.jobs.base_job import BaseJob -from airflow.operators.python_operator import PythonOperator -from datetime import datetime, timedelta +from airflow.models import ( + DAG, + DagModel, + DagRun, + Log, + SlaMiss, + TaskInstance, + Variable, + XCom, +) +from airflow.operators.python import PythonOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + import dateutil.parser -import logging -import os -from sqlalchemy import func, and_ +from sqlalchemy import and_, func, text from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import load_only -try: - # airflow.utils.timezone is available from v1.10 onwards - from airflow.utils import timezone - now = timezone.utcnow -except ImportError: - now = datetime.utcnow +now = timezone.utcnow # airflow-db-cleanup DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") @@ -42,170 +68,183 @@ DAG_OWNER_NAME = "operations" # List of email address to send email alerts to if this job fails ALERT_EMAIL_ADDRESSES = [] +# Airflow version used by the environment in list form, value stored in +# airflow_version is in format e.g "2.3.4+composer" +AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".") # Length to retain the log files if not already provided in the conf. If this # is set to 30, the job will remove those files that arE 30 days old or older. - DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int( Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30) ) -# Prints the database entries which will be getting deleted; set to False to avoid printing large lists and slowdown process -PRINT_DELETES = True +# Prints the database entries which will be getting deleted; set to False +# to avoid printing large lists and slowdown process +PRINT_DELETES = False # Whether the job should delete the db entries or not. Included if you want to # temporarily avoid deleting the db entries. ENABLE_DELETE = True - -# get dag model last schedule run -try: - dag_model_last_scheduler_run = DagModel.last_scheduler_run -except AttributeError: - dag_model_last_scheduler_run = DagModel.last_parsed_time - # List of all the objects that will be deleted. Comment out the DB objects you # want to skip. 
 DATABASE_OBJECTS = [
-    {
-        "airflow_db_model": BaseJob,
-        "age_check_column": BaseJob.latest_heartbeat,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    },
     {
         "airflow_db_model": DagRun,
         "age_check_column": DagRun.execution_date,
         "keep_last": True,
         "keep_last_filters": [DagRun.external_trigger.is_(False)],
-        "keep_last_group_by": DagRun.dag_id
-    },
-    {
-        "airflow_db_model": TaskInstance,
-        "age_check_column": TaskInstance.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": DagRun.dag_id,
     },
+    # {
+    #     "airflow_db_model": TaskInstance,
+    #     "age_check_column": TaskInstance.start_date
+    #     if AIRFLOW_VERSION < ["2", "2", "0"]
+    #     else TaskInstance.start_date,
+    #     "keep_last": False,
+    #     "keep_last_filters": None,
+    #     "keep_last_group_by": None,
+    # },
     {
         "airflow_db_model": Log,
         "age_check_column": Log.dttm,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
-    },
-    {
-        "airflow_db_model": XCom,
-        "age_check_column": XCom.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": None,
     },
+    # {
+    #     "airflow_db_model": XCom,
+    #     "age_check_column": XCom.execution_date
+    #     if AIRFLOW_VERSION < ["2", "2", "5"]
+    #     else XCom.timestamp,
+    #     "keep_last": False,
+    #     "keep_last_filters": None,
+    #     "keep_last_group_by": None,
+    # },
     {
         "airflow_db_model": SlaMiss,
         "age_check_column": SlaMiss.execution_date,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": None,
     },
     {
         "airflow_db_model": DagModel,
-        "age_check_column": dag_model_last_scheduler_run,
+        "age_check_column": DagModel.last_parsed_time,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
-    }]
+        "keep_last_group_by": None,
+    },
+]
 
 # Check for TaskReschedule model
-try:
-    from airflow.models import TaskReschedule
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": TaskReschedule,
-        "age_check_column": TaskReschedule.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
-
-except Exception as e:
-    logging.error(e)
+# try:
+#     from airflow.models import TaskReschedule
+
+#     DATABASE_OBJECTS.append(
+#         {
+#             "airflow_db_model": TaskReschedule,
+#             "age_check_column": TaskReschedule.execution_date
+#             if AIRFLOW_VERSION < ["2", "2", "0"]
+#             else TaskReschedule.start_date,
+#             "keep_last": False,
+#             "keep_last_filters": None,
+#             "keep_last_group_by": None,
+#         }
+#     )
+
+# except Exception as e:
+#     logging.error(e)
 
 # Check for TaskFail model
 try:
     from airflow.models import TaskFail
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": TaskFail,
-        "age_check_column": TaskFail.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
+
+    DATABASE_OBJECTS.append(
+        {
+            "airflow_db_model": TaskFail,
+            "age_check_column": TaskFail.start_date,
+            "keep_last": False,
+            "keep_last_filters": None,
+            "keep_last_group_by": None,
+        }
+    )
 
 except Exception as e:
     logging.error(e)
 
 # Check for RenderedTaskInstanceFields model
-try:
-    from airflow.models import RenderedTaskInstanceFields
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": RenderedTaskInstanceFields,
-        "age_check_column": RenderedTaskInstanceFields.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
-
-except Exception as e:
-    logging.error(e)
+# if AIRFLOW_VERSION < ["2", "4", "0"]:
+#     try:
+#         from airflow.models import RenderedTaskInstanceFields
+
+#         DATABASE_OBJECTS.append(
+#             {
+#                 "airflow_db_model": RenderedTaskInstanceFields,
+#                 "age_check_column": RenderedTaskInstanceFields.execution_date,
+#                 "keep_last": False,
+#                 "keep_last_filters": None,
+#                 "keep_last_group_by": None,
+#             }
+#         )
+
+#     except Exception as e:
+#         logging.error(e)
 
 # Check for ImportError model
 try:
     from airflow.models import ImportError
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": ImportError,
-        "age_check_column": ImportError.timestamp,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
+
+    DATABASE_OBJECTS.append(
+        {
+            "airflow_db_model": ImportError,
+            "age_check_column": ImportError.timestamp,
+            "keep_last": False,
+            "keep_last_filters": None,
+            "keep_last_group_by": None,
+            "do_not_delete_by_dag_id": True,
+        }
+    )
 
 except Exception as e:
     logging.error(e)
 
-# Check for celery executor
-airflow_executor = str(conf.get("core", "executor"))
-logging.info("Airflow Executor: " + str(airflow_executor))
-if(airflow_executor == "CeleryExecutor"):
-    logging.info("Including Celery Modules")
+if AIRFLOW_VERSION < ["2", "6", "0"]:
     try:
-        from celery.backends.database.models import Task, TaskSet
-        DATABASE_OBJECTS.extend((
+        from airflow.jobs.base_job import BaseJob
+
+        DATABASE_OBJECTS.append(
             {
-                "airflow_db_model": Task,
-                "age_check_column": Task.date_done,
+                "airflow_db_model": BaseJob,
+                "age_check_column": BaseJob.latest_heartbeat,
                 "keep_last": False,
                 "keep_last_filters": None,
-                "keep_last_group_by": None
-            },
+                "keep_last_group_by": None,
+            }
+        )
+    except Exception as e:
+        logging.error(e)
+else:
+    try:
+        from airflow.jobs.job import Job
+
+        DATABASE_OBJECTS.append(
             {
-                "airflow_db_model": TaskSet,
-                "age_check_column": TaskSet.date_done,
+                "airflow_db_model": Job,
+                "age_check_column": Job.latest_heartbeat,
                 "keep_last": False,
                 "keep_last_filters": None,
-                "keep_last_group_by": None
-            }))
-
+                "keep_last_group_by": None,
+            }
+        )
     except Exception as e:
         logging.error(e)
 
-session = settings.Session()
-
 default_args = {
-    'owner': DAG_OWNER_NAME,
-    'depends_on_past': False,
-    'email': ALERT_EMAIL_ADDRESSES,
-    'email_on_failure': True,
-    'email_on_retry': False,
-    'start_date': START_DATE,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=1)
+    "owner": DAG_OWNER_NAME,
+    "depends_on_past": False,
+    "email": ALERT_EMAIL_ADDRESSES,
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "start_date": START_DATE,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
 }
 
 dag = DAG(
@@ -213,11 +252,10 @@
     default_args=default_args,
     schedule_interval=SCHEDULE_INTERVAL,
     start_date=START_DATE,
-    tags=['teamclairvoyant', 'airflow-maintenance-dags']
 )
 
-if hasattr(dag, 'doc_md'):
+if hasattr(dag, "doc_md"):
     dag.doc_md = __doc__
-if hasattr(dag, 'catchup'):
+if hasattr(dag, "catchup"):
     dag.catchup = False
@@ -227,15 +265,14 @@ def print_configuration_function(**context):
     logging.info("dag_run.conf: " + str(dag_run_conf))
     max_db_entry_age_in_days = None
     if dag_run_conf:
-        max_db_entry_age_in_days = dag_run_conf.get(
-            "maxDBEntryAgeInDays", None
-        )
+        max_db_entry_age_in_days = dag_run_conf.get("maxDBEntryAgeInDays", None)
     logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
-    if (max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1):
+    if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
         logging.info(
-            "maxDBEntryAgeInDays conf variable isn't included or Variable " +
-            "value is less than 1. Using Default '" +
-            str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'"
+            "maxDBEntryAgeInDays conf variable isn't included or Variable "
+            + "value is less than 1. Using Default '"
+            + str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS)
+            + "'"
         )
         max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
     max_date = now() + timedelta(-max_db_entry_age_in_days)
@@ -246,7 +283,6 @@ def print_configuration_function(**context):
     logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
     logging.info("max_date: " + str(max_date))
     logging.info("enable_delete: " + str(ENABLE_DELETE))
-    logging.info("session: " + str(session))
     logging.info("")
 
     logging.info("Setting max_execution_date to XCom for Downstream Processes")
@@ -254,13 +290,77 @@
 
 print_configuration = PythonOperator(
-    task_id='print_configuration',
+    task_id="print_configuration",
     python_callable=print_configuration_function,
     provide_context=True,
-    dag=dag)
+    dag=dag,
+)
+
+
+def build_query(
+    session,
+    airflow_db_model,
+    age_check_column,
+    max_date,
+    keep_last,
+    keep_last_filters=None,
+    keep_last_group_by=None,
+):
+    query = session.query(airflow_db_model).options(load_only(age_check_column))
+
+    logging.info("INITIAL QUERY : " + str(query))
+
+    if not keep_last:
+        query = query.filter(
+            age_check_column <= max_date,
+        )
+    else:
+        subquery = session.query(func.max(DagRun.execution_date))
+        # workaround for MySQL "table specified twice" issue
+        # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+        if keep_last_filters is not None:
+            for entry in keep_last_filters:
+                subquery = subquery.filter(entry)
+
+            logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
+
+        if keep_last_group_by is not None:
+            subquery = subquery.group_by(keep_last_group_by)
+            logging.info("SUB QUERY [keep_last_group_by]: " + str(subquery))
+
+        subquery = subquery.from_self()
+
+        query = query.filter(
+            and_(age_check_column.notin_(subquery)), and_(age_check_column <= max_date)
+        )
+
+    return query
+
+
+def print_query(query, airflow_db_model, age_check_column):
+    entries_to_delete = query.all()
+
+    logging.info("Query: " + str(query))
+    logging.info(
+        "Process will be Deleting the following "
+        + str(airflow_db_model.__name__)
+        + "(s):"
+    )
+    for entry in entries_to_delete:
+        date = str(entry.__dict__[str(age_check_column).split(".")[1]])
+        logging.info("\tEntry: " + str(entry) + ", Date: " + date)
+
+    logging.info(
+        "Process will be Deleting "
+        + str(len(entries_to_delete))
+        + " "
+        + str(airflow_db_model.__name__)
+        + "(s)"
+    )
 
 
 def cleanup_function(**context):
+    session = settings.Session()
+
     logging.info("Retrieving max_execution_date from XCom")
     max_date = context["ti"].xcom_pull(
@@ -291,92 +391,109 @@ def cleanup_function(**context):
     logging.info("Running Cleanup Process...")
 
     try:
-        query = session.query(airflow_db_model).options(
-            load_only(age_check_column)
-        )
+        if context["params"].get("do_not_delete_by_dag_id"):
+            query = build_query(
+                session,
+                airflow_db_model,
+                age_check_column,
+                max_date,
+                keep_last,
+                keep_last_filters,
+                keep_last_group_by,
+            )
+            if PRINT_DELETES:
+                print_query(query, airflow_db_model, age_check_column)
+            if ENABLE_DELETE:
+                logging.info("Performing Delete...")
+                query.delete(synchronize_session=False)
+            session.commit()
+        else:
+            dags = session.query(airflow_db_model.dag_id).distinct()
+            session.commit()
 
-        logging.info("INITIAL QUERY : " + str(query))
+            list_dags = [str(list(dag)[0]) for dag in dags] + [None]
+            for dag in list_dags:
+                query = build_query(
+                    session,
+                    airflow_db_model,
+                    age_check_column,
+                    max_date,
+                    keep_last,
+                    keep_last_filters,
+                    keep_last_group_by,
+                )
+                query = query.filter(airflow_db_model.dag_id == dag)
+                if PRINT_DELETES:
+                    print_query(query, airflow_db_model, age_check_column)
+                if ENABLE_DELETE:
+                    logging.info("Performing Delete...")
+                    query.delete(synchronize_session=False)
+                session.commit()
+
+        if not ENABLE_DELETE:
+            logging.warning(
+                "You've opted to skip deleting the db entries. "
+                "Set ENABLE_DELETE to True to delete entries!!!"
+            )
 
-        if keep_last:
+        logging.info("Finished Running Cleanup Process")
 
-            subquery = session.query(func.max(DagRun.execution_date))
-            # workaround for MySQL "table specified twice" issue
-            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
-            if keep_last_filters is not None:
-                for entry in keep_last_filters:
-                    subquery = subquery.filter(entry)
+    except ProgrammingError as e:
+        logging.error(e)
+        logging.error(
+            str(airflow_db_model) + " is not present in the metadata. " "Skipping..."
+        )
 
-                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
+    finally:
+        session.close()
 
-            if keep_last_group_by is not None:
-                subquery = subquery.group_by(keep_last_group_by)
-                logging.info(
-                    "SUB QUERY [keep_last_group_by]: " + str(subquery))
 
-            subquery = subquery.from_self()
+def cleanup_sessions():
+    session = settings.Session()
 
-            query = query.filter(
-                and_(age_check_column.notin_(subquery)),
-                and_(age_check_column <= max_date)
-            )
+    try:
+        logging.info("Deleting sessions...")
+        count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);"
+        before = session.execute(text(count_statement)).one_or_none()["cnt"]
+        session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);"))
+        after = session.execute(text(count_statement)).one_or_none()["cnt"]
+        logging.info("Deleted %s expired sessions.", (before - after))
+    except Exception as err:
+        logging.exception(err)
 
-        else:
-            query = query.filter(age_check_column <= max_date,)
+    session.commit()
+    session.close()
 
-        if PRINT_DELETES:
-            entries_to_delete = query.all()
-            logging.info("Query: " + str(query))
-            logging.info(
-                "Process will be Deleting the following " +
-                str(airflow_db_model.__name__) + "(s):"
-            )
-            for entry in entries_to_delete:
-                logging.info(
-                    "\tEntry: " + str(entry) + ", Date: " +
-                    str(entry.__dict__[str(age_check_column).split(".")[1]])
-                )
 
-            logging.info(
-                "Process will be Deleting " + str(len(entries_to_delete)) + " " +
-                str(airflow_db_model.__name__) + "(s)"
-            )
-        else:
-            logging.warn(
-                "You've opted to skip printing the db entries to be deleted. Set PRINT_DELETES to True to show entries!!!")
-
-        if ENABLE_DELETE:
-            logging.info('Performing Delete...')
-            if airflow_db_model.__name__ == 'DagModel':
-                logging.info('Deleting tags...')
-                ids_query = query.from_self().with_entities(DagModel.dag_id)
-                tags_query = session.query(DagTag).filter(DagTag.dag_id.in_(ids_query))
-                logging.info('Tags delete Query: ' + str(tags_query))
-                tags_query.delete(synchronize_session=False)
-            # using bulk delete
-            query.delete(synchronize_session=False)
-            session.commit()
-            logging.info('Finished Performing Delete')
-        else:
-            logging.warn(
-                "You've opted to skip deleting the db entries. Set ENABLE_DELETE to True to delete entries!!!")
+def analyze_db():
+    session = settings.Session()
+    session.execute("ANALYZE")
+    session.commit()
+    session.close()
 
-        logging.info("Finished Running Cleanup Process")
+analyze_op = PythonOperator(
+    task_id="analyze_query", python_callable=analyze_db, provide_context=True, dag=dag
+)
 
-    except ProgrammingError as e:
-        logging.error(e)
-        logging.error(str(airflow_db_model) +
-                      " is not present in the metadata. Skipping...")
+cleanup_session_op = PythonOperator(
+    task_id="cleanup_sessions",
+    python_callable=cleanup_sessions,
+    provide_context=True,
+    dag=dag,
+)
+cleanup_session_op.set_downstream(analyze_op)
 
 for db_object in DATABASE_OBJECTS:
-
     cleanup_op = PythonOperator(
-        task_id='cleanup_' + str(db_object["airflow_db_model"].__name__),
+        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
         python_callable=cleanup_function,
         params=db_object,
         provide_context=True,
-        dag=dag
+        dag=dag,
     )
 
     print_configuration.set_downstream(cleanup_op)
+    cleanup_op.set_downstream(analyze_op)
+