Skip to content

Commit

Permalink
feat: add support for consumer stop options (#76)
Browse files Browse the repository at this point in the history
* feat: add global stop options support

* feat: add stop options support by consumer
  • Loading branch information
cassiolacerda authored Mar 18, 2024
1 parent 0cfb340 commit 72cb675
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
23 changes: 16 additions & 7 deletions lib/sqs.service.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import { Inject, Injectable, Logger, LoggerService, OnModuleDestroy, OnModuleInit } from '@nestjs/common';

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (16.x)

'Inject' is defined but never used

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (16.x)

'Injectable' is defined but never used

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (18.x)

'Inject' is defined but never used

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (18.x)

'Injectable' is defined but never used

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (20.x)

'Inject' is defined but never used

Check warning on line 1 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (20.x)

'Injectable' is defined but never used
import { Consumer } from 'sqs-consumer';
import { Consumer, StopOptions } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { SQSClient, GetQueueAttributesCommand, PurgeQueueCommand, QueueAttributeName } from '@aws-sdk/client-sqs';
import { Message, QueueName, SqsConsumerEventHandlerMeta, SqsMessageHandlerMeta, SqsOptions } from './sqs.types';
import {
Message,
QueueName,
SqsConsumerEventHandlerMeta,
SqsConsumerMapValues,
SqsMessageHandlerMeta,
SqsOptions,
} from './sqs.types';
import { DiscoveryService } from '@golevelup/nestjs-discovery';
import { SQS_CONSUMER_EVENT_HANDLER, SQS_CONSUMER_METHOD, SQS_OPTIONS } from './sqs.constants';

Check warning on line 14 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (16.x)

'SQS_OPTIONS' is defined but never used

Check warning on line 14 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (18.x)

'SQS_OPTIONS' is defined but never used

Check warning on line 14 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (20.x)

'SQS_OPTIONS' is defined but never used

@Injectable()
export class SqsService implements OnModuleInit, OnModuleDestroy {

Check warning on line 17 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (16.x)

'SqsService' is defined but never used

Check warning on line 17 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (18.x)

'SqsService' is defined but never used

Check warning on line 17 in lib/sqs.service.ts

View workflow job for this annotation

GitHub Actions / test (20.x)

'SqsService' is defined but never used
public readonly consumers = new Map<QueueName, Consumer>();
public readonly consumers = new Map<QueueName, SqsConsumerMapValues>();
public readonly producers = new Map<QueueName, Producer>();

private logger: LoggerService;
private globalStopOptions: StopOptions;

public constructor(
@Inject(SQS_OPTIONS) public readonly options: SqsOptions,
Expand All @@ -20,6 +28,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy {

public async onModuleInit(): Promise<void> {
this.logger = this.options.logger ?? new Logger('SqsService', { timestamp: false });
this.globalStopOptions = this.options.globalStopOptions ?? {};

const messageHandlers = await this.discover.providerMethodsWithMetaAtKey<SqsMessageHandlerMeta>(
SQS_CONSUMER_METHOD,
Expand All @@ -29,7 +38,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy {
);

this.options.consumers?.forEach((options) => {
const { name, ...consumerOptions } = options;
const { name, stopOptions, ...consumerOptions } = options;
if (this.consumers.has(name)) {
throw new Error(`Consumer already exists: ${name}`);
}
Expand Down Expand Up @@ -61,7 +70,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy {
);
}
}
this.consumers.set(name, consumer);
this.consumers.set(name, { instance: consumer, stopOptions: stopOptions ?? this.globalStopOptions });
});

this.options.producers?.forEach((options) => {
Expand All @@ -75,13 +84,13 @@ export class SqsService implements OnModuleInit, OnModuleDestroy {
});

for (const consumer of this.consumers.values()) {
consumer.start();
consumer.instance.start();
}
}

public onModuleDestroy() {
for (const consumer of this.consumers.values()) {
consumer.stop();
consumer.instance.stop(consumer.stopOptions);
}
}

Expand Down
9 changes: 8 additions & 1 deletion lib/sqs.types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ConsumerOptions } from 'sqs-consumer';
import type { Consumer, ConsumerOptions, StopOptions } from 'sqs-consumer';
import type { Producer } from 'sqs-producer';
import type { LoggerService, ModuleMetadata, Type } from '@nestjs/common';
import type { MessageAttributeValue } from '@aws-sdk/client-sqs';
Expand All @@ -8,6 +8,12 @@ export type QueueName = string;

export type SqsConsumerOptions = Omit<ConsumerOptions, 'handleMessage' | 'handleMessageBatch'> & {
name: QueueName;
stopOptions?: StopOptions;
};

export type SqsConsumerMapValues = {
instance: Consumer;
stopOptions: StopOptions;
};

export type SqsProducerOptions = ProducerOptions & {
Expand All @@ -18,6 +24,7 @@ export interface SqsOptions {
consumers?: SqsConsumerOptions[];
producers?: SqsProducerOptions[];
logger?: LoggerService;
globalStopOptions?: StopOptions;
}

export interface SqsModuleOptionsFactory {
Expand Down

0 comments on commit 72cb675

Please sign in to comment.