From 1a995c2069b326cef9f5a63295add80314a289d9 Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 13 Dec 2024 11:02:29 +0100 Subject: [PATCH] split stream header --- src/Console/OutputStyle.php | 49 ++++-- .../AggregateToStreamHeaderTranslator.php | 12 +- .../RecalculatePlayheadTranslator.php | 44 ++---- .../Translator/UntilEventTranslator.php | 25 +-- .../Message/MessageHeaderRegistry.php | 8 +- src/Repository/DefaultRepository.php | 48 +++--- src/Store/Header/PlayheadHeader.php | 18 +++ src/Store/Header/RecordedOnHeader.php | 19 +++ src/Store/Header/StreamNameHeader.php | 17 ++ src/Store/InMemoryStore.php | 11 +- src/Store/StreamDoctrineDbalStore.php | 30 +++- src/Store/StreamDoctrineDbalStoreStream.php | 14 +- src/Store/StreamHeader.php | 22 --- .../AggregateIdArgumentResolver.php | 4 +- .../Store/StreamDoctrineDbalStoreTest.php | 147 +++++++----------- .../AggregateToStreamHeaderTranslatorTest.php | 13 +- .../RecalculatePlayheadTranslatorTest.php | 2 +- tests/Unit/Store/InMemoryStoreTest.php | 44 +++--- .../Store/StreamDoctrineDbalStoreTest.php | 113 ++++++-------- .../Store/StreamDoctrineDbalStreamTest.php | 28 +++- 20 files changed, 351 insertions(+), 317 deletions(-) create mode 100644 src/Store/Header/PlayheadHeader.php create mode 100644 src/Store/Header/RecordedOnHeader.php create mode 100644 src/Store/Header/StreamNameHeader.php delete mode 100644 src/Store/StreamHeader.php diff --git a/src/Console/OutputStyle.php b/src/Console/OutputStyle.php index 20b44dbee..93c2a9582 100644 --- a/src/Console/OutputStyle.php +++ b/src/Console/OutputStyle.php @@ -5,13 +5,14 @@ namespace Patchlevel\EventSourcing\Console; use Patchlevel\EventSourcing\Aggregate\AggregateHeader; -use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Serializer\Encoder\Encoder; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\ArchivedHeader; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\StreamStartHeader; use Symfony\Component\Console\Style\SymfonyStyle; use Throwable; @@ -49,13 +50,38 @@ public function message( $customHeaders = array_filter( $message->headers(), - static fn ($header) => !$header instanceof StreamHeader + static fn ($header) => !$header instanceof StreamNameHeader + && !$header instanceof PlayheadHeader + && !$header instanceof RecordedOnHeader && !$header instanceof AggregateHeader && !$header instanceof ArchivedHeader && !$header instanceof StreamStartHeader, ); - $metaHeader = $this->metaHeader($message); + $streamName = null; + $playhead = null; + $recordedOn = null; + + if ($message->hasHeader(AggregateHeader::class)) { + $header = $message->header(AggregateHeader::class); + + $streamName = $header->streamName(); + $playhead = $header->playhead; + $recordedOn = $header->recordedOn; + } + + if ($message->hasHeader(StreamNameHeader::class)) { + $streamName = $message->header(StreamNameHeader::class)->streamName; + } + + if ($message->hasHeader(PlayheadHeader::class)) { + $playhead = $message->header(PlayheadHeader::class)->playhead; + } + + if ($message->hasHeader(RecordedOnHeader::class)) { + $recordedOn = $message->header(RecordedOnHeader::class)->recordedOn; + } + $streamStart = $message->hasHeader(StreamStartHeader::class); $achieved = $message->hasHeader(ArchivedHeader::class); @@ -70,9 +96,9 @@ public function message( ], [ [ - $metaHeader instanceof AggregateHeader ? $metaHeader->streamName() : $metaHeader->streamName, - $metaHeader->playhead, - $metaHeader->recordedOn?->format('Y-m-d H:i:s'), + $streamName, + $playhead, + $recordedOn?->format('Y-m-d H:i:s'), $streamStart ? 'yes' : 'no', $achieved ? 'yes' : 'no', ], @@ -98,13 +124,4 @@ public function throwable(Throwable $error): void $error = $error->getPrevious(); } while ($error !== null); } - - private function metaHeader(Message $message): AggregateHeader|StreamHeader - { - try { - return $message->header(AggregateHeader::class); - } catch (HeaderNotFound) { - return $message->header(StreamHeader::class); - } - } } diff --git a/src/Message/Translator/AggregateToStreamHeaderTranslator.php b/src/Message/Translator/AggregateToStreamHeaderTranslator.php index c8dce10f6..80bef3427 100644 --- a/src/Message/Translator/AggregateToStreamHeaderTranslator.php +++ b/src/Message/Translator/AggregateToStreamHeaderTranslator.php @@ -6,7 +6,9 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; /** @experimental */ final class AggregateToStreamHeaderTranslator implements Translator @@ -23,11 +25,9 @@ public function __invoke(Message $message): array return [ $message ->removeHeader(AggregateHeader::class) - ->withHeader(new StreamHeader( - $aggregateHeader->streamName(), - $aggregateHeader->playhead, - $aggregateHeader->recordedOn, - )), + ->withHeader(new StreamNameHeader($aggregateHeader->streamName())) + ->withHeader(new PlayheadHeader($aggregateHeader->playhead)) + ->withHeader(new RecordedOnHeader($aggregateHeader->recordedOn)), ]; } } diff --git a/src/Message/Translator/RecalculatePlayheadTranslator.php b/src/Message/Translator/RecalculatePlayheadTranslator.php index 27185baaa..28439d288 100644 --- a/src/Message/Translator/RecalculatePlayheadTranslator.php +++ b/src/Message/Translator/RecalculatePlayheadTranslator.php @@ -5,9 +5,9 @@ namespace Patchlevel\EventSourcing\Message\Translator; use Patchlevel\EventSourcing\Aggregate\AggregateHeader; -use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use function array_key_exists; @@ -19,42 +19,30 @@ final class RecalculatePlayheadTranslator implements Translator /** @return list */ public function __invoke(Message $message): array { - try { - $header = $message->header(AggregateHeader::class); - } catch (HeaderNotFound) { - try { - $header = $message->header(StreamHeader::class); - } catch (HeaderNotFound) { - return [$message]; - } - } + if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) { + $streamName = $message->header(StreamNameHeader::class)->streamName; - $stream = $header instanceof StreamHeader ? $header->streamName : $header->streamName(); + $playhead = $this->nextPlayhead($streamName); - $playhead = $this->nextPlayhead($stream); - - if ($header->playhead === $playhead) { - return [$message]; + return [ + $message->withHeader(new PlayheadHeader($playhead)), + ]; } - if ($header instanceof StreamHeader) { + if ($message->hasHeader(AggregateHeader::class)) { + $header = $message->header(AggregateHeader::class); + return [ - $message->withHeader(new StreamHeader( - $header->streamName, - $playhead, + $message->withHeader(new AggregateHeader( + $header->aggregateName, + $header->aggregateId, + $this->nextPlayhead($header->streamName()), $header->recordedOn, )), ]; } - return [ - $message->withHeader(new AggregateHeader( - $header->aggregateName, - $header->aggregateId, - $playhead, - $header->recordedOn, - )), - ]; + return [$message]; } public function reset(): void diff --git a/src/Message/Translator/UntilEventTranslator.php b/src/Message/Translator/UntilEventTranslator.php index 3cdf739ad..5b7fae9e1 100644 --- a/src/Message/Translator/UntilEventTranslator.php +++ b/src/Message/Translator/UntilEventTranslator.php @@ -6,9 +6,8 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Aggregate\AggregateHeader; -use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; final class UntilEventTranslator implements Translator { @@ -20,22 +19,26 @@ public function __construct( /** @return list */ public function __invoke(Message $message): array { - try { + if ($message->hasHeader(AggregateHeader::class)) { $header = $message->header(AggregateHeader::class); - } catch (HeaderNotFound) { - try { - $header = $message->header(StreamHeader::class); - } catch (HeaderNotFound) { + + if ($header->recordedOn < $this->until) { return [$message]; } + + return []; } - $recordedOn = $header->recordedOn; + if ($message->hasHeader(RecordedOnHeader::class)) { + $header = $message->header(RecordedOnHeader::class); + + if ($header->recordedOn < $this->until) { + return [$message]; + } - if ($recordedOn < $this->until) { - return [$message]; + return []; } - return []; + return [$message]; } } diff --git a/src/Metadata/Message/MessageHeaderRegistry.php b/src/Metadata/Message/MessageHeaderRegistry.php index 55ed11306..4045fc984 100644 --- a/src/Metadata/Message/MessageHeaderRegistry.php +++ b/src/Metadata/Message/MessageHeaderRegistry.php @@ -6,7 +6,9 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Store\ArchivedHeader; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\StreamStartHeader; use function array_flip; @@ -73,7 +75,9 @@ public function headerNames(): array public static function createWithInternalHeaders(array $headerNameToClassMap = []): self { $internalHeaders = [ - 'stream' => StreamHeader::class, + 'streamName' => StreamNameHeader::class, + 'playhead' => PlayheadHeader::class, + 'recordedOn' => RecordedOnHeader::class, 'aggregate' => AggregateHeader::class, 'archived' => ArchivedHeader::class, 'newStreamStart' => StreamStartHeader::class, diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 22b77ac5d..c5e1808e5 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -17,9 +17,11 @@ use Patchlevel\EventSourcing\Snapshot\SnapshotStore; use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Store\Stream; -use Patchlevel\EventSourcing\Store\StreamHeader; use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Psr\Clock\ClockInterface; @@ -142,11 +144,7 @@ public function load(AggregateRootId $id): AggregateRoot } if ($this->useStreamHeader) { - $playhead = $firstMessage->header(StreamHeader::class)->playhead; - - if ($playhead === null) { - throw new AggregateNotFound($this->metadata->className, $id); - } + $playhead = $firstMessage->header(PlayheadHeader::class)->playhead; } else { $playhead = $firstMessage->header(AggregateHeader::class)->playhead; } @@ -245,27 +243,35 @@ public function save(AggregateRoot $aggregate): void $aggregateName = $this->metadata->name; - $useStreamHeader = $this->useStreamHeader; + $streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($aggregateName, $aggregateId) : null; $messages = array_map( - static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) { - if ($useStreamHeader) { - $header = new StreamHeader( - StreamNameTranslator::streamName($aggregateName, $aggregateId), - ++$playhead, - $clock->now(), - ); + static function (object $event) use ( + $aggregateName, + $aggregateId, + &$playhead, + $messageDecorator, + $clock, + $streamName, + ) { + $message = Message::create($event); + + if ($streamName !== null) { + $message = $message + ->withHeader(new StreamNameHeader($streamName)) + ->withHeader(new PlayheadHeader(++$playhead)) + ->withHeader(new RecordedOnHeader($clock->now())); } else { - $header = new AggregateHeader( - $aggregateName, - $aggregateId, - ++$playhead, - $clock->now(), + $message = $message->withHeader( + new AggregateHeader( + $aggregateName, + $aggregateId, + ++$playhead, + $clock->now(), + ), ); } - $message = Message::create($event)->withHeader($header); - if ($messageDecorator) { return $messageDecorator($message); } diff --git a/src/Store/Header/PlayheadHeader.php b/src/Store/Header/PlayheadHeader.php new file mode 100644 index 000000000..878c8f689 --- /dev/null +++ b/src/Store/Header/PlayheadHeader.php @@ -0,0 +1,18 @@ +header(AggregateHeader::class)->streamName(); } catch (HeaderNotFound) { try { - return $message->header(StreamHeader::class)->streamName; + return $message->header(StreamNameHeader::class)->streamName; } catch (HeaderNotFound) { return null; } @@ -117,7 +118,7 @@ static function (Message $message) use ($streamName): bool { return $message->header(AggregateHeader::class)->streamName() !== $streamName; } catch (HeaderNotFound) { try { - return $message->header(StreamHeader::class)->streamName !== $streamName; + return $message->header(StreamNameHeader::class)->streamName !== $streamName; } catch (HeaderNotFound) { return true; } @@ -168,7 +169,7 @@ static function (Message $message, int $index) use ($criteria): bool { $messageStreamName = $message->header(AggregateHeader::class)->streamName(); } catch (HeaderNotFound) { try { - $messageStreamName = $message->header(StreamHeader::class)->streamName; + $messageStreamName = $message->header(StreamNameHeader::class)->streamName; } catch (HeaderNotFound) { return false; } @@ -194,7 +195,7 @@ static function (Message $message, int $index) use ($criteria): bool { $playhead = $message->header(AggregateHeader::class)->playhead; } catch (HeaderNotFound) { try { - $playhead = $message->header(StreamHeader::class)->playhead; + $playhead = $message->header(PlayheadHeader::class)->playhead; } catch (HeaderNotFound) { return false; } diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index 04b0ff704..57ec0bbac 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -29,6 +29,9 @@ use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use PDO; use Psr\Clock\ClockInterface; @@ -223,23 +226,34 @@ function () use ($messages): void { $data = $this->eventSerializer->serialize($message->event()); try { - $streamHeader = $message->header(StreamHeader::class); + $streamName = $message->header(StreamNameHeader::class)->streamName; + $parameters[] = $streamName; } catch (HeaderNotFound $e) { throw new MissingDataForStorage($e->name, $e); } - $parameters[] = $streamHeader->streamName; - $parameters[] = $streamHeader->playhead; + $playhead = null; + + if ($message->hasHeader(PlayheadHeader::class)) { + $playhead = $message->header(PlayheadHeader::class)->playhead; + } + + $parameters[] = $playhead; $parameters[] = $data->name; $parameters[] = $data->payload; - $parameters[] = $streamHeader->recordedOn ?: $this->clock->now(); + if ($message->hasHeader(RecordedOnHeader::class)) { + $parameters[] = $message->header(RecordedOnHeader::class)->recordedOn; + } else { + $parameters[] = $this->clock->now(); + } + $types[$offset + 4] = $dateTimeType; $streamStart = $message->hasHeader(StreamStartHeader::class); - if ($streamStart) { - $achievedUntilPlayhead[$streamHeader->streamName] = $streamHeader->playhead; + if ($streamStart && $playhead) { + $achievedUntilPlayhead[$streamName] = $playhead; } $parameters[] = $streamStart; @@ -377,7 +391,9 @@ public function configureSchema(Schema $schema, Connection $connection): void private function getCustomHeaders(Message $message): array { $filteredHeaders = [ - StreamHeader::class, + StreamNameHeader::class, + PlayheadHeader::class, + RecordedOnHeader::class, StreamStartHeader::class, ArchivedHeader::class, ]; diff --git a/src/Store/StreamDoctrineDbalStoreStream.php b/src/Store/StreamDoctrineDbalStoreStream.php index caf8514ad..d7d843dc0 100644 --- a/src/Store/StreamDoctrineDbalStoreStream.php +++ b/src/Store/StreamDoctrineDbalStoreStream.php @@ -15,6 +15,9 @@ use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Traversable; /** @@ -129,11 +132,12 @@ private function buildGenerator( $event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload'])); $message = Message::create($event) - ->withHeader(new StreamHeader( - $data['stream'], - $data['playhead'] === null ? null : (int)$data['playhead'], - $dateTimeType->convertToPHPValue($data['recorded_on'], $platform), - )); + ->withHeader(new StreamNameHeader($data['stream'])) + ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))); + + if ($data['playhead'] !== null) { + $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); + } if ($data['archived']) { $message = $message->withHeader(new ArchivedHeader()); diff --git a/src/Store/StreamHeader.php b/src/Store/StreamHeader.php deleted file mode 100644 index 24c7ece07..000000000 --- a/src/Store/StreamHeader.php +++ /dev/null @@ -1,22 +0,0 @@ -header(StreamHeader::class)->streamName; + $stream = $message->header(StreamNameHeader::class)->streamName; $aggregateId = StreamNameTranslator::aggregateId($stream); return $class::fromString($aggregateId); diff --git a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php index 9812dbb1d..9ed8f5781 100644 --- a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php @@ -12,15 +12,16 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; -use Patchlevel\EventSourcing\Store\StreamHeader; use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Tests\DbalManager; use Patchlevel\EventSourcing\Tests\Integration\Store\Events\ExternEvent; use Patchlevel\EventSourcing\Tests\Integration\Store\Events\ProfileCreated; use PHPUnit\Framework\TestCase; - use function iterator_to_array; use function json_decode; use function sprintf; @@ -62,17 +63,13 @@ public function testSave(): void $messages = [ Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 2, - new DateTimeImmutable('2020-01-02 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), ]; $this->store->save(...$messages); @@ -105,13 +102,13 @@ public function testSave(): void ); } - public function testSaveWithNullableValues(): void + public function testSaveWithOnlyStreamName(): void { $messages = [ Message::create(new ExternEvent('test 1')) - ->withHeader(new StreamHeader('extern')), + ->withHeader(new StreamNameHeader('extern')), Message::create(new ExternEvent('test 2')) - ->withHeader(new StreamHeader('extern')), + ->withHeader(new StreamNameHeader('extern')), ]; $this->store->save(...$messages); @@ -150,17 +147,13 @@ public function testSaveWithTransactional(): void $messages = [ Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 2, - new DateTimeImmutable('2020-01-02 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), ]; $this->store->transactional(function () use ($messages): void { @@ -203,17 +196,13 @@ public function testUniqueConstraint(): void $messages = [ Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-02 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), ]; $this->store->save(...$messages); @@ -227,11 +216,9 @@ public function testSave10000Messages(): void for ($i = 1; $i <= 10000; $i++) { $messages[] = Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - $i, - new DateTimeImmutable('2020-01-01 00:00:00'), - )); + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader($i)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))); } $this->store->save(...$messages); @@ -247,11 +234,9 @@ public function testLoad(): void $profileId = ProfileId::generate(); $message = Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )); + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))); $this->store->save($message); @@ -269,16 +254,16 @@ public function testLoad(): void self::assertNotSame($message, $loadedMessage); self::assertEquals($message->event(), $loadedMessage->event()); self::assertEquals( - $message->header(StreamHeader::class)->streamName, - $loadedMessage->header(StreamHeader::class)->streamName, + $message->header(StreamNameHeader::class)->streamName, + $loadedMessage->header(StreamNameHeader::class)->streamName, ); self::assertEquals( - $message->header(StreamHeader::class)->playhead, - $loadedMessage->header(StreamHeader::class)->playhead, + $message->header(PlayheadHeader::class)->playhead, + $loadedMessage->header(PlayheadHeader::class)->playhead, ); self::assertEquals( - $message->header(StreamHeader::class)->recordedOn, - $loadedMessage->header(StreamHeader::class)->recordedOn, + $message->header(RecordedOnHeader::class)->recordedOn, + $loadedMessage->header(RecordedOnHeader::class)->recordedOn, ); } finally { $stream?->close(); @@ -292,21 +277,15 @@ public function testLoadWithWildcard(): void $messages = [ Message::create(new ProfileCreated($profileId1, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId1->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId1->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId2, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId2->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId2->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ExternEvent('test message')) - ->withHeader(new StreamHeader( - 'foo', - )), + ->withHeader(new StreamNameHeader('foo')), ]; $this->store->save(...$messages); @@ -330,19 +309,15 @@ public function testStreams(): void $messages = [ Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 2, - new DateTimeImmutable('2020-01-02 00:00:00'), - )), - Message::create(new ExternEvent('foo bar')) - ->withHeader(new StreamHeader('foo')), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ExternEvent('test message')) + ->withHeader(new StreamNameHeader('foo')), ]; $this->store->save(...$messages); @@ -361,19 +336,15 @@ public function testRemote(): void $messages = [ Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 1, - new DateTimeImmutable('2020-01-01 00:00:00'), - )), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), Message::create(new ProfileCreated($profileId, 'test')) - ->withHeader(new StreamHeader( - sprintf('profile-%s', $profileId->toString()), - 2, - new DateTimeImmutable('2020-01-02 00:00:00'), - )), - Message::create(new ExternEvent('foo bar')) - ->withHeader(new StreamHeader('foo')), + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ExternEvent('test message')) + ->withHeader(new StreamNameHeader('foo')), ]; $this->store->save(...$messages); diff --git a/tests/Unit/Message/Translator/AggregateToStreamHeaderTranslatorTest.php b/tests/Unit/Message/Translator/AggregateToStreamHeaderTranslatorTest.php index 009e096e6..52727eacb 100644 --- a/tests/Unit/Message/Translator/AggregateToStreamHeaderTranslatorTest.php +++ b/tests/Unit/Message/Translator/AggregateToStreamHeaderTranslatorTest.php @@ -8,7 +8,9 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Translator\AggregateToStreamHeaderTranslator; -use Patchlevel\EventSourcing\Store\StreamHeader; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; @@ -61,12 +63,9 @@ public function testMigrateHeader(): void $message = $result[0]; self::assertFalse($message->hasHeader(AggregateHeader::class)); - self::assertTrue($message->hasHeader(StreamHeader::class)); - $streamHeader = $message->header(StreamHeader::class); - - self::assertEquals($aggregateHeader->recordedOn, $streamHeader->recordedOn); - self::assertEquals('profile-1', $streamHeader->streamName); - self::assertEquals(1, $streamHeader->playhead); + self::assertEquals($aggregateHeader->recordedOn, $message->header(RecordedOnHeader::class)->recordedOn); + self::assertEquals('profile-1', $message->header(StreamNameHeader::class)->streamName); + self::assertEquals(1, $message->header(PlayheadHeader::class)->playhead); } } diff --git a/tests/Unit/Message/Translator/RecalculatePlayheadTranslatorTest.php b/tests/Unit/Message/Translator/RecalculatePlayheadTranslatorTest.php index a4ea92590..90b40d2fd 100644 --- a/tests/Unit/Message/Translator/RecalculatePlayheadTranslatorTest.php +++ b/tests/Unit/Message/Translator/RecalculatePlayheadTranslatorTest.php @@ -49,7 +49,7 @@ public function testRecalculatePlayheadWithSamePlayhead(): void $result = $translator($message); - self::assertSame([$message], $result); + self::assertEquals([$message], $result); } public function testRecalculateMultipleMessages(): void diff --git a/tests/Unit/Store/InMemoryStoreTest.php b/tests/Unit/Store/InMemoryStoreTest.php index 8d4e059e8..03e03bd7a 100644 --- a/tests/Unit/Store/InMemoryStoreTest.php +++ b/tests/Unit/Store/InMemoryStoreTest.php @@ -15,15 +15,15 @@ use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\InMemoryStore; -use Patchlevel\EventSourcing\Store\StreamHeader; use Patchlevel\EventSourcing\Store\UnsupportedCriterion; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; use stdClass; - use function iterator_to_array; /** @covers \Patchlevel\EventSourcing\Store\InMemoryStore */ @@ -92,9 +92,9 @@ public function testLoadByAggregateName(): void public function testLoadByStreamName(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo')); + ->withHeader(new StreamNameHeader('foo')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); $store = new InMemoryStore([$message1, $message2, $message3]); @@ -109,11 +109,11 @@ public function testLoadByStreamName(): void public function testLoadByStreamNameWithLike(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo-3')); + ->withHeader(new StreamNameHeader('foo-3')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar-1')); + ->withHeader(new StreamNameHeader('bar-1')); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('bar-2')); + ->withHeader(new StreamNameHeader('bar-2')); $store = new InMemoryStore([$message1, $message2, $message3]); @@ -131,7 +131,8 @@ public function testLoadFromPlayhead(): void $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + ->withHeader(new StreamNameHeader('foo-1')) + ->withHeader(new PlayheadHeader(3)); $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); $store = new InMemoryStore([$message1, $message2, $message3, $message4]); @@ -150,7 +151,8 @@ public function testLoadFromIndex(): void $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + ->withHeader(new StreamNameHeader('foo-1')) + ->withHeader(new PlayheadHeader(3)); $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); $store = new InMemoryStore([$message1, $message2, $message3, $message4]); @@ -165,11 +167,11 @@ public function testLoadFromIndex(): void public function testLoadByStreamNameWithLikeAll(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo-3')); + ->withHeader(new StreamNameHeader('foo-3')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar-1')); + ->withHeader(new StreamNameHeader('bar-1')); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('bar-2')); + ->withHeader(new StreamNameHeader('bar-2')); $store = new InMemoryStore([$message1, $message2, $message3]); @@ -301,11 +303,11 @@ public function testSaveWithExistingMessages(): void public function testStreams(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo')); + ->withHeader(new StreamNameHeader('foo')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); $store = new InMemoryStore([$message1, $message2, $message3, $message4]); @@ -316,11 +318,11 @@ public function testStreams(): void public function testRemove(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo')); + ->withHeader(new StreamNameHeader('foo')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); $store = new InMemoryStore([$message1, $message2, $message3, $message4]); @@ -351,11 +353,11 @@ static function () use (&$called): void { public function testClear(): void { $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) - ->withHeader(new StreamHeader('foo')); + ->withHeader(new StreamNameHeader('foo')); $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) - ->withHeader(new StreamHeader('bar')); + ->withHeader(new StreamNameHeader('bar')); $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); $store = new InMemoryStore([$message1, $message2, $message3, $message4]); diff --git a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php index e2347005c..dc4c51951 100644 --- a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php @@ -25,10 +25,12 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\InvalidStreamName; use Patchlevel\EventSourcing\Store\MissingDataForStorage; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; -use Patchlevel\EventSourcing\Store\StreamHeader; use Patchlevel\EventSourcing\Store\StreamStartHeader; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Store\WrongQueryResult; @@ -45,7 +47,6 @@ use Prophecy\PhpUnit\ProphecyTrait; use Psr\Clock\ClockInterface; use RuntimeException; - use function iterator_to_array; use function method_exists; @@ -458,11 +459,11 @@ public function testLoadWithOneEvent(): void self::assertInstanceOf(Message::class, $message); self::assertInstanceOf(ProfileCreated::class, $message->event()); - self::assertSame('profile-1', $message->header(StreamHeader::class)->streamName); - self::assertSame(1, $message->header(StreamHeader::class)->playhead); + self::assertSame('profile-1', $message->header(StreamNameHeader::class)->streamName); + self::assertSame(1, $message->header(PlayheadHeader::class)->playhead); self::assertEquals( new DateTimeImmutable('2021-02-17 10:00:00'), - $message->header(StreamHeader::class)->recordedOn, + $message->header(RecordedOnHeader::class)->recordedOn, ); iterator_to_array($stream); @@ -560,11 +561,11 @@ public function testLoadWithTwoEvents(): void self::assertInstanceOf(Message::class, $message); self::assertInstanceOf(ProfileCreated::class, $message->event()); - self::assertSame('profile-1', $message->header(StreamHeader::class)->streamName); - self::assertSame(1, $message->header(StreamHeader::class)->playhead); + self::assertSame('profile-1', $message->header(StreamNameHeader::class)->streamName); + self::assertSame(1, $message->header(PlayheadHeader::class)->playhead); self::assertEquals( new DateTimeImmutable('2021-02-17 10:00:00'), - $message->header(StreamHeader::class)->recordedOn, + $message->header(RecordedOnHeader::class)->recordedOn, ); $stream->next(); @@ -575,11 +576,11 @@ public function testLoadWithTwoEvents(): void self::assertInstanceOf(Message::class, $message); self::assertInstanceOf(ProfileEmailChanged::class, $message->event()); - self::assertSame('profile-1', $message->header(StreamHeader::class)->streamName); - self::assertSame(2, $message->header(StreamHeader::class)->playhead); + self::assertSame('profile-1', $message->header(StreamNameHeader::class)->streamName); + self::assertSame(2, $message->header(PlayheadHeader::class)->playhead); self::assertEquals( new DateTimeImmutable('2021-02-17 11:00:00'), - $message->header(StreamHeader::class)->recordedOn, + $message->header(RecordedOnHeader::class)->recordedOn, ); } @@ -827,11 +828,9 @@ public function testSaveWithOneEvent(): void { $recordedOn = new DateTimeImmutable(); $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 1, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader($recordedOn)); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( @@ -867,7 +866,7 @@ public function testSaveWithOneEvent(): void $singleTableStore->save($message); } - public function testSaveWithoutStreamHeader(): void + public function testSaveWithoutStreamNameHeader(): void { $recordedOn = new DateTimeImmutable(); $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))); @@ -912,17 +911,13 @@ public function testSaveWithTwoEvents(): void { $recordedOn = new DateTimeImmutable(); $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 1, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader($recordedOn)); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) - ->withHeader(new StreamHeader( - 'profile-1', - 2, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader($recordedOn)); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( @@ -986,17 +981,13 @@ public function testSaveWithUniqueConstraintViolation(): void { $recordedOn = new DateTimeImmutable(); $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 1, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader($recordedOn)); $message2 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 1, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader($recordedOn)); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->serialize($message1->event())->shouldBeCalledTimes(2)->willReturn(new SerializedEvent( @@ -1061,11 +1052,9 @@ public function testSaveWithThousandEvents(): void $messages = []; for ($i = 1; $i <= 10000; $i++) { $messages[] = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - $i, - $recordedOn, - )); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader($i)) + ->withHeader(new RecordedOnHeader($recordedOn)); } $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1104,11 +1093,9 @@ public function testSaveWithCustomHeaders(): void $recordedOn = new DateTimeImmutable(); $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 1, - $recordedOn, - )) + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader($recordedOn)) ->withHeaders($customHeaders); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1435,19 +1422,15 @@ public function testArchiveMessagesDifferentAggregates(): void { $recordedOn = new DateTimeImmutable(); $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 5, - $recordedOn, - )) + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(5)) + ->withHeader(new RecordedOnHeader($recordedOn)) ->withHeader(new StreamStartHeader()); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('2'), Email::fromString('d'))) - ->withHeader(new StreamHeader( - 'profile-2', - 42, - $recordedOn, - )) + ->withHeader(new StreamNameHeader('profile-2')) + ->withHeader(new PlayheadHeader(42)) + ->withHeader(new RecordedOnHeader($recordedOn)) ->withHeader(new StreamStartHeader()); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1542,19 +1525,15 @@ public function testArchiveMessagesSameAggregate(): void { $recordedOn = new DateTimeImmutable(); $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new StreamHeader( - 'profile-1', - 5, - $recordedOn, - )) + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(5)) + ->withHeader(new RecordedOnHeader($recordedOn)) ->withHeader(new StreamStartHeader()); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) - ->withHeader(new StreamHeader( - 'profile-1', - 42, - $recordedOn, - )) + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(42)) + ->withHeader(new RecordedOnHeader($recordedOn)) ->withHeader(new StreamStartHeader()); $eventSerializer = $this->prophesize(EventSerializer::class); diff --git a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php b/tests/Unit/Store/StreamDoctrineDbalStreamTest.php index aea21cede..95bdb9f07 100644 --- a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php +++ b/tests/Unit/Store/StreamDoctrineDbalStreamTest.php @@ -12,16 +12,17 @@ use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\StreamClosed; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStoreStream; -use Patchlevel\EventSourcing\Store\StreamHeader; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; use Throwable; - use function iterator_to_array; /** @covers \Patchlevel\EventSourcing\Store\StreamDoctrineDbalStoreStream */ @@ -75,7 +76,9 @@ public function testOneMessage(): void Email::fromString('info@patchlevel.de'), ); $message = Message::create($event) - ->withHeader(new StreamHeader('profile-1', 1, new DateTimeImmutable('2022-10-10 10:10:10'))); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->deserialize(new SerializedEvent('profile_created', '{}')) @@ -156,11 +159,16 @@ public function testMultipleMessages(): void $messages = [ Message::create($event) - ->withHeader(new StreamHeader('profile-1', 1, new DateTimeImmutable('2022-10-10 10:10:10'))), + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), Message::create($event) - ->withHeader(new StreamHeader('profile-2', null, new DateTimeImmutable('2022-10-10 10:10:10'))), + ->withHeader(new StreamNameHeader('profile-2')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), Message::create($event) - ->withHeader(new StreamHeader('profile-3', 1, new DateTimeImmutable('2022-10-10 10:10:10'))), + ->withHeader(new StreamNameHeader('profile-3')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), ]; $eventSerializer = $this->prophesize(EventSerializer::class); @@ -238,7 +246,9 @@ public function testWithNoList(): void Email::fromString('info@patchlevel.de'), ); $message = Message::create($event) - ->withHeader(new StreamHeader('profile-1', 1, new DateTimeImmutable('2022-10-10 10:10:10'))); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->deserialize(new SerializedEvent('profile_created', '{}')) @@ -295,7 +305,9 @@ public function testClose(): void Email::fromString('info@patchlevel.de'), ); $message = Message::create($event) - ->withHeader(new StreamHeader('profile-1', 1, new DateTimeImmutable('2022-10-10 10:10:10'))); + ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->deserialize(new SerializedEvent('profile_created', '{}'))