Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Serverless Patter - FIFO SQS Redrive #2060

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions fifo-sqs-redrive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Redrive SQS FIFO Queue messages from DLQ

The patterns shows the redrive capability of Amazon SQS FIFO queues from dead letter queue. The SAM template deploys two Amazon SQS FIO queues with DLQ, and two AWS Lambda functions to simulate the redrive capability of SQS.

Learn more about this pattern at Serverless Land Patterns:https://serverlessland.com/patterns/fifio-sqs-redrive

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
- [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```
cd fifio-sqs-redrive
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved
```
3. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
sam deploy --guided
```
4. During the prompts:
* Enter a stack name
* Enter `us-east-1` or any other supported AWS Region.
* Allow SAM CLI to create IAM roles with the required permissions. Please keep all other options to default.
5. Make a note of the output, which will be used during testing.

## How it works

* This template creates two Amazon SQS queues - `MyOriginalQueue.fifo` and `MyReProcessQueue.fifo` along with DLQ `MyDeadLetterQueue.fifo`.
* The template also creates two AWS Lambda functions `MyOriginalQueuePollerFunction` and `MyReProcessQueuePollerFunction` to poll messages from `MyOriginalQueue.fifo` and `MyReProcessQueue.fifo` respectively through event source mapping.
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved
* The Lambda function `MyOriginalQueuePollerFunction` raises exception to similate message processing failure. Hence the message moves to DLQ `MyDeadLetterQueue.fifo` once the retry is exhausted.
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved
* We then redrive the message from DLQ `MyDeadLetterQueue.fifo` to `MyReProcessQueue.fifo` using AWS CLI command.
* Message is successfully processed by `MyReProcessQueuePollerFunction` Lambda function.


Please refer to the architecture diagram below:

![End to End Architecture](image/architecture.png)

## Testing

1. Run the below aws cli command to push a JSON message into `MyOriginalQueue.fifo`. Please replace `{MyOriginalQueueUrl}` with the value received from the `sam deploy` command output and also replace `{your-region}` with your deployment region.
```bash
aws sqs send-message --queue-url {MyOriginalQueueUrl} --message-body '{"id":123,"name":"John","data":{"age":30,"job":"Developer"}}' --message-group-id "group1" --region {your-region}
```

Sample output:
```bash
{
"MD5OfMessageBody": "2a33xxxxxxxxxxxxxd34a",
"MessageId": "3809xxx-xxxx-xxxx-xxxx-exxxxb88b279",
"SequenceNumber": "18883xxxxxxx616"
}
```
2. Now, open [AWS SQS Console](https://console.aws.amazon.com/sqs), select your deployment region and validate the number of messages in `MyOriginalQueue.fifo`, `MyReProcessQueue.fifo` and `MyDeadLetterQueue.fifo` queues. You may have to refresh a few times while the message is being processed by `MyOriginalQueuePollerFunction` AWS Lambda function. The message should be in the DLQ. PLease refer to the diagram below:
![The message in DLQ](image/msg-in-dlq.png)




3. Also, validate from the Amazon CloudWatch log of `MyOriginalQueuePollerFunction` that it failed to process the message due to an exception.
![Lambda processing error](image/lambda-processing-error.png)



4. Now, let us redrive the message from `MyDeadLetterQueue.fifo` to `MyReProcessQueue.fifo` using the below aws cli command. Please replace `{MyOriginalQueueUrl}` and `{MyReProcessQueueArn}` with the value received from the `sam deploy` command output and also replace `{your-region}` with your deployment region.
```bash
aws sqs start-message-move-task --source-arn {MyDeadLetterQueueArn} --destination-arn {MyReProcessQueueArn} --region {your-region}
```

Sample output:
```bash
{
"TaskHandle": "eyJ0YXxxxxxxxxxxxxxxxxm8ifQ=="
}
```


5. Again, let us validate the number of messages in `MyOriginalQueue.fifo`, `MyReProcessQueue.fifo` and `MyDeadLetterQueue.fifo` queues from [AWS SQS Console](https://console.aws.amazon.com/sqs). You may have to refresh a few times while the message is being processed by `MyReProcessQueuePollerFunction` AWS Lambda function. There should not be any messages in any of the queues now as the message got successfully reprocessed now. PLease refer to the diagram below:
![The message in DLQ](image/all-msg-processed.png)



6. Also, validate from the Amazon CloudWatch log of `MyReProcessQueuePollerFunction` that it processed the message successfully.
![Lambda processing error](image/lambda-reprocessed-msg.png)


## Cleanup


1. Delete the stack
```bash
sam delete
```

----
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
60 changes: 60 additions & 0 deletions fifo-sqs-redrive/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"title": "Redrive SQS FIFO Queue messages from DLQ",
"description": "The SAM template deploys Amazon SQS FIFO queues with DLQ and AWS Lambda functions to simulate the redrive capability of SQS FIFO queues from DLQ.",
"language": "Python",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This template creates two Amazon SQS queues - MyOriginalQueue.fifo and MyReProcessQueue.fifo along with DLQ MyDeadLetterQueue.fifo.",
"The template also creates two AWS Lambda functions MyOriginalQueuePollerFunction and MyReProcessQueuePollerFunction to poll messages from MyOriginalQueue.fifo and MyReProcessQueue.fifo respectively through event source mapping.",
"The Lambda function MyOriginalQueuePollerFunction raises exception to similate message processing failure. Hence the message moves to DLQ MyDeadLetterQueue.fifo once the retry is exhausted.",
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved
"We then redrive the message from DLQ MyDeadLetterQueue.fifo to MyReProcessQueue.fifo using AWS CLI command.",
"Message is successfully processed by MyReProcessQueuePollerFunction Lambda function."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/fifio-sqs-redrive",
"templateURL": "serverless-patterns/fifio-sqs-redrive",
"projectFolder": "fifio-sqs-redrive",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Moving messages out of a dead-letter queue",
"link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html#sqs-dead-letter-queues-redrive"
},
{
"text": "AWS CLI Command to start message redrive",
"link": "https://awscli.amazonaws.com/v2/documentation/api/latest/reference/sqs/start-message-move-task.html"
}
]
},
"deploy": {
"text": [
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Biswanath Mukherjee",
"image": "https://d1rwvjey2iif32.cloudfront.net",
"bio": "I am a Sr. Solutions Architect working at AWS India.",
"linkedin": "biswanathmukherjee"
}
]
}
Binary file added fifo-sqs-redrive/image/all-msg-processed.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added fifo-sqs-redrive/image/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added fifo-sqs-redrive/image/lambda-reprocessed-msg.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added fifo-sqs-redrive/image/msg-in-dlq.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 14 additions & 0 deletions fifo-sqs-redrive/originalmessage/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import json
import boto3
import os

sqs = boto3.client('sqs')

def lambda_handler(event, context):

for record in event['Records']:
message = json.loads(record['body'])
print(message)
raise Exception('Failed to process message')


18 changes: 18 additions & 0 deletions fifo-sqs-redrive/redrivenmessage/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import json
import boto3
import os

sqs = boto3.client('sqs')

def lambda_handler(event, context):

for record in event['Records']:
message = json.loads(record['body'])

print("Message redriven from DLQ")
print(f"Processed message: {message}")
# Delete message if processed successfully
sqs.delete_message(
QueueUrl=os.environ['QUEUE_URL'],
ReceiptHandle=record['receiptHandle']
)
145 changes: 145 additions & 0 deletions fifo-sqs-redrive/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: SQS with DLQ and Lambda event source

Resources:
## Define the SQS Queues ##
MyOriginalQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: MyOriginalQueue.fifo
FifoQueue: true
ContentBasedDeduplication: true
VisibilityTimeout: 30
MessageRetentionPeriod: 1209600
RedrivePolicy:
deadLetterTargetArn: !GetAtt MyDeadLetterQueue.Arn
maxReceiveCount: 1

MyDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: MyDeadLetterQueue.fifo
FifoQueue: true
ContentBasedDeduplication: true
MessageRetentionPeriod: 1209600
RedriveAllowPolicy:
redrivePermission: "allowAll"

MyReProcessQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: MyReProcessQueue.fifo
FifoQueue: true
ContentBasedDeduplication: true
VisibilityTimeout: 30
MessageRetentionPeriod: 1209600
RedrivePolicy:
deadLetterTargetArn: !GetAtt MyDeadLetterQueue.Arn
maxReceiveCount: 1


## Define the Lambda Functions ##
MyOriginalQueuePollerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: originalmessage/
Handler: app.lambda_handler
Runtime: python3.12
FunctionName: MyOriginalQueuePollerFunction
Policies:
- SQSPollerPolicy:
QueueName: !GetAtt MyOriginalQueue.QueueName
Environment:
Variables:
QUEUE_URL: !Ref MyOriginalQueue
Events:
SqsEvent:
Type: SQS
Properties:
Queue: !GetAtt MyOriginalQueue.Arn
BatchSize: 1
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved

MyReProcessQueuePollerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: redrivenmessage/
Handler: app.lambda_handler
Runtime: python3.12
FunctionName: MyReProcessQueuePollerFunction
Policies:
- SQSPollerPolicy:
QueueName: !GetAtt MyReProcessQueue.QueueName
Environment:
Variables:
QUEUE_URL: !Ref MyReProcessQueue
Events:
SqsEvent:
Type: SQS
Properties:
Queue: !GetAtt MyReProcessQueue.Arn
BatchSize: 1
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved


## Define the SQS Queue Policies ##
biswanathmukherjee marked this conversation as resolved.
Show resolved Hide resolved
MyOriginalQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Sid: Allow-Queue-operations
Effect: Allow
Principal:
AWS: !Sub arn:aws:iam::${AWS::AccountId}:root
Action:
- SQS:DeleteMessage
- SQS:ReceiveMessage
- SQS:SendMessage
Resource: !GetAtt MyOriginalQueue.Arn
Queues:
- !Ref MyOriginalQueue

MyDeadLetterQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Sid: Allow-Queue-operations
Effect: Allow
Principal:
AWS: !Sub arn:aws:iam::${AWS::AccountId}:root
Action:
- SQS:DeleteMessage
- SQS:ReceiveMessage
- SQS:SendMessage
Resource: !GetAtt MyDeadLetterQueue.Arn
Queues:
- !Ref MyDeadLetterQueue

MyReProcessQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Sid: Allow-Queue-operations
Effect: Allow
Principal:
AWS: !Sub arn:aws:iam::${AWS::AccountId}:root
Action:
- SQS:DeleteMessage
- SQS:ReceiveMessage
- SQS:SendMessage
Resource: !GetAtt MyReProcessQueue.Arn
Queues:
- !Ref MyReProcessQueue

Outputs:
MyOriginalQueueUrl:
Description: "MyOriginalQueue URL"
Value: !Ref MyOriginalQueue
MyDeadLetterQueueArn:
Description: "MyDeadLetterQueue ARN"
Value: !GetAtt MyDeadLetterQueue.Arn
MyReProcessQueueArn:
Description: "MyReProcessQueue ARN"
Value: !GetAtt MyReProcessQueue.Arn
Loading