Skip to content

Commit

Permalink
2034 add retry loop to Google symptoms data pull (#2057)
Browse files Browse the repository at this point in the history
* first implementation

* add testing and more robust conditions

* revert unneeded change

* only retry once and added other applicable error

* lint

* fixed test

* lint
  • Loading branch information
aysim319 authored Nov 5, 2024
1 parent 3450dfc commit 57368e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
29 changes: 23 additions & 6 deletions google_symptoms/delphi_google_symptoms/pull.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Retrieve data and wrangle into appropriate format."""
# -*- coding: utf-8 -*-
import random
import re
import time
from datetime import date, datetime # pylint: disable=unused-import

import numpy as np
import pandas as pd
import pandas_gbq
from google.api_core.exceptions import BadRequest, InternalServerError, ServerError
from google.oauth2 import service_account

from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS
Expand Down Expand Up @@ -184,16 +187,30 @@ def pull_gs_data_one_geolevel(level, date_range):
pd.DataFrame
"""
query = produce_query(level, date_range)
df = None

# recommends to only try once for 500/503 error
try:
df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS)
# pylint: disable=W0703
except Exception as e:
# sometimes google throws out 400 error when it's 500
# https://github.com/googleapis/python-bigquery/issues/23
if (
# pylint: disable=E1101
(isinstance(e, BadRequest) and e.reason == "backendError")
or isinstance(e, (ServerError, InternalServerError))
):
time.sleep(2 + random.randint(0, 1000) / 1000.0)
else:
raise e
if df is None:
df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS)

df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS)
if len(df) == 0:
df = pd.DataFrame(
columns=["open_covid_region_code", "date"] +
list(colname_map.keys())
)
df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys()))

df = preprocess(df, level)

return df


Expand Down
38 changes: 38 additions & 0 deletions google_symptoms/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import mock
from freezegun import freeze_time
from datetime import date, datetime
from google.api_core.exceptions import BadRequest, ServerError

import pandas as pd
from google.rpc import error_details_pb2
from pandas.testing import assert_frame_equal

from delphi_google_symptoms.pull import (
Expand Down Expand Up @@ -120,6 +123,41 @@ def test_pull_one_gs_no_dates(self, mock_read_gbq):
expected = pd.DataFrame(columns=new_keep_cols)
assert_frame_equal(output, expected, check_dtype = False)

def test_pull_one_gs_retry_success(self):
info = error_details_pb2.ErrorInfo(
reason="backendError",
)
badRequestException = BadRequest(message="message", error_info=info)
serverErrorException = ServerError(message="message")

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()]

output = pull_gs_data_one_geolevel("state", ["", ""])
expected = pd.DataFrame(columns=new_keep_cols)
assert_frame_equal(output, expected, check_dtype = False)
assert mock_read_gbq.call_count == 2

def test_pull_one_gs_retry_too_many(self):
info = error_details_pb2.ErrorInfo(
reason="backendError",
)
badRequestException = BadRequest(message="message", error_info=info)

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
with pytest.raises(BadRequest):
mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()]
pull_gs_data_one_geolevel("state", ["", ""])


def test_pull_one_gs_retry_bad(self):
badRequestException = BadRequest(message="message", )

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
with pytest.raises(BadRequest):
mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()]
pull_gs_data_one_geolevel("state", ["", ""])

def test_preprocess_no_data(self):
output = preprocess(pd.DataFrame(columns=keep_cols), "state")
expected = pd.DataFrame(columns=new_keep_cols)
Expand Down

0 comments on commit 57368e9

Please sign in to comment.