Skip to content

Commit

Permalink
feat(ingest/snowflake): include external table ddl lineage for queries v2
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Dec 19, 2024
1 parent 2e54461 commit 97f6230
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,64 +265,17 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None:
with PerfTimer() as timer:
self.report.num_external_table_edges_scanned = 0

for (
known_lineage_mapping
) in self._populate_external_lineage_from_copy_history(discovered_tables):
self.sql_aggregator.add(known_lineage_mapping)
logger.info(
"Done populating external lineage from copy history. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

for (
known_lineage_mapping
) in self._populate_external_lineage_from_show_query(discovered_tables):
self.sql_aggregator.add(known_lineage_mapping)

logger.info(
"Done populating external lineage from show external tables. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)
for entry in self._get_copy_history_lineage(discovered_tables):
self.sql_aggregator.add(entry)
logger.info("Done populating external lineage from copy history. ")

self.report.external_lineage_queries_secs = timer.elapsed_seconds()

# Handles the case for explicitly created external tables.
# NOTE: Snowflake does not log this information to the access_history table.
def _populate_external_lineage_from_show_query(
    self, discovered_tables: List[str]
) -> Iterable[KnownLineageMapping]:
    """Yield S3 -> Snowflake lineage for explicitly created external tables.

    Runs the query returned by ``SnowflakeQuery.show_external_tables()`` and,
    for every returned table that is listed in ``discovered_tables`` and whose
    ``location`` starts with ``s3://``, yields a mapping from the S3 URN to
    the table's dataset URN.

    Args:
        discovered_tables: Dataset identifiers for which lineage may be
            emitted; rows for any other table are ignored.

    Yields:
        One ``KnownLineageMapping`` per matching S3-backed external table.

    Exceptions raised while querying or iterating are not propagated: they
    are logged at debug level, reported as a structured warning, and the
    external-lineage status is reported as failed.
    """
    external_tables_query: str = SnowflakeQuery.show_external_tables()
    # Membership is tested once per returned row, so build the lookup once
    # instead of scanning the list every time.
    discovered = set(discovered_tables)
    try:
        for db_row in self.connection.query(external_tables_query):
            key = self.identifiers.get_dataset_identifier(
                db_row["name"], db_row["schema_name"], db_row["database_name"]
            )

            if key not in discovered:
                continue
            if not db_row["location"].startswith("s3://"):
                # Only S3 locations are turned into lineage edges.
                continue

            yield KnownLineageMapping(
                upstream_urn=make_s3_urn_for_lineage(
                    db_row["location"], self.config.env
                ),
                downstream_urn=self.identifiers.gen_dataset_urn(key),
            )
            # Count each emitted edge exactly once. Previously the counter was
            # bumped twice per S3-backed table, inflating the reported number
            # of external lineage edges.
            self.report.num_external_table_edges_scanned += 1
    except Exception as e:
        logger.debug(e, exc_info=e)
        self.structured_reporter.warning(
            "Error populating external table lineage from Snowflake",
            exc=e,
        )
        self.report_status(EXTERNAL_LINEAGE, False)

# Handles the case where a table is populated from an external stage/s3 location via copy.
# Eg: copy into category_english from @external_s3_stage;
# Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
# NOTE: Snowflake does not log this information to the access_history table.
def _populate_external_lineage_from_copy_history(
def _get_copy_history_lineage(
self, discovered_tables: List[str]
) -> Iterable[KnownLineageMapping]:
query: str = SnowflakeQuery.copy_lineage_history(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,6 @@ def get_workunits_internal(
for entry in self.fetch_copy_history():
queries.append(entry)

# TODO: Add "show external tables" lineage to the main schema extractor.
# Because it's not a time-based thing, it doesn't really make sense in the snowflake-queries extractor.

with self.report.query_log_fetch_timer:
for entry in self.fetch_query_log():
queries.append(entry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ClassificationHandler,
classification_workunit_processor,
)
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
Expand All @@ -35,6 +36,7 @@
)
from datahub.ingestion.source.snowflake.snowflake_data_reader import SnowflakeDataReader
from datahub.ingestion.source.snowflake.snowflake_profiler import SnowflakeProfiler
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import (
SCHEMA_PARALLELISM,
Expand Down Expand Up @@ -65,6 +67,7 @@
get_domain_wu,
)
from datahub.ingestion.source_report.ingestion_stage import (
EXTERNAL_TABLE_DDL_LINEAGE,
METADATA_EXTRACTION,
PROFILING,
)
Expand Down Expand Up @@ -96,7 +99,10 @@
TimeType,
)
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
SqlParsingAggregator,
)
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

Expand Down Expand Up @@ -180,7 +186,8 @@ def __init__(

# These are populated as side-effects of get_workunits_internal.
self.databases: List[SnowflakeDatabase] = []
self.aggregator: Optional[SqlParsingAggregator] = aggregator

self.aggregator = aggregator

def get_connection(self) -> SnowflakeConnection:
return self.connection
Expand Down Expand Up @@ -212,6 +219,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION)
yield from self._process_database(snowflake_db)

self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE)
discovered_tables: List[str] = [
self.identifiers.get_dataset_identifier(
table_name, schema.name, db.name
)
for db in self.databases
for schema in db.schemas
for table_name in schema.tables
]
if self.aggregator:
for entry in self._external_tables_ddl_lineage(discovered_tables):
self.aggregator.add(entry)

except SnowflakePermissionError as e:
self.structured_reporter.failure(
GENERIC_PERMISSION_ERROR_KEY,
Expand Down Expand Up @@ -1082,3 +1102,33 @@ def get_fk_constraints_for_table(

# Access to table but none of its constraints - is this possible ?
return constraints.get(table_name, [])

# Handles the case for explicitly created external tables.
# NOTE: Snowflake does not log this information to the access_history table.
def _external_tables_ddl_lineage(
    self, discovered_tables: List[str]
) -> Iterable[KnownLineageMapping]:
    """Yield S3 -> Snowflake lineage for explicitly created external tables.

    Runs the query returned by ``SnowflakeQuery.show_external_tables()`` and,
    for every returned table that is listed in ``discovered_tables`` and whose
    ``location`` starts with ``s3://``, yields a mapping from the S3 URN to
    the table's dataset URN.

    Args:
        discovered_tables: Dataset identifiers for which lineage may be
            emitted; rows for any other table are ignored.

    Yields:
        One ``KnownLineageMapping`` per matching S3-backed external table.

    Exceptions raised while querying or iterating are not propagated; they
    are reported as a structured warning, so extraction is best-effort.
    """
    external_tables_query: str = SnowflakeQuery.show_external_tables()
    # Membership is tested once per returned row, so build the lookup once
    # instead of scanning the list every time.
    discovered = set(discovered_tables)
    try:
        for db_row in self.connection.query(external_tables_query):
            key = self.identifiers.get_dataset_identifier(
                db_row["name"], db_row["schema_name"], db_row["database_name"]
            )

            if key not in discovered:
                continue
            if not db_row["location"].startswith("s3://"):
                # Only S3 locations are turned into lineage edges.
                continue

            yield KnownLineageMapping(
                upstream_urn=make_s3_urn_for_lineage(
                    db_row["location"], self.config.env
                ),
                downstream_urn=self.identifiers.gen_dataset_urn(key),
            )
            # Count each emitted edge exactly once. Previously the counter was
            # bumped twice per S3-backed table, inflating the reported number
            # of external lineage edges.
            self.report.num_external_table_edges_scanned += 1
    except Exception as e:
        self.structured_reporter.warning(
            "External table ddl lineage extraction failed",
            exc=e,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -161,35 +161,32 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
# For database, schema, tables, views, etc
self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
self.aggregator: Optional[SqlParsingAggregator] = None

if self.config.use_queries_v2 or self.config.include_table_lineage:
self.aggregator = self._exit_stack.enter_context(
SqlParsingAggregator(
platform=self.identifiers.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
graph=self.ctx.graph,
eager_graph_load=(
# If we're ingesting schema metadata for tables/views, then we will populate
# schemas into the resolver as we go. We only need to do a bulk fetch
# if we're not ingesting schema metadata as part of ingestion.
not (
self.config.include_technical_schema
and self.config.include_tables
and self.config.include_views
)
and not self.config.lazy_schema_resolver
),
generate_usage_statistics=False,
generate_operations=False,
format_queries=self.config.format_sql_queries,
)

self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context(
SqlParsingAggregator(
platform=self.identifiers.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
graph=self.ctx.graph,
eager_graph_load=(
# If we're ingesting schema metadata for tables/views, then we will populate
# schemas into the resolver as we go. We only need to do a bulk fetch
# if we're not ingesting schema metadata as part of ingestion.
not (
self.config.include_technical_schema
and self.config.include_tables
and self.config.include_views
)
and not self.config.lazy_schema_resolver
),
generate_usage_statistics=False,
generate_operations=False,
format_queries=self.config.format_sql_queries,
)
self.report.sql_aggregator = self.aggregator.report
)
self.report.sql_aggregator = self.aggregator.report

if self.config.include_table_lineage:
assert self.aggregator is not None
redundant_lineage_run_skip_handler: Optional[
RedundantLineageRunSkipHandler
] = None
Expand Down Expand Up @@ -487,8 +484,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

databases = schema_extractor.databases

# TODO: The checkpoint state for stale entity detection can be committed here.

if self.config.shares:
yield from SnowflakeSharesHandler(
self.config, self.report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
USAGE_EXTRACTION_INGESTION = "Usage Extraction Ingestion"
USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
# Stage label passed to report.set_ingestion_stage while lineage for
# explicitly created external tables is being extracted.
EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
QUERIES_EXTRACTION = "Queries Extraction"
PROFILING = "Profiling"

Expand Down

0 comments on commit 97f6230

Please sign in to comment.