From ecf6c8c58858aeea64262dae8fdb586b21d45fe9 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Sun, 15 Dec 2024 10:27:50 -0500
Subject: [PATCH] feat(airflow): add `DatahubRestHook.make_graph` method
 (#12116)

---
 .../example_dags/graph_usage_sample_dag.py    | 35 +++++++++++++++++++
 .../datahub_airflow_plugin/hooks/datahub.py   |  4 +++
 2 files changed, 39 insertions(+)
 create mode 100644 metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py
new file mode 100644
index 0000000000000..d72ba67c23cd7
--- /dev/null
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py
@@ -0,0 +1,35 @@
+"""This example DAG demonstrates how to create and use a DataHubGraph client."""
+
+from datetime import timedelta
+
+import pendulum
+from airflow.decorators import dag, task
+from datahub.ingestion.graph.client import DataHubGraph, RemovedStatusFilter
+
+from datahub_airflow_plugin.hooks.datahub import DatahubRestHook
+
+
+@dag(
+    schedule_interval=timedelta(days=1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+)
+def datahub_graph_usage_sample_dag():
+    @task()
+    def use_the_graph():
+        graph: DataHubGraph = DatahubRestHook("my_datahub_rest_conn_id").make_graph()
+        graph.test_connection()
+
+        # Example usage: Find all soft-deleted BigQuery DEV entities
+        # in DataHub, and hard delete them.
+        for urn in graph.get_urns_by_filter(
+            platform="bigquery",
+            env="DEV",
+            status=RemovedStatusFilter.ONLY_SOFT_DELETED,
+        ):
+            graph.hard_delete_entity(urn)
+
+    use_the_graph()
+
+
+datahub_graph_usage_sample_dag()
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
index b60f20c5bf8b2..5f4d787fb893d 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
@@ -14,6 +14,7 @@
     from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
     from datahub.emitter.rest_emitter import DataHubRestEmitter
     from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
+    from datahub.ingestion.graph.client import DataHubGraph
     from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig


@@ -94,6 +95,9 @@ def make_emitter(self) -> "DataHubRestEmitter":
             host, token, **extra_args
         )

+    def make_graph(self) -> "DataHubGraph":
+        return self.make_emitter().to_graph()
+
     def emit(
         self,
         items: Sequence[