feat(ingest/snowflake): include external table ddl lineage for queries… #12179

Merged (2 commits) on Dec 20, 2024
@@ -265,64 +265,17 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None:
         with PerfTimer() as timer:
             self.report.num_external_table_edges_scanned = 0

-            for (
-                known_lineage_mapping
-            ) in self._populate_external_lineage_from_copy_history(discovered_tables):
-                self.sql_aggregator.add(known_lineage_mapping)
-            logger.info(
-                "Done populating external lineage from copy history. "
-                f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
-            )
-
-            for (
-                known_lineage_mapping
-            ) in self._populate_external_lineage_from_show_query(discovered_tables):
-                self.sql_aggregator.add(known_lineage_mapping)
-
-            logger.info(
-                "Done populating external lineage from show external tables. "
-                f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
-            )
+            for entry in self._get_copy_history_lineage(discovered_tables):
+                self.sql_aggregator.add(entry)
+            logger.info("Done populating external lineage from copy history. ")

         self.report.external_lineage_queries_secs = timer.elapsed_seconds()

-    # Handles the case for explicitly created external tables.
-    # NOTE: Snowflake does not log this information to the access_history table.
-    def _populate_external_lineage_from_show_query(
-        self, discovered_tables: List[str]
-    ) -> Iterable[KnownLineageMapping]:
-        external_tables_query: str = SnowflakeQuery.show_external_tables()
-        try:
-            for db_row in self.connection.query(external_tables_query):
-                key = self.identifiers.get_dataset_identifier(
-                    db_row["name"], db_row["schema_name"], db_row["database_name"]
-                )
-
-                if key not in discovered_tables:
-                    continue
-                if db_row["location"].startswith("s3://"):
-                    yield KnownLineageMapping(
-                        upstream_urn=make_s3_urn_for_lineage(
-                            db_row["location"], self.config.env
-                        ),
-                        downstream_urn=self.identifiers.gen_dataset_urn(key),
-                    )
-                    self.report.num_external_table_edges_scanned += 1
-
-            self.report.num_external_table_edges_scanned += 1
-        except Exception as e:
-            logger.debug(e, exc_info=e)
-            self.structured_reporter.warning(
-                "Error populating external table lineage from Snowflake",
-                exc=e,
-            )
-            self.report_status(EXTERNAL_LINEAGE, False)
-
     # Handles the case where a table is populated from an external stage/s3 location via copy.
     # Eg: copy into category_english from @external_s3_stage;
     # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english' credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
     # NOTE: Snowflake does not log this information to the access_history table.
-    def _populate_external_lineage_from_copy_history(
+    def _get_copy_history_lineage(
         self, discovered_tables: List[str]
     ) -> Iterable[KnownLineageMapping]:
         query: str = SnowflakeQuery.copy_lineage_history(
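
For context, KnownLineageMapping (whose import change appears later in this diff) is a plain upstream/downstream URN pair that the SQL parsing aggregator accepts alongside parsed queries. A minimal sketch of what one copy-history edge might look like; the row shape and URN strings here are illustrative assumptions, not the exact output of SnowflakeQuery.copy_lineage_history:

from datahub.sql_parsing.sql_parsing_aggregator import KnownLineageMapping

# Hypothetical copy-history row for: copy into category_english from 's3://...';
row = {
    "upstream_location": "s3://acryl-snow-demo-olist/olist_raw_data/category_english",
    "downstream_table": "demo_db.public.category_english",
}

# Each COPY INTO from an external location becomes one known lineage edge,
# expressed directly as URNs, so the aggregator needs no SQL parsing for it.
mapping = KnownLineageMapping(
    upstream_urn=f"urn:li:dataset:(urn:li:dataPlatform:s3,{row['upstream_location'].removeprefix('s3://')},PROD)",
    downstream_urn=f"urn:li:dataset:(urn:li:dataPlatform:snowflake,{row['downstream_table']},PROD)",
)
# In the extractor, this would then be handed to sql_aggregator.add(mapping).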
@@ -247,9 +247,6 @@ def get_workunits_internal(
         for entry in self.fetch_copy_history():
             queries.append(entry)

-        # TODO: Add "show external tables" lineage to the main schema extractor.
-        # Because it's not a time-based thing, it doesn't really make sense in the snowflake-queries extractor.
-
         with self.report.query_log_fetch_timer:
             for entry in self.fetch_query_log():
                 queries.append(entry)
@@ -16,6 +16,7 @@
     ClassificationHandler,
     classification_workunit_processor,
 )
+from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
 from datahub.ingestion.source.common.subtypes import (
     DatasetContainerSubTypes,
     DatasetSubTypes,
@@ -35,6 +36,7 @@
 )
 from datahub.ingestion.source.snowflake.snowflake_data_reader import SnowflakeDataReader
 from datahub.ingestion.source.snowflake.snowflake_profiler import SnowflakeProfiler
+from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
 from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
 from datahub.ingestion.source.snowflake.snowflake_schema import (
     SCHEMA_PARALLELISM,
@@ -65,6 +67,7 @@
     get_domain_wu,
 )
 from datahub.ingestion.source_report.ingestion_stage import (
+    EXTERNAL_TABLE_DDL_LINEAGE,
     METADATA_EXTRACTION,
     PROFILING,
 )
@@ -96,7 +99,10 @@
     TimeType,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
-from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
+from datahub.sql_parsing.sql_parsing_aggregator import (
+    KnownLineageMapping,
+    SqlParsingAggregator,
+)
 from datahub.utilities.registries.domain_registry import DomainRegistry
 from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

@@ -180,7 +186,8 @@ def __init__(

         # These are populated as side-effects of get_workunits_internal.
         self.databases: List[SnowflakeDatabase] = []
-        self.aggregator: Optional[SqlParsingAggregator] = aggregator
+
+        self.aggregator = aggregator

     def get_connection(self) -> SnowflakeConnection:
         return self.connection
@@ -212,6 +219,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION)
                 yield from self._process_database(snowflake_db)

+            self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE)
+            discovered_tables: List[str] = [
+                self.identifiers.get_dataset_identifier(
+                    table_name, schema.name, db.name
+                )
+                for db in self.databases
+                for schema in db.schemas
+                for table_name in schema.tables
+            ]
+            if self.aggregator:
+                for entry in self._external_tables_ddl_lineage(discovered_tables):
+                    self.aggregator.add(entry)
+
         except SnowflakePermissionError as e:
             self.structured_reporter.failure(
                 GENERIC_PERMISSION_ERROR_KEY,

@@ -1082,3 +1102,33 @@ def get_fk_constraints_for_table(

         # Access to table but none of its constraints - is this possible ?
         return constraints.get(table_name, [])
+
+    # Handles the case for explicitly created external tables.
+    # NOTE: Snowflake does not log this information to the access_history table.
+    def _external_tables_ddl_lineage(
+        self, discovered_tables: List[str]
+    ) -> Iterable[KnownLineageMapping]:
+        external_tables_query: str = SnowflakeQuery.show_external_tables()
+        try:
+            for db_row in self.connection.query(external_tables_query):
+                key = self.identifiers.get_dataset_identifier(
+                    db_row["name"], db_row["schema_name"], db_row["database_name"]
+                )
+
+                if key not in discovered_tables:
+                    continue
+                if db_row["location"].startswith("s3://"):
+                    yield KnownLineageMapping(
+                        upstream_urn=make_s3_urn_for_lineage(
+                            db_row["location"], self.config.env
+                        ),
+                        downstream_urn=self.identifiers.gen_dataset_urn(key),
+                    )
+                    self.report.num_external_table_edges_scanned += 1
+
+            self.report.num_external_table_edges_scanned += 1
+        except Exception as e:
+            self.structured_reporter.warning(
+                "External table ddl lineage extraction failed",
+                exc=e,
+            )
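
To make the new code path concrete, here is roughly how one SHOW EXTERNAL TABLES row gets filtered; the row values are made up, and the lowercased dot-joined identifier format is an assumption about what get_dataset_identifier returns:

# Hypothetical SHOW EXTERNAL TABLES row, shaped like the db_row consumed above.
db_row = {
    "name": "CATEGORY_ENGLISH",
    "schema_name": "PUBLIC",
    "database_name": "DEMO_DB",
    "location": "s3://acryl-snow-demo-olist/olist_raw_data/category_english",
}

# Assumed identifier format: lowercased "<db>.<schema>.<table>", matching the
# identifiers collected into discovered_tables during schema extraction.
key = ".".join(
    part.lower()
    for part in (db_row["database_name"], db_row["schema_name"], db_row["name"])
)

# An edge is emitted only for tables already discovered during schema
# extraction, and only for s3:// locations; everything else is skipped.
discovered_tables = ["demo_db.public.category_english"]
emits_edge = key in discovered_tables and db_row["location"].startswith("s3://")
assert key == "demo_db.public.category_english" and emits_edge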
@@ -161,35 +161,32 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
         # For database, schema, tables, views, etc
         self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
         self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
-        self.aggregator: Optional[SqlParsingAggregator] = None

-        if self.config.use_queries_v2 or self.config.include_table_lineage:
-            self.aggregator = self._exit_stack.enter_context(
-                SqlParsingAggregator(
-                    platform=self.identifiers.platform,
-                    platform_instance=self.config.platform_instance,
-                    env=self.config.env,
-                    graph=self.ctx.graph,
-                    eager_graph_load=(
-                        # If we're ingesting schema metadata for tables/views, then we will populate
-                        # schemas into the resolver as we go. We only need to do a bulk fetch
-                        # if we're not ingesting schema metadata as part of ingestion.
-                        not (
-                            self.config.include_technical_schema
-                            and self.config.include_tables
-                            and self.config.include_views
-                        )
-                        and not self.config.lazy_schema_resolver
-                    ),
-                    generate_usage_statistics=False,
-                    generate_operations=False,
-                    format_queries=self.config.format_sql_queries,
-                )
-            )
-            self.report.sql_aggregator = self.aggregator.report
+        self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context(
+            SqlParsingAggregator(
+                platform=self.identifiers.platform,
+                platform_instance=self.config.platform_instance,
+                env=self.config.env,
+                graph=self.ctx.graph,
+                eager_graph_load=(
+                    # If we're ingesting schema metadata for tables/views, then we will populate
+                    # schemas into the resolver as we go. We only need to do a bulk fetch
+                    # if we're not ingesting schema metadata as part of ingestion.
+                    not (
+                        self.config.include_technical_schema
+                        and self.config.include_tables
+                        and self.config.include_views
+                    )
+                    and not self.config.lazy_schema_resolver
+                ),
+                generate_usage_statistics=False,
+                generate_operations=False,
+                format_queries=self.config.format_sql_queries,
+            )
+        )
+        self.report.sql_aggregator = self.aggregator.report

         if self.config.include_table_lineage:
-            assert self.aggregator is not None
             redundant_lineage_run_skip_handler: Optional[
                 RedundantLineageRunSkipHandler
             ] = None

Review comment (Collaborator):
we should be calling self.aggregator.gen_metadata() unconditionally. Right now, it seems like we only do it if some flags are set.

Reply (mayurinehate, Author, Dec 20, 2024):
Addressed via #12191. gen_metadata is called under two separate conditions, but the conditions are mutually exclusive and exhaustive. We cannot pull gen_metadata into a common place, since the code flow without use_queries_v2 also relies on the same aggregator to add table-level lineage.
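
To make the reply concrete, here is a schematic of the two mutually exclusive gen_metadata call sites being described; this is a simplified sketch of the control flow, not the actual source, and the surrounding helper names are approximations:

# Schematic only: the two branches are mutually exclusive and exhaustive,
# so aggregator.gen_metadata() effectively runs exactly once per ingestion.
def emit_lineage_workunits(config, aggregator, queries_extractor, lineage_extractor):
    if config.use_queries_v2:
        # queries-v2 path: the queries extractor feeds this same aggregator
        # (query log + copy history) and emits its metadata internally.
        yield from queries_extractor.get_workunits_internal()
    else:
        # legacy path: table-level lineage is added to the aggregator by the
        # lineage extractor, so gen_metadata() must be called here instead.
        yield from lineage_extractor.get_workunits()
        yield from aggregator.gen_metadata()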
@@ -487,8 +484,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

         databases = schema_extractor.databases

-        # TODO: The checkpoint state for stale entity detection can be committed here.
-
         if self.config.shares:
             yield from SnowflakeSharesHandler(
                 self.config, self.report
@@ -14,6 +14,7 @@
 USAGE_EXTRACTION_INGESTION = "Usage Extraction Ingestion"
 USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
 USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
+EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
 QUERIES_EXTRACTION = "Queries Extraction"
 PROFILING = "Profiling"
