Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for InvocationMode.DBT_RUNNER for local execution mode #836

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9fe6336
add InvocationMode to ExecutionConfig
jbandoro Feb 5, 2024
fc60b91
pass invocation_mode in task_args only if not None since it's only va…
jbandoro Feb 5, 2024
2f7d2ab
allow DbtLocalBaseOperator to use dbtRunner for invocation
jbandoro Feb 6, 2024
77cd3cb
improve type hints for subprocess hooks
jbandoro Feb 6, 2024
7247487
add change_working_directory context manager and add integration tests
jbandoro Feb 6, 2024
a1c5da0
rm duplicate import
jbandoro Feb 6, 2024
25e669e
add test coverage for env vars context
jbandoro Feb 6, 2024
2adda9c
add invocation mode to docs
jbandoro Feb 6, 2024
15da888
fix: test coverage
jbandoro Feb 6, 2024
fe5b1ab
Merge branch 'main' into 717-add-dbtrunner-local-executor
jbandoro Feb 6, 2024
1a90a3f
on_kill check for InvocationMode.SUBPROCESS
jbandoro Feb 13, 2024
64d68e3
Merge branch 'main' into 717-add-dbtrunner-local-executor
jbandoro Feb 14, 2024
af6059d
Merge branch 'main' into 717-add-dbtrunner-local-executor
jbandoro Feb 14, 2024
6c6c913
Merge branch 'main' into 717-add-dbtrunner-local-executor
jbandoro Feb 15, 2024
f0e03be
add note of dbt >= v1.5.0 requirement
jbandoro Feb 15, 2024
9ca62b3
Merge branch 'main' into 717-add-dbtrunner-local-executor
jbandoro Feb 16, 2024
7a3b182
update perf dag to use postgres to allow latest dbt-core
jbandoro Feb 17, 2024
fd92032
add invocation mode discovery if none selected
jbandoro Feb 17, 2024
56c5e84
add branch to test performance dag updates
jbandoro Feb 17, 2024
42217c6
update test env vars
jbandoro Feb 17, 2024
f761a8a
try add authorize
jbandoro Feb 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, 717-add-dbtrunner-local-executor] # TODO: remove before merge
pull_request_target: # Also run on pull requests originated from forks
branches: [main]

Expand Down Expand Up @@ -293,13 +293,25 @@ jobs:
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH

Run-Performance-Tests:
needs: Authorize
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.7"]
num-models: [1, 10, 50, 100]

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
with:
Expand Down Expand Up @@ -335,8 +347,14 @@ jobs:
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
MODEL_COUNT: ${{ matrix.num-models }}

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
Expand Down
13 changes: 12 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
import warnings
from typing import Any, Iterator, Callable

from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode, TestIndirectSelection
from cosmos.constants import (
DbtResourceType,
TestBehavior,
ExecutionMode,
LoadMode,
TestIndirectSelection,
InvocationMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -290,17 +297,21 @@ class ExecutionConfig:
Contains configuration about how to execute dbt.

:param execution_mode: The execution mode for dbt. Defaults to local
:param invocation_mode: The invocation mode for the dbt command. This is only configurable for ExecutionMode.LOCAL.
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path: Configures the dbt project location accessible at runtime for DAG execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually exclusive with ProjectConfig.dbt_project_path.
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
invocation_mode: InvocationMode | None = None
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None
project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
    """
    Validate the configuration and derive ``project_path`` from the init-only ``dbt_project_path``.

    :param dbt_project_path: Init-only value; when truthy it is converted to a ``Path`` and stored
        as ``self.project_path``, otherwise ``self.project_path`` is set to ``None``.
    :raises CosmosValueError: If ``invocation_mode`` is set while ``execution_mode`` is not
        ``ExecutionMode.LOCAL``.
    """
    # An explicit invocation mode is only accepted together with local execution.
    if self.invocation_mode:
        if self.execution_mode != ExecutionMode.LOCAL:
            raise CosmosValueError("ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL.")

    if dbt_project_path:
        self.project_path = Path(dbt_project_path)
    else:
        self.project_path = None
9 changes: 9 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class ExecutionMode(Enum):
AZURE_CONTAINER_INSTANCE = "azure_container_instance"


class InvocationMode(Enum):
    """
    Enumerates the ways in which a dbt command can be invoked.
    """

    # Value "subprocess" -- presumably dbt is launched as a separate process; confirm against the operators.
    SUBPROCESS = "subprocess"
    # Value "dbt_runner" -- presumably dbt is called in-process through dbt's dbtRunner; confirm against the operators.
    DBT_RUNNER = "dbt_runner"


class TestIndirectSelection(Enum):
"""
Modes to configure the test behavior when performing indirect selection.
Expand Down
2 changes: 2 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ def __init__(
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
if execution_config.invocation_mode:
task_args["invocation_mode"] = execution_config.invocation_mode

validate_arguments(
render_config.select,
Expand Down
66 changes: 54 additions & 12 deletions cosmos/dbt/parser/output.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,53 @@
from __future__ import annotations

import logging
import re
from typing import List, Tuple
from typing import List, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from dbt.cli.main import dbtRunnerResult

from cosmos.hooks.subprocess import FullOutputSubprocessResult


def parse_output(result: FullOutputSubprocessResult, keyword: str) -> int:
DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"


def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> int:
    """
    Parses the dbt test output message and returns the number of warnings.

    The count is read from the ``WARN=<n>`` token of the output. Output that contains
    ``Nothing to do``, or that does not contain ``WARN`` at all, yields 0 without any parsing.

    :param result: Subprocess result whose ``output`` attribute holds the dbt output to be parsed.
    :return: The number of warnings, or 0 if there is nothing to parse or parsing fails.

    Usage:
    -----
    # given result.output == "Done. PASS=15 WARN=1 ERROR=0 SKIP=0 TOTAL=16"
    num_warns = parse_number_of_warnings_subprocess(result)
    print(num_warns)
    # Output: 1
    """
    output = result.output
    num = 0
    if DBT_NO_TESTS_MSG not in output and DBT_WARN_MSG in output:
        try:
            num = int(output.split(f"{DBT_WARN_MSG}=")[1].split()[0])
        except (IndexError, ValueError):
            # IndexError: "WARN" is present (e.g. as part of another word) but no "WARN=<n>" token follows.
            # ValueError: the token after "WARN=" is not an integer.
            logging.error(
                f"Could not parse number of {DBT_WARN_MSG}s. Check your dbt/airflow version or if --quiet is not being used"
            )
    return num


def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int:
    """Counts the entries of a dbt runner result whose status is ``"warn"``.

    This only works for dbtRunnerResult from invoking dbt build, compile, run, seed, snapshot,
    test, or run-operation.

    :param result: dbtRunnerResult whose ``result.results`` entries are inspected.
    :return: The number of entries with status ``"warn"``.
    """
    node_results = result.result.results  # type: ignore
    return sum(1 for node_result in node_results if node_result.status == "warn")


Expand Down Expand Up @@ -67,3 +87,25 @@ def clean_line(line: str) -> str:
test_results.append(test_result)

return test_names, test_results


def extract_dbt_runner_issues(result: dbtRunnerResult) -> Tuple[List[str], List[str]]:
    """
    Extracts the names and messages of the entries that ended with a warning in a dbt runner result.

    Every entry of ``result.result.results`` whose status is ``"warn"`` contributes its node name
    and its message (both converted with ``str``); entries with any other status are ignored.

    :param result: dbtRunnerResult object containing the results to be inspected.
    :return: two lists of strings of equal length, the first one containing the node (test) names
        and the second one containing the corresponding messages.
    """
    # Filter once so that both output lists are guaranteed to stay aligned index by index.
    warned = [run_result for run_result in result.result.results if run_result.status == "warn"]  # type: ignore
    test_names = [str(run_result.node.name) for run_result in warned]
    test_results = [str(run_result.message) for run_result in warned]
    return test_names, test_results
13 changes: 13 additions & 0 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
del os.environ[key]
else:
os.environ[key] = value


@contextmanager
def change_working_directory(path: str) -> Generator[None, None, None]:
    """Run the body of the ``with`` statement with ``path`` as the current working directory.

    The working directory in effect on entry is recorded first and is restored when the
    block exits, whether it finishes normally or raises.

    :param path: Directory to switch to for the duration of the ``with`` block.
    """
    original_dir = os.getcwd()
    # Switch before entering the try block: if this fails there is nothing to restore.
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(original_dir)
Loading
Loading