diff --git a/pyproject.toml b/pyproject.toml
index b2670a23..eb7512b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
     "coverage[toml]>=6.5",
     "pytest",
     "pylint",
+    "databricks-labs-pytester>=0.2.1",
     "pytest-xdist",
     "pytest-cov>=4.0.0,<5.0.0",
     "pytest-mock>=3.0.0,<4.0.0",
diff --git a/src/databricks/labs/lsql/backends.py b/src/databricks/labs/lsql/backends.py
index 35e651c8..52378224 100644
--- a/src/databricks/labs/lsql/backends.py
+++ b/src/databricks/labs/lsql/backends.py
@@ -13,10 +13,12 @@
     BadRequest,
     DatabricksError,
     DataLoss,
+    InternalError,
     NotFound,
     PermissionDenied,
     Unknown,
 )
+from databricks.sdk.retries import retried
 from databricks.sdk.service.compute import Language
 
 from databricks.labs.lsql.core import Row, StatementExecutionExt
@@ -202,6 +204,8 @@ def __init__(self, ws: WorkspaceClient, warehouse_id, *, max_records_per_batch:
         self._debug_truncate_bytes = debug_truncate_bytes if isinstance(debug_truncate_bytes, int) else 96
         super().__init__(max_records_per_batch)
 
+    # InternalError is retried for resilience against sporadic Databricks issues.
+    @retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
     def execute(self, sql: str, *, catalog: str | None = None, schema: str | None = None) -> None:
         logger.debug(f"[api][execute] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         self._sql.execute(sql, catalog=catalog, schema=schema)
diff --git a/src/databricks/labs/lsql/core.py b/src/databricks/labs/lsql/core.py
index f60a1620..856315b1 100644
--- a/src/databricks/labs/lsql/core.py
+++ b/src/databricks/labs/lsql/core.py
@@ -140,7 +140,7 @@ class StatementExecutionExt:
     megabytes or gigabytes of data serialized in Apache Arrow format, and low result
     fetching latency, should use the stateful Databricks SQL Connector for Python."""
 
-    def __init__(  # pylint: disable=too-many-arguments
+    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
         self,
         ws: WorkspaceClient,
         disposition: Disposition | None = None,
diff --git a/src/databricks/labs/lsql/deployment.py b/src/databricks/labs/lsql/deployment.py
index d765c143..36a1beac 100644
--- a/src/databricks/labs/lsql/deployment.py
+++ b/src/databricks/labs/lsql/deployment.py
@@ -1,46 +1,53 @@
-import datetime as dt
 import logging
 import pkgutil
 from typing import Any
 
-from databricks.sdk.errors import InternalError
-from databricks.sdk.retries import retried
-
 from databricks.labs.lsql.backends import Dataclass, SqlBackend
 
 logger = logging.getLogger(__name__)
 
 
 class SchemaDeployer:
-    def __init__(self, sql_backend: SqlBackend, inventory_schema: str, mod: Any):
+    """Deploy schema, tables, and views for a given inventory schema."""
+
+    def __init__(
+        self,
+        sql_backend: SqlBackend,
+        schema: str,
+        mod: Any,
+        *,
+        catalog: str = "hive_metastore",
+    ) -> None:
         self._sql_backend = sql_backend
-        self._inventory_schema = inventory_schema
+        self._schema = schema
         self._module = mod
+        self._catalog = catalog
 
-    # InternalError is retried for resilience against sporadic Databricks issues
-    @retried(on=[InternalError], timeout=dt.timedelta(minutes=1))
-    def deploy_schema(self):
-        logger.info(f"Ensuring {self._inventory_schema} database exists")
-        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS hive_metastore.{self._inventory_schema}")
+    def deploy_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Ensuring {schema_full_name} database exists")
+        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_full_name}")
 
-    def delete_schema(self):
-        logger.info(f"deleting {self._inventory_schema} database")
-        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS hive_metastore.{self._inventory_schema} CASCADE")
+    def delete_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Deleting {schema_full_name} database")
+        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS {schema_full_name} CASCADE")
 
-    def deploy_table(self, name: str, klass: Dataclass):
-        logger.info(f"Ensuring {self._inventory_schema}.{name} table exists")
-        self._sql_backend.create_table(f"hive_metastore.{self._inventory_schema}.{name}", klass)
+    def deploy_table(self, name: str, klass: Dataclass) -> None:
+        table_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {table_full_name} table exists")
+        self._sql_backend.create_table(table_full_name, klass)
 
-    def deploy_view(self, name: str, relative_filename: str):
+    def deploy_view(self, name: str, relative_filename: str) -> None:
         query = self._load(relative_filename)
-        logger.info(f"Ensuring {self._inventory_schema}.{name} view matches {relative_filename} contents")
-        ddl = f"CREATE OR REPLACE VIEW hive_metastore.{self._inventory_schema}.{name} AS {query}"
+        view_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {view_full_name} view matches {relative_filename} contents")
+        ddl = f"CREATE OR REPLACE VIEW {view_full_name} AS {query}"
         self._sql_backend.execute(ddl)
 
     def _load(self, relative_filename: str) -> str:
         data = pkgutil.get_data(self._module.__name__, relative_filename)
         assert data is not None
         sql = data.decode("utf-8")
-        sql = sql.replace("$inventory", f"hive_metastore.{self._inventory_schema}")
+        sql = sql.replace("$inventory", f"{self._catalog}.{self._schema}")
         return sql
diff --git a/tests/integration/test_backends.py b/tests/integration/test_backends.py
index b29b2e97..6930aec2 100644
--- a/tests/integration/test_backends.py
+++ b/tests/integration/test_backends.py
@@ -3,8 +3,11 @@
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
 
+from databricks.labs.lsql import Row
 from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
 
+from . import views
+
 INCORRECT_SCHEMA = """
 from databricks.labs.lsql.backends import RuntimeBackend
 from databricks.sdk.errors import NotFound
@@ -148,6 +151,18 @@ def test_statement_execution_backend_overrides(ws, env_or_skip):
     assert len(rows) == 10
 
 
+def test_statement_execution_backend_overwrites_table(ws, env_or_skip, make_random) -> None:
+    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
+    catalog = env_or_skip("TEST_CATALOG")
+    schema = env_or_skip("TEST_SCHEMA")
+
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
+
+    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
+    assert rows == [Row(first="xyz", second=True)]
+
+
 def test_runtime_backend_use_statements(ws):
     product_info = ProductInfo.for_testing(SqlBackend)
     installation = Installation.assume_user_home(ws, product_info.product_name())
diff --git a/tests/integration/test_deployment.py b/tests/integration/test_deployment.py
index fff42122..f9e83189 100644
--- a/tests/integration/test_deployment.py
+++ b/tests/integration/test_deployment.py
@@ -1,37 +1,24 @@
 import pytest
 
 from databricks.labs.lsql import Row
-from databricks.labs.lsql.backends import StatementExecutionBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-@pytest.mark.xfail
-def test_deploys_database(ws, env_or_skip, make_random):
-    # TODO: create per-project/per-scope catalog
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
-
-    deployer = SchemaDeployer(sql_backend, schema, views)
+@pytest.mark.xfail(reason="Identity used in CI lacks privileges to create UC resources")
+def test_deploys_schema(ws, sql_backend, make_random, make_catalog) -> None:
+    """Test deploying a full, minimal inventory schema with a single schema, table, and view."""
+    catalog = make_catalog(name=f"lsql_test_{make_random()}")
+    schema_name = "lsql_test"
+    table_full_name = f"{catalog.name}.{schema_name}.foo"
+
+    deployer = SchemaDeployer(sql_backend, schema_name, views, catalog=catalog.name)
     deployer.deploy_schema()
     deployer.deploy_table("foo", views.Foo)
     deployer.deploy_view("some", "some.sql")
 
-    sql_backend.save_table(f"{schema}.foo", [views.Foo("abc", True)], views.Foo)
-    rows = list(sql_backend.fetch(f"SELECT * FROM {schema}.some"))
+    sql_backend.save_table(table_full_name, [views.Foo("abc", True)], views.Foo)
 
-    assert rows == [Row(name="abc", id=1)]
-
-
-def test_overwrite(ws, env_or_skip, make_random):
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
-    catalog = env_or_skip("TEST_CATALOG")
-    schema = env_or_skip("TEST_SCHEMA")
-
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
-    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
-
-    assert rows == [Row(first="xyz", second=True)]
+    rows = list(sql_backend.fetch(f"SELECT * FROM {table_full_name}"))
+    assert rows == [Row(first="abc", second=True)]
diff --git a/tests/unit/test_deployment.py b/tests/unit/test_deployment.py
index 1e407a6f..027449f9 100644
--- a/tests/unit/test_deployment.py
+++ b/tests/unit/test_deployment.py
@@ -1,24 +1,46 @@
+import logging
 from dataclasses import dataclass
 
+import pytest
+
 from databricks.labs.lsql.backends import MockBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-def test_deploys_view():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_schema(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
 
-    deployment.deploy_view("some", "some.sql")
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_schema()
 
-    assert mock_backend.queries == [
-        "CREATE OR REPLACE VIEW hive_metastore.inventory.some AS SELECT\n id,\n name\nFROM hive_metastore.inventory.something"
-    ]
+    assert mock_backend.queries == [f"CREATE SCHEMA IF NOT EXISTS {inventory_catalog}.inventory"]
+    assert f"Ensuring {inventory_catalog}.inventory database exists" in caplog.messages
+
+
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deletes_schema(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.delete_schema()
+
+    assert mock_backend.queries == [f"DROP SCHEMA IF EXISTS {inventory_catalog}.inventory CASCADE"]
+    assert f"Deleting {inventory_catalog}.inventory database" in caplog.messages
 
 
 @dataclass
@@ -27,19 +49,40 @@ class Foo:
     second: bool
 
 
-def test_deploys_dataclass():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_dataclass(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
-    deployment.deploy_schema()
-    deployment.deploy_table("foo", Foo)
-    deployment.delete_schema()
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_table("foo", Foo)
+
+    assert mock_backend.queries == [
+        f"CREATE TABLE IF NOT EXISTS {inventory_catalog}.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
+    ]
+    assert f"Ensuring {inventory_catalog}.inventory.foo table exists" in caplog.messages
+
+
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_view(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_view("some", "some.sql")
 
     assert mock_backend.queries == [
-        "CREATE SCHEMA IF NOT EXISTS hive_metastore.inventory",
-        "CREATE TABLE IF NOT EXISTS hive_metastore.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
-        "DROP SCHEMA IF EXISTS hive_metastore.inventory CASCADE",
+        f"CREATE OR REPLACE VIEW {inventory_catalog}.inventory.some AS SELECT\n id,\n name\n"
+        f"FROM {inventory_catalog}.inventory.something"
     ]
+    assert f"Ensuring {inventory_catalog}.inventory.some view matches some.sql contents" in caplog.messages
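
Usage note: the sketch below illustrates the reworked SchemaDeployer API from this patch. It relies only on behaviour the diff itself shows (MockBackend records the generated SQL instead of executing it; the keyword-only catalog argument defaults to "hive_metastore", so existing callers are unchanged). The catalog name "main" is purely illustrative.

    # Minimal sketch of the new SchemaDeployer API; "main" is an illustrative catalog name.
    from dataclasses import dataclass

    from databricks.labs.lsql import deployment  # stands in for a module bundling *.sql view files
    from databricks.labs.lsql.backends import MockBackend
    from databricks.labs.lsql.deployment import SchemaDeployer


    @dataclass
    class Foo:
        first: str
        second: bool


    backend = MockBackend()  # records SQL statements instead of executing them
    deployer = SchemaDeployer(backend, "inventory", deployment, catalog="main")
    deployer.deploy_schema()
    deployer.deploy_table("foo", Foo)

    assert backend.queries == [
        "CREATE SCHEMA IF NOT EXISTS main.inventory",
        "CREATE TABLE IF NOT EXISTS main.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
    ]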
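
Background on the retry decorator now applied to StatementExecutionBackend.execute: retried (from databricks.sdk.retries, used verbatim in this patch) re-invokes the wrapped callable whenever it raises one of the listed exception types, until the timeout elapses. A self-contained sketch of those semantics follows; flaky_execute is a hypothetical stand-in for a sporadically failing Databricks call.

    # Sketch of the retry semantics; flaky_execute is a hypothetical stand-in.
    import datetime

    from databricks.sdk.errors import InternalError
    from databricks.sdk.retries import retried

    attempts = 0


    @retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
    def flaky_execute() -> str:
        global attempts
        attempts += 1
        if attempts < 3:
            raise InternalError("sporadic backend failure")  # triggers a retry with backoff
        return "ok"


    assert flaky_execute() == "ok"  # succeeds on the third attempt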