Skip to content

Commit

Permalink
Added ability to send SQS message. (#43)
Browse files Browse the repository at this point in the history
* Queue: added send_message method, docstrings, and typing
* Updated requirements and pipfile for dev
* added some typing
  • Loading branch information
mskytt authored Sep 20, 2022
1 parent eeba96d commit de828f2
Show file tree
Hide file tree
Showing 6 changed files with 850 additions and 27 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"python.formatting.provider": "autopep8",
"python.linting.mypyEnabled": false,
"python.linting.pylintEnabled": true,
"python.linting.enabled": true
}
16 changes: 16 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[dev-packages]
mypy = "*"
flake8 = "*"
autopep8 = "*"
pylint = "*"

[requires]
python_version = "3.10"

[packages]
types-aiobotocore-sqs = "*"
754 changes: 754 additions & 0 deletions Pipfile.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
"botocore",
"aiohttp",
"async_timeout",
"types-aiobotocore-sqs"
]


setup(
name="tibber_aws",
packages=["tibber_aws"],
install_requires=install_requires,
version="0.12.0",
version="0.13.0",
description="A python3 library to communicate with Aws",
python_requires=">=3.7.0",
author="Tibber",
Expand Down
4 changes: 2 additions & 2 deletions tibber_aws/aws_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import logging
from contextlib import AsyncExitStack

from aiobotocore.session import get_session
from aiobotocore.session import get_session, AioSession
from aiobotocore.config import AioConfig

_LOGGER = logging.getLogger(__name__)


def get_aiosession():
def get_aiosession() -> AioSession:
return get_session()


Expand Down
94 changes: 70 additions & 24 deletions tibber_aws/aws_queue.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
"""SQS Queue object to subscribe queue to topic, produce and consume messages.
#TODO: Shouldn't have to subscribe to an SNS topic to get messages from SQS.
"""
import json
from typing import List, Optional
import logging
import time

from .aws_base import AwsBase, get_aiosession
from .aws_base import AwsBase, get_aiosession, AioSession
from types_aiobotocore_sqs.client import SQSClient
from types_aiobotocore_sqs.type_defs import SendMessageResultTypeDef

_LOGGER = logging.getLogger(__name__)

Expand All @@ -21,20 +29,29 @@ def receipt_handle(self):


class Queue(AwsBase):
"""Queue object to publish and consume messages from AWS SQS, async."""

_client: SQSClient

def __init__(self, queue_name, region_name="eu-west-1") -> None:
self._queue_name = queue_name
self.queue_url = None
self.queue_url: Optional[str] = None
super().__init__("sqs", region_name)

async def subscribe_topic(self, topic_name, session=None) -> None:
async def subscribe_topic(self, topic_name: str, session: AioSession = None) -> None:
"""Subscribe `Queue` to a topic.
:param topic_name: name of the topic to subscribe to
:type topic_name: str
:param session: the aiobotocore session to use, defaults to None
:type session: AioSession, optional
"""
session = session or get_aiosession()
await self.init_client_if_required(session)

response = await self._client.create_queue(QueueName=self._queue_name)
self.queue_url = response["QueueUrl"]
attr_response = await self._client.get_queue_attributes(
QueueUrl=self.queue_url, AttributeNames=["All"]
)
attr_response = await self._client.get_queue_attributes(QueueUrl=self.queue_url, AttributeNames=["All"])

queue_attributes = attr_response.get("Attributes")
queue_arn = queue_attributes.get("QueueArn")
Expand All @@ -58,45 +75,74 @@ async def subscribe_topic(self, topic_name, session=None) -> None:
statement["Resource"] = statement.get("Resource", queue_arn)
statement["Condition"] = statement.get("Condition", {})

statement["Condition"]["StringLike"] = statement["Condition"].get(
"StringLike", {}
)
statement["Condition"]["StringLike"] = statement["Condition"].get("StringLike", {})
source_arn = statement["Condition"]["StringLike"].get("aws:SourceArn", [])
if not isinstance(source_arn, list):
source_arn = [source_arn]

sns = await self._context_stack.enter_async_context(
session.create_client("sns", region_name=self._region_name)
)
sns = await self._context_stack.enter_async_context(session.create_client("sns", region_name=self._region_name))
response = await sns.create_topic(Name=topic_name)
topic_arn = response["TopicArn"]

if topic_arn not in source_arn:
source_arn.append(topic_arn)
statement["Condition"]["StringLike"]["aws:SourceArn"] = source_arn
policy["Statement"] = statement
await self._client.set_queue_attributes(
QueueUrl=self.queue_url, Attributes={"Policy": json.dumps(policy)}
)
await self._client.set_queue_attributes(QueueUrl=self.queue_url, Attributes={"Policy": json.dumps(policy)})

await sns.subscribe(
TopicArn=topic_arn, Protocol=self._service_name, Endpoint=queue_arn
)
await sns.subscribe(TopicArn=topic_arn, Protocol=self._service_name, Endpoint=queue_arn)
await sns.close()

async def receive_message(self, num_msgs=1) -> [MessageHandle]:
async def receive_message(self, num_msgs: int = 1) -> List[MessageHandle]:
"""
Consume messages from the queue.
:param num_msgs: max number of messages to receive. Default 1
:type num_msgs: int
:return: Message in the form of a message handle
:rtype: list[MessageHandle]
"""
if self.queue_url is None:
_LOGGER.error("No subscribed queue")
return [None]
response = await self._client.receive_message(
QueueUrl=self.queue_url, MaxNumberOfMessages=num_msgs
)
response = await self._client.receive_message(QueueUrl=self.queue_url, MaxNumberOfMessages=num_msgs)
res = []
for msg in response.get("Messages", []):
res.append(MessageHandle(msg))
return res

async def delete_message(self, msg_handle: MessageHandle) -> None:
await self._client.delete_message(
QueueUrl=self.queue_url, ReceiptHandle=msg_handle.receipt_handle
"""Delete a message from the queue.
:param msg_handle: Message handle to delete, will delete based on `msg_handle.receipt_handle`
:type msg_handle: MessageHandle
"""
await self._client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=msg_handle.receipt_handle)

async def send_message(self, message: dict, **kwargs) -> Optional[SendMessageResultTypeDef]:
"""Publish a message to the queue.
Example of returned metadata dict:
{'MD5OfMessageBody': '83956d1d4f05f535d55c9895ff593550',
'MessageId': 'd9654e21-f92b-4d27-bcb5-b27e1dc18839',
'ResponseMetadata': {'HTTPHeaders': {'content-length': '378',
'content-type': 'text/xml',
'date': 'Tue, 20 Sep 2022 07:00:38 GMT',
'x-amzn-requestid': '7202509e-f09f-5082-8aea-0b3d97c92220'},
'HTTPStatusCode': 200,
'RequestId': '7202509e-f09f-5082-8aea-0b3d97c92220',
'RetryAttempts': 0}}
:param message: Message to publish, must be serializable to JSON
:type message: dict
:return: sent message metadata
:rtype: SendMessageResultTypeDef
"""
if self.queue_url is None:
_LOGGER.error("No subscribed queue, call subscribe_topic first")
return
await self.init_client_if_required()
resp: SendMessageResultTypeDef = await self._client.send_message(
QueueUrl=self.queue_url, MessageBody=json.dumps(message), **kwargs
)
return resp

0 comments on commit de828f2

Please sign in to comment.