Skip to content

Commit

Permalink
Merge pull request #6 from mr-raccoon-97/main
Browse files Browse the repository at this point in the history
Bump version
  • Loading branch information
mapache-software authored Nov 7, 2024
2 parents b87effb + 09665a4 commit e25346a
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Python package
name: CD
on:
push:
branches:
Expand Down
26 changes: 26 additions & 0 deletions .github/workflows/python-CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: CI
on: [push, pull_request]

jobs:
tests:
strategy:
fail-fast: false
matrix:
python-version: ["3.12"]
poetry-version: ["latest", "1.8.3"]
os: [ubuntu-22.04, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Run image
uses: abatilo/actions-poetry@v2
with:
poetry-version: ${{ matrix.poetry-version }}
- name: Install dependencies
run: poetry install
- name: Test with pytest
run: |
poetry run pytest
71 changes: 69 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pybondi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pybondi.messagebus import Event
from pybondi.messagebus import Command
from pybondi.messagebus import Messagebus
from pybondi.aggregate import Aggregate
from pybondi.aggregate import Root
from pybondi.aggregate import Factory
from pybondi.session import Session
from pybondi.publisher import Publisher
from pybondi.repository import Repository
from pybondi.callbacks import Callbacks
14 changes: 7 additions & 7 deletions pybondi/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ def __init__(self, id: Any):
self.events = deque[Event]()

def publish(self, event: Event):
assert isinstance(event, Event), 'The event must be an instance of Event'
'''
Publishes an event to the aggregate.
Publishes a domain event to the aggregate.
Parameters:
event: The event to be published.
'''
Expand All @@ -29,14 +30,13 @@ def publish(self, event: Event):
class Aggregate(ABC):
'''
Aggregate is an abstract class that represents a collection of domain objects that are
treated as a single unit. It is responsible for maintaining the consistency of the
domain objects by applying events to them.
treated as a single unit. It is responsible for maintaining the consistency of the domain
objects by enforcing invariants and ensuring that all changes are made through
well-defined operations.
Attributes:
root: The root of the aggregate.
'''
root: Root

'''
def __init__(self, root: Root):
self.root = root

Expand All @@ -49,7 +49,7 @@ class Factory(ABC):
'''

@abstractmethod
def create(self, *args, **kwargs) -> Aggregate:
def __call__(self, *args, **kwargs) -> Aggregate:
'''
Creates a new aggregate.
'''
Expand Down
84 changes: 77 additions & 7 deletions pybondi/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,111 @@
from abc import ABC, abstractmethod
from typing import Any
from typing import Sequence
from mlbus.publisher import Publisher
from pybondi.publisher import Publisher

class Callback(ABC):
'''
Callbacks should be injected into the aggregate's methods to allow it to process
data and communicate their results to the message publisher.
'''

def __init__(self):
self.publisher = Publisher()

def bind(self, publisher: Publisher):
'''
Bind a publisher to the callback object.
'''
self.publisher = publisher

def set(self, name: str, value: Any) -> None:
'''
Set a value on the callback object.
Paramaters:
name: The name of the attribute.
value: The value to set.
'''
setattr(self, name, value)

@abstractmethod
def __call__(self, *args, **kwargs): ...
def __call__(self, *args, **kwargs):
'''
Call the callback object. Data from the aggregate's methods should be passed
to the callback object through this method, and processed accordingly.
The callback object should also communicate the results of the processing to
the message publisher directly or should implement a buffer to store the results
until the flush method is called.
'''
...

@abstractmethod
def flush(self): ...
def flush(self):
'''
Flush the callback object. If the callback object has a buffer, the buffer should
be flushed and the data should be sent to the message publisher.
'''
...

@abstractmethod
def reset(self): ...
def reset(self):
'''
Reset the callback object. The callback object should reset any internal state
that it maintains, if any.
'''
...


class Callbacks:
'''
Callbacks is a class that manages a group of callback objects. It is responsible for
calling the callback objects, flushing their buffers, and resetting their internal
state, as if they were a single callback object.
Example:
callback = Callbacks([SomeCallback(), OtherCallback())
'''

def __init__(self, callbacks: Sequence[Callback]):
self.publisher = Publisher()
self.list = list[Callback](callbacks)

def set(self, name: str, value: Any) -> None:
[setattr(callback, name, value) for callback in self.list]

def bind(self, publisher: Publisher):
'''
Bind a publisher to all the callback objects.
'''
[callback.bind(publisher) for callback in self.list]

def set(self, name: str, value: Any) -> None:
'''
Set a value to all the callback objects.
Paramaters:
name: The name of the attribute.
value: The value to set.
'''
[callback.set(name, value) for callback in self.list]


def __call__(self, *args, **kwargs):
'''
Call the callbacks. Data from the aggregate's methods should be passed
to the callback objects through this method, and processed accordingly.
'''
[callback(*args, **kwargs) for callback in self.list]

def flush(self):
'''
Flush the callbacks. If the callback objects have a buffer, the buffer should
be flushed and the data should be sent to the message publisher.
'''
[callback.flush() for callback in self.list]

def reset(self):
'''
Reset the callbacks. The callback objects should reset any internal state
that they maintain, if any.
'''
[callback.reset() for callback in self.list]
38 changes: 28 additions & 10 deletions pybondi/messagebus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC
from abc import ABC, abstractmethod
from typing import Callable
from typing import Any
from typing import Optional
Expand All @@ -11,13 +11,19 @@ class Event(ABC):
"""
Event is an abstract base class for domain events.
"""
...


class Command(ABC):
"""
Command is a class representing a request to perform an action.
"""
...
@abstractmethod
def execute(self):
"""
Executes the command.
"""
...

class Messagebus:
"""
Expand All @@ -35,22 +41,34 @@ def __init__(self):
self.consumers = dict[type[Event], list[Callable[[Event], None]]]()
self.queue = deque[Command | Event]()

def register(self, command_type: type[Command], handler: Callable[[Command], None]):
"""
Sets a handler for a given command type. A command type can only have one handler.
Parameters:
command_type: The type of the command.
handler: The handler to be registered.
"""
self.handlers[command_type] = handler

def subscribe(self, event_type: type[Event], consumer: Callable[[Event], None]):
"""
Adds a consumer for a given event type. An event type can have multiple consumers.
Parameters:
event_type: The type of the event.
consumer: The consumer to be added.
"""
self.consumers.setdefault(event_type, []).append(consumer)

def handle(self, command: Command):
"""
Handles a given command by invoking its corresponding handler.
Handles a given command by invoking its corresponding handler
or executing it by default.
Parameters:
command: The command to be handled.
Raises:
ValueError: If no handler is found for the command type.
"""
handler = self.handlers.get(type(command), None)
if not handler:
raise ValueError(f"Command not found for message {command}")
handler(command)

command.execute() if not handler else handler(command)

def consume(self, event: Event):
"""
Expand Down
2 changes: 1 addition & 1 deletion pybondi/repository.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from mlbus.aggregate import Aggregate
from pybondi.aggregate import Aggregate

class Repository(ABC):
"""
Expand Down
Loading

0 comments on commit e25346a

Please sign in to comment.