Skip to content

Latest commit

 

History

History
1940 lines (1309 loc) · 68.6 KB

API.md

File metadata and controls

1940 lines (1309 loc) · 68.6 KB

4.x API Reference

BunnyBus

BunnyBus is a class that instantiates into a singleton. It hosts all features for communicating with RabbitMQ to provide an easy to use enterprise bus facade.

Note About Versioning

A note regarding versioning: BunnyBus attaches the version value found in its package.json file to all messages that are sent. Optionally through the validateVersion flag, any messages that are picked up from a subscribed queue which do not match the major semver will be rejected to the error queue. As an example, message sent from BunnyBus version 1.2.3 will only be accepted from other BunnyBus clients with semver range of 1.x.x.

Constructor

new BunnyBus(config)

Creates a new singleton instance of bunnybus. Accepts a configuration parameter. See config for allowed options.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus({ hostname : 'red-bee.cloudamqp.com' });

//do work;

Getters and Setters

config

Setter and Getter for singleton configuration. Accepts the following optional properties:

  • protocol - value for creating a secure connection. Used in the connection string. Defaults to amqp. [string] Optional
  • username - value of the username. Used in the connection string. Defaults to guest. [string] Optional
  • password - value of the password. Used in the connection string. Defaults to guest. [string] Optional
  • hostname - value of the server address. Just the host portion of the URI. eg red-bee.cloudamqp.com or rabbitbox or 56.23.0.123. Used in the connection string. Defaults to 127.0.0.1. [string] Optional
  • port - value of the port for client connections. Used in the conneciton string. Defaults to 5672. [number] Optional
  • vhost - value of the virtual host the user connects to. Used in the connection string. Defaults to %2f. [string] Optional
  • heartbeat - value negotiated between client and server on when the TCP tunnel is considered dead. Unit is a measurement of milliseconds. Used in the connection string. Defaults to 2000. [number] Optional
  • timeout - value for timing out any network operations. Unit is a measurement of milliseconds. Defaults to 2000. [number] Optional
  • globalExchange - value of the exchange to transact through for message publishing. This is the default used when one is not provided within the options for any BunnyBus methods that supports one transactionally. Defaults to default-exchange. [string] Optional
  • prefetch - value of the maximum number of unacknowledged messages allowable in a channel. Defaults to 5. [number] Optional
  • maxRetryCount - maximum amount of attempts a message can be requeued. This is the default used when one is not provided within the options for any BunnyBus methods that supports one transactionally. Defaults to 10. [number] Optional
  • validatePublisher - flag to dictate if the publishing source for messages being consumed must be bunnyBus. This is a safe guard to prevent unexpected message sources from entering the subscribing realm. A value of bunnyBus is stamped as a header property on the message during publish(). The subscribe() method will use the same value for authentication. Consumers detecting mismatched publishers will auto reject the message into an error queue. Defaults to false. [boolean] Optional
  • validateVersion - flag to dictate if major semver should be matched as part of the message subscription valiation. This is a safe guard to prevent mismatched bunnyBus drivers from pub/sub to each other. Consumers detecting mismatched major values will auto reject the message into an error queue. In order for this layer of validation to occur, validatePublisher must be allowed because the version value is set against the bunnyBus header. Defaults to false. [boolean] Optional
  • normalizeMessages - flag to dictate whether we should coerce incoming messages into our expected shapes. This is primarily used to correct malformed messages from clients other than bunnyBus and should only be used when validatePublisher is set to false. This will alter incoming messages. Defaults to false. [boolean] Optional
  • disableQueueBind - flag to dictate if automatic queue binding should be turned on/off as part of the consume setup process. Defaults to false. [boolean] Optional
  • dispatchType - enumerated value to select dispatch mechanism used. serial will flow messages to your message handler(s) in single file. concurrent will flow messages simultaneously to your message handler(s). Defaults to serial. [string] Optional
  • rejectUnroutedMessages - flag to direct messages that were unroutable to provided handlers to either be automatically rejected or acknowledged off the queue. The default is silent acknowledgements. Defaults to false. [boolean] Optional

Note that updates in the options directed at changing connection string will not take affect immediately. ConnectionManager.close() needs to be called manually to invoke a new connection with new settings.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

//deferred configuration
bunnyBus.config = { hostname : 'red-bee.cloudamqp.com'};

//do work

connections

Getter for connections. A reference to the Connection Manager.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

console.log(bunnyBus.connections.get('defaultConnection'));

// output : { name, connectionOptions, socketOptions, lock, blocked, connection }

channels

Getter for channels. A reference to the Channel Manager.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

console.log(bunnyBus.channels.get('channelForQueue1'));

// output : { name, queue, connectionContext, channelOptions, lock, channel }

subscriptions

Getter for subscriptions. A reference to the Subscription Manager.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

console.log(bunnyBus.subscriptions.get('queue'));

//output : { queue : 'queue1', consumerTag : 'abc123', handlers : {}, options : {}}

logger

Setter and Getter for logger. By default, BunnyBus will instantiate and set a logger using the EventEmitter. When a custom logger is set, BunnyBus will no longer emit log messages through the EventEmitter. The Setter will also validate the contract of the logger to ensure the following keys exist [debug, info, warn, error, fatal] and are of type Function. When validation fails, an error will be thrown.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const logHandler = (message) => {

    //forward the message to some where such as
    //process.stdout or console.log or syslog.
};

//custom logger
bunnyBus.logger = {
    info  : logHandler,
    debug : logHandler,
    warn  : logHandler,
    error : logHandler,
    fatal : logHandler
};

connectionString

Getter for AMQP connection string.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

console.log(bunnyBus.connectionString);
//output : amqp://guest:[email protected]:5672/%2f?heartbeat=2000

Methods

async createExchange(name, type, [options])

Creates an exchange.

parameter(s)
  • name - name of the exchange to be created. [string] Required
  • type - type of exchange to create. Possible values are (direct, fanout, header, topic) [string] Required
  • options - optional settings. Settings are proxied through to amqplib assertExchange. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.createExchange('default-exchange', 'topic');

async deleteExchange(name, [options])

Delete an exchange.

parameter(s)
  • name - name of the exchange to be deleted. [string] Required
  • options - optional settings. Settings are proxed through to amqplib deleteExchange. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.deleteExchange('default-exchange');

async checkExchange(name)

Checks if an exchange exists. The channel closes when the exchange does not exist.

parameter(s)
  • name - name of the exchange to be checked. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.checkExchange('default-exchange');

async createQueue(name, [options])

Creates a queue.

parameter(s)
  • name - name of the queue to be created. [string] Required
  • options - optional settings. Settings are proxied through to amqplib assertQueue. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.createQueue('queue1');

async deleteQueue(name, [options])

Delete a queue.

parameter(s)
  • name - name of the queue to be created. [string] Required
  • options - optional settings. Settings are proxied through to amqplib deleteQueue. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.deleteQueue('queue1');

async checkQueue(name)

Checks if a queue exists. The channel closes when the queue does not exist.

parameter(s)
  • name - name of the queue to be checked. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.checkQueue('queue1');

async purgeQueue(name)

Purges a queue. Will not throw error in cases where queue does not exist.

parameter(s)
  • name - name of the queue to be purged. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.purgeQueue('queue1');

async publish(message, [options])

Publish a message onto the bus.

parameter(s)
  • message - the content being sent to downstream subscribers. [string|Object|Buffer] Required
  • event - override value for the route key. The value must be supplied here or in options.routeKey. The value can be . separated for namespacing. [string] Optional.
  • options - optional settings. [Object] Optional
    • routeKey - value for the route key to route the message with. The value must be supplied here or in message.event. The value can be . separated for namespacing. [string] Optional
    • transactionId - value attached to the header of the message for tracing. When one is not supplied, a random 40 character token is generated. [string] Optional
    • source - value attached to the header of the message to help with track the origin of messages in your application. For applications that leverage this plugin in multiple modules, each module can supply its own module name so a message can be tracked to the creator. [string] Optional
    • globalExchange - value to override the exchange specified in config. [string] Optional
    • In addition to the above options, all of amqplib's configuration options (except for headers and immediate) from its sendToQueue and publish methods can also be passed as top-level properties in the publish options.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const message = {
    event : 'some.routeKey'
    // other stuff you want to send
}

await bunnyBus.publish(message);

async subscribe(queue, handlers, [options])

Subscribe to messages from a given queue.

parameter(s)
  • queue - the name of the queue to subscribe messages to. A queue with the provided name will be created if one does not exist. [string] Required
  • handlers - a key / handler hash where the key reflects the name of the message.event or routeKey. And the handler reflects a AsyncFunction as async (message, [meta, [ack, [reject, [requeue]]]]) => {}. [Object] Required
  • options - optional settings. [Object] Optional
    • queue - settings for the queue. Settings are proxied through to amqplib assertQueue. [Object] Optional
    • globalExchange - value of the exchange to transact through for message publishing. Defaults to one provided in the config. [string] Optional
    • maxRetryCount - maximum amount of attempts a message can be requeued. Defaults to one provided in the config. [number] Optional
    • validatePublisher - flag for validating messages having bunnyBus header. More info can be found in config. Defaults to one provided in the config. [boolean] Optional
    • validateVersion - flag for validating messages generated from the same major version. More info can be found in config. Defaults to one provided in the config. [boolean] Optional
    • disableQueueBind - flag for disabling automatic queue binding. More info can be found in config. Defaults to one provided in the config. [boolean] Optional
    • rejectUnroutedMessages - flag for enabling rejection for unroutable messages. More info can be found in config. Defaults to one provided in the config. [boolean]
    • meta - allows for meta data regarding the payload to be returned. Headers like the createdAt ISO string timestamp and the transactionId are included in the meta.headers object. Turning this on will adjust the handler to be an AsyncFunction as async (message, meta, [ack, [reject, [requeue]]]) => {}. [boolean] Optional
handlers
key

A key is the routeKey in RabbitMQ terminology. BunnyBus specifically leverages topic exchange to route a message from the exchange to any number of queues that are subscribed. The keys are normally dot notated and wild cards of * (can substitute for exactly one word) and # (can substitute for zero or more words). Keys can look like vineyard.wine-produced, vineyard.wine-sold, vineyard.*, vineyard.# and etc... A bug was found during this implementation regarding expected behavior of wildcard syntax here

handler

A handler is an asynchronous function which contains the following arity. Order matters.

  • message is what was received from the bus. The message does represent the RabbitMQ 'payload.content buffer. The original source of this object is from payload.content.
  • meta is only available when options.meta is set to true. This object will contain all payload related meta information like payload.properties.headers. Headers like the createdAt ISO string timestamp and the transactionId are included in the meta.headers object.
  • async ack([option) is an async function for acknowledging the message off the bus.
    • option - a placeholder for future optional parameters for ack. High chance of deprecation.
  • async reject([option) is an async function for rejecting the message off the bus to a predefined error queue. The error queue is named by default <your queue name>_error. It will also short circuit to error_bus when defaults can't be found.
    • option - An object with a property of reason to be supplied. [Object] Optional
  • async requeue() is an async function for requeuing the message back to the back of the queue. This is feature circumvents Rabbit's nack RPC. nack natively requeues but pushes the message to the front of the queue.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const handlers = {
    route.event1 : async (message, ack, reject, requeue) => {
        await ack();
    },
    route.event2 : async (message, ack, reject, requeue) => {
        if (//something not ready) {
            await requeue();
        } else {
            await ack();
        }
    }
}

await bunnyBus.subscribe('queue', handlers);

async unsubscribe(queue)

Unsubscribe active handlers that are listening to a queue.

parameter(s)
  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnyBus.unsubscribe('queue1');

await send(message, queue, [options])

Send a message directly to a specified queue.

note(s)

When message.event or options.routeKey values are not provided for routeKey addressing. The message will be lost when subcribe() handles the queue because messages without a routeKey are discarded.

parameter(s)
  • message - the content being sent directly to specfied queue. [string|Object|Buffer] Required
  • event - override value for the route key. The value must be supplied here or in options.routeKey. The value can be . separated for namespacing. [string] Optional.
  • queue - the name of the queue. [string] Required
  • options - optional settings. [Object] Optional
    • routeKey - value for the route key to route the message with. The value must be supplied here or in message.event. The value can be . separated for namespacing. [string] Optional
    • transactionId - value attached to the header of the message for tracing. When one is not supplied, a random 40 character token is generated. [string] Optional
    • source - value attached to the header of the message to help with tracking the origination point of your application. For applications that leverage this plugin in multiple modules, each module can supply its own module name so a message can be tracked to the creator. [string] Optional
    • In addition to the above options, all of amqplib's configuration options (except for headers and immediate) from its sendToQueue and publish methods can also be passed as top-level properties in the send options.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const message = {
    // other stuff you want to send
}

await bunnyBus.send(message, 'queue1');

async get(queue, [options])

Pop a message directly off a queue. The payload returned is the RabbitMQ payload with payload.properties and payload.content in its original form.

parameter(s)
  • queue - the name of the queue. [string] Required
  • options - optional settings. Settings are proxied through to amqplib get. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const payload = await bunnyBus.get('queue1');

async getAll(queue, handler, [options])

Pop all messages directly off of a queue until there are no more. Handler is called for each message that is popped.

parameter(s)
  • queue - the name of the queue. [string] Required
  • handler - a handler reflects an AsyncFunction as async (message, [meta, [ack]]) => {}. [AsyncFunction] Required
  • options - optional settings. [Object] Optional
    • get - Settings are proxied through to amqplib get. [Object] Optional
    • meta - allows for meta data regarding the payload to be returned. Headers like the createdAt ISO string timestamp and the transactionId are included in the meta.headers object. Turning this on will adjust the handler to be an AsyncFunction as async (message, meta, [ack]) => {}. [boolean] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const handler = async (message, ack) => {
    await ack();
}

await bunnyBus.getAll('queue1', handler);

Internal-use Methods

The following methods are available in the public API, but manual use of them is highly discouraged.

async _autoBuildChannelContext(channelName, [queue = null])

Method handles the coordination for creating a connection and channel via the ConnectionManager and ChannelManager. This is also responsible for subscribing to error and close events available from the Connection and Channel context classes which proxy events from the corresponding underlying amqplib objects.

parameter(s)
  • channelName - name of the channel to be created or fetched. [string] Required
  • queue - name of queue that will be using the channel. Defaults to null. [string] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const channelContext = await bunnyBus._autoBuildChannelContext(channelForQueue1);
// output : { name, queue, connectionContext, channelOptions, lock, channel }

async _recoverConnection()

Auto retry mechanism to restore all channels and connection that were closed which should still remain active. This method primarily loops against all active subscribed queues to try and recover them. When failures happens, an invocation to process.exit(1) will be done. This is invoked internally through event handlers listening to connection and channel events.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

// something bad happened

await bunnyBus._recoverConnection();

async _recoverChannel(channelName)

Auto retry mechanism to restore a specific channel and attached connection that was closed. When failure happens, an invocation to process.exit(1) will be done. This is invoked internally through event handlers listening to channel events. This will not revive subscribed queues that are in block state registered through the SubscriptionManager

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

// something bad happened

await bunnyBus._recoverChannel(channelName);

async _ack(payload, channelName, [options])

The acknowledge method for removing a message off the queue. Mainly used in handlers through bind() parameter injection for methods like getAll() and subscribe.

parameter(s)
  • payload - raw payload from an AMQP result message response. [Object] Required
  • channelName - the originating channel the payload came from. [string] Required
  • options - not currently used.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const payload = await bunnyBus.getAll('queue1');
await bunnyBus.ack(payload, 'channelForQueue1');

async _requeue(payload, channelName, queue, [options])

Requeues message to any queue while it acknowledges the payload off of the original. This method does not push the message back to the original queue in the front position. It will put the message to any desired queue in the back position. Mainly used in handlers through bind() parameter injection for methods like getAll() and subscribe().

parameter(s)
  • payload - raw payload from an AMQP result message response. [Object] Required
  • channelName - the originating channel the payload came from. [string] Required
  • queue - the destination queue to push to. [string] Required
  • options - can supply AMQP specific values which is just proxied to sentToQueue [Object] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const payload = await bunnyBus.getAll('queue1');
await bunnyBus.requeue(payload, 'channelForQueue1', 'queue1');

async _reject(payload, channelName, [errorQueue, [options]])

Rejects a message by acknowledging off the originating queue and sending to an error queue of choice. Mainly used in handlers through bind() parameter injection for methods like getAll() and subscribe().

parameter(s)
  • payload - raw payload from an AMQP result message response. [Object] Required
  • channelName - the originating channel the payload came from. [string] Required
  • errorQueue - the destination error queue to push to. Defaults to a queue defined in config [string] Optional
  • options - can supply AMQP specific values which is just proxied to sentToQueue for the destination error queue. A property of reason can be supplied which will be caught and added to the message header. The property of reason is used uniformally within all rejection paths in the BunnyBus code base. [Object] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const payload = await bunnyBus.getAll('queue1');
await bunnyBus.reject(payload, 'channelForQueue1', 'queue1_error', { reason: 'some unforeseen failure' });

Events

BunnyBus extends EventEmitter for emitting logs and system specific events. Subscription to these events is optional. BunnyBus class also exposes static Getter properties for the name of these public events.

BunnyBus.LOG_DEBUG_EVENT

event key

  • log.debug - debug level logging message.

handler parameter(s)

  • ...message - log parameter of n-arity sent. [any]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.LOG_DEBUG_EVENT', console.log);

BunnyBus.LOG_INFO_EVENT

event key

  • log.info - info level logging message.

handler parameter(s)

  • ...message - log parameter of n-arity sent. [any]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.LOG_INFO_EVENT', console.log);

BunnyBus.LOG_WARN_EVENT

event key

  • log.warn - warn level logging message.

handler parameter(s)

  • ...message - log parameter of n-arity sent. [any]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.LOG_WARN_EVENT', console.log);

BunnyBus.LOG_ERROR_EVENT

event key

  • log.error - error level logging message.

handler parameter(s)

  • ...message - log parameter of n-arity sent. [any]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.LOG_ERROR_EVENT', console.log);

BunnyBus.LOG_FATAL_EVENT

event key

  • log.fatal - fatal level logging message.

handler parameter(s)

  • ...message - log parameter of n-arity sent. [any]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.LOG_FATAL_EVENT', console.log);

BunnyBus.PUBLISHED_EVENT

event key

  • bunnybus.published - emitted when publish() is successfully called.

handler parameter(s)

  • publishOptions - option sent along with the message header/fields [Object]
  • message - original payload published. [string|Object|Buffer]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.PUBLISHED_EVENT', (options, message) => {

    //do work
});

BunnyBus.MESSAGE_DISPATCHED_EVENT

event key

  • bunnybus.message-dispatched - emitted when subscribing handlers for a queue is about to be called.

handler parameter(s)

  • metaData - option sent along with the message header/fields [Object]
  • message - the parsed version of the content property from the original payload. [string|Object|Buffer]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.MESSAGE_DISPATCHED_EVENT', (metaData, message) => {

    //do work
});

BunnyBus.MESSAGE_ACKED_EVENT

event key

  • bunnybus.message-acked - emitted when _ack() is called succesfully.

handler parameter(s)

  • metaData - option sent along with the message header/fields [Object]
  • message - the parsed version of the content property from the original payload. [string|Object|Buffer]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.MESSAGE_ACKED_EVENT', (metaData, message) => {

    //do work
});

BunnyBus.MESSAGE_REQUEUED_EVENT

event key

  • bunnybus.message-requeued - emitted when _requeue() is called succesfully.

handler parameter(s)

  • metaData - option sent along with the message header/fields [Object]
  • message - the parsed version of the content property from the original payload. [string|Object|Buffer]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.MESSAGE_REQUEUED_EVENT', (metaData, message) => {

    //do work
});

BunnyBus.MESSAGE_REJECTED_EVENT

event key

  • bunnybus.message-rejected - emitted when _reject() is called succesfully.

handler parameter(s)

  • metaData - option sent along with the message header/fields [Object]
  • message - the parsed version of the content property from the original payload. [string|Object|Buffer]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.MESSAGE_REJECTED_EVENT', (metaData, message) => {

    //do work
});

BunnyBus.SUBSCRIBED_EVENT

event key

  • bunnybus.subscribed - emitted when subcribe() is successfully called.

handler parameter(s)

  • queue - name of queue subcribed for. [string]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.SUBSCRIBED_EVENT', (queue) => {

    console.log(queue);
    // output : 'queue1'
});

BunnyBus.UNSUBSCRIBED_EVENT

event key

  • bunnybus.unsubscribed - emitted when unsubcribe() is successfully called.

handler parameter(s)

  • queue - name of queue unsubscribed from. [string]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.UNSUBSCRIBED_EVENT', (queue) => {

    console.log(queue);
    // output : 'queue1'
});

BunnyBus.RECOVERING_CONNECTION_EVENT

event key

handler parameter(s)

  • connectionName - name of the connection involved with the recovery event.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.RECOVERING_CONNECTION_EVENT', (connectionName) => {

    // do work to handle the case when a connection or channel is having a failure
});

BunnyBus.RECOVERED_CONNECTION_EVENT

event key

  • bunnybus.recovered-connection - emitted when the corresponding recovering connection event is restored

handler parameter(s)

  • connectionName - name of the connection involved with the recovery event.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.RECOVERED_CONNECTION_EVENT', (connectionName) => {

    // do work to handle the case when a connection or channel is having a failure
});

BunnyBus.RECOVERING_CHANNEL_EVENT

event key

handler parameter(s)

  • channelName - name of the channel involved with the recovery event.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.RECOVERING_CHANNEL_EVENT', (channelName) => {

    // do work to handle the case when a connection or channel is having a failure
});

BunnyBus.RECOVERED_CHANNEL_EVENT

event key

  • bunnybus.recovered-channel - emitted when the corresponding recovering channel event is restored

handler parameter(s)

  • channelName - name of the channel involved with the recovery event.
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.RECOVERED_CHANNEL_EVENT', (channelName) => {

    // do work to handle the case when a connection or channel is having a failure
});

BunnyBus.RECOVERY_FAILED_EVENT

event key

  • bunnybus.recovery-failed - emitted when recovery efforts leads to a failed state.

handler parameter(s)

  • err - a error object of type Error [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on('BunnyBus.RECOVERY_FAILED_EVENT', (err) => {

    // do work to handle the case when a connection or channel is having a failure
});

Connection

This class contains the actual amqplib connection objet along with contextual properties like name and options that were used to create the connection. The Connection is also an EventEmitter to support event proxying from the underlying amqplib connection object.

Getters and Setters

name

Getter for connection name. Value used for futher operation and identification of a connection.

connectionOptions

Getter for connection options supplied to amqplib.connect() interface. See config for allowed options. Only relevant subset is used.

socketOptions

Getter for socket / tls options supplied to amqplib.connect() interface that is then proxied to the underling net and tls libraries.

lock

Setter and Getter for mutual-exclusion lock for the instantiated object. Used to ensure operations for connection creation is done single file sequentially.

blocked

Setter and Getter for connection block signaling from RabbitMQ for cases of server resource starvation.

connection

Setter and Getter for the amqplib connection object.

Events

ConnectionManager.AMQP_CONNECTION_ERROR_EVENT

key value
  • amqp.connection.error - emitted when the amqplib connection errors.
handler parmaeters
  • err - a error object of type Error [Object]
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ConnectionManager.AMQP_CONNECTION_ERROR_EVENT, (err, context) => {

    console.error(err);
    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT

key value
  • amqp.connection.close - emitted when the amqplib connection closes.
handler parmaeters
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT, (context) => {

    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

ConnectionManager.AMQP_CONNECTION_BLOCKED_EVENT

key value
  • amqp.connection.blocked - emitted when the amqplib connection is blocked to signal no more send/publish operations should be invoked.
handler parmaeters
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ConnectionManager.AMQP_CONNECTION_BLOCKED_EVENT, (context) => {

    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

ConnectionManager.AMQP_CONNECTION_UNBLOCKED_EVENT

key value
  • amqp.connection.unblocked - emitted when the amqplib connection is unblocked.
handler parmaeters
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ConnectionManager.AMQP_CONNECTION_UNBLOCKED_EVENT, (context) => {

    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

ConnectionManager.CONNECTION_REMOVED

key value
  • connectionManager.removed - emitted when the connection context is removed by close().
handler parmaeters
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ConnectionManager.CONNECTION_REMOVED, (context) => {

    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

ConnectionManager

This class manages the collection of all connections created within BunnyBus. The ConnectionManager is also an EventEmitter so when actions like remove is called, events are emitted.

Methods

async create(name, connectionOptions, [socketOptions])

Creates an amqplib connection.

parameter(s)
  • name - name of the connection. [string] Required
  • connectionOptions - options used to create the amqplib connection. See config for allowed options. Only relevant subset is used. [Object] Required
  • socketOptions - options used to configure the underlying socket/tls behavior. Refer to net / `tls' modules for configuration values. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const connectionContext = await bunnybus.connections.create('defaultConnection', { hostname, username, password });

contains(name)

Checks if a connection context exist with the specified name.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const exist = bunnybus.connections.contains('defaultConnection');
// exist : boolean

get(name)

Retrieves a connection context with the specified name.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const connectionContext = bunnybus.connections.get('defaultConnection');

list()

Returns all connections registered that are in any state of operability.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const connectionContexts = bunnybus.connections.list();

hasConnection(name)

Checks if an amqplib connection exist with the specified name.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const exist = bunnybus.connections.hasConnection('defaultConnection');
// exist : boolean

getConnection(name)

Retrieves an amqplib connection with the specified name.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const connection = bunnybus.connections.getConnection('defaultConnection');
// connection : amqplib connecton object

async remove(name)

Removes the connection context with the specified name from the ConnectionManager. Closes the underlying connection.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnybus.connections.remove('defaultConnection');

async close(name)

Closes the amqplib connection the specified name.

parameter(s)
  • name - name of the connection. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnybus.connections.close('defaultConnection');

Events

ConnectionManager.CONNECTION_REMOVED

key value
  • connectionManager.removed - emitted when the connection context is removed by close().
handler parmaeters
  • connectionContext - connection context that is the Connecton class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.on(ConnectionManager.CONNECTION_REMOVED, (context) => {

    console.log(context);
    // output : { name, connectionOptions, socketOptions, lock, blocked, connection }
});

Channel

This class contains the actual amqplib channel object along with contextual properties like name, connection context and otions that were used to create the channel. The Channel is also an EventEmitter to support event proxying from the underlying amqplib channel object.

Getters and Setters

name

Getter for channel name. Value used for futher operation and identification of a channel.

connectionContext

Getter for connection context. The connection context contains the amqplib connection object that is used to create a channel from.

channelOptions

Getter for channel options supplied to amqplib.createConfirmChannel() interface. See config for allowed options. Only relevant subset is used like prefetch

lock

Setter and Getter for mutual-exclusion lock for the instantiated object. Used to ensure operations for channel creation is done single file sequentially.

channel

Setter and Getter for the amqplib channel object.

Events

ChannelManager.AMQP_CHANNEL_ERROR_EVENT

key value
  • amqp.channel.error - emitted when the amqplib channel errors.
handler parameters
  • err - a error object of type Error [Object]
  • channelContext - channel context that is the Channel class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.channels.get('channelName').on(ChannelManager.AMQP_CHANNEL_ERROR_EVENT, (err, context) => {

    console.error(err);
    console.log(context);
    // output : { name, queue, connectionContext, channelOptions, lock, channel }
});

ChannelManager.AMQP_CHANNEL_CLOSE_EVENT

key value
  • amqp.channel.close - emitted when the amqplib channel closes.
handler parmaeters
  • channelContext - channel context that is the Channel class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.channels.get('channelName').on(ChannelManager.AMQP_CHANNEL_CLOSE_EVENT, (context) => {

    console.log(context);
    // output : { name, queue, connectionContext, channelOptions, lock, channel }
});

ChannelManager.AMQP_CHANNEL_RETURN_EVENT

key value
  • amqp.channel.return - emitted when the amqplib channel returns a message that had no route recipient when published to an exchange.
handler parmaeters
  • payload - The message that was sent with no matching route recipient. [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.channels.get('channelName').on(ChannelManager.AMQP_CHANNEL_RETURN_EVENT, (payload) => {

    console.log(payload);
    // output : { properties, content }
});

ChannelManager.AMQP_CHANNEL_DRAIN_EVENT

key value
  • amqp.channel.drain - emitted when the amqplib channel signals resource capacity recovery allowing for more messages to be sent.
handler parmaeters
  • channelContext - channel context that is the Channel class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.channels.get('channelName').on(ChannelManager.AMQP_CHANNEL_DRAIN_EVENT, (context) => {

    console.log(context);
    // output : { name, queue, connectionContext, channelOptions, lock, channel }
});

ChannelManager.CHANNEL_REMOVED

key value
  • channelManager.removed - emitted when the channel context is removed by close().
handler parmaeters
  • channelContext - channel context that is the Channel class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.connections.get('connectionName').on(ChannelManager.CHANNEL_REMOVED, (context) => {

    console.log(context);
    // output : { name, queue, connectionContext, channelOptions, lock, channel }
});

ChannelManager

This class manages the collection of all connections created within BunnyBus. The ConnectionManager is also an EventEmitter so when actions like remove is called, events are emitted.

Methods

async create(name, [queue = null], connectionContext, channelOptions)

Creates an amqplib channel.

parameter(s)
  • name - name of the channel. [string] Required
  • queue - name of the queue the channel is supporting. Used primarily as a label to use for filtering and identification. Defaults to null. [string] Optional
  • connectionContext - the connection context to use for instantiation of a channel from. [Object] Required
  • channelOptions - options used to create the amqplib connection. See config for allowed options. Only relevant subset is used like prefetch [Object] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const channelContext = await bunnybus.channels.create('channelForQueue1', 'queue1', connectionContext, { prefetch });

contains(name)

Checks if a channel context exist with the specified name.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const exist = bunnybus.channels.contains('channelForQueue1');
// exist : boolean

get(name)

Retrieves a channel context with the specified name.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const channelContext = bunnybus.channels.get('channelForQueue1');

list()

Returns all channels registered that are in any state of operability.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const channelContexts = bunnybus.channels.list();

hasChannel(name)

Checks if an amqplib channel exist with the specified name.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const exist = bunnybus.channels.hasChannel('channelForQueue1');
// exist : boolean

getChannel(name)

Retrieves an amqplib channel with the specified name.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

const channel = bunnybus.channels.getConnection('channelForQueue1');
// connection : amqplib channel object

async remove(name)

Removes the channel context with the specified name from the ChannelManager. Closes the underlying channel.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnybus.channels.remove('channelForQueue1');

async close(name)

Closes the amqplib channel the specified name.

parameter(s)
  • name - name of the channel. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

await bunnybus.connections.close('channelForQueue1');

Events

ChannelManager.CHANNEL_REMOVED

key value
  • channelManager.removed - emitted when the channel context is removed by remove().
handler parmaeters
  • channelContext - channel context that is the Channel class [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.channels.on(ChannelManager.AMQP_CHANNEL_DRAIN_EVENT, (context) => {

    console.log(context);
    // output : { name, queue, connectionContext, channelOptions, lock, channel }
});

SubscriptionManager

This class manages the state for all subscriptions registered with queues. A subscription is an association between a queue and handlers associated with it. A subscription is created when subscribe() is invoked succesfully. The SubscriptionManager is also an EventEmitter so when actions like create, clear and remove are called, events are emitted so BunnyBus can apply the corresponding behavior to meet the desired state.

Methods

contains(queue, [withConsumerTag])

Checks if a queue has a subscription.

  • queue - the name of the queue. [string] Required
  • withConsumerTag - requires the condition of a subscription to be active. Defaults to true. [boolean] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.contains('queue1');

create(queue, handlers, [options])

Creates a subscription.

  • queue - the name of the queue. [string] Required
  • handlers - handlers parameter passed through the subscribe() method. [Object] Required
  • options - options parameter passed through the subscribe() method. [Object] Optional
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.create('queue1');
}

tag(queue, consumerTag)

Tag a subscription.

  • queue - the name of the queue. [string] Required
  • consumerTag - a value returned from the consume() method of amqplib. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.tag('queue1', 'abcd1234');
}

get(queue)

Returns a clone of the subscription if the queue exists. Returns undefined when it does not exist.

  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.get('queue1');
}

clear(queue)

Clears a subscription of the consumerTag. Returns true when successful and false when not.

  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.clear('queue1');
}

clearAll()

Clears all subscriptions of the consumerTag.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.clearAll();
}

remove(queue)

Removes a subscription from the registrar. Returns true when successful and false when not.

  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.remove('queue1');
}

list()

Returns a list of cloned subscriptions in the registrar.

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.list();

//output : [ subscriptions ]
}

block(queue)

Adds a queue to the ban list. Queues in this list live in the desired state. Once a queue name is added to this list, BunnyBus will try to unsubscribe any active consumed queues.

  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.block('queue1');
}

unblock(queue)

Removes a queue from the ban list. Queues in this list live in the desired state. Once a queue name is removed from this list, BunnyBus will try to re-subscribe any unactive queues.

  • queue - the name of the queue. [string] Required
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnybus.subscriptions.unblock('queue1');
}

Events

SubscriptionManager.CREATED_EVENT

key value
  • subscription.created - emitted when create() is succesfully called.
handler parmaeters
  • subscription - subscription context that was created [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.CREATED_EVENT, (context) => {

    console.log(context);
    // output : { queue, handlers, options, consumerTag }
});

SubscriptionManager.TAGGED_EVENT

key value
  • subscription.tagged - emitted when tag() is succesfully called.
handler parmaeters
  • subscription - subscription context that was tagged [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.TAGGED_EVENT, (context) => {

    console.log(context);
    // output : { queue, handlers, options, consumerTag }
});

SubscriptionManager.CLEARED_EVENT

key value
  • subscription.cleared - emitted when clear() is succesfully called.
handler parmaeters
  • subscription - subscription context that had its consumer tag removed [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.CLEARED_EVENT, (context) => {

    console.log(context);
    // output : { queue, handlers, options, consumerTag }
});

SubscriptionManager.REMOVED_EVENT

key value
  • subscription.removed - emitted when remove() is succesfully called.
handler parmaeters
  • subscription - subscription context that was removed [Object]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.REMOVED_EVENT, (context) => {

    console.log(context);
    // output : { queue, handlers, options, consumerTag }
});

SubscriptionManager.BLOCKED_EVENT

key value
  • subscription.blocked - emitted when block() is succesfully called.
handler parmaeters
  • queue - queue that was added to the block list [string]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.BLOCKED_EVENT, (queue) => {

    console.log(queue);
    // output : 'queue1'
});

SubscriptionManager.UNBLOCKED_EVENT

key value
  • subscription.unblocked - emitted when unblock() is succesfully called.
handler parmaeters
  • queue - queue that was removed from the block list [string]
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.subscriptions.on(SubscriptionMananger.UNBLOCKED_EVENT, (queue) => {

    console.log(queue);
    // output : 'queue1'
});

Error Types

All BunnyBus errors are extended from the native Error class.

  • IncompatibleLoggerError - thrown when the logger interface contract is not met when instance.logger is set.
  • NoConnectionError - thrown when no connection exist
  • NoChannelError - thrown when no channel exist
  • NoHeaderFieldError - thrown when an incoming message lacks a header field
  • NoRouteKeyError - thrown when no route key can be found. Lookup is done against payload.properties.headers.routeKey, options.routeKey, message.event and payload.fields.routingKey in that order.
  • SubscriptionExistError - thrown when subscribe() is called and handlers have already been registered against the queue
  • SubscriptionBlockedError - thrown when subscribe() is called and the queue is in a desired state of blocked. The handlers would still have registered, but it would take an unblock() call to allow for the handlers to continue its subscriptions.