Skip to content

Commit

Permalink
Add support for InvocationMode.DBT_RUNNER for local execution mode (a…
Browse files Browse the repository at this point in the history
…stronomer#850)

## Description

This PR adds `dbtRunner` programmatic invocation for
`ExecutionMode.LOCAL`. I decided to not make a new execution mode for
each (e.g. `ExecutionMode.LOCAL_DBT_RUNNER`) and all of the child
operators but instead added an additional config
`ExecutionConfig.invocation_mode` where `InvocationMode.DBT_RUNNER`
could be specified. This is so that users who are already using local
execution mode could use dbt runner and see performance improvements.

With the `dbtRunnerResult` it makes it easy to know whether the dbt run
was successful and logs do not need to be parsed but are still logged in
the operator:


![image](https://github.com/astronomer/astronomer-cosmos/assets/79104794/76a4cf82-f0f2-4133-8d68-a0a6a145b1d8)

## Performance Testing

After astronomer#827 was added, I modified it slightly to use postgres adapter
instead of sqlite because the latest dbt-core support for sqlite is 1.4
when programmatic invocation requires >=1.5.0. I got the following
results comparing subprocess to dbt runner for 10 models:

1. `InvocationMode.SUBPROCESS`:
```shell
Ran 10 models in 23.77661895751953 seconds
NUM_MODELS=10
TIME=23.77661895751953
```
2. `InvocationMode.DBT_RUNNER`:
```shell
Ran 10 models in 8.390100002288818 seconds
NUM_MODELS=10
TIME=8.390100002288818
```

So using `InvocationMode.DBT_RUNNER` is almost 3x faster, and can speed
up dag runs if there are a lot of models that execute relatively quickly
since there seems to be a 1-2s speed up per task.


One thing I found while working on this is that a
[manifest](https://docs.getdbt.com/reference/programmatic-invocations#reusing-objects)
is stored in the result if you parse a project with the runner, and can
be reused in subsequent commands to avoid reparsing. This could be a
useful way for caching the manifest if we use dbt runner for dbt ls
parsing and could speed up the initial render as well.


I thought at first it would be easy to have this also work for
virtualenv execution, since I at first thought the entire `execute`
method was run in the virtualenv, which is not the case since the
virtualenv operator creates a virtualenv and then passes the executable
path to a subprocess. It may be possible to have this work for
virtualenv and would be better suited for a follow-up PR.

## Related Issue(s)

closes astronomer#717 

## Breaking Change?

None

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works - added unit tests and integration tests.
  • Loading branch information
jbandoro authored and arojasb3 committed Jul 14, 2024
1 parent de22c92 commit a8246ac
Show file tree
Hide file tree
Showing 21 changed files with 679 additions and 106 deletions.
22 changes: 20 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,25 @@ jobs:
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH

Run-Performance-Tests:
needs: Authorize
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.7"]
num-models: [1, 10, 50, 100]

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
with:
Expand Down Expand Up @@ -335,8 +347,14 @@ jobs:
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
MODEL_COUNT: ${{ matrix.num-models }}

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
Expand Down
13 changes: 12 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
import warnings
from typing import Any, Iterator, Callable

from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode, TestIndirectSelection
from cosmos.constants import (
DbtResourceType,
TestBehavior,
ExecutionMode,
LoadMode,
TestIndirectSelection,
InvocationMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -295,17 +302,21 @@ class ExecutionConfig:
Contains configuration about how to execute dbt.
:param execution_mode: The execution mode for dbt. Defaults to local
:param invocation_mode: The invocation mode for the dbt command. This is only configurable for ExecutionMode.LOCAL.
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
invocation_mode: InvocationMode | None = None
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None
project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.invocation_mode and self.execution_mode != ExecutionMode.LOCAL:
raise CosmosValueError("ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL.")
self.project_path = Path(dbt_project_path) if dbt_project_path else None
9 changes: 9 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ class ExecutionMode(Enum):
AZURE_CONTAINER_INSTANCE = "azure_container_instance"


class InvocationMode(Enum):
"""
How the dbt command should be invoked.
"""

SUBPROCESS = "subprocess"
DBT_RUNNER = "dbt_runner"


class TestIndirectSelection(Enum):
"""
Modes to configure the test behavior when performing indirect selection.
Expand Down
2 changes: 2 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ def __init__(
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
if execution_config.invocation_mode:
task_args["invocation_mode"] = execution_config.invocation_mode

validate_arguments(
render_config.select,
Expand Down
69 changes: 57 additions & 12 deletions cosmos/dbt/parser/output.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,53 @@
from __future__ import annotations

import logging
import re
from typing import List, Tuple
from typing import List, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from dbt.cli.main import dbtRunnerResult

from cosmos.hooks.subprocess import FullOutputSubprocessResult


def parse_output(result: FullOutputSubprocessResult, keyword: str) -> int:
DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"


def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> int:
"""
Parses the dbt test output message and returns the number of errors or warnings.
Parses the dbt test output message and returns the number of warnings.
:param result: String containing the output to be parsed.
:param keyword: String representing the keyword to search for in the output (WARN, ERROR).
:return: An integer value associated with the keyword, or 0 if parsing fails.
Usage:
-----
output_str = "Done. PASS=15 WARN=1 ERROR=0 SKIP=0 TOTAL=16"
keyword = "WARN"
num_warns = parse_output(output_str, keyword)
num_warns = parse_output(output_str)
print(num_warns)
# Output: 1
"""
output = result.output
try:
num = int(output.split(f"{keyword}=")[1].split()[0])
except ValueError:
logging.error(
f"Could not parse number of {keyword}s. Check your dbt/airflow version or if --quiet is not being used"
)
num = 0
if DBT_NO_TESTS_MSG not in result.output and DBT_WARN_MSG in result.output:
try:
num = int(output.split(f"{DBT_WARN_MSG}=")[1].split()[0])
except ValueError:
logging.error(
f"Could not parse number of {DBT_WARN_MSG}s. Check your dbt/airflow version or if --quiet is not being used"
)
return num


def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int:
"""Parses a dbt runner result and returns the number of warnings found. This only works for dbtRunnerResult
from invoking dbt build, compile, run, seed, snapshot, test, or run-operation.
"""
num = 0
for run_result in result.result.results: # type: ignore
if run_result.status == "warn":
num += 1
return num


Expand Down Expand Up @@ -67,3 +87,28 @@ def clean_line(line: str) -> str:
test_results.append(test_result)

return test_names, test_results


def extract_dbt_runner_issues(
result: dbtRunnerResult, status_levels: list[str] = ["warn"]
) -> Tuple[List[str], List[str]]:
"""
Extracts messages from the dbt runner result and returns them as a formatted string.
This function iterates over dbtRunnerResult messages in dbt run. It extracts results that match the
status levels provided and appends them to a list of issues.
:param result: dbtRunnerResult object containing the output to be parsed.
:param status_levels: List of strings, where each string is a result status level. Default is ["warn"].
:return: two lists of strings, the first one containing the node names and the second one
containing the node result message.
"""
node_names = []
node_results = []

for node_result in result.result.results: # type: ignore
if node_result.status in status_levels:
node_names.append(str(node_result.node.name))
node_results.append(str(node_result.message))

return node_names, node_results
13 changes: 13 additions & 0 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,16 @@ def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
del os.environ[key]
else:
os.environ[key] = value


@contextmanager
def change_working_directory(path: str) -> Generator[None, None, None]:
"""Temporarily changes the working directory to the given path, and then restores
back to the previous value on exit.
"""
previous_cwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(previous_cwd)
Loading

0 comments on commit a8246ac

Please sign in to comment.