177 changes: 177 additions & 0 deletions sam-dynamodb-glue-transform/README.md
# Serverless ETL Pattern with AWS Glue and DynamoDB

This AWS serverless pattern utilizes the Serverless Application Model (SAM) to deploy Glue scripts seamlessly, offering an efficient solution for managing multiple transformations. Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure. The pattern automatically configures and deploys the correct resources and permissions tailored to the user-specified data, such as the DynamoDB table where transformations will be performed and the associated script name. This gives users a streamlined process for deploying their Glue scripts.

Learn more about this pattern at Serverless Land Patterns: << Add the live URL here >>

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* [Python](https://www.python.org/downloads/) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd serverless-patterns/sam-dynamodb-glue-transform/src
```
1. From the terminal, set your AWS account ID and default region environment variables:
```
export ACCOUNT=<your-aws-account-id>
export AWS_DEFAULT_REGION=<aws-region>  # e.g., us-east-1
```
1. From the terminal, run the following commands to make `batch-orchestration-script.sh` and `run-job.sh` executable:
```
chmod +x scripts/batch-orchestration-script.sh
chmod +x scripts/run-job.sh
```
1. Install the toml library, if not already installed:
```
pip3 install toml
```
1. Put the Glue script you would like to deploy in the `glue_jobs` folder.
1. Specify the data for your transformation in the `TransformationType` class found in `data/etl_data.py`, following the same format as the sample.
1. Add an entry for your transformation in the `getTransformationByName` function found in `data/etl_data.py`, following the same format as the sample.
1. Deploy the stack using the following command:
```
python3 main.py <script_name>
```
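`main.py` itself is not reproduced in this README. As a rough, hypothetical sketch of what such a driver could do (the stack name, SAM parameter names, and the call to `run-job.sh` below are assumptions for illustration, not the actual implementation), it might look up the transformation metadata and shell out to SAM:
```
# hypothetical_deploy.py -- illustrative sketch only, not the real main.py
import subprocess
import sys

from data.etl_data import getTransformationByName


def deploy(script_name):
    t = getTransformationByName(script_name)
    # deploy the stack, passing the transformation metadata as parameters
    subprocess.run(
        [
            "sam", "deploy",
            "--stack-name", "TestTableStack",
            "--capabilities", "CAPABILITY_IAM",
            "--resolve-s3",
            "--parameter-overrides",
            f"ScriptName={t.getTransformationName()}",
            f"SourceTableArn={t.getSourceTableArn()}",
            f"AssetsBucketName={t.getS3BucketName()}",
        ],
        check=True,
    )
    # mirror scripts/run-job.sh: start the Glue job after deploying
    subprocess.run(["./scripts/run-job.sh", t.getTransformationName()], check=True)


if __name__ == "__main__":
    deploy(sys.argv[1])
```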
## Testing
1. From the terminal, create a sample DynamoDB table using the following command (this table will be used for testing):
```
aws dynamodb create-table \
--table-name TestTable \
--attribute-definitions \
AttributeName=id,AttributeType=S \
--key-schema \
AttributeName=id,KeyType=HASH \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5 \
--table-class STANDARD
```
If successful, you should see a response similar to the following (truncated here for brevity):
```
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"TableName": "TestTable",
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"TableStatus": "CREATING",
"CreationDateTime": "2024-01-04T18:40:06.837000-05:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:<aws-region>:<account-id>:table/TestTable",
        ...
```
You can also navigate to DynamoDB in the AWS console to confirm TestTable is there.
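Because the table is created asynchronously (note the `CREATING` status above), you can optionally wait for it to become active before proceeding:
```
aws dynamodb wait table-exists --table-name TestTable
```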
1. From the terminal, enable point-in-time recovery (PITR) for TestTable using the following command:
```
aws dynamodb update-continuous-backups \
--table-name TestTable \
--point-in-time-recovery-specification PointInTimeRecoveryEnabled=true
```
**NOTE: Point-in-time recovery must be enabled on the DynamoDB table for this pattern to work, because the Glue job reads the table through DynamoDB's export-to-S3 feature, which requires PITR. You can verify the setting as shown below.**
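To confirm PITR is enabled, describe the table's continuous backups:
```
aws dynamodb describe-continuous-backups \
    --table-name TestTable
```
Look for `"PointInTimeRecoveryStatus": "ENABLED"` in the response.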
1. From the terminal, populate TestTable with sample data using the following commands:
```
aws dynamodb put-item \
--table-name TestTable \
--item \
'{"id": {"S": "1"}, "firstName": {"S": "John"}, "lastName": {"S": "Doe"}, "birthday": {"S": "January 1, 1999"}}'
aws dynamodb put-item \
--table-name TestTable \
--item \
'{"id": {"S": "2"}, "firstName": {"S": "Jane"}, "lastName": {"S": "Doe"}, "birthday": {"S": "April 1, 1990"}}'
```
Take note of the data being added: `id`, `firstName`, `lastName`, and `birthday`. You can confirm both items landed with the scan shown below.
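To verify, scan the table:
```
aws dynamodb scan --table-name TestTable
```
The response should include both items and report `"Count": 2`.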
1. A sample Glue script has been created for this test, `glue_jobs/transform_testtable.py`, and the necessary information has already been added to `data/etl_data.py`.
Under `def getTransformationByName`:
```
if name == 'transform_testtable':
return TransformationType['transform_testtable'].value
```
Under `class TransformationType`:
```
transform_testtable = Transformations(
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable",
"transform_testtable",
f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"
)
```
1. Time to deploy! In the terminal, run the following command:
```
python3 main.py transform_testtable
```
Once the stack is successfully deployed, the Glue job will start running automatically. You should see output similar to the following:
```
Successfully created/updated stack - TestTableStack in <aws-region>
Running the Glue Job...
{
"JobRunId": "jr_1ddbec18fb4a0ccd58223d141751bad8491710c15a8c067b91a29a374c8e3434"
}
```
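You can also monitor the run from the terminal using the `JobRunId` from the output above (the job name below assumes the 'TransformTestTable' job created by this stack):
```
aws glue get-job-run --job-name TransformTestTable --run-id <JobRunId>
```
The `JobRunState` field will read `SUCCEEDED` once the run completes.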
1. Navigate to AWS Glue in the AWS console, click 'ETL jobs', and select the transformation deployed by this stack, 'TransformTestTable'.
1. It may take a few minutes for the job to run successfully. Once it has, navigate to DynamoDB in the console and view the items in TestTable. You should see that 'birthday' has been renamed to 'dateOfBirth' and a new attribute 'fullName' has been added.
You're all set to continue using this pattern for other Glue scripts!
## Cleanup
1. Delete the TestTable:
```
aws dynamodb delete-table \
--table-name TestTable
```
1. Delete the stack:
```
sam delete --stack-name TestTableStack
```
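If the Glue assets bucket from the sample configuration is retained after the stack is deleted, you can remove it as well (this assumes the bucket naming pattern from `data/etl_data.py`):
```
aws s3 rb s3://testtable-aws-glue-assets-$ACCOUNT-$AWS_DEFAULT_REGION --force
```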
----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
58 changes: 58 additions & 0 deletions sam-dynamodb-glue-transform/example-pattern.json
{
"title": "Serverless ETL Pattern with AWS Glue and DynamoDB ",
"description": "Effortlessly deploy Glue scripts with AWS SAM in this streamlined serverless pattern",
"language": "YAML",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This AWS serverless pattern utilizes the Serverless Applicton Model (SAM) to deploy Glue scripts seamlessly, offering an efficient solution for managing multiple transformations.",
"Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure. This pattern will automatically configure and deploy the correct resources and permissions tailored to the user-specified data,",
"such as the DynamoDB table where transformations will be performed and the associated script name. This approach provides users with a streamlined process to deploy their Glue scripts"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sam-dynamodb-glue-transform",
"templateURL": "serverless-patterns/sam-dynamodb-glue-transform",
"projectFolder": "sam-dynamodb-glue-transform/src",
"templateFile": "sam-dynamodb-glue-transform/src/template.yml"
}
},
"resources": {
"bullets": [
{
"text": "AWS Serverless Application Model",
"link": "https://aws.amazon.com/serverless/sam/"
},
{
"text": "AWS Glue",
"link": "https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html"
}
]
},
"deploy": {
"text": [
"sam deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Mia Alarcon Chong",
"image": "",
"bio": "AWS Cloud Application Developer",
"linkedin": "mialarcon"
}
]
}
42 changes: 42 additions & 0 deletions sam-dynamodb-glue-transform/src/data/etl_data.py
import os
from enum import Enum


def getTransformationByName(name):
    if name == 'transform_testtable':
        return TransformationType['transform_testtable'].value

    # if another transformation needs to be added, add an additional entry for it here, like the sample below
    elif name == 'ADDITIONAL_SAMPLE_TRANSFORM':
        return TransformationType['ADDITIONAL_SAMPLE_TRANSFORM'].value

    # fail fast on unknown names instead of silently returning None
    raise ValueError(f"Unknown transformation: {name}")

class Transformations:
def __init__(self, source_table_arn, transformation_name, s3_bucket_name):
self.source_table_arn = source_table_arn
self.transformation_name = transformation_name
self.s3_bucket_name = s3_bucket_name

def getSourceTableArn(self):
return self.source_table_arn

def getTransformationName(self):
return self.transformation_name

def getS3BucketName(self):
return self.s3_bucket_name


class TransformationType(Enum):
transform_testtable = Transformations(
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable",
"transform_testtable",
f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"
)

# if another transformation needs to be added, add the required data here

ADDITIONAL_SAMPLE_TRANSFORM = Transformations(
f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/<insert-table-name>", # table ARN where transformations will be done
"script_name", # script/transformation name, this script should be found under the glue_jobs folder
f"<insert-table-name>-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}" # glue assets s3 bucket for the table
)
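For illustration, here is a hypothetical caller of `getTransformationByName` (the real caller, `main.py`, is not shown in this commit):
```
from data.etl_data import getTransformationByName

transform = getTransformationByName("transform_testtable")
print(transform.getSourceTableArn())      # DynamoDB table ARN for the job
print(transform.getTransformationName())  # Glue script name under glue_jobs/
print(transform.getS3BucketName())        # Glue assets S3 bucket
```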
96 changes: 96 additions & 0 deletions sam-dynamodb-glue-transform/src/glue_jobs/transform_testtable.py
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.dynamicframe import DynamicFrameCollection
from pyspark.sql import SparkSession
from awsglue.transforms import DropNullFields
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "SOURCE_TABLE_ARN"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
bucket_name = args["BUCKET_NAME"]
source_table_arn = args["SOURCE_TABLE_ARN"]

config = {
    "sourceTableARN": source_table_arn,
    "targetTableName": "TestTable",
    # the S3 bucket below must already exist
    "s3BucketName": bucket_name,
    # if the key path below does not exist, it is created automatically, given the bucket exists
    "s3KeyName": "temporary/ddbexport/table/testtable"
}

# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrame:
selected_dynf = dfc.select(list(dfc.keys())[0])
selected_df = selected_dynf.toDF()
print("Original DataFrame")
selected_df.show(n=5, vertical=True, truncate=False)

# example transformations

# Add new attribute "fullName" using the existing firstName and lastName values for each item
selected_df = selected_df.withColumn(
"fullName", f.when(
(f.col("firstName").isNotNull() & f.col("lastName").isNotNull()),
f.concat_ws(" ", f.col("firstName"), f.col("lastName")),
)
)

# Renaming the column "birthday" to "dateOfBirth" for each item
selected_df = selected_df.withColumn(
"dateOfBirth", f.when(
f.col("birthday").isNotNull(),
f.col("birthday")
)
)

    selected_df = selected_df.drop(f.col("birthday"))  # drop the original column now that the renamed copy exists

modified_dynf = DynamicFrame.fromDF(selected_df, glueContext, "modified_dynf")
modified_dynf = DropNullFields.apply(modified_dynf)

print("Modified DataFrame")
modified_dynf.toDF().show(n=5, vertical=True, truncate=False)
return modified_dynf

source = glueContext.create_dynamic_frame.from_options(
connection_type="dynamodb",
connection_options={
"dynamodb.export": "ddb",
"dynamodb.s3.bucket": config["s3BucketName"],
"dynamodb.s3.prefix": config["s3KeyName"],
"dynamodb.tableArn": config["sourceTableARN"],
"dynamodb.unnestDDBJson": True,
},
transformation_ctx="source",
)
source.printSchema()

df = source.toDF()
df.show()

# Script generated for node Custom Transform
transformation0 = MyTransform(
glueContext,
DynamicFrameCollection({"source": source}, glueContext),
)

glueContext.write_dynamic_frame_from_options(
frame=transformation0,
connection_type="dynamodb",
connection_options={
"dynamodb.output.tableName": config["targetTableName"],
"dynamodb.output.retry": "50",
"dynamodb.throughput.write.percent": "1.0"
}
)

job.commit()
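The column logic in `MyTransform` can be smoke-tested outside Glue with plain PySpark. The sketch below is an illustration, not part of the deployed job, and assumes `pyspark` is installed locally; it reproduces the `fullName` and `dateOfBirth` transformations on an in-memory row:
```
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[1]").appName("smoke-test").getOrCreate()

df = spark.createDataFrame(
    [("1", "John", "Doe", "January 1, 1999")],
    ["id", "firstName", "lastName", "birthday"],
)

# Same transformations as MyTransform: derive fullName, rename birthday
df = df.withColumn(
    "fullName",
    f.when(
        f.col("firstName").isNotNull() & f.col("lastName").isNotNull(),
        f.concat_ws(" ", f.col("firstName"), f.col("lastName")),
    ),
)
df = df.withColumn("dateOfBirth", f.col("birthday")).drop("birthday")

df.show(truncate=False)  # expect fullName and dateOfBirth columns
spark.stop()
```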