Merge pull request #1638 from marakere/marakere-feature-kinesis-lambda-error-handling

New serverless pattern - kinesis-lambda-error-handling
mavi888 authored Oct 9, 2023
2 parents 55222ef + f3cdafb commit 097e08a
Showing 10 changed files with 342 additions and 0 deletions.
108 changes: 108 additions & 0 deletions kinesis-lambda-error-handling/README.md
@@ -0,0 +1,108 @@
# Effective consumer strategies for handling Kinesis Data Stream anomalies

The purpose of this pattern is to demonstrate how to handle consumer (AWS Lambda) failures when reading and processing records from an Amazon Kinesis data stream.

A Kinesis data stream is a set of shards, and each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream. The event source mapping reads records from the Kinesis stream, invokes the Lambda function synchronously, and retries on errors. If the function is throttled or returns an error, Lambda retries until the records expire or exceed the maximum age configured on the event source mapping.

If these error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, this pattern configures the function's event source mapping with a reasonable number of retries and a maximum record age.
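The retry limits this pattern's SAM template sets declaratively can also be applied to an existing event source mapping through the AWS SDK. A minimal sketch (the mapping UUID is a placeholder you would look up with `list_event_source_mappings`; the call requires AWS credentials and is not run here):

```python
def update_retry_policy(mapping_uuid, settings):
    """Apply retry settings to an existing event source mapping.

    Requires AWS credentials; mapping_uuid is a placeholder you would
    look up with list_event_source_mappings().
    """
    import boto3  # imported here so the settings dict below is usable offline
    client = boto3.client("lambda")
    return client.update_event_source_mapping(UUID=mapping_uuid, **settings)

# The limits this pattern's template.yaml sets on the mapping:
esm_settings = {
    "MaximumRetryAttempts": 5,          # stop retrying a failing batch after 5 attempts
    "MaximumRecordAgeInSeconds": 3600,  # discard records older than one hour
    "BisectBatchOnFunctionError": True, # split the batch to isolate a bad record
}

# Example (not run here): update_retry_policy("<esm-uuid>", esm_settings)
```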

To retain a record of discarded batches, the pattern configures a failed-event destination: Lambda sends each failed record to an Amazon SQS queue.

Learn more about this pattern at [Serverless Land Patterns](https://serverlessland.com/patterns/kinesisds-lambda-error-handling).

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) installed
* [AWS SAM](https://aws.amazon.com/serverless/sam/) The AWS Serverless Application Model (SAM) is an open-source framework for building serverless applications.

* [AWS Cloud9](https://aws.amazon.com/cloud9/) (optional) A cloud IDE that comes prepackaged with the programming languages and CLIs needed for this project.

## Deployment Instructions

1. Clone the project to your local working directory

```sh
git clone https://github.com/aws-samples/serverless-patterns/
```

2. Change the working directory to this pattern's directory

```sh
cd serverless-patterns/kinesis-lambda-error-handling
```
3. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file:
```sh
sam build
sam deploy --guided
```
4. During the prompts:

- Enter a stack name
- Enter the desired AWS Region
- Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` and saved the arguments to a configuration file (samconfig.toml), you can use `sam deploy` in the future to reuse these defaults.
Note the outputs from the SAM deployment process; they contain the resource names and/or ARNs used for testing.

## How it works
![Reference Architecture](./images/kinesis-lambda-error-handling.jpg)

The pattern builds an Amazon Kinesis data stream, a consumer Lambda function, an SQS queue that receives failed records for further troubleshooting, and a CloudWatch log group for verifying successes and failures (with retries) in the logs.

The event source mapping reads records from your Kinesis stream, invokes the Lambda function synchronously, and retries on errors. If the function is throttled or returns an error, Lambda retries until the records expire or exceed the maximum age configured on the event source mapping.

## Testing
To test the pattern, the test cases are broken into two scenarios.

### Scenario 1: Put messages without the poison pill
- test-records-without-poison-pill.json --> holds the base64-encoded data with a partition key
- without-poison-pill-put-records.sh --> bash script that puts the records into the Kinesis data stream

```sh
chmod +x kinesis-producer/*.sh;
./kinesis-producer/without-poison-pill-put-records.sh
```
- In the CloudWatch logs, you should see all three messages processed without any exception.
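The consumer Lambda validates each decoded record against a fixed schema (`AtomicNumber`, `Element`, `Symbol`, `AtomicMass`). You can reproduce that check locally before putting records on the stream; a minimal sketch mirroring the function's validation logic on a base64-encoded record:

```python
import base64
import json

# The schema the consumer Lambda enforces on every record
REQUIRED_FIELDS = {"AtomicNumber": int, "Element": str, "Symbol": str, "AtomicMass": float}

def validate(payload):
    """Return None if the payload matches the schema, else the first problem found."""
    if not isinstance(payload, dict):
        return "not a JSON object"
    for key, value_type in REQUIRED_FIELDS.items():
        if key not in payload:
            return f"missing '{key}'"
        if not isinstance(payload[key], value_type):
            return f"'{key}' should be {value_type.__name__}"
    return None

# A well-formed record, base64-encoded the same way the producer test files are
good = base64.b64encode(json.dumps(
    {"AtomicNumber": 1, "Element": "Hydrogen", "Symbol": "H", "AtomicMass": 1.007}
).encode("utf-8"))
decoded = json.loads(base64.b64decode(good).decode("utf-8"))
print(validate(decoded))  # → None
```

The poison-pill record in Scenario 2 fails this check because its `AtomicMass` is a string rather than a float.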

### Scenario 2: Put messages with the poison pill
- test-records-with-poison-pill.json --> holds the base64-encoded data with a partition key
- with-poison-pill-put-records.sh --> bash script that puts the records into the Kinesis data stream

```sh
./kinesis-producer/with-poison-pill-put-records.sh
```

- In the CloudWatch logs, you should see the poison message and Lambda splitting the batch in half to retry each half separately, finally placing the poison message in the Amazon SQS queue for further troubleshooting.
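The batch splitting you see in the logs comes from `BisectBatchOnFunctionError`: when a batch fails, Lambda halves it and retries each half, which progressively isolates the poison record. A simplified, illustrative simulation of that narrowing (the real service additionally applies the retry and record-age limits before giving up):

```python
def bisect_failures(batch, is_poison):
    """Simulate batch bisection: return the records isolated as poison.

    Simplified model of BisectBatchOnFunctionError; the real service also
    applies MaximumRetryAttempts and MaximumRecordAgeInSeconds.
    """
    if not any(is_poison(r) for r in batch):
        return []          # the whole (sub)batch succeeds
    if len(batch) == 1:
        return batch       # isolated the poison record
    mid = len(batch) // 2  # split the failing batch in half and retry each half
    return (bisect_failures(batch[:mid], is_poison) +
            bisect_failures(batch[mid:], is_poison))

records = ["Be", "B-poison", "H", "He", "Li"]
print(bisect_failures(records, lambda r: "poison" in r))  # → ['B-poison']
```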

## Validation
After Scenario 2, the invalid (poison pill) message is placed in the SQS queue for further investigation.
Replace AWSACCOUNTID with your AWS account number in validation-scripts/read-sqs-queue.sh:

```sh
vi validation-scripts/read-sqs-queue.sh
```
Run the script as shown below to see the message details.
```sh
chmod +x validation-scripts/read-sqs-queue.sh;
./validation-scripts/read-sqs-queue.sh
```

We highly recommend using the [Amazon Kinesis Data Generator (KDG)](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) for high-volume testing. The KDG makes it simple to send test data to your Amazon Kinesis stream or Amazon Kinesis Data Firehose delivery stream.
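If you script high-volume tests yourself instead, note that the Kinesis `PutRecords` API accepts at most 500 records per request, so a producer must chunk its input. A sketch that builds entries in the same format as the JSON files under kinesis-producer/ and batches them (the element payloads below are synthetic):

```python
import base64
import json

def to_put_records_entries(payloads, partition_key="key"):
    """Build PutRecords entries like the JSON files under kinesis-producer/."""
    return [{"Data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("ascii"),
             "PartitionKey": partition_key} for p in payloads]

def chunk(entries, size=500):
    """PutRecords accepts at most 500 records per request."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]

# Synthetic payloads matching the schema the consumer Lambda expects
payloads = [{"AtomicNumber": n, "Element": f"El{n}", "Symbol": f"E{n}",
             "AtomicMass": float(n)} for n in range(1, 1201)]
batches = chunk(to_put_records_entries(payloads))
print([len(b) for b in batches])  # → [500, 500, 200]

# Each batch could then be written to a file and sent with:
# aws kinesis put-records --stream-name DataActivityKinesisStream --records file://batch.json
```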

## Cleanup
```sh
sam delete
```

## Reference
- [AWS SAM](https://aws.amazon.com/serverless/sam/)
- [AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)
- [Lambda event source mappings](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)
- [Amazon Kinesis Data Generator](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/)
----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
62 changes: 62 additions & 0 deletions kinesis-lambda-error-handling/example-pattern.json
@@ -0,0 +1,62 @@
{
  "title": "Effective consumer strategies for handling Kinesis Data Stream anomalies",
  "description": "Shows how to handle consumer (AWS Lambda) failures when reading records from an Amazon Kinesis data stream.",
  "language": "Python",
  "level": "200",
  "framework": "SAM",
  "introBox": {
    "headline": "How it works",
    "text": [
      "The purpose of this pattern is to deploy the infrastructure to demonstrate how to handle consumer (AWS Lambda) failures when reading records from an Amazon Kinesis data stream.",
      "If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, this project configures the function's event source mapping with a reasonable number of retries and a maximum record age.",
      "To retain a record of discarded batches, a failed-event destination is configured: Lambda sends the failed record to an Amazon SQS queue."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/kinesis-lambda-error-handling",
      "templateURL": "serverless-patterns/kinesis-lambda-error-handling",
      "projectFolder": "kinesis-lambda-error-handling",
      "templateFile": "kinesis-lambda-error-handling/template.yaml"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "AWS SAM",
        "link": "https://aws.amazon.com/serverless/sam/"
      },
      {
        "text": "AWS Lambda with Amazon Kinesis",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html"
      },
      {
        "text": "Lambda event source mappings",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html"
      },
      {
        "text": "Amazon Kinesis Data Generator",
        "link": "https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/"
      }
    ]
  },
  "deploy": {
    "text": [
      "sam deploy --guided"
    ]
  },
  "testing": {
    "text": [
      "See the README in the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": ["Delete the stack: <code>sam delete</code>."]
  },
  "authors": [
    {
      "name": "Manjunath Arakere",
      "image": "https://d2908q01vomqb2.cloudfront.net/22d200f8670dbdb3e253a90eee5098477c95c23d/2023/07/06/marakere.jpg",
      "bio": "Senior Solutions Architect @ AWS. Serverless enthusiast."
    }
  ]
}
(Binary file kinesis-lambda-error-handling/images/kinesis-lambda-error-handling.jpg cannot be displayed.)
46 changes: 46 additions & 0 deletions kinesis-lambda-error-handling/kinesis-consumer/kinesis-consumer-fn.py
@@ -0,0 +1,46 @@
import boto3
import logging
import os
import base64
import json

# Initialize the AWS SDK (boto3) Kinesis client
kinesis_client = boto3.client('kinesis')

# Environment variable: the Kinesis data stream this function reads from
KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME']

# Initialize and configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Entry point of the Lambda function
def lambda_handler(event, context):
    # Each record must contain the below keys with the matching data types
    required_fields = {"AtomicNumber": int, "Element": str, "Symbol": str, "AtomicMass": float}
    logger.info(f"Incoming event: --> {event}")

    # Holds the unique sequence number of the current record
    curRecordSequenceNumber = ""

    # Loop through the records in the batch
    for record in event['Records']:
        curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        logger.info(f"Sequence number of the current record --> {curRecordSequenceNumber}")

        # Decode the base64 data into UTF-8 before validating it against the schema
        payload = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
        logger.info(f"Individual record content --> {payload}")
        if not isinstance(payload, dict):
            logger.info("Invalid JSON data structure. The parsed data does not adhere to the expected JSON data structure.")
            raise ValueError("Invalid JSON data structure",
                             "The parsed data does not adhere to the expected JSON data structure.")

        # Verify that each required key is present and its value has the expected type
        for key, value_type in required_fields.items():
            if key not in payload:
                logger.info(f"Missing '{key}' field in JSON.")
                raise ValueError(f"Missing '{key}' field in JSON.")
            if not isinstance(payload[key], value_type):
                logger.info(f"'{key}' field should be of type {value_type.__name__}.")
                raise ValueError(f"'{key}' field should be of type {value_type.__name__}.")
10 changes: 10 additions & 0 deletions kinesis-lambda-error-handling/kinesis-producer/test-records-with-poison-pill.json
@@ -0,0 +1,10 @@
[
{
"Data": "ewogICAiQXRvbWljTnVtYmVyIjogNCwKICAgIkVsZW1lbnQiOiAiQmVyeWxsaXVtIiwKICAgIlN5bWJvbCI6ICJCZSIsCiAgICJBdG9taWNNYXNzIjogOS4wMTIKfQ==",
"PartitionKey": "key"
},
{
"Data": "ewogICAiQXRvbWljTnVtYmVyIjogNSwKICAgIkVsZW1lbnQiOiAiQm9yb24iLAogICAiU3ltYm9sIjogIkIiLAogICAiQXRvbWljTWFzcyI6ICIxMC44MTEiIAp9",
"PartitionKey": "key"
}
]
14 changes: 14 additions & 0 deletions kinesis-lambda-error-handling/kinesis-producer/test-records-without-poison-pill.json
@@ -0,0 +1,14 @@
[
{
"Data": "ewogICAiQXRvbWljTnVtYmVyIjogMSwKICAgIkVsZW1lbnQiOiAiSHlkcm9nZW4iLAogICAiU3ltYm9sIjogIkgiLAogICAiQXRvbWljTWFzcyI6IDEuMDA3Cn0=",
"PartitionKey": "key"
},
{
"Data": "ewogICAiQXRvbWljTnVtYmVyIjogMiwKICAgIkVsZW1lbnQiOiAiSGVsaXVtIiwKICAgIlN5bWJvbCI6ICJIZSIsCiAgICJBdG9taWNNYXNzIjogNC4wMDIKfQ==",
"PartitionKey": "key"
},
{
"Data": "ewogICAiQXRvbWljTnVtYmVyIjogMywKICAgIkVsZW1lbnQiOiAiTGl0aGl1bSIsCiAgICJTeW1ib2wiOiAiTGkiLAogICAiQXRvbWljTWFzcyI6IDYuOTQxCiB9",
"PartitionKey": "key"
}
]
3 changes: 3 additions & 0 deletions kinesis-lambda-error-handling/kinesis-producer/with-poison-pill-put-records.sh
@@ -0,0 +1,3 @@
aws kinesis put-records \
--stream-name DataActivityKinesisStream \
--records file://kinesis-producer/test-records-with-poison-pill.json
3 changes: 3 additions & 0 deletions kinesis-lambda-error-handling/kinesis-producer/without-poison-pill-put-records.sh
@@ -0,0 +1,3 @@
aws kinesis put-records \
--stream-name DataActivityKinesisStream \
--records file://kinesis-producer/test-records-without-poison-pill.json
90 changes: 90 additions & 0 deletions kinesis-lambda-error-handling/template.yaml
@@ -0,0 +1,90 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: >
  AWS SAM template for the serverless pattern:
  a Kinesis data stream triggers Lambda with error handling, and invalid records are put into SQS for further analysis.
  AWS services used: AWS Lambda, SQS, Kinesis, CloudWatch, IAM.
  Cloud9 is recommended as it comes with all the necessary packages and CLIs to run this serverless pattern.
Resources:
  # Create the Kinesis data stream
  DataActivityKinesisStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: DataActivityKinesisStream
      RetentionPeriodHours: 25
      ShardCount: 1
      Tags:
        - Key: AppName
          Value: serverless-pattern

  # Create the Lambda function that consumes messages from the Kinesis data stream,
  # processes each record, and decides whether it is valid.
  KinesisDataStreamProcessor:
    Type: 'AWS::Serverless::Function'
    Properties:
      FunctionName: KinesisDataStreamProcessor
      CodeUri: kinesis-consumer/
      Handler: kinesis-consumer-fn.lambda_handler
      Runtime: python3.9
      MemorySize: 128
      Timeout: 10
      Tracing: Active
      Tags:
        AppName: serverless-pattern
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref DataActivityKinesisStream
      Policies:
        - SQSSendMessagePolicy:
            QueueName: !GetAtt AnomalyDataQueue.QueueName
      Events:
        DataActivityKinesisEvent:
          Type: Kinesis
          Properties:
            Stream: !GetAtt DataActivityKinesisStream.Arn
            BatchSize: 5
            DestinationConfig:
              OnFailure:
                Type: SQS
                Destination: !GetAtt AnomalyDataQueue.Arn
            MaximumRetryAttempts: 5
            StartingPosition: TRIM_HORIZON
            MaximumRecordAgeInSeconds: 3600
            BisectBatchOnFunctionError: true
            # Refer to the link below for other starting position values:
            # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html

  # Create an SQS queue to receive the failed messages
  AnomalyDataQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: AnomalyDataQueue
      Tags:
        - Key: AppName
          Value: serverless-pattern

  # Create the CloudWatch log group used to verify the application's run status
  EventMonitor:
    Type: 'AWS::Logs::LogGroup'
    Properties:
      LogGroupName: KinesisDataStreamProcessorLogs
      Tags:
        - Key: AppName
          Value: serverless-pattern

Outputs:
  KinesisStreamName:
    Description: Kinesis Stream Name
    Value: !Ref DataActivityKinesisStream

  LambdaFunctionName:
    Description: Lambda Function Name
    Value: !Ref KinesisDataStreamProcessor

  SQSQueueName:
    Description: SQS Queue Name
    Value: !GetAtt AnomalyDataQueue.QueueName

  CloudWatchLogGroupName:
    Description: CloudWatch Log Group Name
    Value: !Ref EventMonitor
6 changes: 6 additions & 0 deletions kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh
@@ -0,0 +1,6 @@
# AWS CLI command to read up to 10 messages from the AWS SQS queue
aws sqs receive-message \
--queue-url https://sqs.us-east-1.amazonaws.com/AWSACCOUNTID/AnomalyDataQueue \
--attribute-names All \
--message-attribute-names All \
--max-number-of-messages 10
