Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dry-run on loader #30

Merged
merged 6 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions task/bq2bq/executor/bumblebee/bigquery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def transform_load(self,
destination_table=None,
write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER,
dry_run=False,
allow_field_addition=False):
pass

Expand Down Expand Up @@ -77,8 +78,9 @@ def __init__(self, client, labels, writer, retry_timeout = None, on_job_finish =
self.on_job_finish = on_job_finish
self.on_job_register = on_job_register

def execute_query(self, query):
def execute_query(self, query, dry_run=False):
query_job_config = QueryJobConfig()
query_job_config.dry_run = dry_run
query_job_config.use_legacy_sql = False
query_job_config.labels = self.labels

Expand All @@ -89,6 +91,10 @@ def execute_query(self, query):
query_job = self.client.query(query=query,
job_config=query_job_config,
retry=self.retry)
if dry_run:
logger.info("dry-run finished")
return None

logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
query_job.project))

Expand All @@ -111,6 +117,9 @@ def execute_query(self, query):

if self.on_job_finish is not None:
self.on_job_finish(query_job)

logger.info(result)
logger.info("finished")
return result

def transform_load(self,
Expand All @@ -119,11 +128,13 @@ def transform_load(self,
destination_table=None,
write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER,
dry_run=False,
allow_field_addition=False):
if query is None or len(query) == 0:
raise ValueError("query must not be Empty")

query_job_config = QueryJobConfig()
query_job_config.dry_run = dry_run
query_job_config.create_disposition = create_disposition
query_job_config.write_disposition = write_disposition
query_job_config.use_legacy_sql = False
Expand All @@ -142,6 +153,10 @@ def transform_load(self,
query_job = self.client.query(query=query,
job_config=query_job_config,
retry=self.retry)
if dry_run:
logger.info("dry-run finished")
return None

logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
query_job.project))

Expand All @@ -164,6 +179,9 @@ def transform_load(self,

if self.on_job_finish is not None:
self.on_job_finish(query_job)

logger.info(result)
logger.info("finished")
return result

def create_table(self, full_table_name, schema_file,
Expand Down Expand Up @@ -251,7 +269,7 @@ def execute_query(self, query):
return []

def transform_load(self, query, source_project_id=None, destination_table=None, write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER, allow_field_addition=False):
create_disposition=CreateDisposition.CREATE_NEVER, dry_run=False, allow_field_addition=False):
log = """ transform and load with config :
{}
{}
Expand Down
5 changes: 1 addition & 4 deletions task/bq2bq/executor/bumblebee/bq2bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ def bq2bq(properties_file: str,
else:
task_config = TaskConfigFromEnv()

bigquery_service = DummyService()
if not dry_run:
bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_register=on_job_register)

bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_register=on_job_register)
transformation = Transformation(bigquery_service,
task_config,
task_files.query,
Expand Down
15 changes: 9 additions & 6 deletions task/bq2bq/executor/bumblebee/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import ABC
from abc import abstractmethod
from bumblebee.config import LoadMethod
from bumblebee.bigquery_service import BigqueryService

class BaseLoader(ABC):

Expand All @@ -14,36 +15,38 @@ def load(self, query):

class PartitionLoader(BaseLoader):
    """Loads the result of a transformation query into one date partition
    of a BigQuery table (the ``table$YYYYMMDD`` partition decorator form).
    """

    def __init__(self, bigquery_service: BigqueryService, destination: str, load_method: LoadMethod, partition: datetime, allow_field_addition=False):
        self.bigquery_service = bigquery_service
        self.destination_name = destination
        self.load_method = load_method
        self.partition_date = partition
        self.allow_field_addition = allow_field_addition

    def load(self, query, dry_run=False):
        """Run *query* and load its result into the target partition.

        When ``dry_run`` is True the call is forwarded to the service with
        dry_run set, so BigQuery validates the query without executing a job.
        Returns whatever the service's transform_load returns (None on dry run,
        per the service implementation in this change).
        """
        partition_date_str = self.partition_date.strftime("%Y%m%d")
        # BigQuery addresses a single day partition as "<table>$<YYYYMMDD>".
        load_destination = "{}${}".format(self.destination_name, partition_date_str)
        return self.bigquery_service.transform_load(query=query,
                                                    write_disposition=self.load_method.write_disposition,
                                                    destination_table=load_destination,
                                                    dry_run=dry_run,
                                                    allow_field_addition=self.allow_field_addition)


class TableLoader(BaseLoader):
    """Loads the result of a transformation query into a whole BigQuery table
    (no partition decorator), using the load method's write disposition.
    """

    def __init__(self, bigquery_service: BigqueryService, destination: str, load_method: LoadMethod, allow_field_addition=False):
        self.bigquery_service = bigquery_service
        self.full_table_name = destination
        self.load_method = load_method
        self.allow_field_addition = allow_field_addition

    def load(self, query, dry_run=False):
        """Run *query* and load its result into the destination table.

        ``dry_run`` is passed through so BigQuery validates the query without
        executing a job; the service returns None in that case.
        """
        return self.bigquery_service.transform_load(query=query,
                                                    write_disposition=self.load_method.write_disposition,
                                                    destination_table=self.full_table_name,
                                                    dry_run=dry_run,
                                                    allow_field_addition=self.allow_field_addition)


Expand All @@ -52,5 +55,5 @@ def __init__(self,bigquery_service: BigqueryService, destination: str):
self.bigquery_service = bigquery_service
self.full_table_name = destination

def load(self, query, dry_run=False):
    """Execute the DML *query* directly via the BigQuery service.

    Unlike the partition/table loaders this does not configure a load
    destination — the query itself performs the mutation. ``dry_run`` is
    forwarded so the service can validate the statement without running it.
    """
    return self.bigquery_service.execute_query(query, dry_run=dry_run)
30 changes: 7 additions & 23 deletions task/bq2bq/executor/bumblebee/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,7 @@ def execute(self):
query = query.apply_parameter(parameter)
query.print_with_logger(logger)

result = None

if not self.dry_run:
result = self.loader.load(query)
logger.info(result)
logger.info("finished")
else:
logger.info("dry-run finished")
self.loader.load(query, dry_run=self.dry_run)

class TableTransformation:
"""
Expand Down Expand Up @@ -292,13 +285,7 @@ def execute(self):
logger.info("start transformation job")
self.query.print_with_logger(logger)

result = None
if not self.dry_run:
result = self.loader.load(self.query)
logger.info(result)
logger.info("finished")
else:
logger.info("dry-run finished")
self.loader.load(self.query, dry_run=self.dry_run)

async def async_execute(self):
self.execute()
Expand Down Expand Up @@ -353,11 +340,10 @@ def transform(self):
self.partition_column_type)
query.print_with_logger(logger)

if not self.dry_run:
result = self.loader.load(query)
logger.info("finished {}".format(result.total_rows))
else:
logger.info("dry-run finished")

result = self.loader.load(query, dry_run=self.dry_run)
if not self.dry_run and result:
logger.info("total rows: {}".format(result.total_rows))


class MultiPartitionTransformation:
Expand Down Expand Up @@ -513,9 +499,7 @@ def collect_datetimes(self):
destination_parameter)
query.print()

results = None
if not self.dry_run:
results = self.bigquery_service.execute_query(query)
results = self.bigquery_service.execute_query(query)

dates = [row[0] for row in results]
datetimes = [datetime.combine(d, datetime.min.time()) for d in dates]
Expand Down
24 changes: 20 additions & 4 deletions task/bq2bq/executor/tests/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def test_partition_transform_execute(self, BigqueryServiceMock):
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190101",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand Down Expand Up @@ -90,6 +91,7 @@ def test_table_transform(self, BigqueryServiceMock):
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand All @@ -113,6 +115,7 @@ def test_single_partition_transform_1d_window_0_offset_without_spillover(self, B
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190101",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand All @@ -135,6 +138,7 @@ def test_single_partition_transform_2d_window_24h_offset_without_spillover(self,
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190104",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand Down Expand Up @@ -174,6 +178,7 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190103",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand All @@ -197,9 +202,11 @@ def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock)

calls = [call(query=final_query_1, write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190103",
dry_run=False,
allow_field_addition=False),
call(query=final_query_2, write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190104",
dry_run=False,
allow_field_addition=False)]
bigquery_service.transform_load.assert_has_calls(calls, any_order=True)
self.assertEqual(len(bigquery_service.transform_load.call_args_list), len(calls))
Expand All @@ -220,7 +227,7 @@ def test_dml_transform(self, BigqueryServiceMock):
task.execute()

final_query = """select count(1) from table where date >= '2019-01-02' and date < '2019-01-03'"""
bigquery_service.execute_query.assert_called_with(final_query)
bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_execute_dry_run(self, BigqueryServiceMock):
Expand All @@ -237,7 +244,13 @@ def test_execute_dry_run(self, BigqueryServiceMock):
task = TableTransformation(bigquery_service, task_config, query, localized_start_time,
localized_end_time, dry_run, localized_execution_time)
task.transform()
bigquery_service.transform_load.assert_not_called()

final_query = "select count(1) from table where date >= '2019-01-01 00:00:00' and date < '2019-01-01 00:00:00'"
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd",
dry_run=True,
allow_field_addition=False)


@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand All @@ -262,6 +275,7 @@ def test_allow_field_addition(self, BigqueryServiceMock):
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd",
dry_run=False,
allow_field_addition=True)


Expand Down Expand Up @@ -289,7 +303,7 @@ def test_should_run_dml_merge_statements(self, BigqueryServiceMock):
transformation.transform()

final_query = """select count(1) from table where date >= '2019-02-01' and date < '2019-02-02'"""
bigquery_service.execute_query.assert_called_with(final_query)
bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_should_run_table_task(self, BigqueryServiceMock):
Expand Down Expand Up @@ -325,6 +339,7 @@ def get_table_mock(table_name):
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_APPEND,
destination_table="bq_project.playground_dev.abcd",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand Down Expand Up @@ -362,6 +377,7 @@ def get_table_mock(table_name):
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd",
dry_run=False,
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand Down Expand Up @@ -401,7 +417,7 @@ def get_table_mock(table_name):
transformation.transform()

final_query = """-- Optimus generated\nDECLARE partitions ARRAY<DATE>;\n\n\n\nCREATE TEMP TABLE `opt__partitions` AS (\n select count(1) from table where date >= '__dstart__' and date < '__dend__'\n);\n\nSET (partitions) = (\n SELECT AS STRUCT\n array_agg(DISTINCT DATE(`event_timestamp`))\n FROM opt__partitions\n);\n\nMERGE INTO\n `bq_project.playground_dev.abcd` AS target\nUSING\n (\n Select * from `opt__partitions`\n ) AS source\nON FALSE\nWHEN NOT MATCHED BY SOURCE AND DATE(`event_timestamp`) IN UNNEST(partitions)\nTHEN DELETE\nWHEN NOT MATCHED THEN INSERT\n (\n \n )\nVALUES\n (\n \n );\n"""
bigquery_service.execute_query.assert_called_with(final_query)
bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_should_fail_if_partition_task_for_ingestion_time_without_filter_in_REPLACE_MERGE(self, BigqueryServiceMock):
Expand Down
Loading