Skip to content

Commit

Permalink
Add support to TestBehavior.BUILD (#1377)
Browse files Browse the repository at this point in the history
By default, Cosmos uses `TestBehavior.AFTER_EACH`, creating an Airflow
TaskGroup that contains two tasks:
* one to run the model, seed or snapshot
* another to run the tests related to that dbt resource

While many users desire and expect this behaviour, it can also mean
additional overhead, especially in dbt projects with more than 500
models. Each time the `dbt` command is executed, there is an overhead,
even when using optimisations such as partial parsing and `dbtRunner`.
There is also an overhead on splitting a task into multiple Airflow
workers.

Illustrating some numbers with data shared by an Astronomer customer
regarding the dbt command execution (between the logs "running dbt with
arguments" and "Done."):
* Running `dbt build` for a particular model + its tests: 46s
* Running `dbt run` + `dbt test` individually: 2min15s

This PR introduces a new behaviour, `TestBehavior.BUILD`, where Cosmos
can run both the model/seed/snapshot and the associated tests using a
single command (`dbt build`). For documentation on the `dbt build` command, see
https://docs.getdbt.com/reference/commands/build.

This is an example of how the DAG renders when using this test
behaviour, produced by running:
```
 airflow dags test example_cosmos_dbt_build 
```

<img width="1624" alt="Screenshot 2024-12-10 at 15 08 45"
src="https://github.com/user-attachments/assets/d74d7688-5cbf-4f18-83ad-c9847e34252e">

And this is an example of the output, showing that both the model and
its tests are run, using the build command:
```
[2024-12-10 15:19:23,667] {local.py:405} INFO - Trying to run dbtRunner with:
 ['build', '--models', 'customers', '--full-refresh', '--project-dir', '/var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmpghz8naek', '--profiles-dir', '/tmp/profile/ac4e9cde9bc05d574c157e795dcbcc6b60246a73ca1d92d4fc669e90a1e494e0', '--profile', 'default', '--target', 'dev']
 in /var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmpghz8naek
[2024-12-10T15:19:23.667+0000] {local.py:405} INFO - Trying to run dbtRunner with:
 ['build', '--models', 'customers', '--full-refresh', '--project-dir', '/var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmpghz8naek', '--profiles-dir', '/tmp/profile/ac4e9cde9bc05d574c157e795dcbcc6b60246a73ca1d92d4fc669e90a1e494e0', '--profile', 'default', '--target', 'dev']
 in /var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmpghz8naek
15:19:23  Running with dbt=1.8.0
15:19:23  Registered adapter: postgres=1.8.0
15:19:23  Found 5 models, 3 seeds, 20 data tests, 528 macros
15:19:23  
15:19:23  Concurrency: 1 threads (target='dev')
15:19:23  
15:19:23  1 of 4 START sql table model public.customers .................................. [RUN]
15:19:23  1 of 4 OK created sql table model public.customers ............................. [SELECT 100 in 0.04s]
15:19:23  2 of 4 START test not_null_customers_customer_id ............................... [RUN]
15:19:23  2 of 4 PASS not_null_customers_customer_id ..................................... [PASS in 0.02s]
15:19:23  3 of 4 START test relationships_orders_customer_id__customer_id__ref_customers_  [RUN]
15:19:23  3 of 4 PASS relationships_orders_customer_id__customer_id__ref_customers_ ...... [PASS in 0.02s]
15:19:23  4 of 4 START test unique_customers_customer_id ................................. [RUN]
15:19:23  4 of 4 PASS unique_customers_customer_id ....................................... [PASS in 0.02s]
15:19:23  
15:19:23  Finished running 1 table model, 3 data tests in 0 hours 0 minutes and 0.19 seconds (0.19s).
15:19:24  
15:19:24  Completed successfully
15:19:24  
15:19:24  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
```

Closes: #892
  • Loading branch information
tatiana authored Dec 12, 2024
1 parent 2fa5d01 commit 110fb07
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 13 deletions.
48 changes: 36 additions & 12 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cosmos.constants import (
DBT_COMPILE_TASK_ID,
DEFAULT_DBT_RESOURCES,
SUPPORTED_BUILD_RESOURCES,
TESTABLE_DBT_RESOURCES,
DbtResourceType,
ExecutionMode,
Expand Down Expand Up @@ -128,13 +129,39 @@ def create_test_task_metadata(
)


def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[DbtResourceType, str]:
    """
    Return the map from dbt node type to the Cosmos operator class prefix that
    should be used to handle nodes of that type.

    :param test_behavior: When ``TestBehavior.BUILD``, models, seeds and snapshots
        all map to the ``DbtBuild`` prefix so a single ``dbt build`` command runs
        the resource together with its tests; for every other behavior each
        resource type keeps its dedicated prefix.
    :returns: Mapping from ``DbtResourceType`` member to operator class-name prefix.
        (Note: keys are ``DbtResourceType`` members, not plain strings — the
        previous ``dict[str, str]`` annotation was inaccurate.)
    """
    # Default mapping: one dedicated operator prefix per resource type.
    dbt_resource_to_class = {
        DbtResourceType.MODEL: "DbtRun",
        DbtResourceType.SNAPSHOT: "DbtSnapshot",
        DbtResourceType.SEED: "DbtSeed",
        DbtResourceType.TEST: "DbtTest",
        DbtResourceType.SOURCE: "DbtSource",
    }
    if test_behavior == TestBehavior.BUILD:
        # `dbt build` covers models, seeds and snapshots in one command (their
        # tests run alongside), so these three share the DbtBuild prefix.
        # TEST and SOURCE keep their dedicated operators.
        dbt_resource_to_class.update(
            {
                DbtResourceType.MODEL: "DbtBuild",
                DbtResourceType.SNAPSHOT: "DbtBuild",
                DbtResourceType.SEED: "DbtBuild",
            }
        )
    return dbt_resource_to_class


def create_task_metadata(
node: DbtNode,
execution_mode: ExecutionMode,
args: dict[str, Any],
dbt_dag_task_group_identifier: str,
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
test_behavior: TestBehavior = TestBehavior.AFTER_ALL,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand All @@ -148,33 +175,29 @@ def create_task_metadata(
If it is False, then use the name as a prefix for the task id, otherwise do not.
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
dbt_resource_to_class = {
DbtResourceType.MODEL: "DbtRun",
DbtResourceType.SNAPSHOT: "DbtSnapshot",
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
DbtResourceType.SOURCE: "DbtSource",
}
dbt_resource_to_class = create_dbt_resource_to_class(test_behavior)

args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
extra_context = {
"dbt_node_config": node.context_dict,
"dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
}
if node.resource_type == DbtResourceType.MODEL:
task_id = f"{node.name}_run"
if use_task_group is True:
if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
task_id = f"{node.name}_{node.resource_type.value}_build"
elif node.resource_type == DbtResourceType.MODEL:
if use_task_group:
task_id = "run"
else:
task_id = f"{node.name}_run"
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
and node.has_freshness is False
and node.has_test is False
):
return None
# TODO: https://github.com/astronomer/astronomer-cosmos
# pragma: no cover
task_id = f"{node.name}_source"
args["select"] = f"source:{node.resource_name}"
args.pop("models")
Expand Down Expand Up @@ -234,6 +257,7 @@ def generate_task_or_group(
dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group),
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
test_behavior=test_behavior,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down
9 changes: 9 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class TestBehavior(Enum):
Behavior of the tests.
"""

BUILD = "build"
NONE = "none"
AFTER_EACH = "after_each"
AFTER_ALL = "after_all"
Expand Down Expand Up @@ -144,6 +145,14 @@ def _missing_value_(cls, value): # type: ignore

DEFAULT_DBT_RESOURCES = DbtResourceType.__members__.values()

# According to the dbt documentation (https://docs.getdbt.com/reference/commands/build), build also supports test nodes.
# However, in the context of Cosmos, we will run test nodes together with the respective models/seeds/snapshots nodes
# Resource types that Cosmos renders as a single `<name>_<type>_build` task when
# TestBehavior.BUILD is enabled (the `dbt build` command also executes their tests).
SUPPORTED_BUILD_RESOURCES = [
    DbtResourceType.MODEL,
    DbtResourceType.SNAPSHOT,
    DbtResourceType.SEED,
]

# dbt test runs tests defined on models, sources, snapshots, and seeds.
# It expects that you have already created those resources through the appropriate commands.
# https://docs.getdbt.com/reference/commands/test
Expand Down
47 changes: 47 additions & 0 deletions dev/dags/example_cosmos_dbt_build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
An example Airflow DAG that illustrates using the dbt build to run both models/seeds/sources and their respective tests.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import TestBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)

# [START build_example]
example_cosmos_dbt_build = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
render_config=RenderConfig(
test_behavior=TestBehavior.BUILD,
),
profile_config=profile_config,
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="example_cosmos_dbt_build",
default_args={"retries": 2},
)
# [END build_example]
Binary file added docs/_static/test_behavior_after_all.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/test_behavior_after_each.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/test_behavior_build.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
19 changes: 18 additions & 1 deletion docs/configuration/testing-behavior.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ default behavior, which runs all models and tests, and then reports all failures
Cosmos supports the following test behaviors:

- ``after_each`` (default): turns each model into a task group with two steps: run the model, and run the tests
- ``build``: run each dbt resource and its tests with the ``dbt build`` command, in a single task. This applies to dbt models, seeds and snapshots.
- ``after_all``: each model becomes a single task, and the tests only run if all models are run successfully
- ``none``: don't include tests

Example:
Example of the standard behavior of ``TestBehavior.AFTER_EACH``,
when using the example DAG available in ``dev/dags/basic_cosmos_dag.py``:

.. image:: ../_static/test_behavior_after_each.png

Example when changing the behavior to use ``TestBehavior.AFTER_ALL``:

.. code-block:: python
Expand All @@ -31,6 +37,17 @@ Example:
)
)
.. image:: ../_static/test_behavior_after_all.png


Finally, an example DAG and how it is rendered in the Airflow UI when using ``TestBehavior.BUILD``:

.. literalinclude:: ../../dev/dags/example_cosmos_dbt_build.py
:language: python
:start-after: [START build_example]
:end-before: [END build_example]

.. image:: ../_static/test_behavior_build.png

Warning Behavior
----------------
Expand Down
43 changes: 43 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,49 @@ def test_build_airflow_graph_with_after_all():
assert dag.leaves[0].select == ["tag:some"]


@pytest.mark.skipif(
    version.parse(airflow_version) < version.parse("2.4"),
    reason="Airflow DAG did not have task_group_dict until the 2.4 release",
)
@pytest.mark.integration
def test_build_airflow_graph_with_build():
    """
    When TestBehavior.BUILD is used, each model/seed/snapshot is rendered as a
    single `<name>_<resource_type>_build` task and no TaskGroups are created,
    since `dbt build` runs the resource and its tests in one command.
    """
    with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
        task_args = {
            "project_dir": SAMPLE_PROJ_PATH,
            "conn_id": "fake_conn",
            "profile_config": ProfileConfig(
                profile_name="default",
                target_name="default",
                profile_mapping=PostgresUserPasswordProfileMapping(
                    conn_id="fake_conn",
                    profile_args={"schema": "public"},
                ),
            ),
        }
        render_config = RenderConfig(
            test_behavior=TestBehavior.BUILD,
        )
        build_airflow_graph(
            nodes=sample_nodes,
            dag=dag,
            execution_mode=ExecutionMode.LOCAL,
            test_indirect_selection=TestIndirectSelection.EAGER,
            task_args=task_args,
            dbt_project_name="astro_shop",
            render_config=render_config,
        )
        topological_sort = [task.task_id for task in dag.topological_sort()]
        expected_sort = ["seed_parent_seed_build", "parent_model_build", "child_model_build", "child2_v2_model_build"]
        assert topological_sort == expected_sort

        # BUILD must not create the run+test TaskGroups that AFTER_EACH produces.
        task_groups = dag.task_group_dict
        assert len(task_groups) == 0

        # Compare the leaves as a set: the previous pair of membership checks
        # (`leaves[0].task_id in (...)` and `leaves[1].task_id in (...)`) would
        # also pass if both leaves shared the same task_id.
        assert {leaf.task_id for leaf in dag.leaves} == {"child_model_build", "child2_v2_model_build"}


@pytest.mark.integration
@patch("airflow.hooks.base.BaseHook.get_connection", new=MagicMock())
def test_build_airflow_graph_with_dbt_compile_task():
Expand Down

0 comments on commit 110fb07

Please sign in to comment.