From 52fc68c7f95abb6fa87122d159476c103d389d79 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Thu, 11 Jul 2024 15:54:18 +0700 Subject: [PATCH] fix: logging result only on non dry-run --- .../executor/bumblebee/bigquery_service.py | 14 ++++++++++ .../executor/bumblebee/transformation.py | 28 ++++--------------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/task/bq2bq/executor/bumblebee/bigquery_service.py b/task/bq2bq/executor/bumblebee/bigquery_service.py index c1a054d..c6d3565 100644 --- a/task/bq2bq/executor/bumblebee/bigquery_service.py +++ b/task/bq2bq/executor/bumblebee/bigquery_service.py @@ -91,6 +91,10 @@ def execute_query(self, query, dry_run=False): query_job = self.client.query(query=query, job_config=query_job_config, retry=self.retry) + if dry_run: + logger.info("dry-run finished") + return None + logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state, query_job.project)) @@ -113,6 +117,9 @@ def execute_query(self, query, dry_run=False): if self.on_job_finish is not None: self.on_job_finish(query_job) + + logger.info(result) + logger.info("finished") return result def transform_load(self, @@ -146,6 +153,10 @@ def transform_load(self, query_job = self.client.query(query=query, job_config=query_job_config, retry=self.retry) + if dry_run: + logger.info("dry-run finished") + return None + logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state, query_job.project)) @@ -168,6 +179,9 @@ def transform_load(self, if self.on_job_finish is not None: self.on_job_finish(query_job) + + logger.info(result) + logger.info("finished") return result def create_table(self, full_table_name, schema_file, diff --git a/task/bq2bq/executor/bumblebee/transformation.py b/task/bq2bq/executor/bumblebee/transformation.py index 8ee2b56..73b28c8 100644 --- a/task/bq2bq/executor/bumblebee/transformation.py +++ b/task/bq2bq/executor/bumblebee/transformation.py @@ -190,13 +190,7 @@ def execute(self): query = query.apply_parameter(parameter) query.print_with_logger(logger) - result = self.loader.load(query, dry_run=self.dry_run) - logger.info(result) - - if not self.dry_run: - logger.info("finished") - else: - logger.info("dry-run finished") + self.loader.load(query, dry_run=self.dry_run) class TableTransformation: """ @@ -291,13 +285,7 @@ def execute(self): logger.info("start transformation job") self.query.print_with_logger(logger) - result = self.loader.load(self.query, dry_run=self.dry_run) - logger.info(result) - - if not self.dry_run: - logger.info("finished") - else: - logger.info("dry-run finished") + self.loader.load(self.query, dry_run=self.dry_run) async def async_execute(self): self.execute() @@ -354,12 +342,8 @@ def transform(self): result = self.loader.load(query, dry_run=self.dry_run) - logger.info(result) - - if not self.dry_run: - logger.info("finished {}".format(result.total_rows)) - else: - logger.info("dry-run finished") + if not self.dry_run and result: + logger.info("total rows: {}".format(result.total_rows)) class MultiPartitionTransformation: @@ -515,9 +499,7 @@ def collect_datetimes(self): destination_parameter) query.print() - results = None - if not self.dry_run: - results = self.bigquery_service.execute_query(query) + results = self.bigquery_service.execute_query(query) dates = [row[0] for row in results] datetimes = [datetime.combine(d, datetime.min.time()) for d in dates]