nestx-amqp

Provides an AMQP connection as a NestJS module. Uses amqp-connection-manager internally.


Features

  • Provides an AmqpModule that creates an AmqpConnectionManager asynchronously
  • Provides a globally injectable AMQP connection manager
  • Provides method decorators such as @PublishQueue and @SubscribeQueue for simple usage

Installation

yarn add nestx-amqp

Examples

Register Module Async

import { Module } from '@nestjs/common'
import { AmqpModule } from 'nestx-amqp'

@Module({
  imports: [
    AmqpModule.forRootAsync({
      useFactory: () => ({
        urls: ['amqp://guest:guest@localhost:5672?heartbeat=60'],
      }),
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
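
In practice the connection URL usually comes from configuration rather than a literal. The sketch below shows one way to build the object returned by useFactory. The helper name `buildAmqpOptions` and the `AMQP_*` variable names are hypothetical and not part of nestx-amqp; only the `urls` shape and the `heartbeat` query parameter are taken from the example above.

```typescript
// Hypothetical helper, not part of nestx-amqp.
type Env = Record<string, string | undefined>;

interface AmqpUrlOptions {
  urls: string[];
}

// Builds the options object from an environment-like map,
// falling back to the defaults used in the example above.
function buildAmqpOptions(env: Env): AmqpUrlOptions {
  const user = encodeURIComponent(env.AMQP_USER ?? 'guest');
  const pass = encodeURIComponent(env.AMQP_PASS ?? 'guest');
  const host = env.AMQP_HOST ?? 'localhost';
  const port = env.AMQP_PORT ?? '5672';
  const heartbeat = env.AMQP_HEARTBEAT ?? '60';
  return {
    urls: [`amqp://${user}:${pass}@${host}:${port}?heartbeat=${heartbeat}`],
  };
}
```

It could then be wired in as `useFactory: () => buildAmqpOptions(process.env)`.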

Inject AmqpConnectionManager

Use the AMQP_CONNECTION symbol as the injection token.

Below is a sample abstract producer.

import { Inject, OnModuleInit } from '@nestjs/common'
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
import { Options } from 'amqplib'
import { AMQP_CONNECTION } from 'nestx-amqp'

export abstract class SimpleAbstractProducer implements OnModuleInit {
  channel: ChannelWrapper;

  abstract getQueue(): string;
  abstract getQueueOptions(): Options.AssertQueue;

  constructor(
    @Inject(AMQP_CONNECTION)
    readonly connectionManager: AmqpConnectionManager
  ) {}

  async onModuleInit() {
    this.channel = this.connectionManager.createChannel({
      json: true,
      // assert the queue each time the underlying channel is (re)created
      setup: (channel) => channel.assertQueue(this.getQueue(), this.getQueueOptions()),
    })
    await this.channel.waitForConnect();
  }

  async send(message: unknown, options?: Options.Publish) {
    await this.channel.sendToQueue(this.getQueue(), message, options);
  }
}

Advanced Usage with Decorators

Currently, only direct queue publish and subscribe are supported.

Interface Queue

export interface Queue {
  name: string;
  options?: Options.AssertQueue;
}

export interface RetryOptions {
  maxAttempts: number;
}

export interface BaseConsumeOptions {
  prefetch: number;
  exceptionQueue?: string;
}

export type PublishQueueOptions = Options.Publish;
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;
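
The queue decorators accept either a queue name or a Queue object. The sketch below shows one way such an argument could be normalized. `normalizeQueue` is a hypothetical helper for illustration, not a documented nestx-amqp export, and `AssertQueueOptions` is a local stand-in for a subset of amqplib's Options.AssertQueue so that the snippet is self-contained.

```typescript
// Local stand-in for a subset of amqplib's Options.AssertQueue.
interface AssertQueueOptions {
  durable?: boolean;
  exclusive?: boolean;
  autoDelete?: boolean;
}

interface Queue {
  name: string;
  options?: AssertQueueOptions;
}

// Hypothetical helper: turns a bare queue name into a Queue object
// and passes a Queue object through unchanged.
function normalizeQueue(nameOrQueue: string | Queue): Queue {
  return typeof nameOrQueue === 'string' ? { name: nameOrQueue } : nameOrQueue;
}

const byName = normalizeQueue('TEST.QUEUE');
const byObject = normalizeQueue({ name: 'TEST.QUEUE', options: { durable: true } });
```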

@PublishQueue()

Provides a MethodDecorator for easily publishing messages to a queue.

Options:

@PublishQueue(queue: string | Queue, options?: amqplib.Options.Publish)
yourPublishQueueMethod(content:any, options?: amqplib.Options.Publish){}

Example:

(You must register and enable AmqpModule)

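import { Injectable } from '@nestjs/common'
import { PublishQueue } from 'nestx-amqp'

// declared at module scope so the decorator expression can reference it
const queue = 'TEST.QUEUE';

@Injectable()
class TestMessageService {
  @PublishQueue(queue)
  async testPublishQueue(content) {
    console.log(`call test publish queue with ${JSON.stringify(content)}`);
  }
}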
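
To make the idea of a publishing method decorator more concrete, here is a minimal sketch of how such a decorator could wrap a method. It is not the nestx-amqp implementation: `publishQueueSketch`, the `Sender` type and the in-memory `sentToQueue` array are all hypothetical, and the order (run the method, then send its first argument) is an assumption. The decorator is applied by hand rather than with `@` syntax so the snippet does not depend on compiler decorator settings.

```typescript
// Hypothetical stand-in for "send this content to that queue".
type Sender = (queue: string, content: unknown) => void;

// Sketch of a method decorator factory: wraps the method so that,
// after it runs, its first argument is passed to `send`.
function publishQueueSketch(queue: string, send: Sender) {
  return (_target: object, _key: string, descriptor: PropertyDescriptor): PropertyDescriptor => {
    const original = descriptor.value as (content: unknown) => unknown;
    descriptor.value = function (this: unknown, content: unknown) {
      const result = original.call(this, content);
      send(queue, content);
      return result;
    };
    return descriptor;
  };
}

class DemoService {
  testPublishQueue(content: unknown): string {
    return `called with ${JSON.stringify(content)}`;
  }
}

// In-memory record of what would have been published.
const sentToQueue: Array<{ queue: string; content: unknown }> = [];

// Apply the decorator manually to DemoService.prototype.testPublishQueue.
const methodDescriptor = Object.getOwnPropertyDescriptor(DemoService.prototype, 'testPublishQueue')!;
Object.defineProperty(
  DemoService.prototype,
  'testPublishQueue',
  publishQueueSketch('TEST.QUEUE', (queue, content) => {
    sentToQueue.push({ queue, content });
  })(DemoService.prototype, 'testPublishQueue', methodDescriptor),
);

const demoResult = new DemoService().testPublishQueue({ id: 1 });
```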

@SubscribeQueue()

Provides a MethodDecorator for easily consuming messages, with support for simple requeue logic.

Options:

@SubscribeQueue(nameOrQueue: string | Queue, options?: ConsumeQueueOptions)
yourSubscribeQueueMethod(content){}

ConsumeQueueOptions:

export interface RetryOptions {
  maxAttempts: number;
}

export interface BaseConsumeOptions {
  prefetch: number;
  exceptionQueue?: string;
}

export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;

Example:

You must register and enable AmqpModule

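import { Injectable } from '@nestjs/common'
import { SubscribeQueue } from 'nestx-amqp'

// declared at module scope so the decorator expression can reference it
const queue = 'TEST.QUEUE';

@Injectable()
class TestMessageService {
  @SubscribeQueue(queue)
  async testSubscribeQueue(content) {
    // do your business handling code
    // save db? send email?
    console.log(`handling content ${JSON.stringify(content)}`);
  }
}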
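
This README does not spell out how maxAttempts and exceptionQueue interact. One plausible reading is that a failing handler is retried up to maxAttempts times, after which the message is routed to exceptionQueue if one is configured. The sketch below simulates that reading in memory. `consumeWithRetry` and the `Outcome` type are hypothetical, the handler is synchronous for brevity, and none of this is the library's actual code.

```typescript
interface RetrySketchOptions {
  maxAttempts: number;
  exceptionQueue?: string;
}

type Outcome =
  | { status: 'acked'; attempts: number }
  | { status: 'routed'; attempts: number; queue: string }
  | { status: 'dropped'; attempts: number };

// Hypothetical simulation: call the handler until it succeeds or
// maxAttempts is reached, then fall back to the exception queue.
function consumeWithRetry<T>(
  content: T,
  handler: (content: T) => void,
  options: RetrySketchOptions,
): Outcome {
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      handler(content);
      return { status: 'acked', attempts: attempt };
    } catch {
      // handler failed: loop again (stands in for a requeue)
    }
  }
  if (options.exceptionQueue) {
    return { status: 'routed', attempts: options.maxAttempts, queue: options.exceptionQueue };
  }
  return { status: 'dropped', attempts: options.maxAttempts };
}
```

Under this reading, a handler that always throws with `{ maxAttempts: 2, exceptionQueue: 'TEST.QUEUE.ERR' }` is called twice and the message is then reported as routed to `TEST.QUEUE.ERR`.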

Interface Exchange

import { Options } from 'amqplib'

/**
 * @desc simply wrap amqp exchange definitions as interface
 * */
export interface Exchange {
  name: string
  type: string | 'direct' | 'fanout' | 'topic' | 'headers'
  options?: Options.AssertExchange
}

/**
 * @desc wrap amqp.Channel.publish(exchange: string, routingKey: string, content, options?: Publish): boolean
 *       as interface
 * */
export interface PublishExchangeOptions {
  routingKey: string
  options?: Options.Publish
}

@PublishExchange()

Not Stable

Provides a MethodDecorator for easily publishing messages to an exchange.

Options:

@PublishExchange(exchange: string | Exchange, options?: PublishExchangeOptions)
yourPublishExchangeMethod(content:any, options?: PublishExchangeOptions){}

Example:

There is no example for stable usage yet. Refer to the unit test cases (or submit a PR).
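
As a rough illustration of how the Exchange and PublishExchangeOptions interfaces map onto the `publish(exchange, routingKey, content, options)` call quoted in the interface comment above, here is a minimal sketch against an in-memory fake channel. `publishToExchange`, `PublishChannel` and `fakeChannel` are hypothetical names, and `PublishOptions` is a local stand-in for a subset of amqplib's Options.Publish. This is not the decorator's implementation.

```typescript
interface Exchange {
  name: string;
  type: string;
}

// Local stand-in for a subset of amqplib's Options.Publish.
interface PublishOptions {
  persistent?: boolean;
}

interface PublishExchangeOptions {
  routingKey: string;
  options?: PublishOptions;
}

// Minimal channel shape with only the publish call used here.
interface PublishChannel {
  publish(exchange: string, routingKey: string, content: unknown, options?: PublishOptions): boolean;
}

// Hypothetical helper: resolves the exchange name and forwards to channel.publish.
function publishToExchange(
  channel: PublishChannel,
  exchange: string | Exchange,
  content: unknown,
  opts: PublishExchangeOptions,
): boolean {
  const name = typeof exchange === 'string' ? exchange : exchange.name;
  return channel.publish(name, opts.routingKey, content, opts.options);
}

// In-memory fake that records every publish call.
const published: Array<{ exchange: string; routingKey: string; content: unknown }> = [];
const fakeChannel: PublishChannel = {
  publish(exchange, routingKey, content) {
    published.push({ exchange, routingKey, content });
    return true;
  },
};

const accepted = publishToExchange(
  fakeChannel,
  { name: 'TEST.EXCHANGE', type: 'topic' },
  { id: 1 },
  { routingKey: 'orders.created' },
);
```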


@UseAmqpConnection(name?:string)

Provides a MethodDecorator for specifying which connection (named when you register AmqpModule) to use with @PublishQueue() and @SubscribeQueue().

Recommended if you are developing an npm package that uses a specific named connection.

Example:

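import { Injectable } from '@nestjs/common'
import { PublishQueue, UseAmqpConnection } from 'nestx-amqp'

// declared at module scope so the decorator expression can reference it
const queue = 'LOGGER.QUEUE'

@Injectable()
class AmqpLoggerService {
  @UseAmqpConnection('logger')
  @PublishQueue(queue)
  async logSideEffect(content) {
    // nothing to do here: content is sent to LOGGER.QUEUE over the connection named `logger`
  }
}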

For more details, refer to the unit test cases.

Change Log

See CHANGELOG.md


LICENSE

Released under the MIT License.