Skip to content

Commit

Permalink
feat: add the ability to extend errors from aws (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn authored May 8, 2024
1 parent 645901d commit f4b071b
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 23 deletions.
32 changes: 27 additions & 5 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export class Consumer extends TypedEventEmitter {
private isPolling = false;
private stopRequestedAtTimestamp: number;
public abortController: AbortController;
private extendedAWSErrors: boolean;

constructor(options: ConsumerOptions) {
super();
Expand All @@ -82,6 +83,7 @@ export class Consumer extends TypedEventEmitter {
this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.extendedAWSErrors = options.extendedAWSErrors ?? false;
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -297,7 +299,11 @@ export class Consumer extends TypedEventEmitter {

return result;
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
throw toSQSError(
err,
`SQS receive message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}

Expand Down Expand Up @@ -439,7 +445,11 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emit(
"error",
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
toSQSError(
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
),
message,
);
}
Expand Down Expand Up @@ -470,7 +480,11 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emit(
"error",
toSQSError(err, `Error changing visibility timeout: ${err.message}`),
toSQSError(
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
),
messages,
);
}
Expand Down Expand Up @@ -567,7 +581,11 @@ export class Consumer extends TypedEventEmitter {
this.sqsSendOptions,
);
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
throw toSQSError(
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}

Expand Down Expand Up @@ -601,7 +619,11 @@ export class Consumer extends TypedEventEmitter {
this.sqsSendOptions,
);
} catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
throw toSQSError(
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
);
}
}
}
15 changes: 13 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ class SQSError extends Error {
service: string;
time: Date;
retryable: boolean;
fault: "client" | "server";
fault: AWSError["$fault"];
response?: AWSError["$response"];
metadata?: AWSError["$metadata"];

constructor(message: string) {
super(message);
Expand Down Expand Up @@ -67,7 +69,11 @@ function isConnectionError(err: Error): boolean {
* @param err The error object that was received.
* @param message The message to send with the error.
*/
function toSQSError(err: AWSError, message: string): SQSError {
function toSQSError(
err: AWSError,
message: string,
extendedAWSErrors: boolean,
): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
sqsError.statusCode = err.$metadata?.httpStatusCode;
Expand All @@ -76,6 +82,11 @@ function toSQSError(err: AWSError, message: string): SQSError {
sqsError.fault = err.$fault;
sqsError.time = new Date();

if (extendedAWSErrors) {
sqsError.response = err.$response;
sqsError.metadata = err.$metadata;
}

return sqsError;
}

Expand Down
42 changes: 33 additions & 9 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ export interface ConsumerOptions {
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
/**
* Set this to `true` if you want to receive additional information about the error
* that occurred from AWS, such as the response and metadata.
*/
extendedAWSErrors?: boolean;
}

/**
Expand Down Expand Up @@ -231,7 +236,7 @@ export type AWSError = {
/**
* Name, eg. ConditionalCheckFailedException
*/
name: string;
readonly name: string;

/**
* Human-readable error response message
Expand All @@ -246,7 +251,26 @@ export type AWSError = {
/**
* Whether the client or server are at fault.
*/
readonly $fault?: "client" | "server";
readonly $fault: "client" | "server";

/**
* Represents an HTTP message as received in reply to a request
*/
readonly $response?: {
/**
* The status code of the HTTP response.
*/
statusCode?: number;
/**
* The headers of the HTTP message.
*/
headers: Record<string, string>;
/**
* The body of the HTTP message.
* Can be: ArrayBuffer | ArrayBufferView | string | Uint8Array | Readable | ReadableStream
*/
body?: any;
};

/**
* The service that encountered the exception.
Expand All @@ -263,37 +287,37 @@ export type AWSError = {
readonly throttling?: boolean;
};

$metadata?: {
readonly $metadata: {
/**
* The status code of the last HTTP response received for this operation.
*/
httpStatusCode?: number;
readonly httpStatusCode?: number;

/**
* A unique identifier for the last request sent for this operation. Often
* requested by AWS service teams to aid in debugging.
*/
requestId?: string;
readonly requestId?: string;

/**
* A secondary identifier for the last request sent. Used for debugging.
*/
extendedRequestId?: string;
readonly extendedRequestId?: string;

/**
* A tertiary identifier for the last request sent. Used for debugging.
*/
cfId?: string;
readonly cfId?: string;

/**
* The number of times this operation was attempted.
*/
attempts?: number;
readonly attempts?: number;

/**
* The total amount of time (in milliseconds) that was spent waiting between
* retry attempts.
*/
totalRetryDelay?: number;
readonly totalRetryDelay?: number;
};
};
93 changes: 86 additions & 7 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ const mockChangeMessageVisibilityBatch = sinon.match.instanceOf(

class MockSQSError extends Error implements AWSError {
name: string;
$metadata: {
httpStatusCode: number;
};
$metadata: AWSError["$metadata"];
$service: string;
$retryable: {
throttling: boolean;
};
$fault: "client" | "server";
$retryable: AWSError["$retryable"];
$fault: AWSError["$fault"];
$response?:
| {
statusCode?: number | undefined;
headers: Record<string, string>;
body?: any;
}
| undefined;
time: Date;

constructor(message: string) {
Expand Down Expand Up @@ -245,6 +248,82 @@ describe("Consumer", () => {
assert.equal(err.time.toString(), receiveErr.time.toString());
assert.equal(err.service, receiveErr.$service);
assert.equal(err.fault, receiveErr.$fault);
assert.isUndefined(err.response);
assert.isUndefined(err.metadata);
});

it('includes the response and metadata in the error when "extendedAWSErrors" is true', async () => {
const receiveErr = new MockSQSError("Receive error");
receiveErr.name = "short code";
receiveErr.$retryable = {
throttling: false,
};
receiveErr.$metadata = {
httpStatusCode: 403,
};
receiveErr.time = new Date();
receiveErr.$service = "service";
receiveErr.$response = {
statusCode: 200,
headers: {},
body: "body",
};

sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
extendedAWSErrors: true,
});

consumer.start();
const err: any = await pEvent(consumer, "error");
consumer.stop();

assert.ok(err);
assert.equal(err.response, receiveErr.$response);
assert.equal(err.metadata, receiveErr.$metadata);
});

it("does not include the response and metadata in the error when extendedAWSErrors is false", async () => {
const receiveErr = new MockSQSError("Receive error");
receiveErr.name = "short code";
receiveErr.$retryable = {
throttling: false,
};
receiveErr.$metadata = {
httpStatusCode: 403,
};
receiveErr.time = new Date();
receiveErr.$service = "service";
receiveErr.$response = {
statusCode: 200,
headers: {},
body: "body",
};

sqs.send.withArgs(mockReceiveMessage).rejects(receiveErr);

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
extendedAWSErrors: false,
});

consumer.start();
const err: any = await pEvent(consumer, "error");
consumer.stop();

assert.ok(err);
assert.isUndefined(err.response);
assert.isUndefined(err.metadata);
});

it("fires a timeout event if handler function takes too long", async () => {
Expand Down

0 comments on commit f4b071b

Please sign in to comment.