Skip to content

Commit

Permalink
Merge pull request singer-io#7 from singer-io/iterate-over-in-app-events
Browse files Browse the repository at this point in the history
Iterate over in-app-events by using a shorter query window
  • Loading branch information
KAllan357 authored Jul 12, 2018
2 parents 60a438b + 834a9a1 commit e2ebeb7
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions tap_appsflyer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def get_start(key):
return datetime.datetime.now() - datetime.timedelta(days=30)


def get_stop(start_datetime):
return min(start_datetime + datetime.timedelta(days=30), datetime.datetime.now())
def get_stop(start_datetime, stop_time, days=30):
return min(start_datetime + datetime.timedelta(days=days), stop_time)


def get_base_url():
Expand Down Expand Up @@ -252,7 +252,7 @@ def sync_installs():
)

from_datetime = get_start("installs")
to_datetime = get_stop(from_datetime)
to_datetime = get_stop(from_datetime, datetime.datetime.now())

if to_datetime < from_datetime:
LOGGER.error("to_datetime (%s) is less than from_endtime (%s).", to_datetime, from_datetime)
Expand Down Expand Up @@ -378,37 +378,40 @@ def sync_in_app_events():
"original_url",
)

stop_time = datetime.datetime.now()
from_datetime = get_start("in_app_events")
to_datetime = get_stop(from_datetime)

if to_datetime < from_datetime:
LOGGER.error("to_datetime (%s) is less than from_endtime (%s).", to_datetime, from_datetime)
return

params = dict()
params["from"] = from_datetime.strftime("%Y-%m-%d %H:%M")
params["to"] = to_datetime.strftime("%Y-%m-%d %H:%M")
params["api_token"] = CONFIG["api_token"]

url = get_url("in_app_events", app_id=CONFIG["app_id"])
request_data = request(url, params)

csv_data = RequestToCsvAdapter(request_data)
reader = csv.DictReader(csv_data, fieldnames)

next(reader) # Skip the heading row

bookmark = from_datetime
for i, row in enumerate(reader):
record = xform(row, schema)
singer.write_record("in_app_events", record)
# AppsFlyer returns records in order of most recent first.
if utils.strptime(record["event_time"]) > bookmark:
bookmark = utils.strptime(record["event_time"])

# Write out state
utils.update_state(STATE, "in_app_events", bookmark)
singer.write_state(STATE)
to_datetime = get_stop(from_datetime, stop_time, 10)

while to_datetime <= stop_time:
LOGGER.info("Syncing data from %s to %s", from_datetime, to_datetime)
params = dict()
params["from"] = from_datetime.strftime("%Y-%m-%d %H:%M")
params["to"] = to_datetime.strftime("%Y-%m-%d %H:%M")
params["api_token"] = CONFIG["api_token"]

url = get_url("in_app_events", app_id=CONFIG["app_id"])
request_data = request(url, params)

csv_data = RequestToCsvAdapter(request_data)
reader = csv.DictReader(csv_data, fieldnames)

next(reader) # Skip the heading row

bookmark = from_datetime
for i, row in enumerate(reader):
record = xform(row, schema)
singer.write_record("in_app_events", record)
# AppsFlyer returns records in order of most recent first.
if utils.strptime(record["event_time"]) > bookmark:
bookmark = utils.strptime(record["event_time"])

# Write out state
utils.update_state(STATE, "in_app_events", bookmark)
singer.write_state(STATE)

# Move the timings forward
from_datetime = to_datetime
to_datetime = get_stop(from_datetime, stop_time, 10)


STREAMS = [
Expand Down

0 comments on commit e2ebeb7

Please sign in to comment.