Skip to content

Commit

Permalink
Merge pull request #637 from patchlevel/subscription-engine-filter-ev…
Browse files Browse the repository at this point in the history
…ents

filter events in subscription engine
  • Loading branch information
DavidBadura authored Dec 12, 2024
2 parents 4cca271 + be1020b commit 91dabc1
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 30 deletions.
1 change: 1 addition & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ deptrac:
- Attribute
- Clock
- Message
- MetadataEvent
- MetadataSubscriber
- Repository
- Schema
Expand Down
1 change: 1 addition & 0 deletions docs/pages/our-backward-compatibility-promise.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ In our docs the features are marked like this:

This feature is still experimental and may change in the future.
Use it with caution.

3 changes: 3 additions & 0 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;

$criteria = new Criteria(
Expand All @@ -329,6 +330,7 @@ $criteria = new Criteria(
new FromPlayheadCriterion(2),
new FromIndexCriterion(100),
new ArchivedCriterion(true),
new EventsCriterion(['profile.created', 'profile.name_changed']),
);
```
Or you can the criteria builder to create the criteria.
Expand All @@ -342,6 +344,7 @@ $criteria = (new CriteriaBuilder())
->fromPlayhead(2)
->fromIndex(100)
->archived(true)
->events(['profile.created', 'profile.name_changed'])
->build();
```
#### Stream
Expand Down
46 changes: 42 additions & 4 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,44 @@ There are two options here:

In order for the subscription engine to be able to do its work, you have to assemble it beforehand.

### Message Loader

The subscription engine needs a message loader to load the messages.
We provide two implementations by default.
Which one has a better performance depends on the use case.

#### Store Message Loader

The store message loader loads all the messages from the event store.

```php
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader;

/** @var Store $store */
$messageLoader = new StoreMessageLoader($store);
```
#### Event Filtered Store Message Loader

The event filtered store message loader loads only the messages that are relevant for the subscribers.
It looks before loading the messages which subscribers are interested in the events.
Then it loads with a filter only the relevant messages.

```php
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;

/**
* @var Store $store
* @var EventMetadataFactory $eventMetadataFactory
* @var SubscriberRepository $subscriberRepository
*/
$messageLoader = new EventFilteredStoreMessageLoader(
$store,
$eventMetadataFactory,
$subscriberRepository,
);
```
### Subscription Store

The Subscription Engine uses a subscription store to store the status of each subscription.
Expand Down Expand Up @@ -730,24 +768,24 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([
### Subscription Engine

Now we can create the subscription engine and plug together the necessary services.
The event store is needed to load the events, the Subscription Store to store the subscription state
The message loader is needed to load the messages, the Subscription Store to store the subscription state
and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy.

```php
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

/**
* @var Store $eventStore
* @var MessageLoader $messageLoader
* @var DoctrineSubscriptionStore $subscriptionStore
* @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository
* @var NoRetryStrategy $retryStrategy
*/
$subscriptionEngine = new DefaultSubscriptionEngine(
$eventStore,
$messageLoader,
$subscriptionStore,
$subscriberAccessorRepository,
$retryStrategy,
Expand Down
15 changes: 15 additions & 0 deletions src/Store/Criteria/CriteriaBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ final class CriteriaBuilder
private int|null $fromPlayhead = null;
private bool|null $archived = null;

/** @var list<string>|null */
private array|null $events = null;

/** @experimental */
public function streamName(string|null $streamName): self
{
Expand Down Expand Up @@ -56,6 +59,14 @@ public function archived(bool|null $archived): self
return $this;
}

/** @param list<string>|null $events */
public function events(array|null $events): self
{
$this->events = $events;

return $this;
}

public function build(): Criteria
{
$criteria = [];
Expand Down Expand Up @@ -84,6 +95,10 @@ public function build(): Criteria
$criteria[] = new ArchivedCriterion($this->archived);
}

if ($this->events !== null) {
$criteria[] = new EventsCriterion($this->events);
}

return new Criteria(...$criteria);
}
}
14 changes: 14 additions & 0 deletions src/Store/Criteria/EventsCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class EventsCriterion
{
public function __construct(
/** @var list<string> */
public readonly array $events,
) {
}
}
6 changes: 6 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -26,6 +27,7 @@
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use PDO;
Expand Down Expand Up @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
6 changes: 6 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -24,6 +25,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
Expand Down Expand Up @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
30 changes: 13 additions & 17 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
Expand Down Expand Up @@ -35,13 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine
/** @var array<string, BatchableSubscriber> */
private array $batching = [];

private readonly MessageLoader $messageLoader;

public function __construct(
private readonly Store $messageStore,
Store|MessageLoader $messageStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
) {
if ($messageStore instanceof MessageLoader) {
$this->messageLoader = $messageStore;
} else {
$this->messageLoader = new StoreMessageLoader($messageStore);
}

$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}

Expand Down Expand Up @@ -72,7 +78,7 @@ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
/** @var list<Error> $errors */
$errors = [];

$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -193,9 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$stream = $this->messageStore->load(
new Criteria(new FromIndexCriterion($startIndex)),
);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -366,8 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));
$stream = $this->messageStore->load($criteria);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -966,7 +969,7 @@ private function discoverNewSubscriptions(): void

if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();
}

$subscription->changePosition($latestIndex);
Expand All @@ -986,13 +989,6 @@ private function discoverNewSubscriptions(): void
$this->subscriptionManager->flush();
}

private function latestIndex(): int
{
$stream = $this->messageStore->load(null, 1, null, true);

return $stream->index() ?: 0;
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
Expand Down
84 changes: 84 additions & 0 deletions src/Subscription/Engine/EventFilteredStoreMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_keys;

final class EventFilteredStoreMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
private readonly EventMetadataFactory $eventMetadataFactory,
private readonly SubscriberAccessorRepository $subscriberRepository,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events !== []) {
$criteria = $criteria->add(new EventsCriterion($events));
}

return $this->store->load($criteria);
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriberRepository->get($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
Loading

0 comments on commit 91dabc1

Please sign in to comment.