Skip to content

Commit

Permalink
Merge pull request #7 from kerrermanisNL/add_dynamic_exchange_for_sen…
Browse files Browse the repository at this point in the history
…d_event

Add dynamic exchange for send event
  • Loading branch information
ThomasAllison authored Feb 3, 2022
2 parents 3f8de05 + 898b4e5 commit 8ddc580
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
16 changes: 11 additions & 5 deletions eventsender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,29 @@ def dst(self, dt):
utc = UTC()


def send_event(event):
def send_event(event, exchange=None, routing_key=None):
"""
Add a timestamp to the event data and send it to a message queue
:param dict event: JSON-serialisable dictionary
:param str exchange: Exchange to use. If not set will default to EVENT_QUEUE_EXCHANGE setting.
:param str routing_key: Routing key to use. If not set will default to EVENT_QUEUE_EXCHANGE setting.
If this is not set, use blank by default.
"""
settings = get_settings()
exchange = exchange or settings.EVENT_QUEUE_EXCHANGE
routing_key = routing_key or getattr(settings, 'EVENT_QUEUE_ROUTING_KEY', '')

if not settings.EVENT_QUEUE_URL:
raise ImproperlyConfigured('EVENT_QUEUE_URL is not configured in settings')
if not settings.EVENT_QUEUE_EXCHANGE:
raise ImproperlyConfigured('EVENT_QUEUE_EXCHANGE is not configured in settings')
if not exchange:
raise ImproperlyConfigured('EVENT_QUEUE_EXCHANGE is not configured in settings '
'and no exchange provided in parameters.')

event.update({'timestamp': datetime.datetime.now(tz=utc).isoformat()})
with open_channel(settings.EVENT_QUEUE_URL) as channel:
channel.basic_publish(
exchange=settings.EVENT_QUEUE_EXCHANGE,
routing_key=getattr(settings, 'EVENT_QUEUE_ROUTING_KEY', ''),
exchange=exchange,
routing_key=routing_key,
body=json.dumps(event),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json')
)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def readfile(filename):

setup(
name='eventsender',
version='1.1.2',
version='1.1.3',
packages=find_packages(exclude=['tests*']),
url='https://github.com/ByteInternet/eventsender',
author='Byte B.V.',
Expand Down
60 changes: 60 additions & 0 deletions tests/unit/test_eventhandler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from unittest.mock import Mock

import pika
import eventsender
Expand Down Expand Up @@ -54,3 +55,62 @@ def test_raise_exchange_improperly_configured(self):
self.set_up_patch('eventsender.get_settings', return_value=mock_settings)
with self.assertRaises(ImproperlyConfigured):
eventsender.send_event({})

def test_does_not_raise_exchange_improperly_configured_if_no_setting_but_param_provided(self):
mock_settings = Settings('amqp://host/url', None, 'key')
self.set_up_patch('eventsender.get_settings', return_value=mock_settings)

eventsender.send_event({}, "my_exchange")

def test_send_event_uses_provided_exchange_parameter(self):
eventsender.send_event({}, "my_exchange")

self.mock_channel.basic_publish.assert_called_once_with(
exchange='my_exchange',
routing_key='key',
body=json.dumps(dict({}, timestamp=self.now.isoformat())),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'))

def test_send_event_exchange_parameter_takes_precedence_over_exchange_setting(self):
mock_settings = Settings('amqp://host/url', "other_exchange", 'key')
self.set_up_patch('eventsender.get_settings', return_value=mock_settings)

eventsender.send_event({}, "my_exchange")

self.mock_channel.basic_publish.assert_called_once_with(
exchange='my_exchange',
routing_key='key',
body=json.dumps(dict({}, timestamp=self.now.isoformat())),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'))

def test_send_event_routing_key_takes_precendence_over_exchange_setting(self):
mock_settings = Settings('amqp://host/url', "exchange", 'key')
self.set_up_patch('eventsender.get_settings', return_value=mock_settings)

eventsender.send_event({}, routing_key="my_key")

self.mock_channel.basic_publish.assert_called_once_with(
exchange='exchange',
routing_key='my_key',
body=json.dumps(dict({}, timestamp=self.now.isoformat())),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'))

def test_uses_blank_routing_key_if_no_setting_and_no_parameter_provided(self):
class NoKeySettings:
EVENT_QUEUE_URL = None
EVENT_QUEUE_EXCHANGE = None

def __init__(self, event_queue_url, event_queue_exchange):
self.EVENT_QUEUE_URL= event_queue_url
self.EVENT_QUEUE_EXCHANGE= event_queue_exchange

mock_settings = NoKeySettings('amqp://host/url', "exchange")
self.set_up_patch('eventsender.get_settings', return_value=mock_settings)

eventsender.send_event({})

self.mock_channel.basic_publish.assert_called_once_with(
exchange='exchange',
routing_key='',
body=json.dumps(dict({}, timestamp=self.now.isoformat())),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'))

0 comments on commit 8ddc580

Please sign in to comment.