diff --git a/src/Aggregate/StreamNameTranslator.php b/src/Aggregate/StreamNameTranslator.php index 00c74bf9..2bae7641 100644 --- a/src/Aggregate/StreamNameTranslator.php +++ b/src/Aggregate/StreamNameTranslator.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Aggregate; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata; + +use function str_replace; use function strpos; use function substr; @@ -15,8 +18,12 @@ private function __construct() } /** @pure */ - public static function streamName(string $aggregate, string $aggregateId): string + public static function streamName(string|AggregateRootMetadata $aggregate, string $aggregateId): string { + if ($aggregate instanceof AggregateRootMetadata && $aggregate->streamName !== null) { + return str_replace('{id}', $aggregateId, $aggregate->streamName); + } + return $aggregate . '-' . $aggregateId; } diff --git a/src/Attribute/Stream.php b/src/Attribute/Stream.php new file mode 100644 index 00000000..c6bf8bea --- /dev/null +++ b/src/Attribute/Stream.php @@ -0,0 +1,18 @@ + $name */ + public function __construct( + public readonly string $name, + ) { + } +} diff --git a/src/Metadata/AggregateRoot/AggregateRootMetadata.php b/src/Metadata/AggregateRoot/AggregateRootMetadata.php index ce51220d..4689c7c3 100644 --- a/src/Metadata/AggregateRoot/AggregateRootMetadata.php +++ b/src/Metadata/AggregateRoot/AggregateRootMetadata.php @@ -19,9 +19,10 @@ public function __construct( /** @var array */ public readonly array $suppressEvents, public readonly bool $suppressAll, - public readonly Snapshot|null $snapshot, + public readonly Snapshot|null $snapshot = null, /** @var list */ public readonly array $childAggregates = [], + public readonly string|null $streamName = null, ) { } } diff --git a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php index 4c62c3cc..0d3ab327 100644 --- a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php +++ b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php @@ -10,6 +10,7 @@ use Patchlevel\EventSourcing\Attribute\ChildAggregate; use Patchlevel\EventSourcing\Attribute\Id; use Patchlevel\EventSourcing\Attribute\Snapshot as AttributeSnapshot; +use Patchlevel\EventSourcing\Attribute\Stream; use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; use ReflectionClass; use ReflectionIntersectionType; @@ -50,6 +51,7 @@ public function metadata(string $aggregate): AggregateRootMetadata [$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass); $applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates); $snapshot = $this->findSnapshot($reflectionClass); + $streamName = $this->findStreamName($reflectionClass); $metadata = new AggregateRootMetadata( $aggregate, @@ -60,6 +62,7 @@ public function metadata(string $aggregate): AggregateRootMetadata $suppressAll, $snapshot, array_map(static fn (array $list) => $list[0], $childAggregates), + $streamName ?? $aggregateName . '-{id}', ); $this->aggregateMetadata[$aggregate] = $metadata; @@ -139,6 +142,23 @@ private function findSnapshot(ReflectionClass $reflector): Snapshot|null ); } + private function findStreamName(ReflectionClass $reflector): string|null + { + $attributes = $reflector->getAttributes(Stream::class); + + if ($attributes === []) { + return null; + } + + $streamName = $attributes[0]->newInstance()->name; + + if (class_exists($streamName) && is_a($streamName, AggregateRoot::class, true)) { + return $this->metadata($streamName)->streamName; + } + + return $attributes[0]->newInstance()->name; + } + /** @return list */ private function findChildAggregates(ReflectionClass $reflector): array { diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 22b77ac5..f0a18342 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -111,7 +111,7 @@ public function load(AggregateRootId $id): AggregateRoot if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->archived(false) ->build(); } else { @@ -180,7 +180,7 @@ public function has(AggregateRootId $id): bool { if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->build(); } else { $criteria = (new CriteriaBuilder()) @@ -243,15 +243,18 @@ public function save(AggregateRoot $aggregate): void $messageDecorator = $this->messageDecorator; $clock = $this->clock; - $aggregateName = $this->metadata->name; + $metadata = $this->metadata; + $aggregateName = $metadata->name; - $useStreamHeader = $this->useStreamHeader; + $streamName = $this->useStreamHeader + ? StreamNameTranslator::streamName($metadata, $aggregateId) + : null; $messages = array_map( - static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) { - if ($useStreamHeader) { + static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $streamName) { + if ($streamName !== null) { $header = new StreamHeader( - StreamNameTranslator::streamName($aggregateName, $aggregateId), + $streamName, ++$playhead, $clock->now(), ); @@ -332,7 +335,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id): if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->fromPlayhead($aggregate->playhead()) ->build(); } else { diff --git a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php index a7f4c568..84e8824e 100644 --- a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php +++ b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php @@ -11,7 +11,7 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter; use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; -use Patchlevel\EventSourcing\Store\DoctrineDbalStore; +use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; @@ -39,7 +39,7 @@ public function tearDown(): void public function testSuccessful(): void { - $store = new DoctrineDbalStore( + $store = new StreamDoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -54,7 +54,10 @@ public function testSuccessful(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), $store, null, null, @@ -62,7 +65,8 @@ public function testSuccessful(): void $engine, ); - $repository = $manager->get(Profile::class); + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -74,8 +78,11 @@ public function testSuccessful(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $profile->changeName('Snow'); - $repository->save($profile); + $profileRepository->save($profile); + + $personalInformation = $personalInformationRepository->load($profileId); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -87,18 +94,18 @@ public function testSuccessful(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('Snow', $result['name']); - $repository = $manager->get(Profile::class); - $profile = $repository->load($profileId); + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $profile->name()); + self::assertSame('Snow', $personalInformation->name()); } public function testSnapshot(): void { - $store = new DoctrineDbalStore( + $store = new StreamDoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -113,7 +120,10 @@ public function testSnapshot(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), $store, null, new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), @@ -121,7 +131,8 @@ public function testSnapshot(): void $engine, ); - $repository = $manager->get(Profile::class); + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -133,7 +144,7 @@ public function testSnapshot(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $repository->save($profile); + $profileRepository->save($profile); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -145,22 +156,22 @@ public function testSnapshot(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('John', $result['name']); - $repository = $manager->get(Profile::class); - // create snapshot - $repository->load($profileId); + $profileRepository->load($profileId); + $personalInformationRepository->load($profileId); // load from snapshot - $profile = $repository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); - $profile->changeName('Snow'); - $repository->save($profile); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); - $profile = $repository->load($profileId); + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $profile->name()); + self::assertSame('Snow', $personalInformation->name()); } } diff --git a/tests/Integration/ChildAggregate/PersonalInformation.php b/tests/Integration/ChildAggregate/PersonalInformation.php index 89d61a31..d1e83c52 100644 --- a/tests/Integration/ChildAggregate/PersonalInformation.php +++ b/tests/Integration/ChildAggregate/PersonalInformation.php @@ -4,19 +4,32 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate; -use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate; +use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\Id; +use Patchlevel\EventSourcing\Attribute\Stream; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; -final class PersonalInformation extends BasicChildAggregate +#[Aggregate('personal_information')] +#[Stream(Profile::class)] +final class PersonalInformation extends BasicAggregateRoot { - public function __construct( - private string $name, - ) { + #[Id] + private ProfileId $id; + + private string $name; + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; } #[Apply(NameChanged::class)] - public function applyNameChanged(NameChanged $event): void + protected function applyNameChanged(NameChanged $event): void { $this->name = $event->name; } diff --git a/tests/Integration/ChildAggregate/Profile.php b/tests/Integration/ChildAggregate/Profile.php index bd0b438c..dc88958c 100644 --- a/tests/Integration/ChildAggregate/Profile.php +++ b/tests/Integration/ChildAggregate/Profile.php @@ -7,21 +7,20 @@ use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; -use Patchlevel\EventSourcing\Attribute\ChildAggregate; use Patchlevel\EventSourcing\Attribute\Id; use Patchlevel\EventSourcing\Attribute\Snapshot; +use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; #[Aggregate('profile')] #[Snapshot('default', 1)] +#[SuppressMissingApply([NameChanged::class])] final class Profile extends BasicAggregateRoot { #[Id] private ProfileId $id; - #[ChildAggregate] - protected PersonalInformation $personalInformation; - public static function create(ProfileId $id, string $name): self { $self = new self(); @@ -34,16 +33,5 @@ public static function create(ProfileId $id, string $name): self protected function applyProfileCreated(ProfileCreated $event): void { $this->id = $event->profileId; - $this->personalInformation = new PersonalInformation($event->name); - } - - public function name(): string - { - return $this->personalInformation->name(); - } - - public function changeName(string $name): void - { - $this->personalInformation->changeName($name); } }