From b268fcf1a0200976980672b14f375d219b7cd4ae Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 10 Dec 2024 16:19:49 +0100 Subject: [PATCH] dont lock by discoverNewSubscriptions --- .../Engine/DefaultSubscriptionEngine.php | 63 +++++++++---------- .../Engine/DefaultSubscriptionEngineTest.php | 36 ++++++++++- 2 files changed, 66 insertions(+), 33 deletions(-) diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 8e352dee..80183637 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -939,44 +939,43 @@ function (array $subscriptions): void { private function discoverNewSubscriptions(): void { - $this->subscriptionManager->findForUpdate( - new SubscriptionCriteria(), - function (array $subscriptions): void { - $latestIndex = null; + $subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria()); - foreach ($this->subscriberRepository->all() as $subscriber) { - foreach ($subscriptions as $subscription) { - if ($subscription->id() === $subscriber->id()) { - continue 2; - } - } + $latestIndex = null; - $subscription = new Subscription( - $subscriber->id(), - $subscriber->group(), - $subscriber->runMode(), - ); + foreach ($this->subscriberRepository->all() as $subscriber) { + foreach ($subscriptions as $subscription) { + if ($subscription->id() === $subscriber->id()) { + continue 2; + } + } - if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { - if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); - } + $subscription = new Subscription( + $subscriber->id(), + $subscriber->group(), + $subscriber->runMode(), + ); - $subscription->changePosition($latestIndex); - $subscription->active(); - } + if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { + if ($latestIndex === null) { + $latestIndex = $this->latestIndex(); + } + + $subscription->changePosition($latestIndex); + $subscription->active(); + } - $this->subscriptionManager->add($subscription); + $this->subscriptionManager->add($subscription); - $this->logger?->info( - sprintf( - 'Subscription Engine: New Subscriber "%s" was found and added to the subscription store.', - $subscriber->id(), - ), - ); - } - }, - ); + $this->logger?->info( + sprintf( + 'Subscription Engine: New Subscriber "%s" was found and added to the subscription store.', + $subscriber->id(), + ), + ); + } + + $this->subscriptionManager->flush(); } private function latestIndex(): int diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index bef49ed4..bba39766 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -3164,6 +3164,41 @@ class { $engine->{$method}(); } + public function testDontLockGetSubscriptions(): void + { + $subscriber = new #[Subscriber('id1', RunMode::FromNow)] + class { + }; + + $subscriptionStore = $this->prophesize(LockableSubscriptionStore::class); + $subscriptionStore->inLock(Argument::type(Closure::class))->will( + /** @param array{Closure} $args */ + static fn (array $args): mixed => $args[0](), + )->shouldNotBeCalled(); + $subscriptionStore->find(Argument::any())->willReturn([])->shouldBeCalled(); + + $subscriptionStore->find( + new SubscriptionCriteria(), + )->willReturn([ + new Subscription('id1'), + ])->shouldBeCalled(); + + $subscriptionStore->remove(Argument::type(Subscription::class)); + $subscriptionStore->add(Argument::type(Subscription::class)); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([])); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore->reveal(), + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $engine->subscriptions(); + } + public function testFromNowWithoutSetupDirectActive(): void { $subscriptionId = 'test'; @@ -3208,7 +3243,6 @@ public static function methodProvider(): Generator yield 'teardown' => ['teardown']; yield 'remove' => ['remove']; yield 'reactivate' => ['reactivate']; - yield 'subscriptions' => ['subscriptions']; } private function criteria(int $fromIndex = 0): Criteria