From 35470d2873b0a49f61098087fc15a3db59caf4fa Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Thu, 31 Oct 2024 13:13:49 +0200 Subject: [PATCH 1/3] chore: add email column for campaign and events Klaviyo v2 Streams --- .../source-klaviyo/source_klaviyo/streams.py | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 706bcc39b048..4c3bed0e9159 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -423,6 +423,7 @@ class Campaigns(IncrementalKlaviyoStreamLatest): cursor_field = "updated_at" page_size = None current_channel = None + include = "campaign-messages,tags" def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "campaigns" @@ -475,6 +476,22 @@ def request_params( params["filter"] = archived_filter return params + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + included = response_json.get("included", []) + campaign_messages = [record for record in included if record["type"] == "campaign-message"] + campaign_message_cache = {record["id"]: record for record in campaign_messages} + for record in response_json.get("data", []): + relationships = record.get("relationships", {}) + campaign_messages_data = relationships.get("campaign-messages", {}).get("data", []) + for idx, campaign_message in enumerate(campaign_messages_data): + campaign_message_id = campaign_message.get("id", None) + if campaign_message_id and campaign_message_id in campaign_message_cache: + message = campaign_message_cache.get(campaign_message_id) + record[f"from_email_{idx}"] = message.get("attributes", {}).get("content", {}).get("from_email", None) + + record = self.map_record(record) + yield record class Lists(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-lists""" @@ -562,15 +579,23 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """:return an iterable containing each record in the response""" response_json = response.json() + profiles = [record for record in response_json.get("included", []) if record["type"] == "profile"] + profile_cache = {record["id"]: record for record in profiles} for record in response_json.get("data", []): attributes = record["attributes"] - flow = attributes.get("$flow") - flow_message_id = attributes.get("$message") - + event_properties = attributes.get("event_properties", {}) + flow = event_properties.get("$flow") + flow_message_id = event_properties.get("$message") record["flow_id"] = flow record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None record[self.cursor_field] = attributes[self.cursor_field] + profiles_data = record.get("relationships", {}).get("profile", {}).get("data", None) + if profiles_data: + profile_id = profiles_data.get("id", None) + if profile_id and profile_id in profile_cache: + profile = profile_cache.get(profile_id) + record["profile_email"] = profile.get("attributes", {}).get("email", None) yield process_record(record) From a0a671b8abb334417e2eeec7a08b9c7f9fb9ad4d Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Tue, 5 Nov 2024 12:36:16 +0200 Subject: [PATCH 2/3] chore: use map_record function --- .../connectors/source-klaviyo/source_klaviyo/streams.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 4c3bed0e9159..a279d62014b5 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -589,14 +589,13 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp record["flow_id"] = flow record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None - record[self.cursor_field] = attributes[self.cursor_field] profiles_data = record.get("relationships", {}).get("profile", {}).get("data", None) if profiles_data: profile_id = profiles_data.get("id", None) if profile_id and profile_id in profile_cache: profile = profile_cache.get(profile_id) record["profile_email"] = profile.get("attributes", {}).get("email", None) - + self.map_record(record) yield process_record(record) From 1ae08c925820c31c8228d93c682d0ba7fa7567fe Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Wed, 6 Nov 2024 10:04:58 +0200 Subject: [PATCH 3/3] chore: only add values if exist --- .../connectors/source-klaviyo/source_klaviyo/streams.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index a279d62014b5..52c31fffc279 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -488,7 +488,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp campaign_message_id = campaign_message.get("id", None) if campaign_message_id and campaign_message_id in campaign_message_cache: message = campaign_message_cache.get(campaign_message_id) - record[f"from_email_{idx}"] = message.get("attributes", {}).get("content", {}).get("from_email", None) + from_email = message.get("attributes", {}).get("content", {}).get("from_email", None) + if from_email: + record[f"from_email_{idx}"] = from_email record = self.map_record(record) yield record @@ -594,7 +596,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp profile_id = profiles_data.get("id", None) if profile_id and profile_id in profile_cache: profile = profile_cache.get(profile_id) - record["profile_email"] = profile.get("attributes", {}).get("email", None) + profile_email = profile.get("attributes", {}).get("email", None) + if profile_email: + record["profile_email"] = profile_email self.map_record(record) yield process_record(record)