-
Notifications
You must be signed in to change notification settings - Fork 937
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2041 from mialarconchong/miachong-feature-sam-dyn…
…amodb-glue New serverless pattern - sam-dynamodb-glue-transform
- Loading branch information
Showing
10 changed files
with
596 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
# Serverless ETL Pattern with AWS Glue and DynamoDB | ||
|
||
This AWS serverless pattern utilizes the Serverless Applicton Model (SAM) to deploy Glue scripts seamlessly, offering an efficient solution for managing multiple transformations. Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure. This pattern will automatically configure and deploy the correct resources and permissions tailored to the user-specified data, such as the DynamoDB table where transformations will be performed and the associated script name. This approach provides users with a streamlined process to deploy their Glue scripts. | ||
|
||
Learn more about this pattern at Serverless Land Patterns: << Add the live URL here >> | ||
|
||
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. | ||
|
||
## Requirements | ||
|
||
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. | ||
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured | ||
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) | ||
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed | ||
* [Python](https://www.python.org/downloads/) installed | ||
|
||
## Deployment Instructions | ||
|
||
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: | ||
``` | ||
git clone https://github.com/aws-samples/serverless-patterns | ||
``` | ||
1. Change directory to the pattern directory: | ||
``` | ||
cd serverless-patterns/sam-dynamodb-glue-transform/src | ||
``` | ||
1. From the terminal, set your AWS account ID and default region environment variables: | ||
``` | ||
export ACCOUNT=<your-aws-account-id> | ||
export AWS_DEFAULT_REGION=<aws-region> (i.e us-east-1) | ||
``` | ||
1. From the terminal, run the following commands to give `batch-orchestration-script.sh` and `run-job.sh` execution rights: | ||
``` | ||
chmod +x scripts/batch-orchestration-script.sh | ||
chmod +x scripts/run-job.sh | ||
``` | ||
1. Install the toml library, if not already installed: | ||
``` | ||
pip3 install toml | ||
``` | ||
1. Put the glue script you would like to deploy in the `glue_jobs` folder | ||
1. Specify the data for the transformation in the method `class TransformationType` found in the `data/etl_data.py`, following the same format as the sample. | ||
1. Add an entry for your transformation in the method `def getTransformationByName` found in `data/etl_data.py`, following the same format as the sample. | ||
1. Deploy the stack using the following command: | ||
``` | ||
python3 main.py <script_name> | ||
``` | ||
## Testing | ||
1. From the terminal, create a sample DynamoDB table using the following command (this table will be used for testing): | ||
``` | ||
aws dynamodb create-table \ | ||
--table-name TestTable \ | ||
--attribute-definitions \ | ||
AttributeName=id,AttributeType=S \ | ||
--key-schema \ | ||
AttributeName=id,KeyType=HASH \ | ||
--provisioned-throughput \ | ||
ReadCapacityUnits=5,WriteCapacityUnits=5 \ | ||
--table-class STANDARD | ||
``` | ||
If successful, you should see a similar response: | ||
``` | ||
{ | ||
"TableDescription": { | ||
"AttributeDefinitions": [ | ||
{ | ||
"AttributeName": "id", | ||
"AttributeType": "S" | ||
} | ||
], | ||
"TableName": "TestTable", | ||
"KeySchema": [ | ||
{ | ||
"AttributeName": "id", | ||
"KeyType": "HASH" | ||
} | ||
], | ||
"TableStatus": "CREATING", | ||
"CreationDateTime": "2024-01-04T18:40:06.837000-05:00", | ||
"ProvisionedThroughput": { | ||
"NumberOfDecreasesToday": 0, | ||
"ReadCapacityUnits": 5, | ||
"WriteCapacityUnits": 5 | ||
}, | ||
"TableSizeBytes": 0, | ||
"ItemCount": 0, | ||
"TableArn": "arn:aws:dynamodb:<aws-region>:<account-id>:table/TestTable", | ||
: | ||
``` | ||
You can also navigate to DynamoDB in the AWS console to confirm TestTable is there. | ||
1. From the terminal, enable point-in-time-recovery for the TestTable using the following command: | ||
``` | ||
aws dynamodb update-continuous-backups \ | ||
--table-name TestTable \ | ||
--point-in-time-recovery-specification PointInTimeRecoveryEnabled=true | ||
``` | ||
**NOTE: Point-in-time-recovery needs to be enabled on the DynamoDB table for this pattern to work** | ||
1. From the terminal, we will populate TestTable with sample data with the following command: | ||
``` | ||
aws dynamodb put-item \ | ||
--table-name TestTable \ | ||
--item \ | ||
'{"id": {"S": "1"}, "firstName": {"S": "John"}, "lastName": {"S": "Doe"}, "birthday": {"S": "January 1, 1999"}}' | ||
aws dynamodb put-item \ | ||
--table-name TestTable \ | ||
--item \ | ||
'{"id": {"S": "2"}, "firstName": {"S": "Jane"}, "lastName": {"S": "Doe"}, "birthday": {"S": "April 1, 1990"}}' | ||
``` | ||
Take note of the data being added: id, firstName, lastName, and birthday | ||
1. A sample glue script has been created for this test, `glue_jobs/transform_testtable.py`. And the necessary information has already been added to `data/etl_data.py`. | ||
Under `def getTransformationByName`: | ||
``` | ||
if name == 'transform_testtable': | ||
return TransformationType['transform_testtable'].value | ||
``` | ||
Under `class TransformationType`: | ||
``` | ||
transform_testtable = Transformations( | ||
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable", | ||
"transform_testtable", | ||
f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}" | ||
) | ||
``` | ||
1. Time to deploy! In the terminal, run the following command: | ||
``` | ||
python3 main.py transform_testtable | ||
``` | ||
Once successfully deployed, the glue job will start running automatically. You should see a similar output: | ||
``` | ||
Successfully created/updated stack - TestTableStack in <aws-region> | ||
Running the Glue Job... | ||
{ | ||
"JobRunId": "jr_1ddbec18fb4a0ccd58223d141751bad8491710c15a8c067b91a29a374c8e3434" | ||
} | ||
``` | ||
1. Navigate to AWS Glue on the AWS console. Click on 'ETL jobs', and click on the transformation deployed by this stack 'TransformTestTable' | ||
1. It may take a few minutes for the job to successfully run. But once successful, navigate to DynamoDB on the console, and view the items in the TestTable. You should see that 'birthday' has been renamed to 'dateOfBirth', and a new column 'fullName' was added. | ||
You're all set to continue using this pattern for other Glue scripts! | ||
## Cleanup | ||
1. Delete the TestTable | ||
``` | ||
aws dynamodb delete-table \ | ||
--table-name TestTable | ||
``` | ||
1. Delete the stack | ||
``` | ||
sam delete --stack-name TestTableStack | ||
``` | ||
---- | ||
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
{ | ||
"title": "Serverless ETL Pattern with AWS Glue and DynamoDB ", | ||
"description": "Effortlessly deploy Glue scripts with AWS SAM in this streamlined serverless pattern", | ||
"language": "YAML", | ||
"level": "200", | ||
"framework": "SAM", | ||
"introBox": { | ||
"headline": "How it works", | ||
"text": [ | ||
"This AWS serverless pattern utilizes the Serverless Applicton Model (SAM) to deploy Glue scripts seamlessly, offering an efficient solution for managing multiple transformations.", | ||
"Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure. This pattern will automatically configure and deploy the correct resources and permissions tailored to the user-specified data,", | ||
"such as the DynamoDB table where transformations will be performed and the associated script name. This approach provides users with a streamlined process to deploy their Glue scripts" | ||
] | ||
}, | ||
"gitHub": { | ||
"template": { | ||
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sam-dynamodb-glue-transform", | ||
"templateURL": "serverless-patterns/sam-dynamodb-glue-transform", | ||
"projectFolder": "sam-dynamodb-glue-transform/src", | ||
"templateFile": "template.yml" | ||
} | ||
}, | ||
"resources": { | ||
"bullets": [ | ||
{ | ||
"text": "AWS Serverless Application Model", | ||
"link": "https://aws.amazon.com/serverless/sam/" | ||
}, | ||
{ | ||
"text": "AWS Glue", | ||
"link": "https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html" | ||
} | ||
] | ||
}, | ||
"deploy": { | ||
"text": [ | ||
"sam deploy" | ||
] | ||
}, | ||
"testing": { | ||
"text": [ | ||
"See the GitHub repo for detailed testing instructions." | ||
] | ||
}, | ||
"cleanup": { | ||
"text": [ | ||
"Delete the stack: <code>sam delete</code>." | ||
] | ||
}, | ||
"authors": [ | ||
{ | ||
"name": "Mia Alarcon Chong", | ||
"image": "", | ||
"bio": "AWS Cloud Application Developer", | ||
"linkedin": "mialarcon" | ||
} | ||
] | ||
} |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import os | ||
from enum import Enum | ||
|
||
|
||
def getTransformationByName(name): | ||
if name == 'transform_testtable': | ||
return TransformationType['transform_testtable'].value | ||
|
||
#if another transformation needs to be added, add an additional entry for it here, liek the sample below | ||
elif name == 'ADDITIONAL_SAMPLE_TRANSFORM': | ||
return TransformationType['ADDITIONAL_SAMPLE_TRANSFORM'].value | ||
|
||
class Transformations: | ||
def __init__(self, source_table_arn, transformation_name, s3_bucket_name): | ||
self.source_table_arn = source_table_arn | ||
self.transformation_name = transformation_name | ||
self.s3_bucket_name = s3_bucket_name | ||
|
||
def getSourceTableArn(self): | ||
return self.source_table_arn | ||
|
||
def getTransformationName(self): | ||
return self.transformation_name | ||
|
||
def getS3BucketName(self): | ||
return self.s3_bucket_name | ||
|
||
|
||
class TransformationType(Enum): | ||
transform_testtable = Transformations( | ||
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable", | ||
"transform_testtable", | ||
f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}" | ||
) | ||
|
||
# if another transformation needs to be added, add the required data here | ||
|
||
ADDITIONAL_SAMPLE_TRANSFORM = Transformations( | ||
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/<insert-table-name>", # table ARN where transformations will be done | ||
"script_name", # script/transformation name, this script should be found under the glue_jobs folder | ||
f"<insert-table-name>-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}" # glue assets s3 bucket for the table | ||
) |
96 changes: 96 additions & 0 deletions
96
sam-dynamodb-glue-transform/src/glue_jobs/transform_testtable.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import sys | ||
from awsglue.transforms import * | ||
from pyspark.context import SparkContext | ||
from awsglue.context import GlueContext | ||
from awsglue.job import Job | ||
from awsglue.utils import getResolvedOptions | ||
from awsglue.dynamicframe import DynamicFrame | ||
from awsglue.dynamicframe import DynamicFrameCollection | ||
from pyspark.sql import SparkSession | ||
from awsglue.transforms import DropNullFields | ||
import pyspark.sql.functions as f | ||
|
||
spark = SparkSession.builder.getOrCreate() | ||
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "SOURCE_TABLE_ARN"]) | ||
glueContext = GlueContext(SparkContext.getOrCreate()) | ||
job = Job(glueContext) | ||
job.init(args["JOB_NAME"], args) | ||
bucket_name = args["BUCKET_NAME"] | ||
source_table_arn = args["SOURCE_TABLE_ARN"] | ||
|
||
config = { | ||
"sourceTableARN": source_table_arn, | ||
"targetTableName": "TestTable", | ||
# below s3 bucket should be present | ||
"s3BucketName": bucket_name, | ||
# if below key path does not exist it will be created automatically given bucket exists | ||
"s3KeyName": "temporary/ddbexport/table/testtable" | ||
} | ||
|
||
# Script generated for node Custom Transform | ||
def MyTransform(glueContext, dfc) -> DynamicFrameCollection: | ||
selected_dynf = dfc.select(list(dfc.keys())[0]) | ||
selected_df = selected_dynf.toDF() | ||
print("Original DataFrame") | ||
selected_df.show(n=5, vertical=True, truncate=False) | ||
|
||
# example transformations | ||
|
||
# Add new attribute "fullName" using the existing firstName and lastName values for each item | ||
selected_df = selected_df.withColumn( | ||
"fullName", f.when( | ||
(f.col("firstName").isNotNull() & f.col("lastName").isNotNull()), | ||
f.concat_ws(" ", f.col("firstName"), f.col("lastName")), | ||
) | ||
) | ||
|
||
# Renaming the column "birthday" to "dateOfBirth" for each item | ||
selected_df = selected_df.withColumn( | ||
"dateOfBirth", f.when( | ||
f.col("birthday").isNotNull(), | ||
f.col("birthday") | ||
) | ||
) | ||
|
||
selected_df = selected_df.drop(f.col("birthday")) # dropping because new renamed column was created | ||
|
||
modified_dynf = DynamicFrame.fromDF(selected_df, glueContext, "modified_dynf") | ||
modified_dynf = DropNullFields.apply(modified_dynf) | ||
|
||
print("Modified DataFrame") | ||
modified_dynf.toDF().show(n=5, vertical=True, truncate=False) | ||
return modified_dynf | ||
|
||
source = glueContext.create_dynamic_frame.from_options( | ||
connection_type="dynamodb", | ||
connection_options={ | ||
"dynamodb.export": "ddb", | ||
"dynamodb.s3.bucket": config["s3BucketName"], | ||
"dynamodb.s3.prefix": config["s3KeyName"], | ||
"dynamodb.tableArn": config["sourceTableARN"], | ||
"dynamodb.unnestDDBJson": True, | ||
}, | ||
transformation_ctx="source", | ||
) | ||
source.printSchema() | ||
|
||
df = source.toDF() | ||
df.show() | ||
|
||
# Script generated for node Custom Transform | ||
transformation0 = MyTransform( | ||
glueContext, | ||
DynamicFrameCollection({"source": source}, glueContext), | ||
) | ||
|
||
glueContext.write_dynamic_frame_from_options( | ||
frame=transformation0, | ||
connection_type="dynamodb", | ||
connection_options={ | ||
"dynamodb.output.tableName": config["targetTableName"], | ||
"dynamodb.output.retry": "50", | ||
"dynamodb.throughput.write.percent": "1.0" | ||
} | ||
) | ||
|
||
job.commit() |
Oops, something went wrong.