This library provides a NestJS transporter that uses NSQ as its message broker. The transporter can be thought of as an event-driven framework:
- As a message producer, I can send messages to NSQ topics by emitting events named after those topics.
- As a message consumer, I can receive messages by subscribing to those emitted events.
For more details on the overall architecture, please refer to this article.
This library does not yet implement the request-response communication style; it currently supports only the event-dispatching style.
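The event-dispatching model described above can be pictured with a small in-memory sketch. It does not use NSQ or this library: `InMemoryBroker` and `TopicHandler` are hypothetical names introduced only for illustration, and the sketch ignores NSQ channels, persistence, and networking.

```typescript
// Hypothetical in-memory stand-in for a broker; not part of this library or NSQ.
type TopicHandler = (payload: unknown) => void;

class InMemoryBroker {
  private readonly handlers = new Map<string, TopicHandler[]>();

  // Consumer side: register a handler for events emitted on a topic.
  subscribe(topic: string, handler: TopicHandler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  // Producer side: fire-and-forget, no response is returned to the caller.
  emit(topic: string, payload: unknown): void {
    for (const handler of this.handlers.get(topic) ?? []) {
      handler(payload);
    }
  }
}

const broker = new InMemoryBroker();
const received: unknown[] = [];

broker.subscribe('topic1', (payload) => received.push(payload));
broker.emit('topic1', { foo: 1 });
// No handler is registered for 'topic2', so nothing happens in this toy model.
broker.emit('topic2', { ignored: true });
```

The point of the sketch is the shape of the interaction: the producer only names a topic and hands over a payload, and it never waits for a reply, which is what distinguishes event dispatching from request-response.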
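import { Module } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ClientNsq, NsqOptions } from 'nestjs-nsq-transporter';
import { SomeService } from './some.service'; // path assumed for illustration

// Example value taken from the options table in this README.
const options: NsqOptions = { lookupdHTTPAddresses: ['http://localhost:4161'] };

@Module({
  providers: [
    {
      provide: 'NSQ_CLIENT',
      useFactory: (): ClientProxy => new ClientNsq(options),
    },
    SomeService,
  ],
})
export class SomeModule {}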
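import { Inject, Injectable } from '@nestjs/common';
import { ClientNsq } from 'nestjs-nsq-transporter';

@Injectable()
export class SomeService {
  constructor(@Inject('NSQ_CLIENT') private nsqProducer: ClientNsq) {}

  // Emits an event named after the topic so the message is sent to that NSQ topic.
  sendMessage(topic: string, msg: any) {
    return this.nsqProducer.emit(topic, msg);
  }
}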
Note: You cannot pass arbitrary key/value pairs as the second argument (`msg`) of the `emit` function, because two keys are reserved: `meta` and `options`. `meta` is used by the default `OutboundEventSerializer` to attach extra information; `options` sets the options for producing the NSQ message. See the sample below.
this.nsqProducer.emit(topic, {
  foo: 1,
  bar: 2,
  meta: { component: 'XYZ' },
  options: { retry: { retries: 3 } },
});
// The `emit` above sends an NSQ message with the payload below, using a retry strategy of 3 retries.
// { data: { foo: 1, bar: 2 }, meta: { component: 'XYZ' } }
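The split between ordinary keys and the two reserved keys can be sketched as a plain function. This is not the library's `OutboundEventSerializer`; `splitEmitMessage`, `EmitMessage`, and `SplitResult` are hypothetical names, and the sketch only mirrors the behaviour described in the note and sample above.

```typescript
// Hypothetical helper, NOT the library's serializer: it only illustrates how
// `meta` and `options` are separated from the rest of the message.
interface EmitMessage {
  meta?: Record<string, unknown>;
  options?: Record<string, unknown>;
  [key: string]: unknown;
}

interface SplitResult {
  payload: { data: Record<string, unknown>; meta?: Record<string, unknown> };
  options?: Record<string, unknown>;
}

function splitEmitMessage(msg: EmitMessage): SplitResult {
  // Pull out the two reserved keys; every remaining key becomes part of `data`.
  const { meta, options, ...data } = msg;
  const payload: SplitResult['payload'] =
    meta === undefined ? { data } : { data, meta };
  return options === undefined ? { payload } : { payload, options };
}

const splitResult = splitEmitMessage({
  foo: 1,
  bar: 2,
  meta: { component: 'XYZ' },
  options: { retry: { retries: 3 } },
});
// splitResult.payload -> { data: { foo: 1, bar: 2 }, meta: { component: 'XYZ' } }
// splitResult.options -> { retry: { retries: 3 } }
```

A practical consequence is that a business field named `meta` or `options` would not end up inside `data`, so such fields are best nested under a different key.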
import { MicroserviceOptions } from '@nestjs/microservices';
import { ServerNsq } from 'nestjs-nsq-transporter';

app.connectMicroservice<MicroserviceOptions>({
  strategy: new ServerNsq(options),
});
import { Controller } from '@nestjs/common';
import { EventPattern, Ctx, Payload } from '@nestjs/microservices';
import { NsqContext } from 'nestjs-nsq-transporter'; // assumed to be exported by the library

@Controller()
export class AppController {
  @EventPattern({
    topic: 'topic1',
    channel: 'channel1',
    options: { // optional
      maxAttempts: 3,
    },
  })
  messageHandlerForTopic1(@Payload() payload: any, @Ctx() context: NsqContext) {
    // Handle messages
    // Note: if you throw an Error from this message handler, prefer `RpcException` so that nsq-transporter logs it explicitly.
  }
}
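The effect of `maxAttempts` in the handler above, as described in the options table of this README, can be simulated without NSQ. The sketch below is an assumption-laden illustration: `deliverWithMaxAttempts`, `AttemptHandler`, and `DiscardFn` are hypothetical names, and a real consumer receives redeliveries from nsqd after a requeue rather than looping in-process.

```typescript
// Hypothetical simulation, not the library's code.
type AttemptHandler = (body: string, attempt: number) => void; // throws on failure
type DiscardFn = (body: string) => void;

function deliverWithMaxAttempts(
  body: string,
  handler: AttemptHandler,
  discardHandler: DiscardFn,
  maxAttempts: number,
): 'finished' | 'discarded' {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(body, attempt);
      return 'finished'; // handler succeeded, the message is done
    } catch {
      // Handler threw: in NSQ terms the message would be requeued and retried.
    }
  }
  discardHandler(body); // attempts exhausted, hand over to the discard handler
  return 'discarded';
}

const attemptsSeen: number[] = [];
const discardedBodies: string[] = [];

// Fails on attempts 1 and 2, succeeds on attempt 3.
const flakyOutcome = deliverWithMaxAttempts(
  'hello',
  (_body, attempt) => {
    attemptsSeen.push(attempt);
    if (attempt < 3) throw new Error('transient failure');
  },
  (body) => discardedBodies.push(body),
  3,
);

// Always fails, so it ends up in the discard handler.
const failingOutcome = deliverWithMaxAttempts(
  'poison',
  () => {
    throw new Error('permanent failure');
  },
  (body) => discardedBodies.push(body),
  3,
);
```

Here `flakyOutcome` is `'finished'` and `failingOutcome` is `'discarded'`, which is why a handler that can fail permanently is usually paired with a `discardHandler` that records the lost message.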
The `options` passed to either `ServerNsq` or `ClientNsq` has the type `NsqOptions`. The available fields are as follows:
Field Name | Is Required | Type | Description | Example |
---|---|---|---|---|
lookupdHTTPAddresses | No | string[] | HTTP address list for nsq lookupds | ['http://localhost:4161'] |
strategy | No | 'round_robin' or 'fan_out' | Message sending strategy | round_robin |
discardHandler | No | (arg: any) => void | Handler function to process when message is discarded | (arg: any) => console.log(arg) |
maxInFlight | No | number | The maximum number of messages to process at once | 1 |
requeueDelay | No | number | The default amount of time (milliseconds) a message requeued should be delayed by before being dispatched by nsqd | 90000 |
lookupdPollInterval | No | number | The frequency in seconds for querying lookupd instances | 60 |
maxAttempts | No | number | The number of times a given message will be attempted (given to MESSAGE handler) before it will be handed to the DISCARD handler and then automatically finished | 3 |
serializer | No | Serializer in @nestjs/microservices | The instance of Serializer class which provides a serialize method to serialize the outbound message | serialize(value: any) => value |
deserializer | No | ConsumerDeserializer in @nestjs/microservices | The instance of ConsumerDeserializer class which provides a deserialize method to deserialize the inbound message | deserialize(value: any) => value |
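For readers who prefer types to tables, the fields above can be transcribed into an interface. This is a sketch only: `NsqOptionsSketch`, `SketchSerializer`, and `SketchDeserializer` are hypothetical stand-ins, and the real `NsqOptions` type exported by the library, as well as the `Serializer` and `ConsumerDeserializer` types from `@nestjs/microservices`, may differ in detail.

```typescript
// Hypothetical transcription of the options table; not the library's own type.
interface SketchSerializer {
  serialize(value: unknown): unknown;
}

interface SketchDeserializer {
  deserialize(value: unknown): unknown;
}

interface NsqOptionsSketch {
  lookupdHTTPAddresses?: string[];
  strategy?: 'round_robin' | 'fan_out';
  discardHandler?: (arg: unknown) => void;
  maxInFlight?: number;
  requeueDelay?: number; // milliseconds
  lookupdPollInterval?: number; // seconds
  maxAttempts?: number;
  serializer?: SketchSerializer;
  deserializer?: SketchDeserializer;
}

// Collects discarded messages so the example stays free of side effects.
const discardSink: unknown[] = [];

// Values mirror the "Example" column of the table.
const exampleOptions: NsqOptionsSketch = {
  lookupdHTTPAddresses: ['http://localhost:4161'],
  strategy: 'round_robin',
  discardHandler: (arg) => {
    discardSink.push(arg);
  },
  maxInFlight: 1,
  requeueDelay: 90000,
  lookupdPollInterval: 60,
  maxAttempts: 3,
  serializer: { serialize: (value) => value },
  deserializer: { deserialize: (value) => value },
};
```

Every field is optional in the table, so any subset of this object is also a valid value of the sketched type. Note the mixed units: `requeueDelay` is in milliseconds while `lookupdPollInterval` is in seconds.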