This extension provides a higher-level API for managing routes and for sending and receiving messages. It is based on "videlalvaro/php-amqplib" (https://github.com/videlalvaro/php-amqplib/), a pure PHP implementation of the "Advanced Message Queuing Protocol" (AMQP). The spring-amqp project (http://www.springsource.org/spring-amqp) served as a role model.
Use the Extension Manager to install the extension and configure a basic connection.
To override the default connection configuration, modify the global connection configuration array (e.g. by placing it in typo3conf/localconf.php or typo3conf/AdditionalConfiguration.php):
$GLOBALS['TYPO3_CONF_VARS']['EXT']['extConf']['amqp/connectionFactory'] = array(
    'className' => 'Tx_Amqp_Messaging_AMQPConnectionFactory',
    'options' => array(
        'username' => 'guest',
        'password' => 'guest',
        'host' => 'localhost',
        'vhost' => '/',
    ),
);
The provided class (className) must implement Tx_Amqp_Messaging_AMQPConnectionFactoryInterface.
The options array (options) is passed via the setOptions() method to the concrete factory implementation (see Tx_Amqp_Messaging_AMQPConnectionFactory).
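The way className and options work together can be illustrated with a small, self-contained sketch. It does not use the extension's classes: Example_ConnectionFactory is a hypothetical stand-in, getOptions() is added only for illustration, and merging the configured options over defaults is an assumption about how a factory might treat them, not a description of Tx_Amqp_Messaging_AMQPConnectionFactory.

```php
<?php
// Hypothetical stand-in for a class implementing
// Tx_Amqp_Messaging_AMQPConnectionFactoryInterface. Only setOptions() is
// named in the text above; getOptions() exists for illustration only.
class Example_ConnectionFactory {
    protected $options = array(
        'username' => 'guest',
        'password' => 'guest',
        'host' => 'localhost',
        'vhost' => '/',
    );

    public function setOptions(array $options) {
        // assumption: configured values override the defaults key by key
        $this->options = array_merge($this->options, $options);
    }

    public function getOptions() {
        return $this->options;
    }
}

// same shape as the 'amqp/connectionFactory' entry; the host is a made-up value
$configuration = array(
    'className' => 'Example_ConnectionFactory',
    'options' => array('host' => 'rabbit.example.org'),
);

// instantiate the configured class and hand over the options array
$className = $configuration['className'];
$factory = new $className();
$factory->setOptions($configuration['options']);
$options = $factory->getOptions();
```

In this sketch only the host is overridden; the remaining options keep their default values.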
To test the basic configuration, use the Reports module.
For a deeper understanding of AMQP, please refer to the specification (https://www.rabbitmq.com/specification.html).
The following sections cover the basic usage of the API.
The following core components exist:
- ConnectionFactory: for creating the configured connections
- AMQPAdmin: for managing queues, exchanges and bindings
- AMQPService: for sending and receiving messages
- AMQPMessageListener: for asynchronous message consumption
Here's how to instantiate them:
// get the configured connection factory
$connectionFactory = Tx_Amqp_Util_ConfigurationHelper::getConnectionFactory();
// create an AMQP Service instance (for sending and receiving messages)
$amqpService = new Tx_Amqp_Messaging_AMQPService();
// create an AMQP Admin instance (for managing queues, exchanges and bindings)
$admin = new Tx_Amqp_Messaging_AMQPAdmin($amqpService);
// create an asynchronous message listener
$listener = new Tx_Amqp_Messaging_AMQPMessageListener($connectionFactory);
The administration service component is for managing queues, exchanges and bindings.
A queue can be declared as follows:
// create a durable, non-exclusive autodelete Queue named 'my_queue'
$queue = new Tx_Amqp_Messaging_Queue('my_queue', TRUE, FALSE, TRUE);
$admin->declareQueue($queue);
Alternatively, the same queue can be declared with a more fluent API:
// create the queue description via the fluent API
$queue = Tx_Amqp_Messaging_Queue::create('my_queue')
    ->setDurable()
    ->setAutoDelete()
    ->setExclusive(FALSE);
$admin->declareQueue($queue);
A queue can be emptied and/or deleted:
// purge the queue
$admin->purgeQueue('my_queue');
// delete the queue
$admin->deleteQueue('my_queue');
A temporary queue (non-exclusive, durable, auto-delete) can be declared by omitting the queue-description parameter. The server generates the queue, and its queue description is returned.
// to declare a temporary queue (generated by the server) simply omit the $queue parameter
$queue = $admin->declareQueue();
$admin->deleteQueue($queue->getName());
For declaring exchanges the following exchange-types can be used:
- Tx_Amqp_Messaging_HeadersExchange
- Tx_Amqp_Messaging_DirectExchange
- Tx_Amqp_Messaging_FanoutExchange
- Tx_Amqp_Messaging_TopicExchange
- Tx_Amqp_Messaging_CustomExchange
// declare a durable, autodelete direct exchange
$admin->declareExchange(new Tx_Amqp_Messaging_DirectExchange('my_direct_exchange', TRUE, TRUE));
// delete an exchange
$admin->deleteExchange('my_direct_exchange');
Bindings define the routing between exchanges and queues (or exchanges) by connecting them.
Assuming a queue named 'my_queue' and an exchange named 'my_direct_exchange' have already been declared, a binding with the routingKey 'my_routing_key' is set up as follows:
$binding = new Tx_Amqp_Messaging_Binding('my_queue', Tx_Amqp_Messaging_Binding::DESTINATION_QUEUE, 'my_direct_exchange', 'my_routing_key');
$admin->declareBinding($binding);
To delete a binding, the same binding description needs to be passed along:
$admin->deleteBinding($binding);
Since it is not possible to send messages to non-existent queues or exchanges, the following examples assume that the referenced exchanges and queues already exist.
Every declared queue is automatically bound to the AMQP default exchange '' with a routingKey that matches the name of the queue. This makes it possible to send a message directly to a queue:
// send a message to the queue 'my_queue' by using the default exchange '' and the routingKey 'my_queue'
$amqpService->send(new \PhpAmqpLib\Message\AMQPMessage('a test message'), '', 'my_queue');
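To make the routing rules for bindings and the default exchange more concrete, here is a minimal in-memory sketch of direct routing. It is not part of the extension and does not talk to a broker; the class InMemoryRouter and its methods are hypothetical and exist only for this illustration.

```php
<?php
/**
 * Hypothetical in-memory model of direct routing.
 * Not part of the extension and not a real broker.
 */
class InMemoryRouter {
    /** @var array list of array(exchange, routingKey, queueName) */
    protected $bindings = array();

    public function declareQueue($queueName) {
        // the default exchange '' implicitly binds every queue by its own name
        $this->bind('', $queueName, $queueName);
    }

    public function bind($exchange, $routingKey, $queueName) {
        $this->bindings[] = array($exchange, $routingKey, $queueName);
    }

    public function route($exchange, $routingKey) {
        $queues = array();
        foreach ($this->bindings as $binding) {
            // direct matching: exchange and routing key must both be equal
            if ($binding[0] === $exchange && $binding[1] === $routingKey) {
                $queues[] = $binding[2];
            }
        }
        return $queues;
    }
}

$router = new InMemoryRouter();
$router->declareQueue('my_queue');
$router->bind('my_direct_exchange', 'my_routing_key', 'my_queue');

// routed through the default exchange by queue name
$viaDefault = $router->route('', 'my_queue');
// routed through the explicit binding
$viaDirect = $router->route('my_direct_exchange', 'my_routing_key');
// no binding matches this routing key, so no queue receives the message
$unroutable = $router->route('my_direct_exchange', 'unknown_key');
```

The sketch covers exact-match routing only; fanout, topic and headers exchanges apply different matching rules.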
The receive() method works synchronously by polling the given queue once. If no message is found, NULL is returned:
// poll (and acknowledge) a message from a queue named 'my_queue'
$message = $amqpService->receive('my_queue');
// returns the next message or NULL if no message was found
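Because a single poll returns NULL when the queue is empty, callers that want to wait for a message need to repeat the call. The following is a minimal sketch of such a loop; pollUntilMessage() is a hypothetical helper, not part of the extension, and the closure stands in for a call such as $amqpService->receive('my_queue') so that the example runs without a broker.

```php
<?php
/**
 * Hypothetical helper: invoke $receive repeatedly until it returns a
 * non-NULL value or $timeoutMs milliseconds have elapsed.
 */
function pollUntilMessage($receive, $timeoutMs, $sleepMs = 100) {
    $deadline = microtime(TRUE) + $timeoutMs / 1000;
    do {
        $message = call_user_func($receive);
        if ($message !== NULL) {
            return $message;
        }
        // wait a little before polling again to avoid a busy loop
        usleep($sleepMs * 1000);
    } while (microtime(TRUE) < $deadline);
    return NULL;
}

// stand-in for: function () use ($amqpService) { return $amqpService->receive('my_queue'); }
$attempts = 0;
$fakeReceive = function () use (&$attempts) {
    $attempts++;
    // the first two polls find the queue empty
    return $attempts < 3 ? NULL : 'a test message';
};

$message = pollUntilMessage($fakeReceive, 1000, 10);
```

For long-running consumers, the asynchronous AMQPMessageListener is likely the better fit than a polling loop.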
// send and receive (RPC exchange)
try {
    // set the reply timeout to 3 seconds
    $amqpService->setReplyTimeoutInMs(3000);
    $result = $amqpService->sendAndReceive(new \PhpAmqpLib\Message\AMQPMessage('test message'), '', 'my_queue');
    // ...
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
    // thrown if the remote service did not respond in time
}
$listener = new Tx_Amqp_Messaging_AMQPMessageListener($connectionFactory);
$listener->listen('my_queue', function(\PhpAmqpLib\Message\AMQPMessage $message) {
    // process message
    echo $message->body;
    // stop listening
    Tx_Amqp_Messaging_AMQPUtils::cancelListening($message);
});