Skip to content

Commit

Permalink
Merge pull request #13 from mr-raccoon-97/main
Browse files Browse the repository at this point in the history
Simplified publisher
  • Loading branch information
mapache-software authored Nov 14, 2024
2 parents 46cbf90 + bec4d1d commit 97261fa
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 105 deletions.
125 changes: 46 additions & 79 deletions pybondi/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@

class Base(ABC):
"""
An abstract base class for subscribers to a publisher.
An abstract base class for a publisher.
Subscribers are responsible for receiving messages, managing transactions,
and closing their connections.
A publisher is responsible for publishing messages to external systems
using callables as subscribers. It should implement the methods to
mantain transacional consistency and resource management using the
begin, commit, rollback and close methods.
The difference between a publisher and a messagebus is that a publisher
is responsible for publish data from the inside of the bounded context
to outside systems. A message bus is responsible for routing events and
commands within the bounded context, and handle them inside a transaction.
Messages passed to the publisher should be serializable objects and
inmutable.
"""

@abstractmethod
Expand Down Expand Up @@ -44,96 +55,52 @@ def close(self) -> None:
Closes the subscriber and its connection.
"""

class Subscriber(Base):
class Publisher(Base):
'''
An in memory publisher using a simple queue as buffer for messages.
'''

def __init__(self):
self.handlers = dict[str, list[Callable]]()
self.subscribers = dict[str, list[Callable]]()
self.queue = deque[tuple[str, Any]]()

def subscribe(self, topic: str, handler: Callable):
self.handlers.setdefault(topic, []).append(handler)
def subscribe(self, topic: str, subscriber: Callable):
'''
Subscribes a new handler to a topic. Each topic can have multiple
subscribers.
Parameters:
topic: The topic to subscribe to.
handler: The handler to subscribe.
'''
self.subscribers.setdefault(topic, []).append(subscriber)

def handle(self, topic: str, message: Any):
'''
Receives a message from the publisher.
'''
self.queue.append((topic, message))

def commit(self):
'''
Commit all messages enqueued to their respective subscribers.
'''
while self.queue:
topic, message = self.queue.popleft()
[handler(message) for handler in self.handlers.get(topic, [])]
[subscriber(message) for subscriber in self.subscribers.get(topic, [])]

def rollback(self):
'''
Rollback all messages enqueued.
'''
self.queue.clear()

def begin(self):
[handler('begin', None) for handler in self.handlers.get('begin', [])]
'''
Begin a new transaction.
'''
[subscriber(None) for subscriber in self.subscribers.get('begin', [])]

def close(self):
[handler('begin', None) for handler in self.handlers.get('close', [])]


class Publisher:
"""
A publisher that manages a set of subscribers and publishes messages to them.
The publisher is responsible for subscribing and unsubscribing subscribers,
publishing messages to subscribed topics, and coordinating transactions
across all subscribers.
The difference between a publisher and a message bus is that a publisher
is responsible for publish data from the inside of the bounded context
to outside systems. A message bus is responsible for routing events and
commands within the bounded context.
"""

def __init__(self) -> None:
"""
Initializes the publisher with an empty subscriber dictionary.
"""
self.subscribers = list[Subscriber]()

def subscribe(self, subscriber: Subscriber) -> None:
"""
Subscribes a new subscriber to the publisher.
Parameters:
subscriber: The subscriber to subscribe.
"""
assert isinstance(subscriber, Base), 'Subscriber should inherit from Base'
self.subscribers.append(subscriber)

def publish(self, topic: str, message: Any) -> None:
"""
Publishes a message to a specific topic.
The message is sent to all subscribers of the given topic.
Parameters:
topic: The topic to publish to.
message: The message to be published.
"""
[subscriber.handle(topic, message) for subscriber in self.subscribers]

def commit(self) -> None:
"""
Commits the current transaction for all subscribers.
"""
[subscriber.commit() for subscriber in self.subscribers]


def rollback(self) -> None:
"""
Rolls back the current transaction for all subscribers.
"""
[subscriber.rollback() for subscriber in self.subscribers]


def begin(self) -> None:
"""
Starts a new transaction for all subscribers.
"""
[subscriber.begin() for subscriber in self.subscribers]

def close(self) -> None:
"""
Closes all subscribers and their connections.
"""
[subscriber.close() for subscriber in self.subscribers]
[subscriber(None) for subscriber in self.subscribers.get('close', [])]
self.queue.clear()
17 changes: 8 additions & 9 deletions pybondi/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,39 @@
from pybondi.messagebus import Messagebus, Command, Event
from pybondi.repository import Repository
from pybondi.publisher import Publisher
from pybondi.publisher import Base as Subscriber

class Session:
"""
A session manages a unit of work, cordinates the repository, the message bus and
the publisher, mantaing the transactional boundaries.
"""
subscribers = list[Subscriber]()
event_handlers = dict[type[Event], list[Callable[[Event], None]]]()
command_handlers = dict[type[Command], Callable[[Command], None]]()
subscribers = dict[str, list[Callable]]()

@classmethod
def add_event_handler(cls, event_type: type[Event], handler: Callable[[Event], None]):
"""
Adds an event handler for a given event type, when a new session is created
Adds an event handler for a given event type when a new session is created
if no message bus is provided.
"""
cls.event_handlers.setdefault(event_type, []).append(handler)

@classmethod
def add_command_handler(cls, command_type: type[Command], handler: Callable[[Command], None]):
"""
Adds a command handler for a given command type, when a new session is created
Adds a command handler for a given command type when a new session is created
if no message bus is provided.
"""
cls.command_handlers[command_type] = handler

@classmethod
def add_subscriber(cls, subscriber: Subscriber):
def add_subscriber(cls, topic: str, subscriber: Callable):
"""
Add a subscriber to the publisher session, when a new session is created
Adds a subscriber for a given topic when a new session is created
if no publisher is provided.
"""
cls.subscribers.append(subscriber)
cls.subscribers.setdefault(topic, []).append(subscriber)

def __init__(self, repository: Repository = None, publisher: Publisher = None, messagebus: Messagebus = None):
self.repository = repository or Repository()
Expand All @@ -48,8 +47,8 @@ def __init__(self, repository: Repository = None, publisher: Publisher = None, m
self.publisher = publisher
else:
self.publisher = Publisher()
for subscriber in self.subscribers:
self.publisher.subscribe(subscriber)
for topic, subscriber in self.subscribers.items():
self.publisher.subscribe(topic, subscriber)

if messagebus:
self.messagebus = messagebus
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pybondi"
version = "0.2.7"
version = "0.3.0"
description = "A lightweight library for creating event driven systems using domain driven design."
authors = ["Eric Cardozo <[email protected]>"]
license = "MIT"
Expand Down
71 changes: 55 additions & 16 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,59 @@
from pytest import fixture
from pybondi.publisher import Publisher, Subscriber

def handle_event(event):
assert event == 'data'
from pytest import fixture, fail
from unittest.mock import Mock
from pybondi.publisher import Publisher

@fixture
def subscriber():
subscriber = Subscriber()
subscriber.subscribe('event', handle_event)
return subscriber
def publisher():
return Publisher()

@fixture
def publisher(subscriber):
publisher = Publisher()
publisher.subscribe(subscriber)
return publisher
def test_subscribe_and_publish_message(publisher: Publisher):
subscriber = Mock()
publisher.subscribe("topic1", subscriber)

publisher.handle("topic1", "test message")
publisher.commit()
subscriber.assert_called_once_with("test message")

def test_multiple_subscribers_on_same_topic(publisher: Publisher):
subscriber1 = Mock()
subscriber2 = Mock()
publisher.subscribe("topic1", subscriber1)
publisher.subscribe("topic1", subscriber2)
publisher.handle("topic1", "test message")
publisher.commit()

subscriber1.assert_called_once_with("test message")
subscriber2.assert_called_once_with("test message")

def test_no_subscribers_no_errors(publisher: Publisher):
# Handle a message with no subscribers
publisher.handle("topic1", "test message")
try:
publisher.commit()
except Exception as e:
fail(f"Commit failed with error: {e}")

def test_rollback_message(publisher: Publisher):
subscriber = Mock()
publisher.subscribe("topic1", subscriber)

publisher.handle("topic1", "test message")
publisher.rollback()

publisher.commit()
subscriber.assert_not_called()

def test_begin_transaction(publisher: Publisher):
subscriber = Mock()
publisher.subscribe("begin", subscriber)

publisher.begin()
subscriber.assert_called_once_with(None)

def test_publisher(publisher: Publisher):
publisher.publish('event', 'data')
def test_close_transaction(publisher: Publisher):
subscriber = Mock()
publisher.subscribe("close", subscriber)

publisher.close()

subscriber.assert_called_once_with(None)

0 comments on commit 97261fa

Please sign in to comment.