Skip to content

Commit

Permalink
feat(tableau): adds more reporting metrics to better understand linea…
Browse files Browse the repository at this point in the history
…ge construction in tableau ingestion (#12008)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
sgomezvillamor and hsheth2 authored Dec 9, 2024
1 parent 70bec48 commit 0e7ebaf
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,15 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
num_datasource_field_skipped_no_name: int = 0
num_csql_field_skipped_no_name: int = 0
num_table_field_skipped_no_name: int = 0
# lineage
num_tables_with_upstream_lineage: int = 0
num_upstream_table_lineage: int = 0
num_upstream_fine_grained_lineage: int = 0
num_upstream_table_skipped_no_name: int = 0
num_upstream_table_skipped_no_columns: int = 0
num_upstream_table_failed_generate_reference: int = 0
num_upstream_table_lineage_failed_parse_sql: int = 0
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0


@platform_name("Tableau")
Expand Down Expand Up @@ -1311,7 +1319,7 @@ def _create_upstream_table_lineage(
datasource: dict,
browse_path: Optional[str],
is_embedded_ds: bool = False,
) -> Tuple:
) -> Tuple[List[Upstream], List[FineGrainedLineage]]:
upstream_tables: List[Upstream] = []
fine_grained_lineages: List[FineGrainedLineage] = []
table_id_to_urn = {}
Expand Down Expand Up @@ -1472,6 +1480,7 @@ def get_upstream_tables(
c.COLUMNS_CONNECTION
].get("totalCount")
if not is_custom_sql and not num_tbl_cols:
self.report.num_upstream_table_skipped_no_columns += 1
logger.warning(
f"Skipping upstream table with id {table[c.ID]}, no columns: {table}"
)
Expand All @@ -1488,6 +1497,7 @@ def get_upstream_tables(
table, default_schema_map=self.config.default_schema_map
)
except Exception as e:
self.report.num_upstream_table_failed_generate_reference += 1
self.report.warning(
title="Potentially Missing Lineage Issue",
message="Failed to generate upstream reference",
Expand Down Expand Up @@ -1659,15 +1669,7 @@ def get_upstream_fields_from_custom_sql(
func_overridden_info=None, # Here we don't want to override any information from configuration
)

if parsed_result is None:
logger.info(
f"Failed to extract column level lineage from datasource {datasource_urn}"
)
return []
if parsed_result.debug_info.error:
logger.info(
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.error}"
)
if parsed_result is None or parsed_result.debug_info.error:
return []

cll: List[ColumnLineageInfo] = (
Expand Down Expand Up @@ -2031,6 +2033,8 @@ def _create_lineage_to_upstream_tables(
aspect_name=c.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
)
self.report.num_tables_with_upstream_lineage += 1
self.report.num_upstream_table_lineage += len(upstream_tables)

@staticmethod
def _clean_tableau_query_parameters(query: str) -> str:
Expand Down Expand Up @@ -2130,7 +2134,7 @@ def parse_custom_sql(
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
)

return create_lineage_sql_parsed_result(
parsed_result = create_lineage_sql_parsed_result(
query=query,
default_db=upstream_db,
platform=platform,
Expand All @@ -2140,6 +2144,21 @@ def parse_custom_sql(
schema_aware=not self.config.sql_parsing_disable_schema_awareness,
)

assert parsed_result is not None

if parsed_result.debug_info.table_error:
logger.warning(
f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}"
)
self.report.num_upstream_table_lineage_failed_parse_sql += 1
elif parsed_result.debug_info.column_error:
logger.warning(
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}"
)
self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1

return parsed_result

def _enrich_database_tables_with_parsed_schemas(
self, parsing_result: SqlParsingResult
) -> None:
Expand Down Expand Up @@ -2174,9 +2193,6 @@ def _create_lineage_from_unsupported_csql(
)

if parsed_result is None:
logger.info(
f"Failed to extract table level lineage for datasource {csql_urn}"
)
return

self._enrich_database_tables_with_parsed_schemas(parsed_result)
Expand All @@ -2196,12 +2212,14 @@ def _create_lineage_from_unsupported_csql(
upstreams=upstream_tables,
fineGrainedLineages=fine_grained_lineages,
)

yield self.get_metadata_change_proposal(
csql_urn,
aspect_name=c.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
)
self.report.num_tables_with_upstream_lineage += 1
self.report.num_upstream_table_lineage += len(upstream_tables)
self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages)

def _get_schema_metadata_for_datasource(
self, datasource_fields: List[dict]
Expand Down Expand Up @@ -2352,6 +2370,11 @@ def emit_datasource(
aspect_name=c.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
)
self.report.num_tables_with_upstream_lineage += 1
self.report.num_upstream_table_lineage += len(upstream_tables)
self.report.num_upstream_fine_grained_lineage += len(
fine_grained_lineages
)

# Datasource Fields
schema_metadata = self._get_schema_metadata_for_datasource(
Expand Down

0 comments on commit 0e7ebaf

Please sign in to comment.