Skip to content

Commit

Permalink
Fix issue reported by the community in slack with lineage in Cosmos 1…
Browse files Browse the repository at this point in the history
….1 (#526)

Fix the following issue, that was reported in
[slack](https://apache-airflow.slack.com/archives/C059CC42E9W/p1694212778764869)
by a user named Sai:

"After upgrading to cosmos 1.1, I see the below error in Airflow logs. I
don't have any OpenLineage integrations. Is there a way to stop emitting
this error?"

```
[2023-09-08, 22:28:07 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=high_freq_core_dbt, task_id=dbt_front.daily_company_user_metrics.daily_company_user_metrics_run, execution_date=20230908T183000, start_date=20230908T222730, end_date=20230908T222807
[2023-09-08, 22:28:07 UTC] {manager.py:61} ERROR - Failed to extract metadata 'TaskInstance' object has no attribute 'openlineage_events_completes' task_type=DbtRunLocalOperator airflow_dag_id=high_freq_core_dbt task_id=dbt_front.daily_company_user_metrics.daily_company_user_metrics_run airflow_run_id=scheduled__2023-09-08T18:30:00+00:00 
Traceback (most recent call last):
  File "/home/airflow/python-venvs/data-pipelines/lib/python3.10/site-packages/openlineage/airflow/extractors/manager.py", line 45, in extract_metadata
    task_metadata = extractor.extract_on_complete(task_instance)
  File "/home/airflow/python-venvs/data-pipelines/lib/python3.10/site-packages/openlineage/airflow/extractors/base.py", line 116, in extract_on_complete
    return self._get_openlineage_facets(
  File "/home/airflow/python-venvs/data-pipelines/lib/python3.10/site-packages/openlineage/airflow/extractors/base.py", line 124, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File "/home/airflow/python-venvs/data-pipelines/lib/python3.10/site-packages/cosmos/operators/local.py", line 318, in get_openlineage_facets_on_complete
    for completed in task_instance.openlineage_events_completes:
AttributeError: 'TaskInstance' object has no attribute 'openlineage_events_completes'
```

The issue was reported to happen using:
* Airflow version: v2.5.1
* dbt version: 1.5.1 
* without any intentional setup of OpenLineage

Since the issue was reported, I learned that
`get_openlineage_facets_on_complete` is called even if `execute` fails
because `get_openlineage_facets_on_failure` is not currently
implemented.

Closes: #530 (Rust dependency issue reported)
  • Loading branch information
tatiana authored Sep 14, 2023
1 parent bcf7714 commit 665a9b8
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
54 changes: 38 additions & 16 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import signal
import tempfile
from attr import define
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING

Expand Down Expand Up @@ -53,12 +54,24 @@
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError) as error:
logger.warning(
"To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or "
"install astronomer-cosmos[openlineage]."
"To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
)
logger.exception(error)
is_openlineage_available = False

@define
class OperatorLineage: # type: ignore
inputs: list[str] = list()
outputs: list[str] = list()
run_facets: dict[str, str] = dict()
job_facets: dict[str, str] = dict()


try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)


class DbtLocalBaseOperator(DbtBaseOperator):
"""
Expand Down Expand Up @@ -251,15 +264,9 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
try:
lineage_namespace = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
pass

openlineage_processor = DbtLocalArtifactProcessor(
producer=OPENLINEAGE_PRODUCER,
job_namespace=lineage_namespace,
job_namespace=LINEAGE_NAMESPACE,
project_dir=project_dir,
profile_name=self.profile_config.profile_name,
target=self.profile_config.target_name,
Expand All @@ -270,8 +277,8 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except (FileNotFoundError, NotImplementedError) as error:
logger.exception(error)
except (FileNotFoundError, NotImplementedError):
logger.debug("Unable to parse OpenLineage events", stack_info=True)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
"""
Expand Down Expand Up @@ -309,17 +316,32 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
"""
Collect the input, output, job and run facets for this operator.
It relies on the calculate_openlineage_events_completes having being called before.
This method is called by Openlineage even if `execute` fails, because `get_openlineage_facets_on_failure`
is not implemented.
"""

inputs = []
outputs = []
run_facets: dict[str, Any] = {}
job_facets: dict[str, Any] = {}

for completed in task_instance.openlineage_events_completes:
[inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore
[outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore
run_facets = {**run_facets, **completed.run.facets}
job_facets = {**job_facets, **completed.job.facets}
openlineage_events_completes = None
if hasattr(self, "openlineage_events_completes"):
openlineage_events_completes = self.openlineage_events_completes
elif hasattr(task_instance, "openlineage_events_completes"):
openlineage_events_completes = task_instance.openlineage_events_completes
else:
logger.info("Unable to emit OpenLineage events due to lack of data.")

if openlineage_events_completes is not None:
for completed in openlineage_events_completes:
[inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore
[outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore
run_facets = {**run_facets, **completed.run.facets}
job_facets = {**job_facets, **completed.job.facets}
else:
logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.")

return OperatorLineage(
inputs=inputs,
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ classifiers = [
]
dependencies = [
# Airflow & Pydantic issue: https://github.com/apache/airflow/issues/32311
"attrs",
"pydantic>=1.10.0,<2.0.0",
"apache-airflow>=2.3.0",
"importlib-metadata; python_version < '3.8'",
"Jinja2>=3.0.0",
"typing-extensions; python_version < '3.8'",
"virtualenv",
"openlineage-integration-common",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -76,6 +76,7 @@ dbt-spark = [
"dbt-spark<=1.5.4",
]
openlineage = [
"openlineage-integration-common",
"openlineage-airflow",
]
all = [
Expand Down Expand Up @@ -134,7 +135,6 @@ dependencies = [
"apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0",
"types-PyYAML",
"types-attrs",
"attrs",
"types-requests",
"types-python-dateutil",
"apache-airflow"
Expand Down
17 changes: 17 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ class MockEvent:
assert facets.job_facets == {"d": 4}


def test_run_operator_emits_events_without_openlineage_events_completes(caplog):
dbt_base_operator = DbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
should_store_compiled_sql=False,
)
delattr(dbt_base_operator, "openlineage_events_completes")
facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator)
assert facets.inputs == []
assert facets.outputs == []
assert facets.run_facets == {}
assert facets.job_facets == {}
log = "Unable to emit OpenLineage events due to lack of dependencies or data."
assert log in caplog.text


def test_store_compiled_sql() -> None:
dbt_base_operator = DbtLocalBaseOperator(
profile_config=profile_config,
Expand Down

0 comments on commit 665a9b8

Please sign in to comment.