diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..161b3dd
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,6 @@
+clean:
+	rm -fr build .databricks discoverx.egg-info
+
+dev:
+	python3 -m venv .databricks
+	.databricks/bin/python -m pip install -e .
\ No newline at end of file
diff --git a/discoverx/cli.py b/discoverx/cli.py
new file mode 100644
index 0000000..0a17b5f
--- /dev/null
+++ b/discoverx/cli.py
@@ -0,0 +1,49 @@
+import json
+import logging
+import sys
+
+from databricks.connect.session import DatabricksSession
+from discoverx import DX
+
+
+logger = logging.getLogger('databricks.labs.discoverx')
+
+def scan(spark, from_tables: str = '*.*.*', rules: str = '*', sample_size: str = '10000', what_if: str = 'false', locale='US'):
+    logger.info(f'scan: from_tables={from_tables} rules={rules}')
+    dx = DX(spark=spark, locale=locale)
+    dx.scan(from_tables=from_tables, rules=rules, sample_size=int(sample_size), what_if='true' == what_if)
+    print(dx.scan_result.head())
+
+
+MAPPING = {
+    'scan': scan,
+}
+
+
+def main(raw):
+    console_handler = logging.StreamHandler(sys.stderr)
+    console_handler.setLevel('DEBUG')
+    logging.root.addHandler(console_handler)
+
+    payload = json.loads(raw)
+    command = payload['command']
+    if command not in MAPPING:
+        raise KeyError(f'cannot find command: {command}')
+    flags = payload['flags']
+    log_level = flags.pop('log_level')
+    if log_level != 'disabled':
+        databricks_logger = logging.getLogger("databricks")
+        databricks_logger.setLevel(log_level.upper())
+
+    kwargs = {k.replace('-', '_'): v for k,v in flags.items()}
+
+    try:
+        spark = DatabricksSession.builder.getOrCreate()
+        MAPPING[command](spark, **kwargs)
+    except Exception as e:
+        logger.error(f'ERROR: {e}')
+        logger.debug(f'Failed execution of {command}', exc_info=e)
+
+
+if __name__ == "__main__":
+    main(*sys.argv[1:])
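Note on the entry point above: it expects a single JSON document as its only argument, with a 'command' key and a 'flags' dict; the labs CLI dispatcher also injects a log_level flag, which main() pops before building the kwargs it forwards to the command. A minimal sketch of driving the entry point directly, with illustrative flag values; it assumes discoverx is installed and Databricks Connect can reach a workspace:

    # Sketch of invoking the new entry point directly; the payload shape
    # mirrors what main() parses. Flag values here are illustrative.
    import json
    from discoverx.cli import main

    payload = {
        "command": "scan",
        "flags": {
            "log_level": "info",     # popped before kwargs are built
            "from_tables": "*.*.*",  # wildcard over catalog.schema.table
            "rules": "*",
            "sample_size": "10000",  # scan() converts this with int()
            "what_if": "false",      # compared against the string 'true'
        },
    }
    main(json.dumps(payload))
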
diff --git a/discoverx/discovery.py b/discoverx/discovery.py
index ea4dca4..6449f02 100644
--- a/discoverx/discovery.py
+++ b/discoverx/discovery.py
@@ -1,13 +1,13 @@
 from typing import Optional, List, Union
 
-from discoverx import logging
+from discoverx import logs
 from discoverx.msql import Msql
 from discoverx.table_info import TableInfo
 from discoverx.scanner import Scanner, ScanResult
 from discoverx.rules import Rules, Rule
 from pyspark.sql import SparkSession
 
-logger = logging.Logging()
+logger = logs.Logging()
 
 
 class Discovery:
diff --git a/discoverx/dx.py b/discoverx/dx.py
index e99d825..d40c98c 100644
--- a/discoverx/dx.py
+++ b/discoverx/dx.py
@@ -1,7 +1,7 @@
 import pandas as pd
 from pyspark.sql import SparkSession
 from typing import List, Optional, Union
-from discoverx import logging
+from discoverx import logs
 from discoverx.explorer import DataExplorer, InfoFetcher
 from discoverx.msql import Msql
 from discoverx.rules import Rules, Rule
@@ -37,7 +37,7 @@ def __init__(
         if spark is None:
             spark = SparkSession.getActiveSession()
         self.spark = spark
-        self.logger = logging.Logging()
+        self.logger = logs.Logging()
         self.rules = Rules(custom_rules=custom_rules, locale=locale)
         self.uc_enabled = self.spark.conf.get("spark.databricks.unityCatalog.enabled", "false") == "true"
 
@@ -49,6 +49,7 @@ def __init__(
 
     def _can_read_columns_table(self) -> bool:
         try:
+            self.logger.debug(f'Verifying if can read from {self.COLUMNS_TABLE_NAME}')
             self.spark.sql(f"SELECT * FROM {self.COLUMNS_TABLE_NAME} LIMIT 1")
             return True
         except Exception as e:
diff --git a/discoverx/explorer.py b/discoverx/explorer.py
index 764730c..846269f 100644
--- a/discoverx/explorer.py
+++ b/discoverx/explorer.py
@@ -2,7 +2,7 @@
 import copy
 import re
 from typing import Optional, List
-from discoverx import logging
+from discoverx import logs
 from discoverx.common import helper
 from discoverx.discovery import Discovery
 from discoverx.rules import Rule
@@ -13,7 +13,7 @@
 from discoverx.table_info import InfoFetcher, TableInfo
 
 
-logger = logging.Logging()
+logger = logs.Logging()
 
 
 class DataExplorer:
diff --git a/discoverx/logging.py b/discoverx/logs.py
similarity index 72%
rename from discoverx/logging.py
rename to discoverx/logs.py
index 8d71217..451fafb 100644
--- a/discoverx/logging.py
+++ b/discoverx/logs.py
@@ -1,11 +1,12 @@
 import logging
 import re
 
+logger = logging.getLogger('databricks.labs.discoverx')
 
 class Logging:
     def friendly(self, message):
         print(re.sub("<[^<]+?>", "", message))
-        logging.info(message)
+        logger.info(message)
 
     def friendlyHTML(self, message):
         try:
@@ -15,15 +16,15 @@ def friendlyHTML(self, message):
         except:
             # Strip HTML classes
             print(re.sub("<[^<]+?>", "", message))
-        logging.info(message)
+        logger.info(message)
 
     def info(self, message):
         print(message)
-        logging.info(message)
+        logger.info(message)
 
     def debug(self, message):
-        logging.debug(message)
+        logger.debug(message)
 
     def error(self, message):
         print(message)
-        logging.error(message)
+        logger.error(message)
diff --git a/discoverx/msql.py b/discoverx/msql.py
index 42d8348..3f5172b 100644
--- a/discoverx/msql.py
+++ b/discoverx/msql.py
@@ -1,7 +1,7 @@
 """This module contains the M-SQL compiler"""
 from dataclasses import dataclass
 from functools import reduce
-from discoverx import logging
+from discoverx import logs
 from discoverx.table_info import ColumnInfo, TableInfo
 from discoverx.common.helper import strip_margin
 from fnmatch import fnmatch
@@ -40,7 +40,7 @@ def __init__(self, msql: str) -> None:
 
         # Extract command
         self.command = self._extract_command()
-        self.logger = logging.Logging()
+        self.logger = logs.Logging()
 
     def compile_msql(self, table_info: TableInfo) -> list[SQLRow]:
         """
diff --git a/discoverx/scanner.py b/discoverx/scanner.py
index 3a041d2..f1fd87f 100644
--- a/discoverx/scanner.py
+++ b/discoverx/scanner.py
@@ -8,11 +8,11 @@
 from pyspark.sql.utils import AnalysisException
 
 from discoverx.common.helper import strip_margin, format_regex
-from discoverx import logging
+from discoverx import logs
 from discoverx.table_info import InfoFetcher, TableInfo
 from discoverx.rules import Rules, RuleTypes
 
-logger = logging.Logging()
+logger = logs.Logging()
 
 
 @dataclass
diff --git a/labs.yml b/labs.yml
new file mode 100644
index 0000000..1121880
--- /dev/null
+++ b/labs.yml
@@ -0,0 +1,24 @@
+---
+name: discoverx
+description: Multi-table operations over the Lakehouse
+entrypoint: discoverx/cli.py
+min_python: 3.10
+commands:
+  - name: scan
+    description: Scans the lakehouse
+    flags:
+      - name: locale
+        default: US
+        description: Locale for scanning
+      - name: from_tables
+        default: '*.*.*'
+        description: The tables to be scanned in format "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
+      - name: rules
+        default: '*'
+        description: The rule names to be used to scan the lakehouse, use "*" as a wildcard. Defaults to "*".
+      - name: sample_size
+        default: 10000
+        description: The number of rows to be scanned per table. Defaults to 10000.
+      - name: what_if
+        default: false
+        description: Whether to run the scan in what-if mode and print the SQL commands instead of executing them. Defaults to False.
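Two details above are worth calling out: renaming discoverx/logging.py to discoverx/logs.py stops the module from shadowing the stdlib logging package on import, and records now flow through the named 'databricks.labs.discoverx' logger instead of the root logger. A rough sketch of how that named logger cooperates with the stderr handler and log_level handling that cli.py installs (levels shown are illustrative):

    # Sketch: the child logger inherits its effective level from the
    # 'databricks' logger that main() configures, and its records propagate
    # up to the handler attached to the root logger.
    import logging
    import sys

    console_handler = logging.StreamHandler(sys.stderr)  # as in cli.py
    console_handler.setLevel(logging.DEBUG)
    logging.root.addHandler(console_handler)

    logging.getLogger('databricks').setLevel(logging.INFO)   # log_level=info
    logger = logging.getLogger('databricks.labs.discoverx')  # child logger

    logger.info('emitted: at or above the inherited INFO level')
    logger.debug('dropped: below the effective level of the logger')
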
\ No newline at end of file
diff --git a/notebooks/interaction_commands.py b/notebooks/interaction_commands.py
index ef47120..035a1de 100644
--- a/notebooks/interaction_commands.py
+++ b/notebooks/interaction_commands.py
@@ -1,3 +1,5 @@
+# Databricks notebook source
+
 from discoverx import dx
 
 # COMMAND ----------
diff --git a/tests/unit/dx_test.py b/tests/unit/dx_test.py
index ac0730e..ec828e3 100644
--- a/tests/unit/dx_test.py
+++ b/tests/unit/dx_test.py
@@ -1,10 +1,10 @@
 import pandas as pd
 import pytest
 from discoverx.dx import DX
-from discoverx import logging
+from discoverx import logs
 from pyspark.sql.functions import col
 
-logger = logging.Logging()
+logger = logs.Logging()
 
 
 @pytest.fixture(scope="module", name="dx_ip")
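Since the scan subcommand is a thin wrapper over the existing DX API, the same operation remains available interactively from a notebook; roughly what scan() in cli.py executes, assuming an active Spark session on the cluster:

    # Rough interactive equivalent of the CLI's `scan` command; the argument
    # values below are the defaults declared in labs.yml.
    from discoverx import DX

    dx = DX(locale='US')  # falls back to SparkSession.getActiveSession()
    dx.scan(from_tables='*.*.*', rules='*', sample_size=10000, what_if=False)
    print(dx.scan_result.head())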