diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index c769c6705ac3f..69f28a0e6e595 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -265,64 +265,17 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None: with PerfTimer() as timer: self.report.num_external_table_edges_scanned = 0 - for ( - known_lineage_mapping - ) in self._populate_external_lineage_from_copy_history(discovered_tables): - self.sql_aggregator.add(known_lineage_mapping) - logger.info( - "Done populating external lineage from copy history. " - f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far." - ) - - for ( - known_lineage_mapping - ) in self._populate_external_lineage_from_show_query(discovered_tables): - self.sql_aggregator.add(known_lineage_mapping) - - logger.info( - "Done populating external lineage from show external tables. " - f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far." - ) + for entry in self._get_copy_history_lineage(discovered_tables): + self.sql_aggregator.add(entry) + logger.info("Done populating external lineage from copy history. ") self.report.external_lineage_queries_secs = timer.elapsed_seconds() - # Handles the case for explicitly created external tables. - # NOTE: Snowflake does not log this information to the access_history table. - def _populate_external_lineage_from_show_query( - self, discovered_tables: List[str] - ) -> Iterable[KnownLineageMapping]: - external_tables_query: str = SnowflakeQuery.show_external_tables() - try: - for db_row in self.connection.query(external_tables_query): - key = self.identifiers.get_dataset_identifier( - db_row["name"], db_row["schema_name"], db_row["database_name"] - ) - - if key not in discovered_tables: - continue - if db_row["location"].startswith("s3://"): - yield KnownLineageMapping( - upstream_urn=make_s3_urn_for_lineage( - db_row["location"], self.config.env - ), - downstream_urn=self.identifiers.gen_dataset_urn(key), - ) - self.report.num_external_table_edges_scanned += 1 - - self.report.num_external_table_edges_scanned += 1 - except Exception as e: - logger.debug(e, exc_info=e) - self.structured_reporter.warning( - "Error populating external table lineage from Snowflake", - exc=e, - ) - self.report_status(EXTERNAL_LINEAGE, False) - # Handles the case where a table is populated from an external stage/s3 location via copy. # Eg: copy into category_english from @external_s3_stage; # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv'; # NOTE: Snowflake does not log this information to the access_history table. - def _populate_external_lineage_from_copy_history( + def _get_copy_history_lineage( self, discovered_tables: List[str] ) -> Iterable[KnownLineageMapping]: query: str = SnowflakeQuery.copy_lineage_history( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index 2d2bdc50467c6..174aad0bddd4a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -247,9 +247,6 @@ def get_workunits_internal( for entry in self.fetch_copy_history(): queries.append(entry) - # TODO: Add "show external tables" lineage to the main schema extractor. - # Because it's not a time-based thing, it doesn't really make sense in the snowflake-queries extractor. - with self.report.query_log_fetch_timer: for entry in self.fetch_query_log(): queries.append(entry) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index bc64693b6a108..4b72b09fafe2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -16,6 +16,7 @@ ClassificationHandler, classification_workunit_processor, ) +from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, DatasetSubTypes, @@ -35,6 +36,7 @@ ) from datahub.ingestion.source.snowflake.snowflake_data_reader import SnowflakeDataReader from datahub.ingestion.source.snowflake.snowflake_profiler import SnowflakeProfiler +from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report from datahub.ingestion.source.snowflake.snowflake_schema import ( SCHEMA_PARALLELISM, @@ -65,6 +67,7 @@ get_domain_wu, ) from datahub.ingestion.source_report.ingestion_stage import ( + EXTERNAL_TABLE_DDL_LINEAGE, METADATA_EXTRACTION, PROFILING, ) @@ -96,7 +99,10 @@ TimeType, ) from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties -from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator +from datahub.sql_parsing.sql_parsing_aggregator import ( + KnownLineageMapping, + SqlParsingAggregator, +) from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor @@ -180,7 +186,8 @@ def __init__( # These are populated as side-effects of get_workunits_internal. self.databases: List[SnowflakeDatabase] = [] - self.aggregator: Optional[SqlParsingAggregator] = aggregator + + self.aggregator = aggregator def get_connection(self) -> SnowflakeConnection: return self.connection @@ -212,6 +219,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION) yield from self._process_database(snowflake_db) + self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE) + discovered_tables: List[str] = [ + self.identifiers.get_dataset_identifier( + table_name, schema.name, db.name + ) + for db in self.databases + for schema in db.schemas + for table_name in schema.tables + ] + if self.aggregator: + for entry in self._external_tables_ddl_lineage(discovered_tables): + self.aggregator.add(entry) + except SnowflakePermissionError as e: self.structured_reporter.failure( GENERIC_PERMISSION_ERROR_KEY, @@ -1082,3 +1102,33 @@ def get_fk_constraints_for_table( # Access to table but none of its constraints - is this possible ? return constraints.get(table_name, []) + + # Handles the case for explicitly created external tables. + # NOTE: Snowflake does not log this information to the access_history table. + def _external_tables_ddl_lineage( + self, discovered_tables: List[str] + ) -> Iterable[KnownLineageMapping]: + external_tables_query: str = SnowflakeQuery.show_external_tables() + try: + for db_row in self.connection.query(external_tables_query): + key = self.identifiers.get_dataset_identifier( + db_row["name"], db_row["schema_name"], db_row["database_name"] + ) + + if key not in discovered_tables: + continue + if db_row["location"].startswith("s3://"): + yield KnownLineageMapping( + upstream_urn=make_s3_urn_for_lineage( + db_row["location"], self.config.env + ), + downstream_urn=self.identifiers.gen_dataset_urn(key), + ) + self.report.num_external_table_edges_scanned += 1 + + self.report.num_external_table_edges_scanned += 1 + except Exception as e: + self.structured_reporter.warning( + "External table ddl lineage extraction failed", + exc=e, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index e5883dd0349a3..884e6c49f5b62 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -161,35 +161,32 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # For database, schema, tables, views, etc self.data_dictionary = SnowflakeDataDictionary(connection=self.connection) self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None - self.aggregator: Optional[SqlParsingAggregator] = None - - if self.config.use_queries_v2 or self.config.include_table_lineage: - self.aggregator = self._exit_stack.enter_context( - SqlParsingAggregator( - platform=self.identifiers.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - graph=self.ctx.graph, - eager_graph_load=( - # If we're ingestion schema metadata for tables/views, then we will populate - # schemas into the resolver as we go. We only need to do a bulk fetch - # if we're not ingesting schema metadata as part of ingestion. - not ( - self.config.include_technical_schema - and self.config.include_tables - and self.config.include_views - ) - and not self.config.lazy_schema_resolver - ), - generate_usage_statistics=False, - generate_operations=False, - format_queries=self.config.format_sql_queries, - ) + + self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context( + SqlParsingAggregator( + platform=self.identifiers.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + graph=self.ctx.graph, + eager_graph_load=( + # If we're ingestion schema metadata for tables/views, then we will populate + # schemas into the resolver as we go. We only need to do a bulk fetch + # if we're not ingesting schema metadata as part of ingestion. + not ( + self.config.include_technical_schema + and self.config.include_tables + and self.config.include_views + ) + and not self.config.lazy_schema_resolver + ), + generate_usage_statistics=False, + generate_operations=False, + format_queries=self.config.format_sql_queries, ) - self.report.sql_aggregator = self.aggregator.report + ) + self.report.sql_aggregator = self.aggregator.report if self.config.include_table_lineage: - assert self.aggregator is not None redundant_lineage_run_skip_handler: Optional[ RedundantLineageRunSkipHandler ] = None @@ -487,8 +484,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: databases = schema_extractor.databases - # TODO: The checkpoint state for stale entity detection can be committed here. - if self.config.shares: yield from SnowflakeSharesHandler( self.config, self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 4308b405e46e3..92407eaae6e90 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -14,6 +14,7 @@ USAGE_EXTRACTION_INGESTION = "Usage Extraction Ingestion" USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats" USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation" +EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage" QUERIES_EXTRACTION = "Queries Extraction" PROFILING = "Profiling"