Skip to content

Commit

Permalink
feat: Support for retrying to deliver SQS message
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrjoh committed Aug 23, 2024
1 parent eca5d2c commit c7153b0
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 17 deletions.
13 changes: 13 additions & 0 deletions src/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {SNS} from '@aws-sdk/client-sns';
import {
ChangeMessageVisibilityCommandInput,
DeleteMessageCommandInput,
ReceiveMessageCommandInput,
SendMessageCommandInput,
Expand Down Expand Up @@ -159,4 +160,16 @@ export class Queue {
};
await this.sqs.deleteMessage(request);
}

async changeMessageVisibility(
receiptHandle: string,
visibilityTimeout: number
) {
const request: ChangeMessageVisibilityCommandInput = {
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: visibilityTimeout,
};
await this.sqs.changeMessageVisibility(request);
}
}
99 changes: 82 additions & 17 deletions src/queue/QueueSubjectListener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {ReceiveMessageCommandInput} from '@aws-sdk/client-sqs';
import {
ReceiveMessageCommandInput,
QueueAttributeName,
} from '@aws-sdk/client-sqs';
import {ILogger} from '../ILogger';
import {LoggerWrapper} from '../LoggerWrapper';
import {Queue} from './Queue';
Expand All @@ -11,12 +14,22 @@ export type QueueSubjectListenerOptions = {
};

export type QueueSubjectListenerMessageHandler = {
(message: unknown, subject: string): void;
(message: unknown, subject: string): Promise<void>;
};

export type QueueSubjectListenerRetryPolicy = {
maxAttempts: number;
backoffDelay: number;
};

export class QueueSubjectListener {
public handlers: Record<string, Array<QueueSubjectListenerMessageHandler>> =
{};
public handlers: Record<
string,
Array<{
handler: QueueSubjectListenerMessageHandler;
retryPolicy?: QueueSubjectListenerRetryPolicy;
}>
> = {};
public isStopped = false;

public logger: ILogger;
Expand All @@ -37,9 +50,13 @@ export class QueueSubjectListener {
this.isStopped = true;
}

onSubject(subjectName: string, handler: QueueSubjectListenerMessageHandler) {
onSubject(
subjectName: string,
handler: QueueSubjectListenerMessageHandler,
retryPolicy?: QueueSubjectListenerRetryPolicy
) {
this.handlers[subjectName] = this.handlers[subjectName] || [];
this.handlers[subjectName].push(handler);
this.handlers[subjectName].push({handler, retryPolicy});
}

listen(params?: ReceiveMessageCommandInput) {
Expand All @@ -66,6 +83,7 @@ export class QueueSubjectListener {
MaxNumberOfMessages: maxNumberOfMessagesOrUndefined,
VisibilityTimeout,
WaitTimeSeconds,
AttributeNames: [QueueAttributeName.All],
};

const response = await this.queue.receiveMessage(currentParams);
Expand All @@ -89,6 +107,7 @@ export class QueueSubjectListener {
message: {
message: JSON.parse(json.Message),
subject: json.Subject,
attributes: m.Attributes,
},
};
} catch (error) {
Expand All @@ -100,28 +119,74 @@ export class QueueSubjectListener {
cntInFlight += messages.length;

const promises = messages.map(async m => {
const {message, subject} = m.message;
const {message, subject, attributes} = m.message;
let shouldRetry = false;
let visibilityTimeout: number | undefined;
try {
if (this.handlers[subject] || this.handlers['*']) {
const subjectHandlers = (this.handlers[subject] || []).concat(
this.handlers['*'] || []
);
await Promise.all(
(this.handlers[subject] || [])
.concat(this.handlers['*'] || [])
.map(async h => {
try {
await h(message, subject);
} catch (error) {
typeof error === 'string' && this.logger.error(error);
subjectHandlers.map(async h => {
try {
shouldRetry = false;
await h.handler(message, subject);
} catch (error) {
typeof error === 'string' && this.logger.error(error);

if (!h.retryPolicy?.maxAttempts) return;

if (Object.keys(subjectHandlers).length > 1) {
this.logger.info(
`Multiple handlers for message with subject "${m.message.subject}"`
);
return;
}

const maxAttempts = h.retryPolicy.maxAttempts;
const backoffDelay = h.retryPolicy.backoffDelay;
const attempts = parseInt(
attributes?.ApproximateReceiveCount || '1'
);

if (attempts < maxAttempts) {
shouldRetry = true;
visibilityTimeout = attempts * backoffDelay;

this.logger.debug(
`Message with subject "${m.message.subject}" will be retried`
);
}
})
}
})
);
}
if (!m.handle)
throw Error("'handle' property on message was undefined.");

await this.queue.deleteMessage(m.handle);
if (!shouldRetry) {
await this.queue.deleteMessage(m.handle);

this.logger.debug(
`Message with subject "${m.message.subject}" deleted`
);
return;
}

if (
typeof visibilityTimeout === 'number' &&
visibilityTimeout >= 0 &&
visibilityTimeout !== currentParams.VisibilityTimeout
) {
await this.queue.changeMessageVisibility(
m.handle,
visibilityTimeout
);
}

this.logger.debug(
`Message with subject "${m.message.subject}" deleted`
`Message with subject "${m.message.subject}" kept`
);
} catch (error) {
typeof error === 'string' && this.logger.error(error);
Expand Down
33 changes: 33 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,39 @@ describe('QueueSubjectListener', () => {

listener.stop();
});

it('should be able to listen to queue and call handler with retry', async () => {
const queueName = 'test-retry-queueName';
const subjectName = 'test_retry_subject';
const topicName = 'test_retry_topic';
const queue = await Queue.createQueue(queueName, localstackEndpoint);
const listener = new QueueSubjectListener(queue, null, {
maxConcurrentMessage: 1,
visibilityTimeout: 0,
waitTimeSeconds: 0,
});

const handler = jest.fn(() => Promise.reject('error'));

listener.onSubject(subjectName, handler, {maxAttempts: 2, backoffDelay: 0});

listener.listen();

const topic = await Topic.createTopic(
topicName,
subjectName,
localstackEndpoint
);
await queue.subscribeTopic(topic);
const event = {id: '123', test: 'test'};
await topic.push(event);

await new Promise(resolve => setTimeout(resolve, 4000));
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith(event, subjectName);

listener.stop();
}, 10000);
});

it('should run lambda func', async () => {
Expand Down

0 comments on commit c7153b0

Please sign in to comment.