Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter events in subscription engine #637

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ deptrac:
- Attribute
- Clock
- Message
- MetadataEvent
- MetadataSubscriber
- Repository
- Schema
Expand Down
14 changes: 14 additions & 0 deletions src/Store/Criteria/EventsCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class EventsCriterion
{
public function __construct(
/** @var list<string> */
public readonly array $events,
) {
}
}
6 changes: 6 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -26,6 +27,7 @@
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use PDO;
Expand Down Expand Up @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
6 changes: 6 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Store;

use Closure;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
Expand All @@ -24,6 +25,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
Expand Down Expand Up @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
throw new UnsupportedCriterion($criterion::class);
}
Expand Down
30 changes: 13 additions & 17 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
Expand Down Expand Up @@ -35,13 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine
/** @var array<string, BatchableSubscriber> */
private array $batching = [];

private readonly MessageLoader $messageLoader;

public function __construct(
private readonly Store $messageStore,
Store|MessageLoader $messageStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
) {
if ($messageStore instanceof MessageLoader) {
$this->messageLoader = $messageStore;
} else {
$this->messageLoader = new StoreMessageLoader($messageStore);
}

$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}

Expand Down Expand Up @@ -72,7 +78,7 @@ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
/** @var list<Error> $errors */
$errors = [];

$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -193,9 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$stream = $this->messageStore->load(
new Criteria(new FromIndexCriterion($startIndex)),
);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -366,8 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
$messageCounter = 0;

try {
$criteria = new Criteria(new FromIndexCriterion($startIndex));
$stream = $this->messageStore->load($criteria);
$stream = $this->messageLoader->load($startIndex, $subscriptions->toArray());

foreach ($stream as $message) {
$messageCounter++;
Expand Down Expand Up @@ -966,7 +969,7 @@ private function discoverNewSubscriptions(): void

if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
$latestIndex = $this->messageLoader->lastIndex();
}

$subscription->changePosition($latestIndex);
Expand All @@ -986,13 +989,6 @@ private function discoverNewSubscriptions(): void
$this->subscriptionManager->flush();
}

private function latestIndex(): int
{
$stream = $this->messageStore->load(null, 1, null, true);

return $stream->index() ?: 0;
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
Expand Down
84 changes: 84 additions & 0 deletions src/Subscription/Engine/EventFilteredStoreMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_keys;

final class EventFilteredStoreMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
private readonly EventMetadataFactory $eventMetadataFactory,
private readonly SubscriberAccessorRepository $subscriberRepository,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
$criteria = new Criteria(new FromIndexCriterion($startIndex));

$events = $this->events($subscriptions);

if ($events !== []) {
$criteria = $criteria->add(new EventsCriterion($events));
}

return $this->store->load($criteria);
}

/**
* @param list<Subscription> $subscriptions
*
* @return list<string>
*/
private function events(array $subscriptions): array
{
$eventNames = [];

foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriberRepository->get($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
return [];
}

$events = $subscriber->events();

foreach ($events as $event) {
if ($event === '*') {
return [];
}

$metadata = $this->eventMetadataFactory->metadata($event);

$eventNames[$metadata->name] = true;

foreach ($metadata->aliases as $alias) {
$eventNames[$alias] = true;
}
}
}

return array_keys($eventNames);
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
16 changes: 16 additions & 0 deletions src/Subscription/Engine/MessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscription;

interface MessageLoader
{
/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream;

public function lastIndex(): int;
}
32 changes: 32 additions & 0 deletions src/Subscription/Engine/StoreMessageLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Subscription\Subscription;

final class StoreMessageLoader implements MessageLoader
{
public function __construct(
private readonly Store $store,
) {
}

/** @param list<Subscription> $subscriptions */
public function load(int $startIndex, array $subscriptions): Stream
{
return $this->store->load(new Criteria(new FromIndexCriterion($startIndex)));
}

public function lastIndex(): int
{
$stream = $this->store->load(null, 1, null, true);

return $stream->index() ?: 0;
}
}
6 changes: 6 additions & 0 deletions src/Subscription/Engine/SubscriptionCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ public function lowestPosition(): int

return $min;
}

/** @return list<Subscription> */
public function toArray(): array
{
return $this->subscriptions;
}
}
7 changes: 7 additions & 0 deletions src/Subscription/Subscriber/MetadataSubscriberAccessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver;

use function array_key_exists;
use function array_keys;
use function array_map;
use function array_merge;

Expand Down Expand Up @@ -84,6 +85,12 @@ public function teardownMethod(): Closure|null
return $this->subscriber->$method(...);
}

/** @return list<class-string|'*'> */
public function events(): array
{
return array_keys($this->metadata->subscribeMethods);
}

/**
* @param class-string $eventClass
*
Expand Down
23 changes: 16 additions & 7 deletions tests/Benchmark/SubscriptionEngineBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
Expand All @@ -13,6 +14,7 @@
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
Expand Down Expand Up @@ -63,20 +65,27 @@ public function setUp(): void
$profile = Profile::create($this->id, 'Peter');

for ($i = 1; $i < 10_000; $i++) {
$profile->changeEmail('peter' . $i . '@example.com');
$profile->changeName('Peter ' . $i);
}

$this->repository->save($profile);

$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository(
[
new ProfileProjector($connection),
new SendEmailProcessor(),
],
);

$this->subscriptionEngine = new DefaultSubscriptionEngine(
$this->store,
$subscriptionStore,
new MetadataSubscriberAccessorRepository(
[
new ProfileProjector($connection),
new SendEmailProcessor(),
],
new EventFilteredStoreMessageLoader(
$this->store,
new AttributeEventMetadataFactory(),
$subscriberAccessorRepository,
),
$subscriptionStore,
$subscriberAccessorRepository,
);
}

Expand Down
Loading
Loading