diff --git a/sam-dynamodb-glue-transform/README.md b/sam-dynamodb-glue-transform/README.md
new file mode 100644
index 000000000..b8181ff07
--- /dev/null
+++ b/sam-dynamodb-glue-transform/README.md
@@ -0,0 +1,177 @@
# Serverless ETL Pattern with AWS Glue and DynamoDB

This AWS serverless pattern uses the Serverless Application Model (SAM) to deploy Glue scripts seamlessly, offering an efficient way to manage multiple transformations. Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure: the pattern automatically configures and deploys the correct resources and permissions based on user-specified data, such as the DynamoDB table where transformations will be performed and the name of the associated script.

Learn more about this pattern at Serverless Land Patterns: << Add the live URL here >>

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* [Python](https://www.python.org/downloads/) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
    ```
    git clone https://github.com/aws-samples/serverless-patterns
    ```

1. Change directory to the pattern directory:
    ```
    cd serverless-patterns/sam-dynamodb-glue-transform/src
    ```

1. From the terminal, set your AWS account ID and default region environment variables:
    ```
    export ACCOUNT=<your AWS account ID>
    export AWS_DEFAULT_REGION=<your region, e.g. us-east-1>
    ```

1. From the terminal, run the following commands to give `batch-orchestration-script.sh` and `run-job.sh` execution rights:
    ```
    chmod +x scripts/batch-orchestration-script.sh
    chmod +x scripts/run-job.sh
    ```

1. Install the toml library, if not already installed:
    ```
    pip3 install toml
    ```

1. Put the Glue script you would like to deploy in the `glue_jobs` folder.

1. Specify the data for the transformation in the `TransformationType` enum found in `data/etl_data.py`, following the same format as the sample (see the sketch after this list).

1. Add an entry for your transformation in the `getTransformationByName` function found in `data/etl_data.py`, following the same format as the sample.

1. Deploy the stack using the following command, passing the name of your transformation:
    ```
    python3 main.py <transformation-name>
    ```
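As an example of steps 6-8, suppose you add a hypothetical script `glue_jobs/transform_orders.py` that transforms a table named `Orders`. The table name, script name, and bucket name below are placeholders; the two entries in `data/etl_data.py` would look roughly like this:

Under `getTransformationByName`:
```
elif name == 'transform_orders':
    return TransformationType['transform_orders'].value
```
Under the `TransformationType` enum:
```
transform_orders = Transformations(
    f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/Orders",  # source table ARN
    "transform_orders",  # must match glue_jobs/transform_orders.py
    f"orders-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"  # Glue assets bucket for this table
)
```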
## Testing

1. From the terminal, create a sample DynamoDB table using the following command (this table will be used for testing):
    ```
    aws dynamodb create-table \
    --table-name TestTable \
    --attribute-definitions \
        AttributeName=id,AttributeType=S \
    --key-schema \
        AttributeName=id,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD
    ```

    If successful, you should see a similar response:
    ```
    {
        "TableDescription": {
            "AttributeDefinitions": [
                {
                    "AttributeName": "id",
                    "AttributeType": "S"
                }
            ],
            "TableName": "TestTable",
            "KeySchema": [
                {
                    "AttributeName": "id",
                    "KeyType": "HASH"
                }
            ],
            "TableStatus": "CREATING",
            "CreationDateTime": "2024-01-04T18:40:06.837000-05:00",
            "ProvisionedThroughput": {
                "NumberOfDecreasesToday": 0,
                "ReadCapacityUnits": 5,
                "WriteCapacityUnits": 5
            },
            "TableSizeBytes": 0,
            "ItemCount": 0,
            "TableArn": "arn:aws:dynamodb:::table/TestTable",
            ...
    ```
    You can also navigate to DynamoDB in the AWS console to confirm TestTable is there.

1. From the terminal, enable point-in-time recovery for TestTable using the following command:
    ```
    aws dynamodb update-continuous-backups \
    --table-name TestTable \
    --point-in-time-recovery-specification PointInTimeRecoveryEnabled=true
    ```
    **NOTE: Point-in-time recovery must be enabled on the DynamoDB table for this pattern to work, because the Glue job reads the table through a DynamoDB export to S3.**

1. From the terminal, populate TestTable with sample data using the following commands:
    ```
    aws dynamodb put-item \
    --table-name TestTable \
    --item \
        '{"id": {"S": "1"}, "firstName": {"S": "John"}, "lastName": {"S": "Doe"}, "birthday": {"S": "January 1, 1999"}}'

    aws dynamodb put-item \
    --table-name TestTable \
    --item \
        '{"id": {"S": "2"}, "firstName": {"S": "Jane"}, "lastName": {"S": "Doe"}, "birthday": {"S": "April 1, 1990"}}'
    ```

    Take note of the attributes being added: `id`, `firstName`, `lastName`, and `birthday`.

1. A sample Glue script, `glue_jobs/transform_testtable.py`, has been created for this test, and the necessary entries have already been added to `data/etl_data.py`.

    Under `getTransformationByName`:
    ```
    if name == 'transform_testtable':
        return TransformationType['transform_testtable'].value
    ```
    Under the `TransformationType` enum:
    ```
    transform_testtable = Transformations(
        f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable",
        "transform_testtable",
        f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"
    )
    ```

1. Time to deploy! In the terminal, run the following command:
    ```
    python3 main.py transform_testtable
    ```

    Once successfully deployed, the Glue job will start running automatically. You should see a similar output:
    ```
    Successfully created/updated stack - TestTableStack in

    Running the Glue Job...
    {
        "JobRunId": "jr_1ddbec18fb4a0ccd58223d141751bad8491710c15a8c067b91a29a374c8e3434"
    }
    ```

1. Navigate to AWS Glue in the AWS console, choose 'ETL jobs', and select the job deployed by this stack, 'TransformTestTable'.

1. The job may take a few minutes to run. Once it succeeds, navigate to DynamoDB in the console and view the items in TestTable. You should see that 'birthday' has been renamed to 'dateOfBirth' and a new attribute 'fullName' has been added. You can also verify this from the terminal, as shown below.
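If you prefer the terminal to the console, scanning the table should show the transformed items, each carrying `fullName` and `dateOfBirth` in place of `birthday`:
```
aws dynamodb scan --table-name TestTable
```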
You're all set to continue using this pattern for other Glue scripts!

## Cleanup

1. Delete the TestTable:
    ```
    aws dynamodb delete-table \
    --table-name TestTable
    ```
1. Delete the stack:
    ```
    sam delete --stack-name TestTableStack
    ```
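1. The stack intentionally retains the Glue assets bucket (`DeletionPolicy: Retain` in `template.yml`), and `batch-orchestration-script.sh` creates a SAM deployment bucket. If you want to remove these as well, empty and delete them (the names below assume the environment variables set during deployment):
    ```
    aws s3 rb s3://testtable-aws-glue-assets-${ACCOUNT}-${AWS_DEFAULT_REGION} --force
    aws s3 rb s3://aws-sam-cli-samclisourcebucket-${ACCOUNT}-${AWS_DEFAULT_REGION} --force
    ```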
----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/example-pattern.json b/sam-dynamodb-glue-transform/example-pattern.json
new file mode 100644
index 000000000..1d9c18069
--- /dev/null
+++ b/sam-dynamodb-glue-transform/example-pattern.json
@@ -0,0 +1,58 @@
{
  "title": "Serverless ETL Pattern with AWS Glue and DynamoDB",
  "description": "Effortlessly deploy Glue scripts with AWS SAM in this streamlined serverless pattern",
  "language": "YAML",
  "level": "200",
  "framework": "SAM",
  "introBox": {
    "headline": "How it works",
    "text": [
      "This AWS serverless pattern uses the Serverless Application Model (SAM) to deploy Glue scripts seamlessly, offering an efficient solution for managing multiple transformations.",
      "Users can focus on crafting their Glue scripts without worrying about the underlying infrastructure. The pattern automatically configures and deploys the correct resources and permissions based on user-specified data,",
      "such as the DynamoDB table where transformations will be performed and the associated script name. This approach provides users with a streamlined process to deploy their Glue scripts."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sam-dynamodb-glue-transform",
      "templateURL": "serverless-patterns/sam-dynamodb-glue-transform",
      "projectFolder": "sam-dynamodb-glue-transform/src",
      "templateFile": "sam-dynamodb-glue-transform/src/template.yml"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "AWS Serverless Application Model",
        "link": "https://aws.amazon.com/serverless/sam/"
      },
      {
        "text": "AWS Glue",
        "link": "https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html"
      }
    ]
  },
  "deploy": {
    "text": [
      "sam deploy"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: sam delete."
    ]
  },
  "authors": [
    {
      "name": "Mia Alarcon Chong",
      "image": "",
      "bio": "AWS Cloud Application Developer",
      "linkedin": "mialarcon"
    }
  ]
}
diff --git a/sam-dynamodb-glue-transform/src/__init__.py b/sam-dynamodb-glue-transform/src/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/sam-dynamodb-glue-transform/src/data/__init__.py b/sam-dynamodb-glue-transform/src/data/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/sam-dynamodb-glue-transform/src/data/etl_data.py b/sam-dynamodb-glue-transform/src/data/etl_data.py
new file mode 100644
index 000000000..a32e820e3
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/data/etl_data.py
@@ -0,0 +1,42 @@
import os
from enum import Enum


def getTransformationByName(name):
    if name == 'transform_testtable':
        return TransformationType['transform_testtable'].value

    # if another transformation needs to be added, add an additional entry for it here, like the sample below
    elif name == 'ADDITIONAL_SAMPLE_TRANSFORM':
        return TransformationType['ADDITIONAL_SAMPLE_TRANSFORM'].value

    # fail fast on unknown names instead of silently returning None
    raise ValueError(f"Unknown transformation: {name}")


class Transformations:
    def __init__(self, source_table_arn, transformation_name, s3_bucket_name):
        self.source_table_arn = source_table_arn
        self.transformation_name = transformation_name
        self.s3_bucket_name = s3_bucket_name

    def getSourceTableArn(self):
        return self.source_table_arn

    def getTransformationName(self):
        return self.transformation_name

    def getS3BucketName(self):
        return self.s3_bucket_name


class TransformationType(Enum):
    transform_testtable = Transformations(
        f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/TestTable",
        "transform_testtable",
        f"testtable-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"
    )

    # if another transformation needs to be added, add the required data here

    ADDITIONAL_SAMPLE_TRANSFORM = Transformations(
        f"arn:aws:dynamodb:{os.getenv('AWS_DEFAULT_REGION')}:{os.getenv('ACCOUNT')}:table/",  # table ARN where transformations will be done
        "script_name",  # script/transformation name; this script should be found under the glue_jobs folder
        f"-aws-glue-assets-{os.getenv('ACCOUNT')}-{os.getenv('AWS_DEFAULT_REGION')}"  # Glue assets S3 bucket for the table
    )
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/src/glue_jobs/transform_testtable.py b/sam-dynamodb-glue-transform/src/glue_jobs/transform_testtable.py
new file mode 100644
index 000000000..faadabf07
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/glue_jobs/transform_testtable.py
@@ -0,0 +1,96 @@
import sys
from awsglue.transforms import DropNullFields
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "SOURCE_TABLE_ARN"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
bucket_name = args["BUCKET_NAME"]
source_table_arn = args["SOURCE_TABLE_ARN"]

config = {
    "sourceTableARN": source_table_arn,
    "targetTableName": "TestTable",
    # the S3 bucket below must already exist
    "s3BucketName": bucket_name,
    # if the key path below does not exist, it is created automatically, provided the bucket exists
    "s3KeyName": "temporary/ddbexport/table/testtable"
}
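# How the read below works: the "dynamodb.export" connection options tell Glue
# to run a DynamoDB export to S3 (which is why point-in-time recovery must be
# enabled on the source table) and to read the exported data from
# s3://<s3BucketName>/<s3KeyName>. "dynamodb.unnestDDBJson" flattens the
# DynamoDB JSON (e.g. {"S": "John"}) into plain columns before the transform runs.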
# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrame:
    selected_dynf = dfc.select(list(dfc.keys())[0])
    selected_df = selected_dynf.toDF()
    print("Original DataFrame")
    selected_df.show(n=5, vertical=True, truncate=False)

    # example transformations

    # Add a new attribute "fullName" using the existing firstName and lastName values for each item
    selected_df = selected_df.withColumn(
        "fullName", f.when(
            (f.col("firstName").isNotNull() & f.col("lastName").isNotNull()),
            f.concat_ws(" ", f.col("firstName"), f.col("lastName")),
        )
    )

    # Rename the column "birthday" to "dateOfBirth" for each item
    selected_df = selected_df.withColumn(
        "dateOfBirth", f.when(
            f.col("birthday").isNotNull(),
            f.col("birthday")
        )
    )

    selected_df = selected_df.drop(f.col("birthday"))  # drop the original column now that the renamed copy exists

    modified_dynf = DynamicFrame.fromDF(selected_df, glueContext, "modified_dynf")
    modified_dynf = DropNullFields.apply(modified_dynf)

    print("Modified DataFrame")
    modified_dynf.toDF().show(n=5, vertical=True, truncate=False)
    return modified_dynf

source = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": config["s3BucketName"],
        "dynamodb.s3.prefix": config["s3KeyName"],
        "dynamodb.tableArn": config["sourceTableARN"],
        "dynamodb.unnestDDBJson": True,
    },
    transformation_ctx="source",
)
source.printSchema()

df = source.toDF()
df.show()

# Script generated for node Custom Transform
transformation0 = MyTransform(
    glueContext,
    DynamicFrameCollection({"source": source}, glueContext),
)

glueContext.write_dynamic_frame_from_options(
    frame=transformation0,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": config["targetTableName"],
        "dynamodb.output.retry": "50",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/src/main.py b/sam-dynamodb-glue-transform/src/main.py
new file mode 100644
index 000000000..907ffdbb2
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/main.py
@@ -0,0 +1,64 @@
import data.etl_data
import toml
import sys
import re
import subprocess
import os


def update_param_overrides(s, key, v):
    pattern = rf'{key}="(.*?)"'
    replacement = f'{key}="{v}"'
    return re.sub(pattern, replacement, s)

# this might not be needed
def update_template(s, key, v):
    pattern = rf'{key}: "(.*?)"'
    replacement = f'{key}: "{v}"'
    return re.sub(pattern, replacement, s)
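# Each helper rewrites one key="value" pair in place via a regex substitution.
# For example (hypothetical values), given
#   s = 'TransformTaskName="old" S3BucketName="bucket"'
# update_param_overrides(s, "TransformTaskName", "new-task") returns
#   'TransformTaskName="new-task" S3BucketName="bucket"'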
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Please pass in a transformation name")
        sys.exit(1)

    transform_config = data.etl_data.getTransformationByName(sys.argv[1])
    # e.g. "transform_testtable" -> "Transform", plus "TestTable" from the table ARN -> "TransformTestTable"
    job_name = transform_config.getTransformationName().split("_")[0].capitalize() + transform_config.getSourceTableArn().split("/")[1]

    with open('samconfig.toml', 'r') as file:
        config = toml.load(file)
    param_overrides = config['default']['deploy']['parameters']["parameter_overrides"]

    param_overrides = update_param_overrides(param_overrides, "TransformTaskName", transform_config.getTransformationName())
    param_overrides = update_param_overrides(param_overrides, "S3BucketName", transform_config.getS3BucketName())
    param_overrides = update_param_overrides(param_overrides, "SourceTableARN", transform_config.getSourceTableArn())
    param_overrides = update_param_overrides(param_overrides, "JobName", job_name)

    config['default']['deploy']['parameters']['parameter_overrides'] = param_overrides

    stack_name = transform_config.getSourceTableArn().split("/")[1] + "Stack"
    config['default']['deploy']['parameters']['stack_name'] = stack_name
    config['default']['deploy']['parameters']['s3_prefix'] = stack_name

    print(config)

    with open('samconfig.toml', 'w') as file:
        toml.dump(config, file)

    script_location = "glue_jobs/" + transform_config.getTransformationName() + ".py"

    with open("template.yml", 'r') as file:
        content = file.read()

    updated_content = update_template(content, "ScriptLocation", script_location)

    with open("template.yml", 'w') as file:
        file.write(updated_content)

    curr_dir = os.getcwd()
    os.chdir("./scripts")
    subprocess.run(["./batch-orchestration-script.sh"])
    subprocess.run(["./run-job.sh", job_name])

    os.chdir(curr_dir)
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/src/scripts/batch-orchestration-script.sh b/sam-dynamodb-glue-transform/src/scripts/batch-orchestration-script.sh
new file mode 100644
index 000000000..51fe26508
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/scripts/batch-orchestration-script.sh
@@ -0,0 +1,23 @@
#!/bin/bash

bucket_name="aws-sam-cli-samclisourcebucket-${ACCOUNT}-${AWS_DEFAULT_REGION}"

# head-bucket prints nothing on success, so an empty result means the bucket already exists
BUCKET_EXISTS=$(aws s3api head-bucket --bucket "${bucket_name}" 2>&1 || true)
if [ -z "$BUCKET_EXISTS" ]; then
    echo "Bucket exists"
else
    # note: in us-east-1, create-bucket must be called without --create-bucket-configuration
    aws s3api create-bucket --bucket "${bucket_name}" --region "${AWS_DEFAULT_REGION}" --create-bucket-configuration LocationConstraint="${AWS_DEFAULT_REGION}"
    echo "Bucket created: ${bucket_name}"
fi

curr_path="${PWD}"

cd ..

sam build

sam deploy --s3-bucket "${bucket_name}"

cd "${curr_path}"
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/src/scripts/run-job.sh b/sam-dynamodb-glue-transform/src/scripts/run-job.sh
new file mode 100644
index 000000000..f6fbaedc9
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/scripts/run-job.sh
@@ -0,0 +1,11 @@
#!/bin/bash

# returns "None" if the job has never run, which also falls through to start-job-run below
JOB_STATUS=$(aws glue get-job-runs --job-name "${1}" --query 'JobRuns[0].JobRunState' --output text)

if [[ ${JOB_STATUS} != "RUNNING" && ${JOB_STATUS} != "STARTING" ]]; then
    echo "Running the Glue Job..."
    aws glue start-job-run --job-name "${1}" \
    --arguments='--enable-metrics=true,--enable-job-insights=true,--enable-continuous-cloudwatch-log=true,--enable-auto-scaling=true'
else
    echo "The Glue job is already running or starting."
fi
\ No newline at end of file
diff --git a/sam-dynamodb-glue-transform/src/template.yml b/sam-dynamodb-glue-transform/src/template.yml
new file mode 100644
index 000000000..6f3f4516f
--- /dev/null
+++ b/sam-dynamodb-glue-transform/src/template.yml
@@ -0,0 +1,125 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Glue transform template

Parameters:
  WorkerType:
    Type: String
    Default: 'G.2X'
    Description: >
      The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X.
  NumberOfWorkers:
    Type: Number
    Default: 145
    Description: >
      The number of workers of a defined workerType that are allocated when a job runs.
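  # Note: 145 G.2X workers is a large default suited to exporting big tables;
  # for a small table like the sample TestTable, a much smaller value (with
  # auto-scaling enabled via run-job.sh) would be sufficient.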
  JobName:
    Type: String
    Description: >
      Name of the Glue job
  TransformTaskName:
    Type: String
    Description: >
      Name of the transform task file to import
  S3BucketName:
    Type: String
    AllowedPattern: '[a-z0-9\-]+'
    Default: 'exportedtables'
    Description: >
      The bucket name for the exported tables.
  SourceTableARN:
    Type: String
    Description: >
      Source DynamoDB table ARN

Resources:
  S3Bucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref S3BucketName

  # Glue job for the initial load
  ImportLoadJob:
    Type: AWS::Glue::Job
    Properties:
      Role: !Ref GlueJobRole
      Description: Job created with CloudFormation
      DefaultArguments:
        "--WORKER_TYPE": !Ref WorkerType
        "--NUM_WORKERS": !Ref NumberOfWorkers
        "--BUCKET_NAME": !Ref S3BucketName
        "--SOURCE_TABLE_ARN": !Ref SourceTableARN
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: "" # placeholder; filled in by main.py (update_template) before deployment
      NumberOfWorkers: !Ref NumberOfWorkers
      WorkerType: !Ref WorkerType
      GlueVersion: '4.0'
      ExecutionProperty:
        MaxConcurrentRuns: 1
      Name: !Ref JobName

  GlueJobRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "glue.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: "DynamoDB"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - dynamodb:BatchGetItem
                  - dynamodb:DescribeTable
                  - dynamodb:GetItem
                  - dynamodb:Scan
                  - dynamodb:Query
                  - dynamodb:BatchWriteItem
                  - dynamodb:PutItem
                  - dynamodb:DeleteItem
                  - dynamodb:UpdateItem
                  - dynamodb:ExportTableToPointInTime
                  - dynamodb:ListTables
                Resource:
                  - !Ref SourceTableARN
              - Effect: "Allow"
                Action:
                  - dynamodb:DescribeExport
                Resource:
                  - !Join
                    - ''
                    - - !Ref SourceTableARN
                      - '*'
        - PolicyName: "GlueS3BucketAccess"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - s3:Get*
                  - s3:Put*
                  - s3:Delete*
                  - s3:List*
                Resource:
                  - "arn:aws:s3:::aws-sam-cli-samclisourcebucket-*/*"
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3BucketName
                      - '*'
\ No newline at end of file