diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml
index 007acff7..c93ef4f1 100644
--- a/.github/workflows/cicd.yaml
+++ b/.github/workflows/cicd.yaml
@@ -2,7 +2,7 @@ name: Test and release related jobs
 
 on:
   push: # Run on pushes to the default branch
-    branches: [main]
+    branches: [integration-tests] # NOTE(review): looks like a temporary trigger for this branch - confirm and restore [main] before merging
   pull_request_target: # Also run on pull requests originated from forks
     branches: [main]
   release:
@@ -104,6 +104,106 @@ jobs:
           path: .coverage
           include-hidden-files: true
 
+  Run-Integration-Tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
+        airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
+        exclude:
+          # Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10
+          # See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html
+          - python-version: "3.10"
+            airflow-version: "2.2"
+          # Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11
+          - python-version: "3.11"
+            airflow-version: "2.2"
+          - python-version: "3.11"
+            airflow-version: "2.3"
+          - python-version: "3.11"
+            airflow-version: "2.4"
+          - python-version: "3.11"
+            airflow-version: "2.5"
+          - python-version: "3.11"
+            airflow-version: "2.6"
+          # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12.
+          # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
+          # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
+          # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
+          - python-version: "3.12"
+            airflow-version: "2.2"
+          - python-version: "3.12"
+            airflow-version: "2.3"
+          - python-version: "3.12"
+            airflow-version: "2.4"
+          - python-version: "3.12"
+            airflow-version: "2.5"
+          - python-version: "3.12"
+            airflow-version: "2.6"
+          - python-version: "3.12"
+            airflow-version: "2.7"
+          - python-version: "3.12"
+            airflow-version: "2.8"
+    services:
+      postgres:
+        image: postgres
+        env:
+          POSTGRES_PASSWORD: postgres
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+        ports:
+          - 5432:5432
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          ref: ${{ github.event.pull_request.head.sha || github.ref }}
+
+      - uses: actions/cache@v4
+        with:
+          # Hatch keeps its environments under the user's home directory, not inside the workspace
+          path: |
+            ~/.cache/pip
+            ~/.local/share/hatch/
+          key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('dagfactory/__init__.py') }}
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Install packages and dependencies
+        run: |
+          python -m pip install uv
+          uv pip install --system hatch
+          hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
+
+      - name: Test DAG Factory against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
+        run: |
+          PYTHONPATH=`pwd`:`pwd`/examples CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup
+          PYTHONPATH=`pwd`:`pwd`/examples CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
+        env:
+          # `env` values are not shell-expanded (a `$VAR` stays literal), so paths are built from the workflow context
+          AIRFLOW_HOME: ${{ github.workspace }}
+          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@localhost:5432/postgres
+          AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
+          PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/examples
+          POSTGRES_HOST: localhost
+          POSTGRES_USER: postgres
+          POSTGRES_PASSWORD: postgres
+          POSTGRES_DB: postgres
+          POSTGRES_SCHEMA: public
+          POSTGRES_PORT: 5432
+
+      - name: Upload coverage to Github
+        uses: actions/upload-artifact@v4
+        with:
+          name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
+          path: .coverage
+          include-hidden-files: true
+
   Code-Coverage:
     if: github.event.action != 'labeled'
     needs:
diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh
new file mode 100644
index 00000000..1dc53da9
--- /dev/null
+++ b/scripts/test/integration-setup.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Remove local airflow.* files, then reset and initialise the Airflow metadata database.
+
+set -v
+set -x
+set -e
+
+rm -rf airflow.*
+pip freeze | grep airflow
+airflow db reset -y
+airflow db init
diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh
new file mode 100644
index 00000000..4a22c858
--- /dev/null
+++ b/scripts/test/integration.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Check the Airflow database, then run the tests marked `integration` with coverage for dagfactory.
+
+set -x
+set -e
+
+
+pip freeze | grep airflow
+echo "$AIRFLOW_HOME"
+ls "${AIRFLOW_HOME:-.}"
+
+airflow db check
+
+
+pytest -vv \
+    --cov=dagfactory \
+    --cov-report=term-missing \
+    --cov-report=xml \
+    --durations=0 \
+    -m integration
diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py
index 09010771..11b23088 100644
--- a/tests/test_example_dags.py
+++ b/tests/test_example_dags.py
@@ -2,27 +2,77 @@
 
 from pathlib import Path
 
+try:
+    from functools import cache
+except ImportError:
+    from functools import lru_cache as cache
+
 import airflow
+import pytest
 from airflow.models.dagbag import DagBag
+from airflow.utils.db import create_default_connections
+from airflow.utils.session import provide_session
 from packaging.version import Version
 
+from .
import utils as test_utils
+
 EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples"
 AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
 AIRFLOW_VERSION = Version(airflow.__version__)
-
+IGNORED_DAG_FILES: list[str] = []  # example DAG files that are always appended to .airflowignore
 
 MIN_VER_DAG_FILE_VER: dict[str, list[str]] = {
     "2.3": ["example_dynamic_task_mapping.py"],
 }
 
 
-def test_no_import_errors():
+@provide_session
+def get_session(session=None):  # NOTE(review): session is presumably injected by @provide_session - confirm
+    create_default_connections(session)
+    return session
+
+
+@pytest.fixture()
+def session():  # fixture handing tests the session returned by get_session()
+    return get_session()
+
+
+@cache  # memoised so repeated calls reuse the first DagBag instead of rewriting the file and re-parsing
+def get_dag_bag() -> DagBag:
+    """Rewrite .airflowignore with the unsupported example files, then build and return the checked DagBag."""
+
     with open(AIRFLOW_IGNORE_FILE, "w+") as file:
         for min_version, files in MIN_VER_DAG_FILE_VER.items():
             if AIRFLOW_VERSION < Version(min_version):
                 print(f"Adding {files} to .airflowignore")
                 file.writelines([f"{file}\n" for file in files])
 
+        for dagfile in IGNORED_DAG_FILES:
+            print(f"Adding {dagfile} to .airflowignore")
+            file.writelines([f"{dagfile}\n"])
+
+    print(".airflowignore contents: ")
+    print(AIRFLOW_IGNORE_FILE.read_text())
     db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
     assert db.dags
     assert not db.import_errors
+    return db
+
+
+def get_dag_ids() -> list[str]:  # used as the parametrize argument below, so it runs when this module is imported
+    dag_bag = get_dag_bag()
+    return dag_bag.dag_ids
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("dag_id", get_dag_ids())
+def test_example_dag(session, dag_id: str):  # `session` is not used in the body; it only triggers the fixture
+    dag_bag = get_dag_bag()
+    dag = dag_bag.get_dag(dag_id)
+
+    # This feature is available since Airflow 2.5:
+    # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02
+    if AIRFLOW_VERSION >= Version("2.5"):
+        dag.test()
+    else:
+        test_utils.run_dag(dag)  # fallback: the copy of DAG.test() kept in tests/utils.py
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 00000000..1f73b693
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,170 @@
+from __future__ import annotations
+
+import logging
+import sys
+from datetime import datetime
+from typing import Any
+
+from airflow.configuration import
secrets_backend_list
+from airflow.exceptions import AirflowSkipException
+from airflow.models.dag import DAG
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.secrets.local_filesystem import LocalFilesystemBackend
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import DagRunState, State
+from airflow.utils.types import DagRunType
+from sqlalchemy.orm.session import Session
+
+log = logging.getLogger(__name__)
+
+
+def run_dag(dag: DAG, conn_file_path: str | None = None) -> tuple[DagRun, Session]:
+    """Run ``dag`` once through ``test_dag`` and return its ``(DagRun, session)`` result."""
+    return test_dag(dag=dag, conn_file_path=conn_file_path)
+
+
+# DAG.test() was added in Airflow 2.5.0; its implementation is copied here to test on older Airflow versions.
+@provide_session
+def test_dag(
+    dag,
+    execution_date: datetime | None = None,
+    run_conf: dict[str, Any] | None = None,
+    conn_file_path: str | None = None,
+    variable_file_path: str | None = None,
+    session: Session = NEW_SESSION,
+) -> tuple[DagRun, Session]:
+    """
+    Execute one single DagRun for a given DAG and execution date.
+
+    :param dag: the DAG to run
+    :param execution_date: execution date for the DAG run
+    :param run_conf: configuration to pass to newly created dagrun
+    :param conn_file_path: file path to a connection file in either yaml or json
+    :param variable_file_path: file path to a variable file in either yaml or json
+    :param session: database connection (optional)
+    :return: tuple of the DagRun that was executed and the session it ran in
+    """
+    if conn_file_path or variable_file_path:
+        local_secrets = LocalFilesystemBackend(
+            variables_file_path=variable_file_path, connections_file_path=conn_file_path
+        )
+        secrets_backend_list.insert(0, local_secrets)
+
+    try:
+        execution_date = execution_date or timezone.utcnow()
+
+        dag.log.debug("Clearing existing task instances for execution date %s", execution_date)
+        dag.clear(
+            start_date=execution_date,
+            end_date=execution_date,
+            dag_run_state=False,
+            session=session,
+        )
+        dag.log.debug("Getting dagrun for dag %s", dag.dag_id)
+        dr: DagRun = _get_or_create_dagrun(
+            dag=dag,
+            start_date=execution_date,
+            execution_date=execution_date,
+            run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+            session=session,
+            conf=run_conf,
+        )
+
+        tasks = dag.task_dict
+        dag.log.debug("starting dagrun")
+        # Instead of starting a scheduler, run the minimal loop needed to check task readiness and dependencies.
+        # This is notably faster than creating a BackfillJob and lets us surface logs to the user.
+        while dr.state == State.RUNNING:
+            schedulable_tis, _ = dr.update_state(session=session)
+            for ti in schedulable_tis:
+                add_logger_if_needed(dag, ti)
+                ti.task = tasks[ti.task_id]
+                _run_task(ti, session=session)
+    finally:
+        if conn_file_path or variable_file_path:
+            # Always remove the local backend inserted above, even when a task raises.
+            secrets_backend_list.pop(0)
+
+    return dr, session
+
+
+def add_logger_if_needed(dag: DAG, ti: TaskInstance):
+    """
+    Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
+    of into a task file. Since this is a local test run, it is much better for the user to see logs
+    in the command line, rather than needing to search for a log file.
+    Args:
+        dag: The DAG whose logger records that a handler was attached
+        ti: The taskinstance that will receive a logger
+    """
+    logging_format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setLevel(logging.INFO)
+    handler.setFormatter(logging_format)
+    # only add log handler once
+    if not any(isinstance(h, logging.StreamHandler) for h in ti.log.handlers):
+        dag.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id)
+        ti.log.addHandler(handler)
+
+
+def _run_task(ti: TaskInstance, session):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    This function is only meant for the `dag.test` function as a helper function.
+    Args:
+        ti: TaskInstance to run
+        session: sqlalchemy session the task runs in; flushed after the task finishes
+    """
+    log.info("*****************************************************")
+    if hasattr(ti, "map_index") and ti.map_index >= 0:  # index 0 is a real mapped task; unmapped tasks use -1
+        log.info("Running task %s index %d", ti.task_id, ti.map_index)
+    else:
+        log.info("Running task %s", ti.task_id)
+    try:
+        ti._run_raw_task(session=session)
+        session.flush()
+        log.info("%s ran successfully!", ti.task_id)
+    except AirflowSkipException:
+        log.info("Task Skipped, continuing")
+    log.info("*****************************************************")
+
+
+def _get_or_create_dagrun(
+    dag: DAG,
+    conf: dict[Any, Any] | None,
+    start_date: datetime,
+    execution_date: datetime,
+    run_id: str,
+    session: Session,
+) -> DagRun:
+    """
+    Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
+    This function is only meant for the `dag.test` function as a helper function.
+    :param dag: Dag to be used to find dagrun
+    :param conf: configuration to pass to newly created dagrun
+    :param start_date: start date of new dagrun, defaults to execution_date
+    :param execution_date: execution_date for finding the dagrun
+    :param run_id: run_id to pass to new dagrun
+    :param session: sqlalchemy session
+    :return: the newly created DagRun
+    """
+    log.info("dagrun id: %s", dag.dag_id)
+    dr: DagRun = (
+        session.query(DagRun).filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date).first()
+    )
+    if dr:
+        session.delete(dr)
+        session.commit()
+    dr = dag.create_dagrun(
+        state=DagRunState.RUNNING,
+        execution_date=execution_date,
+        run_id=run_id,
+        start_date=start_date or execution_date,
+        session=session,
+        conf=conf,
+    )
+    log.info("created dagrun %s", str(dr))
+    return dr