diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 706bcc39b048..52c31fffc279 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -423,6 +423,7 @@ class Campaigns(IncrementalKlaviyoStreamLatest): cursor_field = "updated_at" page_size = None current_channel = None + include = "campaign-messages,tags" def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "campaigns" @@ -475,6 +476,24 @@ def request_params( params["filter"] = archived_filter return params + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + included = response_json.get("included", []) + campaign_messages = [record for record in included if record["type"] == "campaign-message"] + campaign_message_cache = {record["id"]: record for record in campaign_messages} + for record in response_json.get("data", []): + relationships = record.get("relationships", {}) + campaign_messages_data = relationships.get("campaign-messages", {}).get("data", []) + for idx, campaign_message in enumerate(campaign_messages_data): + campaign_message_id = campaign_message.get("id", None) + if campaign_message_id and campaign_message_id in campaign_message_cache: + message = campaign_message_cache.get(campaign_message_id) + from_email = message.get("attributes", {}).get("content", {}).get("from_email", None) + if from_email: + record[f"from_email_{idx}"] = from_email + + record = self.map_record(record) + yield record class Lists(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-lists""" @@ -562,16 +581,25 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """:return an iterable containing each record in the response""" response_json = response.json() + profiles = [record for record in response_json.get("included", []) if record["type"] == "profile"] + profile_cache = {record["id"]: record for record in profiles} for record in response_json.get("data", []): attributes = record["attributes"] - flow = attributes.get("$flow") - flow_message_id = attributes.get("$message") - + event_properties = attributes.get("event_properties", {}) + flow = event_properties.get("$flow") + flow_message_id = event_properties.get("$message") record["flow_id"] = flow record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None - record[self.cursor_field] = attributes[self.cursor_field] - + profiles_data = record.get("relationships", {}).get("profile", {}).get("data", None) + if profiles_data: + profile_id = profiles_data.get("id", None) + if profile_id and profile_id in profile_cache: + profile = profile_cache.get(profile_id) + profile_email = profile.get("attributes", {}).get("email", None) + if profile_email: + record["profile_email"] = profile_email + self.map_record(record) yield process_record(record)