New serverless pattern - kinesis-lambda-error-handling #1638
# Effective consumer strategies for handling Kinesis Data Stream anomalies

The purpose of this pattern is to showcase how to handle consumer (AWS Lambda) failures when reading and processing records from an Amazon Kinesis data stream.

A Kinesis data stream is a set of shards. Each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream. The event source mapping reads records from the Kinesis stream, invokes the AWS Lambda function synchronously, and retries on errors. If Lambda throttles the function or returns an error without invoking the function, Lambda retries until the records expire or exceed the maximum age that you configure on the event source mapping.
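The batch a consumer Lambda function receives from the event source mapping looks roughly like the sketch below (field names follow the documented Kinesis event format; the sequence number and payload here are illustrative). Each record's `data` arrives base64-encoded, which is why the consumer in this pattern decodes it before validating:

```python
import base64
import json

# Illustrative sketch of the batch shape delivered to the consumer Lambda
event = {
    "Records": [
        {
            "kinesis": {
                "partitionKey": "key",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                # Record payloads arrive base64-encoded
                "data": base64.b64encode(json.dumps(
                    {"AtomicNumber": 1, "Element": "Hydrogen", "Symbol": "H", "AtomicMass": 1.007}
                ).encode("utf-8")).decode("utf-8"),
            },
            "eventSource": "aws:kinesis",
        }
    ]
}

# The consumer decodes each record the same way the pattern's Lambda does
payload = json.loads(base64.b64decode(event["Records"][0]["kinesis"]["data"]).decode("utf-8"))
print(payload["Element"])  # Hydrogen
```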
If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, this pattern configures the function's event source mapping with a reasonable number of retries and a maximum record age.

To retain a record of discarded batches, we configure a failed-event destination: Lambda sends the details of each failed batch to an Amazon SQS queue.

Learn more about this pattern at [Serverless Land Patterns](https://serverlessland.com/patterns/kinesisds-lambda-error-handling).

Important: this application uses various AWS services, and there are costs associated with these services beyond Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make the necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) installed
* [AWS SAM](https://aws.amazon.com/serverless/sam/) installed. The AWS Serverless Application Model (SAM) is an open-source framework for building serverless applications.
* [AWS Cloud9](https://aws.amazon.com/cloud9/) (optional). This cloud IDE comes prepackaged with the programming languages and CLIs needed for this project.
## Deployment Instructions

1. Clone the project to your local working directory:

```sh
git clone https://github.com/aws-samples/serverless-patterns/
```

2. Change the working directory to this pattern's directory:

```sh
cd serverless-patterns/kinesis-lambda-error-handling
```

3. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file:

```sh
sam build
sam deploy --guided
```

4. During the prompts:

- Enter a stack name
- Enter the desired AWS Region
- Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` and saved the arguments to a configuration file (samconfig.toml), you can use `sam deploy` in the future to reuse these defaults.

Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.
## How it works

![Reference Architecture](/images/kinesis-lambda-error-handling.jpg)

The pattern builds infrastructure with an Amazon Kinesis data stream, a consumer Lambda function, an SQS queue that receives the failed records for further troubleshooting, and a CloudWatch log group used to validate the logs for success and for failure with retries.

The event source mapping reads records from your Kinesis stream, invokes the AWS Lambda function synchronously, and retries on errors. If Lambda throttles the function or returns an error without invoking the function, Lambda retries until the records expire or exceed the maximum age that you configure on the event source mapping.
## Testing

To test the pattern, we break the test cases into two scenarios.

### Scenario 1: Put messages without the poison pill

- test-records-without-poison-pill.json --> Holds the base64 data with partition key
- without-poison-pill-put-records.sh --> Bash script to put records into the Kinesis data stream

```sh
chmod +x kinesis-producer/*.sh;
./kinesis-producer/without-poison-pill-put-records.sh
```

- Navigate to the AWS Console, then to CloudWatch, Log groups; select the Lambda function's log group (`/aws/lambda/KinesisDataStreamProcessor`) and the latest log stream. Alternatively, tail the function's logs from the CLI with `sam logs --stack-name <your-stack-name> --tail`.
- In the logs, you should see all 3 messages processed without any exception.
### Scenario 2: Put messages with the poison pill

- test-records-with-poison-pill.json --> Holds the base64 data with partition key
- with-poison-pill-put-records.sh --> Bash script to put records into the Kinesis data stream

```sh
./kinesis-producer/with-poison-pill-put-records.sh
```
- The CloudWatch logs show that there was one invalid (poison) message, so `BisectBatchOnFunctionError` was applied, which leads Lambda to split the batch in half and retry each half separately. Maximum retry attempts and maximum record age limit the number of retries on a failed batch. As the retry limit is 5, Lambda retries up to 5 times before sending the failed batch details to the Amazon SQS queue.
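The splitting behavior can be sketched with a small simulation. This is illustrative only: Lambda's real retry accounting also honors `MaximumRetryAttempts` and `MaximumRecordAgeInSeconds` and still processes the non-failing half, which this sketch ignores.

```python
def process(batch, is_poison):
    """Simulate one synchronous invocation: fail if the batch holds a poison record."""
    if any(is_poison(r) for r in batch):
        raise ValueError("batch failed")

def bisect_run(batch, is_poison, invocations=None):
    """Illustrative sketch of BisectBatchOnFunctionError: on failure, split the
    batch in half and retry each half until the poison record is isolated."""
    if invocations is None:
        invocations = []
    invocations.append(list(batch))
    try:
        process(batch, is_poison)
    except ValueError:
        if len(batch) == 1:
            # A single failing record: retries are exhausted and the failure
            # details go to the on-failure destination (the SQS queue here)
            return batch[0], invocations
        mid = len(batch) // 2
        for half in (batch[:mid], batch[mid:]):
            isolated, _ = bisect_run(half, is_poison, invocations)
            if isolated is not None:
                return isolated, invocations
    return None, invocations

# Batch of 5 records (matching the pattern's BatchSize) where "r2" is the poison pill
records = ["r0", "r1", "r2", "r3", "r4"]
poison, calls = bisect_run(records, lambda r: r == "r2")
print(poison)  # r2
```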
## Validation

After Scenario 2, the invalid (poison pill) message details are put in the SQS queue for further investigation.

Replace AWSACCOUNTID with your AWS account number in validation-scripts/read-sqs-queue.sh:

```sh
vi validation-scripts/read-sqs-queue.sh
```

Run the script as below to see the message details:

```sh
chmod +x validation-scripts/read-sqs-queue.sh;
./validation-scripts/read-sqs-queue.sh
```

We highly recommend using the [Amazon Kinesis Data Generator (KDG)](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) for high-volume testing. The KDG makes it simple to send test data to your Amazon Kinesis stream or Amazon Kinesis Firehose delivery stream.
## Cleanup

```sh
sam delete
```

## Reference

- [AWS SAM](https://aws.amazon.com/serverless/sam/)
- [AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)
- [Lambda event source mappings](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)
- [Amazon Kinesis Data Generator](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/)

----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
{
    "title": "Effective consumer strategies for handling Kinesis Data Stream anomalies",
    "description": "Showcases how to handle consumer (AWS Lambda) failures when reading records from an Amazon Kinesis data stream.",
    "language": "Python",
    "level": "200",
    "framework": "SAM",
    "introBox": {
        "headline": "How it works",
        "text": [
            "The purpose of this pattern is to deploy the infrastructure to showcase how to handle consumer (AWS Lambda) failures when reading records from an Amazon Kinesis data stream.",
            "If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, this project configures the function's event source mapping with a reasonable number of retries and a maximum record age.",
            "To retain a record of discarded batches, we configure a failed-event destination: Lambda sends the failed batch details to an Amazon SQS queue."
        ]
    },
    "gitHub": {
        "template": {
            "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/kinesis-lambda-error-handling",
            "templateURL": "serverless-patterns/kinesis-lambda-error-handling",
            "projectFolder": "kinesis-lambda-error-handling",
            "templateFile": "kinesis-lambda-error-handling/template.yaml"
        }
    },
    "resources": {
        "bullets": [
            {
                "text": "AWS SAM",
                "link": "https://aws.amazon.com/serverless/sam/"
            },
            {
                "text": "AWS Lambda with Amazon Kinesis",
                "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html"
            },
            {
                "text": "Lambda event source mappings",
                "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html"
            },
            {
                "text": "Amazon Kinesis Data Generator",
                "link": "https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/"
            }
        ]
    },
    "deploy": {
        "text": [
            "sam deploy --guided"
        ]
    },
    "testing": {
        "text": [
            "See the README in the GitHub repo for detailed testing instructions."
        ]
    },
    "cleanup": {
        "text": ["Delete the stack: <code>sam delete</code>."]
    },
    "authors": [
        {
            "name": "Manjunath Arakere",
            "image": "https://d2908q01vomqb2.cloudfront.net/22d200f8670dbdb3e253a90eee5098477c95c23d/2023/07/06/marakere.jpg",
            "bio": "Senior Solutions Architect @ AWS. Serverless enthusiast."
        }
    ]
}
import boto3
import logging
import os
import base64
import json

# Initialize the AWS SDK client for Kinesis
kinesis_client = boto3.client('kinesis')

# Environment variable holding the name of the Kinesis data stream
KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME']

# Initialize and configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Entry point to the Lambda function
def lambda_handler(event, context):
    # Each record's payload must contain these keys with the matching data types
    required_fields = {"AtomicNumber": int, "Element": str, "Symbol": str, "AtomicMass": float}
    logger.info(f"Incoming event: --> {event}")

    # Variable to hold the unique sequence number of each record
    curRecordSequenceNumber = ""

    # Loop through and process each record in the batch
    for record in event['Records']:
        curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        logger.info(f"Sequence number of the current record --> {curRecordSequenceNumber}")

        # Decode the base64 data to UTF-8 before validating it against the schema
        payload = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
        logger.info(f"Individual record content --> {payload}")
        if not isinstance(payload, dict):
            logger.error("Invalid JSON data structure. The parsed data does not adhere to the expected JSON data structure.")
            raise ValueError("Invalid JSON Data Structure",
                             "The parsed data does not adhere to the expected JSON data structure.")

        # Verify that each required key is present and its value has the expected type
        for key, value_type in required_fields.items():
            if key not in payload:
                logger.error(f"Missing '{key}' field in JSON.")
                raise ValueError(f"Missing '{key}' field in JSON.")
            if not isinstance(payload[key], value_type):
                logger.error(f"'{key}' field should be of type {value_type.__name__}.")
                raise ValueError(f"'{key}' field should be of type {value_type.__name__}.")
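The handler's schema checks can be exercised locally without deploying. In this sketch, `validate_payload` and `encode` are hypothetical helpers that mirror the checks above rather than importing the Lambda module (which requires boto3 and the KINESIS_STREAM_NAME environment variable):

```python
import base64
import json

# Same schema the deployed handler enforces
REQUIRED_FIELDS = {"AtomicNumber": int, "Element": str, "Symbol": str, "AtomicMass": float}

def validate_payload(payload):
    """Hypothetical helper mirroring the handler's schema checks."""
    if not isinstance(payload, dict):
        raise ValueError("Invalid JSON data structure")
    for key, value_type in REQUIRED_FIELDS.items():
        if key not in payload:
            raise ValueError(f"Missing '{key}' field in JSON.")
        if not isinstance(payload[key], value_type):
            raise ValueError(f"'{key}' field should be of type {value_type.__name__}.")

def encode(payload):
    """Base64-encode a payload the way a Kinesis producer would."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")

good = {"AtomicNumber": 2, "Element": "Helium", "Symbol": "He", "AtomicMass": 4.002}
bad = {"AtomicNumber": 5, "Element": "Boron", "Symbol": "B", "AtomicMass": "10.811"}  # string, not float

validate_payload(json.loads(base64.b64decode(encode(good))))  # passes silently
try:
    validate_payload(json.loads(base64.b64decode(encode(bad))))
except ValueError as err:
    print(err)  # 'AtomicMass' field should be of type float.
```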
[
    {
        "Data": "ewogICAiQXRvbWljTnVtYmVyIjogNCwKICAgIkVsZW1lbnQiOiAiQmVyeWxsaXVtIiwKICAgIlN5bWJvbCI6ICJCZSIsCiAgICJBdG9taWNNYXNzIjogOS4wMTIKfQ==",
        "PartitionKey": "key"
    },
    {
        "Data": "ewogICAiQXRvbWljTnVtYmVyIjogNSwKICAgIkVsZW1lbnQiOiAiQm9yb24iLAogICAiU3ltYm9sIjogIkIiLAogICAiQXRvbWljTWFzcyI6ICIxMC44MTEiIAp9",
        "PartitionKey": "key"
    }
]
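The second record above is the poison pill: its `AtomicMass` arrives as a JSON string rather than a number, so the consumer's type check raises. Decoding the fixture (the base64 `Data` value copied from the file above) makes this visible:

```python
import base64
import json

# Base64 "Data" value of the second (poison) record in the fixture above
poison_data = "ewogICAiQXRvbWljTnVtYmVyIjogNSwKICAgIkVsZW1lbnQiOiAiQm9yb24iLAogICAiU3ltYm9sIjogIkIiLAogICAiQXRvbWljTWFzcyI6ICIxMC44MTEiIAp9"

payload = json.loads(base64.b64decode(poison_data).decode("utf-8"))
print(payload)
# AtomicMass decodes to the string "10.811", not a float -- this is what
# trips the consumer's isinstance check and poisons the batch
print(type(payload["AtomicMass"]).__name__)  # str
```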
[
    {
        "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMSwKICAgIkVsZW1lbnQiOiAiSHlkcm9nZW4iLAogICAiU3ltYm9sIjogIkgiLAogICAiQXRvbWljTWFzcyI6IDEuMDA3Cn0=",
        "PartitionKey": "key"
    },
    {
        "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMiwKICAgIkVsZW1lbnQiOiAiSGVsaXVtIiwKICAgIlN5bWJvbCI6ICJIZSIsCiAgICJBdG9taWNNYXNzIjogNC4wMDIKfQ==",
        "PartitionKey": "key"
    },
    {
        "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMywKICAgIkVsZW1lbnQiOiAiTGl0aGl1bSIsCiAgICJTeW1ib2wiOiAiTGkiLAogICAiQXRvbWljTWFzcyI6IDYuOTQxCiB9",
        "PartitionKey": "key"
    }
]
aws kinesis put-records \
    --stream-name DataActivityKinesisStream \
    --records file://kinesis-producer/test-records-with-poison-pill.json
aws kinesis put-records \
    --stream-name DataActivityKinesisStream \
    --records file://kinesis-producer/test-records-without-poison-pill.json
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: >
  AWS SAM template for the serverless pattern:
  a Kinesis data stream triggers Lambda with error handling, and invalid records are put into SQS for further analysis.
  AWS services used: AWS Lambda, SQS, Kinesis, CloudWatch, IAM.
  Cloud9 is recommended as it comes with all the necessary packages and CLIs to run this serverless pattern.

Resources:
  # Create the Kinesis data stream
  DataActivityKinesisStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: DataActivityKinesisStream
      RetentionPeriodHours: 25
      ShardCount: 1
      Tags:
        - Key: AppName
          Value: serverless-pattern

  # Create the Lambda function to consume messages from the Kinesis data stream,
  # process them, and decide whether each record is valid or not
  KinesisDataStreamProcessor:
    Type: 'AWS::Serverless::Function'
    Properties:
      FunctionName: KinesisDataStreamProcessor
      CodeUri: kinesis-consumer/
      Handler: kinesis-consumer-fn.lambda_handler
      Runtime: python3.9
      MemorySize: 128
      Timeout: 10
      Tracing: Active
      Tags:
        AppName: serverless-pattern
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref DataActivityKinesisStream
      Policies:
        - SQSSendMessagePolicy:
            QueueName: !GetAtt AnomalyDataQueue.QueueName
      Events:
        DataActivityKinesisEvent:
          Type: Kinesis
          Properties:
            Stream: !GetAtt DataActivityKinesisStream.Arn
            BatchSize: 5
            DestinationConfig:
              OnFailure:
                Type: SQS
                Destination: !GetAtt AnomalyDataQueue.Arn
            MaximumRetryAttempts: 5
            StartingPosition: TRIM_HORIZON
            MaximumRecordAgeInSeconds: 3600
            BisectBatchOnFunctionError: true
            # Refer to the link below for other valid starting positions:
            # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html

  # Create an SQS queue to receive the failed messages
  AnomalyDataQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: AnomalyDataQueue
      Tags:
        - Key: AppName
          Value: serverless-pattern

  # Create the CloudWatch log group to validate and verify the application run status
  EventMonitor:
    Type: 'AWS::Logs::LogGroup'
    Properties:
      LogGroupName: KinesisDataStreamProcessorLogs
      Tags:
        - Key: AppName
          Value: serverless-pattern

Outputs:
  KinesisStreamName:
    Description: Kinesis Stream Name
    Value: !Ref DataActivityKinesisStream

  LambdaFunctionName:
    Description: Lambda Function Name
    Value: !Ref KinesisDataStreamProcessor

  SQSQueueName:
    Description: SQS Queue Name
    Value: !GetAtt AnomalyDataQueue.QueueName

  CloudWatchLogGroupName:
    Description: CloudWatch Log Group Name
    Value: !Ref EventMonitor
# AWS CLI command to read up to 10 messages from the SQS queue
# (replace AWSACCOUNTID with your AWS account number)
aws sqs receive-message \
    --queue-url https://sqs.us-east-1.amazonaws.com/AWSACCOUNTID/AnomalyDataQueue \
    --attribute-names All \
    --message-attribute-names All \
    --max-number-of-messages 10
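The message body Lambda sends to the on-failure destination is a JSON invocation record describing the failed batch, not the record payloads themselves. A sketch of pulling out the useful fields follows; the field names are taken from AWS's documented invocation record format for Kinesis on-failure destinations, and the values here are illustrative, so verify both against the messages in your own queue:

```python
import json

# Illustrative SQS message body in the shape of a Kinesis on-failure invocation record
sample_body = json.dumps({
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:KinesisDataStreamProcessor",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 6
    },
    "responseContext": {"statusCode": 200, "functionError": "Unhandled"},
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000000",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "batchSize": 1,
        "streamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/DataActivityKinesisStream"
    }
})

# Extract the shard and sequence-number range of the failed batch
info = json.loads(sample_body)["KinesisBatchInfo"]
print(f"Failed batch of {info['batchSize']} on {info['shardId']}, "
      f"sequence {info['startSequenceNumber']}..{info['endSequenceNumber']}")
```

Because the destination receives only this metadata, use the sequence-number range to re-read the offending records from the stream within its retention period.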