From 26811de51f4ba3ed5279682fa961e4f0d57db338 Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Wed, 27 Nov 2024 16:33:43 +0900 Subject: [PATCH] change val name --- cosmos/airflow/graph.py | 6 ++++-- cosmos/core/airflow.py | 2 +- cosmos/core/graph/entities.py | 2 +- cosmos/dbt/graph.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 683522c8c..df2ce5b51 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -98,6 +98,7 @@ def create_test_task_metadata( extra_context = {} task_owner = "" + airflow_task_config = {} if test_indirect_selection != TestIndirectSelection.EAGER: task_args["indirect_selection"] = test_indirect_selection.value if node is not None: @@ -110,6 +111,7 @@ def create_test_task_metadata( extra_context = {"dbt_node_config": node.context_dict} task_owner = node.owner + airflow_task_config = node.airflow_task_config elif render_config is not None: # TestBehavior.AFTER_ALL task_args["select"] = render_config.select @@ -119,7 +121,7 @@ def create_test_task_metadata( return TaskMetadata( id=test_task_name, owner=task_owner, - cosmos_custom=node.cosmos_custom, + airflow_task_config=airflow_task_config, operator_class=calculate_operator_class( execution_mode=execution_mode, dbt_class="DbtTest", @@ -192,7 +194,7 @@ def create_task_metadata( task_metadata = TaskMetadata( id=task_id, owner=node.owner, - cosmos_custom=node.cosmos_custom, + airflow_task_config=node.airflow_task_config, operator_class=calculate_operator_class( execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] ), diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index b0a96c8ef..e25404aed 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -32,7 +32,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) if task.owner != "": task_kwargs["owner"] = task.owner - for k, v in task.cosmos_custom.items(): + for k, v in task.airflow_task_config.items(): task_kwargs[k] = v airflow_task = Operator( diff --git a/cosmos/core/graph/entities.py b/cosmos/core/graph/entities.py index de7df982e..cdd5485a6 100644 --- a/cosmos/core/graph/entities.py +++ b/cosmos/core/graph/entities.py @@ -58,7 +58,7 @@ class Task(CosmosEntity): """ owner: str = "" - cosmos_custom: Dict[str, Any] = field(default_factory=dict) + airflow_task_config: Dict[str, Any] = field(default_factory=dict) operator_class: str = "airflow.operators.empty.EmptyOperator" arguments: Dict[str, Any] = field(default_factory=dict) extra_context: Dict[str, Any] = field(default_factory=dict) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 4726213b2..d3230eda0 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -68,7 +68,7 @@ class DbtNode: has_test: bool = False @property - def cosmos_custom(self) -> Dict[str, Any]: + def airflow_task_config(self) -> Dict[str, Any]: """ This method is designed to extend the dbt project's functionality by incorporating Airflow-related metadata into the dbt YAML configuration. Since dbt projects are independent of Airflow, adding Airflow-specific information to the `meta` field within the dbt YAML allows Airflow tasks to