Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis-Lambda Integration with error handling #1616

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cdk-kinesis-lambda-java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.classpath.txt
target
.classpath
.project
.idea
.settings
.vscode
*.iml

# CDK asset staging directory
.cdk.staging
cdk.out

80 changes: 80 additions & 0 deletions cdk-kinesis-lambda-java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Kinesis Data Streams with Lambda Integration

![architecture diagram](architecture.png)

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS CDK Toolkit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured
* [Java 11+](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html) installed
* [Docker](https://docs.docker.com/get-docker/) Installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:

```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:

```
cd serverless-patterns/cdk-kinesis-lambda-java
```
3. From the command line, use AWS CDK to deploy the AWS resources for the serverless application

```bash
cd infrastructure
```
4. From the command line, Synthesize the cdk stack to emits the synthesized CloudFormation template. Set up will make sure to build and package
the lambda functions residing in software directory.

```bash
cdk synth
```
5. From the command line, use AWS CDK to deploy the AWS resources.

```bash
cdk deploy
```
Alternatively infrastructure/deploy.sh can be used to build and deploy the stack

6. Note the outputs of CDK and copy the Kinesis Resource Name. Use the copied stream name in the producer.sh file which will be used in the next step to put records in to the stream created.

## How it works

This Kinesis-Lambda integration pattern makes use of the aws-kinesisstreams-lambda [Solution construct](https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html) to create the infrastructure.

Lambda get triggered based on the events from the Kinesis Data Stream. For any error in invocation of the lambda function events are persisted in the configured dead-letter SQS queue.

In the example the Kinesis Event Source is configured with `maxretryattempt` as 1, bisectBatchOnError set to true, and `reportBatchItemFailures` set to true with batch size of 3.

Lambda code has been updated to handle exception on any error due to event processing as per the best practice to return the sequence number. Using this configuration and approach duplicate message reprocessing can be avoided.

For more details on handling Success and Failure conditions in Kinesis data streams consumption, refer the [documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-errors).
## Testing
Update the producer.sh file with Kinesis stream name which got created. Update the number of messages to get published in to the stream by updating the number in loop as shown in the below statement

while [ $a -lt 24 ]

From the command line

```bash
cd ../software
cd KinesisCliProducers
sh producers.sh
```
This will publish the messages in the Kinesis stream and the lambda function gets triggered based on that.

## Cleanup

1. Delete the stack
```bash
cdk destroy
```
----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Binary file added cdk-kinesis-lambda-java/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
54 changes: 54 additions & 0 deletions cdk-kinesis-lambda-java/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"title": "Kinesis to Lambda with error handling",
"description": "Create a Java Lambda function with Event Source as Kinesis Data source",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"description": "Create a Java Lambda function with Event Source as Kinesis Data source",
"description": "A Java serverless example that connects Kinesis Data Streams to AWS Lambda. ",

"language": "Java",
"level": "200",
"framework": "CDK",
"introBox": {
"headline": "How it works",
"text": [
"This sample project demonstrates how to use AWS Lambda (Java runtime) to subscribe to events from a Kinesis Data Stream with error handling to avoid redundant message processing",
"This example uses AWS CDK constructs"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-kinesis-lambda-java",
"templateURL": "serverless-patterns/cdk-kinesis-lambda-java",
"projectFolder": "cdk-kinesis-lambda-java",
"templateFile": "infrastructure/src/main/java/com/myorg/InfrastructureStack.java"
}
},
"resources": {
"bullets": [
{
"text": "AWS CDK Construct for Kinesis and Lambda",
"link": "https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html"
}
]
},
"deploy": {
"text": [
"cdk deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>cdk destroy</code>."
]
},
"authors": [
{
"name": "Shiva Mahalingam",
"image": "",
"bio": "Solutions Architect @ AWS",
"linkedin": "shivamahalingam",
"twitter": ""
}
]
}
13 changes: 13 additions & 0 deletions cdk-kinesis-lambda-java/infrastructure/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.classpath.txt
target
.classpath
.project
.idea
.settings
.vscode
*.iml

# CDK asset staging directory
.cdk.staging
cdk.out

5 changes: 5 additions & 0 deletions cdk-kinesis-lambda-java/infrastructure/cdk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"app": "mvn -e -q compile exec:java",
"context": {
}
}
6 changes: 6 additions & 0 deletions cdk-kinesis-lambda-java/infrastructure/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
cd ../software/KinesisLambdaClient
mvn clean package
cd ../../infrastructure
mvn clean compile
cdk synth
cdk deploy
77 changes: 77 additions & 0 deletions cdk-kinesis-lambda-java/infrastructure/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myorg</groupId>
<artifactId>infrastructure</artifactId>
<version>0.1</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<cdk.version>2.44.0</cdk.version>
<junit.version>5.9.1</junit.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<mainClass>com.myorg.InfrastructureApp</mainClass>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- AWS Cloud Development Kit -->
<dependency>
<groupId>software.amazon.awscdk</groupId>
<artifactId>aws-cdk-lib</artifactId>
<version>${cdk.version}</version>
</dependency>

<!-- Respective AWS Construct Libraries -->
<dependency>
<groupId>software.amazon.awscdk</groupId>
<artifactId>apigatewayv2-integrations-alpha</artifactId>
<version>2.44.0-alpha.0</version>
</dependency>
<dependency>
<groupId>software.constructs</groupId>
<artifactId>constructs</artifactId>
<version>10.1.120</version>
</dependency>
<dependency>
<groupId>software.amazon.awsconstructs</groupId>
<artifactId>kinesisstreamslambda</artifactId>
<version>2.42.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.23.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.myorg;

import software.amazon.awscdk.App;

public final class InfrastructureApp {
public static void main(final String[] args) {
App app = new App();

new InfrastructureStack(app, "KinesisDSLambdaStack");

app.synth();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.myorg;

import java.util.Arrays;
import java.util.List;

import software.amazon.awscdk.App;
import software.amazon.awscdk.BundlingOptions;
import software.amazon.awscdk.services.lambda.*;
import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.KinesisEventSourceProps;
import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambda;
import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambdaProps;
import software.constructs.Construct;
import software.amazon.awscdk.DockerVolume;
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.Stack;
import software.amazon.awscdk.StackProps;
import software.amazon.awscdk.services.s3.assets.AssetOptions;

import static java.util.Collections.singletonList;
import static software.amazon.awscdk.BundlingOutput.ARCHIVED;

public class InfrastructureStack extends Stack {
public InfrastructureStack(final App parent, final String id) {
this(parent, id, null);
}

public InfrastructureStack(final Construct parent, final String id, final StackProps props) {
super(parent, id, props);

List<String> kinesisLambdaClientPackagingInstructions = Arrays.asList(
"/bin/sh",
"-c",
"cd KinesisLambdaClient " +
"&& mvn clean install " +
"&& cp /asset-input/KinesisLambdaClient/target/KinesisLambdaClient.jar /asset-output/"
);

BundlingOptions.Builder builderOptions = BundlingOptions.builder()
.command(kinesisLambdaClientPackagingInstructions)
.image(Runtime.JAVA_11.getBundlingImage())
.volumes(singletonList(
// Mount local .m2 repo to avoid download all the dependencies again inside the container
DockerVolume.builder()
.hostPath(System.getProperty("user.home") + "/.m2/")
.containerPath("/root/.m2/")
.build()
))
.user("root")
.outputType(ARCHIVED);

new KinesisStreamsToLambda(this, "KinesisToLambdaPattern", new KinesisStreamsToLambdaProps.Builder()
.kinesisEventSourceProps(new KinesisEventSourceProps.Builder()
.startingPosition(StartingPosition.TRIM_HORIZON)
.batchSize(3)
.maxBatchingWindow(Duration.seconds(20))
.maxRecordAge(Duration.seconds(3600))
.bisectBatchOnError(true)
.retryAttempts(1)
.reportBatchItemFailures(true)
.build())
.lambdaFunctionProps(new FunctionProps.Builder()
.runtime(Runtime.JAVA_11)
.code(Code.fromAsset("../software/", AssetOptions.builder()
.bundling(builderOptions
.command(kinesisLambdaClientPackagingInstructions)
.build())
.build()))
.handler("com.myorg.kinesis.client.App")
.memorySize(1024)
.functionName("KinesisLambdaClient")
.reservedConcurrentExecutions(1)
.timeout(Duration.seconds(10))
.build())
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.myorg;

import software.amazon.awscdk.App;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;

import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;

public class InfrastructureStackTest {
private final static ObjectMapper JSON =
new ObjectMapper().configure(SerializationFeature.INDENT_OUTPUT, true);

@Test
public void testStack() throws IOException {
App app = new App();
InfrastructureStack stack = new InfrastructureStack(app, "test");

JsonNode actual = JSON.valueToTree(app.synth().getStackArtifact(stack.getArtifactId()).getTemplate());

assertThat(actual.toString())
.contains("AWS::ApiGatewayV2::Api")
.contains("AWS::Lambda::Function");
}
}
15 changes: 15 additions & 0 deletions cdk-kinesis-lambda-java/software/KinesisCliProducers/producers.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
a=0
# -lt is less than operator

#Iterate the loop until a less than 10
while [ $a -lt 24 ]
do
# Print the values
aws kinesis put-record --stream-name LambdaPackagingStack-KinesisToLambdaPatternKinesisStreamFA60BE3F-ODRbhifDN9wS \
--data '{"user_id":"user1", "score": 100}' \
--partition-key $a
echo $a

# increment the value
a=`expr $a + 1`
done
Loading
Loading