diff --git a/deptrac.yaml b/deptrac.yaml index b405ba863..b793814f6 100644 --- a/deptrac.yaml +++ b/deptrac.yaml @@ -154,6 +154,7 @@ deptrac: - Attribute - Clock - Message + - MetadataEvent - MetadataSubscriber - Repository - Schema diff --git a/docs/pages/our-backward-compatibility-promise.md b/docs/pages/our-backward-compatibility-promise.md index 70c3ffe9d..70f7f1ffd 100644 --- a/docs/pages/our-backward-compatibility-promise.md +++ b/docs/pages/our-backward-compatibility-promise.md @@ -44,3 +44,4 @@ In our docs the features are marked like this: This feature is still experimental and may change in the future. Use it with caution. + \ No newline at end of file diff --git a/docs/pages/store.md b/docs/pages/store.md index 040cc8fcd..205f54263 100644 --- a/docs/pages/store.md +++ b/docs/pages/store.md @@ -321,6 +321,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion; use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; $criteria = new Criteria( @@ -329,6 +330,7 @@ $criteria = new Criteria( new FromPlayheadCriterion(2), new FromIndexCriterion(100), new ArchivedCriterion(true), + new EventsCriterion(['profile.created', 'profile.name_changed']), ); ``` Or you can the criteria builder to create the criteria. @@ -342,6 +344,7 @@ $criteria = (new CriteriaBuilder()) ->fromPlayhead(2) ->fromIndex(100) ->archived(true) + ->events(['profile.created', 'profile.name_changed']) ->build(); ``` #### Stream diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index b7e1335dc..52f8731b8 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -645,6 +645,44 @@ There are two options here: In order for the subscription engine to be able to do its work, you have to assemble it beforehand. +### Message Loader + +The subscription engine needs a message loader to load the messages. +We provide two implementations by default. +Which one has a better performance depends on the use case. + +#### Store Message Loader + +The store message loader loads all the messages from the event store. + +```php +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader; + +/** @var Store $store */ +$messageLoader = new StoreMessageLoader($store); +``` +#### Event Filtered Store Message Loader + +The event filtered store message loader loads only the messages that are relevant for the subscribers. +It looks before loading the messages which subscribers are interested in the events. +Then it loads with a filter only the relevant messages. + +```php +use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; + +/** + * @var Store $store + * @var EventMetadataFactory $eventMetadataFactory + * @var SubscriberRepository $subscriberRepository + */ +$messageLoader = new EventFilteredStoreMessageLoader( + $store, + $eventMetadataFactory, + $subscriberRepository, +); +``` ### Subscription Store The Subscription Engine uses a subscription store to store the status of each subscription. @@ -730,24 +768,24 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([ ### Subscription Engine Now we can create the subscription engine and plug together the necessary services. -The event store is needed to load the events, the Subscription Store to store the subscription state +The message loader is needed to load the messages, the Subscription Store to store the subscription state and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy. ```php -use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; /** - * @var Store $eventStore + * @var MessageLoader $messageLoader * @var DoctrineSubscriptionStore $subscriptionStore * @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository * @var NoRetryStrategy $retryStrategy */ $subscriptionEngine = new DefaultSubscriptionEngine( - $eventStore, + $messageLoader, $subscriptionStore, $subscriberAccessorRepository, $retryStrategy, diff --git a/src/Store/Criteria/CriteriaBuilder.php b/src/Store/Criteria/CriteriaBuilder.php index 1a45f1e74..850789040 100644 --- a/src/Store/Criteria/CriteriaBuilder.php +++ b/src/Store/Criteria/CriteriaBuilder.php @@ -13,6 +13,9 @@ final class CriteriaBuilder private int|null $fromPlayhead = null; private bool|null $archived = null; + /** @var list|null */ + private array|null $events = null; + /** @experimental */ public function streamName(string|null $streamName): self { @@ -56,6 +59,14 @@ public function archived(bool|null $archived): self return $this; } + /** @param list|null $events */ + public function events(array|null $events): self + { + $this->events = $events; + + return $this; + } + public function build(): Criteria { $criteria = []; @@ -84,6 +95,10 @@ public function build(): Criteria $criteria[] = new ArchivedCriterion($this->archived); } + if ($this->events !== null) { + $criteria[] = new EventsCriterion($this->events); + } + return new Criteria(...$criteria); } } diff --git a/src/Store/Criteria/EventsCriterion.php b/src/Store/Criteria/EventsCriterion.php new file mode 100644 index 000000000..310d74614 --- /dev/null +++ b/src/Store/Criteria/EventsCriterion.php @@ -0,0 +1,14 @@ + */ + public readonly array $events, + ) { + } +} diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 1916e025a..28a9a977b 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -26,6 +27,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use PDO; @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index 703c561fb..04b0ff704 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -24,6 +25,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index cdb4b6b2c..aa73984ed 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -5,8 +5,6 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\Criteria\Criteria; -use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy; @@ -35,13 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine /** @var array */ private array $batching = []; + private readonly MessageLoader $messageLoader; + public function __construct( - private readonly Store $messageStore, + Store|MessageLoader $messageStore, SubscriptionStore $subscriptionStore, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, ) { + if ($messageStore instanceof MessageLoader) { + $this->messageLoader = $messageStore; + } else { + $this->messageLoader = new StoreMessageLoader($messageStore); + } + $this->subscriptionManager = new SubscriptionManager($subscriptionStore); } @@ -72,7 +78,7 @@ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result { /** @var list $errors */ $errors = []; - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -193,9 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), - ); + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); foreach ($stream as $message) { $messageCounter++; @@ -366,8 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - $stream = $this->messageStore->load($criteria); + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); foreach ($stream as $message) { $messageCounter++; @@ -966,7 +969,7 @@ private function discoverNewSubscriptions(): void if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); } $subscription->changePosition($latestIndex); @@ -986,13 +989,6 @@ private function discoverNewSubscriptions(): void $this->subscriptionManager->flush(); } - private function latestIndex(): int - { - $stream = $this->messageStore->load(null, 1, null, true); - - return $stream->index() ?: 0; - } - private function handleError(Subscription $subscription, Throwable $throwable): void { $subscription->error($throwable); diff --git a/src/Subscription/Engine/EventFilteredStoreMessageLoader.php b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php new file mode 100644 index 000000000..217588d40 --- /dev/null +++ b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php @@ -0,0 +1,84 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events !== []) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + + return $this->store->load($criteria); + } + + /** + * @param list $subscriptions + * + * @return list + */ + private function events(array $subscriptions): array + { + $eventNames = []; + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return []; + } + + $events = $subscriber->events(); + + foreach ($events as $event) { + if ($event === '*') { + return []; + } + + $metadata = $this->eventMetadataFactory->metadata($event); + + $eventNames[$metadata->name] = true; + + foreach ($metadata->aliases as $alias) { + $eventNames[$alias] = true; + } + } + } + + return array_keys($eventNames); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/MessageLoader.php b/src/Subscription/Engine/MessageLoader.php new file mode 100644 index 000000000..1bdf5a655 --- /dev/null +++ b/src/Subscription/Engine/MessageLoader.php @@ -0,0 +1,16 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream; + + public function lastIndex(): int; +} diff --git a/src/Subscription/Engine/StoreMessageLoader.php b/src/Subscription/Engine/StoreMessageLoader.php new file mode 100644 index 000000000..eae336dd1 --- /dev/null +++ b/src/Subscription/Engine/StoreMessageLoader.php @@ -0,0 +1,32 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + return $this->store->load(new Criteria(new FromIndexCriterion($startIndex))); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/SubscriptionCollection.php b/src/Subscription/Engine/SubscriptionCollection.php index f1ed60b55..a8fba233c 100644 --- a/src/Subscription/Engine/SubscriptionCollection.php +++ b/src/Subscription/Engine/SubscriptionCollection.php @@ -64,4 +64,10 @@ public function lowestPosition(): int return $min; } + + /** @return list */ + public function toArray(): array + { + return $this->subscriptions; + } } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php index ab1ac328d..42336abee 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; use function array_key_exists; +use function array_keys; use function array_map; use function array_merge; @@ -84,6 +85,12 @@ public function teardownMethod(): Closure|null return $this->subscriber->$method(...); } + /** @return list */ + public function events(): array + { + return array_keys($this->metadata->subscribeMethods); + } + /** * @param class-string $eventClass * diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index fe5fdfde2..37d2d7638 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; @@ -13,6 +14,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -63,20 +65,27 @@ public function setUp(): void $profile = Profile::create($this->id, 'Peter'); for ($i = 1; $i < 10_000; $i++) { + $profile->changeEmail('peter' . $i . '@example.com'); $profile->changeName('Peter ' . $i); } $this->repository->save($profile); + $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( + [ + new ProfileProjector($connection), + new SendEmailProcessor(), + ], + ); + $this->subscriptionEngine = new DefaultSubscriptionEngine( - $this->store, - $subscriptionStore, - new MetadataSubscriberAccessorRepository( - [ - new ProfileProjector($connection), - new SendEmailProcessor(), - ], + new EventFilteredStoreMessageLoader( + $this->store, + new AttributeEventMetadataFactory(), + $subscriberAccessorRepository, ), + $subscriptionStore, + $subscriberAccessorRepository, ); } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index ede3f43b2..088f7ddf1 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Clock\FrozenClock; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -21,6 +22,7 @@ use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -92,10 +94,12 @@ public function testHappyPath(): void $schemaDirector->create(); + $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); + $engine = new DefaultSubscriptionEngine( - $store, + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), $subscriptionStore, - new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), + $subscriberRepository, ); self::assertEquals( diff --git a/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php b/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php index 6876fdd77..83f6d16b2 100644 --- a/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php +++ b/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php @@ -9,6 +9,7 @@ use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use PHPUnit\Framework\TestCase; @@ -33,6 +34,7 @@ public function testFull(): void ->fromIndex(1) ->fromPlayhead(1) ->archived(true) + ->events(['foo', 'bar']) ->build(); self::assertEquals( @@ -42,6 +44,7 @@ public function testFull(): void new FromIndexCriterion(1), new FromPlayheadCriterion(1), new ArchivedCriterion(true), + new EventsCriterion(['foo', 'bar']), ), $criteria, );