diff --git a/src/ILogger.ts b/src/ILogger.ts index ac813ec..d8c870f 100644 --- a/src/ILogger.ts +++ b/src/ILogger.ts @@ -5,5 +5,7 @@ export interface ILogger { info: (message: string) => void; + warn: (message: string) => void; + error: (message: string) => void; } diff --git a/src/LoggerWrapper.ts b/src/LoggerWrapper.ts index e711384..ba15e1a 100644 --- a/src/LoggerWrapper.ts +++ b/src/LoggerWrapper.ts @@ -19,6 +19,10 @@ export class LoggerWrapper implements ILogger { this._logger?.info && this._logger.info(message); } + warn(message: string) { + this._logger?.warn && this._logger.warn(message); + } + error(message: string) { this._logger?.error ? this._logger.error(message) : console.log(message); } diff --git a/src/queue/Queue.ts b/src/queue/Queue.ts index 7a2a92e..bf51b91 100644 --- a/src/queue/Queue.ts +++ b/src/queue/Queue.ts @@ -1,5 +1,6 @@ import {SNS} from '@aws-sdk/client-sns'; import { + ChangeMessageVisibilityCommandInput, DeleteMessageCommandInput, ReceiveMessageCommandInput, SendMessageCommandInput, @@ -159,4 +160,16 @@ export class Queue { }; await this.sqs.deleteMessage(request); } + + async changeMessageVisibility( + receiptHandle: string, + visibilityTimeout: number + ) { + const request: ChangeMessageVisibilityCommandInput = { + QueueUrl: this.queueUrl, + ReceiptHandle: receiptHandle, + VisibilityTimeout: visibilityTimeout, + }; + await this.sqs.changeMessageVisibility(request); + } } diff --git a/src/queue/QueueSubjectListener.ts b/src/queue/QueueSubjectListener.ts index c8e6f47..a5b5cbe 100644 --- a/src/queue/QueueSubjectListener.ts +++ b/src/queue/QueueSubjectListener.ts @@ -1,4 +1,7 @@ -import {ReceiveMessageCommandInput} from '@aws-sdk/client-sqs'; +import { + ReceiveMessageCommandInput, + QueueAttributeName, +} from '@aws-sdk/client-sqs'; import {ILogger} from '../ILogger'; import {LoggerWrapper} from '../LoggerWrapper'; import {Queue} from './Queue'; @@ -11,12 +14,37 @@ export type QueueSubjectListenerOptions = { }; export type QueueSubjectListenerMessageHandler = { - (message: unknown, subject: string): void; + (message: unknown, subject: string): Promise; }; +export type QueueSubjectListenerRetryPolicyOptions = { + maxAttempts: number; + backoffDelaySeconds: number; + retryPolicy?: ( + attempt: number, + backoffDelaySeconds: number, + error: unknown + ) => number; +}; + +export const LinearRetryPolicy = ( + attempt: number, + backoffDelaySeconds: number +): number => backoffDelaySeconds * attempt; + +export const ExponentialRetryPolicy = ( + attempt: number, + backoffDelaySeconds: number +) => Math.pow(attempt, backoffDelaySeconds); + export class QueueSubjectListener { - public handlers: Record> = - {}; + public handlers: Record< + string, + Array<{ + handler: QueueSubjectListenerMessageHandler; + retryPolicyOptions?: QueueSubjectListenerRetryPolicyOptions; + }> + > = {}; public isStopped = false; public logger: ILogger; @@ -37,9 +65,22 @@ export class QueueSubjectListener { this.isStopped = true; } - onSubject(subjectName: string, handler: QueueSubjectListenerMessageHandler) { + onSubject( + subjectName: string, + handler: QueueSubjectListenerMessageHandler, + retryPolicyOptions?: QueueSubjectListenerRetryPolicyOptions + ) { this.handlers[subjectName] = this.handlers[subjectName] || []; - this.handlers[subjectName].push(handler); + this.handlers[subjectName].push({ + handler, + retryPolicyOptions: retryPolicyOptions + ? { + maxAttempts: retryPolicyOptions.maxAttempts || 3, + backoffDelaySeconds: retryPolicyOptions.backoffDelaySeconds || 10, + retryPolicy: retryPolicyOptions.retryPolicy || LinearRetryPolicy, + } + : undefined, + }); } listen(params?: ReceiveMessageCommandInput) { @@ -66,6 +107,7 @@ export class QueueSubjectListener { MaxNumberOfMessages: maxNumberOfMessagesOrUndefined, VisibilityTimeout, WaitTimeSeconds, + AttributeNames: [QueueAttributeName.All], }; const response = await this.queue.receiveMessage(currentParams); @@ -89,6 +131,7 @@ export class QueueSubjectListener { message: { message: JSON.parse(json.Message), subject: json.Subject, + attributes: m.Attributes, }, }; } catch (error) { @@ -100,28 +143,84 @@ export class QueueSubjectListener { cntInFlight += messages.length; const promises = messages.map(async m => { - const {message, subject} = m.message; + const {message, subject, attributes} = m.message; + let shouldRetry = false; + let visibilityTimeout: number | undefined; try { if (this.handlers[subject] || this.handlers['*']) { + const subjectHandlers = (this.handlers[subject] || []).concat( + this.handlers['*'] || [] + ); await Promise.all( - (this.handlers[subject] || []) - .concat(this.handlers['*'] || []) - .map(async h => { - try { - await h(message, subject); - } catch (error) { - typeof error === 'string' && this.logger.error(error); + subjectHandlers.map(async h => { + try { + shouldRetry = false; + await h.handler(message, subject); + } catch (error) { + typeof error === 'string' && this.logger.error(error); + + if (!h.retryPolicyOptions) return; + + if (Object.keys(subjectHandlers).length > 1) { + this.logger.info( + `Multiple handlers for message with subject "${m.message.subject}"` + ); + return; } - }) + + const {maxAttempts, backoffDelaySeconds, retryPolicy} = + h.retryPolicyOptions; + const attempt = parseInt( + attributes?.ApproximateReceiveCount || '1' + ); + + if (attempt < maxAttempts) { + shouldRetry = true; + visibilityTimeout = retryPolicy?.( + attempt, + backoffDelaySeconds, + error + ); + + this.logger.debug( + `Message with subject "${m.message.subject}" will be retried` + ); + } + } + }) ); } if (!m.handle) throw Error("'handle' property on message was undefined."); - await this.queue.deleteMessage(m.handle); + if (!shouldRetry) { + await this.queue.deleteMessage(m.handle); + + this.logger.debug( + `Message with subject "${m.message.subject}" deleted` + ); + return; + } + + if ( + typeof visibilityTimeout === 'number' && + visibilityTimeout >= 0 && + visibilityTimeout !== currentParams.VisibilityTimeout + ) { + if (visibilityTimeout >= 0 && visibilityTimeout <= 43200) { + await this.queue.changeMessageVisibility( + m.handle, + visibilityTimeout + ); + } else { + this.logger.warn( + `Invalid visibilityTimeout value: ${visibilityTimeout}` + ); + } + } this.logger.debug( - `Message with subject "${m.message.subject}" deleted` + `Message with subject "${m.message.subject}" kept, visibilityTimeout: ${visibilityTimeout}` ); } catch (error) { typeof error === 'string' && this.logger.error(error); diff --git a/test/QueueSubjectListener.test.ts b/test/QueueSubjectListener.test.ts new file mode 100644 index 0000000..1a7f97a --- /dev/null +++ b/test/QueueSubjectListener.test.ts @@ -0,0 +1,197 @@ +import {Queue} from '../src'; +import {QueueSubjectListener} from '../src/queue/QueueSubjectListener'; + +const messages = [ + { + Body: JSON.stringify({ + Subject: 'test', + Message: JSON.stringify({id: '123', test: 'test'}), + }), + ReceiptHandle: 'test', + }, +]; + +describe('QueueSubjectListener', () => { + describe('listen', () => { + it('should be able to listen to queue and call handler', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + } as unknown as Queue; + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler = jest.fn(() => Promise.resolve()); + + sut.onSubject('test', handler); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler).toHaveBeenCalledTimes(1); + expect(queueMock.deleteMessage).toHaveBeenCalledTimes(1); + }); + + it('should be able to listen to queue and call handler with retry', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + changeMessageVisibility: jest.fn(Promise.resolve), + } as unknown as Queue; + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler = jest.fn(() => Promise.reject('error')); + + sut.onSubject('test', handler, {maxAttempts: 2, backoffDelaySeconds: 1}); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler).toHaveBeenCalledTimes(1); + expect(queueMock.deleteMessage).not.toHaveBeenCalled(); + expect(queueMock.changeMessageVisibility).toHaveBeenCalledTimes(1); + }); + + it('should not retry when multiple handlers are registered', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + } as unknown as Queue; + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler1 = jest.fn(() => Promise.reject('error')); + const handler2 = jest.fn(() => Promise.resolve()); + + sut.onSubject('test', handler1, {maxAttempts: 2, backoffDelaySeconds: 1}); + sut.onSubject('test', handler2); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler1).toHaveBeenCalledTimes(1); + expect(handler2).toHaveBeenCalledTimes(1); + expect(queueMock.deleteMessage).toHaveBeenCalledTimes(1); + }); + + it('should retry when multiple handlers are registered with different subjects', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + changeMessageVisibility: jest.fn(Promise.resolve), + } as unknown as Queue; + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler1 = jest.fn(() => Promise.reject('error')); + const handler2 = jest.fn(() => Promise.resolve()); + + sut.onSubject('test', handler1, {maxAttempts: 2, backoffDelaySeconds: 1}); + sut.onSubject('test2', handler2); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler1).toHaveBeenCalledTimes(1); + expect(handler2).not.toHaveBeenCalled(); + expect(queueMock.deleteMessage).not.toHaveBeenCalled(); + expect(queueMock.changeMessageVisibility).toHaveBeenCalledTimes(1); + }); + + it('should not retry when no retry policy is set', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + } as unknown as Queue; + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler = jest.fn(() => Promise.reject('error')); + + sut.onSubject('test', handler); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler).toHaveBeenCalledTimes(1); + expect(queueMock.deleteMessage).toHaveBeenCalledTimes(1); + }); + + it('should call the retryPolicy when retrying', async () => { + const queueMock = { + receiveMessage: jest.fn().mockResolvedValueOnce({ + Messages: messages, + }), + deleteMessage: jest.fn(Promise.resolve), + changeMessageVisibility: jest.fn(Promise.resolve), + } as unknown as Queue; + + const retryPolicy = jest.fn(() => 1); + + const sut = new QueueSubjectListener(queueMock, null, { + maxConcurrentMessage: 1, + waitTimeSeconds: 0, + visibilityTimeout: 0, + receiveTimeout: () => 0, + }); + + const handler = jest.fn(() => Promise.reject('error')); + + sut.onSubject('test', handler, { + maxAttempts: 2, + backoffDelaySeconds: 1, + retryPolicy: retryPolicy, + }); + sut.listen(); + + await new Promise(resolve => setTimeout(resolve, 10)); + sut.stop(); + + expect(handler).toHaveBeenCalledTimes(1); + expect(queueMock.deleteMessage).not.toHaveBeenCalled(); + expect(queueMock.changeMessageVisibility).toHaveBeenCalledTimes(1); + expect(retryPolicy).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/test/test.ts b/test/test.ts index a43cb64..9d9fad2 100644 --- a/test/test.ts +++ b/test/test.ts @@ -58,6 +58,42 @@ describe('QueueSubjectListener', () => { listener.stop(); }); + + it('should be able to listen to queue and call handler with retry', async () => { + const queueName = 'test-retry-queueName'; + const subjectName = 'test_retry_subject'; + const topicName = 'test_retry_topic'; + const queue = await Queue.createQueue(queueName, localstackEndpoint); + const listener = new QueueSubjectListener(queue, null, { + maxConcurrentMessage: 1, + visibilityTimeout: 0, + waitTimeSeconds: 0, + }); + + const handler = jest.fn(() => Promise.reject('error')); + + listener.onSubject(subjectName, handler, { + maxAttempts: 2, + backoffDelaySeconds: 0, + }); + + listener.listen(); + + const topic = await Topic.createTopic( + topicName, + subjectName, + localstackEndpoint + ); + await queue.subscribeTopic(topic); + const event = {id: '123', test: 'test'}; + await topic.push(event); + + await new Promise(resolve => setTimeout(resolve, 4000)); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith(event, subjectName); + + listener.stop(); + }, 10000); }); it('should run lambda func', async () => {