diff --git a/lib/sqs.service.ts b/lib/sqs.service.ts index d5d75af..03c411b 100644 --- a/lib/sqs.service.ts +++ b/lib/sqs.service.ts @@ -1,17 +1,25 @@ import { Inject, Injectable, Logger, LoggerService, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; -import { Consumer } from 'sqs-consumer'; +import { Consumer, StopOptions } from 'sqs-consumer'; import { Producer } from 'sqs-producer'; import { SQSClient, GetQueueAttributesCommand, PurgeQueueCommand, QueueAttributeName } from '@aws-sdk/client-sqs'; -import { Message, QueueName, SqsConsumerEventHandlerMeta, SqsMessageHandlerMeta, SqsOptions } from './sqs.types'; +import { + Message, + QueueName, + SqsConsumerEventHandlerMeta, + SqsConsumerMapValues, + SqsMessageHandlerMeta, + SqsOptions, +} from './sqs.types'; import { DiscoveryService } from '@golevelup/nestjs-discovery'; import { SQS_CONSUMER_EVENT_HANDLER, SQS_CONSUMER_METHOD, SQS_OPTIONS } from './sqs.constants'; @Injectable() export class SqsService implements OnModuleInit, OnModuleDestroy { - public readonly consumers = new Map(); + public readonly consumers = new Map(); public readonly producers = new Map(); private logger: LoggerService; + private globalStopOptions: StopOptions; public constructor( @Inject(SQS_OPTIONS) public readonly options: SqsOptions, @@ -20,6 +28,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { public async onModuleInit(): Promise { this.logger = this.options.logger ?? new Logger('SqsService', { timestamp: false }); + this.globalStopOptions = this.options.globalStopOptions ?? {}; const messageHandlers = await this.discover.providerMethodsWithMetaAtKey( SQS_CONSUMER_METHOD, @@ -29,7 +38,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { ); this.options.consumers?.forEach((options) => { - const { name, ...consumerOptions } = options; + const { name, stopOptions, ...consumerOptions } = options; if (this.consumers.has(name)) { throw new Error(`Consumer already exists: ${name}`); } @@ -61,7 +70,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { ); } } - this.consumers.set(name, consumer); + this.consumers.set(name, { instance: consumer, stopOptions: stopOptions ?? this.globalStopOptions }); }); this.options.producers?.forEach((options) => { @@ -75,13 +84,13 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { }); for (const consumer of this.consumers.values()) { - consumer.start(); + consumer.instance.start(); } } public onModuleDestroy() { for (const consumer of this.consumers.values()) { - consumer.stop(); + consumer.instance.stop(consumer.stopOptions); } } diff --git a/lib/sqs.types.ts b/lib/sqs.types.ts index e2c1098..23034b5 100644 --- a/lib/sqs.types.ts +++ b/lib/sqs.types.ts @@ -1,4 +1,4 @@ -import type { ConsumerOptions } from 'sqs-consumer'; +import type { Consumer, ConsumerOptions, StopOptions } from 'sqs-consumer'; import type { Producer } from 'sqs-producer'; import type { LoggerService, ModuleMetadata, Type } from '@nestjs/common'; import type { MessageAttributeValue } from '@aws-sdk/client-sqs'; @@ -8,6 +8,12 @@ export type QueueName = string; export type SqsConsumerOptions = Omit & { name: QueueName; + stopOptions?: StopOptions; +}; + +export type SqsConsumerMapValues = { + instance: Consumer; + stopOptions: StopOptions; }; export type SqsProducerOptions = ProducerOptions & { @@ -18,6 +24,7 @@ export interface SqsOptions { consumers?: SqsConsumerOptions[]; producers?: SqsProducerOptions[]; logger?: LoggerService; + globalStopOptions?: StopOptions; } export interface SqsModuleOptionsFactory {