From 50375c701c58462966de948214309d1dd0f00ec8 Mon Sep 17 00:00:00 2001
From: Amin Movahed
Date: Thu, 18 Jul 2024 14:03:24 +1000
Subject: [PATCH] Detect tables that are not present in the mapping file

Introduces #1221
---
Review note: save() now materializes current_tables() once. The previous
revision consumed that generator while computing the unmapped tables and then
passed the exhausted generator to the mapping.csv save, producing an empty
mapping file. The post-image blob id on the index line predates this revision.

 .../labs/ucx/hive_metastore/mapping.py | 34 ++++++++++++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/src/databricks/labs/ucx/hive_metastore/mapping.py b/src/databricks/labs/ucx/hive_metastore/mapping.py
index b0d258dadd..7f6b8a3fed 100644
--- a/src/databricks/labs/ucx/hive_metastore/mapping.py
+++ b/src/databricks/labs/ucx/hive_metastore/mapping.py
@@ -66,6 +66,22 @@ def as_uc_table_key(self):
     def as_hms_table_key(self):
         return f"hive_metastore.{self.src_schema}.{self.src_table}"
 
+
+@dataclass
+class TableNotMapped:
+    """A crawled Hive metastore table that has no rule in the mapping file."""
+
+    workspace_name: str
+    src_table: str
+
+    @classmethod
+    def initial(cls, workspace_name: str, table: str) -> "TableNotMapped":
+        """Build a record from a workspace name and the crawled table's key."""
+        return cls(
+            workspace_name=workspace_name,
+            src_table=table,
+        )
+
 
 @dataclass
 class TableToMigrate:
@@ -81,6 +97,7 @@ def __eq__(self, other):
 
 class TableMapping:
     FILENAME = 'mapping.csv'
+    FILENAME_UNMAPPED = 'unmapped_tables.csv'
     UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip"
 
     def __init__(
@@ -103,10 +120,23 @@ def current_tables(self, tables: TablesCrawler, workspace_name: str, catalog_nam
         for table in tables_snapshot:
             yield Rule.initial(workspace_name, catalog_name, table, self._recon_tolerance_percent)
 
+    @staticmethod
+    def tables_not_mapped(tables_crawler: TablesCrawler, current_tables: list[Rule], workspace_name: str):
+        """Yield a TableNotMapped for every crawled table whose key matches no rule in current_tables."""
+        # A set keeps each membership test O(1) instead of rescanning a list per crawled table.
+        hms_table_keys = {rule.as_hms_table_key for rule in current_tables}
+        for crawled_table in tables_crawler.snapshot():
+            if crawled_table.key not in hms_table_keys:
+                yield TableNotMapped.initial(workspace_name, crawled_table.key)
+
     def save(self, tables: TablesCrawler, workspace_info: WorkspaceInfo) -> str:
         workspace_name = workspace_info.current()
         default_catalog_name = re.sub(r"\W+", "_", workspace_name)
-        current_tables = self.current_tables(tables, workspace_name, default_catalog_name)
-        return self._installation.save(list(current_tables), filename=self.FILENAME)
+        # current_tables() is a generator: materialize it once so the unmapped-table check
+        # and the mapping file are built from the same rules rather than an exhausted iterator.
+        current_tables = list(self.current_tables(tables, workspace_name, default_catalog_name))
+        unmapped_tables = self.tables_not_mapped(tables, current_tables, workspace_name)
+        self._installation.save(list(unmapped_tables), filename=self.FILENAME_UNMAPPED)
+        return self._installation.save(current_tables, filename=self.FILENAME)
 
     def load(self) -> list[Rule]: