Skip to content

Commit

Permalink
feat: upgrade bigquery client version + add timeout as a predicate to…
Browse files Browse the repository at this point in the history
… retry
  • Loading branch information
deryrahman committed Apr 23, 2024
1 parent ffa0543 commit 56bc214
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion task/bq2bq/executor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM python:3.8-alpine

WORKDIR /opt/bumblebee

COPY task/bq2bq/executor .
COPY . .
RUN ["pip", "install", "-r", "requirements.txt"]

ENTRYPOINT [ "python3", "/opt/bumblebee/main.py"]
12 changes: 11 additions & 1 deletion task/bq2bq/executor/bumblebee/bigquery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from abc import ABC, abstractmethod

import google as google
import requests.exceptions
from google.api_core.exceptions import BadRequest, Forbidden
from google.api_core.retry import if_exception_type, if_transient_error
from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig, CreateDisposition
from google.cloud.bigquery.schema import _parse_schema_resource
Expand Down Expand Up @@ -50,6 +52,10 @@ def delete_table(self, full_table_name):
# Interface stub: takes a table name and has no behaviour of its own.
# NOTE(review): the enclosing class header and any decorator are outside this
# excerpt; presumably concrete services override this method - confirm.
def get_table(self, full_table_name):
pass

def if_exception_funcs(fn_origin, fn_additional, *fn_extra):
    """Combine exception predicates into one that matches if any of them does.

    Each argument is a callable taking an exception and returning a truthy
    value when that exception should be treated as a match (for example, as
    retryable). The predicates are evaluated in the order given and
    evaluation stops at the first one that matches.

    Args:
        fn_origin: The base predicate, checked first.
        fn_additional: A second predicate, checked when ``fn_origin`` does
            not match.
        *fn_extra: Optional further predicates, checked in order.

    Returns:
        A function ``predicate(exception) -> bool`` that is True when at
        least one of the supplied predicates accepts the exception.
    """
    predicates = (fn_origin, fn_additional) + fn_extra

    def if_exception_func_predicate(exception):
        # any() short-circuits exactly like a chain of `or`, but always
        # yields a real bool regardless of what each predicate returns.
        return any(predicate(exception) for predicate in predicates)

    return if_exception_func_predicate

class BigqueryService(BaseBigqueryService):

Expand All @@ -61,7 +67,11 @@ def __init__(self, client, labels, writer, retry_timeout = None, on_job_finish =
self.client = client
self.labels = labels
self.writer = writer
self.retry = bigquery.DEFAULT_RETRY.with_deadline(retry_timeout) if retry_timeout else bigquery.DEFAULT_RETRY
# Treat client-side HTTP timeouts as retryable, in addition to the errors that
# google.api_core already classifies as transient.
if_additional_transient_error = if_exception_type(requests.exceptions.Timeout)
predicate = if_exception_funcs(if_transient_error, if_additional_transient_error)
retry = bigquery.DEFAULT_RETRY.with_deadline(retry_timeout) if retry_timeout else bigquery.DEFAULT_RETRY
# Retry.with_predicate() does not modify the Retry in place; it returns a copy
# carrying the new predicate. The returned object must be the one stored,
# otherwise the Timeout predicate built above is silently dropped.
# NOTE(review): this replaces whatever predicate bigquery.DEFAULT_RETRY ships
# with by if_transient_error + Timeout - confirm that is the intended set.
self.retry = retry.with_predicate(predicate)
self.on_job_finish = on_job_finish
self.on_job_register = on_job_register

Expand Down
2 changes: 1 addition & 1 deletion task/bq2bq/executor/bumblebee/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Return the logger registered under `name`, after requesting the logging
# setup used here: output to stdout, formatted as
# "[YYYY-mm-dd HH:MM:SS] LEVEL:logger-name: message".
def get_logger(name: str):
logger = logging.getLogger(name)
logformat = "[%(asctime)s] %(levelname)s:%(name)s: %(message)s"
# NOTE(review): this excerpt is a diff rendering, so the old (INFO) and the
# new (DEBUG) opening line of the same basicConfig call both appear below.
# NOTE(review): basicConfig configures the root logger, so the DEBUG level
# presumably also enables debug output from third-party libraries - confirm
# that this verbosity is wanted.
logging.basicConfig(level=logging.INFO, stream=sys.stdout,
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout,
format=logformat, datefmt="%Y-%m-%d %H:%M:%S")

return logger
13 changes: 7 additions & 6 deletions task/bq2bq/executor/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ cachetools==4.1.1
certifi==2020.6.20
chardet==3.0.4
google==3.0.0
google-api-core==1.21.0
google-auth==1.18.0
google-cloud-bigquery==1.25.0
google-cloud-core==1.3.0
google-resumable-media==0.5.1
googleapis-common-protos==1.52.0
google-api-core==2.8.0
google-auth==2.29.0
google-cloud-bigquery==1.28.3
google-cloud-core==2.4.1
google-crc32c==1.5.0
google-resumable-media==1.3.3
googleapis-common-protos==1.56.0
idna==2.10
iso8601==0.1.12
protobuf==3.12.2
Expand Down

0 comments on commit 56bc214

Please sign in to comment.