From d8789487b8289c96b05ddf6dc82f160f668b637b Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Tue, 23 Apr 2024 13:26:50 +0700 Subject: [PATCH] feat: upgrade bigquery client version + add timeout as a predicate to retry --- task/bq2bq/executor/bumblebee/bigquery_service.py | 12 +++++++++++- task/bq2bq/executor/bumblebee/log.py | 2 +- task/bq2bq/executor/requirements.txt | 13 +++++++------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/task/bq2bq/executor/bumblebee/bigquery_service.py b/task/bq2bq/executor/bumblebee/bigquery_service.py index d169888..6b11eca 100644 --- a/task/bq2bq/executor/bumblebee/bigquery_service.py +++ b/task/bq2bq/executor/bumblebee/bigquery_service.py @@ -4,7 +4,9 @@ from abc import ABC, abstractmethod import google as google +import requests.exceptions from google.api_core.exceptions import BadRequest, Forbidden +from google.api_core.retry import if_exception_type, if_transient_error from google.cloud import bigquery from google.cloud.bigquery.job import QueryJobConfig, CreateDisposition from google.cloud.bigquery.schema import _parse_schema_resource @@ -50,6 +52,10 @@ def delete_table(self, full_table_name): def get_table(self, full_table_name): pass +def if_exception_funcs(fn_origin, fn_additional): + def if_exception_func_predicate(exception): + return fn_origin(exception) or fn_additional(exception) + return if_exception_func_predicate class BigqueryService(BaseBigqueryService): @@ -61,7 +67,11 @@ def __init__(self, client, labels, writer, retry_timeout = None, on_job_finish = self.client = client self.labels = labels self.writer = writer - self.retry = bigquery.DEFAULT_RETRY.with_deadline(retry_timeout) if retry_timeout else bigquery.DEFAULT_RETRY + if_additional_transient_error = if_exception_type(requests.exceptions.Timeout) + predicate = if_exception_funcs(if_transient_error, if_additional_transient_error) + retry = bigquery.DEFAULT_RETRY.with_deadline(retry_timeout) if retry_timeout else 
bigquery.DEFAULT_RETRY + retry = retry.with_predicate(predicate) + self.retry = retry self.on_job_finish = on_job_finish self.on_job_register = on_job_register diff --git a/task/bq2bq/executor/bumblebee/log.py b/task/bq2bq/executor/bumblebee/log.py index 7eaade3..9343af3 100644 --- a/task/bq2bq/executor/bumblebee/log.py +++ b/task/bq2bq/executor/bumblebee/log.py @@ -5,7 +5,7 @@ def get_logger(name: str): logger = logging.getLogger(name) logformat = "[%(asctime)s] %(levelname)s:%(name)s: %(message)s" - logging.basicConfig(level=logging.INFO, stream=sys.stdout, + logging.basicConfig(level=logging.DEBUG, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S") return logger diff --git a/task/bq2bq/executor/requirements.txt b/task/bq2bq/executor/requirements.txt index 77d99d0..9f8131e 100644 --- a/task/bq2bq/executor/requirements.txt +++ b/task/bq2bq/executor/requirements.txt @@ -2,12 +2,13 @@ cachetools==4.1.1 certifi==2020.6.20 chardet==3.0.4 google==3.0.0 -google-api-core==1.21.0 -google-auth==1.18.0 -google-cloud-bigquery==1.25.0 -google-cloud-core==1.3.0 -google-resumable-media==0.5.1 -googleapis-common-protos==1.52.0 +google-api-core==2.8.0 +google-auth==2.29.0 +google-cloud-bigquery==1.28.3 +google-cloud-core==2.4.1 +google-crc32c==1.5.0 +google-resumable-media==1.3.3 +googleapis-common-protos==1.56.0 idna==2.10 iso8601==0.1.12 protobuf==3.12.2