Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Oct 18, 2024
1 parent 7f6bdda commit 1c994d2
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 3 deletions.
100 changes: 99 additions & 1 deletion .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Test and release related jobs

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [integration-tests]
pull_request_target: # Also run on pull requests originated from forks
branches: [main]
release:
Expand Down Expand Up @@ -104,6 +104,104 @@ jobs:
path: .coverage
include-hidden-files: true

Run-Integration-Tests:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
airflow-version: [ "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10" ]
exclude:
# Apache Airflow versions prior to 2.3.0 have not been tested with Python 3.10
# See: https://airflow.apache.org/docs/apache-airflow/2.2.0/installation/prerequisites.html
- python-version: "3.10"
airflow-version: "2.2"
# Apache Airflow versions prior to 2.6.2 have not been tested with Python 3.11
- python-version: "3.11"
airflow-version: "2.2"
- python-version: "3.11"
airflow-version: "2.3"
- python-version: "3.11"
airflow-version: "2.4"
- python-version: "3.11"
airflow-version: "2.5"
- python-version: "3.11"
airflow-version: "2.6"
# Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12.
# Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0.
# See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements
# See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements
- python-version: "3.12"
airflow-version: "2.2"
- python-version: "3.12"
airflow-version: "2.3"
- python-version: "3.12"
airflow-version: "2.4"
- python-version: "3.12"
airflow-version: "2.5"
- python-version: "3.12"
airflow-version: "2.6"
- python-version: "3.12"
airflow-version: "2.7"
- python-version: "3.12"
airflow-version: "2.8"
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- uses: actions/cache@v4
with:
path: |
~/.cache/pip
.local/share/hatch/
key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('dagfactory/__init__.py') }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install packages and dependencies
run: |
python -m pip install uv
uv pip install --system hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
- name: Test DAG Factory against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
run: |
PYTHONPATH=`pwd`:`pwd`/examples CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup
PYTHONPATH=`pwd`:`pwd`/examples CONFIG_ROOT_DIR=`pwd`"/dags" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
env:
AIRFLOW_HOME: /home/runner/work/dag-factory/dag-factory/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:[email protected]:5432/postgres
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/dag-factory/dag-factory/:/home/runner/work/dag-factory/dag-factory/examples:$PYTHONPATH
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
with:
name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

Code-Coverage:
if: github.event.action != 'labeled'
needs:
Expand Down
10 changes: 10 additions & 0 deletions scripts/test/integration-setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

set -v
set -x
set -e

rm -rf airflow.*
pip freeze | grep airflow
airflow db reset -y
airflow db init
19 changes: 19 additions & 0 deletions scripts/test/integration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

set -x
set -e


pip freeze | grep airflow
echo $AIRFLOW_HOME
ls $AIRFLOW_HOME

airflow db check


pytest -vv \
--cov=dagfactory \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration
54 changes: 52 additions & 2 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,77 @@

from pathlib import Path

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

import airflow
import pytest
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
from packaging.version import Version

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
AIRFLOW_VERSION = Version(airflow.__version__)

IGNORED_DAG_FILES = []

MIN_VER_DAG_FILE_VER: dict[str, list[str]] = {
"2.3": ["example_dynamic_task_mapping.py"],
}


def test_no_import_errors():
@provide_session
def get_session(session=None):
create_default_connections(session)
return session


@pytest.fixture()
def session():
return get_session()


@cache
def get_dag_bag() -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""

with open(AIRFLOW_IGNORE_FILE, "w+") as file:
for min_version, files in MIN_VER_DAG_FILE_VER.items():
if AIRFLOW_VERSION < Version(min_version):
print(f"Adding {files} to .airflowignore")
file.writelines([f"{file}\n" for file in files])

for dagfile in IGNORED_DAG_FILES:
print(f"Adding {dagfile} to .airflowignore")
file.writelines([f"{dagfile}\n"])

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
assert db.dags
assert not db.import_errors
return db


def get_dag_ids() -> list[str]:
dag_bag = get_dag_bag()
return dag_bag.dag_ids


@pytest.mark.integration
@pytest.mark.parametrize("dag_id", get_dag_ids())
def test_example_dag(session, dag_id: str):
dag_bag = get_dag_bag()
dag = dag_bag.get_dag(dag_id)

# This feature is available since Airflow 2.5:
# https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02
if AIRFLOW_VERSION >= Version("2.5"):
dag.test()
else:
test_utils.run_dag(dag)
170 changes: 170 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from __future__ import annotations

import logging
import sys
from datetime import datetime
from typing import Any

from airflow.configuration import secrets_backend_list
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
from sqlalchemy.orm.session import Session

log = logging.getLogger(__name__)


def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun:
return test_dag(dag=dag, conn_file_path=conn_file_path)


# DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the
# implementation here.
@provide_session
def test_dag(
dag,
execution_date: datetime | None = None,
run_conf: dict[str, Any] | None = None,
conn_file_path: str | None = None,
variable_file_path: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
"""
Execute one single DagRun for a given DAG and execution date.
:param execution_date: execution date for the DAG run
:param run_conf: configuration to pass to newly created dagrun
:param conn_file_path: file path to a connection file in either yaml or json
:param variable_file_path: file path to a variable file in either yaml or json
:param session: database connection (optional)
"""

if conn_file_path or variable_file_path:
local_secrets = LocalFilesystemBackend(
variables_file_path=variable_file_path, connections_file_path=conn_file_path
)
secrets_backend_list.insert(0, local_secrets)

execution_date = execution_date or timezone.utcnow()

dag.log.debug("Clearing existing task instances for execution date %s", execution_date)
dag.clear(
start_date=execution_date,
end_date=execution_date,
dag_run_state=False,
session=session,
)
dag.log.debug("Getting dagrun for dag %s", dag.dag_id)
dr: DagRun = _get_or_create_dagrun(
dag=dag,
start_date=execution_date,
execution_date=execution_date,
run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
session=session,
conf=run_conf,
)

tasks = dag.task_dict
dag.log.debug("starting dagrun")
# Instead of starting a scheduler, we run the minimal loop possible to check
# for task readiness and dependency management. This is notably faster
# than creating a BackfillJob and allows us to surface logs to the user
while dr.state == State.RUNNING:
schedulable_tis, _ = dr.update_state(session=session)
for ti in schedulable_tis:
add_logger_if_needed(dag, ti)
ti.task = tasks[ti.task_id]
_run_task(ti, session=session)
if conn_file_path or variable_file_path:
# Remove the local variables we have added to the secrets_backend_list
secrets_backend_list.pop(0)

print("conn_file_path", conn_file_path)

return dr, session


def add_logger_if_needed(dag: DAG, ti: TaskInstance):
"""
Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
of into a task file. Since this is a local test run, it is much better for the user to see logs
in the command line, rather than needing to search for a log file.
Args:
ti: The taskinstance that will receive a logger
"""
logging_format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.level = logging.INFO
handler.setFormatter(logging_format)
# only add log handler once
if not any(isinstance(h, logging.StreamHandler) for h in ti.log.handlers):
dag.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id)
ti.log.addHandler(handler)


def _run_task(ti: TaskInstance, session):
"""
Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
extra steps used in `task.run` to keep our local running as fast as possible
This function is only meant for the `dag.test` function as a helper function.
Args:
ti: TaskInstance to run
"""
log.info("*****************************************************")
if hasattr(ti, "map_index") and ti.map_index > 0:
log.info("Running task %s index %d", ti.task_id, ti.map_index)
else:
log.info("Running task %s", ti.task_id)
try:
ti._run_raw_task(session=session)
session.flush()
log.info("%s ran successfully!", ti.task_id)
except AirflowSkipException:
log.info("Task Skipped, continuing")
log.info("*****************************************************")


def _get_or_create_dagrun(
dag: DAG,
conf: dict[Any, Any] | None,
start_date: datetime,
execution_date: datetime,
run_id: str,
session: Session,
) -> DagRun:
"""
Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
This function is only meant for the `dag.test` function as a helper function.
:param dag: Dag to be used to find dagrun
:param conf: configuration to pass to newly created dagrun
:param start_date: start date of new dagrun, defaults to execution_date
:param execution_date: execution_date for finding the dagrun
:param run_id: run_id to pass to new dagrun
:param session: sqlalchemy session
:return:
"""
log.info("dagrun id: %s", dag.dag_id)
dr: DagRun = (
session.query(DagRun).filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date).first()
)
if dr:
session.delete(dr)
session.commit()
dr = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=execution_date,
run_id=run_id,
start_date=start_date or execution_date,
session=session,
conf=conf,
)
log.info("created dagrun %s", str(dr))
return dr

0 comments on commit 1c994d2

Please sign in to comment.