Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Threshold Query Builder #188

Merged
merged 15 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ branch = true
parallel = true

[tool.coverage.report]
omit = ["src/databricks/labs/remorph/reconcile/*",
omit = [
"src/databricks/labs/remorph/coverage/*",
"src/databricks/labs/remorph/helpers/execution_time.py",
"__about__.py"]
Expand Down
28 changes: 22 additions & 6 deletions src/databricks/labs/remorph/reconcile/connectors/data_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from abc import ABC, abstractmethod

from databricks.sdk import WorkspaceClient # pylint: disable-next=wrong-import-order
Expand All @@ -6,7 +7,6 @@
from databricks.labs.remorph.reconcile.recon_config import ( # pylint: disable=ungrouped-imports
JdbcReaderOptions,
Schema,
Tables,
)


Expand All @@ -20,11 +20,16 @@
self.scope = scope

@abstractmethod
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(self, catalog: str, schema: str, query: str, jdbc_reader_options: JdbcReaderOptions) -> DataFrame:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def read_data(self, catalog: str, schema: str, query: str, jdbc_reader_options: JdbcReaderOptions) -> DataFrame:
def read_data(self, catalog: str, schema: str, query: str, options: JdbcReaderOptions) -> DataFrame:

nit: could you rename all the arguments to make them reasonably shorter? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the arguments to be shorter.

return NotImplemented

@abstractmethod
def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog: str,
schema: str,
table: str,
) -> list[Schema]:
return NotImplemented

def _get_jdbc_reader(self, query, jdbc_url, driver):
Expand All @@ -45,6 +50,17 @@
"fetchsize": jdbc_reader_options.fetch_size,
}

def _get_secrets(self, key_name):
key = self.source + '_' + key_name
return self.ws.secrets.get_secret(self.scope, key)
def _get_secrets(self, key):
return self.ws.secrets.get_secret(self.scope, self.source + '_' + key)

Check warning on line 54 in src/databricks/labs/remorph/reconcile/connectors/data_source.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/data_source.py#L54

Added line #L54 was not covered by tests

@staticmethod
def _get_table_or_query(
catalog: str,
schema: str,
query: str,
):
if re.search('select', query, re.IGNORECASE):
return query.format(catalog_name=catalog, schema_name=schema)

Check warning on line 63 in src/databricks/labs/remorph/reconcile/connectors/data_source.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/data_source.py#L63

Added line #L63 was not covered by tests
if catalog:
return catalog + "." + schema + "." + query
return schema + "." + query

Check warning on line 66 in src/databricks/labs/remorph/reconcile/connectors/data_source.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/data_source.py#L65-L66

Added lines #L65 - L66 were not covered by tests
11 changes: 8 additions & 3 deletions src/databricks/labs/remorph/reconcile/connectors/databricks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from pyspark.sql import DataFrame

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class DatabricksDataSource(DataSource):
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(self, catalog: str, schema: str, query: str, jdbc_reader_options: JdbcReaderOptions) -> DataFrame:
# Implement Databricks-specific logic here
return NotImplemented

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog: str,
schema: str,
table: str,
) -> list[Schema]:
# Implement Databricks-specific logic here
return NotImplemented

Expand Down
27 changes: 15 additions & 12 deletions src/databricks/labs/remorph/reconcile/connectors/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.constants import SourceDriver
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class OracleDataSource(DataSource):
Expand All @@ -16,16 +16,14 @@
f":{self._get_secrets('port')}/{self._get_secrets('database')}"
)

# TODO need to check schema_name,catalog_name is needed
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(self, catalog: str, schema: str, query: str, jdbc_reader_options: JdbcReaderOptions) -> DataFrame:
try:
if table_conf.jdbc_reader_options is None:
return self.reader(query).options(**self._get_timestamp_options()).load()
table_query = self._get_table_or_query(catalog, schema, query)

Check warning on line 21 in src/databricks/labs/remorph/reconcile/connectors/oracle.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/oracle.py#L21

Added line #L21 was not covered by tests
if jdbc_reader_options is None:
return self.reader(table_query).options(**self._get_timestamp_options()).load()

Check warning on line 23 in src/databricks/labs/remorph/reconcile/connectors/oracle.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/oracle.py#L23

Added line #L23 was not covered by tests
return (
self.reader(query)
.options(
**self._get_jdbc_reader_options(table_conf.jdbc_reader_options) | self._get_timestamp_options()
)
self.reader(table_query)
.options(**self._get_jdbc_reader_options(jdbc_reader_options) | self._get_timestamp_options())
ravit-db marked this conversation as resolved.
Show resolved Hide resolved
.load()
)
except PySparkException as e:
Expand All @@ -34,14 +32,19 @@
)
raise PySparkException(error_msg) from e

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
self,
catalog: str,
schema: str,
table: str,
) -> list[Schema]:
try:
schema_query = self._get_schema_query(table_name, schema_name)
schema_query = self._get_schema_query(table, schema)

Check warning on line 42 in src/databricks/labs/remorph/reconcile/connectors/oracle.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/remorph/reconcile/connectors/oracle.py#L42

Added line #L42 was not covered by tests
schema_df = self.reader(schema_query).load()
return [Schema(field.column_name.lower(), field.data_type.lower()) for field in schema_df.collect()]
except PySparkException as e:
error_msg = (
f"An error occurred while fetching Oracle Schema using the following {table_name} in "
f"An error occurred while fetching Oracle Schema using the following {table} in "
f"OracleDataSource: {e!s}"
)
raise PySparkException(error_msg) from e
Expand Down
11 changes: 8 additions & 3 deletions src/databricks/labs/remorph/reconcile/connectors/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from pyspark.sql import DataFrame

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import Schema, Tables
from databricks.labs.remorph.reconcile.recon_config import JdbcReaderOptions, Schema


class SnowflakeDataSource(DataSource):
def read_data(self, schema_name: str, catalog_name: str, query: str, table_conf: Tables) -> DataFrame:
def read_data(self, catalog: str, schema: str, query: str, jdbc_reader_options: JdbcReaderOptions) -> DataFrame:
# Implement Snowflake-specific logic here
return NotImplemented

def get_schema(self, table_name: str, schema_name: str, catalog_name: str) -> list[Schema]:
def get_schema(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does `make fmt` put it back on one line?...

self,
catalog: str,
schema: str,
table: str,
) -> list[Schema]:
# Implement Snowflake-specific logic here
return NotImplemented

Expand Down
6 changes: 3 additions & 3 deletions src/databricks/labs/remorph/reconcile/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from databricks.labs.blueprint.installation import Installation

from databricks.labs.remorph.reconcile.connectors.data_source import DataSource
from databricks.labs.remorph.reconcile.recon_config import TableRecon, Tables
from databricks.labs.remorph.reconcile.recon_config import Table, TableRecon

logger = logging.getLogger(__name__)

Expand All @@ -27,10 +27,10 @@ def __init__(self, source: DataSource, target: DataSource):
self.source = source
self.target = target

def compare_schemas(self, table_conf: Tables, schema_name: str, catalog_name: str) -> bool:
def compare_schemas(self, table_conf: Table, schema_name: str, catalog_name: str) -> bool:
raise NotImplementedError

def compare_data(self, table_conf: Tables, schema_name: str, catalog_name: str) -> bool:
def compare_data(self, table_conf: Table, schema_name: str, catalog_name: str) -> bool:
raise NotImplementedError


Expand Down
Loading
Loading