diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 34123253bd..d34318d6db 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -57,12 +57,22 @@ def __init__( self._seen_tables: dict[str, str] = {} self._principal_grants = principal_grants - def not_migrated_refresh(self) -> list[Table]: + def get_remaining_tables(self) -> list[Table]: table_rows: list[Table] = [] for crawled_table in self._tc.snapshot(): - if not self.is_migrated(crawled_table.database, crawled_table.name): + if not self._is_migrated(crawled_table.database, crawled_table.name): table_rows.append(crawled_table) - return table_rows # depending on how to publish this data, we may need to convert it to other forms to able to show it in the dashboard + logger.info(f"remained-table-to-migrate: {crawled_table.key}") + return table_rows + + + # def get_remaining_tables(self, workspace_name) -> list[Table]: + # table_rows: list[Table] = [] + # for crawled_table in self._tc.snapshot(): + # if not self._is_migrated(crawled_table.database, crawled_table.name): + # table_rows.append(crawled_table) + # logger.info(f"remained-table-to-migrate: {crawled_table.key} in {workspace_name}") + # return table_rows def index(self): return self._migration_status_refresher.index() @@ -496,3 +506,6 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int): f"('upgraded_from' = '{source}'" f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');" ) + def _is_migrated(self, schema: str, table: str) -> bool: + index = self._migration_status_refresher.index() + return index.is_migrated(schema, table) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index e25aa92950..ee19c2b19e 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -73,10 +73,13 @@ def refresh_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.index_full_refresh() - @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, migrate_dbfs_root_non_delta_tables,migrate_views]) + @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, + migrate_dbfs_root_non_delta_tables, migrate_views, refresh_migration_status]) def refresh_not_migrated_status(self, ctx: RuntimeContext): """Refresh the not migrated tables status to present it in the dashboard.""" - ctx.tables_migrator.not_migrated_refresh() + ctx.tables_migrator.get_remaining_tables( + # workspace_name=ctx.workspace_info.current() + ) class MigrateHiveSerdeTablesInPlace(Workflow): def __init__(self): diff --git a/src/databricks/labs/ucx/queries/assessment/main/40_3_logs.sql b/src/databricks/labs/ucx/queries/assessment/main/40_3_logs.sql index 04cbfdb762..02aa7c472a 100644 --- a/src/databricks/labs/ucx/queries/assessment/main/40_3_logs.sql +++ b/src/databricks/labs/ucx/queries/assessment/main/40_3_logs.sql @@ -22,4 +22,4 @@ WHERE ) ) ORDER BY - timestamp ASC \ No newline at end of file + timestamp ASC diff --git a/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql b/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql new file mode 100644 index 0000000000..d6d0e6d0cb --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql @@ -0,0 +1,4 @@ +/* --title 'List of remaining tables in HMS' --width 6 */ +SELECT message +FROM inventory.logs +WHERE message LIKE '"remained-table-to-migrate: %' diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 84bcf5e46d..a67f9fc677 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -97,3 +97,23 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + +#TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow +@pytest.mark.parametrize('prepare_tables_for_migration', [('regular')], indirect=True) +def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_for_migration): + tables, dst_schema = prepare_tables_for_migration + ctx = installation_ctx.replace( + extend_prompts={ + r".*Do you want to update the existing installation?.*": 'yes', + }, + ) + ctx.workspace_installation.run() + ctx.deployed_workflows.run_workflow("migrate-tables") + # assert the workflow is successful + assert ctx.deployed_workflows.validate_step("migrate-tables") +# # assert the tables are migrated +# for table in tables.values(): +# try: +# assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name +# except NotFound: +# assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"