From 6fc81bd8fb58a11870b318451a13eaf7039b7e5f Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 13 Dec 2024 15:31:00 +0100 Subject: [PATCH] introduce event id in stream store & fix split stream logic --- .../Message/MessageHeaderRegistry.php | 2 + src/Store/Header/EventIdHeader.php | 17 +++ src/Store/StreamDoctrineDbalStore.php | 52 ++++--- src/Store/StreamDoctrineDbalStoreStream.php | 8 +- .../Store/StreamDoctrineDbalStoreTest.php | 104 ++++++++++++-- .../Store/StreamDoctrineDbalStoreTest.php | 133 ++++++++++-------- .../Store/StreamDoctrineDbalStreamTest.php | 42 ++++-- 7 files changed, 250 insertions(+), 108 deletions(-) create mode 100644 src/Store/Header/EventIdHeader.php diff --git a/src/Metadata/Message/MessageHeaderRegistry.php b/src/Metadata/Message/MessageHeaderRegistry.php index 4045fc984..605303ed4 100644 --- a/src/Metadata/Message/MessageHeaderRegistry.php +++ b/src/Metadata/Message/MessageHeaderRegistry.php @@ -6,6 +6,7 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Store\ArchivedHeader; +use Patchlevel\EventSourcing\Store\Header\EventIdHeader; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; @@ -81,6 +82,7 @@ public static function createWithInternalHeaders(array $headerNameToClassMap = [ 'aggregate' => AggregateHeader::class, 'archived' => ArchivedHeader::class, 'newStreamStart' => StreamStartHeader::class, + 'eventId' => EventIdHeader::class, ]; return new self($headerNameToClassMap + $internalHeaders); diff --git a/src/Store/Header/EventIdHeader.php b/src/Store/Header/EventIdHeader.php new file mode 100644 index 000000000..434cf99db --- /dev/null +++ b/src/Store/Header/EventIdHeader.php @@ -0,0 +1,17 @@ +transactional( function () use ($messages): void { - /** @var array $achievedUntilPlayhead */ - $achievedUntilPlayhead = []; + /** @var array $achievedUntilEventId */ + $achievedUntilEventId = []; $booleanType = Type::getType(Types::BOOLEAN); $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); @@ -201,8 +203,9 @@ function () use ($messages): void { $columns = [ 'stream', 'playhead', - 'event', - 'payload', + 'event_id', + 'event_name', + 'event_payload', 'recorded_on', 'new_stream_start', 'archived', @@ -232,13 +235,19 @@ function () use ($messages): void { throw new MissingDataForStorage($e->name, $e); } - $playhead = null; - if ($message->hasHeader(PlayheadHeader::class)) { - $playhead = $message->header(PlayheadHeader::class)->playhead; + $parameters[] = $message->header(PlayheadHeader::class)->playhead; + } else { + $parameters[] = null; + } + + if ($message->hasHeader(EventIdHeader::class)) { + $eventId = $message->header(EventIdHeader::class)->eventId; + } else { + $eventId = Uuid::uuid7()->toString(); } - $parameters[] = $playhead; + $parameters[] = $eventId; $parameters[] = $data->name; $parameters[] = $data->payload; @@ -248,19 +257,19 @@ function () use ($messages): void { $parameters[] = $this->clock->now(); } - $types[$offset + 4] = $dateTimeType; + $types[$offset + 5] = $dateTimeType; $streamStart = $message->hasHeader(StreamStartHeader::class); - if ($streamStart && $playhead) { - $achievedUntilPlayhead[$streamName] = $playhead; + if ($streamStart) { + $achievedUntilEventId[$streamName] = $eventId; } $parameters[] = $streamStart; - $types[$offset + 5] = $booleanType; + $types[$offset + 6] = $booleanType; $parameters[] = $message->hasHeader(ArchivedHeader::class); - $types[$offset + 6] = $booleanType; + $types[$offset + 7] = $booleanType; $parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message)); @@ -283,21 +292,21 @@ function () use ($messages): void { $this->executeSave($columns, $placeholders, $parameters, $types, $this->connection); } - foreach ($achievedUntilPlayhead as $stream => $playhead) { + foreach ($achievedUntilEventId as $stream => $eventId) { $this->connection->executeStatement( sprintf( <<<'SQL' - UPDATE %s + UPDATE %1$s SET archived = true WHERE stream = :stream - AND playhead < :playhead + AND id < (SELECT id FROM %1$s WHERE event_id = :event_id) AND archived = false SQL, $this->config['table_name'], ), [ 'stream' => $stream, - 'playhead' => $playhead, + 'event_id' => $eventId, ], ); } @@ -366,10 +375,13 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setNotnull(true); $table->addColumn('playhead', Types::INTEGER) ->setNotnull(false); - $table->addColumn('event', Types::STRING) + $table->addColumn('event_id', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('event_name', Types::STRING) ->setLength(255) ->setNotnull(true); - $table->addColumn('payload', Types::JSON) + $table->addColumn('event_payload', Types::JSON) ->setNotnull(true); $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE) ->setNotnull(true); @@ -383,6 +395,7 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setNotnull(true); $table->setPrimaryKey(['id']); + $table->addUniqueIndex(['event_id']); $table->addUniqueIndex(['stream', 'playhead']); $table->addIndex(['stream', 'playhead', 'archived']); } @@ -392,6 +405,7 @@ private function getCustomHeaders(Message $message): array { $filteredHeaders = [ StreamNameHeader::class, + EventIdHeader::class, PlayheadHeader::class, RecordedOnHeader::class, StreamStartHeader::class, diff --git a/src/Store/StreamDoctrineDbalStoreStream.php b/src/Store/StreamDoctrineDbalStoreStream.php index d7d843dc0..06fbb9642 100644 --- a/src/Store/StreamDoctrineDbalStoreStream.php +++ b/src/Store/StreamDoctrineDbalStoreStream.php @@ -15,6 +15,7 @@ use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; +use Patchlevel\EventSourcing\Store\Header\EventIdHeader; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; @@ -120,7 +121,7 @@ private function buildGenerator( /** @var DateTimeTzImmutableType $dateTimeType */ $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); - /** @var array{id: positive-int, stream: string, playhead: int|string|null, event: string, payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */ + /** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */ foreach ($result->iterateAssociative() as $data) { if ($this->position === null) { $this->position = 0; @@ -129,11 +130,12 @@ private function buildGenerator( } $this->index = $data['id']; - $event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload'])); + $event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload'])); $message = Message::create($event) ->withHeader(new StreamNameHeader($data['stream'])) - ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))); + ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))) + ->withHeader(new EventIdHeader($data['event_id'])); if ($data['playhead'] !== null) { $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); diff --git a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php index af4c41309..d1252d08e 100644 --- a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php @@ -12,10 +12,12 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; +use Patchlevel\EventSourcing\Store\Header\EventIdHeader; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; +use Patchlevel\EventSourcing\Store\StreamStartHeader; use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Tests\DbalManager; @@ -85,10 +87,10 @@ public function testSave(): void self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']); self::assertEquals('1', $result1['playhead']); self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']); - self::assertEquals('profile.created', $result1['event']); + self::assertEquals('profile.created', $result1['event_name']); self::assertEquals( ['profileId' => $profileId->toString(), 'name' => 'test'], - json_decode($result1['payload'], true), + json_decode($result1['event_payload'], true), ); $result2 = $result[1]; @@ -96,10 +98,10 @@ public function testSave(): void self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']); self::assertEquals('2', $result2['playhead']); self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']); - self::assertEquals('profile.created', $result2['event']); + self::assertEquals('profile.created', $result2['event_name']); self::assertEquals( ['profileId' => $profileId->toString(), 'name' => 'test'], - json_decode($result2['payload'], true), + json_decode($result2['event_payload'], true), ); } @@ -124,10 +126,10 @@ public function testSaveWithOnlyStreamName(): void self::assertEquals('extern', $result1['stream']); self::assertEquals(null, $result1['playhead']); self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']); - self::assertEquals('extern', $result1['event']); + self::assertEquals('extern', $result1['event_name']); self::assertEquals( ['message' => 'test 1'], - json_decode($result1['payload'], true), + json_decode($result1['event_payload'], true), ); $result2 = $result[1]; @@ -135,10 +137,10 @@ public function testSaveWithOnlyStreamName(): void self::assertEquals('extern', $result2['stream']); self::assertEquals(null, $result2['playhead']); self::assertStringContainsString('2020-01-01 00:00:00', $result2['recorded_on']); - self::assertEquals('extern', $result2['event']); + self::assertEquals('extern', $result2['event_name']); self::assertEquals( ['message' => 'test 2'], - json_decode($result2['payload'], true), + json_decode($result2['event_payload'], true), ); } @@ -171,10 +173,10 @@ public function testSaveWithTransactional(): void self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']); self::assertEquals('1', $result1['playhead']); self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']); - self::assertEquals('profile.created', $result1['event']); + self::assertEquals('profile.created', $result1['event_name']); self::assertEquals( ['profileId' => $profileId->toString(), 'name' => 'test'], - json_decode($result1['payload'], true), + json_decode($result1['event_payload'], true), ); $result2 = $result[1]; @@ -182,14 +184,68 @@ public function testSaveWithTransactional(): void self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']); self::assertEquals('2', $result2['playhead']); self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']); - self::assertEquals('profile.created', $result2['event']); + self::assertEquals('profile.created', $result2['event_name']); self::assertEquals( ['profileId' => $profileId->toString(), 'name' => 'test'], - json_decode($result2['payload'], true), + json_decode($result2['event_payload'], true), ); } - public function testUniqueConstraint(): void + public function testSplitStream(): void + { + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new EventIdHeader('2')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))) + ->withHeader(new StreamStartHeader()), + ]; + + $this->store->save(...$messages); + + /** @var list> $result */ + $result = $this->connection->fetchAllAssociative('SELECT * FROM event_store ORDER BY id'); + + self::assertCount(2, $result); + + $result1 = $result[0]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']); + self::assertEquals('1', $result1['playhead']); + self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']); + self::assertEquals('profile.created', $result1['event_name']); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result1['event_payload'], true), + ); + + self::assertEquals(1, $result1['archived']); + self::assertEquals(0, $result1['new_stream_start']); + + $result2 = $result[1]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']); + self::assertEquals('2', $result2['playhead']); + self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']); + self::assertEquals('profile.created', $result2['event_name']); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result2['event_payload'], true), + ); + + self::assertEquals(0, $result2['archived']); + self::assertEquals(1, $result2['new_stream_start']); + } + + public function testUniqueStreamNameAndPlayheadConstraint(): void { $this->expectException(UniqueConstraintViolation::class); @@ -209,6 +265,26 @@ public function testUniqueConstraint(): void $this->store->save(...$messages); } + public function testUniqueEventIdConstraint(): void + { + $this->expectException(UniqueConstraintViolation::class); + + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + ]; + + $this->store->save(...$messages); + } + public function testSave10000Messages(): void { $profileId = ProfileId::generate(); @@ -331,7 +407,7 @@ public function testStreams(): void ], $streams); } - public function testRemote(): void + public function testRemove(): void { $profileId = ProfileId::fromString('0190e47e-77e9-7b90-bf62-08bbf0ab9b4b'); diff --git a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php index 963950ccd..00a56a491 100644 --- a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php @@ -25,6 +25,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; +use Patchlevel\EventSourcing\Store\Header\EventIdHeader; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; @@ -394,8 +395,9 @@ public function testLoadWithOneEvent(): void 'id' => 1, 'stream' => 'profile-1', 'playhead' => '1', - 'event' => 'profile.created', - 'payload' => '{"profileId": "1", "email": "s"}', + 'event_id' => '1', + 'event_name' => 'profile.created', + 'event_payload' => '{"profileId": "1", "email": "s"}', 'recorded_on' => '2021-02-17 10:00:00', 'archived' => '0', 'new_stream_start' => '0', @@ -483,8 +485,9 @@ public function testLoadWithTwoEvents(): void 'id' => 1, 'stream' => 'profile-1', 'playhead' => '1', - 'event' => 'profile.created', - 'payload' => '{"profileId": "1", "email": "s"}', + 'event_id' => '1', + 'event_name' => 'profile.created', + 'event_payload' => '{"profileId": "1", "email": "s"}', 'recorded_on' => '2021-02-17 10:00:00', 'archived' => '0', 'new_stream_start' => '0', @@ -494,8 +497,9 @@ public function testLoadWithTwoEvents(): void 'id' => 2, 'stream' => 'profile-1', 'playhead' => '2', - 'event' => 'profile.email_changed', - 'payload' => '{"profileId": "1", "email": "d"}', + 'event_id' => '2', + 'event_name' => 'profile.email_changed', + 'event_payload' => '{"profileId": "1", "email": "d"}', 'recorded_on' => '2021-02-17 11:00:00', 'archived' => '0', 'new_stream_start' => '0', @@ -830,6 +834,7 @@ public function testSaveWithOneEvent(): void $recordedOn = new DateTimeImmutable(); $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) ->withHeader(new StreamNameHeader('profile-1')) + ->withHeader(new EventIdHeader('1')) ->withHeader(new PlayheadHeader(1)) ->withHeader(new RecordedOnHeader($recordedOn)); @@ -850,12 +855,12 @@ public function testSaveWithOneEvent(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?)", - ['profile-1', 1, 'profile_created', '', $recordedOn, false, false, '[]'], + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + ['profile-1', 1, '1', 'profile_created', '', $recordedOn, false, false, '[]'], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce(); @@ -869,7 +874,6 @@ public function testSaveWithOneEvent(): void public function testSaveWithoutStreamNameHeader(): void { - $recordedOn = new DateTimeImmutable(); $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -888,15 +892,7 @@ public function testSaveWithoutStreamNameHeader(): void static fn (array $args): mixed => $args[0](), ); - $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?)", - ['profile-1', 1, 'profile_created', '', $recordedOn, false, false, []], - [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), - 6 => Type::getType(Types::BOOLEAN), - ], - )->shouldNotBeCalled(); + $mockedConnection->executeStatement(Argument::any(), Argument::any(), Argument::any())->shouldNotBeCalled(); $singleTableStore = new StreamDoctrineDbalStore( $mockedConnection->reveal(), @@ -914,11 +910,13 @@ public function testSaveWithTwoEvents(): void $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) - ->withHeader(new RecordedOnHeader($recordedOn)); + ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('1')); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(2)) - ->withHeader(new RecordedOnHeader($recordedOn)); + ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('2')); $eventSerializer = $this->prophesize(EventSerializer::class); $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( @@ -941,10 +939,11 @@ public function testSaveWithTwoEvents(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ 'profile-1', 1, + '1', 'profile_created', '', $recordedOn, @@ -953,6 +952,7 @@ public function testSaveWithTwoEvents(): void '[]', 'profile-1', 2, + '2', 'profile_email_changed', '', $recordedOn, @@ -961,12 +961,12 @@ public function testSaveWithTwoEvents(): void '[]', ], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), - 12 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 13 => Type::getType(Types::BOOLEAN), - 14 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce(); @@ -984,10 +984,12 @@ public function testSaveWithUniqueConstraintViolation(): void $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new RecordedOnHeader($recordedOn)); $message2 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('2')) ->withHeader(new RecordedOnHeader($recordedOn)); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1007,10 +1009,11 @@ public function testSaveWithUniqueConstraintViolation(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ 'profile-1', 1, + '1', 'profile_created', '', $recordedOn, @@ -1019,6 +1022,7 @@ public function testSaveWithUniqueConstraintViolation(): void '[]', 'profile-1', 1, + '2', 'profile_created', '', $recordedOn, @@ -1027,12 +1031,12 @@ public function testSaveWithUniqueConstraintViolation(): void '[]', ], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), - 12 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 13 => Type::getType(Types::BOOLEAN), - 14 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce()->willThrow(UniqueConstraintViolationException::class); @@ -1097,6 +1101,7 @@ public function testSaveWithCustomHeaders(): void ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('1')) ->withHeaders($customHeaders); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1116,12 +1121,12 @@ public function testSaveWithCustomHeaders(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?)", - ['profile-1', 1, 'profile_created', '', $recordedOn, false, false, '{foo: "foo", baz: "baz"}'], + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + ['profile-1', 1, '1', 'profile_created', '', $recordedOn, false, false, '{foo: "foo", baz: "baz"}'], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce(); @@ -1392,10 +1397,13 @@ public function testConfigureSchema(): void ->setNotnull(true); $table->addColumn('playhead', Types::INTEGER) ->setNotnull(false); - $table->addColumn('event', Types::STRING) + $table->addColumn('event_id', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('event_name', Types::STRING) ->setLength(255) ->setNotnull(true); - $table->addColumn('payload', Types::JSON) + $table->addColumn('event_payload', Types::JSON) ->setNotnull(true); $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE) ->setNotnull(true); @@ -1409,6 +1417,7 @@ public function testConfigureSchema(): void ->setNotnull(true); $table->setPrimaryKey(['id']); + $table->addUniqueIndex(['event_id']); $table->addUniqueIndex(['stream', 'playhead']); $table->addIndex(['stream', 'playhead', 'archived']); @@ -1426,12 +1435,14 @@ public function testArchiveMessagesDifferentAggregates(): void ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(5)) ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new StreamStartHeader()); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('2'), Email::fromString('d'))) ->withHeader(new StreamNameHeader('profile-2')) ->withHeader(new PlayheadHeader(42)) ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('2')) ->withHeader(new StreamStartHeader()); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1455,10 +1466,11 @@ public function testArchiveMessagesDifferentAggregates(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ 'profile-1', 5, + '1', 'profile_created', '', $recordedOn, @@ -1467,6 +1479,7 @@ public function testArchiveMessagesDifferentAggregates(): void '[]', 'profile-2', 42, + '2', 'profile_email_changed', '', $recordedOn, @@ -1475,12 +1488,12 @@ public function testArchiveMessagesDifferentAggregates(): void '[]', ], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), - 12 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 13 => Type::getType(Types::BOOLEAN), - 14 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce(); @@ -1489,12 +1502,12 @@ public function testArchiveMessagesDifferentAggregates(): void UPDATE event_store SET archived = true WHERE stream = :stream - AND playhead < :playhead + AND id < (SELECT id FROM event_store WHERE event_id = :event_id) AND archived = false SQL, [ 'stream' => 'profile-1', - 'playhead' => 5, + 'event_id' => '1', ], )->shouldBeCalledOnce(); @@ -1503,12 +1516,12 @@ public function testArchiveMessagesDifferentAggregates(): void UPDATE event_store SET archived = true WHERE stream = :stream - AND playhead < :playhead + AND id < (SELECT id FROM event_store WHERE event_id = :event_id) AND archived = false SQL, [ 'stream' => 'profile-2', - 'playhead' => 42, + 'event_id' => '2', ], )->shouldBeCalledOnce(); @@ -1529,12 +1542,14 @@ public function testArchiveMessagesSameAggregate(): void ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(5)) ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('3')) ->withHeader(new StreamStartHeader()); $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(42)) ->withHeader(new RecordedOnHeader($recordedOn)) + ->withHeader(new EventIdHeader('7')) ->withHeader(new StreamStartHeader()); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -1558,10 +1573,11 @@ public function testArchiveMessagesSameAggregate(): void ); $mockedConnection->executeStatement( - "INSERT INTO event_store (stream, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO event_store (stream, playhead, event_id, event_name, event_payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ 'profile-1', 5, + '3', 'profile_created', '', $recordedOn, @@ -1570,6 +1586,7 @@ public function testArchiveMessagesSameAggregate(): void '[]', 'profile-1', 42, + '7', 'profile_email_changed', '', $recordedOn, @@ -1578,12 +1595,12 @@ public function testArchiveMessagesSameAggregate(): void '[]', ], [ - 4 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 5 => Type::getType(Types::BOOLEAN), + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), 6 => Type::getType(Types::BOOLEAN), - 12 => Type::getType(Types::DATETIMETZ_IMMUTABLE), - 13 => Type::getType(Types::BOOLEAN), - 14 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), ], )->shouldBeCalledOnce(); @@ -1592,12 +1609,12 @@ public function testArchiveMessagesSameAggregate(): void UPDATE event_store SET archived = true WHERE stream = :stream - AND playhead < :playhead + AND id < (SELECT id FROM event_store WHERE event_id = :event_id) AND archived = false SQL, [ 'stream' => 'profile-1', - 'playhead' => 42, + 'event_id' => '7', ], )->shouldBeCalledOnce(); diff --git a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php b/tests/Unit/Store/StreamDoctrineDbalStreamTest.php index 957ea56e4..8696ee1e5 100644 --- a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php +++ b/tests/Unit/Store/StreamDoctrineDbalStreamTest.php @@ -12,6 +12,7 @@ use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; +use Patchlevel\EventSourcing\Store\Header\EventIdHeader; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; @@ -61,8 +62,9 @@ public function testOneMessage(): void $messages = [ [ 'id' => 1, - 'event' => 'profile_created', - 'payload' => '{}', + 'event_id' => '1', + 'event_name' => 'profile_created', + 'event_payload' => '{}', 'stream' => 'profile-1', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10', @@ -79,6 +81,7 @@ public function testOneMessage(): void $message = Message::create($event) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -120,8 +123,9 @@ public function testMultipleMessages(): void $messagesArray = [ [ 'id' => 1, - 'event' => 'profile_created', - 'payload' => '{}', + 'event_id' => '1', + 'event_name' => 'profile_created', + 'event_payload' => '{}', 'stream' => 'profile-1', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10', @@ -131,8 +135,9 @@ public function testMultipleMessages(): void ], [ 'id' => 2, - 'event' => 'profile_created2', - 'payload' => '{}', + 'event_id' => '2', + 'event_name' => 'profile_created2', + 'event_payload' => '{}', 'stream' => 'profile-2', 'playhead' => null, 'recorded_on' => '2022-10-10 10:10:10', @@ -142,8 +147,9 @@ public function testMultipleMessages(): void ], [ 'id' => 3, - 'event' => 'profile_created3', - 'payload' => '{}', + 'event_id' => '3', + 'event_name' => 'profile_created3', + 'event_payload' => '{}', 'stream' => 'profile-3', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10', @@ -162,13 +168,16 @@ public function testMultipleMessages(): void Message::create($event) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), Message::create($event) ->withHeader(new StreamNameHeader('profile-2')) + ->withHeader(new EventIdHeader('2')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), Message::create($event) ->withHeader(new StreamNameHeader('profile-3')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('3')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), ]; @@ -231,8 +240,9 @@ public function testWithNoList(): void $messages = [ 5 => [ 'id' => 5, - 'event' => 'profile_created', - 'payload' => '{}', + 'event_id' => '1', + 'event_name' => 'profile_created', + 'event_payload' => '{}', 'stream' => 'profile-1', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10', @@ -249,6 +259,7 @@ public function testWithNoList(): void $message = Message::create($event) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -290,8 +301,9 @@ public function testClose(): void $messages = [ [ 'id' => 1, - 'event' => 'profile_created', - 'payload' => '{}', + 'event_id' => '1', + 'event_name' => 'profile_created', + 'event_payload' => '{}', 'stream' => 'profile-1', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10', @@ -308,6 +320,7 @@ public function testClose(): void $message = Message::create($event) ->withHeader(new StreamNameHeader('profile-1')) ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); $eventSerializer = $this->prophesize(EventSerializer::class); @@ -369,8 +382,9 @@ public function testPosition(): void $messages = [ [ 'id' => 1, - 'event' => 'profile_created', - 'payload' => '{}', + 'event_id' => '1', + 'event_name' => 'profile_created', + 'event_payload' => '{}', 'stream' => 'profile-1', 'playhead' => 1, 'recorded_on' => '2022-10-10 10:10:10',