From fb39298504ad978b1ebd814a7389bb582c36aee7 Mon Sep 17 00:00:00 2001 From: Manju Date: Wed, 6 Sep 2023 11:12:56 -0400 Subject: [PATCH 1/4] new serverless pattern for Effective consumer strategies for handling Kinesis Data Stream anomalies --- kinesis-lambda-error-handling/README.md | 104 ++++++++++++++++++ .../example-pattern.json | 63 +++++++++++ .../images/kinesis-lambda-error-handling.jpg | Bin 0 -> 19998 bytes .../kinesis-consumer/kinesis-consumer-fn.py | 46 ++++++++ .../test-records-with-poison-pill.json | 10 ++ .../test-records-without-poison-pill.json | 14 +++ .../with-poison-pill-put-records.sh | 3 + .../without-poison-pill-put-records.sh | 3 + kinesis-lambda-error-handling/template.yaml | 90 +++++++++++++++ .../validation-scripts/read-sqs-queue.sh | 6 + 10 files changed, 339 insertions(+) create mode 100644 kinesis-lambda-error-handling/README.md create mode 100644 kinesis-lambda-error-handling/example-pattern.json create mode 100644 kinesis-lambda-error-handling/images/kinesis-lambda-error-handling.jpg create mode 100644 kinesis-lambda-error-handling/kinesis-consumer/kinesis-consumer-fn.py create mode 100644 kinesis-lambda-error-handling/kinesis-producer/test-records-with-poison-pill.json create mode 100644 kinesis-lambda-error-handling/kinesis-producer/test-records-without-poison-pill.json create mode 100644 kinesis-lambda-error-handling/kinesis-producer/with-poison-pill-put-records.sh create mode 100644 kinesis-lambda-error-handling/kinesis-producer/without-poison-pill-put-records.sh create mode 100644 kinesis-lambda-error-handling/template.yaml create mode 100644 kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh diff --git a/kinesis-lambda-error-handling/README.md b/kinesis-lambda-error-handling/README.md new file mode 100644 index 000000000..37b2b5bc2 --- /dev/null +++ b/kinesis-lambda-error-handling/README.md @@ -0,0 +1,104 @@ +# Effective consumer strategies for handling Kinesis Data Stream anomalies + +The purpose of this pattern is to showcase on how to handle the consumer(AWS Lambda) failure when reading/processing the records from the Amazon Kinesis data stream. + +A Kinesis data stream is a set of shards. Each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream. The event source mapping that reads records from the Kinesis stream, invokes AWS Lambda function synchronously, and retries on errors. If Lambda throttles the function or returns an error without invoking the function, Lambda retries until the records expire or exceed the maximum age that you configure on the event source mapping. + +If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, we are going to configure function's event source mapping in this pattern with a reasonable number of retries and a maximum record age. + +To retain a record of discarded batches, we are going to configure a failed-event destination. Lambda sends the failed record to the destination - AWS SQS. + +Learn more about this pattern at [Serverless Land Patterns](https://serverlessland.com/patterns/kinesisds-lambda-error-handling). + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) Installed +* [AWS SAM](https://aws.amazon.com/serverless/sam/) The AWS Serverless Application Model (SAM) is an open-source framework for building serverless applications. + +* [AWS Cloud9](https://aws.amazon.com/cloud9/) Other alternative is to use Cloud IDE which comes with prepackaged programming languages, CLIs needed for this project. + +## Deployment Instructions + +1. Clone the project to your local working directory + + ```sh + git clone https://github.com/aws-samples/serverless-patterns/ + ``` + +2. Change the working directory to this pattern's directory + + ```sh + cd serverless-patterns/kinesis-lambda-error-handling + ``` +3. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yml file: + ``` + sam build + sam deploy --guided + ``` +4. During the prompts: + + - Enter a stack name + - Enter the desired AWS Region + - Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in the future to use these defaults. + Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works +![Reference Architecture](/images/kinesis-lambda-error-handling.jpg) + +The pattern builds infrastructure with Amazon Kinesis data stream, a consumer Lambda function, SQS queue to load the failed records for further troubleshooting, and cloudwatch to validate the logs for the success, failure with retries. + +The event source mapping that reads records from your Kinesis stream, invokes AWS Lambda function synchronously, and retries on errors. If Lambda throttles the function or returns an error without invoking the function, Lambda retries until the records expire or exceed the maximum age that you configure on the event source mapping. + +## Testing +To test the pattern, we are breaking the test cases into two scenarios. + +### Scenario 1: Put messages without the Poision pill +- test-records-without-poison-pill.json --> Holds the base64 data with partition key +- without-poison-pill-put-records.sh --> bash script to put records into Kinesis data stream + +``` +chmod +x kinesis-producer/*.sh; +./kinesis-producer/without-poison-pill-put-records.sh +``` + +- Navigate to AWS Console, and then to Cloudwatch, Log groups, and select the log group KinesisDataStreamProcessorLog and the latest Log stream +- In the logs, you should see all the 3 messages processed without any exception. + +### Scenario 2: Put messages with the Poision pill +- test-records-with-poison-pill.json --> Holds the base64 data with partition key +- with-poison-pill-put-records.sh --> bash script to put records into Kinesis data stream + +``` +./kinesis-producer/with-poison-pill-put-records.sh +``` + +- Cloudwatch logs shows that there was one invalid or poison message, so here the bisectonfailure was applied which lead Lambda to split the batch in half and resume each half separately. Maximum retry attempts and maximum record age limit the number of retries on a failed batch. As the retry limit is 5, it will retry 5 times, before the message is put to AWS SQS. + +## Validation +After the Scenario 2, the invalid/poison pill message is put in the SQS queue for the further research. +Replace the account number in the validation-scripts/read-sqs-queue.sh and run the script as below to see the message details. + +``` +chmod +x validation-scripts/read-sqs-queue.sh; +./validation-scripts/read-sqs-queue.sh +``` + +We highly recommend to use [Amazon Kinesis Data Generator(KDG)](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) for the high volume testing. The KDG makes it simple to send test data to your Amazon Kinesis stream or Amazon Kinesis Firehose delivery stream. + +## Cleanup +sam delete + +## Reference +- [AWS SAM](https://aws.amazon.com/serverless/sam/) +- [AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html) +- [Lambda event source mappaings](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) +- [Amazon Kinesis Data Generator](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) +---- +Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/kinesis-lambda-error-handling/example-pattern.json b/kinesis-lambda-error-handling/example-pattern.json new file mode 100644 index 000000000..6dd3dec67 --- /dev/null +++ b/kinesis-lambda-error-handling/example-pattern.json @@ -0,0 +1,63 @@ +{ + "title": "Effective consumer strategies for handling Kinesis Data Stream anomalies", + "description": "To showcase on how to handle the consumer(AWS Lambda) failure when reading the records from the Amazon Kinesis data stream.", + "language": "", + "level": "200", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "The purpose of this pattern is to deploy the infrastructure to showcase on how handle the consumer(AWS Lambda) failure when reading the records from the Amazon Kinesis data stream.", + "If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, in this project we are going to configure function's event source mapping with a reasonable number of retries and a maximum record age.", + "To retain a record of discarded batches, we are going to configure a failed-event destination. Lambda sends the record to the destination - AWS SQS." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/kinesis-lambda-error-handling", + "templateURL": "serverless-patterns/kinesis-lambda-error-handling", + "projectFolder": "kinesis-lambda-error-handling", + "templateFile": "kinesis-lambda-error-handling/template.yaml" + } + }, + "resources": { + "bullets": [{ + "text": "AWS SAM", + "link": "https://aws.amazon.com/serverless/sam/" + }, + { + "text": "AWS Lambda with Amazon Kinesis", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html" + }, + { + "text": "Lambda event source mappings", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html" + }, + { + "text": "Amazon Kinesis Data Generator", + "link": "https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the README in the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": ["Delete the stack: sam delete."] + }, + "authors": [ + { + "name": "Manjunath Arakere", + "image": "https://d2908q01vomqb2.cloudfront.net/22d200f8670dbdb3e253a90eee5098477c95c23d/2023/07/06/marakere.jpg", + "bio": "Senior Solutions Architect @ AWS. Serverless enthusiast." + } + ] +} \ No newline at end of file diff --git a/kinesis-lambda-error-handling/images/kinesis-lambda-error-handling.jpg b/kinesis-lambda-error-handling/images/kinesis-lambda-error-handling.jpg new file mode 100644 index 0000000000000000000000000000000000000000..16622535a6ebafc0825fa95bae98908fc098ed4b GIT binary patch literal 19998 zcmeIZ1yo$kvM@SH2*EwLB*5Sj2KNM?!NM@O1$TD>A%WmNgKLn%1_|y-aEGA5U4n$5 zAy{~md+zzZtatDE*Z@b!*n_sp+n+s@=P)cXfC5?ab{8fC!=hRsh_&0|4AX ze*m{@cV@t{vL?@=>Iz^L`F~XO0nmW)1ORY!@^FJH$~@B5(|`0}8+U_2Wzo0=8fUZq12_8vH+OOSO&^V>k#KbI{4MJ@{MH!H(n(7b{fmYE(gNH8 zP=F#p=J)-h|D(Y<4*(E30|4&D|6OL51^_gK005M8f0r@80{{r$0s!@+f0zB;CoZOL zrvJeX1AV<~Wd#5n7XbkHdH?|FX8-`l;6Linmw%z#BXkulS}$kx#~R=Oumn5;fB{Yb za{vz-0|A}@_y9t;^8i`Ey}Q5R_w^ncFfcKG0~RLceN1dD92{&cY-}7nLOdK?0$glt zd}4e8LLwq!A{;ytQW7FkG*0xJ$(`RV?_pq}8xrASj zAd83%qHsf2)6Corp4WbeW>5QDI9k#_MgN06CdT~-Sm@ek#DKeZ?qb}l7eotlOQ;@WGRr*_ayJj^SiPMC;GucnCB8=t zkOs`90zRP-SDHR;jdiN`x^%36)QQC+ewa91@$~I`Pft&1Ry`~!FUc}8?{P(JQiO=Y zR>`zApKnH(OyMUC@(x4r+g z&edj6T(Mo*IMR=pTV*f2a2Z%@0pVA6Ll)7TdoORW8HyLErw_XCoqB6VtKo}-cELyT zlo;D#9uX&cP64lCcU@K&8U3dXa!m4@GX<)hG+wLucq`=yCU1ZFdQe0_H_ke2*>rX( zUec*d!S;z&;%|0g1UsZ#)eDz5bd#&dsKghf*?$oK!GFAa;0gBVAHx|E3m=DN2{F|8eZ=FMv zdH&}H|1yaGd4m7x0{_1?(b#%lS7*sb3-#981~}{`FvDF<)^IN)efh2~{@JnpXUHyr zjq-v@Ee&jqf(11&>&M5)7^N}*pyZJ)ecQXaMry6aCJ$3p;yb)0rBu-#a(5qpM1aE2 zNoze_)vE_p_qZ|a^#}dna3(>b#ZTi%Yv{9yW|gJHCd0zi@C?{QzngJPe4S8I7pN&J z%0Q!>n}?lyPiVZ~#&i>C1BUa5uqK7HEk-}Q;_=dK!#7A7wIZSN?1E{+dkSNkcQ?ff zHq6=Iz;LmjN!T$(tSSjmt;)^S7hQEdhVb%g*xaar*)LkF>D|=QIkR`f_IrXINaVBm4<`M44b_v7+6Y} z_mmT!bHh!uTxsM)LO^gv5EQHKgBQ%aJlM@qPHs50O@~EcLEfC`vM{Sc1&P3(u!%Hg zGNT~xl8^!8T7FS|zDY8$#K`=jug?vUeEvkBo`3X*#Vugg>fX=E&XWQQRcAMwD5nVi zy1J)BC4v~!^k5!g8vP$fmDX0vK8Pu%DTxwYu(@x1uk$S87GRl5xUNY!lCZRFZ{p@$ z7bslBY@gC!0L3wyb&&AbG9X%GNF7qw>kAhYTDPnVb)pMm6+edL=-F*VyA&cyzxQo) z*WEX3&@e`sgZp=9Z{%T&IG7>fRwS?1R!#VWJ*Vef3wGi7YW6IT5ljw=ij_eU2??*J z@Ru6M${Z{{1j6`mt}&{qoh}@V^jOFrpM-^Ju(N}pnLaUCjx6>X$ILw<9pe-@TZVS* zbMnQH7J9Z2+_eDLcZcQLj-ChiW^2b=n+Oq;%8xuY6>#|Jgkm8~o zOGPK;1V$9W2qODe)HWN1kiz(gTB=-eMQ!y8Ms{rH8is*dw%#taevO zSvb3w;=WX>ZKl6Owh*Hcx38RnYPUlDcQ^NuB&~zdbs;S4Z~xUtqI0S)ORJ+N?70yhe?*A6(KK4OJ@4m=`v%%y1%bDlfbmK6b)%|)3QzbVj9lDWzn?hFOYzAHL;>2szd z=sQD+P3vG!(959#bi!d<&HY^0+U{UJC)3Z8C}xT1w`c~}0*4D6>ol0M#_AU4oOSem z>Iq~K!5f=`%M~8a=Kc_nEk|0+f8(RTMY!}$+h`D@&2CgXhee?V#37ca2yX9GoFrb1 z?6UBuEl=7{ZnH@(`rL`tfi+Z@<%->Zz_l?H9^GsAq^B-_Ij$+faBW$epJSFnDm}Gb zb#*hL;uF7G9Wh4W&N#x9s~+rbGn>Yi1r)Iin&ujLmQYP~jl z4KL6d3)r@ER@F?;__<}vM@N&@U>a-Ej0Y~whq@^{Kr%fq0NK}qWC4$|O0(wTBJG=W zm9zaG)!S(N^Z`B8&D+YN7WD{l_jlO(e+saNuMes9cFO6#ku_?(-TLK&&viN-`yc zA1O$VR_k>g7W7A{+L^Nbq<@sh-yK2YKP5Z5Oq6NWr142K_Mz4SZMa$}AHsd1k=zzQ)3L z8NzwmojCe>4&g(SHH_btF(t(c5>(J@pLMrXXzGzIr*Xpu9s2RRquMAd{?u=#_TZx8v@R?5{Fra%7*|8?pZ403z^Dx!I&Y{1*Cb6n+b6 z&0l%~BA^DDlZ1=r!#t)>u%G(9WBR2pxLP*8+yWRG9{A|TsqlFR z^uO%!`SDL)1b^NA9j`aFloB8_Y0Nh7gEW7t@oQQ1GyL!GjKYTI>0y8PHTBtYv&4T= zIy`Qm@vUv1h|nj8Lt%)1wz&AdYYV9`BQ+mM&RYP~{FrE8u~{RdM2$b7gnxiTQEtKO z1prg#^?!X@cY0$|pDOFKZt;CSEHBZ2+pq{Bv1`u;Sq}zDAJg*8vpcg}n)o{}F<-vO zE>De+u6-QH7fm#W4cRXAP+a+y0_E#TSxg5w< zrf6iju|Diy0~g-{K3Q)ighesv=FFB}_frqe#l83`iKM>JW{kdos7mukDmuQ_K{5}F zB-GaSB(zKsTINDRGkZjZgZvwEx6H_#ELqp5pozhk6E{7tg5~?RtoS28ESGQTy{afQ zw>-3S?NF#yDDn&9zl0)l9$zm#yUs(|X`hS}35D*c5OZfKlU5^3g+Ug<62vlH45#YY zj5;h>O?cr`qNw8e5R1~ER;=-r5UcoDG8l#d0ZIH;UdtDKJtt5<@yelPUmnwf5Sv~1 zi%rCuar#&cVm~>D#dXm|KSkgm$23dw`L5;Y&6)>7mnu5e@*!)+vld($jNR05(~gb| zSJn__^nlRk>b3UcbZN$x8)sI{{L$BzR234PC*c}`T$$V=5NOS)&+5Svc;uRV0t235 zh)d`3Eib}Tl9O~M6rrckL17{LB8s)F)*{|Qu6&?Xlo-l48t!=z>dqZXI3QP=jcT%X}>Pq)@J_JEs7VyK#9b?*3LF z%*v&s{4Pbgj+S(%-i=0je!HLd*`m5Ypq-e6^TZRtJ#~@)&cDLNR4TQ1q)cXVZvhKt znzdx#Sihh=O?da8-vSc5GHlP{;Fkt!7YE$8fQ^*eOWFKDnCVC6fRHc+%RM6&+$%E0 zt8kA@f1^OIyud@fTfl2~0mrmYg9p)^2&|sm&y7lNflg&`gm$4TVzEy_RB}X%H3g== z78^J~w}ct%tizN`Td1iDL!snqPhh7@!`}1bo+3?~wKaC5plE%Uq9T~{JoLxPRfVVF zA42?WPc1ALYHu3kn^PlW9CEV-W!e2vrbZ`rqj7B@rpvy;E{W-hJ?eVa z`;`#+2zA<=`a{xRQU(?Z=y1TGgUy8uhXlyU~T6wjUo@2JWi7 zlA(*=v#oppG(qEO1bqQnp_f>`Deo`rK8_EL+`I!|x}(?q{tL*~_G_0XMoQP3O7lhE zpT|3!c&1-3!}e?Z?RAgfRXx?CMh3-evKx8cE8u>_@H~M^Gl98Ctd$=7>xh`>aP9mj zhu_$H4Rt;z7DNyI%6t5CV%)Y(OFXE^;)z;!?-0E*7idmp7$Qx7Aj8<7c1%p1cJTOB zsok!kLO2DRxWf?F$cM?v%*hdfFdTdc)3h07^r)76CKVMiIj(`8K?2vp3D>l)etd_i z>9kJPT+BY)a8r2p;f~`W_=m@#7@ro&#Y^{$G%d|Z0n)x{`w%7~=SKg+#Ck+~F8 zu#JN&wt~Qtf;ZAm=q~$Ln^2gt?UuHUKAB-?68W)oS;%o{vq1OG8><-;`Glctrm%#m zQ22hTBFwUQqPIm4vwW6+Is1z03uKsbcX}9Pb5^%`6sbI5$05krP<>>TVg~`uO386L z(3sBYOj=wnXd!T9^227kKQ-F|Q-%U+E@IzZ+iJWR>|K+fqN2#2$F8cVK*d{Z#GgNLL<4qaFmEgwthWWAbs4Ir{z zQFGpYJwB>x^W6s75&h>Y$mT*RtS|rff{2;}!|MrYReJ`TLdNJnUqO}|)#8vR035ig zM}njQg;i%37jLr6e<%^D7+uR=EJFFhQ&^FkR;ajmXRuo9$VLR@?DQUX7Y{^*-bX&^8Uz z%4>q)vu;nYWSaO}T$Q;2OHgXUC$yafr+Q7QNRNP*d<`}7Ou8%9ee=)u(xW4ok5Fxl z{EX9@VM}S8I?Pr37BHa{@@9O(d5&h1ilQ_oS*$myf>ij`O#LMKftYdRTTLJzey+4b z=5+qqsJLR7#N|dmZMY0X7tasflUbBj)Aj(PS=R*PghKiWYCdY5(5ttD9CBHr7+W z1=H#-%yy|85BJXSWaBc&7S=}6Umq9za^?;9PL32-fut*?r{iphG8WM|Bx*+H{+R3- z%d}@+a_p{5af_}~Ia;;|*L-x4^>p-@e&6pdNK`>d(V|b^5%Jv};Y$DnA~qL;ZUHaW zr-~0XJxT;Uhc!|9{pY>%O1(o`Z3KhY!q*_++^^6vLc1E*y%~jm(IGifCySHQ+?-p0 zG1HQSdArNaf5EW(!rNJ2K}06Bi#s_*PCcBU`Ej>cnYWQ**EIzlQJ-yWN#6 zB`fS3=CSUw{52Un%GIrt&60@JY6RDQL{YI?b{n9gn!o9&Qn8Qdr_j>}qUS#*SJGc9 zb+s&i*fW&!d)P0_42CeivxC5;0`5TN1xfD?vZ=(Tr;Zo_9r2FqS9E=C)Nr@iB1GG* zS>GfFj8E_N52D5-wWc~p{XEYWP0*o&h{$N6B@6@AtiR|h>gmT?+T19PEdPQ?(aG~e zp+eueLW;3s^K4(_c52ORZX=yCG6a;q zPWprL!5<9nmfqF+gOcuV2A^>L4WjTATRzrqS>{*76OJdvP)L?Ib{!K(6x6e@WX2Y4 zSJ#$J&JXRg;Jr_s9$$wDPys)jbpR#m9?JpV*+N>4!h?Qw;$NsJ)d$Zll^(0RaTJR! zJ;N3bO&Q*NUWF$pm@Lg30a1zDuD&FxzDx>+MB#u-SRm4OY8kjs3$sqjG=E*ZRL{Q@ zK2xAMR-AFG557!dm&=E!^aGQqC>~qW+yc7#T!n&xTo>cyJd5@@|dTkJMkF~At zU~&1;_{&x+3(<=$YM*q0qZ;iJh8-=V=G6}FUp6S;wGnc4C>f_Z_5(k+Ur6Db>aCPb z#0_(#ku?b_%VzW8I968XxB@h-{kU;MEV`a2XA13Nz_M!@$yzep2U<@d_OgZB#dH0o zJp`(hvuP_aot^f*;}A*1oX6aXYxHU@Nb_R-dP~R<$o`Gbcu1PL-Pm`T!COEHi{dX4 zN_TzrC!Nq?(f*%F(gHe>w}2lpt;bM)%kmdhf}k(`K8M%U{yM*^KjwDr{*J7S-Z^4^ zoT6K;rZx`{W|!XQdX=td#WWtk$VO#Rh+{yizA)8>O)yWuLH}iP44)kgW*#PJ02GL1 zCU52{B$4Aq_x1Ji|KTnHFGDXW|Bl0u|Js?%iM8%<93(dws~H(b>{e6|?AUv5Y)*a6 zF5O+Z|1X=pe)ycE;t$F{7y!^+*YdZw2LGYeoqvOrf`@r+IoOhDBv)DHbDdaXb}8J}W%BK7Ha`bW?H+lCx#u~lJ#9iA z-y|m@!^Te(u~F3=97RReBqeVtusXY+i4yZVT2-b`w%0nq0**`x9I`a*sSN}QCN=HZ zoyNR)$nb%rAQ%Qoh8K?*5PMqx@f^6sFJxU~1#Z|l;^Nc38OHS0Iao!Di$Y=7%)Uj2 zG{d<`0Z87jx=}weuXUdAHC2&Apo9d+6Yk65MRy*b?|LRY0H0h!NI@Mtj>3b-;^bNz z!^5=qDvGDqrt_hR`4VYq*(dzclznz~X=8%%9( zuE1WjM;ot<~?~5OM^%kl3qT(2UX~P!oA`pmU{2*iHi58#kf_oBs8+U1heDRJS6B&|uVnPV+?Kb6+cEPDwdMvd#P@%;b zYNzM#u54&PGBGFjw460qW}bUjx=4pbcS*i+SPDjIULK$ZBfgNmt8NQLvVPZ!%lf%x~WP< z;TumYFr%kiJ+^$TCKof}V{gmVdz?+s^}0~^R5`7H?U%RuuE`CU>jFsn=jmQ8app(9 zhAT?7-i>^du(eg#`)(DbO&8o~+lxH>y6&>7;yLPz%}O|!MCqJCMFavPX5?W)dE2t4~~_U67BZSS8;vNH9q+D!RyaM+P1 zE((onczGwjI9NnXPE9&B8F8lZzr#8EdRg{s-1;UY3G${+&GmP-(cE` zk>o8PVXbxVM;AKEF23`}`aRB*%;nKL$n5m7r7`7@{7ps%#%RFZf;Xb?U)@u=5iASH zOO1SzIU{|i@KyS>)R>BV3cT+d+a~}A_{q4-(ThFzD{gc|tt$}2Cs|>qgab(n+HcAn zUQ84h+j+*W%P&!)ASWlMU}fe1R>&KPz5DeRV8}4WX|km>mBg<-@^xq8;X_f{M)qhB zoiZ{0fq}xhox>#Jhz(xTew?r}81Hx9eo}I5`KolTPTVBv6lKzUJu-^1ti5j`JKd>> zdQ1}PY-B{EJE>aG!kAHnD4paCH5fm++~Myzkbf+HL;AS)rOE^Lw~2IVwH`54c*!w& z;UZ20F&Zm<$M^d`qVxrfiVwIm}A6v)+zTbCma@ zgh~5k$+}O@I*7a}T02fmgzepFYF;zG&|Zz#+? zveR@*UXgq=qU;OerM!1zVeZ3a&W4$J5SVk$nEw_K@PPW6zHOWz16b4j@&h}vQMHui zz99o%uukBz*sRdogXO_K3u6$|#+)6AvsB|l6|M6u6~+znsUovoN}u9$M(i)dWvbPX zex%=k{nj3FF(PO*syz8iGi*nZ-z}qvrK&Ax5W*Jj>o#D-Yl)Cf-9-fHz37X;5oGIY zAYK`I?%@$v!R|#&f3wCFH5^vTBcErubE=+FeLpX}SeP9^oMt7kHs zkPw>51j)=!-3dX&7TGt=8iagXLrDp9vaFLQrKIj15FoV4zw(fwXpFSFWuHDuz{Z^1 zp9u|P1hcl<)-V&VRApMzGznu{2I$zF3lwGtH`F)#0<*Y#hKu`3nsXenfW&1nnAVxe z%DW?qMJ<9i$E7)i#k-}QeH)|?3WtX*;}6NM4v+Xosv&BFIIaryw}6++#Kdmo$Dj5q zzL^vlX*zGgn=2K;T9RyxPK-R}yoGtDi>CPHoFijuZ0L2Ex@z4XUr_}Wd}>`OBvg(4 zgV9S{&J*Rt%AUPf80~EOOXFt>^0O}=yCTWel+`9VqsoDwccbUJ66G)yrf}82>%02& z@6xNXbN9wJSy@LHfQQ1#52QQv{M56nkY|fHIiyi-T-4Ra`(oj2eJhNd*Il(>8(HyU{$=J zevx}t3m@11q*B0H(~0^Ahj92TP$IoR@syx}EtZpaXlI~oVow5G?zd2on4Z+aZP9{Y zSB7!0EK3o_j9mTj;?=K3eGI9(%okU<|7t{7a8Li!hX@07&_8K=BYJsIlrCLIYiKh5 z9&k@X#8*%S{XphH5H43(tCWU}^<)su!s=0?{i$1spuD~VSEQX?b#4}b+_zZP;OJQ3U zaIXojq=G~{@cX52xm3q{C=Mpc%jN?A;Blxb%hb$Hoi|xhokRsmlU%_dyTS9oD;;e8 zqF{}LY(gq#bm-Y7Ys2~wym{gOBx(?4J9>N*ehWxp-Hz>cDYv4$So_e;*&!8(HYfwd zG48=6hY-U(QP!pyDsj<5yn>omXZ=|w?x=yIoDntk9($crb(~q8rfD~IcWRKAy9yQ9 z+GnY*TET!KX)Jq8n&=?C$he$CB*i+S zF!8D&t7Q1RMgGG!>zv2HL$-000ba%XDVi{Uhb$$YO<&*=e4oW8K(#nIgc`k#LPd`k zw?lqadJCY|F3o9qUkurD3%sPxo#MGN09Jea3KC?vXoVkP0|OvE&po5(65_OLs%L^7 z(X)spAq$9OpmsG`lKCEePsbFf(RU%>W2rIo#91QWU>>Z7QJL`jjD+5T0z|^P%Z}-Y zAcwPXM;n#2=EITA`EfSWB=Ux2nt(7zi5A1$HuX&q? z43Q?2=cOC&e>b<9thh9L9ys&(7NE*hOQ*%Y(wcK4!*Dt-u5tQ{;cNbuap#sFI+EfQ z|M40?75x;Q#VFb6_f(%oQ#_=|ey+d7qJxE|CMNG~xPNZ0;EwRo9Co_xI-JUnSJU5Gj%OHOS)$MQlfZGZZTJv4z}*9`8^%X^Na zDPQZzS{{M|ygaXkSG$}Y8Bo|KL-JQyHg-eVL=cXR+ce5CvgOClW?{54;h&SZVqW%( zf2HVid0A&wth~w?TyOxxQyF=NPrJ~*12s$Tl5men*qg{n$zbRfb{eqYJb9-{58G2GES8X$% zKeCyO*U#F<&Io2Zp{v)i@^vh=HN2>85qO&H(!ya}sQJ=^7d_}p zsFTm#8xOJKZ_zZ7YhGk^d5qu+_vEolq}4-dr3P!HkwEnafDEidf=+4=UNe zgkRuw4eLMYOY;Qvl{1wi+#Pm#C=1hMXqnpIEi49e(XWH)z0G}+VP6VYrV>4pmxrzW z5>F%P@MNFLfeZh6GDoTM!m95)0 zsI?_&PQ%b(-ANA_P@EC0{PoP)&evjnhiTMH~oO6mhizp}nFf58Bj==i*gj{|*P2G6#J)$b@lt2cX8 z3V9H!LU0M@kC}DZ;*-qPb{YqaOiwf#Ko`bf+Io^~%RIZpO%b)}YJ{7&deHcMO-r>u zmkH8aA*lSDTw&q1h#!7QY*6(GO;7}PB>s{B|sJ3izrHEqP7l_K~XVdqR%@3fXnSF9v9m|s+ zrLfU6LXKXWq-j95mH@L=;OZ>V;q)_PFICboO+t&4%_&6+H+G8H1u6S4Al!32ue4H_ zzja3PFaPjH=|`jDz6nVySvGb_&knWu@l;CJQQk07`8*jNka$q^@tEr$OWzL#0e`M` z|G(aF1^Q{y3%sm|+eiMgh*491v2IODsi#s;r{3KwwICo41`Wn?%Ow`nwD_hG&0{86 zqw?E)0_Bi_X-qoPP*jo4d<|qO$;?m`mTfiUpfFtH3ZBibhJkrBfh6kJ56V5zc?y<3 z5$#4LLdoADt2^SfOm#wI@hQpg?G>o`%4xZ44em?Fyp_{3ecqXn_xKCl00J+E8Y=_p z@`{;Tq$9b8Se(kz3(_~q&c3GlB4xqd z{Trj5AUl0VLz(ybo52n-a`X^Qv$m_K6;MAUvdy<(<6HNDw(9(qbGsW@cPMWa zx5D96=ff}Aw!53jHS(Xlw$$o6)~Mv`vbZ@q@{Cin1qK*7mf${v!wa(U zSqwuJPh(XLQwSyS-fOJB5^HK;{$8@2(cJP;$}mbySs|Do>BgDiP@7rruxwDrmLryu zH(W&Evfh0c@?sq9vMeL?Ql7mH`pX5W*!hL5o850q=kAY*nhWOL?2J5FUOWHdu*btT zFD+seN#NjgSR(PGDMY*T0^hhElAFq9BmQ&=F?}>p^_<(`(JqzQc-L^~F+w2ClHFx! zBFoX@XJ6l0bUO(Do}OS&#p=RLXnepvu?#3hD~PGT@&ytp~PqnB0F3^I0l;7k;%# z?pfzz*ospGzBEoQ%rBOtwh`2T_9sv?6=`s?vJyLHzwC60QJ5vKO7PwVjr!U0ORV?I zc>6jT>DTkEA8BYnF3D3%d=?gJmtz`%-op)DgIL1cZl_epE1{H1gK6oG)Em>BV3WWa zdB3w^>uX@#Gz|v5E(6S;Qg%mog}!M#vXEk9$W zG0}!Ls{!JSC68q$KgLQN=1{b zMdOlG)*p%A)uY12b1J3G1UecjVC%X>XH4;Py+|}Twvb?=dz&A*gRg7Goc{o-<2b7) zUg!N0xHKC0=}UR&*G-N_pC)xw3|z{KtI3nL8R+35YDHPV0Q*gK`YhFPe4KgNgSX0Jv z%2QN!`}{!Z8V|)4WLY$LA=(gN3A5Zup=IMFl9)IrI$m+WiUljcv7AS!1_HN0C!nFD z*2g`5>@#87R|n<2h8pGe`q7g}s*Rzc3zZ>%#|N@8C@v>Pp9puv4TYIezaGzq10^l9 zi>{C9dM{+V<(qR`OT*IO@Itu!=X%)U##@^t{?|Nri-#pzU@Nm8eIJpCe)M(}QO;I) z?C0lP$o8U=)TUuhqm12x-Eg3ciV#?^JcUHTQ6h*@>Vg%%pg5oy{OL5{&mLPQ4c6<) zw6K%mx0`^%qb3{e)<>Z=fs5{ioo4hZo55M41K3Lhd~W(CbUa^bqMDTtW^PWkjqOTX zKL5N0Oumr!3VvMNj8ND!>a0n(YALpIoxo;>)e#+m&PcRsr!-#l*Cuf4)h4U-HDoIY zJ7P+V5>iu2NHG{%vNbgnLbO}7*P@$Xo8diceAGb7{%718u1@{S!9ob|PfnI!41|}q zbnka<8SXQG>|Fh*%2ZE(CHo%%@pk%O0kH}?Af_ysPB`a@$uydaZ3>R(P-m%F-J!b2 z56L6Y;7F0p(+@{}BqrbB6G;+r7P3b8Pc|4h;=jua75euD<2=y#a>#Wrke5?$aw1f0D5TvW;X0;>^J_0JEP*_{YSWvN6 zw|pmiS8mCkX2D}+7OXGVC5Z)-WySN*D6tv6JA>S8FJjXQIn*Yyr%G7rk-kn+&&Dl8}=3^;bD>d+BDYB{qN|d&xIm#M79VCOgPqEP&I8t9&4I8tVNO)}zMEpJx!BA_diMO#sUWTr1uN^;p&_?V{~XOQ#ccP>jTjh5 zzv=Q0nd`MUmpL@3Z`ytRP8n$b?3Fon*F4_om&gFBL!C3yTR^pOU*@-lX%J5AIA&~x zEtb7zfqylNDQB=VR8#X=k-jyTh7*`=^kqx})z9cd`9zd!5zUYGdMa8cXT4AZSx``n zeG!FEzc_@~=fkM^#)N``YOQS9)?v}}S-VJeSKDpj22$Qg$JFGZp_X1fSNY<$iyoCj zML5s1)i_1ep&F_G^Qklq>hPc33r@XpI`;?r*OyNzGa($6nS7lJVW`4BN zvat#-8uB1tZAr-X2_$!Gs@4s+8kW`N*uH`&j!p*aE)M7qOu1C`ywsZXl(WWyO=VgYw2lD#Y@Y6ZX2s8{;_RU25C)U`Nx*Ga5_NwsCmVO)Xnr)Mw^TLfZKxtMq zBFnAppozZx{G0rIC-s-T575aOLPAWY3^lHPr}Acjc2$>5PkLxx4GZplK1;YMC%j|= zDbl~vl+xSTzK(jhZBqXTa3BBazb8HUP9Qt>W`MO_sZhFUL!rA5X=tcyEUB#^^+f9= zdUjZCeKTMEGW3I#dO1^5wd~JQ$xfS3LbUk>jSKL^;?+cl#h<7j2A);vS-w#12viPY zPwBuR{uc*2w>w8)(cL2K2=+2`?6udj^oe8crSwMsQFB&L#jC%#wI2uA(hb8Ix-|}H z6aW|K(|3-9W48dNbx-ja!|T6T1aDl9i)S!PN9HdGMc+BO1qjVC9^9a+Aa_q-$!h@Z-Piwg zw7-AvMEc2oJ#pvJ8VD~b31@rwl!G{wq-bq?F0r7=D9*F}0j zb$AcfeJak=qm#{;pIjoe{BZuOFfD@`zXSn2r5%L&1zbn%hcV*LRjjl18I|H1_W1+X z;r(CvY}e1ER&xO8*EpYB+xBJU%EnKb;U8h8H|zh3l;J-Kef|&8RTMpDe8j$Eh-dIp zcy%S!c5)+5%k~*@va;)UUZVY{nCq0q^73BP5*ypBtaAW>Zt>rp_7|ogP*?xZbE%eB z!vn@iIFy~*DJ6HcV~Z_=uaGau@RJ;u74hqYQt|lJ&Oa?g_KweRw;oNX;)MTb{)|ba zDVfLO2lfVUr?V>5DB}EY(`){x>3#lWP@b^xQ*%_PX!oPf)OP>XKP^N`eeo}genICD z79w$C_X>3$W9*;v&(^^kH-A}O=$+|4pZ|R}(axVM`gDFsWt3Q)ozgP#s^q6dguw(L z#t^t_x(a$tgNj~MxyQs9^b}z3bg&hZzNJ+YZE@5XWuH~)gd3#%@de;6LF>Q8#nmKw zOz*R&bgNovV1>O6vUDYEuIVsv#1>>%JR>Gf{jx|`0Q`a+Gd85^6oes#S>m#^U?@bF z&^{?&yuzs*rwiK`J%?E1Jsb^g_GwKP9}Rki&Id*sFenEEOa+Ba*w|WB7%$Cn@Blkx zRq{oqp>eL{#}sc9ee^diX^xsgV@$Xo*<7~#sy1xYJFPI#eyU{lXueR_(Un`FPX{EJ zJS^8*TT4RpHjTR4Z$Miqs{83qzN>M=!{{D+FH_UP_)5(KP1oqB%1N^tq~wglF3txA zc%ZpdU^xUjJtkDkZfC4yZAsx~gB#rilYgKGVeNb4k{-E_RV!9VZV3k##If^ptuyPj zuXqCm6IpDa7(?#{EN5<#pUWM4UEw<#Rg8+wrO($@MIVgFUv@Qh6^6|W@fs(18)H_* z_h+<%^cFPjkhv>B7f>kKltcKoCkwHIgnZc?m1G{&W(wkkY0vXB{G}va#kZA`y;8l% zgX)mX0Th^9o_3N1sM1N>*6%S!G@Nea{K4I*F1v~?Jp3qDmcM_ephJ`8b$GT~zUiYE z$o=x4;eACU^UuFASJ#!>wF8;u5&R%V#wCeGaqn_R1I)%tw^hxfAwkNaHajQD4?lqG zo!nGZ3Gf98I|`0t{U)NV^i>j%+MhK<6S~uGi;uR)8W{7^T_p+K87&oDY96HTg9lb} z35fDl8V+Aw1FNA1bB$K3R4?L!bSi!-+eUOIhWHPaM7m|8N_#WHlZ9zrW1qoz-_K(f zsZ73QZ)F~eb7ihmYEqVZs;*y8O}~Yoj5t5$S=S-@T*EGbK~@rRsfSSf^z9+989ClU zP^#VgxjgOpSRjWWFLw!n>HRVULl$WPuotyCX*g{Q{oy}=S(?>R!8T*rrwCn?xK_4d zwH(lN4OdR(7Z6HH84^01K#T;y_Nd?Mf!cH80Ex)^rElR%t#Z1=(~1W%gzmJpd~FYr ziwTn@6|6g%y%5GnLwUKYt7;U4)pi2>#CR5AN{I-5N5|ah2(WVcfJAgG1DgyDH|!By zkH4K=QcU@UKDXZ{*I~hS1Zg<3(m2lL6CR6b%D~5t5JILf5Dp@iUqUY}TfaQ|C*_jM^Ms4nGU~xZXZV|7-;wW zdd>E}ROpM`)miAQHOJf76cmncH#;^zzj5{1`>L$Yi&BIJ*SLO{f}GEC+$|tHWD$0S z;~B6exsp%_O4i7QrBFTbxAhnn|22XZK-(U`yFa z%aH?F`jZvt;Xb%h*0kq3sjEGhdfG}bWjc_}5!n2_Z z{QRIVAm(h$j}_d%!r@3Lt}zT9A#NBUWq%tpyQQx>8$xKVMwx2xcby z0ov%)E*TMoL6ox6v)mw!){7Es{RSq=r=w%nP-xzi264jAXv;nFZUM<5-1r@H6D{Rh zQ15sD!t|_3jtDKKNy86#?^HYJumKS z$T<(wV`vtT>Y;bvm zHi;r)$tOeAP;7}L3{_CuW-gGkZ@XjYv4kuv;@1tS!_*|FiGjhm^vYcjvSFG7#@)Wm zLN&|<{VWE|3>^?n1VpJWjx}hLkGMF+l!xM*;0~QTJ5_7X=7f;c?r6azMuo{E&%VC! zpF3~)-5;uusg-5Q?ncSs4gv8$tlK{;m|7;L2kSBAwrP?==XdthA1zqvJnwqe+0&_J zRtWE_|DnohY<#Ujx)`f~WRB0*0D`s2EPOa?T;7+JEm%<%uuYWOp_2os_9OMk!i3)k2cQ0#k3soo z&h%ivg;(N#oIVUiBw8xF?9(~QYaA~TWvOo3g$0Gq3r$aTLe;4c*xA(9B)2SNvhQk+ zAIR!xKUvx^z}<7(n$qg^`&S-& zo4@B1z(Om1B~r}xi0{?7K1~A0Sl-G!M@Oifirk2$FgsVIJ)dcPc8XLYd!bi+tVp=y z)+!0Ka~HZp6_yeO6*o9F}4Gllxv$30#g zla9P%c0EjZe3O8j3(IrDr|p5_)TicQ*qJNMsF=*45+7U7w`*R z1S@_(d>K1_^;=#YJHI{cDr+0;y#hs8G*RKHLgVBWNUs&2j*d_?`RV7Xa6K5i_JCcN z0;}4KHGIsxFbIitFa)vGkhQ@sCOj8aCu;&3$SY+JPqoEcY&_#IWq4wFz7%fgD$*}h zPyY+L)Hmt(y;ERMzH}UB%fvMBf@pUDGQ?r)3IOm?{W9O>t=<#RXi2j@h&$B zM8``3Hm0ey^-C=k+*V3pa@fldN{qrUjMVCBqH6@9+w(Ews`5weU3B=P4KxMmQS0dH z-Q;!1Oy~~i1>x$w&gQRGLe5MgG-V)Ja`{Ca4C%6WL@+A#| XbdI=;`YP4%h5vMW06LNKcJ}`QCuOKa literal 0 HcmV?d00001 diff --git a/kinesis-lambda-error-handling/kinesis-consumer/kinesis-consumer-fn.py b/kinesis-lambda-error-handling/kinesis-consumer/kinesis-consumer-fn.py new file mode 100644 index 000000000..3419bf939 --- /dev/null +++ b/kinesis-lambda-error-handling/kinesis-consumer/kinesis-consumer-fn.py @@ -0,0 +1,46 @@ +import boto3 +import logging +import os +import base64 +import json + +# Initialize AWS SDK clients Kinesis +kinesis_client = boto3.client('kinesis') + +# Environment variables for lambda function read message from Kinesis data Stream +KINESIS_STREAM_NAME = os.environ['KINESIS_STREAM_NAME'] + +# Initialize and configure logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Entry point to the Lambda function +def lambda_handler(event, context): + # Record/Data should match the below Keys and the data type + required_fields = {"AtomicNumber": int, "Element": str, "Symbol": str, "AtomicMass": float} + logger.info(f"Incoming event: --> {event}") + + # Variable to print the Unique sequence of each record + curRecordSequenceNumber = "" + + # Loop through the Records to read each record + for record in event['Records']: + curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] + logger.info(f"Sequence Number of the current record --> {curRecordSequenceNumber}") + + # Convert the base64 data into utf before validating for the schema + payload = json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8')) + logger.info(f"Individual record content --> {payload}") + if not isinstance(payload, dict): + logger.info("Invalid JSON Data Structure.The parsed data does not adhere to the expeced JSON data structure.") + raise ValueError("Invalid JSON Data Structure", + "The parsed data does not adhere to the expeced JSON data structure.") + + # Verify if the key, value are as per expectations + for key, value_type in required_fields.items(): + if key not in payload: + logger.info(f"Missing '{key}' field in JSON.") + raise ValueError(f"Missing '{key}' field in JSON.") + if not isinstance(payload[key], value_type): + logger.info(f"'{key}' field should be of type {value_type.__name__}.") + raise ValueError(f"'{key}' field should be of type {value_type.__name__}.") \ No newline at end of file diff --git a/kinesis-lambda-error-handling/kinesis-producer/test-records-with-poison-pill.json b/kinesis-lambda-error-handling/kinesis-producer/test-records-with-poison-pill.json new file mode 100644 index 000000000..fc65f9eb8 --- /dev/null +++ b/kinesis-lambda-error-handling/kinesis-producer/test-records-with-poison-pill.json @@ -0,0 +1,10 @@ +[ + { + "Data": "ewogICAiQXRvbWljTnVtYmVyIjogNCwKICAgIkVsZW1lbnQiOiAiQmVyeWxsaXVtIiwKICAgIlN5bWJvbCI6ICJCZSIsCiAgICJBdG9taWNNYXNzIjogOS4wMTIKfQ==", + "PartitionKey": "key" + }, + { + "Data": "ewogICAiQXRvbWljTnVtYmVyIjogNSwKICAgIkVsZW1lbnQiOiAiQm9yb24iLAogICAiU3ltYm9sIjogIkIiLAogICAiQXRvbWljTWFzcyI6ICIxMC44MTEiIAp9", + "PartitionKey": "key" + } +] \ No newline at end of file diff --git a/kinesis-lambda-error-handling/kinesis-producer/test-records-without-poison-pill.json b/kinesis-lambda-error-handling/kinesis-producer/test-records-without-poison-pill.json new file mode 100644 index 000000000..5315713b5 --- /dev/null +++ b/kinesis-lambda-error-handling/kinesis-producer/test-records-without-poison-pill.json @@ -0,0 +1,14 @@ +[ + { + "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMSwKICAgIkVsZW1lbnQiOiAiSHlkcm9nZW4iLAogICAiU3ltYm9sIjogIkgiLAogICAiQXRvbWljTWFzcyI6IDEuMDA3Cn0=", + "PartitionKey": "key" + }, + { + "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMiwKICAgIkVsZW1lbnQiOiAiSGVsaXVtIiwKICAgIlN5bWJvbCI6ICJIZSIsCiAgICJBdG9taWNNYXNzIjogNC4wMDIKfQ==", + "PartitionKey": "key" + }, + { + "Data": "ewogICAiQXRvbWljTnVtYmVyIjogMywKICAgIkVsZW1lbnQiOiAiTGl0aGl1bSIsCiAgICJTeW1ib2wiOiAiTGkiLAogICAiQXRvbWljTWFzcyI6IDYuOTQxCiB9", + "PartitionKey": "key" + } +] \ No newline at end of file diff --git a/kinesis-lambda-error-handling/kinesis-producer/with-poison-pill-put-records.sh b/kinesis-lambda-error-handling/kinesis-producer/with-poison-pill-put-records.sh new file mode 100644 index 000000000..eeba8f9ad --- /dev/null +++ b/kinesis-lambda-error-handling/kinesis-producer/with-poison-pill-put-records.sh @@ -0,0 +1,3 @@ +aws kinesis put-records \ + --stream-name DataActivityKinesisStream \ + --records file://kinesis-producer/test-records-with-poison-pill.json \ No newline at end of file diff --git a/kinesis-lambda-error-handling/kinesis-producer/without-poison-pill-put-records.sh b/kinesis-lambda-error-handling/kinesis-producer/without-poison-pill-put-records.sh new file mode 100644 index 000000000..aec1bbbd1 --- /dev/null +++ b/kinesis-lambda-error-handling/kinesis-producer/without-poison-pill-put-records.sh @@ -0,0 +1,3 @@ +aws kinesis put-records \ + --stream-name DataActivityKinesisStream \ + --records file://kinesis-producer/test-records-without-poison-pill.json \ No newline at end of file diff --git a/kinesis-lambda-error-handling/template.yaml b/kinesis-lambda-error-handling/template.yaml new file mode 100644 index 000000000..d47c96fce --- /dev/null +++ b/kinesis-lambda-error-handling/template.yaml @@ -0,0 +1,90 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: 'AWS::Serverless-2016-10-31' +Description: > + AWS SAM Template for Serverless pattern + Kinesis DataStream triggers Lambda with error handling and invalid record is put into SQS for further analysis + AWS Services used: AWS Lambda, SQS, Kineis, CloudWatch, IAM + Cloud9 is recommended as it comes with all the neceassry packages and cli to run this Serverless pattern + +Resources: + # Create the Kinesis Data Stream + DataActivityKinesisStream: + Type: 'AWS::Kinesis::Stream' + Properties: + Name: DataActivityKinesisStream + RetentionPeriodHours: 25 + ShardCount: 1 + Tags: + - Key: AppName + Value: serverless-pattern + + # Create the lambda function to consume the message from Kinesis Data Stream, process and then decide if it's. valid or not. + KinesisDataStreamProcessor: + Type: 'AWS::Serverless::Function' + Properties: + FunctionName: KinesisDataStreamProcessor + CodeUri: kinesis-consumer/ + Handler: kinesis-consumer-fn.lambda_handler + Runtime: python3.9 + MemorySize: 128 + Timeout: 10 + Tracing: Active + Tags: + AppName: serverless-pattern + Environment: + Variables: + KINESIS_STREAM_NAME: !Ref DataActivityKinesisStream + Policies: + - SQSSendMessagePolicy: + QueueName: !GetAtt AnomalyDataQueue.QueueName + Events: + DataActivityKinesisEvent: + Type: Kinesis + Properties: + Stream: !GetAtt DataActivityKinesisStream.Arn + BatchSize: 5 + DestinationConfig: + OnFailure: + Type: SQS + Destination: !GetAtt AnomalyDataQueue.Arn + MaximumRetryAttempts: 5 + StartingPosition: TRIM_HORIZON + MaximumRecordAgeInSeconds: 3600 + BisectBatchOnFunctionError: true + # Refer to below link for other values for starting position + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html + + # Create an SQS queue to put the failed messages + AnomalyDataQueue: + Type: 'AWS::SQS::Queue' + Properties: + QueueName: AnomalyDataQueue + Tags: + - Key: AppName + Value: serverless-pattern + +# Create the cloudwatch log group to validate and verify the application run status + EventMonitor: + Type: 'AWS::Logs::LogGroup' + Properties: + LogGroupName: KinesisDataStreamProcessorLogs + Tags: + - Key: AppName + Value: serverless-pattern + +Outputs: + KinesisStreamName: + Description: Kinesis Stream Name + Value: !Ref DataActivityKinesisStream + + LambdaFunctionName: + Description: Lambda Function Name + Value: !Ref KinesisDataStreamProcessor + + SQSQueueName: + Description: SQS Queue Name + Value: !GetAtt AnomalyDataQueue.QueueName + + CloudWatchLogGroupName: + Description: CloudWatch Group Name + Value: !Ref EventMonitor \ No newline at end of file diff --git a/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh b/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh new file mode 100644 index 000000000..3efacb78f --- /dev/null +++ b/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh @@ -0,0 +1,6 @@ +# AWS CLI command to read the 10 messages from the AWS SQS queue +aws sqs receive-message \ + --queue-url https://sqs.us-east-1.amazonaws.com//MyQueue \ + --attribute-names All \ + --message-attribute-names All \ + --max-number-of-messages 10 \ No newline at end of file From c1ca2c23d1858a5006762e75871b650b34e748d5 Mon Sep 17 00:00:00 2001 From: Manju Date: Wed, 6 Sep 2023 11:36:04 -0400 Subject: [PATCH 2/4] fix to address the typos and queue name --- kinesis-lambda-error-handling/README.md | 10 +++++++--- .../validation-scripts/read-sqs-queue.sh | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/kinesis-lambda-error-handling/README.md b/kinesis-lambda-error-handling/README.md index 37b2b5bc2..ff28dc1a6 100644 --- a/kinesis-lambda-error-handling/README.md +++ b/kinesis-lambda-error-handling/README.md @@ -67,7 +67,7 @@ chmod +x kinesis-producer/*.sh; ./kinesis-producer/without-poison-pill-put-records.sh ``` -- Navigate to AWS Console, and then to Cloudwatch, Log groups, and select the log group KinesisDataStreamProcessorLog and the latest Log stream +- Navigate to AWS Console, and then to Cloudwatch, Log groups, select the log group and the latest Log stream - In the logs, you should see all the 3 messages processed without any exception. ### Scenario 2: Put messages with the Poision pill @@ -82,8 +82,12 @@ chmod +x kinesis-producer/*.sh; ## Validation After the Scenario 2, the invalid/poison pill message is put in the SQS queue for the further research. -Replace the account number in the validation-scripts/read-sqs-queue.sh and run the script as below to see the message details. +Replace the AWSACCOUNTID with the AWS account number in the validation-scripts/read-sqs-queue.sh +``` +vi validation-scripts/read-sqs-queue.sh +``` +Run the script as below to see the message details. ``` chmod +x validation-scripts/read-sqs-queue.sh; ./validation-scripts/read-sqs-queue.sh @@ -101,4 +105,4 @@ sam delete - [Amazon Kinesis Data Generator](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) ---- Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 \ No newline at end of file +SPDX-License-Identifier: MIT-0 diff --git a/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh b/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh index 3efacb78f..57346406a 100644 --- a/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh +++ b/kinesis-lambda-error-handling/validation-scripts/read-sqs-queue.sh @@ -1,6 +1,6 @@ # AWS CLI command to read the 10 messages from the AWS SQS queue aws sqs receive-message \ - --queue-url https://sqs.us-east-1.amazonaws.com//MyQueue \ + --queue-url https://sqs.us-east-1.amazonaws.com/AWSACCOUNTID/AnomalyDataQueue \ --attribute-names All \ --message-attribute-names All \ - --max-number-of-messages 10 \ No newline at end of file + --max-number-of-messages 10 From e2a1064b95040d97c225f4656cdcd4b9944f7f34 Mon Sep 17 00:00:00 2001 From: Manju Date: Thu, 7 Sep 2023 06:35:50 -0400 Subject: [PATCH 3/4] updated the language and framework values --- kinesis-lambda-error-handling/example-pattern.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kinesis-lambda-error-handling/example-pattern.json b/kinesis-lambda-error-handling/example-pattern.json index 6dd3dec67..c42a10b34 100644 --- a/kinesis-lambda-error-handling/example-pattern.json +++ b/kinesis-lambda-error-handling/example-pattern.json @@ -1,9 +1,9 @@ { "title": "Effective consumer strategies for handling Kinesis Data Stream anomalies", "description": "To showcase on how to handle the consumer(AWS Lambda) failure when reading the records from the Amazon Kinesis data stream.", - "language": "", + "language": "Python", "level": "200", - "framework": "AWS SAM", + "framework": "SAM", "introBox": { "headline": "How it works", "text": [ @@ -41,7 +41,6 @@ }, "deploy": { "text": [ - "sam build", "sam deploy --guided" ] }, From f3cdafb555388f4edbaa78f584d374c7cd8f5778 Mon Sep 17 00:00:00 2001 From: Manju Date: Mon, 18 Sep 2023 10:07:56 -0400 Subject: [PATCH 4/4] addressing the review comments --- kinesis-lambda-error-handling/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kinesis-lambda-error-handling/README.md b/kinesis-lambda-error-handling/README.md index ff28dc1a6..817e7ae86 100644 --- a/kinesis-lambda-error-handling/README.md +++ b/kinesis-lambda-error-handling/README.md @@ -49,7 +49,7 @@ Important: this application uses various AWS services and there are costs associ Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. ## How it works -![Reference Architecture](/images/kinesis-lambda-error-handling.jpg) +![Reference Architecture](./images/kinesis-lambda-error-handling.jpg) The pattern builds infrastructure with Amazon Kinesis data stream, a consumer Lambda function, SQS queue to load the failed records for further troubleshooting, and cloudwatch to validate the logs for the success, failure with retries. @@ -66,9 +66,7 @@ To test the pattern, we are breaking the test cases into two scenarios. chmod +x kinesis-producer/*.sh; ./kinesis-producer/without-poison-pill-put-records.sh ``` - -- Navigate to AWS Console, and then to Cloudwatch, Log groups, select the log group and the latest Log stream -- In the logs, you should see all the 3 messages processed without any exception. +- In the Cloudwatch logs, you should see all the 3 messages processed without any exception. ### Scenario 2: Put messages with the Poision pill - test-records-with-poison-pill.json --> Holds the base64 data with partition key @@ -78,7 +76,7 @@ chmod +x kinesis-producer/*.sh; ./kinesis-producer/with-poison-pill-put-records.sh ``` -- Cloudwatch logs shows that there was one invalid or poison message, so here the bisectonfailure was applied which lead Lambda to split the batch in half and resume each half separately. Maximum retry attempts and maximum record age limit the number of retries on a failed batch. As the retry limit is 5, it will retry 5 times, before the message is put to AWS SQS. +- In the Cloudwatch logs, you should see poison message and Lambda splitting the batch in half to resume each half separately. Finally placing the posion message to AWS SQS for further troubleshooting. ## Validation After the Scenario 2, the invalid/poison pill message is put in the SQS queue for the further research. @@ -96,7 +94,9 @@ chmod +x validation-scripts/read-sqs-queue.sh; We highly recommend to use [Amazon Kinesis Data Generator(KDG)](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) for the high volume testing. The KDG makes it simple to send test data to your Amazon Kinesis stream or Amazon Kinesis Firehose delivery stream. ## Cleanup +``` sam delete +``` ## Reference - [AWS SAM](https://aws.amazon.com/serverless/sam/)