Skip to content

Commit

Permalink
Merge pull request #573 from patchlevel/catchup-subscription-engine
Browse files Browse the repository at this point in the history
add catch up subscription engine
  • Loading branch information
DavidBadura authored Apr 15, 2024
2 parents b573bcc + 296dd62 commit d0d6e4b
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 30 deletions.
15 changes: 3 additions & 12 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@
<code><![CDATA[$this->frozenDateTime->modify(sprintf('+%s seconds', $seconds))]]></code>
</PossiblyFalsePropertyAssignmentValue>
</file>
<file src="src/Console/Command/SubscriptionBootCommand.php">
<RedundantCondition>
<code><![CDATA[1]]></code>
</RedundantCondition>
<TypeDoesNotContainType>
<code><![CDATA[$finished]]></code>
<code><![CDATA[$finished]]></code>
</TypeDoesNotContainType>
</file>
<file src="src/Console/DoctrineHelper.php">
<ClassNotFinal>
<code><![CDATA[class DoctrineHelper]]></code>
Expand Down Expand Up @@ -151,14 +142,14 @@
<code><![CDATA[$subscriptionEngine]]></code>
</MissingConstructor>
</file>
<file src="tests/Integration/BankAccountSplitStream/Aggregate/BankAccount.php">
<file src="tests/Integration/BankAccountSplitStream/BankAccount.php">
<PropertyNotSetInConstructor>
<code><![CDATA[$balanceInCents]]></code>
<code><![CDATA[$id]]></code>
<code><![CDATA[$name]]></code>
</PropertyNotSetInConstructor>
</file>
<file src="tests/Integration/BasicImplementation/Aggregate/Profile.php">
<file src="tests/Integration/BasicImplementation/Profile.php">
<PropertyNotSetInConstructor>
<code><![CDATA[$id]]></code>
<code><![CDATA[$name]]></code>
Expand All @@ -181,7 +172,7 @@
<code><![CDATA[$name]]></code>
</PropertyNotSetInConstructor>
</file>
<file src="tests/Integration/Subscription/Aggregate/Profile.php">
<file src="tests/Integration/Subscription/Profile.php">
<PropertyNotSetInConstructor>
<code><![CDATA[$id]]></code>
<code><![CDATA[$name]]></code>
Expand Down
19 changes: 19 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,25 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
$retryStrategy,
);
```
### Catchup Subscription Engine

If aggregates are used in the processors and new events are generated there,
then they are not part of the current subscription engine run and will only be processed during the next run or boot.
This is usually not a problem in dev or prod environment because a worker is used
and these events will be processed at some point. But in testing it is not so easy.
For this reason, we have the `CatchupSubscriptionEngine`.

```php
use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

/** @var SubscriptionEngine $subscriptionStore */
$catchupSubscriptionEngine = new CatchupSubscriptionEngine($subscriptionEngine);
```
!!! tip

You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately.

## Usage

The Subscription Engine has a few methods needed to use it effectively.
Expand Down
107 changes: 107 additions & 0 deletions src/Subscription/Engine/CatchUpSubscriptionEngine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Subscription\Subscription;

use function array_merge;

use const PHP_INT_MAX;

final class CatchUpSubscriptionEngine implements SubscriptionEngine
{
public function __construct(
private readonly SubscriptionEngine $parent,
private readonly int|null $limit = null,
) {
}

public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result
{
return $this->parent->setup($criteria, $skipBooting);
}

public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
{
$results = [];

$catchupLimit = $this->limit ?? PHP_INT_MAX;

for ($i = 0; $i < $catchupLimit; $i++) {
$lastResult = $this->parent->boot($criteria, $limit);

$results[] = $lastResult;

if ($lastResult->processedMessages === 0) {
break;
}
}

return $this->mergeResult(...$results);
}

public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
{
$mergedResult = new ProcessedResult(0);

$catchupLimit = $this->limit ?? PHP_INT_MAX;

for ($i = 0; $i < $catchupLimit; $i++) {
$result = $this->parent->run($criteria, $limit);
$mergedResult = $this->mergeResult($mergedResult, $result);

if ($result->processedMessages === 0) {
break;
}
}

return $mergedResult;
}

public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->parent->teardown($criteria);
}

public function remove(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->parent->remove($criteria);
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->parent->reactivate($criteria);
}

public function pause(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->parent->pause($criteria);
}

/** @return list<Subscription> */
public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array
{
return $this->parent->subscriptions($criteria);
}

private function mergeResult(ProcessedResult ...$results): ProcessedResult
{
$processedMessages = 0;
$streamFinished = false;
$errors = [];

foreach ($results as $result) {
$processedMessages += $result->processedMessages;
$streamFinished = $result->streamFinished;
$errors[] = $result->errors;
}

return new ProcessedResult(
$processedMessages,
$streamFinished,
array_merge(...$errors),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Aggregate;
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\AccountId;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BalanceAdded;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BankAccountCreated;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\MonthPassed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Tests\DbalManager;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Aggregate\BankAccount;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BalanceAdded;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\BankAccountCreated;
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Events\MonthPassed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Tests\DbalManager;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener\SendEmailListener;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator\FooMessageDecorator;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Aggregate;
namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation;

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
Expand All @@ -11,7 +11,6 @@
use Patchlevel\EventSourcing\Attribute\Snapshot;
use Patchlevel\EventSourcing\Serializer\Normalizer\IdNormalizer;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\ProfileId;

#[Aggregate('profile')]
#[Snapshot('default', 100)]
Expand Down
12 changes: 12 additions & 0 deletions tests/Integration/Subscription/Events/AdminPromoted.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\Subscription\Events;

use Patchlevel\EventSourcing\Attribute\Event;

#[Event('profile.admin_promoted')]
final class AdminPromoted
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\Subscription\Aggregate;
namespace Patchlevel\EventSourcing\Tests\Integration\Subscription;

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Serializer\Normalizer\IdNormalizer;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\AdminPromoted;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\ProfileId;

#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
Expand All @@ -21,6 +21,8 @@ final class Profile extends BasicAggregateRoot
private ProfileId $id;
private string $name;

private bool $isAdmin = false;

public static function create(ProfileId $id, string $name): self
{
$self = new self();
Expand All @@ -34,11 +36,17 @@ public function changeName(string $name): void
$this->recordThat(new NameChanged($name));
}

public function promoteToAdmin(): void
{
$this->recordThat(new AdminPromoted());
}

#[Apply]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
$this->isAdmin = false;
}

#[Apply]
Expand All @@ -47,8 +55,19 @@ protected function applyNameChanged(NameChanged $event): void
$this->name = $event->name;
}

#[Apply]
protected function applyAdminPromoted(AdminPromoted $event): void
{
$this->isAdmin = true;
}

public function name(): string
{
return $this->name;
}

public function isAdmin(): bool
{
return $this->isAdmin;
}
}
22 changes: 20 additions & 2 deletions tests/Integration/Subscription/Subscriber/ProfileProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
use Patchlevel\EventSourcing\Attribute\Processor;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Profile;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\ProfileId;

#[Processor('profile')]
final class ProfileProcessor
Expand All @@ -25,7 +27,23 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void

$profile = $repository->load($profileCreated->profileId);

$profile->changeName('new name');
$profile->changeName('admin');

$repository->save($profile);
}

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void
{
$repository = $this->repositoryManager->get(Profile::class);

$profile = $repository->load($profileId);

if ($profile->name() !== 'admin') {
return;
}

$profile->promoteToAdmin();

$repository->save($profile);
}
Expand Down
18 changes: 10 additions & 8 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
Expand All @@ -27,7 +28,6 @@
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Patchlevel\EventSourcing\Tests\DbalManager;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor;
Expand Down Expand Up @@ -393,10 +393,12 @@ public function testProcessor(): void

$schemaDirector->create();

$engine = new DefaultSubscriptionEngine(
$store,
$subscriptionStore,
$subscriberAccessorRepository,
$engine = new CatchUpSubscriptionEngine(
new DefaultSubscriptionEngine(
$store,
$subscriptionStore,
$subscriberAccessorRepository,
),
);

self::assertEquals(
Expand Down Expand Up @@ -446,8 +448,8 @@ public function testProcessor(): void
/** @var list<Message> $messages */
$messages = iterator_to_array($store->load());

self::assertCount(2, $messages);
self::assertArrayHasKey(1, $messages);
self::assertCount(3, $messages);
self::assertArrayHasKey(2, $messages);

self::assertEquals(
new TraceHeader([
Expand All @@ -456,7 +458,7 @@ public function testProcessor(): void
'category' => 'event_sourcing/subscriber/processor',
],
]),
$messages[1]->header(TraceHeader::class),
$messages[2]->header(TraceHeader::class),
);
}

Expand Down
Loading

0 comments on commit d0d6e4b

Please sign in to comment.