Skip to content

Commit

Permalink
add early out in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 5, 2024
1 parent 13b647a commit 87e3ce3
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 60 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ cs: vendor

.PHONY: phpstan
phpstan: vendor ## run phpstan static code analyser
vendor/bin/phpstan analyse
php -d memory_limit=312M vendor/bin/phpstan analyse

.PHONY: phpstan-baseline
phpstan-baseline: vendor ## run phpstan static code analyser
vendor/bin/phpstan analyse --generate-baseline
php -d memory_limit=312M vendor/bin/phpstan analyse --generate-baseline

.PHONY: psalm
psalm: vendor ## run psalm static code analyser
Expand Down
7 changes: 7 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@
<code><![CDATA[$update1]]></code>
</PossiblyUndefinedArrayOffset>
</file>
<file src="tests/Unit/Subscription/Engine/SubscriptionManagerTest.php">
<InvalidArgument>
<code><![CDATA[$result]]></code>
<code><![CDATA[$subscriptions]]></code>
<code><![CDATA[$subscriptions]]></code>
</InvalidArgument>
</file>
<file src="tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php">
<DeprecatedMethod>
<code><![CDATA[group]]></code>
Expand Down
92 changes: 40 additions & 52 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk
groups: $criteria->groups,
status: [Status::New],
),
function (array $subscriptions) use ($skipBooting): Result {
function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.');

Expand Down Expand Up @@ -171,14 +171,14 @@ public function boot(
groups: $criteria->groups,
status: [Status::Booting],
),
function ($subscriptions) use ($limit): ProcessedResult {
function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.');

return new ProcessedResult(0, true);
}

$startIndex = $this->lowestSubscriptionPosition($subscriptions);
$startIndex = $subscriptions->lowestPosition();

$this->logger?->debug(
sprintf(
Expand All @@ -198,17 +198,15 @@ function ($subscriptions) use ($limit): ProcessedResult {
);

foreach ($stream as $message) {
$messageCounter++;

$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isBooting()) {
continue;
}

if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
Expand All @@ -229,9 +227,17 @@ function ($subscriptions) use ($limit): ProcessedResult {
}

$errors[] = $error;
}

$messageCounter++;
$subscriptions->remove($subscription);

if (count($subscriptions) === 0) {

Check warning on line 233 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Identical": --- Original +++ New @@ @@ } $errors[] = $error; $subscriptions->remove($subscription); - if (count($subscriptions) === 0) { + if (count($subscriptions) !== 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2; }
$this->logger?->info(
'Subscription Engine: No subscriptions in booting status, finish booting.',
);

break 2;

Check warning on line 238 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + break 1; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position for booting: %s', $index));

Check warning on line 238 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Break_": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + continue; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position for booting: %s', $index));
}
}

$this->logger?->debug(
sprintf(
Expand Down Expand Up @@ -265,6 +271,8 @@ function ($subscriptions) use ($limit): ProcessedResult {

if ($error) {
$errors[] = $error;

$subscriptions->remove($subscription);
}

$this->subscriptionManager->update($subscription);
Expand All @@ -275,10 +283,6 @@ function ($subscriptions) use ($limit): ProcessedResult {
$this->logger?->debug('Subscription Engine: End of stream for booting has been reached.');

foreach ($subscriptions as $subscription) {
if (!$subscription->isBooting()) {
continue;
}

if ($subscription->runMode() === RunMode::Once) {
$subscription->finished();
$this->subscriptionManager->update($subscription);
Expand Down Expand Up @@ -340,14 +344,14 @@ public function run(
groups: $criteria->groups,
status: [Status::Active],
),
function (array $subscriptions) use ($limit): ProcessedResult {
function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.');

return new ProcessedResult(0, true);
}

$startIndex = $this->lowestSubscriptionPosition($subscriptions);
$startIndex = $subscriptions->lowestPosition();

$this->logger?->debug(
sprintf(
Expand All @@ -366,17 +370,15 @@ function (array $subscriptions) use ($limit): ProcessedResult {
$stream = $this->messageStore->load($criteria);

foreach ($stream as $message) {
$messageCounter++;

$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isActive()) {
continue;
}

if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
Expand All @@ -397,9 +399,17 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}

$errors[] = $error;
}

$messageCounter++;
$subscriptions->remove($subscription);

Check warning on line 403 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ continue; } $errors[] = $error; - $subscriptions->remove($subscription); + if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2;

if (count($subscriptions) === 0) {

Check warning on line 405 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Identical": --- Original +++ New @@ @@ } $errors[] = $error; $subscriptions->remove($subscription); - if (count($subscriptions) === 0) { + if (count($subscriptions) !== 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2; }
$this->logger?->info(
'Subscription Engine: No subscriptions in booting status, finish booting.',
);

break 2;

Check warning on line 410 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + break 1; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index));

Check warning on line 410 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Break_": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + continue; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index));
}
}

$this->logger?->debug(sprintf(
'Subscription Engine: Current event stream position: %s',
Expand Down Expand Up @@ -427,6 +437,8 @@ function (array $subscriptions) use ($limit): ProcessedResult {

if ($error) {
$errors[] = $error;

$subscriptions->remove($subscription);

Check warning on line 441 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ $error = $this->ensureCommitBatch($subscription, $endIndex); if ($error) { $errors[] = $error; - $subscriptions->remove($subscription); + } $this->subscriptionManager->update($subscription); }
}

$this->subscriptionManager->update($subscription);
Expand All @@ -435,10 +447,6 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isActive()) {
continue;
}

if ($subscription->runMode() !== RunMode::Once) {
continue;
}
Expand Down Expand Up @@ -481,7 +489,7 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): Resu
groups: $criteria->groups,
status: [Status::Detached],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var list<Error> $errors */
$errors = [];

Expand Down Expand Up @@ -570,7 +578,7 @@ public function remove(SubscriptionEngineCriteria|null $criteria = null): Result
ids: $criteria->ids,
groups: $criteria->groups,
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var list<Error> $errors */
$errors = [];

Expand Down Expand Up @@ -662,7 +670,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Re
Status::Finished,
],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

Expand Down Expand Up @@ -725,7 +733,7 @@ public function pause(SubscriptionEngineCriteria|null $criteria = null): Result
Status::Error,
],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -868,7 +876,7 @@ private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria)
groups: $criteria->groups,
status: [Status::Active, Status::Paused, Status::Finished],
),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

Expand Down Expand Up @@ -898,7 +906,7 @@ private function retrySubscriptions(SubscriptionEngineCriteria $criteria): void
groups: $criteria->groups,
status: [Status::Error],
),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$error = $subscription->subscriptionError();
Expand Down Expand Up @@ -941,7 +949,7 @@ private function discoverNewSubscriptions(): void
{
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
$latestIndex = null;

foreach ($this->subscriberRepository->all() as $subscriber) {
Expand Down Expand Up @@ -986,26 +994,6 @@ private function latestIndex(): int
return $stream->index() ?: 0;
}

/** @param list<Subscription> $subscriptions */
private function lowestSubscriptionPosition(array $subscriptions): int
{
$min = null;

foreach ($subscriptions as $subscription) {
if ($min !== null && $subscription->position() >= $min) {
continue;
}

$min = $subscription->position();
}

if ($min === null) {
return 0;
}

return $min;
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
Expand Down
67 changes: 67 additions & 0 deletions src/Subscription/Engine/SubscriptionCollection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Countable;
use IteratorAggregate;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Traversable;

use function array_filter;
use function array_values;
use function count;

/**
* @interal
* @implements IteratorAggregate<Subscription>
*/
final class SubscriptionCollection implements IteratorAggregate, Countable
{
/** @param list<Subscription> $subscriptions */
public function __construct(
private array $subscriptions = [],
) {
}

/** @return Traversable<Subscription> */
public function getIterator(): Traversable
{
yield from $this->subscriptions;
}

public function remove(Subscription $subscription): void
{
$this->subscriptions = array_values(
array_filter(
$this->subscriptions,
static fn (Subscription $s) => $s !== $subscription,
),
);
}

public function count(): int
{
return count($this->subscriptions);
}

public function lowestPosition(): int
{
$min = null;

foreach ($this->subscriptions as $subscription) {
if ($min !== null && $subscription->position() >= $min) {
continue;
}

$min = $subscription->position();
}

if ($min === null) {
return 0;
}

return $min;
}
}
14 changes: 11 additions & 3 deletions src/Subscription/Engine/SubscriptionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public function __construct(
}

/**
* @param Closure(list<Subscription>):T $closure
* @param Closure(SubscriptionCollection):T $closure
*
* @return T
*
Expand All @@ -42,7 +42,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
{
if (!$this->subscriptionStore instanceof LockableSubscriptionStore) {
try {
return $closure($this->subscriptionStore->find($criteria));
return $closure(
new SubscriptionCollection(
$this->subscriptionStore->find($criteria),
),
);
} finally {
$this->flush();
}
Expand All @@ -52,7 +56,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
/** @return T */
function () use ($closure, $criteria): mixed {
try {
return $closure($this->subscriptionStore->find($criteria));
return $closure(
new SubscriptionCollection(
$this->subscriptionStore->find($criteria),
),
);
} finally {
$this->flush();
}
Expand Down
Loading

0 comments on commit 87e3ce3

Please sign in to comment.