Skip to content

Commit

Permalink
split stream header
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 13, 2024
1 parent 91dabc1 commit 1a995c2
Show file tree
Hide file tree
Showing 20 changed files with 351 additions and 317 deletions.
49 changes: 33 additions & 16 deletions src/Console/OutputStyle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\Encoder\Encoder;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;
Expand Down Expand Up @@ -49,13 +50,38 @@ public function message(

$customHeaders = array_filter(
$message->headers(),
static fn ($header) => !$header instanceof StreamHeader
static fn ($header) => !$header instanceof StreamNameHeader
&& !$header instanceof PlayheadHeader
&& !$header instanceof RecordedOnHeader
&& !$header instanceof AggregateHeader
&& !$header instanceof ArchivedHeader
&& !$header instanceof StreamStartHeader,
);

$metaHeader = $this->metaHeader($message);
$streamName = null;
$playhead = null;
$recordedOn = null;

if ($message->hasHeader(AggregateHeader::class)) {

Check warning on line 65 in src/Console/OutputStyle.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "IfNegation": --- Original +++ New @@ @@ $streamName = null; $playhead = null; $recordedOn = null; - if ($message->hasHeader(AggregateHeader::class)) { + if (!$message->hasHeader(AggregateHeader::class)) { $header = $message->header(AggregateHeader::class); $streamName = $header->streamName(); $playhead = $header->playhead;
$header = $message->header(AggregateHeader::class);

$streamName = $header->streamName();
$playhead = $header->playhead;
$recordedOn = $header->recordedOn;
}

if ($message->hasHeader(StreamNameHeader::class)) {
$streamName = $message->header(StreamNameHeader::class)->streamName;
}

if ($message->hasHeader(PlayheadHeader::class)) {
$playhead = $message->header(PlayheadHeader::class)->playhead;
}

if ($message->hasHeader(RecordedOnHeader::class)) {
$recordedOn = $message->header(RecordedOnHeader::class)->recordedOn;
}

$streamStart = $message->hasHeader(StreamStartHeader::class);
$achieved = $message->hasHeader(ArchivedHeader::class);

Expand All @@ -70,9 +96,9 @@ public function message(
],
[

Check warning on line 97 in src/Console/OutputStyle.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "ArrayItemRemoval": --- Original +++ New @@ @@ $streamStart = $message->hasHeader(StreamStartHeader::class); $achieved = $message->hasHeader(ArchivedHeader::class); $this->title($data->name); - $this->horizontalTable(['stream', 'playhead', 'recordedOn', 'streamStart', 'archived'], [[$streamName, $playhead, $recordedOn?->format('Y-m-d H:i:s'), $streamStart ? 'yes' : 'no', $achieved ? 'yes' : 'no']]); + $this->horizontalTable(['stream', 'playhead', 'recordedOn', 'streamStart', 'archived'], []); if ($customHeaders !== []) { $this->block($headersSerializer->serialize(array_values($customHeaders))); }
[

Check warning on line 98 in src/Console/OutputStyle.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "ArrayItemRemoval": --- Original +++ New @@ @@ $streamStart = $message->hasHeader(StreamStartHeader::class); $achieved = $message->hasHeader(ArchivedHeader::class); $this->title($data->name); - $this->horizontalTable(['stream', 'playhead', 'recordedOn', 'streamStart', 'archived'], [[$streamName, $playhead, $recordedOn?->format('Y-m-d H:i:s'), $streamStart ? 'yes' : 'no', $achieved ? 'yes' : 'no']]); + $this->horizontalTable(['stream', 'playhead', 'recordedOn', 'streamStart', 'archived'], [[$playhead, $recordedOn?->format('Y-m-d H:i:s'), $streamStart ? 'yes' : 'no', $achieved ? 'yes' : 'no']]); if ($customHeaders !== []) { $this->block($headersSerializer->serialize(array_values($customHeaders))); }
$metaHeader instanceof AggregateHeader ? $metaHeader->streamName() : $metaHeader->streamName,
$metaHeader->playhead,
$metaHeader->recordedOn?->format('Y-m-d H:i:s'),
$streamName,
$playhead,
$recordedOn?->format('Y-m-d H:i:s'),
$streamStart ? 'yes' : 'no',
$achieved ? 'yes' : 'no',
],
Expand All @@ -98,13 +124,4 @@ public function throwable(Throwable $error): void
$error = $error->getPrevious();
} while ($error !== null);
}

private function metaHeader(Message $message): AggregateHeader|StreamHeader
{
try {
return $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
return $message->header(StreamHeader::class);
}
}
}
12 changes: 6 additions & 6 deletions src/Message/Translator/AggregateToStreamHeaderTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;

/** @experimental */
final class AggregateToStreamHeaderTranslator implements Translator
Expand All @@ -23,11 +25,9 @@ public function __invoke(Message $message): array
return [
$message
->removeHeader(AggregateHeader::class)
->withHeader(new StreamHeader(
$aggregateHeader->streamName(),
$aggregateHeader->playhead,
$aggregateHeader->recordedOn,
)),
->withHeader(new StreamNameHeader($aggregateHeader->streamName()))
->withHeader(new PlayheadHeader($aggregateHeader->playhead))
->withHeader(new RecordedOnHeader($aggregateHeader->recordedOn)),
];
}
}
44 changes: 16 additions & 28 deletions src/Message/Translator/RecalculatePlayheadTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;

use function array_key_exists;

Expand All @@ -19,42 +19,30 @@ final class RecalculatePlayheadTranslator implements Translator
/** @return list<Message> */
public function __invoke(Message $message): array
{
try {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {
return [$message];
}
}
if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) {

Check warning on line 22 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "LogicalAnd": --- Original +++ New @@ @@ /** @return list<Message> */ public function __invoke(Message $message) : array { - if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) { + if ($message->hasHeader(StreamNameHeader::class) || $message->hasHeader(PlayheadHeader::class)) { $streamName = $message->header(StreamNameHeader::class)->streamName; $playhead = $this->nextPlayhead($streamName); return [$message->withHeader(new PlayheadHeader($playhead))];

Check warning on line 22 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "LogicalAndSingleSubExprNegation": --- Original +++ New @@ @@ /** @return list<Message> */ public function __invoke(Message $message) : array { - if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) { + if (!$message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) { $streamName = $message->header(StreamNameHeader::class)->streamName; $playhead = $this->nextPlayhead($streamName); return [$message->withHeader(new PlayheadHeader($playhead))];

Check warning on line 22 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "LogicalAndSingleSubExprNegation": --- Original +++ New @@ @@ /** @return list<Message> */ public function __invoke(Message $message) : array { - if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) { + if ($message->hasHeader(StreamNameHeader::class) && !$message->hasHeader(PlayheadHeader::class)) { $streamName = $message->header(StreamNameHeader::class)->streamName; $playhead = $this->nextPlayhead($streamName); return [$message->withHeader(new PlayheadHeader($playhead))];
$streamName = $message->header(StreamNameHeader::class)->streamName;

$stream = $header instanceof StreamHeader ? $header->streamName : $header->streamName();
$playhead = $this->nextPlayhead($streamName);

$playhead = $this->nextPlayhead($stream);

if ($header->playhead === $playhead) {
return [$message];
return [
$message->withHeader(new PlayheadHeader($playhead)),
];
}

if ($header instanceof StreamHeader) {
if ($message->hasHeader(AggregateHeader::class)) {
$header = $message->header(AggregateHeader::class);

return [
$message->withHeader(new StreamHeader(
$header->streamName,
$playhead,
$message->withHeader(new AggregateHeader(
$header->aggregateName,
$header->aggregateId,
$this->nextPlayhead($header->streamName()),
$header->recordedOn,
)),
];
}

return [
$message->withHeader(new AggregateHeader(
$header->aggregateName,
$header->aggregateId,
$playhead,
$header->recordedOn,
)),
];
return [$message];
}

public function reset(): void
Expand Down
25 changes: 14 additions & 11 deletions src/Message/Translator/UntilEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;

final class UntilEventTranslator implements Translator
{
Expand All @@ -20,22 +19,26 @@ public function __construct(
/** @return list<Message> */
public function __invoke(Message $message): array
{
try {
if ($message->hasHeader(AggregateHeader::class)) {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {

if ($header->recordedOn < $this->until) {

Check warning on line 25 in src/Message/Translator/UntilEventTranslator.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "LessThan": --- Original +++ New @@ @@ { if ($message->hasHeader(AggregateHeader::class)) { $header = $message->header(AggregateHeader::class); - if ($header->recordedOn < $this->until) { + if ($header->recordedOn <= $this->until) { return [$message]; } return [];
return [$message];
}

return [];
}

$recordedOn = $header->recordedOn;
if ($message->hasHeader(RecordedOnHeader::class)) {
$header = $message->header(RecordedOnHeader::class);

if ($header->recordedOn < $this->until) {
return [$message];
}

if ($recordedOn < $this->until) {
return [$message];
return [];
}

return [];
return [$message];
}
}
8 changes: 6 additions & 2 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

use function array_flip;
Expand Down Expand Up @@ -73,7 +75,9 @@ public function headerNames(): array
public static function createWithInternalHeaders(array $headerNameToClassMap = []): self
{
$internalHeaders = [
'stream' => StreamHeader::class,
'streamName' => StreamNameHeader::class,
'playhead' => PlayheadHeader::class,
'recordedOn' => RecordedOnHeader::class,
'aggregate' => AggregateHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => StreamStartHeader::class,
Expand Down
48 changes: 27 additions & 21 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
Expand Down Expand Up @@ -142,11 +144,7 @@ public function load(AggregateRootId $id): AggregateRoot
}

if ($this->useStreamHeader) {
$playhead = $firstMessage->header(StreamHeader::class)->playhead;

if ($playhead === null) {
throw new AggregateNotFound($this->metadata->className, $id);
}
$playhead = $firstMessage->header(PlayheadHeader::class)->playhead;
} else {
$playhead = $firstMessage->header(AggregateHeader::class)->playhead;
}
Expand Down Expand Up @@ -245,27 +243,35 @@ public function save(AggregateRoot $aggregate): void

$aggregateName = $this->metadata->name;

$useStreamHeader = $this->useStreamHeader;
$streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($aggregateName, $aggregateId) : null;

$messages = array_map(
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) {
if ($useStreamHeader) {
$header = new StreamHeader(
StreamNameTranslator::streamName($aggregateName, $aggregateId),
++$playhead,
$clock->now(),
);
static function (object $event) use (
$aggregateName,
$aggregateId,
&$playhead,
$messageDecorator,
$clock,
$streamName,
) {
$message = Message::create($event);

if ($streamName !== null) {
$message = $message
->withHeader(new StreamNameHeader($streamName))
->withHeader(new PlayheadHeader(++$playhead))
->withHeader(new RecordedOnHeader($clock->now()));
} else {
$header = new AggregateHeader(
$aggregateName,
$aggregateId,
++$playhead,
$clock->now(),
$message = $message->withHeader(
new AggregateHeader(
$aggregateName,
$aggregateId,
++$playhead,
$clock->now(),
),
);
}

$message = Message::create($event)->withHeader($header);

if ($messageDecorator) {
return $messageDecorator($message);
}
Expand Down
18 changes: 18 additions & 0 deletions src/Store/Header/PlayheadHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class PlayheadHeader
{
/** @param positive-int $playhead */
public function __construct(
public readonly int $playhead,
) {
}
}
19 changes: 19 additions & 0 deletions src/Store/Header/RecordedOnHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

use DateTimeImmutable;

/**
* @psalm-immutable
* @experimental
*/
final class RecordedOnHeader
{
public function __construct(
public readonly DateTimeImmutable $recordedOn,
) {
}
}
17 changes: 17 additions & 0 deletions src/Store/Header/StreamNameHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class StreamNameHeader
{
public function __construct(
public readonly string $streamName,
) {
}
}
Loading

0 comments on commit 1a995c2

Please sign in to comment.