Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 22, 2019
1 parent 4cd61ea commit ce03836
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 65 deletions.
93 changes: 44 additions & 49 deletions src/Resubmitter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

import {SQS_MAX_RECEIVE_BATCH, Squiss} from './Squiss';
import {IMessageToSend, ResubmitterConfig} from './Types';
import {ResubmitterConfig, ResubmitterMutator} from './Types';
import {Message} from './Message';

const DEFAULT_SQUISS_OPTS = {
Expand All @@ -15,77 +15,72 @@ export class Resubmitter {

public squissFrom: Squiss;
public squissTo: Squiss;
private numHandledMessages = 0;
private handledMessages = new Set<string>();
private _numHandledMessages = 0;
private _handledMessages = new Set<string>();
private readonly _limit: number;
private readonly _customMutator?: ResubmitterMutator;
private readonly _releaseTimeoutSeconds: number;

constructor(private config: ResubmitterConfig) {
constructor(config: ResubmitterConfig) {
this.squissFrom = new Squiss({
...this.config.resubmitFromQueueConfig,
...config.resubmitFromQueueConfig,
...DEFAULT_SQUISS_OPTS,
});
this.squissTo = new Squiss({
...this.config.resubmitToQueueConfig,
...config.resubmitToQueueConfig,
...DEFAULT_SQUISS_OPTS,
});
this._limit = config.limit;
this._customMutator = config.customMutator;
this._releaseTimeoutSeconds = config.releaseTimeoutSeconds;
}

public run() {
this.numHandledMessages = 0;
this.handledMessages = new Set<string>();
return this._iteration();
}

private _readMessages(numberOfMessageToRead: number) {
return this.squissFrom.getManualBatch(numberOfMessageToRead);
}

private _sendMessage(messageToSend: IMessageToSend, message: Message) {
return this.squissTo.sendMessage(messageToSend, undefined, message.attributes);
this._numHandledMessages = 0;
this._handledMessages = new Set<string>();
return Promise.resolve()
.then(() => {
return this._iteration();
});
}

private _changeMessageVisibility(message: Message) {
return message.changeVisibility(this.config.releaseTimeoutSeconds);
return message.changeVisibility(this._releaseTimeoutSeconds);
}

private _handleMessage(message: Message): Promise<void> {
return Promise.resolve().then(() => {
console.log(`${++this.numHandledMessages} messages handled`);
const location = message.raw.MessageId ?? '';
if (this.numHandledMessages > this.config.limit || this.handledMessages.has(location)) {
return this._changeMessageVisibility(message);
}
this.handledMessages.add(location);
let body = message.body;
if (this.config.customMutator) {
body = this.config.customMutator(body);
}
return this._sendMessage(body, message)
.then(() => {
return message.del();
})
.catch((err) => {
return this._changeMessageVisibility(message)
.then(() => {
return Promise.reject(err);
});
});
});
console.log(`${++this._numHandledMessages} messages handled`);
const location = message.raw.MessageId ?? '';
if (this._numHandledMessages > this._limit || this._handledMessages.has(location)) {
return this._changeMessageVisibility(message);
}
this._handledMessages.add(location);
let body = message.body;
if (this._customMutator) {
body = this._customMutator(body);
}
return this.squissTo.sendMessage(body, undefined, message.attributes)
.then(() => {
return message.del();
})
.catch((err) => {
return this._changeMessageVisibility(message)
.then(() => {
return Promise.reject(err);
});
});
}

private _iteration(): Promise<void> {
if (this.numHandledMessages >= this.config.limit || this.config.limit <= 0) {
return Promise.resolve();
}
const numberOfMessageToRead =
Math.min(SQS_MAX_RECEIVE_BATCH, Math.max(this.config.limit - this.numHandledMessages, 0));
if (numberOfMessageToRead <= 0) {
const remainingMessagesToHandle = this._limit - this._numHandledMessages;
if (remainingMessagesToHandle <= 0) {
return Promise.resolve();
}
return this._readMessages(numberOfMessageToRead)
const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remainingMessagesToHandle);
return this.squissFrom.getManualBatch(numberOfMessageToRead)
.then((messages) => {
if (!messages.length) {
this.numHandledMessages = this.config.limit;
return Promise.resolve();
this._numHandledMessages = this._limit;
}
const promises = messages.map(this._handleMessage.bind(this));
return Promise.all(promises).then(() => {
Expand Down
30 changes: 14 additions & 16 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,24 +318,22 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
return this._getBatchRequest(queueUrl, Math.min(maxMessagesToGet, SQS_MAX_RECEIVE_BATCH)).promise();
})
.then((data) => {
if (data && data.Messages) {
const parsedMessage: Message[] = [];
const parseMessagesPromises = data.Messages.map((msg) => {
const message = this._createMessageInstance(msg);
return message.parse()
.then(() => {
parsedMessage.push(message);
})
.catch((e: Error) => {
message.release();
});
});
return Promise.all(parseMessagesPromises)
const parsedMessage: Message[] = [];
const messages = data?.Messages ?? [];
const parseMessagesPromises = messages.map((msg) => {
const message = this._createMessageInstance(msg);
return message.parse()
.then(() => {
return Promise.resolve(parsedMessage);
parsedMessage.push(message);
})
.catch((e: Error) => {
message.release();
});
}
return Promise.resolve([]);
});
return Promise.all(parseMessagesPromises)
.then(() => {
return Promise.resolve(parsedMessage);
});
});
}

Expand Down
66 changes: 66 additions & 0 deletions src/test/src/Resubmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,70 @@ describe('resubmitter', () => {
return resubmitter.run();
});

it('should work with mutation', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
squissFrom!.sqs = new SQSStub(2, 0) as any as SQS;
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 30,
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
customMutator: (obj) => {
return obj;
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
return resubmitter.run();
});

it('should not do anything if limit is not positive', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
squissFrom!.sqs = new SQSStub(2, 0) as any as SQS;
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const resubmitter = new Resubmitter({
limit: 0,
releaseTimeoutSeconds: 30,
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
return resubmitter.run();
});

it('should stop if no more messages in queue', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
squissFrom!.sqs = new SQSStub(1, 0) as any as SQS;
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const resubmitter = new Resubmitter({
limit: 2,
releaseTimeoutSeconds: 30,
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
return resubmitter.run();
});

});

0 comments on commit ce03836

Please sign in to comment.