Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter events in subscription engine #637

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ deptrac:
- Attribute
- Clock
- Message
- MetadataEvent
- MetadataSubscriber
- Repository
- Schema
Expand Down
1 change: 1 addition & 0 deletions docs/pages/our-backward-compatibility-promise.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ In our docs the features are marked like this:

This feature is still experimental and may change in the future.
Use it with caution.

3 changes: 3 additions & 0 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;

$criteria = new Criteria(
Expand All @@ -329,6 +330,7 @@ $criteria = new Criteria(
new FromPlayheadCriterion(2),
new FromIndexCriterion(100),
new ArchivedCriterion(true),
new EventsCriterion(['profile.created', 'profile.name_changed']),
);
```
Or you can the criteria builder to create the criteria.
Expand All @@ -342,6 +344,7 @@ $criteria = (new CriteriaBuilder())
->fromPlayhead(2)
->fromIndex(100)
->archived(true)
->events(['profile.created', 'profile.name_changed'])
->build();
```
#### Stream
Expand Down
46 changes: 42 additions & 4 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,44 @@ There are two options here:

In order for the subscription engine to be able to do its work, you have to assemble it beforehand.

### Message Loader

The subscription engine needs a message loader to load the messages.
We provide two implementations by default.
Which one has a better performance depends on the use case.

#### Store Message Loader

The store message loader loads all the messages from the event store.

```php
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader;

/** @var Store $store */
$messageLoader = new StoreMessageLoader($store);
```
#### Event Filtered Store Message Loader

The event filtered store message loader loads only the messages that are relevant for the subscribers.
It looks before loading the messages which subscribers are interested in the events.
Then it loads with a filter only the relevant messages.

```php
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;

/**
* @var Store $store
* @var EventMetadataFactory $eventMetadataFactory
* @var SubscriberRepository $subscriberRepository
*/
$messageLoader = new EventFilteredStoreMessageLoader(
$store,
$eventMetadataFactory,
$subscriberRepository,
);
```
### Subscription Store

The Subscription Engine uses a subscription store to store the status of each subscription.
Expand Down Expand Up @@ -730,24 +768,24 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([
### Subscription Engine

Now we can create the subscription engine and plug together the necessary services.
The event store is needed to load the events, the Subscription Store to store the subscription state
The message loader is needed to load the messages, the Subscription Store to store the subscription state
and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy.

```php
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

/**
* @var Store $eventStore
* @var MessageLoader $messageLoader
* @var DoctrineSubscriptionStore $subscriptionStore
* @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository
* @var NoRetryStrategy $retryStrategy
*/
$subscriptionEngine = new DefaultSubscriptionEngine(
$eventStore,
$messageLoader,
$subscriptionStore,
$subscriberAccessorRepository,
$retryStrategy,
Expand Down
15 changes: 15 additions & 0 deletions src/Store/Criteria/CriteriaBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ final class CriteriaBuilder
private int|null $fromPlayhead = null;
private bool|null $archived = null;

/** @var list<string>|null */
private array|null $events = null;

/** @experimental */
public function streamName(string|null $streamName): self
{
Expand Down Expand Up @@ -56,6 +59,14 @@ public function archived(bool|null $archived): self
return $this;
}

/** @param list<string>|null $events */
public function events(array|null $events): self
{
$this->events = $events;

return $this;
}

public function build(): Criteria
{
$criteria = [];
Expand Down Expand Up @@ -84,6 +95,10 @@ public function build(): Criteria
$criteria[] = new ArchivedCriterion($this->archived);
}

if ($this->events !== null) {
$criteria[] = new EventsCriterion($this->events);
}

return new Criteria(...$criteria);
}
}
14 changes: 14 additions & 0 deletions src/Store/Criteria/EventsCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class EventsCriterion
{
public function __construct(
/** @var list<string> */
public readonly array $events,
) {
}
}
6 changes: 6 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -26,6 +27,7 @@
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use PDO;
Expand Down Expand Up @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
6 changes: 6 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -24,6 +25,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
Expand Down Expand Up @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
30 changes: 13 additions & 17 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
Expand Down Expand Up @@ -35,13 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine
/** @var array<string, BatchableSubscriber> */
private array $batching = [];

private readonly MessageLoader $messageLoader;

public function __construct(
private readonly Store $messageStore,
Store|MessageLoader $messageStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
) {
if ($messageStore instanceof MessageLoader) {
$this->messageLoader = $messageStore;
} else {
$this->messageLoader = new StoreMessageLoader($messageStore);
}

$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}

Expand Down Expand Up @@ -72,7 +78,7 @@ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
/** @var list<Error> $errors */
$errors = [];

$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -193,9 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$stream = $this->messageStore->load(
new Criteria(new FromIndexCriterion($startIndex)),
);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -366,8 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));
$stream = $this->messageStore->load($criteria);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -966,7 +969,7 @@ private function discoverNewSubscriptions(): void

if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();
}

$subscription->changePosition($latestIndex);
Expand All @@ -986,13 +989,6 @@ private function discoverNewSubscriptions(): void
$this->subscriptionManager->flush();
}

private function latestIndex(): int
{
$stream = $this->messageStore->load(null, 1, null, true);

return $stream->index() ?: 0;
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
Expand Down
84 changes: 84 additions & 0 deletions src/Subscription/Engine/EventFilteredStoreMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_keys;

final class EventFilteredStoreMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
private readonly EventMetadataFactory $eventMetadataFactory,
private readonly SubscriberAccessorRepository $subscriberRepository,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events !== []) {
$criteria = $criteria->add(new EventsCriterion($events));
}

return $this->store->load($criteria);
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriberRepository->get($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
Loading
Loading