diff --git a/tap_appsflyer/__init__.py b/tap_appsflyer/__init__.py index e4d8b4f..692c93a 100644 --- a/tap_appsflyer/__init__.py +++ b/tap_appsflyer/__init__.py @@ -271,25 +271,16 @@ def sync_installs(): next(reader) # Skip the heading row - # AppsFlyer returns records in order of most recent first. So, we - # need to reverse them in order to provide sensible STATE - # checkpoints. According to the API documentation, there may be - # as many as 200,000 rows here but I don't have a better solution. - rows = [] - for row in reader: - rows.append(row) - rows = reversed(rows) - - # Emit updated records with state checkpoint - for i, row in enumerate(rows): + bookmark = from_datetime + for i, row in enumerate(reader): record = xform(row, schema) singer.write_record("installs", record) - utils.update_state(STATE, "installs", record["attributed_touch_time"]) + # AppsFlyer returns records in order of most recent first. + if utils.strptime(record["attributed_touch_time"]) > bookmark: + bookmark = utils.strptime(record["attributed_touch_time"]) - if (i % 250) == 0: - singer.write_state(STATE) - - # Write out final state + # Write out state + utils.update_state(STATE, "installs", bookmark) singer.write_state(STATE) @@ -407,25 +398,17 @@ def sync_in_app_events(): next(reader) # Skip the heading row - # AppsFlyer returns records in order of most recent first. So, we - # need to reverse them in order to provide sensible STATE - # checkpoints. According to the API documentation, there may be - # as many as 200,000 rows here but I don't have a better solution. - rows = [] - for row in reader: - rows.append(row) - rows = reversed(rows) - - # Emit updated records with state checkpoint - for i, row in enumerate(rows): + bookmark = from_datetime + for i, row in enumerate(reader): record = xform(row, schema) singer.write_record("in_app_events", record) - utils.update_state(STATE, "in_app_events", record["event_time"]) # NOTE: This is different in each report - - if (i % 250) == 0: - singer.write_state(STATE) + # AppsFlyer returns records in order of most recent first. + if utils.strptime(record["event_time"]) > bookmark: + LOGGER.info("replacing bookmark!") + bookmark = utils.strptime(record["event_time"]) - # Write out final state + # Write out state + utils.update_state(STATE, "in_app_events", bookmark) singer.write_state(STATE)