Skip to content

Commit

Permalink
fix memory issues by tracking the bookmark and emiting state at the end
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Allan committed Jul 10, 2018
1 parent 201fd3b commit b4dbbf1
Showing 1 changed file with 15 additions and 32 deletions.
47 changes: 15 additions & 32 deletions tap_appsflyer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,25 +271,16 @@ def sync_installs():

next(reader) # Skip the heading row

# AppsFlyer returns records in order of most recent first. So, we
# need to reverse them in order to provide sensible STATE
# checkpoints. According to the API documentation, there may be
# as many as 200,000 rows here but I don't have a better solution.
rows = []
for row in reader:
rows.append(row)
rows = reversed(rows)

# Emit updated records with state checkpoint
for i, row in enumerate(rows):
bookmark = from_datetime
for i, row in enumerate(reader):
record = xform(row, schema)
singer.write_record("installs", record)
utils.update_state(STATE, "installs", record["attributed_touch_time"])
# AppsFlyer returns records in order of most recent first.
if utils.strptime(record["attributed_touch_time"]) > bookmark:
bookmark = utils.strptime(record["attributed_touch_time"])

if (i % 250) == 0:
singer.write_state(STATE)

# Write out final state
# Write out state
utils.update_state(STATE, "installs", bookmark)
singer.write_state(STATE)


Expand Down Expand Up @@ -407,25 +398,17 @@ def sync_in_app_events():

next(reader) # Skip the heading row

# AppsFlyer returns records in order of most recent first. So, we
# need to reverse them in order to provide sensible STATE
# checkpoints. According to the API documentation, there may be
# as many as 200,000 rows here but I don't have a better solution.
rows = []
for row in reader:
rows.append(row)
rows = reversed(rows)

# Emit updated records with state checkpoint
for i, row in enumerate(rows):
bookmark = from_datetime
for i, row in enumerate(reader):
record = xform(row, schema)
singer.write_record("in_app_events", record)
utils.update_state(STATE, "in_app_events", record["event_time"]) # NOTE: This is different in each report

if (i % 250) == 0:
singer.write_state(STATE)
# AppsFlyer returns records in order of most recent first.
if utils.strptime(record["event_time"]) > bookmark:
LOGGER.info("replacing bookmark!")
bookmark = utils.strptime(record["event_time"])

# Write out final state
# Write out state
utils.update_state(STATE, "in_app_events", bookmark)
singer.write_state(STATE)


Expand Down

0 comments on commit b4dbbf1

Please sign in to comment.