diff --git a/baseline.xml b/baseline.xml
index b5d658ceb..e45f67a5a 100644
--- a/baseline.xml
+++ b/baseline.xml
@@ -10,15 +10,6 @@
frozenDateTime->modify(sprintf('+%s seconds', $seconds))]]>
-
-
-
-
-
-
-
-
-
@@ -151,14 +142,14 @@
-
+
-
+
@@ -181,7 +172,7 @@
-
+
diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md
index 1cb9e7865..9e57c9ba3 100644
--- a/docs/pages/subscription.md
+++ b/docs/pages/subscription.md
@@ -658,6 +658,25 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
$retryStrategy,
);
```
+### Catchup Subscription Engine
+
+If aggregates are used in the processors and new events are generated there,
+then they are not part of the current subscription engine run and will only be processed during the next run or boot.
+This is usually not a problem in dev or prod environment because a worker is used
+and these events will be processed at some point. But in testing it is not so easy.
+For this reason, we have the `CatchupSubscriptionEngine`.
+
+```php
+use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine;
+use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
+
+/** @var SubscriptionEngine $subscriptionStore */
+$catchupSubscriptionEngine = new CatchupSubscriptionEngine($subscriptionEngine);
+```
+!!! tip
+
+ You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately.
+
## Usage
The Subscription Engine has a few methods needed to use it effectively.
diff --git a/src/Subscription/Engine/CatchUpSubscriptionEngine.php b/src/Subscription/Engine/CatchUpSubscriptionEngine.php
new file mode 100644
index 000000000..8a0b3cdf5
--- /dev/null
+++ b/src/Subscription/Engine/CatchUpSubscriptionEngine.php
@@ -0,0 +1,107 @@
+parent->setup($criteria, $skipBooting);
+ }
+
+ public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
+ {
+ $results = [];
+
+ $catchupLimit = $this->limit ?? PHP_INT_MAX;
+
+ for ($i = 0; $i < $catchupLimit; $i++) {
+ $lastResult = $this->parent->boot($criteria, $limit);
+
+ $results[] = $lastResult;
+
+ if ($lastResult->processedMessages === 0) {
+ break;
+ }
+ }
+
+ return $this->mergeResult(...$results);
+ }
+
+ public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
+ {
+ $mergedResult = new ProcessedResult(0);
+
+ $catchupLimit = $this->limit ?? PHP_INT_MAX;
+
+ for ($i = 0; $i < $catchupLimit; $i++) {
+ $result = $this->parent->run($criteria, $limit);
+ $mergedResult = $this->mergeResult($mergedResult, $result);
+
+ if ($result->processedMessages === 0) {
+ break;
+ }
+ }
+
+ return $mergedResult;
+ }
+
+ public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result
+ {
+ return $this->parent->teardown($criteria);
+ }
+
+ public function remove(SubscriptionEngineCriteria|null $criteria = null): Result
+ {
+ return $this->parent->remove($criteria);
+ }
+
+ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result
+ {
+ return $this->parent->reactivate($criteria);
+ }
+
+ public function pause(SubscriptionEngineCriteria|null $criteria = null): Result
+ {
+ return $this->parent->pause($criteria);
+ }
+
+ /** @return list */
+ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array
+ {
+ return $this->parent->subscriptions($criteria);
+ }
+
+ private function mergeResult(ProcessedResult ...$results): ProcessedResult
+ {
+ $processedMessages = 0;
+ $streamFinished = false;
+ $errors = [];
+
+ foreach ($results as $result) {
+ $processedMessages += $result->processedMessages;
+ $streamFinished = $result->streamFinished;
+ $errors[] = $result->errors;
+ }
+
+ return new ProcessedResult(
+ $processedMessages,
+ $streamFinished,
+ array_merge(...$errors),
+ );
+ }
+}
diff --git a/tests/Integration/BankAccountSplitStream/Aggregate/BankAccount.php b/tests/Integration/BankAccountSplitStream/BankAccount.php
similarity index 95%
rename from tests/Integration/BankAccountSplitStream/Aggregate/BankAccount.php
rename to tests/Integration/BankAccountSplitStream/BankAccount.php
index 61f4dd5b1..e48a3eccd 100644
--- a/tests/Integration/BankAccountSplitStream/Aggregate/BankAccount.php
+++ b/tests/Integration/BankAccountSplitStream/BankAccount.php
@@ -2,13 +2,12 @@
declare(strict_types=1);
-namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Aggregate;
+namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
-use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\AccountId;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BalanceAdded;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BankAccountCreated;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\MonthPassed;
diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php
index 9540725a6..23176e583 100644
--- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php
+++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php
@@ -17,7 +17,6 @@
use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Tests\DbalManager;
-use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Aggregate\BankAccount;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BalanceAdded;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BankAccountCreated;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\MonthPassed;
diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php
index 6e505a784..3762e0289 100644
--- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php
+++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php
@@ -18,7 +18,6 @@
use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Tests\DbalManager;
-use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener\SendEmailListener;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator\FooMessageDecorator;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjector;
diff --git a/tests/Integration/BasicImplementation/Aggregate/Profile.php b/tests/Integration/BasicImplementation/Profile.php
similarity index 92%
rename from tests/Integration/BasicImplementation/Aggregate/Profile.php
rename to tests/Integration/BasicImplementation/Profile.php
index 4b8063a27..2809ceaf4 100644
--- a/tests/Integration/BasicImplementation/Aggregate/Profile.php
+++ b/tests/Integration/BasicImplementation/Profile.php
@@ -2,7 +2,7 @@
declare(strict_types=1);
-namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Aggregate;
+namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
@@ -11,7 +11,6 @@
use Patchlevel\EventSourcing\Attribute\Snapshot;
use Patchlevel\EventSourcing\Serializer\Normalizer\IdNormalizer;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated;
-use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\ProfileId;
#[Aggregate('profile')]
#[Snapshot('default', 100)]
diff --git a/tests/Integration/Subscription/Events/AdminPromoted.php b/tests/Integration/Subscription/Events/AdminPromoted.php
new file mode 100644
index 000000000..e63c8acae
--- /dev/null
+++ b/tests/Integration/Subscription/Events/AdminPromoted.php
@@ -0,0 +1,12 @@
+recordThat(new NameChanged($name));
}
+ public function promoteToAdmin(): void
+ {
+ $this->recordThat(new AdminPromoted());
+ }
+
#[Apply]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
+ $this->isAdmin = false;
}
#[Apply]
@@ -47,8 +55,19 @@ protected function applyNameChanged(NameChanged $event): void
$this->name = $event->name;
}
+ #[Apply]
+ protected function applyAdminPromoted(AdminPromoted $event): void
+ {
+ $this->isAdmin = true;
+ }
+
public function name(): string
{
return $this->name;
}
+
+ public function isAdmin(): bool
+ {
+ return $this->isAdmin;
+ }
}
diff --git a/tests/Integration/Subscription/Subscriber/ProfileProcessor.php b/tests/Integration/Subscription/Subscriber/ProfileProcessor.php
index f1ee8faa1..1a571de36 100644
--- a/tests/Integration/Subscription/Subscriber/ProfileProcessor.php
+++ b/tests/Integration/Subscription/Subscriber/ProfileProcessor.php
@@ -7,8 +7,10 @@
use Patchlevel\EventSourcing\Attribute\Processor;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
-use Patchlevel\EventSourcing\Tests\Integration\Subscription\Aggregate\Profile;
+use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;
+use Patchlevel\EventSourcing\Tests\Integration\Subscription\Profile;
+use Patchlevel\EventSourcing\Tests\Integration\Subscription\ProfileId;
#[Processor('profile')]
final class ProfileProcessor
@@ -25,7 +27,23 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void
$profile = $repository->load($profileCreated->profileId);
- $profile->changeName('new name');
+ $profile->changeName('admin');
+
+ $repository->save($profile);
+ }
+
+ #[Subscribe(NameChanged::class)]
+ public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void
+ {
+ $repository = $this->repositoryManager->get(Profile::class);
+
+ $profile = $repository->load($profileId);
+
+ if ($profile->name() !== 'admin') {
+ return;
+ }
+
+ $profile->promoteToAdmin();
$repository->save($profile);
}
diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php
index 290bb8761..5e1977358 100644
--- a/tests/Integration/Subscription/SubscriptionTest.php
+++ b/tests/Integration/Subscription/SubscriptionTest.php
@@ -18,6 +18,7 @@
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
+use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
@@ -27,7 +28,6 @@
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Patchlevel\EventSourcing\Tests\DbalManager;
-use Patchlevel\EventSourcing\Tests\Integration\Subscription\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor;
@@ -393,10 +393,12 @@ public function testProcessor(): void
$schemaDirector->create();
- $engine = new DefaultSubscriptionEngine(
- $store,
- $subscriptionStore,
- $subscriberAccessorRepository,
+ $engine = new CatchUpSubscriptionEngine(
+ new DefaultSubscriptionEngine(
+ $store,
+ $subscriptionStore,
+ $subscriberAccessorRepository,
+ ),
);
self::assertEquals(
@@ -446,8 +448,8 @@ public function testProcessor(): void
/** @var list $messages */
$messages = iterator_to_array($store->load());
- self::assertCount(2, $messages);
- self::assertArrayHasKey(1, $messages);
+ self::assertCount(3, $messages);
+ self::assertArrayHasKey(2, $messages);
self::assertEquals(
new TraceHeader([
@@ -456,7 +458,7 @@ public function testProcessor(): void
'category' => 'event_sourcing/subscriber/processor',
],
]),
- $messages[1]->header(TraceHeader::class),
+ $messages[2]->header(TraceHeader::class),
);
}
diff --git a/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php
new file mode 100644
index 000000000..8b3ce2489
--- /dev/null
+++ b/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php
@@ -0,0 +1,221 @@
+prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new Result();
+
+ $parent->setup($criteria, true)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->setup($criteria, true);
+
+ self::assertSame($expectedResult, $result);
+ }
+
+ public function testBootFinished(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new ProcessedResult(0);
+
+ $parent->boot($criteria, 42)->willReturn($expectedResult)->shouldBeCalledTimes(1);
+ $result = $engine->boot($criteria, 42);
+
+ self::assertEquals($expectedResult, $result);
+ }
+
+ public function testBootSecondTime(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $error = new Error(
+ 'foo',
+ 'bar',
+ new RuntimeException('baz'),
+ );
+
+ $parent->boot($criteria, 42)->willReturn(
+ new ProcessedResult(1),
+ new ProcessedResult(0, true, [$error]),
+ )->shouldBeCalledTimes(2);
+
+ $result = $engine->boot($criteria, 42);
+
+ self::assertEquals(new ProcessedResult(1, true, [$error]), $result);
+ }
+
+ public function testBootLimit(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal(), 2);
+ $criteria = new SubscriptionEngineCriteria();
+
+ $parent->boot($criteria, 42)->willReturn(
+ new ProcessedResult(1),
+ new ProcessedResult(1),
+ )->shouldBeCalledTimes(2);
+
+ $result = $engine->boot($criteria, 42);
+
+ self::assertEquals(new ProcessedResult(2), $result);
+ }
+
+ public function testRunFinished(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new ProcessedResult(0);
+
+ $parent->run($criteria, 42)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->run($criteria, 42);
+
+ self::assertEquals($expectedResult, $result);
+ }
+
+ public function testRunSecondTime(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $error = new Error(
+ 'foo',
+ 'bar',
+ new RuntimeException('baz'),
+ );
+
+ $parent->run($criteria, 42)->willReturn(
+ new ProcessedResult(1, true, [$error]),
+ new ProcessedResult(0),
+ )->shouldBeCalledTimes(2);
+ $result = $engine->run($criteria, 42);
+
+ self::assertEquals(new ProcessedResult(1, false, [$error]), $result);
+ }
+
+ public function testRunLimit(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal(), 2);
+ $criteria = new SubscriptionEngineCriteria();
+
+ $parent->run($criteria, 42)->willReturn(
+ new ProcessedResult(1),
+ new ProcessedResult(1),
+ )->shouldBeCalledTimes(2);
+
+ $result = $engine->run($criteria, 42);
+
+ self::assertEquals(new ProcessedResult(2), $result);
+ }
+
+ public function testTeardown(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new Result();
+
+ $parent->teardown($criteria)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->teardown($criteria);
+
+ self::assertSame($expectedResult, $result);
+ }
+
+ public function testRemove(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new Result();
+
+ $parent->remove($criteria)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->remove($criteria);
+
+ self::assertSame($expectedResult, $result);
+ }
+
+ public function testReactivate(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new Result();
+
+ $parent->reactivate($criteria)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->reactivate($criteria);
+
+ self::assertSame($expectedResult, $result);
+ }
+
+ public function testPause(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedResult = new Result();
+
+ $parent->pause($criteria)->willReturn($expectedResult)->shouldBeCalledOnce();
+ $result = $engine->pause($criteria);
+
+ self::assertSame($expectedResult, $result);
+ }
+
+ public function testSubscriptions(): void
+ {
+ $parent = $this->prophesize(SubscriptionEngine::class);
+
+ $engine = new CatchUpSubscriptionEngine($parent->reveal());
+ $criteria = new SubscriptionEngineCriteria();
+
+ $expectedSubscriptions = [new Subscription('foo')];
+
+ $parent->subscriptions($criteria)->willReturn($expectedSubscriptions)->shouldBeCalledOnce();
+ $subscriptions = $engine->subscriptions($criteria);
+
+ self::assertEquals($expectedSubscriptions, $subscriptions);
+ }
+}