Skip to content

Commit

Permalink
Add organic intallations (singer-io#20)
Browse files Browse the repository at this point in the history
* simple solution for organic installs

* return all the streams

* make organic installs opt out

* remove comment

* add todo

* add state

Co-authored-by: yonidavidson <[email protected]>
  • Loading branch information
yonidavidson and yonidavidson authored Aug 10, 2021
1 parent 0dcfced commit ed5e68f
Show file tree
Hide file tree
Showing 2 changed files with 383 additions and 2 deletions.
134 changes: 132 additions & 2 deletions tap_appsflyer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

CONFIG = {
"app_id": None,
"api_token": None,
"api_token": None
}


Expand All @@ -31,6 +31,7 @@

ENDPOINTS = {
"installs": "/export/{app_id}/installs_report/v5",
"organic_installs": "/export/{app_id}/organic_installs_report/v5",
"in_app_events": "/export/{app_id}/in_app_events_report/v5"
}

Expand Down Expand Up @@ -298,6 +299,132 @@ def sync_installs():
utils.update_state(STATE, "installs", bookmark)
singer.write_state(STATE)

def sync_organic_installs():

schema = load_schema("raw_data/organic_installs")
singer.write_schema("organic_installs", schema, [
"event_time",
"event_name",
"appsflyer_id"
])

# This order matters
fieldnames = (
"attributed_touch_type",
"attributed_touch_time",
"install_time",
"event_time",
"event_name",
"event_value",
"event_revenue",
"event_revenue_currency",
"event_revenue_usd",
"event_source",
"is_receipt_validated",
"af_prt",
"media_source",
"af_channel",
"af_keywords",
"campaign",
"af_c_id",
"af_adset",
"af_adset_id",
"af_ad",
"af_ad_id",
"af_ad_type",
"af_siteid",
"af_sub_siteid",
"af_sub1",
"af_sub2",
"af_sub3",
"af_sub4",
"af_sub5",
"af_cost_model",
"af_cost_value",
"af_cost_currency",
"contributor1_af_prt",
"contributor1_media_source",
"contributor1_campaign",
"contributor1_touch_type",
"contributor1_touch_time",
"contributor2_af_prt",
"contributor2_media_source",
"contributor2_campaign",
"contributor2_touch_type",
"contributor2_touch_time",
"contributor3_af_prt",
"contributor3_media_source",
"contributor3_campaign",
"contributor3_touch_type",
"contributor3_touch_time",
"region",
"country_code",
"state",
"city",
"postal_code",
"dma",
"ip",
"wifi",
"operator",
"carrier",
"language",
"appsflyer_id",
"advertising_id",
"idfa",
"android_id",
"customer_user_id",
"imei",
"idfv",
"platform",
"device_type",
"os_version",
"app_version",
"sdk_version",
"app_id",
"app_name",
"bundle_id",
"is_retargeting",
"retargeting_conversion_type",
"af_attribution_lookback",
"af_reengagement_window",
"is_primary_attribution",
"user_agent",
"http_referrer",
"original_url",
)

from_datetime = get_start("organic_installs")
to_datetime = get_stop(from_datetime, datetime.datetime.now())

if to_datetime < from_datetime:
LOGGER.error("to_datetime (%s) is less than from_endtime (%s).", to_datetime, from_datetime)
return

params = dict()
params["from"] = from_datetime.strftime("%Y-%m-%d %H:%M")
params["to"] = to_datetime.strftime("%Y-%m-%d %H:%M")
params["api_token"] = CONFIG["api_token"]

url = get_url("organic_installs", app_id=CONFIG["app_id"])
request_data = request(url, params)

csv_data = RequestToCsvAdapter(request_data)
reader = csv.DictReader(csv_data, fieldnames)

next(reader) # Skip the heading row

bookmark = from_datetime
for i, row in enumerate(reader):
record = xform(row, schema)
singer.write_record("organic_installs", record)
# AppsFlyer returns records in order of most recent first.
if utils.strptime(record["event_time"]) > bookmark:
bookmark = utils.strptime(record["event_time"])

# Write out state
utils.update_state(STATE, "organic_installs", bookmark)
singer.write_state(STATE)


def sync_in_app_events():

Expand Down Expand Up @@ -438,6 +565,9 @@ def sync_in_app_events():
def get_streams_to_sync(streams, state):
target_stream = state.get("this_stream")
result = streams
if "organic_installs" in CONFIG:
if CONFIG["organic_installs"]:
result.append(Stream("organic_installs", sync_organic_installs))
if target_stream:
result = list(itertools.dropwhile(lambda x: x.name != target_stream, streams))
if not result:
Expand All @@ -462,7 +592,7 @@ def main():
args = utils.parse_args(
[
"app_id",
"api_token",
"api_token"
])

CONFIG.update(args.config)
Expand Down
Loading

0 comments on commit ed5e68f

Please sign in to comment.