Skip to content

Commit

Permalink
add better handling on message expire
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Mar 14, 2019
1 parent 7d4636f commit fb11ca9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- Expose method to check if message was handled
- If message extended time is finished, release the message slot, mark it as handled and emit `timeoutReached` event
- Message is now also event emitter, and all event related to a message will also be emitted on it
- Expose SQS typings for direct usage of the underlying SQS instance without adding it as a dependency to your project
- Allow to pass `MessageGroupId` and `MessageDeduplicationId` FIFO related parameters when sending a message
### Fixed
- Fix mocha test options

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Are you using Squiss to create your queue, as well? Squiss will use `opts.receiv
- **opts.queuePolicy** If specified, will be set as the access policy of the queue when `createQueue` is called. See [the AWS Policy documentation](http://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html) for more information.

### squiss.createQueue()
Creates the configured queue! This returns a promise that resolves with the new queue's URL when it's complete. Note that this can only be called if you set `opts.queueName` when instantiating Squiss.
Creates the configured queue! This returns a promise that resolves with the new queue's URL when it's complete. Note that this can only be called if you set `opts.queueName` when instantiating Squiss.

### squiss.deleteMessage(Message)
Deletes a message, given the full Message object sent to the `message` event. It's much easier to call `message.del()`, but if you need to do it right from the Squiss instance, this is how. Note that the message probably won't be deleted immediately -- it'll be queued for a batch delete. See the constructor notes for how to configure the specifics of that.
Expand All @@ -101,7 +101,7 @@ Deletes all the messages in a queue and init in flight
Sends an individual message to the configured queue, and returns a promise that resolves with AWS's official message metadata: an object containing `MessageId`, `MD5OfMessageAttributes`, and `MD5OfMessageBody`. Arguments:
- **message**. The message to push to the queue. If it's a string, great! If it's an Object, Squiss will call JSON.stringify on it.
- **delay** _optional_. The amount of time, in seconds, to wait before making the message available in the queue. If not specified, the queue's configured value will be used.
- **attributes** _optional_. An optional attributes mapping to associate with the message (will be converted to SQS format automatically). For more information, see [the official AWS documentation](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessage-property).
- **attributes** _optional_. An optional attributes mapping to associate with the message (will be converted to SQS format automatically). Passing `FIFO_MessageDeduplicationId` and/or `FIFO_MessageGroupId` will be removed and converted to the `MessageDeduplicationId` and `MessageGroupId` message attributes accordingly (needed for FIFO queues). For more information, see [the official AWS documentation](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessage-property).

### squiss.sendMessages(messages, delay, attributes)
Sends an array of any number of messages to the configured SQS queue, breaking them down into appropriate batch requests executed in parallel (or as much as the default HTTP agent allows). It returns a promise that resolves with a response closely aligned to the official AWS SDK's sendMessageBatch, except the results from all batch requests are merged. Expect a result similar to:
Expand Down
4 changes: 2 additions & 2 deletions src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class Message extends EventEmitter {
* @returns {Object|string} The parsed message, or the original message string if the format type is unknown.
* @private
*/
public static _formatMessage(msg: string | undefined, format: BodyFormat) {
private static formatMessage(msg: string | undefined, format: BodyFormat) {
switch (format) {
case 'json':
return JSON.parse(msg || EMPTY_BODY);
Expand Down Expand Up @@ -77,7 +77,7 @@ export class Message extends EventEmitter {
this.topicName = unwrapped.TopicArn.substr(unwrapped.TopicArn.lastIndexOf(':') + 1);
}
}
this.body = Message._formatMessage(this.body, opts.bodyFormat);
this.body = Message.formatMessage(this.body, opts.bodyFormat);
this._squiss = opts.squiss;
this._handled = false;
this.attributes = parseMessageAttributes(opts.msg.MessageAttributes);
Expand Down
2 changes: 2 additions & 0 deletions src/attributeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const BINARY_TYPE = 'Binary';
export type IMessageAttribute = number | string | SQS.Binary | undefined;

export interface IMessageAttributes {
FIFO_MessageDeduplicationId?: string;
FIFO_MessageGroupId?: string;
[k: string]: IMessageAttribute;
}

Expand Down
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {createMessageAttributes, IMessageAttributes} from './attributeUtils';
import {isString} from 'ts-type-guards';
import {SQS} from 'aws-sdk';

export {SQS} from 'aws-sdk';

/**
* The maximum number of messages that can be sent in an SQS sendMessageBatch request.
* @type {number}
Expand Down Expand Up @@ -450,6 +452,14 @@ export class Squiss extends EventEmitter {
params.DelaySeconds = delay;
}
if (attributes) {
if (attributes.FIFO_MessageGroupId) {
params.MessageGroupId = attributes.FIFO_MessageGroupId;
delete attributes.FIFO_MessageGroupId;
}
if (attributes.FIFO_MessageDeduplicationId) {
params.MessageDeduplicationId = attributes.FIFO_MessageDeduplicationId;
delete attributes.FIFO_MessageDeduplicationId;
}
params.MessageAttributes = createMessageAttributes(attributes);
}
return this.sqs.sendMessage(params).promise();
Expand Down
47 changes: 46 additions & 1 deletion test/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -920,11 +920,56 @@ describe('index', () => {
inst!.sqs = new SQSStub() as any as SQS;
const buffer = Buffer.from('s');
const spy = sinon.spy(inst!.sqs, 'sendMessage');
return inst!.sendMessage('bar', 10, {baz: 'fizz', num: 1, bin: buffer, empty: undefined}).then(() => {
return inst!.sendMessage('bar', 10, {
baz: 'fizz',
num: 1,
bin: buffer,
empty: undefined,
}).then(() => {
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: 'bar',
DelaySeconds: 10,
MessageAttributes: {
baz: {
DataType: 'String',
StringValue: 'fizz',
},
empty: {
DataType: 'String',
StringValue: '',
},
num: {
DataType: 'Number',
StringValue: '1',
},
bin: {
DataType: 'Binary',
BinaryValue: buffer,
},
},
});
});
});
it('sends a message with a delay and attributes and fifo attributes', () => {
inst = new Squiss({queueUrl: 'foo'});
inst!.sqs = new SQSStub() as any as SQS;
const buffer = Buffer.from('s');
const spy = sinon.spy(inst!.sqs, 'sendMessage');
return inst!.sendMessage('bar', 10, {
FIFO_MessageGroupId: 'groupId',
FIFO_MessageDeduplicationId: 'dedupId',
baz: 'fizz',
num: 1,
bin: buffer,
empty: undefined,
}).then(() => {
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: 'bar',
DelaySeconds: 10,
MessageDeduplicationId: 'dedupId',
MessageGroupId: 'groupId',
MessageAttributes: {
baz: {
DataType: 'String',
Expand Down

0 comments on commit fb11ca9

Please sign in to comment.