Skip to content

Commit

Permalink
fix(ingest/snowflake): always ingest view and external table ddl line…
Browse files Browse the repository at this point in the history
…age (#12191)
  • Loading branch information
mayurinehate authored Dec 24, 2024
1 parent 87e7b58 commit 4d990b0
Show file tree
Hide file tree
Showing 15 changed files with 36 additions and 78 deletions.
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.

## Next

- #12191 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from snowflake ingestion source. View and External Table DDL lineage will always be ingested when definitions are available.
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,26 +163,13 @@ class SnowflakeConfig(
default=True,
description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.",
)
include_view_lineage: bool = pydantic.Field(
default=True,
description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.",
)

_include_view_lineage = pydantic_removed_field("include_view_lineage")
_include_view_column_lineage = pydantic_removed_field("include_view_column_lineage")

ignore_start_time_lineage: bool = False
upstream_lineage_in_report: bool = False

@pydantic.root_validator(skip_on_failure=True)
def validate_include_view_lineage(cls, values):
if (
"include_table_lineage" in values
and not values.get("include_table_lineage")
and values.get("include_view_lineage")
):
raise ValueError(
"include_table_lineage must be True for include_view_lineage to be set."
)
return values


class SnowflakeV2Config(
SnowflakeConfig,
Expand Down Expand Up @@ -222,11 +209,6 @@ class SnowflakeV2Config(
description="Populates table->table and view->table column lineage. Requires appropriate grants given to the role and the Snowflake Enterprise Edition or above.",
)

include_view_column_lineage: bool = Field(
default=True,
description="Populates view->view and table->view column lineage using DataHub's sql parser.",
)

use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
Expand Down Expand Up @@ -355,10 +337,6 @@ def get_sql_alchemy_url(
self, database=database, username=username, password=password, role=role
)

@property
def parse_view_ddl(self) -> bool:
return self.include_view_column_lineage

@validator("shares")
def validate_shares(
cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from datahub.configuration.datetimes import parse_absolute_time
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
Expand Down Expand Up @@ -163,11 +162,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
self.config.end_time,
)

def get_workunits(
def add_time_based_lineage_to_aggregator(
self,
discovered_tables: List[str],
discovered_views: List[str],
) -> Iterable[MetadataWorkUnit]:
) -> None:
if not self._should_ingest_lineage():
return

Expand All @@ -177,9 +176,7 @@ def get_workunits(
# snowflake view/table -> snowflake table
self.populate_table_upstreams(discovered_tables)

for mcp in self.sql_aggregator.gen_metadata():
yield mcp.as_workunit()

def update_state(self):
if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
Expand Down Expand Up @@ -337,10 +334,6 @@ def _fetch_upstream_lineages_for_tables(self) -> Iterable[UpstreamLineageEdge]:
start_time_millis=int(self.start_time.timestamp() * 1000),
end_time_millis=int(self.end_time.timestamp() * 1000),
upstreams_deny_pattern=self.config.temporary_tables_pattern,
# The self.config.include_view_lineage setting is about fetching upstreams of views.
# We always generate lineage pointing at views from tables, even if self.config.include_view_lineage is False.
# TODO: Remove this `include_view_lineage` flag, since it's effectively dead code.
include_view_lineage=True,
include_column_lineage=self.config.include_column_lineage,
)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ def view_dependencies() -> str:
def table_to_table_lineage_history_v2(
start_time_millis: int,
end_time_millis: int,
include_view_lineage: bool = True,
include_column_lineage: bool = True,
upstreams_deny_pattern: List[str] = DEFAULT_TEMP_TABLES_PATTERNS,
) -> str:
Expand All @@ -385,14 +384,12 @@ def table_to_table_lineage_history_v2(
start_time_millis,
end_time_millis,
upstreams_deny_pattern,
include_view_lineage,
)
else:
return SnowflakeQuery.table_upstreams_only(
start_time_millis,
end_time_millis,
upstreams_deny_pattern,
include_view_lineage,
)

@staticmethod
Expand Down Expand Up @@ -677,12 +674,9 @@ def table_upstreams_with_column_lineage(
start_time_millis: int,
end_time_millis: int,
upstreams_deny_pattern: List[str],
include_view_lineage: bool = True,
) -> str:
allowed_upstream_table_domains = (
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
if include_view_lineage
else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
)

upstream_sql_filter = create_deny_regex_sql_filter(
Expand Down Expand Up @@ -847,12 +841,9 @@ def table_upstreams_only(
start_time_millis: int,
end_time_millis: int,
upstreams_deny_pattern: List[str],
include_view_lineage: bool = True,
) -> str:
allowed_upstream_table_domains = (
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
if include_view_lineage
else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
)

upstream_sql_filter = create_deny_regex_sql_filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,7 @@ def _process_schema(
)

if self.config.include_views:
if (
self.aggregator
and self.config.include_view_lineage
and self.config.parse_view_ddl
):
if self.aggregator:
for view in views:
view_identifier = self.identifiers.get_dataset_identifier(
view.name, schema_name, db_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_shares_workunits(
assert len(sibling_dbs) == 1
# SnowflakeLineageExtractor is unaware of database->schema->table hierarchy
# hence this lineage code is not written in SnowflakeLineageExtractor
# also this is not governed by configs include_table_lineage and include_view_lineage
# also this is not governed by configs include_table_lineage
yield self.get_upstream_lineage_with_primary_sibling(
db.name, schema.name, table_name, sibling_dbs[0]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
LINEAGE_EXTRACTION,
METADATA_EXTRACTION,
QUERIES_EXTRACTION,
VIEW_PARSING,
)
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.utilities.registries.domain_registry import DomainRegistry
Expand All @@ -103,7 +104,7 @@
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(
SourceCapability.LINEAGE_COARSE,
"Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`",
"Enabled by default, can be disabled via configuration `include_table_lineage`",
)
@capability(
SourceCapability.LINEAGE_FINE,
Expand Down Expand Up @@ -512,15 +513,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
discovered_datasets = discovered_tables + discovered_views

if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", "View Parsing")
assert self.aggregator is not None
self.report.set_ingestion_stage("*", VIEW_PARSING)
yield from auto_workunit(self.aggregator.gen_metadata())

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

schema_resolver = self.aggregator._schema_resolver

queries_extractor: SnowflakeQueriesExtractor = SnowflakeQueriesExtractor(
queries_extractor = SnowflakeQueriesExtractor(
connection=self.connection,
config=SnowflakeQueriesExtractorConfig(
window=self.config,
Expand All @@ -546,13 +546,21 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
queries_extractor.close()

else:
if self.config.include_table_lineage and self.lineage_extractor:
if self.lineage_extractor:
self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION)
yield from self.lineage_extractor.get_workunits(
self.lineage_extractor.add_time_based_lineage_to_aggregator(
discovered_tables=discovered_tables,
discovered_views=discovered_views,
)

# This would emit view and external table ddl lineage
# as well as query lineage via lineage_extractor
for mcp in self.aggregator.gen_metadata():
yield mcp.as_workunit()

if self.lineage_extractor:
self.lineage_extractor.update_state()

if (
self.config.include_usage_stats or self.config.include_operational_stats
) and self.usage_extractor:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
VIEW_PARSING = "View Parsing"
QUERIES_EXTRACTION = "Queries Extraction"
PROFILING = "Profiling"

Expand Down
2 changes: 0 additions & 2 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ def default_query_results( # noqa: C901
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=True,
include_column_lineage=True,
),
):
Expand Down Expand Up @@ -548,7 +547,6 @@ def default_query_results( # noqa: C901
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=True,
include_column_lineage=False,
),
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=True,
include_usage_stats=True,
format_sql_queries=True,
validate_upstreams_against_patterns=False,
Expand Down Expand Up @@ -216,7 +215,6 @@ def test_snowflake_private_link_and_incremental_mcps(
include_table_lineage=True,
include_column_lineage=False,
include_views=True,
include_view_lineage=True,
include_usage_stats=False,
format_sql_queries=True,
incremental_lineage=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def test_snowflake_classification_perf(num_workers, num_cols_per_table, num_tabl
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=False,
include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def snowflake_pipeline_config(tmp_path):
include_technical_schema=True,
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_view_lineage=False,
include_usage_stats=False,
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(
tzinfo=timezone.utc
Expand Down Expand Up @@ -227,7 +226,6 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=True,
include_column_lineage=True,
)
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def test_snowflake_tag_pattern():
),
include_technical_schema=True,
include_table_lineage=False,
include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
Expand Down Expand Up @@ -74,7 +73,6 @@ def test_snowflake_tag_pattern_deny():
),
include_technical_schema=True,
include_table_lineage=False,
include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def run_test():
password="TST_PWD",
include_technical_schema=False,
include_table_lineage=True,
include_view_lineage=True,
include_usage_stats=True,
include_operational_stats=True,
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(tzinfo=timezone.utc),
Expand Down
23 changes: 12 additions & 11 deletions metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,6 @@ def test_options_contain_connect_args():
assert connect_args is not None


def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error():
config_dict = default_config_dict.copy()
config_dict["include_view_lineage"] = True
config_dict["include_table_lineage"] = False
with pytest.raises(
ValidationError,
match="include_table_lineage must be True for include_view_lineage to be set",
):
SnowflakeV2Config.parse_obj(config_dict)


def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
config_dict = default_config_dict.copy()
config_dict["include_column_lineage"] = True
Expand Down Expand Up @@ -667,6 +656,18 @@ def test_snowflake_utils() -> None:
assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)


def test_using_removed_fields_causes_no_error() -> None:
assert SnowflakeV2Config.parse_obj(
{
"account_id": "test",
"username": "snowflake",
"password": "snowflake",
"include_view_lineage": "true",
"include_view_column_lineage": "true",
}
)


def test_snowflake_query_result_parsing():
db_row = {
"DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table",
Expand Down

0 comments on commit 4d990b0

Please sign in to comment.