diff --git a/README.md b/README.md index 485fb433..d95962cd 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,11 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve * Everything is included in the package for event sourcing * Based on [doctrine dbal](https://github.com/doctrine/dbal) and their ecosystem * Developer experience oriented and fully typed -* [Snapshots](docs/snapshots.md) system to quickly rebuild the aggregates -* [Pipeline](docs/pipeline.md) to build new [projections](docs/projection.md) or to migrate events -* [Scheme management](docs/store.md) and [doctrine migration](docs/store.md) support -* Dev [tools](docs/tools.md) such as a realtime event watcher -* Built in [cli commands](docs/cli.md) with [symfony](https://symfony.com/) +* [Snapshots](https://patchlevel.github.io/event-sourcing-docs/latest/snapshots/) system to quickly rebuild the aggregates +* [Pipeline](https://patchlevel.github.io/event-sourcing-docs/latest/pipeline/) to build new [projections](https://patchlevel.github.io/event-sourcing-docs/latest/projection/) or to migrate events +* [Scheme management](https://patchlevel.github.io/event-sourcing-docs/latest/store/) and [doctrine migration](https://patchlevel.github.io/event-sourcing-docs/latest/migration/) support +* Dev [tools](https://patchlevel.github.io/event-sourcing-docs/latest/watch_server/) such as a realtime event watcher +* Built in [cli commands](https://patchlevel.github.io/event-sourcing-docs/latest/cli/) with [symfony](https://symfony.com/) ## Installation @@ -26,7 +26,8 @@ composer require patchlevel/event-sourcing ## Documentation -* [Docs](https://patchlevel.github.io/event-sourcing-docs/latest) +* Latest [Docs](https://patchlevel.github.io/event-sourcing-docs/latest) +* 1.3 [Docs](https://github.com/patchlevel/event-sourcing/blob/1.3.x/README.md) ## Integration diff --git a/docs/pages/aggregate.md b/docs/pages/aggregate.md index dac9cc57..a642270d 100644 --- a/docs/pages/aggregate.md +++ b/docs/pages/aggregate.md @@ -7,7 +7,7 @@ [DDD Aggregate - Martin Flower](https://martinfowler.com/bliki/DDD_Aggregate.html) -An AggregateRoot has to inherit from `AggregateRoot` and need to implement the method `aggregateRootId`. +An Aggregate has to inherit from `AggregateRoot` and need to implement the method `aggregateRootId`. `aggregateRootId` is the identifier from `AggregateRoot` like a primary key for an entity. The events will be added later, but the following is enough to make it executable: @@ -29,7 +29,7 @@ final class Profile extends AggregateRoot return $this->id; } - public static function create(string $id): self + public static function register(string $id): self { $self = new self(); // todo: record create event @@ -43,13 +43,13 @@ final class Profile extends AggregateRoot The aggregate is not yet finished and has only been built to the point that you can instantiate the object. -!!! note +!!! tip An aggregateId can be an **uuid**, you can find more about this [here](./uuid.md). We use a so-called named constructor here to create an object of the AggregateRoot. The constructor itself is protected and cannot be called from outside. -But it is possible to define different named constructors for different use-cases like `createFromRegistration`. +But it is possible to define different named constructors for different use-cases like `import`. After the basic structure for an aggregate is in place, it could theoretically be saved: @@ -58,16 +58,13 @@ use Patchlevel\EventSourcing\Repository\Repository; final class CreateProfileHandler { - private Repository $profileRepository; - - public function __construct(Repository $profileRepository) - { - $this->profileRepository = $profileRepository; - } + public function __construct( + private readonly Repository $profileRepository + ) {} public function __invoke(CreateProfile $command): void { - $profile = Profile::create($command->id()); + $profile = Profile::register($command->id()); $this->profileRepository->save($profile); } @@ -88,13 +85,13 @@ final class CreateProfileHandler ## Create a new aggregate In order that an aggregate is actually saved, at least one event must exist in the DB. -For our aggregate we create the Event `ProfileCreated`: +For our aggregate we create the Event `ProfileRegistered`: ```php use Patchlevel\EventSourcing\Attribute\Event; -#[Event('profile.created')] -final class ProfileCreated +#[Event('profile.registered')] +final class ProfileRegistered { public function __construct( public readonly string $profileId, @@ -130,16 +127,16 @@ final class Profile extends AggregateRoot return $this->name; } - public static function create(string $id, string $name): self + public static function register(string $id, string $name): self { $self = new self(); - $self->recordThat(new ProfileCreated($id, $name)); + $self->recordThat(new ProfileRegistered($id, $name)); return $self; } #[Apply] - protected function applyProfileCreated(ProfileCreated $event): void + protected function applyProfileRegistered(ProfileRegistered $event): void { $this->id = $event->profileId; $this->name = $event->name; @@ -151,13 +148,13 @@ final class Profile extends AggregateRoot Prefixing the apply methods with "apply" improves readability. -In our named constructor `create` we have now created the event and recorded it with the method `record`. -The aggregate remembers all recorded events in order to save them later. +In our named constructor `register` we have now created the event and recorded it with the method `recordThat`. +The aggregate remembers all new recorded events in order to save them later. At the same time, a defined apply method is executed directly so that we can change our state. So that the AggregateRoot also knows which method it should call, -we have to provide it with the `Apply` [attributes](https://www.php.net/manual/en/language.attributes.overview.php). -We did that in the `applyProfileCreated` method. +we have to mark it with the `Apply` [attributes](https://www.php.net/manual/en/language.attributes.overview.php). +We did that in the `applyProfileRegistered` method. In this method we change the `Profile` properties `id` and `name` with the transferred values. ### Modify an aggregate @@ -206,10 +203,10 @@ final class Profile extends AggregateRoot return $this->name; } - public static function create(string $id, string $name): static + public static function register(string $id, string $name): static { $self = new static(); - $self->recordThat(new ProfileCreated($id, $name)); + $self->recordThat(new ProfileRegistered($id, $name)); return $self; } @@ -220,7 +217,7 @@ final class Profile extends AggregateRoot } #[Apply] - protected function applyProfileCreated(ProfileCreated $event): void + protected function applyProfileRegistered(ProfileRegistered $event): void { $this->id = $event->profileId; $this->name = $event->name; @@ -276,6 +273,7 @@ The `applyNameChanged` method was also called again internally to adjust the sta When the `save` method is called on the repository, all newly recorded events are then fetched and written to the database. +In this specific case only the `NameChanged` changed event. ## Multiple apply attributes on the same method @@ -461,10 +459,10 @@ final class Profile extends AggregateRoot private string $id; private Name $name; - public static function create(string $id, Name $name): static + public static function register(string $id, Name $name): static { $self = new static(); - $self->recordThat(new ProfileCreated($id, $name)); + $self->recordThat(new ProfileRegistered($id, $name)); return $self; } @@ -528,7 +526,7 @@ use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; -#[Aggregate('profile')] +#[Aggregate('hotel')] #[SuppressMissingApply([FullyBooked::class])] final class Hotel extends AggregateRoot { @@ -559,6 +557,72 @@ final class Hotel extends AggregateRoot } ``` +## Working with dates + +An aggregate should always be deterministic. In other words, whenever I execute methods on the aggregate, +I always get the same result. This also makes testing much easier. + +But that often doesn't seem to be possible, e.g. if you want to save a createAt date. +But you can pass this information by yourself. + +```php +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Apply; + +#[Aggregate('profile')] +final class Profile extends AggregateRoot +{ + private string $id; + private Name $name; + private DateTimeImmutable $registeredAt; + + public static function register(string $id, string $name, DateTimeImmutable $registeredAt): static + { + $self = new static(); + $self->recordThat(new ProfileRegistered($id, $name, $registeredAt)); + + return $self; + } + + // ... +} +``` + +But if you still want to make sure that the time is "now" and not in the past or future, you can pass a clock. + +```php +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Clock\Clock; + +#[Aggregate('profile')] +final class Profile extends AggregateRoot +{ + private string $id; + private Name $name; + private DateTimeImmutable $registeredAt; + + public static function register(string $id, string $name, Clock $clock): static + { + $self = new static(); + $self->recordThat(new ProfileRegistered($id, $name, $clock->now())); + + return $self; + } + + // ... +} +``` + +Now you can pass the `SystemClock` to determine the current time. +Or for test purposes the `FrozenClock`, which always returns the same time. + +!!! note + + You can find out more about clock [here](./clock.md). + ## Aggregate Root Registry The library needs to know about all aggregates so that the correct aggregate class is used to load from the database. diff --git a/docs/pages/cli.md b/docs/pages/cli.md index 216a0602..93d108dc 100644 --- a/docs/pages/cli.md +++ b/docs/pages/cli.md @@ -45,6 +45,17 @@ The creation, deletion and rebuilding of the projections is also possible via th The [pipeline](./pipeline.md) will be used to rebuild the projection. +## Outbox commands + +Interacting with the outbox store is also possible via the cli. + +* OutboxInfoCommand: `event-sourcing:outbox:info` +* OutboxConsumeCommand: `event-sourcing:outbox:consume` + +!!! note + + You can find out more about outbox [here](outbox.md). + ## CLI example A cli php file can look like this: diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index 9db6b977..bd971aba 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -3,7 +3,8 @@ This library uses the core principle called [event bus](https://martinfowler.com/articles/201701-event-driven.html). For all events that are persisted (when the `save` method has been executed on the [repository](./repository.md)), -the event wrapped in a message will be dispatched to the `event bus`. All listeners are then called for each event/message. +the event wrapped in a message will be dispatched to the `event bus`. All listeners are then called for each +event/message. ## Message @@ -15,27 +16,60 @@ A message contains the following information: * playhead * event * recorded on +* custom headers Each event is packed into a message and dispatched using the event bus. ```php +use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\EventBus\Message; -$event = new NameChanged('foo'); -$message = new Message( - Profile::class, // aggregate class - 'bca7576c-536f-4428-b694-7b1f00c714b7', // aggregate id - 2, // playhead - $event // event object -); +$clock = SystemClock(); +$message = Message::create(new NameChanged('foo')) + ->withAggregateClass(Profile::class) + ->withAggregateId('bca7576c-536f-4428-b694-7b1f00c714b7') + ->withPlayhead(2) + ->withRecordedOn($clock->now()); $eventBus->dispatch($message); ``` -You don't have to create the message yourself, -it is automatically created, saved and dispatched in the repository. +!!! note + + The message object is immutable. + +You don't have to create the message yourself, +it is automatically created, saved and dispatched in the [repository](repository.md). + +### Custom headers + +You can also enrich your own header or metadata information. +This information is then accessible in the message object and is also stored in the database. + +```php +use Patchlevel\EventSourcing\EventBus\Message; + +$message = Message::create(new NameChanged('foo')) + // ... + ->withCustomHeader('application-id', 'app'); +``` + +!!! note + + You can read about how to pass additional headers to the message object in the [message decorator](message_decorator.md) docs. + +You can also access your custom headers. + +```php +use Patchlevel\EventSourcing\EventBus\Message; + +$message->customHeader('application-id'); // app +$message->customHeaders(); // ['application-id' => 'app'] +``` + +## Event Bus -## Default event bus +### Default event bus The library also delivers a light-weight event bus. This can only register listener and dispatch events. @@ -53,10 +87,10 @@ $eventBus->addListener($projectionListener); you can also add listeners after `ProjectionListener` to access the [projections](./projection.md). -## Symfony event bus +### Symfony event bus -You can also use the [symfony message bus](https://symfony.com/doc/current/components/messenger.html) -which is much more powerful. +You can also use the [symfony message bus](https://symfony.com/doc/current/components/messenger.html) +which is much more powerful. To use the optional symfony messenger you first have to `install` the packet. diff --git a/docs/pages/events.md b/docs/pages/events.md index 23f88e6b..8a9a9373 100644 --- a/docs/pages/events.md +++ b/docs/pages/events.md @@ -46,7 +46,7 @@ final class ProfileCreated So that the events can be saved in the database, they must be serialized and deserialized. That's what the serializer is for. -The library comes with a `JsonSerializer` that can be given further instructions using attributes. +The library comes with a `DefaultEventSerializer` that can be given further instructions using attributes. ```php use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 69a66fe5..542dfa94 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -3,14 +3,6 @@ In our little getting started example, we manage hotels. We keep the example small, so we can only create hotels and let guests check in and check out. -- [x] Create some events -- [x] Define an aggregate root -- [x] Create projections -- [x] Add a processor -- [x] Configure your application -- [x] Setup database -- [x] Play with your domain - ## Define some events First we define the events that happen in our system. @@ -274,7 +266,7 @@ use Doctrine\DBAL\DriverManager; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Projection\MetadataAwareProjectionHandler; use Patchlevel\EventSourcing\Projection\ProjectionListener; -use Patchlevel\EventSourcing\Repository\DefaultRepository; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\SingleTableStore; @@ -303,7 +295,13 @@ $store = new SingleTableStore( 'eventstore' ); -$hotelRepository = new DefaultRepository($store, $eventBus, Hotel::class); +$repositoryManager = new DefaultRepositoryManager( + $aggregateRegistry, + $store, + $eventBus +); + +$repository = $repositoryManager->get(Hotel::class); ``` !!! note diff --git a/docs/pages/message_decorator.md b/docs/pages/message_decorator.md index 1d9380c6..6b9c89d0 100644 --- a/docs/pages/message_decorator.md +++ b/docs/pages/message_decorator.md @@ -2,22 +2,21 @@ There are usecases where you want to add some extra context to your events like metadata which is not directly relevant for your domain. With `MessageDecorator` we are providing a solution to add this metadata to your events. The metadata -will also be persisted in the database and can be retrieved later on. We are internally using this to save the point of -time the event is recorded. Here is the code from this message decorator. +will also be persisted in the database and can be retrieved later on. + +## Create own decorator + +You can also use this feature to add your own metadata to your events. For this the have an extra methods on `Message` +to add data `withCustomHeader` and to read this data later on `customHeader`. ```php -use Patchlevel\EventSourcing\Clock\Clock; use Patchlevel\EventSourcing\EventBus\Message; -final class RecordedOnDecorator implements MessageDecorator +final class OnSystemRecordedDecorator implements MessageDecorator { - public function __construct(private readonly Clock $clock) - { - } - public function __invoke(Message $message): Message { - return $message->withRecordedOn($this->clock->now()); + return $message->withCustomHeader('system', 'accounting_system'); } } ``` @@ -26,24 +25,41 @@ final class RecordedOnDecorator implements MessageDecorator The Message is immutable, for more information look up [here](event_bus.md#message). -You can also use this feature to add your own metadata to your events. For this the have an extra methods on `Message` -to add data `withCustomHeader` and to read this data later on `customHeader`. +!!! tip + + You can also set multiple headers with `withCustomHeaders` which expects an hashmap. + +## Use own decorator + +To use your own message decorator, you have to pass it to the `DefaultRepositoryManager`. ```php -use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Decorator\ChainMessageDecorator; +use Patchlevel\EventSourcing\EventBus\Decorator\RecordedOnDecorator; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; -final class OnSystemRecordedDecorator implements MessageDecorator -{ - public function __invoke(Message $message): Message - { - return $message->withCustomHeader('system', 'accounting_system'); - } -} +$decorator = new ChainMessageDecorator([ + new RecordedOnDecorator($clock), + new OnSystemRecordedDecorator() +]); + +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + $eventBus, + null, + $decorator +); + +$repository = $repositoryManager->get(Profile::class); ``` -!!! tip +!!! warning - You can also set multiple headers with `withCustomHeaders` which expects an hashmap. + We also use the decorator to fill in the `recordedOn` time. + If you want to add your own decorator, then you need to make sure to add the `RecordedOnDecorator` as well. + You can e.g. solve with the `ChainMessageDecorator`. -## Adding a message decorator +!!! note + You can find out more about repository [here](repository). \ No newline at end of file diff --git a/docs/pages/outbox.md b/docs/pages/outbox.md index 59d67e17..6bd41988 100644 --- a/docs/pages/outbox.md +++ b/docs/pages/outbox.md @@ -1,3 +1,78 @@ # Outbox -// Todo \ No newline at end of file +There is the problem that errors can occur when saving an aggregate or in the individual event listeners. +This means that you either saved an aggregate, but an error occurred in the email listener, so that no email went out. +Or that an email was sent but the aggregate could not be saved. + +Both cases are very bad and can only be solved if both the saving of an aggregate +and the dispatching of the events are in a transaction. + +The best way to ensure this is to store the events to be dispatched together +with the aggregate in a transaction in the same database. + +After the transaction becomes successful, the events can be loaded from the outbox table with a worker +and then dispatched into the correct event bus. As soon as the events have been dispatched, +they are deleted from the outbox table. If an error occurs when dispatching, the whole thing will be retrieved later. + +## Configuration + +First you have to replace the correct event bus with an outbox event bus. +This stores the events to be dispatched in the database. + +```php +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; +use Patchlevel\EventSourcing\Outbox\OutboxEventBus; + +$outboxEventBus = new OutboxEventBus($store); + +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + $outboxEventBus +); +``` + +And then you have to define the consumer. This gets the right event bus. +It is used to load the events to be dispatched from the database, dispatch the events and then empty the outbox table. + +```php +$consumer = new StoreOutboxConsumer($store, $realEventBus); +$consumer->consume(); +``` + +## Using outbox + +So that this is also executed in a transaction, you have to make sure that a transaction has also been started. + +```php +$store->transactional(function () use ($command, $profileRepository) { + $profile = Profile::register( + $command->id(), + $command->email() + ); + + $profileRepository->save($profile); +}); +``` + +!!! note + + You can find out more about transaction [here](store.md#transaction). + +You can also interact directly with the outbox store. + +```php +$store->saveOutboxMessage($message); +$store->markOutboxMessageConsumed($message); + +$store->retrieveOutboxMessages(); +$store->countOutboxMessages() +``` + +!!! note + + Both single table store and multi table store implement the outbox store. + +!!! tip + + Interacting with the outbox store is also possible via the [cli](cli.md). \ No newline at end of file diff --git a/docs/pages/pipeline.md b/docs/pages/pipeline.md index ec91531b..17964a0c 100644 --- a/docs/pages/pipeline.md +++ b/docs/pages/pipeline.md @@ -178,6 +178,29 @@ $target = new InMemoryTarget(); $messages = $target->messages(); ``` +### Custom Target + +You can also define your own target. To do this, you need to implement the `Target` interface. + +```php +use Patchlevel\EventSourcing\EventBus\Message; + +final class OtherStoreTarget implements Target +{ + private OtherStore $store; + + public function __construct(OtherStore $store) + { + $this->store = $store; + } + + public function save(Message $message): void + { + $this->store->save($message); + } +} +``` + ## Middlewares Middelwares can be used to manipulate, delete or expand messages or events during the process. @@ -303,3 +326,55 @@ $middleware = new ChainMiddleware([ new RecalculatePlayheadMiddleware() ]); ``` + +### Custom middleware + +You can also write a custom middleware. The middleware gets a message and can return `N` messages. +There are the following possibilities: + +* Return only the message to an array to leave it unchanged. +* Put another message in the array to swap the message. +* Return an empty array to remove the message. +* Or return multiple messages to enrich the stream. + +In our case, the domain has changed a bit. +In the beginning we had a `ProfileCreated` event that just created a profile. +Now we have a `ProfileRegistered` and a `ProfileActivated` event, +which should replace the `ProfileCreated` event. + +```php +use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware; + +final class SplitProfileCreatedMiddleware implements Middleware +{ + public function __invoke(Message $message): array + { + $event = $message->event(); + + if (!$event instanceof ProfileCreated) { + return [$message]; + } + + $profileRegisteredMessage = Message::createWithHeaders( + new ProfileRegistered($event->id(), $event->name()), + $message->headers() + ); + + $profileActivatedMessage = Message::createWithHeaders( + new ProfileActivated($event->id()), + $message->headers() + ); + + return [$profileRegisteredMessage, $profileActivatedMessage]; + } +} +``` + +!!! warning + + Since we changed the number of messages, we have to recalculate the playhead. + +!!! note + + You can find more about messages [here](event_bus.md). \ No newline at end of file diff --git a/docs/pages/projection.md b/docs/pages/projection.md index 2474db6c..192b18c2 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -2,10 +2,10 @@ With `projections` you can create your data optimized for reading. projections can be adjusted, deleted or rebuilt at any time. -This is possible because the source of truth remains untouched +This is possible because the source of truth remains untouched and everything can always be reproduced from the events. -The target of a projection can be anything. +The target of a projection can be anything. Either a file, a relational database, a no-sql database like mongodb or an elasticsearch. ## Define Projection @@ -62,13 +62,18 @@ final class ProfileProjection implements Projection You should not execute any actions with projections, otherwise these will be executed again if you rebuild the projection! +!!! tip + + If you are using psalm then you can install the event sourcing [plugin](https://github.com/patchlevel/event-sourcing-psalm-plugin) + to make the event method return the correct type. + Projections have a `create` and a `drop` method that is executed when the projection is created or deleted. In some cases it may be that no schema has to be created for the projection, as the target does it automatically. -In order for the projection to know which method is responsible for which event, +In order for the projection to know which method is responsible for which event, the methods must be given the `Handle` attribute with the respective event class name. -As soon as the event has been dispatched, the appropriate methods are then executed. +As soon as the event has been dispatched, the appropriate methods are then executed. Several projections can also listen to the same event. ## Register projections @@ -98,8 +103,8 @@ $eventBus->addListener(new ProjectionListener($projectionHandler)); ## Setup Projection -A projection schama or database usually has to be created beforehand. -And with a rebuild, the projection has to be deleted. +A projection schama or database usually has to be created beforehand. +And with a rebuild, the projection has to be deleted. To make this possible, projections have two methods `create` and `drop` that can be defined and executed. ### Create Projection Schema diff --git a/docs/pages/repository.md b/docs/pages/repository.md index 6a5a256a..cf559f99 100644 --- a/docs/pages/repository.md +++ b/docs/pages/repository.md @@ -1,25 +1,22 @@ # Repository A `repository` takes care of storing and loading the `aggregates`. -The [design pattern](https://martinfowler.com/eaaCatalog/repository.html) of the same name is also used. +He is also responsible for building [messages](event_bus.md) from the events and then dispatching them to the event bus. Every aggregate needs a repository to be stored. And each repository is only responsible for one aggregate. -## Create +## Create a repository -We offer two implementations. One is a `DefaultRepository` that only reads or writes the data from one store. -And a `SnapshotRepository` that holds a state of the aggregate in a cache -so that loading and rebuilding of the aggregate is faster. +The best way to create a repository is to use the `DefaultRepositoryManager`. +This helps to build the repository correctly. -Both repositories implement the `Repository` interface. -This interface can be used for the typehints so that a change is possible at any time. +The `DefaultRepositoryManager` needs some services to work. +For one, it needs [AggregateRootRegistry](aggregate.md#aggregate-root-registry) so that it knows which aggregates exist. +The [store](store.md), which is then given to the repository so that it can save and load the events at the end. +And the [EventBus](event_bus.md) to publish the new events. -### Default Repository - -The default repository acts directly with the `store` and therefore needs one. -The [event bus](./event_bus.md) is used as a further parameter to dispatch new events. -Finally, the `aggregate` class is needed, which aggregates the repository should take care of. +After plugging the `DefaultRepositoryManager` together, you can create the repository associated with the aggregate. ```php use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; @@ -35,23 +32,30 @@ $repository = $repositoryManager->get(Profile::class); !!! note - You can find out more about stores [here](./store.md) + The same repository instance is always returned for a specific aggregate. + +### Snapshots -### Default Repository Manager +Loading events for an aggregate is superfast. +You can have thousands of events in the database that load in a few milliseconds and build the corresponding aggregate. -// todo +But at some point you realize that it takes time. To counteract this there is a snapshot store. ```php use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; +use Patchlevel\EventSourcing\Snapshot\Adapter\Psr16SnapshotAdapter; use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; -$snapshot = new DefaultSnapshotStore([/* adapters */]); +$adapter = new Psr16SnapshotAdapter($cache); +$snapshotStore = new DefaultSnapshotStore([ + 'default' => $adapter +]); $repositoryManager = new DefaultRepositoryManager( $aggregateRootRegistry, $store, $eventBus, - $snapshot + $snapshotStore ); $repository = $repositoryManager->get(Profile::class); @@ -59,14 +63,45 @@ $repository = $repositoryManager->get(Profile::class); !!! note - You can find out more about snapshots [here](./snapshots.md) + You can find out more about snapshots [here](snapshots.md). + +### Decorator + +If you want to add more metadata to the message, like e.g. an application id, then you can use decorator. + +```php +use Patchlevel\EventSourcing\EventBus\Decorator\RecordedOnDecorator; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; + +$decorator = new RecordedOnDecorator($clock); -## Usage +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + $eventBus, + null, + $decorator +); + +$repository = $repositoryManager->get(Profile::class); +``` + +!!! warning + + We also use the decorator to fill in the `recordedOn` time. + If you want to add your own decorator, then you need to make sure to add the `RecordedOnDecorator` as well. + You can e.g. solve with the `ChainMessageDecorator`. + +!!! note + + You can find out more about message decorator [here](message_decorator.md). + +## Use the repository Each `repository` has three methods that are responsible for loading an `aggregate`, saving it or checking whether it exists. -### Save +### Save an aggregate An `aggregate` can be `saved`. All new events that have not yet been written to the database are fetched from the aggregate. @@ -84,7 +119,12 @@ $repository->save($profile); All events are written to the database with one transaction in order to ensure data consistency. -### Load +!!! tip + + If you want to make sure that dispatching events and storing events is transaction safe, + then you should look at the [outbox](outbox.md) pattern. + +### Load an aggregate An `aggregate` can be loaded using the `load` method. All events for the aggregate are loaded from the database and the current state is rebuilt. @@ -93,12 +133,16 @@ All events for the aggregate are loaded from the database and the current state $profile = $repository->load('229286ff-6f95-4df6-bc72-0a239fe7b284'); ``` +!!! warning + + When the method is called, the aggregate is always reloaded from the database and rebuilt. + !!! note You can only fetch one aggregate at a time and don't do any complex queries either. Projections are used for this purpose. -### Has +### Has an aggregate You can also check whether an `aggregate` with a certain id exists. It is checked whether any event with this id exists in the database. @@ -114,7 +158,6 @@ if($repository->has('229286ff-6f95-4df6-bc72-0a239fe7b284')) { The query is fast and does not load any event. This means that the state of the aggregate is not rebuild either. - ## Custom Repository In clean code you want to have explicit type hints for the repositories diff --git a/docs/pages/snapshots.md b/docs/pages/snapshots.md index dfc4e62b..cd1f70cf 100644 --- a/docs/pages/snapshots.md +++ b/docs/pages/snapshots.md @@ -15,65 +15,110 @@ Here, however, only the last events are loaded from the database and not all. ## Configuration -To use the snapshot system, the `SnapshotRepository` must be used. -In addition, a `SnapshotStore` must then be given. +First of all you have to define a snapshot store. This store may have multiple adapters for different caches. +These caches also need a name so that you can determine which aggregates should be stored in which cache. ```php -use Patchlevel\EventSourcing\Repository\DefaultRepository; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Snapshot\Adapter\Psr16SnapshotAdapter; use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; -$adapter = new Psr16SnapshotAdapter($cache); $snapshotStore = new DefaultSnapshotStore([ - 'default' => $adapter + 'default' => new Psr16SnapshotAdapter($defaultCache), + 'other_cache' => new Psr16SnapshotAdapter($otherCache), ]); +``` + +After creating the snapshot store, you need to pass that store to the DefaultRepositoryManager. + +```php +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; -$repository = new DefaultRepository($store, $eventStream, Profile::class, $snapshotStore); +$snapshotStore = // ... + +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + $eventBus, + $snapshotStore +); ``` !!! note You can read more about Repository [here](./repository.md). -So that the state can also be cached, the aggregate must be taught how to `serialize` and `deserialize` its state. -To do this, the aggregate must inherit from the `SnapshotableAggregateRoot` -instead of the `AggregateRoot` and implement the necessary methods. +Next we need to tell the Aggregate to take a snapshot of it. We do this using the snapshot attribute. +There we also specify where it should be saved. ```php -use Patchlevel\EventSourcing\Aggregate\SnapshotableAggregateRoot; +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Snapshot; #[Aggregate('profile')] #[Snapshot('default')] -final class Profile extends SnapshotableAggregateRoot +final class Profile extends AggregateRoot { // ... +} +``` + +When taking a snapshot, all properties are extracted and saved. +When loading, this data is written back to the properties. +In other words, in the end everything has to be serializable. +To ensure this, the same system is used as for the events. +You can define normalizers to bring the properties into the correct format. + +```php +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Snapshot; + +#[Aggregate('profile')] +#[Snapshot('default')] +final class Profile extends AggregateRoot +{ + public string $id; + public string $name, + #[Normalize(new DateTimeImmutableNormalizer())] + public DateTimeImmutable $createdAt; - protected function serialize(): array - { - return [ - 'id' => $this->id, - ]; - } - - protected static function deserialize(array $payload): static - { - $self = new static(); - $self->id = $payload['id']; - - return $self; - } + // ... } ``` +!!! danger + + If anything changes in the properties of the aggregate, then the cache must be cleared! + !!! warning In the end it has to be possible to serialize it as json. -## Batch +!!! note + + You can find more about normalizer [here](normalizer.md). + +### Snapshot batching + +Since the loading of events in itself is quite fast and only becomes noticeably slower with thousands of events, +we do not need to create a snapshot after each event. That would also have a negative impact on performance. +Instead, we can also create a snapshot after `N` events. +The remaining events that are not in the snapshot are then loaded from store. + +```php +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Snapshot; -// Todo +#[Aggregate('profile')] +#[Snapshot('default', batch: 1000)] +final class Profile extends AggregateRoot +{ + // ... +} +``` ## Adapter diff --git a/docs/pages/store.md b/docs/pages/store.md index fd3a1e14..308acb20 100644 --- a/docs/pages/store.md +++ b/docs/pages/store.md @@ -1,7 +1,7 @@ # Store -In the end, the events/messages have to be saved somewhere. -The library is based on [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) +In the end, the events/messages have to be saved somewhere. +The library is based on [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) and offers two different store strategies. But it is also possible to develop your own store by implementing the `Store` interface. @@ -29,8 +29,8 @@ We offer two store strategies that you can choose as you like. ### Single Table Store -With the `SingleTableStore` everything is saved in one table. -The dbal connection is needed, a mapping of the aggregate class and aggregate name +With the `SingleTableStore` everything is saved in one table. +The dbal connection is needed, a mapping of the aggregate class and aggregate name and, last but not least, the table name. ```php @@ -54,9 +54,9 @@ $store = new SingleTableStore( ### Multi Table Store -With the `MultiTableStore` a separate table is created for each aggregate type. -In addition, a meta table is created by referencing all events in the correct order. -The dbal connection is needed, a mapping of the aggregate class and table name +With the `MultiTableStore` a separate table is created for each aggregate type. +In addition, a meta table is created by referencing all events in the correct order. +The dbal connection is needed, a mapping of the aggregate class and table name and, last but not least, the table name for the metadata. ```php @@ -80,7 +80,51 @@ $store = new MultiTableStore( ## Transaction -// TODO +Our stores also implement the `TransactionStore` interface. +This allows you to combine several aggregate interactions in one transaction +and thus ensure that everything is saved together or none of it. + +Since the library is based on doctrine dbal, our implementation is just a proxy. + +!!! note + + You can find more about dbal transaction [here](https://www.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/transactions.html). + +### Begin transaction + +```php +$store->transactionBegin(); +``` + +### Commit transaction + +```php +$store->transactionCommit(); +``` + +### Rollback transaction + +```php +$store->transactionRollback(); +``` + +### Transactional function + +There is also the possibility of executing a function in a transaction. +Then dbal takes care of starting a transaction, committing it and then possibly rollback it again. + +```php +$store->transactional(function () use ($command, $bankAccountRepository) { + $accountFrom = $bankAccountRepository->get($command->from()); + $accountTo = $bankAccountRepository->get($command->to()); + + $accountFrom->transferMoney($command->to(), $command->amount()); + $accountTo->receiveMoney($command->from(), $command->amount()); + + $bankAccountRepository->save($accountFrom); + $bankAccountRepository->save($accountTo); +}); +``` !!! tip diff --git a/docs/pages/upcasting.md b/docs/pages/upcasting.md index 84e6bd8b..dfec7fad 100644 --- a/docs/pages/upcasting.md +++ b/docs/pages/upcasting.md @@ -49,7 +49,9 @@ use Patchlevel\EventSourcing\Serializer\Upcast\Upcast; final class LegacyEventNameUpaster implements Upcaster { - public function __construct(private readonly EventRegistry $eventRegistry){} + public function __construct( + private readonly EventRegistry $eventRegistry + ){} public function __invoke(Upcast $upcast): Upcast { @@ -58,6 +60,26 @@ final class LegacyEventNameUpaster implements Upcaster } ``` +## Use upcasting + +After we have defined the upcasting rules, we also have to pass the whole thing to the serializer. +Since we have multiple upcasters, we use a chain here. + +```php +use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; +use Patchlevel\EventSourcing\Serializer\Upcast\UpcasterChain; + +$upcaster = new UpcasterChain([ + new ProfileCreatedEmailLowerCastUpcaster(), + new LegacyEventNameUpaster($eventRegistry) +]); + +$serializer = DefaultEventSerializer::createFromPaths( + ['src/Domain'], + $upcaster +); +``` + ## Update event stream But what if we need it also in our stream because some other applications has also access on it? Or want to cleanup our @@ -79,7 +101,11 @@ final class EventStreamCleanupCommand extends Command protected function execute(InputInterface $input, OutputInterface $output): int { - $pipeline = new Pipeline(new StoreSource($sourceStore), new StoreTarget($targetStore)); + $pipeline = new Pipeline( + new StoreSource($sourceStore), + new StoreTarget($targetStore) + ); + $pipeline->run(); } ``` diff --git a/docs/pages/watch_server.md b/docs/pages/watch_server.md index 17474f46..1472dfba 100644 --- a/docs/pages/watch_server.md +++ b/docs/pages/watch_server.md @@ -12,7 +12,7 @@ The watch client and the listener are used to send all events that are saved to use Patchlevel\EventSourcing\WatchServer\SocketWatchServerClient; use Patchlevel\EventSourcing\WatchServer\WatchListener; -$watchServerClient = new SocketWatchServerClient('127.0.0.1:5000', $serializer); +$watchServerClient = new SocketWatchServerClient('127.0.0.1:5000', $eventSerializer); $watchListener = new WatchListener($watchServerClient); ``` @@ -30,7 +30,7 @@ As soon as you execute `start`, the server will be started until you terminate t use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\WatchServer\SocketWatchServer; -$watchServer = new SocketWatchServer('127.0.0.1:5000', $serializer); +$watchServer = new SocketWatchServer('127.0.0.1:5000', $eventSerializer); $watchServer->listen( function (Message $message) { var_dump($message); @@ -55,7 +55,7 @@ use Symfony\Component\Console\Application; $cli = new Application('Event-Sourcing CLI'); $cli->setCatchExceptions(true); -$watchServer = new SocketWatchServer('127.0.0.1:5000', $serializer); +$watchServer = new SocketWatchServer('127.0.0.1:5000', $eventSerializer); $command = new WatchCommand($watchServer); $cli->addCommands([