Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Threshold Query Builder #188

Merged
merged 15 commits on Mar 24, 2024 (source and target branch names not captured in this page extract)
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ branch = true
parallel = true

[tool.coverage.report]
omit = ["src/databricks/labs/remorph/reconcile/*",
omit = [
"src/databricks/labs/remorph/coverage/*",
"src/databricks/labs/remorph/helpers/execution_time.py",
"__about__.py"]
Expand Down
25 changes: 22 additions & 3 deletions src/databricks/labs/remorph/reconcile/connectors/data_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from abc import ABC, abstractmethod

from databricks.sdk import WorkspaceClient # pylint: disable-next=wrong-import-order
Expand All @@ -6,7 +7,6 @@
from databricks.labs.remorph.reconcile.recon_config import ( # pylint: disable=ungrouped-imports
JdbcReaderOptions,
Schema,
Tables,
)


Expand All @@ -20,11 +20,18 @@
self.scope = scope

@abstractmethod
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(
self, catalog_name: str, schema_name: str, query: str, jdbc_reader_options: JdbcReaderOptions
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
) -> DataFrame:
return NotImplemented

@abstractmethod
def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog_name: str,
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
schema_name: str,
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
table_name: str,
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
) -> list[Schema]:
return NotImplemented

def _get_jdbc_reader(self, query, jdbc_url, driver):
Expand All @@ -48,3 +55,15 @@
def _get_secrets(self, key_name):
key = self.source + '_' + key_name
return self.ws.secrets.get_secret(self.scope, key)

@staticmethod
def _get_table_or_query(
catalog_name: str,
schema_name: str,
query: str,
):
if re.search('select', query, re.IGNORECASE):
return query.format(catalog_name=catalog_name, schema_name=schema_name)

Check warning on line 66 in src/databricks/labs/remorph/reconcile/connectors/data_source.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/data_source.py#L66

Added line #L66 was not covered by tests
if catalog_name:
return catalog_name + "." + schema_name + "." + query
return schema_name + "." + query

Check warning on line 69 in src/databricks/labs/remorph/reconcile/connectors/data_source.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/data_source.py#L68-L69

Added lines #L68 - L69 were not covered by tests
13 changes: 10 additions & 3 deletions src/databricks/labs/remorph/reconcile/connectors/databricks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
from pyspark.sql import DataFrame

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class DatabricksDataSource(DataSource):
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(
self, catalog_name: str, schema_name: str, query: str, jdbc_reader_options: JdbcReaderOptions
) -> DataFrame:
# Implement Databricks-specific logic here
return NotImplemented

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog_name: str,
schema_name: str,
table_name: str,
) -> list[Schema]:
# Implement Databricks-specific logic here
return NotImplemented

Expand Down
25 changes: 15 additions & 10 deletions src/databricks/labs/remorph/reconcile/connectors/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.constants import SourceDriver
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class OracleDataSource(DataSource):
Expand All @@ -16,16 +16,16 @@
f":{self._get_secrets('port')}/{self._get_secrets('database')}"
)

# TODO need to check schema_name,catalog_name is needed
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(
self, catalog_name: str, schema_name: str, query: str, jdbc_reader_options: JdbcReaderOptions
) -> DataFrame:
try:
if table_conf.jdbc_reader_options is None:
return self.reader(query).options(**self._get_timestamp_options()).load()
table_query = self._get_table_or_query(catalog_name, schema_name, query)

Check warning on line 23 in src/databricks/labs/remorph/reconcile/connectors/oracle.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/oracle.py#L23

Added line #L23 was not covered by tests
if jdbc_reader_options is None:
return self.reader(table_query).options(**self._get_timestamp_options()).load()

Check warning on line 25 in src/databricks/labs/remorph/reconcile/connectors/oracle.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/oracle.py#L25

Added line #L25 was not covered by tests
return (
self.reader(query)
.options(
**self._get_jdbc_reader_options(table_conf.jdbc_reader_options) | self._get_timestamp_options()
)
self.reader(table_query)
.options(**self._get_jdbc_reader_options(jdbc_reader_options) | self._get_timestamp_options())
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
.load()
)
except PySparkException as e:
Expand All @@ -34,7 +34,12 @@
)
raise PySparkException(error_msg) from e

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog_name: str,
schema_name: str,
table_name: str,
) -> list[Schema]:
try:
schema_query = self._get_schema_query(table_name, schema_name)
schema_df = self.reader(schema_query).load()
Expand Down
13 changes: 10 additions & 3 deletions src/databricks/labs/remorph/reconcile/connectors/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
from pyspark.sql import DataFrame

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class SnowflakeDataSource(DataSource):
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(
self, catalog_name: str, schema_name: str, query: str, jdbc_reader_options: JdbcReaderOptions
) -> DataFrame:
# Implement Snowflake-specific logic here
return NotImplemented

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does make fmt put it back to one line?...

self,
catalog_name: str,
schema_name: str,
table_name: str,
) -> list[Schema]:
# Implement Snowflake-specific logic here
return NotImplemented

Expand Down
6 changes: 3 additions & 3 deletions src/databricks/labs/remorph/reconcile/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from databricks.labs.blueprint.installation import Installation

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import TableRecon, Tables
from databricks.labs.remorph.reconcile.recon_config import Table, TableRecon

logger = logging.getLogger(__name__)

Expand All @@ -27,10 +27,10 @@ def __init__(self, source: DataSource, target: DataSource):
self.source = source
self.target = target

def compare_schemas(self, table_conf: Tables, schema_name: str, catalog_name: str) -> bool:
def compare_schemas(self, table_conf: Table, schema_name: str, catalog_name: str) -> bool:
raise NotImplementedError

def compare_data(self, table_conf: Tables, schema_name: str, catalog_name: str) -> bool:
def compare_data(self, table_conf: Table, schema_name: str, catalog_name: str) -> bool:
raise NotImplementedError


Expand Down
Loading
Loading