Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 30, 2019
1 parent ff12892 commit e8e7836
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 4 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
},
"homepage": "https://github.com/PruvoNet/squiss-ts#readme",
"dependencies": {
"aws-sdk": "^2.578.0",
"aws-sdk": "^2.580.0",
"iltorb": "2.4.3",
"linked-list": "^2.1.0",
"ts-type-guards": "^0.6.1",
Expand All @@ -60,7 +60,7 @@
"@types/chai-as-promised": "^7.1.2",
"@types/iltorb": "^2.3.0",
"@types/mocha": "^5.2.7",
"@types/node": "^12.12.11",
"@types/node": "^12.12.14",
"@types/proxyquire": "^1.3.28",
"@types/uuid": "^3.4.6",
"chai": "^4.2.0",
Expand All @@ -73,7 +73,7 @@
"sinon": "^7.5.0",
"sinon-chai": "^3.3.0",
"source-map-support": "^0.5.16",
"ts-node": "^8.5.2",
"ts-node": "^8.5.4",
"tslint": "^5.20.1",
"typescript": "^3.7.2"
}
Expand Down
2 changes: 1 addition & 1 deletion src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
parsedMessage.push(message);
})
.catch((e: Error) => {
if (failedMessageVisibility){
if (failedMessageVisibility) {
message.changeVisibility(failedMessageVisibility);
} else {
message.release();
Expand Down
133 changes: 133 additions & 0 deletions src/test/src/Resubmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,137 @@ describe('resubmitter', () => {
});
});

it('should keep handled messages if option set', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
const stub = new SQSStub(1, 0);
squissFrom!.sqs = stub as any as SQS;
if (stub.msgs[0]) {
stub.msgs[0].MessageId = 'myId';
}
const visibilitySpy = sinon.spy(squissFrom!.sqs, 'changeMessageVisibility');
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.stub().resolvesArg(0);
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
keepHandledMessages: true,
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.have.callCount(1);
visibilitySpy.should.be.calledWith({
QueueUrl: 'foo_DLQ',
ReceiptHandle: '0',
VisibilityTimeout: 45,
});
});
});

it('should reject if failed to handle message', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
const stub = new SQSStub(1, 0);
squissFrom!.sqs = stub as any as SQS;
if (stub.msgs[0]) {
stub.msgs[0].MessageId = 'myId';
}
const visibilitySpy = sinon.spy(squissFrom!.sqs, 'changeMessageVisibility');
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.stub().resolvesArg(0);
const sendMessageSpy = sinon.stub(squissTo!.sqs, 'sendMessage').returns({
promise: () => {
return Promise.reject(new Error('myFail'));
},
});
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
throw new Error('was not supposed to succeed');
})
.catch((err) => {
err.message.should.eql('myFail');
spy.should.have.callCount(1);
sendMessageSpy.should.have.callCount(1);
visibilitySpy.should.be.calledWith({
QueueUrl: 'foo_DLQ',
ReceiptHandle: '0',
VisibilityTimeout: 45,
});
});
});

it('should continue if failed to handle message', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
const stub = new SQSStub(1, 0);
squissFrom!.sqs = stub as any as SQS;
if (stub.msgs[0]) {
stub.msgs[0].MessageId = 'myId';
}
const visibilitySpy = sinon.spy(squissFrom!.sqs, 'changeMessageVisibility');
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.stub().resolvesArg(0);
const sendMessageSpy = sinon.stub(squissTo!.sqs, 'sendMessage').returns({
promise: () => {
return Promise.reject(new Error('myFail'));
},
});
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
keepHandledMessages: true,
customMutator: spy,
continueOnFail: true,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.have.callCount(1);
sendMessageSpy.should.have.callCount(1);
visibilitySpy.should.be.calledWith({
QueueUrl: 'foo_DLQ',
ReceiptHandle: '0',
VisibilityTimeout: 45,
});
});
});

});

0 comments on commit e8e7836

Please sign in to comment.