Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PBE-1321] support campaign endpoints #156

Merged
merged 23 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions stream_chat/async_chat/campaign.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import datetime
from typing import Any, Optional, Union

from stream_chat.base.campaign import CampaignInterface
from stream_chat.types.campaign import CampaignData
from stream_chat.types.stream_response import StreamResponse


class Campaign(CampaignInterface):
async def create(
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
) -> StreamResponse:
if campaign_id is not None:
self.campaign_id = campaign_id
if data is not None:
self.data = data
state = await self.client.create_campaign( # type: ignore
campaign_id=self.campaign_id, data=self.data
)

if self.campaign_id is None and state.is_ok() and "campaign" in state:
self.campaign_id = state["campaign"]["id"]
return state

async def get(self) -> StreamResponse:
return await self.client.get_campaign( # type: ignore
campaign_id=self.campaign_id
)

async def update(self, data: CampaignData) -> StreamResponse:
return await self.client.update_campaign( # type: ignore
campaign_id=self.campaign_id, data=data
)

async def delete(self, **options: Any) -> StreamResponse:
return await self.client.delete_campaign( # type: ignore
campaign_id=self.campaign_id, **options
)

async def start(
self, scheduled_for: Optional[Union[str, datetime.datetime]] = None
) -> StreamResponse:
return await self.client.start_campaign( # type: ignore
campaign_id=self.campaign_id, scheduled_for=scheduled_for
)

async def stop(self) -> StreamResponse:
return await self.client.stop_campaign( # type: ignore
campaign_id=self.campaign_id
)
160 changes: 135 additions & 25 deletions stream_chat/async_chat/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,21 @@
Optional,
Type,
Union,
cast,
)
from urllib.parse import urlparse

from stream_chat.async_chat.campaign import Campaign
from stream_chat.async_chat.segment import Segment
from stream_chat.types.base import SortParam
from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
from stream_chat.types.segment import (
QuerySegmentsOptions,
QuerySegmentTargetsOptions,
SegmentData,
SegmentType,
)

if sys.version_info >= (3, 8):
from typing import Literal
else:
Expand Down Expand Up @@ -537,45 +549,143 @@ async def delete_role(self, name: str) -> StreamResponse:
async def list_roles(self) -> StreamResponse:
return await self.get("roles")

async def create_segment(self, segment: Dict) -> StreamResponse:
return await self.post("segments", data={"segment": segment})
def segment( # type: ignore
self,
segment_type: SegmentType,
segment_id: Optional[str] = None,
data: Optional[SegmentData] = None,
) -> Segment:
return Segment(
client=self, segment_type=segment_type, segment_id=segment_id, data=data
)

async def create_segment(
self,
segment_type: SegmentType,
segment_id: Optional[str] = None,
data: Optional[SegmentData] = None,
) -> StreamResponse:
payload = {"type": segment_type.value}
if segment_id is not None:
payload["id"] = segment_id
if data is not None:
payload.update(cast(dict, data))
return await self.post("segments", data=payload)

async def query_segments(self, **params: Any) -> StreamResponse:
return await self.get("segments", params={"payload": json.dumps(params)})
async def get_segment(self, segment_id: str) -> StreamResponse:
return await self.get(f"segments/{segment_id}")

async def update_segment(self, segment_id: str, data: Dict) -> StreamResponse:
return await self.put(f"segments/{segment_id}", data={"segment": data})
async def query_segments(
self,
filter_conditions: Optional[Dict[str, Any]] = None,
sort: Optional[List[SortParam]] = None,
options: Optional[QuerySegmentsOptions] = None,
) -> StreamResponse:
payload = {}
if filter_conditions is not None:
payload["filter"] = filter_conditions
if sort is not None:
payload["sort"] = sort # type: ignore
if options is not None:
payload.update(cast(dict, options))
return await self.post("segments/query", data=payload)

async def update_segment(
self, segment_id: str, data: SegmentData
) -> StreamResponse:
return await self.put(f"segments/{segment_id}", data=data)

async def delete_segment(self, segment_id: str) -> StreamResponse:
return await self.delete(f"segments/{segment_id}")

async def create_campaign(self, campaign: Dict) -> StreamResponse:
return await self.post("campaigns", data={"campaign": campaign})
async def segment_target_exists(
self, segment_id: str, target_id: str
) -> StreamResponse:
return await self.get(f"segments/{segment_id}/target/{target_id}")

async def query_campaigns(self, **params: Any) -> StreamResponse:
return await self.get("campaigns", params={"payload": json.dumps(params)})
async def add_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> StreamResponse:
return await self.post(
f"segments/{segment_id}/addtargets", data={"target_ids": target_ids}
)

async def update_campaign(self, campaign_id: str, data: Dict) -> StreamResponse:
return await self.put(f"campaigns/{campaign_id}", data={"campaign": data})
async def query_segment_targets(
self,
segment_id: str,
filter_conditions: Optional[Dict[str, Any]] = None,
sort: Optional[List[SortParam]] = None,
options: Optional[QuerySegmentTargetsOptions] = None,
) -> StreamResponse:
payload = {}
if filter_conditions is not None:
payload["filter"] = filter_conditions
if sort is not None:
payload["sort"] = sort # type: ignore
if options is not None:
payload.update(cast(dict, options))
return await self.post(f"segments/{segment_id}/targets/query", data=payload)

async def remove_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> StreamResponse:
return await self.post(
f"segments/{segment_id}/deletetargets", data={"target_ids": target_ids}
)

async def delete_campaign(self, campaign_id: str, **options: Any) -> StreamResponse:
return await self.delete(f"campaigns/{campaign_id}", params=options)
def campaign( # type: ignore
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
) -> Campaign:
return Campaign(client=self, campaign_id=campaign_id, data=data)

async def schedule_campaign(
self, campaign_id: str, scheduled_for: int = None
async def create_campaign(
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
) -> StreamResponse:
return await self.patch(
f"campaigns/{campaign_id}/schedule", data={"scheduled_for": scheduled_for}
)
payload = {"id": campaign_id}
if data is not None:
payload.update(cast(dict, data))
return await self.post("campaigns", data=payload)

async def query_recipients(self, **params: Any) -> StreamResponse:
return await self.get("recipients", params={"payload": json.dumps(params)})
async def get_campaign(self, campaign_id: str) -> StreamResponse:
return await self.get(f"campaigns/{campaign_id}")

async def stop_campaign(self, campaign_id: str) -> StreamResponse:
return await self.patch(f"campaigns/{campaign_id}/stop")
async def query_campaigns(
self,
filter_conditions: Optional[Dict[str, Any]] = None,
sort: Optional[List[SortParam]] = None,
options: QueryCampaignsOptions = None,
) -> StreamResponse:
payload = {}
if filter_conditions is not None:
payload["filter"] = filter_conditions
if sort is not None:
payload["sort"] = sort # type: ignore
if options is not None:
payload.update(cast(dict, options))
return await self.post("campaigns/query", data=payload)

async def update_campaign(
self, campaign_id: str, data: CampaignData
) -> StreamResponse:
return await self.put(f"campaigns/{campaign_id}", data=data)

async def delete_campaign(self, campaign_id: str, **options: Any) -> StreamResponse:
return await self.delete(f"campaigns/{campaign_id}", options)

async def resume_campaign(self, campaign_id: str) -> StreamResponse:
return await self.patch(f"campaigns/{campaign_id}/resume")
async def start_campaign(
self,
campaign_id: str,
scheduled_for: Optional[Union[str, datetime.datetime]] = None,
) -> StreamResponse:
payload = {}
if scheduled_for is not None:
if isinstance(scheduled_for, datetime.datetime):
scheduled_for = scheduled_for.isoformat()
payload["scheduled_for"] = scheduled_for
return await self.post(f"campaigns/{campaign_id}/start", data=payload)

async def stop_campaign(self, campaign_id: str) -> StreamResponse:
return await self.post(f"campaigns/{campaign_id}/stop")

async def test_campaign(
self, campaign_id: str, users: Iterable[str]
Expand Down
65 changes: 65 additions & 0 deletions stream_chat/async_chat/segment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Dict, List, Optional

from stream_chat.base.segment import SegmentInterface
from stream_chat.types.base import SortParam
from stream_chat.types.segment import QuerySegmentTargetsOptions, SegmentData
from stream_chat.types.stream_response import StreamResponse


class Segment(SegmentInterface):
async def create(
self, segment_id: Optional[str] = None, data: Optional[SegmentData] = None
) -> StreamResponse:
if segment_id is not None:
self.segment_id = segment_id
if data is not None:
self.data = data

state = await self.client.create_segment( # type: ignore
segment_type=self.segment_type, segment_id=self.segment_id, data=self.data
)

if self.segment_id is None and state.is_ok() and "segment" in state:
self.segment_id = state["segment"]["id"]
return state

async def get(self) -> StreamResponse:
return await self.client.get_segment(segment_id=self.segment_id) # type: ignore

async def update(self, data: SegmentData) -> StreamResponse:
return await self.client.update_segment( # type: ignore
segment_id=self.segment_id, data=data
)

async def delete(self) -> StreamResponse:
return await self.client.delete_segment( # type: ignore
segment_id=self.segment_id
)

async def target_exists(self, target_id: str) -> StreamResponse:
return await self.client.segment_target_exists( # type: ignore
segment_id=self.segment_id, target_id=target_id
)

async def add_targets(self, target_ids: list) -> StreamResponse:
return await self.client.add_segment_targets( # type: ignore
segment_id=self.segment_id, target_ids=target_ids
)

async def query_targets(
self,
filter_conditions: Optional[Dict] = None,
sort: Optional[List[SortParam]] = None,
options: Optional[QuerySegmentTargetsOptions] = None,
) -> StreamResponse:
return await self.client.query_segment_targets( # type: ignore
segment_id=self.segment_id,
filter_conditions=filter_conditions,
sort=sort,
options=options,
)

async def remove_targets(self, target_ids: list) -> StreamResponse:
return await self.client.remove_segment_targets( # type: ignore
segment_id=self.segment_id, target_ids=target_ids
)
49 changes: 49 additions & 0 deletions stream_chat/base/campaign.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import abc
import datetime
from typing import Awaitable, Optional, Union

from stream_chat.base.client import StreamChatInterface
from stream_chat.types.campaign import CampaignData
from stream_chat.types.stream_response import StreamResponse


class CampaignInterface(abc.ABC):
def __init__(
self,
client: StreamChatInterface,
campaign_id: Optional[str] = None,
data: CampaignData = None,
):
self.client = client
self.campaign_id = campaign_id
self.data = data

@abc.abstractmethod
def create(
self, campaign_id: Optional[str], data: Optional[CampaignData]
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def get(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def update(
self, data: CampaignData
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def delete(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def start(
self, scheduled_for: Optional[Union[str, datetime.datetime]] = None
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def stop(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass
Loading
Loading