Skip to content

Commit

Permalink
Deduplicated TableInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
edurdevic committed Oct 15, 2023
1 parent 5c25e73 commit df2bffd
Show file tree
Hide file tree
Showing 9 changed files with 459 additions and 455 deletions.
42 changes: 30 additions & 12 deletions discoverx/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class Discovery:
""" """

COLUMNS_TABLE_NAME = "system.information_schema.columns"
INFORMATION_SCHEMA = "system.information_schema"
MAX_WORKERS = 10

def __init__(
Expand All @@ -36,7 +36,9 @@ def __init__(
self._scan_result: Optional[ScanResult] = None
self.rules: Optional[Rules] = Rules(custom_rules=custom_rules, locale=locale)

def _msql(self, msql: str, what_if: bool = False, min_score: Optional[float] = None):
def _msql(
self, msql: str, what_if: bool = False, min_score: Optional[float] = None
):
logger.debug(f"Executing sql template: {msql}")

msql_builder = Msql(msql)
Expand Down Expand Up @@ -73,7 +75,7 @@ def scan(
rule_filter=rules,
sample_size=sample_size,
what_if=what_if,
columns_table_name=self.COLUMNS_TABLE_NAME,
information_schema=self.INFORMATION_SCHEMA,
max_workers=self.MAX_WORKERS,
)

Expand All @@ -82,7 +84,9 @@ def scan(

def _check_scan_result(self):
if self._scan_result is None:
raise Exception("You first need to scan your lakehouse using Scanner.scan()")
raise Exception(
"You first need to scan your lakehouse using Scanner.scan()"
)

@property
def scan_result(self):
Expand Down Expand Up @@ -136,7 +140,9 @@ def search(
raise ValueError("search_term has not been provided.")

if not isinstance(search_term, str):
raise ValueError(f"The search_term type {type(search_term)} is not valid. Please use a string type.")
raise ValueError(
f"The search_term type {type(search_term)} is not valid. Please use a string type."
)

if by_class is None:
# Trying to infer the class by the search term
Expand All @@ -155,11 +161,15 @@ def search(
)
else:
by_class = search_matching_rules[0]
logger.friendly(f"Discoverx will search your lakehouse using the class {by_class}")
logger.friendly(
f"Discoverx will search your lakehouse using the class {by_class}"
)
elif isinstance(by_class, str):
search_matching_rules = [by_class]
else:
raise ValueError(f"The provided by_class {by_class} must be of string type.")
raise ValueError(
f"The provided by_class {by_class} must be of string type."
)

sql_filter = f"`[{search_matching_rules[0]}]` = '{search_term}'"
select_statement = (
Expand Down Expand Up @@ -209,7 +219,9 @@ def select_by_classes(

if isinstance(by_classes, str):
by_classes = [by_classes]
elif isinstance(by_classes, list) and all(isinstance(elem, str) for elem in by_classes):
elif isinstance(by_classes, list) and all(
isinstance(elem, str) for elem in by_classes
):
by_classes = by_classes
else:
raise ValueError(
Expand All @@ -229,7 +241,8 @@ def select_by_classes(
)

return self._msql(
f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}", min_score=min_score
f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}",
min_score=min_score,
)

def delete_by_class(
Expand Down Expand Up @@ -266,7 +279,9 @@ def delete_by_class(
Msql.validate_from_components(from_tables)

if (by_class is None) or (not isinstance(by_class, str)):
raise ValueError(f"Please provide a class to identify the columns to be matched on the provided values.")
raise ValueError(
f"Please provide a class to identify the columns to be matched on the provided values."
)

if values is None:
raise ValueError(
Expand All @@ -278,7 +293,8 @@ def delete_by_class(
value_string = "'" + "', '".join(values) + "'"
else:
raise ValueError(
f"The provided values {values} have the wrong type. Please provide" f" either a str or List[str]."
f"The provided values {values} have the wrong type. Please provide"
f" either a str or List[str]."
)

if not yes_i_am_sure:
Expand All @@ -297,4 +313,6 @@ def delete_by_class(

if delete_result is not None:
delete_result = delete_result.toPandas()
logger.friendlyHTML(f"<p>The affected tables are</p>{delete_result.to_html()}")
logger.friendlyHTML(
f"<p>The affected tables are</p>{delete_result.to_html()}"
)
56 changes: 40 additions & 16 deletions discoverx/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class DX:
Defaults to None.
"""

COLUMNS_TABLE_NAME = "system.information_schema.columns"
INFORMATION_SCHEMA = "system.information_schema"
MAX_WORKERS = 10

Expand All @@ -41,7 +40,10 @@ def __init__(
self.logger = logging.Logging()

self.rules = Rules(custom_rules=custom_rules, locale=locale)
self.uc_enabled = self.spark.conf.get("spark.databricks.unityCatalog.enabled", "false") == "true"
self.uc_enabled = (
self.spark.conf.get("spark.databricks.unityCatalog.enabled", "false")
== "true"
)

self.scanner: Optional[Scanner] = None
self._scan_result: Optional[ScanResult] = None
Expand All @@ -50,10 +52,12 @@ def __init__(

def _can_read_columns_table(self) -> bool:
try:
self.spark.sql(f"SELECT * FROM {self.COLUMNS_TABLE_NAME} LIMIT 1")
self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns LIMIT 1")
return True
except Exception as e:
self.logger.error(f"Error while reading table {self.COLUMNS_TABLE_NAME}: {e}")
self.logger.error(
f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}"
)
return False

def intro(self):
Expand Down Expand Up @@ -138,7 +142,7 @@ def scan(
rule_filter=rules,
sample_size=sample_size,
what_if=what_if,
columns_table_name=self.COLUMNS_TABLE_NAME,
information_schema=self.INFORMATION_SCHEMA,
max_workers=self.MAX_WORKERS,
)

Expand All @@ -147,7 +151,9 @@ def scan(

def _check_scan_result(self):
if self._scan_result is None:
raise Exception("You first need to scan your lakehouse using Scanner.scan()")
raise Exception(
"You first need to scan your lakehouse using Scanner.scan()"
)

@property
def scan_result(self):
Expand Down Expand Up @@ -222,7 +228,9 @@ def search(
raise ValueError("search_term has not been provided.")

if not isinstance(search_term, str):
raise ValueError(f"The search_term type {type(search_term)} is not valid. Please use a string type.")
raise ValueError(
f"The search_term type {type(search_term)} is not valid. Please use a string type."
)

if by_class is None:
# Trying to infer the class by the search term
Expand All @@ -241,11 +249,15 @@ def search(
)
else:
by_class = search_matching_rules[0]
self.logger.friendly(f"Discoverx will search your lakehouse using the class {by_class}")
self.logger.friendly(
f"Discoverx will search your lakehouse using the class {by_class}"
)
elif isinstance(by_class, str):
search_matching_rules = [by_class]
else:
raise ValueError(f"The provided by_class {by_class} must be of string type.")
raise ValueError(
f"The provided by_class {by_class} must be of string type."
)

sql_filter = f"`[{search_matching_rules[0]}]` = '{search_term}'"
select_statement = (
Expand Down Expand Up @@ -295,7 +307,9 @@ def select_by_classes(

if isinstance(by_classes, str):
by_classes = [by_classes]
elif isinstance(by_classes, list) and all(isinstance(elem, str) for elem in by_classes):
elif isinstance(by_classes, list) and all(
isinstance(elem, str) for elem in by_classes
):
by_classes = by_classes
else:
raise ValueError(
Expand All @@ -315,7 +329,8 @@ def select_by_classes(
)

return self._msql(
f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}", min_score=min_score
f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}",
min_score=min_score,
)

def delete_by_class(
Expand Down Expand Up @@ -352,7 +367,9 @@ def delete_by_class(
Msql.validate_from_components(from_tables)

if (by_class is None) or (not isinstance(by_class, str)):
raise ValueError(f"Please provide a class to identify the columns to be matched on the provided values.")
raise ValueError(
f"Please provide a class to identify the columns to be matched on the provided values."
)

if values is None:
raise ValueError(
Expand All @@ -364,7 +381,8 @@ def delete_by_class(
value_string = "'" + "', '".join(values) + "'"
else:
raise ValueError(
f"The provided values {values} have the wrong type. Please provide" f" either a str or List[str]."
f"The provided values {values} have the wrong type. Please provide"
f" either a str or List[str]."
)

if not yes_i_am_sure:
Expand All @@ -383,7 +401,9 @@ def delete_by_class(

if delete_result is not None:
delete_result = delete_result.toPandas()
self.logger.friendlyHTML(f"<p>The affected tables are</p>{delete_result.to_html()}")
self.logger.friendlyHTML(
f"<p>The affcted tables are</p>{delete_result.to_html()}"
)

def from_tables(self, from_tables: str = "*.*.*"):
"""Returns a DataExplorer object for the given tables
Expand All @@ -400,9 +420,13 @@ def from_tables(self, from_tables: str = "*.*.*"):
"""

return DataExplorer(from_tables, self.spark, InfoFetcher(self.spark, self.INFORMATION_SCHEMA))
return DataExplorer(
from_tables, self.spark, InfoFetcher(self.spark, self.INFORMATION_SCHEMA)
)

def _msql(self, msql: str, what_if: bool = False, min_score: Optional[float] = None):
def _msql(
self, msql: str, what_if: bool = False, min_score: Optional[float] = None
):
self.logger.debug(f"Executing sql template: {msql}")

msql_builder = Msql(msql)
Expand Down
Loading

0 comments on commit df2bffd

Please sign in to comment.