New EMR Serverless with SFN example (#55)
* new emr serverless with sfn example
* adding synchronous start job run example
meniluca authored Nov 17, 2023
1 parent 8cfc628 commit 30ea478
Showing 19 changed files with 1,664 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cdk/README.md
@@ -1,4 +1,6 @@
# EMR Serverless CDK Examples

- (external) [EMR Serverless with Delta Lake](https://github.com/HsiehShuJeng/cdk-emrserverless-with-delta-lake)
- [EMR Serverless with Amazon MWAA](./emr-serverless-with-mwaa/README.md)
- [EMR Serverless with Step Function](./emr-serverless-with-sfn/README.md)

11 changes: 11 additions & 0 deletions cdk/emr-serverless-with-sfn/.gitignore
@@ -0,0 +1,11 @@
*.swp
package-lock.json
__pycache__
.pytest_cache
.venv
*.egg-info
.idea

# CDK asset staging directory
.cdk.staging
cdk.out
97 changes: 97 additions & 0 deletions cdk/emr-serverless-with-sfn/README.md
@@ -0,0 +1,97 @@

# EMR Serverless with Step Functions

This is a CDK Python project that deploys Step Functions State Machines to run EMR Serverless jobs. An EMR Serverless
application is created with two example jobs. To submit these jobs, the State Machines can use either a synchronous or an
asynchronous mechanism. Both approaches are described in detail below.

## Getting Started

- Install [CDK v2](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html)
- Create and activate the Python virtualenv and install the dependencies

```
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pip install -r requirements-dev.txt
```

Once you've got CDK and Python set up, [bootstrap the CDK app](https://docs.aws.amazon.com/cdk/v2/guide/bootstrapping.html)
and deploy the Stacks:

```
cdk bootstrap
cdk deploy --all
```

The stack created by [EMRServerlessStack](./stacks/emr_serverless.py) uses [pre-initialized capacity](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/application-capacity.html)
so that Spark jobs can start instantly, but note that this can incur additional cost, as resources are kept warm for
a certain period of time after jobs finish their runs.
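
For reference, pre-initialized capacity is configured on the EMR Serverless application itself. Below is a minimal sketch of such a configuration with the CDK L1 construct; the release label, worker counts, and worker sizes are illustrative assumptions, not the exact values used in [./stacks/emr_serverless.py](./stacks/emr_serverless.py):

```python
# Sketch of an EMR Serverless application with pre-initialized (warm) capacity.
# All concrete values below are assumptions for illustration.
from aws_cdk import aws_emrserverless as emrs

serverless_app = emrs.CfnApplication(
    self, "SparkApp",  # assumption: defined inside a Stack, hence `self`
    release_label="emr-6.12.0",  # assumed EMR release
    type="SPARK",
    initial_capacity=[
        emrs.CfnApplication.InitialCapacityConfigKeyValuePairProperty(
            key="Driver",
            value=emrs.CfnApplication.InitialCapacityConfigProperty(
                worker_count=1,
                worker_configuration=emrs.CfnApplication.WorkerConfigurationProperty(
                    cpu="2vCPU", memory="4GB"
                ),
            ),
        ),
        emrs.CfnApplication.InitialCapacityConfigKeyValuePairProperty(
            key="Executor",
            value=emrs.CfnApplication.InitialCapacityConfigProperty(
                worker_count=2,
                worker_configuration=emrs.CfnApplication.WorkerConfigurationProperty(
                    cpu="4vCPU", memory="8GB"
                ),
            ),
        ),
    ],
)
```

An auto-stop configuration (not shown here) controls how long the application may sit idle before it is stopped.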

## Run the State Machine

The stack creates a main State Machine that submits EMR Serverless jobs by means of other State Machines (in
the example there are two jobs, a "reader" and a "writer"). One of these State Machines (the "writer") submits its job
asynchronously and then loops between a `check` and a `wait` state every 30 seconds until the job is completed.
This means the submission call is not blocking, and the loop can be extended with other operations that run
without waiting for the job to complete (a sketch of this loop follows the list below).

- [./stacks/sfn.py](./stacks/sfn.py): the main State Machine
- [./stacks/emr_serverless_sm.py](./stacks/emr_serverless_sm.py): the State Machine that submits an EMR Serverless job (note the asynchronous flag)
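
For illustration, the asynchronous check/wait loop can be sketched in CDK Python roughly as follows. This is a simplified sketch under assumed names (`app_id`, `role_arn`, `entry_point`), not the exact code in [./stacks/emr_serverless_sm.py](./stacks/emr_serverless_sm.py):

```python
# Sketch of an asynchronous submit + poll loop for an EMR Serverless job (assumed inputs).
from aws_cdk import Duration, aws_stepfunctions as sfn, aws_stepfunctions_tasks as tasks

start_job = tasks.CallAwsService(
    self, "StartJobRun",
    service="emrserverless", action="startJobRun",
    parameters={
        "ApplicationId": app_id,
        "ExecutionRoleArn": role_arn,
        "JobDriver": {"SparkSubmit": {"EntryPoint": entry_point}},
    },
    iam_resources=["*"],
    result_path="$.job",
)

wait = sfn.Wait(self, "Wait30Seconds", time=sfn.WaitTime.duration(Duration.seconds(30)))

check = tasks.CallAwsService(
    self, "GetJobRun",
    service="emrserverless", action="getJobRun",
    parameters={
        "ApplicationId": app_id,
        "JobRunId": sfn.JsonPath.string_at("$.job.JobRunId"),
    },
    iam_resources=["*"],
    result_path="$.status",
)

# Loop back to the wait state until the job reaches a terminal state.
is_done = (
    sfn.Choice(self, "JobComplete?")
    .when(sfn.Condition.string_equals("$.status.JobRun.State", "SUCCESS"), sfn.Succeed(self, "Done"))
    .when(sfn.Condition.string_equals("$.status.JobRun.State", "FAILED"), sfn.Fail(self, "Failed"))
    .otherwise(wait)
)

definition = start_job.next(wait).next(check).next(is_done)
```

The synchronous variant can instead rely on the `startJobRun` service integration with the `.sync` pattern, so the task state itself blocks until the job run reaches a terminal state.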

To submit the jobs, navigate to the [Step Functions](https://console.aws.amazon.com/states/) console,
open the "MainStateMachine_dev" state machine, and click "Start Execution" to trigger its execution (with the default
input).

## Spark jobs

In this CDK example, one Spark job writes some sample data into the default S3 bucket created for the EMR Studio assets
(under the `data/` folder), while another job reads it back. The PySpark code is uploaded into the same bucket under the `jobs/` folder.
The PySpark example jobs (which can also be smoke-tested locally, as shown after the list):

- [./assets/jobs/pyspark-reader-example.py](./assets/jobs/pyspark-reader-example.py)
- [./assets/jobs/pyspark-writer-example.py](./assets/jobs/pyspark-writer-example.py)
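
Since `requirements-dev.txt` installs `pyspark`, the two jobs can also be smoke-tested locally by passing a local path instead of an S3 URI (the paths below are hypothetical):

```
python assets/jobs/pyspark-writer-example.py /tmp/emr-sfn-example/data
python assets/jobs/pyspark-reader-example.py /tmp/emr-sfn-example/data
```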

## CloudWatch dashboards

The CDK Stack in [./stacks/emr_serverless.py](./stacks/emr_serverless.py) creates [CloudWatch
dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html) to monitor the
EMR Serverless application resources deployed by this project. These dashboards are taken from the
[emr-serverless-with-mwaa](../emr-serverless-with-mwaa/README.md) CDK application, which lives in the same
aws-samples repository as this project.
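
As an illustration, a dashboard widget of this kind can be defined roughly as follows; this is a hedged sketch in which the metric name, dimension, and `application_id` variable are assumptions based on the `AWS/EMRServerless` metrics namespace, not the exact widgets created by the stack:

```python
# Sketch of a CloudWatch dashboard widget for EMR Serverless (assumed metric/dimension names).
from aws_cdk import aws_cloudwatch as cw

dashboard = cw.Dashboard(self, "EmrServerlessDashboard")  # assumption: defined inside a Stack
dashboard.add_widgets(
    cw.GraphWidget(
        title="Running workers",
        left=[
            cw.Metric(
                namespace="AWS/EMRServerless",
                metric_name="RunningWorkerCount",  # assumed metric name
                dimensions_map={"ApplicationId": application_id},  # assumed dimension
            )
        ],
    )
)
```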

## Environments

The solution is parameterized so that multiple deployments of the same stacks can co-exist in the same account under
different names. This is achieved by setting the `NAMESPACE` environment variable. By default, the namespace is `dev`,
which configures resources to have "dev" in their name (usually as a suffix).

Setting or exporting the `NAMESPACE` variable deploys resources under a different name. For instance, with the command
below the main State Machine will be called "MainStateMachine_test":

```
NAMESPACE=test cdk deploy --all
```

This is useful when testing new features in parallel. Attention: deploying the stacks under different namespaces
creates multiple copies of the resources (duplicated, if unchanged), which generates extra cost, so use this feature with caution.

The same works when setting `AWS_REGION` to deploy the solution to a different region:

```
AWS_REGION=eu-west-1 NAMESPACE=test cdk deploy --all
```

## Cleaning up

When you are finished with testing, clean up resources to avoid future charges.

```
cdk destroy --all # to destroy resources deployed in the dev namespace
NAMESPACE=test cdk destroy --all # if the resources are deployed in another namespace
AWS_REGION=eu-west-1 NAMESPACE=test cdk destroy --all # if the resources are deployed in another region and namespace
```

Namespaces `int` and `prod` are "protected": for these, the S3 bucket for the
[EMR Studio assets](./stacks/emr_studio.py#L43) cannot be automatically deleted and must be destroyed manually.
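
Protection of this kind is typically implemented with a conditional removal policy, roughly as sketched below; this assumes the stack knows its `namespace`, and the actual logic lives in [./stacks/emr_studio.py](./stacks/emr_studio.py):

```python
# Sketch of namespace-dependent bucket protection (variable names are assumptions).
from aws_cdk import RemovalPolicy, aws_s3 as s3

is_protected = namespace in ("int", "prod")

bucket = s3.Bucket(
    self, "EmrStudioAssets",  # assumption: defined inside the EMR Studio stack
    # keep the bucket on `cdk destroy` in protected namespaces
    removal_policy=RemovalPolicy.RETAIN if is_protected else RemovalPolicy.DESTROY,
    # empty the bucket automatically only in non-protected namespaces
    auto_delete_objects=not is_protected,
)
```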
49 changes: 49 additions & 0 deletions cdk/emr-serverless-with-sfn/app.py
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import os

from stacks.vpc import VPCStack
from stacks.emr_studio import EMRStudioStack
from stacks.emr_serverless import EMRServerlessStack
from stacks.sfn import SfnEmrServerlessJobsStack

import aws_cdk as cdk

app = cdk.App()

# Defining environment and namespace for the stacks
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
)
namespace = os.getenv("NAMESPACE", "dev")  # to allow multiple deployments in the same account

vpc = VPCStack(
    scope=app, construct_id=f"VPCStack-{namespace}", namespace=namespace, env=env
)

emr_serverless = EMRServerlessStack(
    scope=app,
    construct_id=f"EMRServerless-{namespace}",
    vpc=vpc.vpc,
    namespace=namespace,
    env=env,
)

emr_studio = EMRStudioStack(
    scope=app,
    construct_id=f"EMRStudio-{namespace}",
    vpc=vpc.vpc,
    namespace=namespace,
    env=env,
)

SfnEmrServerlessJobsStack(
    scope=app,
    construct_id=f"SfnStack-{namespace}",
    emr_serverless_app_id=emr_serverless.serverless_app.attr_application_id,
    emr_serverless_app_arn=emr_serverless.serverless_app.attr_arn,
    bucket=emr_studio.bucket,
    namespace=namespace,
    env=env,
)

app.synth()
18 changes: 18 additions & 0 deletions cdk/emr-serverless-with-sfn/assets/jobs/pyspark-reader-example.py
@@ -0,0 +1,18 @@
"""
This is an example of how to read data from S3 using Spark.
"""

from pyspark.sql import SparkSession

import sys

# parsing first argument to get the input bucket name and path (e.g. s3://my-bucket/my-path)
input_path = sys.argv[1]

spark = SparkSession.builder.appName("ReaderExample").enableHiveSupport().getOrCreate()

df = spark.read.parquet(input_path)

df.printSchema()
df.show(20, False)

19 changes: 19 additions & 0 deletions cdk/emr-serverless-with-sfn/assets/jobs/pyspark-writer-example.py
@@ -0,0 +1,19 @@
"""
This is an example of how to write data to S3 using Spark.
"""

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import sys

# parsing first argument to get the output bucket name and path (e.g. s3://my-bucket/my-path)
output_path = sys.argv[1]

spark = SparkSession.builder.appName("WriterExample").enableHiveSupport().getOrCreate()

data = spark.range(100) \
    .withColumn("random_uuid", F.expr('replace(uuid(), "-", "")')) \
    .withColumn("random_double", F.rand())

data.repartition(1).write.mode("overwrite").parquet(output_path)
55 changes: 55 additions & 0 deletions cdk/emr-serverless-with-sfn/cdk.json
@@ -0,0 +1,55 @@
{
  "app": "python3 app.py",
  "watch": {
    "include": [
      "**"
    ],
    "exclude": [
      "README.md",
      "cdk*.json",
      "requirements*.txt",
      "source.bat",
      "**/__init__.py",
      "python/__pycache__",
      "tests"
    ]
  },
  "context": {
    "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
    "@aws-cdk/core:checkSecretUsage": true,
    "@aws-cdk/core:target-partitions": [
      "aws",
      "aws-cn"
    ],
    "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
    "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
    "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
    "@aws-cdk/aws-iam:minimizePolicies": true,
    "@aws-cdk/core:validateSnapshotRemovalPolicy": true,
    "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
    "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
    "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
    "@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
    "@aws-cdk/core:enablePartitionLiterals": true,
    "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
    "@aws-cdk/aws-iam:standardizedServicePrincipals": true,
    "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
    "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
    "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
    "@aws-cdk/aws-route53-patters:useCertificate": true,
    "@aws-cdk/customresources:installLatestAwsSdkDefault": false,
    "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
    "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
    "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
    "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
    "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
    "@aws-cdk/aws-redshift:columnId": true,
    "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
    "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
    "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true,
    "@aws-cdk/aws-kms:aliasNameRef": true,
    "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true,
    "@aws-cdk/core:includePrefixInUniqueNameGeneration": true,
    "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true
  }
}
2 changes: 2 additions & 0 deletions cdk/emr-serverless-with-sfn/requirements-dev.txt
@@ -0,0 +1,2 @@
pytest==6.2.5
pyspark==3.4.1
2 changes: 2 additions & 0 deletions cdk/emr-serverless-with-sfn/requirements.txt
@@ -0,0 +1,2 @@
aws-cdk-lib==2.89.0
constructs>=10.0.0,<11.0.0
13 changes: 13 additions & 0 deletions cdk/emr-serverless-with-sfn/source.bat
@@ -0,0 +1,13 @@
@echo off

rem The sole purpose of this script is to make the command
rem
rem source .venv/bin/activate
rem
rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows.
rem On Windows, this command just runs this batch file (the argument is ignored).
rem
rem Now we don't need to document a Windows command for activating a virtualenv.

echo Executing .venv\Scripts\activate.bat for you
.venv\Scripts\activate.bat
