Skip to content

Commit

Permalink
chore: add email column for campaign and events Klaviyo v2 Streams (#210
Browse files Browse the repository at this point in the history
)

* chore: add email column for campaign and events Klaviyo v2 Streams

* chore: use map_record function

* chore: only add values if exist
  • Loading branch information
am6010 authored Nov 6, 2024
1 parent 3f130cb commit 9d6aa69
Showing 1 changed file with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class Campaigns(IncrementalKlaviyoStreamLatest):
cursor_field = "updated_at"
page_size = None
current_channel = None
include = "campaign-messages,tags"

def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return "campaigns"
Expand Down Expand Up @@ -475,6 +476,24 @@ def request_params(
params["filter"] = archived_filter
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
included = response_json.get("included", [])
campaign_messages = [record for record in included if record["type"] == "campaign-message"]
campaign_message_cache = {record["id"]: record for record in campaign_messages}
for record in response_json.get("data", []):
relationships = record.get("relationships", {})
campaign_messages_data = relationships.get("campaign-messages", {}).get("data", [])
for idx, campaign_message in enumerate(campaign_messages_data):
campaign_message_id = campaign_message.get("id", None)
if campaign_message_id and campaign_message_id in campaign_message_cache:
message = campaign_message_cache.get(campaign_message_id)
from_email = message.get("attributes", {}).get("content", {}).get("from_email", None)
if from_email:
record[f"from_email_{idx}"] = from_email

record = self.map_record(record)
yield record

class Lists(IncrementalKlaviyoStreamLatest):
"""Docs: https://developers.klaviyo.com/en/reference/get-lists"""
Expand Down Expand Up @@ -562,16 +581,25 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
""":return an iterable containing each record in the response"""

response_json = response.json()
profiles = [record for record in response_json.get("included", []) if record["type"] == "profile"]
profile_cache = {record["id"]: record for record in profiles}
for record in response_json.get("data", []):
attributes = record["attributes"]
flow = attributes.get("$flow")
flow_message_id = attributes.get("$message")

event_properties = attributes.get("event_properties", {})
flow = event_properties.get("$flow")
flow_message_id = event_properties.get("$message")
record["flow_id"] = flow
record["flow_message_id"] = flow_message_id
record["campaign_id"] = flow_message_id if not flow else None
record[self.cursor_field] = attributes[self.cursor_field]

profiles_data = record.get("relationships", {}).get("profile", {}).get("data", None)
if profiles_data:
profile_id = profiles_data.get("id", None)
if profile_id and profile_id in profile_cache:
profile = profile_cache.get(profile_id)
profile_email = profile.get("attributes", {}).get("email", None)
if profile_email:
record["profile_email"] = profile_email
self.map_record(record)
yield process_record(record)


Expand Down

0 comments on commit 9d6aa69

Please sign in to comment.