Contains RabbitMQ-based transport for establishing a net of loosely coupled microservices with a simple RPC-style calling interface using Node.js
npm i @microfleet/transport-amqp -S
yarn add @microfleet/transport-amqp
Heavily relies on dropbox/amqp-coffee
& @microfleet/amqp-coffee
lib for establishing communication with RabbitMQ using AMQP protocol
Using AMQP is often not as easy as it sounds - managing connection to your cluster, lifecycle of the queues, bindings & exchanges, as well guaranteed delivery of messages - that all can be a burden. This module has been used in production for over 5 years now and had many iterations to fine-tune reconnection strategies and message delivery issues
Essentially it gives you an opinionated subset of AMQP spec to create guaranteed completion worker queues or an RPC service.
Please consult with configuration schemas where every possible setting is annotated and described
There are several strategies that are supported with this module out-of-the-box:
- 1 consumption queue for all routes (internally - 1 channel that consumes a queue bound to all routing keys)
- 1 RPC queue (internally uses 1 channel for publishing and 1 channel for consumption of responses)
- 1 consumption queue per each route
And mix of each of these strategies.
Each message that is delivered through AMQP to consumer will be received using the following routing function. Messages must be encoded in JSON, which is transparently handled by this library and follow idiomatic { err, message } structure for Node.js
const router = (message, properties, actions, next) => {
// message - anything that was sent to the consumer
// if sent using this library - you can send any type of data supported by Node.js
// except for streams
// properties - this is basic message content
// more info can be found here - https://www.rabbitmq.com/amqp-0-9-1-reference.html#class.basic
// those of value are:
// `properties.headers`, `properties.correlationId`, `properties.replyTo`
// actions - if `neck` (prefetchCount >= 0, noAck: false) is defined, it would have
// - .ack()
// - .reject()
// - .retry()
// next - standard callback with (err, response), if no `replyTo` is set response will only be logged into
// console. This would be the case when someone published a message and they don't care about the response
// typically that would happen when the task is considered long-running and we can't reliably respond fast
// enough for the publisher
};
const AMQPTransport = require('@microfleet/transport-amqp');
const options = {
// will create queue with the following name:
queue: 'permanent-queue-name',
// will bind that queue on the exchange
listen: ['routing.key', 'prefix.#', '*.log'],
// will create exchange with that name
exchange: 'node-services',
};
AMQPTransport.connect(options, router).then((amqp) => {
// at this point we've connected
// created a consumed queue and router is called when messages are delivered to it
// amqp is a connected instance of an AMQPTransport
});
const AMQPTransport = require('@microfleet/transport-amqp');
const options = {
// will create queue with the following name:
queue: 'permanent-queue-name',
// will bind that queue on the exchange
listen: ['routing.key', 'prefix.#', '*.log'],
// will create exchange with that name
exchange: 'node-services',
};
// this is an Array of queue options for each queue
// you can overwrite queue names here
const queueOpts = [{
// extra settings for queue `permanent-queue-name-routing.key`
arguments: {
'x-dead-letter-exchange': 'something'
}
}, {
// overwrite name of the queue for the second route
queue: 'awesome-queue',
}];
AMQPTransport.multiConnect(options, router, queueOpts).then((amqp) => {
// at this point we've connected
// created several consumed queues with names:
// * `permanent-queue-name-routing.key` bound to `routing.key`
// * `awesome-queue` bound to `prefix.#`
// * `permanent-queue-name-..log` bound to `*.log`
// amqp is a connected instance of an AMQPTransport
});
Once you have a long-running microservice handling messages - you can interact with it using same adapter
const AMQPTransport = require('@microfleet/transport-amqp');
const opts = { private: true }; // establish private queue right after connecting
// no router passed - means we are not creating consumed queue right away
// we may still do it later, but for now it's only good to do RPC calls
AMQPTransport.connect(opts).then((amqp) => {
// we've connected to RabbitMQ server and are ready to send messages
// send messages using routing keys:
// return Promise, which resolves
// based on publishOptions
// * confirm - waits for commit from AMQP server before resolving
// * immediate - waits for the message to be delivered, if it can't be - rejects
// * other options - read more in the schema.js file linked earlier
amqp.publish(routingKey, message, publishOptions, [parentSpan])
.then(() => {
// sent
})
.catch((err) => {
// failed to send
})
// same as publish, but sets correlation-id and reply-to properties
// on the message, allowing consumer to response
// resolves with
amqp.publishAndWait(routingKey, message, publishOptions, [parentSpan])
.then((response) => {
// do whatever you want
})
.catch((err) => {
// either failed to send or response contained an error - work with it here
})
// Other option is to work not with the routing keys, but with queues directly
// for that there are 2 similar methods
// apart from `routingKey` and `queueName` - everything else works the same way
amqp.send(queueName, message, publishOptions, [parentSpan])
amqp.sendAndWait(queueName, message, publishOptions, [parentSpan])
});
If the graceful shutdown of your service is needed, to stop receiving incoming messages but continue processing, call closeAllConsumers()
.
This method closes all consumers but leaves the transport connection active. You can process all incoming messages and securely close connections.
AMQPTransport.connect(options, router).then((amqp) => {
service.on('close', async () => {
await amqp.closeAllConsumers();
// do everything you need
// ..
await amqp.close();
})
});