Async decorator for CQRS command bus
composer require phpgears/cqrs-async
Require composer autoload file
require './vendor/autoload.php';
Command bus decorator to handle commands asynchronously
use Gears\CQRS\Async\AsyncCommandBus;
use Gears\CQRS\Async\Serializer\JsonCommandSerializer;
use Gears\CQRS\Async\Discriminator\ParameterCommandDiscriminator;
/* @var \Gears\CQRS\CommandBus $commandBus */
/* @var Gears\CQRS\Async\CommandQueue $commandQueue */
$commandQueue = new CustomCommandQueue(new JsonCommandSerializer());
$asyncCommandBus new AsyncCommandBus(
$commandBus,
$commandQueue,
new ParameterCommandDiscriminator('async')
);
$asyncCommand = new CustomCommand(['async' => true]);
$asyncCommandBus->handle($asyncCommand);
This part is highly dependent on your message queue, though command serializers can be used to deserialize queue message
This is just an example of the process
use Gears\CQRS\Async\ReceivedCommand;
use Gears\CQRS\Async\Serializer\JsonCommandSerializer;
/* @var \Gears\CQRS\Async\AsyncCommandBus $asyncCommandBus */
/* @var your_message_queue_manager $queue */
$serializer = new JsonCommandSerializer();
while (true) {
$message = $queue->getMessage();
if ($message !== null) {
$command = new ReceivedCommand($serializer->fromSerialized($message));
$asyncCommandBus->handle($command);
}
}
Deserialized commands should be wrapped in Gears\CQRS\Async\ReceivedCommand in order to avoid infinite loops should you decide to handle the commands to an async command bus. If you decide to use a non-async bus on the dequeue side you don't need to do this
Discriminates whether a command should or should not be enqueued based on arbitrary conditions
Three discriminators are provided in this package
Gears\CQRS\Async\Discriminator\ArrayCommandDiscriminator
selects commands if they are present in the array providedGears\CQRS\Async\Discriminator\ClassCommandDiscriminator
selects commands by their class or interfaceGears\CQRS\Async\Discriminator\ParameterCommandDiscriminator
selects commands by the presence of a command payload parameter (optionally by its value as well)
This is the one responsible for actual async handling, which would normally be sending the serialized command to a message queue system such as RabbitMQ
No implementation is provided but an abstract base class so you can extend from it
use Gears\CQRS\Async\AbstractCommandQueue;
class CustomCommandQueue extends AbstractCommandQueue
{
public function send(Command $command): void
{
// Do the actual enqueue of $this->getSerializedCommand($command);
}
}
You can use cqrs-async-queue-interop that uses queue-interop for enqueuing messages
Abstract command queue uses serializers to do command serialization so it can be sent to the message queue as a string message
Gears\CQRS\Async\Serializer\JsonCommandSerializer
is directly provided as a general serializer allowing maximum compatibility in case of commands being handled by other systems
You can create your own serializer if the one provided does not fit your needs, for example by using JMS serializer, by implementing Gears\CQRS\Async\Serializer\CommandSerializer
interface
On distributed systems, such as micro-service systems, commands can be dequeued on a completely different part of the system, this part should of course know about commands and their contents but could eventually not have access to the command class itself
For example in the context of Domain Events on DDD a bounded context could handle command delivered by another completely different bounded context and of course won't be able to deserialize the original command as it is located on another domain
This can be solved in one of two ways, transform messages coming out from the message queue before handing them to the command serializer, or better by creating a custom Gears\CQRS\Async\Serializer\CommandSerializer
encapsulating this transformation
Transformation can be as simple as changing command class to be reconstituted
Found a bug or have a feature request? Please open a new issue. Have a look at existing issues before.
See file CONTRIBUTING.md
See file LICENSE included with the source code for a copy of the license terms.