Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC replace child aggregates with micro aggregates #658

Draft
wants to merge 1 commit into
base: 3.6.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/Aggregate/StreamNameTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Patchlevel\EventSourcing\Aggregate;

use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;

use function str_replace;
use function strpos;
use function substr;

Expand All @@ -15,9 +18,13 @@
}

/** @pure */
public static function streamName(string $aggregate, string $aggregateId): string
public static function streamName(string|AggregateRootMetadata $aggregate, string $aggregateId): string
{
if ($aggregate instanceof AggregateRootMetadata && $aggregate->streamName !== null) {

Check failure on line 23 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ImpurePropertyFetch

src/Aggregate/StreamNameTranslator.php:23:60: ImpurePropertyFetch: Cannot access a property on a mutable object from a pure context (see https://psalm.dev/234)
return str_replace('{id}', $aggregateId, $aggregate->streamName);

Check failure on line 24 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ImpurePropertyFetch

src/Aggregate/StreamNameTranslator.php:24:54: ImpurePropertyFetch: Cannot access a property on a mutable object from a pure context (see https://psalm.dev/234)
}

return $aggregate . '-' . $aggregateId;

Check failure on line 27 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Binary operation "." between Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata|string and '-' results in an error.

Check failure on line 27 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PossiblyInvalidOperand

src/Aggregate/StreamNameTranslator.php:27:16: PossiblyInvalidOperand: Cannot concatenate with a Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata<Patchlevel\EventSourcing\Aggregate\AggregateRoot>|string (see https://psalm.dev/163)
}

public static function aggregateName(string $stream): string
Expand Down
18 changes: 18 additions & 0 deletions src/Attribute/Stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

#[Attribute(Attribute::TARGET_CLASS)]
final class Stream
{
/** @param string|class-string<AggregateRoot> $name */

Check failure on line 13 in src/Attribute/Stream.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Deptrac (locked, 8.3, ubuntu-latest)

Patchlevel\EventSourcing\Attribute\Stream must not depend on Patchlevel\EventSourcing\Aggregate\AggregateRoot (Attribute on Aggregate)
public function __construct(
public readonly string $name,
) {
}
}
3 changes: 2 additions & 1 deletion src/Metadata/AggregateRoot/AggregateRootMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ public function __construct(
/** @var array<class-string, true> */
public readonly array $suppressEvents,
public readonly bool $suppressAll,
public readonly Snapshot|null $snapshot,
public readonly Snapshot|null $snapshot = null,
/** @var list<string> */
public readonly array $childAggregates = [],
public readonly string|null $streamName = null,
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Snapshot as AttributeSnapshot;
use Patchlevel\EventSourcing\Attribute\Stream;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
use ReflectionClass;
use ReflectionIntersectionType;
Expand Down Expand Up @@ -50,6 +51,7 @@ public function metadata(string $aggregate): AggregateRootMetadata
[$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass);
$applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates);
$snapshot = $this->findSnapshot($reflectionClass);
$streamName = $this->findStreamName($reflectionClass);

$metadata = new AggregateRootMetadata(
$aggregate,
Expand All @@ -60,6 +62,7 @@ public function metadata(string $aggregate): AggregateRootMetadata
$suppressAll,
$snapshot,
array_map(static fn (array $list) => $list[0], $childAggregates),
$streamName ?? $aggregateName . '-{id}',
);

$this->aggregateMetadata[$aggregate] = $metadata;
Expand Down Expand Up @@ -139,6 +142,23 @@ private function findSnapshot(ReflectionClass $reflector): Snapshot|null
);
}

private function findStreamName(ReflectionClass $reflector): string|null
{
$attributes = $reflector->getAttributes(Stream::class);

if ($attributes === []) {
return null;
}

$streamName = $attributes[0]->newInstance()->name;

if (class_exists($streamName) && is_a($streamName, AggregateRoot::class, true)) {
return $this->metadata($streamName)->streamName;
}

return $attributes[0]->newInstance()->name;
}

/** @return list<array{string, ReflectionClass}> */
private function findChildAggregates(ReflectionClass $reflector): array
{
Expand Down
19 changes: 11 additions & 8 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function load(AggregateRootId $id): AggregateRoot

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->archived(false)
->build();
} else {
Expand Down Expand Up @@ -180,7 +180,7 @@ public function has(AggregateRootId $id): bool
{
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->build();
} else {
$criteria = (new CriteriaBuilder())
Expand Down Expand Up @@ -243,15 +243,18 @@ public function save(AggregateRoot $aggregate): void
$messageDecorator = $this->messageDecorator;
$clock = $this->clock;

$aggregateName = $this->metadata->name;
$metadata = $this->metadata;
$aggregateName = $metadata->name;

$useStreamHeader = $this->useStreamHeader;
$streamName = $this->useStreamHeader
? StreamNameTranslator::streamName($metadata, $aggregateId)
: null;

$messages = array_map(
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) {
if ($useStreamHeader) {
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $streamName) {
if ($streamName !== null) {
$header = new StreamHeader(
StreamNameTranslator::streamName($aggregateName, $aggregateId),
$streamName,
++$playhead,
$clock->now(),
);
Expand Down Expand Up @@ -332,7 +335,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id):

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->fromPlayhead($aggregate->playhead())
->build();
} else {
Expand Down
53 changes: 32 additions & 21 deletions tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter;
use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager;
Expand Down Expand Up @@ -39,7 +39,7 @@ public function tearDown(): void

public function testSuccessful(): void
{
$store = new DoctrineDbalStore(
$store = new StreamDoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
);
Expand All @@ -54,15 +54,19 @@ public function testSuccessful(): void

$manager = new RunSubscriptionEngineRepositoryManager(
new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
new AggregateRootRegistry([
'profile' => Profile::class,
'personal_information' => PersonalInformation::class,
]),
$store,
null,
null,
),
$engine,
);

$repository = $manager->get(Profile::class);
$profileRepository = $manager->get(Profile::class);
$personalInformationRepository = $manager->get(PersonalInformation::class);

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand All @@ -74,8 +78,11 @@ public function testSuccessful(): void

$profileId = ProfileId::generate();
$profile = Profile::create($profileId, 'John');
$profile->changeName('Snow');
$repository->save($profile);
$profileRepository->save($profile);

$personalInformation = $personalInformationRepository->load($profileId);
$personalInformation->changeName('Snow');
$personalInformationRepository->save($personalInformation);

$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_profile WHERE id = ?',
Expand All @@ -87,18 +94,18 @@ public function testSuccessful(): void
self::assertSame($profileId->toString(), $result['id']);
self::assertSame('Snow', $result['name']);

$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
$profile = $profileRepository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

self::assertInstanceOf(Profile::class, $profile);
self::assertEquals($profileId, $profile->aggregateRootId());
self::assertSame(2, $profile->playhead());
self::assertSame('Snow', $profile->name());
self::assertSame('Snow', $personalInformation->name());
}

public function testSnapshot(): void
{
$store = new DoctrineDbalStore(
$store = new StreamDoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
);
Expand All @@ -113,15 +120,19 @@ public function testSnapshot(): void

$manager = new RunSubscriptionEngineRepositoryManager(
new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
new AggregateRootRegistry([
'profile' => Profile::class,
'personal_information' => PersonalInformation::class,
]),
$store,
null,
new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]),
),
$engine,
);

$repository = $manager->get(Profile::class);
$profileRepository = $manager->get(Profile::class);
$personalInformationRepository = $manager->get(PersonalInformation::class);

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand All @@ -133,7 +144,7 @@ public function testSnapshot(): void

$profileId = ProfileId::generate();
$profile = Profile::create($profileId, 'John');
$repository->save($profile);
$profileRepository->save($profile);

$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_profile WHERE id = ?',
Expand All @@ -145,22 +156,22 @@ public function testSnapshot(): void
self::assertSame($profileId->toString(), $result['id']);
self::assertSame('John', $result['name']);

$repository = $manager->get(Profile::class);

// create snapshot
$repository->load($profileId);
$profileRepository->load($profileId);
$personalInformationRepository->load($profileId);

// load from snapshot
$profile = $repository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

$profile->changeName('Snow');
$repository->save($profile);
$personalInformation->changeName('Snow');
$personalInformationRepository->save($personalInformation);

$profile = $repository->load($profileId);
$profile = $profileRepository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

self::assertInstanceOf(Profile::class, $profile);
self::assertEquals($profileId, $profile->aggregateRootId());
self::assertSame(2, $profile->playhead());
self::assertSame('Snow', $profile->name());
self::assertSame('Snow', $personalInformation->name());
}
}
25 changes: 19 additions & 6 deletions tests/Integration/ChildAggregate/PersonalInformation.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,32 @@

namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate;

use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Stream;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated;

final class PersonalInformation extends BasicChildAggregate
#[Aggregate('personal_information')]
#[Stream(Profile::class)]
final class PersonalInformation extends BasicAggregateRoot
{
public function __construct(
private string $name,
) {
#[Id]
private ProfileId $id;

private string $name;

#[Apply(ProfileCreated::class)]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}

#[Apply(NameChanged::class)]
public function applyNameChanged(NameChanged $event): void
protected function applyNameChanged(NameChanged $event): void
{
$this->name = $event->name;
}
Expand Down
18 changes: 3 additions & 15 deletions tests/Integration/ChildAggregate/Profile.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Snapshot;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated;

#[Aggregate('profile')]
#[Snapshot('default', 1)]
#[SuppressMissingApply([NameChanged::class])]
final class Profile extends BasicAggregateRoot
{
#[Id]
private ProfileId $id;

#[ChildAggregate]
protected PersonalInformation $personalInformation;

public static function create(ProfileId $id, string $name): self
{
$self = new self();
Expand All @@ -34,16 +33,5 @@ public static function create(ProfileId $id, string $name): self
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->personalInformation = new PersonalInformation($event->name);
}

public function name(): string
{
return $this->personalInformation->name();
}

public function changeName(string $name): void
{
$this->personalInformation->changeName($name);
}
}
Loading