Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support building DAGs from topologically unsorted YAML files #307

Merged
merged 6 commits into from
Dec 6, 2024

Conversation

tatiana
Copy link
Collaborator

@tatiana tatiana commented Dec 4, 2024

Any YAML files that declare upstream tasks after downstream tasks, regardless of using dynamic task mapping, would fail.

Example of DAG that would fail:

test_expand:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "test expand"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    process:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: expand_task
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          test_id: "test"
      expand:
        op_args:
          request.output
      dependencies: [request]
    request:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: example_task_mapping
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py

In this example, the upstream (parent) task "request" is defined after the downstream (child) task "process". Before this change, this DAG would fail.

I implemented a solution to solve the problem that uses Kahn's algorithm to sort the tasks topologically:
https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm

It has asymptotic complexity O(N + D), where N is the total number of tasks, and D is the total number of dependencies. This complexity seems acceptable.

An alternative to the current approach would be to create all the tasks without dependencies as a starting point and add the dependencies once all tasks were made - similar to what we did in https://github.com/astronomer/astronomer-cosmos. However, this approach would require a bigger refactor of the DAG factory and may have issues with dynamic task mapping.

Closes: #225

@codecov-commenter
Copy link

codecov-commenter commented Dec 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 92.70%. Comparing base (2e9fd4e) to head (0ef4f66).
Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #307      +/-   ##
==========================================
+ Coverage   92.45%   92.70%   +0.25%     
==========================================
  Files          10       10              
  Lines         702      727      +25     
==========================================
+ Hits          649      674      +25     
  Misses         53       53              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@tatiana tatiana merged commit a6bf015 into main Dec 6, 2024
67 checks passed
@tatiana tatiana deleted the support-yml-with-tasks-outside-of-topological-order branch December 6, 2024 11:56
@tatiana tatiana mentioned this pull request Dec 6, 2024
@tatiana tatiana changed the title Support building DAGs out of topologically unsorted YAML files Support building DAGs from topologically unsorted YAML files Dec 6, 2024
tatiana added a commit that referenced this pull request Dec 6, 2024
### Added

- Add support to TaskFlow and improve dynamic task mapping support by
@tatiana in #314
- Render YML DAG config as DAG Docs by @pankajastro #305
- Support building DAGs out of topologically unsorted YAML files by
@tatiana in #307
- Add support for nested task groups by @glazunov996 and @pankajastro in
#292
- Add support for templating `on_failure_callback` by @jroach-astronomer
#252

### Fixed

- Fix compatibility with
apache-airflow-providers-cncf-kubernetes>=10.0.0 by @tatiana in #311
- Refactor telemetry to collect events during DAG run and not during DAG
parsing by @pankajastro #300

### Docs

- Fix reference for HttpSensor in README.md by @pankajastro in #277
- Add example DAG for task group by @pankajastro in #293
- Add CODEOWNERS by @pankajkoti in #270
- Update CODEOWNERS to track all files by @pankajkoti in #276
- Modified Status badge in README by @jaejun #298

### Others

- Refactor dynamic task mapping implementation by @tatiana in #313
- Remove pytest durations from tests by @tatiana in #309
- Remove DAG retries check since many DAGs have different retry values
by @tatiana in #310
- Lint fixes after running `pre-commit run --all-files` by @tatiana in
#312
- Remove redundant exception code by @pankajastro #294
- Add GitHub issue template for bug reports and feature requests by
@pankajkoti in #269

Closes: #223
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants