From bec4d1d98cdc64ee45d82c3b02929d9c162a5ad9 Mon Sep 17 00:00:00 2001 From: mr-raccoon-97 Date: Thu, 14 Nov 2024 22:51:40 +0000 Subject: [PATCH] Simplified publisher --- pybondi/publisher.py | 125 +++++++++++++++------------------------- pybondi/session.py | 17 +++--- pyproject.toml | 2 +- tests/test_publisher.py | 71 ++++++++++++++++++----- 4 files changed, 110 insertions(+), 105 deletions(-) diff --git a/pybondi/publisher.py b/pybondi/publisher.py index 9aa8b4f..a0928c2 100644 --- a/pybondi/publisher.py +++ b/pybondi/publisher.py @@ -5,10 +5,21 @@ class Base(ABC): """ - An abstract base class for subscribers to a publisher. + An abstract base class for a publisher. - Subscribers are responsible for receiving messages, managing transactions, - and closing their connections. + A publisher is responsible for publishing messages to external systems + using callables as subscribers. It should implement the methods to + mantain transacional consistency and resource management using the + begin, commit, rollback and close methods. + + + The difference between a publisher and a messagebus is that a publisher + is responsible for publish data from the inside of the bounded context + to outside systems. A message bus is responsible for routing events and + commands within the bounded context, and handle them inside a transaction. + + Messages passed to the publisher should be serializable objects and + inmutable. """ @abstractmethod @@ -44,96 +55,52 @@ def close(self) -> None: Closes the subscriber and its connection. """ -class Subscriber(Base): +class Publisher(Base): + ''' + An in memory publisher using a simple queue as buffer for messages. + ''' + def __init__(self): - self.handlers = dict[str, list[Callable]]() + self.subscribers = dict[str, list[Callable]]() self.queue = deque[tuple[str, Any]]() - def subscribe(self, topic: str, handler: Callable): - self.handlers.setdefault(topic, []).append(handler) + def subscribe(self, topic: str, subscriber: Callable): + ''' + Subscribes a new handler to a topic. Each topic can have multiple + subscribers. + + Parameters: + topic: The topic to subscribe to. + handler: The handler to subscribe. + ''' + self.subscribers.setdefault(topic, []).append(subscriber) def handle(self, topic: str, message: Any): + ''' + Receives a message from the publisher. + ''' self.queue.append((topic, message)) def commit(self): + ''' + Commit all messages enqueued to their respective subscribers. + ''' while self.queue: topic, message = self.queue.popleft() - [handler(message) for handler in self.handlers.get(topic, [])] + [subscriber(message) for subscriber in self.subscribers.get(topic, [])] def rollback(self): + ''' + Rollback all messages enqueued. + ''' self.queue.clear() def begin(self): - [handler('begin', None) for handler in self.handlers.get('begin', [])] + ''' + Begin a new transaction. + ''' + [subscriber(None) for subscriber in self.subscribers.get('begin', [])] def close(self): - [handler('begin', None) for handler in self.handlers.get('close', [])] - - -class Publisher: - """ - A publisher that manages a set of subscribers and publishes messages to them. - - The publisher is responsible for subscribing and unsubscribing subscribers, - publishing messages to subscribed topics, and coordinating transactions - across all subscribers. - - The difference between a publisher and a message bus is that a publisher - is responsible for publish data from the inside of the bounded context - to outside systems. A message bus is responsible for routing events and - commands within the bounded context. - """ - - def __init__(self) -> None: - """ - Initializes the publisher with an empty subscriber dictionary. - """ - self.subscribers = list[Subscriber]() - - def subscribe(self, subscriber: Subscriber) -> None: - """ - Subscribes a new subscriber to the publisher. - - Parameters: - subscriber: The subscriber to subscribe. - """ - assert isinstance(subscriber, Base), 'Subscriber should inherit from Base' - self.subscribers.append(subscriber) - - def publish(self, topic: str, message: Any) -> None: - """ - Publishes a message to a specific topic. - - The message is sent to all subscribers of the given topic. - - Parameters: - topic: The topic to publish to. - message: The message to be published. - """ - [subscriber.handle(topic, message) for subscriber in self.subscribers] - - def commit(self) -> None: - """ - Commits the current transaction for all subscribers. - """ - [subscriber.commit() for subscriber in self.subscribers] - - - def rollback(self) -> None: - """ - Rolls back the current transaction for all subscribers. - """ - [subscriber.rollback() for subscriber in self.subscribers] - - - def begin(self) -> None: - """ - Starts a new transaction for all subscribers. - """ - [subscriber.begin() for subscriber in self.subscribers] - - def close(self) -> None: - """ - Closes all subscribers and their connections. - """ - [subscriber.close() for subscriber in self.subscribers] \ No newline at end of file + [subscriber(None) for subscriber in self.subscribers.get('close', [])] + self.queue.clear() \ No newline at end of file diff --git a/pybondi/session.py b/pybondi/session.py index cae9bc0..6af4825 100644 --- a/pybondi/session.py +++ b/pybondi/session.py @@ -5,21 +5,20 @@ from pybondi.messagebus import Messagebus, Command, Event from pybondi.repository import Repository from pybondi.publisher import Publisher -from pybondi.publisher import Base as Subscriber class Session: """ A session manages a unit of work, cordinates the repository, the message bus and the publisher, mantaing the transactional boundaries. """ - subscribers = list[Subscriber]() event_handlers = dict[type[Event], list[Callable[[Event], None]]]() command_handlers = dict[type[Command], Callable[[Command], None]]() + subscribers = dict[str, list[Callable]]() @classmethod def add_event_handler(cls, event_type: type[Event], handler: Callable[[Event], None]): """ - Adds an event handler for a given event type, when a new session is created + Adds an event handler for a given event type when a new session is created if no message bus is provided. """ cls.event_handlers.setdefault(event_type, []).append(handler) @@ -27,18 +26,18 @@ def add_event_handler(cls, event_type: type[Event], handler: Callable[[Event], N @classmethod def add_command_handler(cls, command_type: type[Command], handler: Callable[[Command], None]): """ - Adds a command handler for a given command type, when a new session is created + Adds a command handler for a given command type when a new session is created if no message bus is provided. """ cls.command_handlers[command_type] = handler @classmethod - def add_subscriber(cls, subscriber: Subscriber): + def add_subscriber(cls, topic: str, subscriber: Callable): """ - Add a subscriber to the publisher session, when a new session is created + Adds a subscriber for a given topic when a new session is created if no publisher is provided. """ - cls.subscribers.append(subscriber) + cls.subscribers.setdefault(topic, []).append(subscriber) def __init__(self, repository: Repository = None, publisher: Publisher = None, messagebus: Messagebus = None): self.repository = repository or Repository() @@ -48,8 +47,8 @@ def __init__(self, repository: Repository = None, publisher: Publisher = None, m self.publisher = publisher else: self.publisher = Publisher() - for subscriber in self.subscribers: - self.publisher.subscribe(subscriber) + for topic, subscriber in self.subscribers.items(): + self.publisher.subscribe(topic, subscriber) if messagebus: self.messagebus = messagebus diff --git a/pyproject.toml b/pyproject.toml index 7cce20c..c4a6864 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pybondi" -version = "0.2.7" +version = "0.3.0" description = "A lightweight library for creating event driven systems using domain driven design." authors = ["Eric Cardozo "] license = "MIT" diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 51c6e69..571c096 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -1,20 +1,59 @@ -from pytest import fixture -from pybondi.publisher import Publisher, Subscriber - -def handle_event(event): - assert event == 'data' +from pytest import fixture, fail +from unittest.mock import Mock +from pybondi.publisher import Publisher @fixture -def subscriber(): - subscriber = Subscriber() - subscriber.subscribe('event', handle_event) - return subscriber +def publisher(): + return Publisher() -@fixture -def publisher(subscriber): - publisher = Publisher() - publisher.subscribe(subscriber) - return publisher +def test_subscribe_and_publish_message(publisher: Publisher): + subscriber = Mock() + publisher.subscribe("topic1", subscriber) + + publisher.handle("topic1", "test message") + publisher.commit() + subscriber.assert_called_once_with("test message") + +def test_multiple_subscribers_on_same_topic(publisher: Publisher): + subscriber1 = Mock() + subscriber2 = Mock() + publisher.subscribe("topic1", subscriber1) + publisher.subscribe("topic1", subscriber2) + publisher.handle("topic1", "test message") + publisher.commit() + + subscriber1.assert_called_once_with("test message") + subscriber2.assert_called_once_with("test message") + +def test_no_subscribers_no_errors(publisher: Publisher): + # Handle a message with no subscribers + publisher.handle("topic1", "test message") + try: + publisher.commit() + except Exception as e: + fail(f"Commit failed with error: {e}") + +def test_rollback_message(publisher: Publisher): + subscriber = Mock() + publisher.subscribe("topic1", subscriber) + + publisher.handle("topic1", "test message") + publisher.rollback() + + publisher.commit() + subscriber.assert_not_called() + +def test_begin_transaction(publisher: Publisher): + subscriber = Mock() + publisher.subscribe("begin", subscriber) + + publisher.begin() + subscriber.assert_called_once_with(None) -def test_publisher(publisher: Publisher): - publisher.publish('event', 'data') \ No newline at end of file +def test_close_transaction(publisher: Publisher): + subscriber = Mock() + publisher.subscribe("close", subscriber) + + publisher.close() + + subscriber.assert_called_once_with(None)