From 30ea4781ad93b4511968745136fbb357e1ffcee4 Mon Sep 17 00:00:00 2001
From: Luca Menichetti
Date: Fri, 17 Nov 2023 21:57:54 +0100
Subject: [PATCH] New EMR Serverless with SFN example (#55)

* new emr serverless with sfn example

* adding synchronous start job run example
---
 cdk/README.md | 4 +-
 cdk/emr-serverless-with-sfn/.gitignore | 11 +
 cdk/emr-serverless-with-sfn/README.md | 97 +++
 cdk/emr-serverless-with-sfn/app.py | 49 ++
 .../assets/jobs/pyspark-reader-example.py | 18 +
 .../assets/jobs/pyspark-writer-example.py | 19 +
 cdk/emr-serverless-with-sfn/cdk.json | 55 ++
 .../requirements-dev.txt | 2 +
 cdk/emr-serverless-with-sfn/requirements.txt | 2 +
 cdk/emr-serverless-with-sfn/source.bat | 13 +
 .../stacks/__init__.py | 0
 .../stacks/emr_serverless.py | 619 ++++++++++++++++++
 .../stacks/emr_serverless_sm.py | 196 ++++++
 .../stacks/emr_studio.py | 344 ++++++++++
 cdk/emr-serverless-with-sfn/stacks/sfn.py | 204 ++++++
 cdk/emr-serverless-with-sfn/stacks/vpc.py | 17 +
 cdk/emr-serverless-with-sfn/tests/__init__.py | 0
 .../tests/unit/__init__.py | 0
 .../test_emr_serverless_with_sfn_stack.py | 15 +
 19 files changed, 1664 insertions(+), 1 deletion(-)
 create mode 100644 cdk/emr-serverless-with-sfn/.gitignore
 create mode 100644 cdk/emr-serverless-with-sfn/README.md
 create mode 100644 cdk/emr-serverless-with-sfn/app.py
 create mode 100644 cdk/emr-serverless-with-sfn/assets/jobs/pyspark-reader-example.py
 create mode 100644 cdk/emr-serverless-with-sfn/assets/jobs/pyspark-writer-example.py
 create mode 100644 cdk/emr-serverless-with-sfn/cdk.json
 create mode 100644 cdk/emr-serverless-with-sfn/requirements-dev.txt
 create mode 100644 cdk/emr-serverless-with-sfn/requirements.txt
 create mode 100644 cdk/emr-serverless-with-sfn/source.bat
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/__init__.py
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/emr_serverless.py
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/emr_serverless_sm.py
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/emr_studio.py
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/sfn.py
 create mode 100644 cdk/emr-serverless-with-sfn/stacks/vpc.py
 create mode 100644 cdk/emr-serverless-with-sfn/tests/__init__.py
 create mode 100644 cdk/emr-serverless-with-sfn/tests/unit/__init__.py
 create mode 100644 cdk/emr-serverless-with-sfn/tests/unit/test_emr_serverless_with_sfn_stack.py

diff --git a/cdk/README.md b/cdk/README.md
index e0b8155..ffea566 100644
--- a/cdk/README.md
+++ b/cdk/README.md
@@ -1,4 +1,6 @@
 # EMR Serverless CDK Examples

 - (external) [EMR Serverless with Delta Lake](https://github.com/HsiehShuJeng/cdk-emrserverless-with-delta-lake)
-- [EMR Serverless with Amazon MWAA](./emr-serverless-with-mwaa/README.md)
\ No newline at end of file
+- [EMR Serverless with Amazon MWAA](./emr-serverless-with-mwaa/README.md)
+- [EMR Serverless with Step Functions](./emr-serverless-with-sfn/README.md)
+
diff --git a/cdk/emr-serverless-with-sfn/.gitignore b/cdk/emr-serverless-with-sfn/.gitignore
new file mode 100644
index 0000000..f85a0a0
--- /dev/null
+++ b/cdk/emr-serverless-with-sfn/.gitignore
@@ -0,0 +1,11 @@
+*.swp
+package-lock.json
+__pycache__
+.pytest_cache
+.venv
+*.egg-info
+.idea
+
+# CDK asset staging directory
+.cdk.staging
+cdk.out
diff --git a/cdk/emr-serverless-with-sfn/README.md b/cdk/emr-serverless-with-sfn/README.md
new file mode 100644
index 0000000..c6af9ff
--- /dev/null
+++ b/cdk/emr-serverless-with-sfn/README.md
@@ -0,0 +1,97 @@
+
+# EMR Serverless with Step Functions
+
+This is a CDK Python project that deploys Step
+Function State Machines to run EMR Serverless jobs. An EMR Serverless
+application is created with two example jobs. To submit these jobs, the State Machines can implement either a
+synchronous or an asynchronous mechanism. Both examples are described below in detail.
+
+## Getting Started
+
+- Install [CDK v2](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html)
+- Activate the Python virtualenv and install the dependencies
+
+```
+source .venv/bin/activate
+pip install -r requirements.txt
+pip install -r requirements-dev.txt
+```
+
+Once you have CDK and Python set up, [bootstrap the CDK app](https://docs.aws.amazon.com/cdk/v2/guide/bootstrapping.html)
+and deploy the Stacks:
+
+```
+cdk bootstrap
+cdk deploy --all
+```
+
+The stack created by [EMRServerlessStack](./stacks/emr_serverless.py) uses [pre-initialized capacity](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/application-capacity.html)
+so that Spark jobs can start instantly, but note that this can result in additional cost, as resources are maintained
+for a certain period of time after jobs finish their runs.
+
+## Run the State Machine
+
+The stack creates a main State Machine that submits EMR Serverless jobs by means of other State Machines (in
+this example there are two jobs, a "reader" and a "writer"). One of these State Machines (the "writer") submits
+its job asynchronously: the state machine then loops between a `check` and a `wait` state every 30 seconds until
+the job completes. This means the submission task is not blocking, and the loop can be extended with other
+operations that run without waiting for the job to complete.
+
+- [./stacks/sfn.py](./stacks/sfn.py): the main State Machine
+- [./stacks/emr_serverless_sm.py](./stacks/emr_serverless_sm.py): the State Machine that submits an EMR Serverless
+job (note the `asynchronous` flag)
+
+To submit the jobs, navigate to the [Step Functions](https://console.aws.amazon.com/states/) console, open the
+"MainStateMachine_dev" state machine, and choose "Start Execution" to trigger its execution (with the default
+input).
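+
+Alternatively, the execution can be started from the command line. A minimal sketch, assuming the AWS CLI is
+configured for the target account and region (replace the placeholder ARN with the one shown in the console):
+
+```
+aws stepfunctions start-execution \
+  --state-machine-arn arn:aws:states:<region>:<account-id>:stateMachine:MainStateMachine_dev
+```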
+
+## Spark jobs
+
+In this CDK example, one Spark job writes some sample data into the default S3 bucket created for the EMR Studio
+assets, while another job reads it back (from the `data/` folder). The PySpark code is uploaded into the same
+bucket under the `jobs/` folder. The PySpark example jobs:
+
+- [./assets/jobs/pyspark-reader-example.py](./assets/jobs/pyspark-reader-example.py)
+- [./assets/jobs/pyspark-writer-example.py](./assets/jobs/pyspark-writer-example.py)
+
+## CloudWatch dashboards
+
+The CDK Stack in [./stacks/emr_serverless.py](./stacks/emr_serverless.py) creates [CloudWatch
+dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html) to monitor the
+EMR Serverless Application resources deployed by this project. These dashboards are taken from the
+[emr-serverless-with-mwaa](../emr-serverless-with-mwaa/README.md) CDK application, which is available in the same
+aws-samples repository as this project.
+
+## Environments
+
+The solution is parameterized so that multiple deployments of the same stacks can co-exist in the same account under
+different names. This is achieved by setting the `NAMESPACE` environment variable. By default, `NAMESPACE` is set
+to `dev`, which configures resources to have "dev" in their name (usually as a suffix).
+
+Setting or exporting the `NAMESPACE` variable deploys resources under a different name. For instance, with the
+command below the main State Machine will be called "MainStateMachine_test":
+
+```
+NAMESPACE=test cdk deploy --all
+```
+
+This is useful when testing new features in parallel. Attention: deploying the stacks with different namespaces
+creates multiple copies of the resources (duplicates, if unchanged). This generates extra cost, so use this
+feature with caution.
+
+The same works when setting `AWS_REGION` to deploy the solution to a different region:
+
+```
+AWS_REGION=eu-west-1 NAMESPACE=test cdk deploy --all
+```
+
+## Cleaning up
+
+When you are finished with testing, clean up resources to avoid future charges.
+
+```
+cdk destroy --all                                      # to destroy resources deployed in the dev namespace
+NAMESPACE=test cdk destroy --all                       # if the resources are deployed in another namespace
+AWS_REGION=eu-west-1 NAMESPACE=test cdk destroy --all  # if the resources are deployed in another region and namespace
+```
+
+Namespaces `int` and `prod` are "protected": the S3 bucket for the
+[EMR Studio assets](./stacks/emr_studio.py#L43) cannot be deleted automatically and must be destroyed manually.
diff --git a/cdk/emr-serverless-with-sfn/app.py b/cdk/emr-serverless-with-sfn/app.py
new file mode 100644
index 0000000..b3187fc
--- /dev/null
+++ b/cdk/emr-serverless-with-sfn/app.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+import os
+
+from stacks.vpc import VPCStack
+from stacks.emr_studio import EMRStudioStack
+from stacks.emr_serverless import EMRServerlessStack
+from stacks.sfn import SfnEmrServerlessJobsStack
+
+import aws_cdk as cdk
+
+app = cdk.App()
+
+# Defining the environment and namespace for the stacks
+env = cdk.Environment(
+    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
+)
+namespace = os.getenv("NAMESPACE", "dev")  # to allow multiple deployments in the same account
+
+vpc = VPCStack(
+    scope=app, construct_id=f"VPCStack-{namespace}", namespace=namespace, env=env
+)
+
+emr_serverless = EMRServerlessStack(
+    scope=app,
+    construct_id=f"EMRServerless-{namespace}",
+    vpc=vpc.vpc,
+    namespace=namespace,
+    env=env,
+)
+
+emr_studio = EMRStudioStack(
+    scope=app,
+    construct_id=f"EMRStudio-{namespace}",
+    vpc=vpc.vpc,
+    namespace=namespace,
+    env=env,
+)
+
+SfnEmrServerlessJobsStack(
+    scope=app,
+    construct_id=f"SfnStack-{namespace}",
+    emr_serverless_app_id=emr_serverless.serverless_app.attr_application_id,
+    emr_serverless_app_arn=emr_serverless.serverless_app.attr_arn,
+    bucket=emr_studio.bucket,
+    namespace=namespace,
+    env=env,
+)
+
+app.synth()
diff --git a/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-reader-example.py b/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-reader-example.py
new file mode 100644
index 0000000..ded7ffc
--- /dev/null
+++ b/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-reader-example.py
@@ -0,0 +1,18 @@
+"""
+This is an example of how to read data from S3 using Spark.
+"""
+
+from pyspark.sql import SparkSession
+
+import sys
+
+# parsing the first argument to get the input bucket name and path (e.g.
s3://my-bucket/my-path) +input_path = sys.argv[1] + +spark = SparkSession.builder.appName("ReaderExample").enableHiveSupport().getOrCreate() + +df = spark.read.parquet(input_path) + +df.printSchema() +df.show(20, False) + diff --git a/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-writer-example.py b/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-writer-example.py new file mode 100644 index 0000000..2edb0ad --- /dev/null +++ b/cdk/emr-serverless-with-sfn/assets/jobs/pyspark-writer-example.py @@ -0,0 +1,19 @@ +""" +This is an example of how to write data to S3 using Spark. +""" + +from pyspark.sql import SparkSession +import pyspark.sql.functions as F + +import sys + +# parsing first argument to get the output bucket name and path (e.g. s3://my-bucket/my-path) +output_path = sys.argv[1] + +spark = SparkSession.builder.appName("WriterExample").enableHiveSupport().getOrCreate() + +data = spark.range(100) \ + .withColumn("random_uuid", F.expr('replace(uuid(), "-", "")')) \ + .withColumn("random_double", F.rand()) + +data.repartition(1).write.mode("overwrite").parquet(output_path) diff --git a/cdk/emr-serverless-with-sfn/cdk.json b/cdk/emr-serverless-with-sfn/cdk.json new file mode 100644 index 0000000..aaa52df --- /dev/null +++ b/cdk/emr-serverless-with-sfn/cdk.json @@ -0,0 +1,55 @@ +{ + "app": "python3 app.py", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "requirements*.txt", + "source.bat", + "**/__init__.py", + "python/__pycache__", + "tests" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-iam:standardizedServicePrincipals": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + 
"@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true + } +} diff --git a/cdk/emr-serverless-with-sfn/requirements-dev.txt b/cdk/emr-serverless-with-sfn/requirements-dev.txt new file mode 100644 index 0000000..02cca51 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest==6.2.5 +pyspark==3.4.1 diff --git a/cdk/emr-serverless-with-sfn/requirements.txt b/cdk/emr-serverless-with-sfn/requirements.txt new file mode 100644 index 0000000..3260868 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib==2.89.0 +constructs>=10.0.0,<11.0.0 diff --git a/cdk/emr-serverless-with-sfn/source.bat b/cdk/emr-serverless-with-sfn/source.bat new file mode 100644 index 0000000..9e1a834 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/source.bat @@ -0,0 +1,13 @@ +@echo off + +rem The sole purpose of this script is to make the command +rem +rem source .venv/bin/activate +rem +rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows. +rem On Windows, this command just runs this batch file (the argument is ignored). +rem +rem Now we don't need to document a Windows command for activating a virtualenv. + +echo Executing .venv\Scripts\activate.bat for you +.venv\Scripts\activate.bat diff --git a/cdk/emr-serverless-with-sfn/stacks/__init__.py b/cdk/emr-serverless-with-sfn/stacks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cdk/emr-serverless-with-sfn/stacks/emr_serverless.py b/cdk/emr-serverless-with-sfn/stacks/emr_serverless.py new file mode 100644 index 0000000..7b4835e --- /dev/null +++ b/cdk/emr-serverless-with-sfn/stacks/emr_serverless.py @@ -0,0 +1,619 @@ +import aws_cdk as cdk +from aws_cdk import CfnOutput, Duration, Stack +from aws_cdk import aws_cloudwatch as cw +from aws_cdk import aws_ec2 as ec2 +from aws_cdk import aws_emrserverless as emrs +from constructs import Construct + + +class EMRServerlessStack(Stack): + serverless_app: emrs.CfnApplication + + def __init__( + self, + scope: Construct, + construct_id: str, + vpc: ec2.IVpc, + namespace: str, + **kwargs, + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + # Create an EMR 6.11.0 Spark application in a VPC with pre-initialized capacity + self.serverless_app = emrs.CfnApplication( + self, + f"spark_app_{namespace}", + release_label="emr-6.11.0", + type="SPARK", + name=f"spark-app-{namespace}", + network_configuration=emrs.CfnApplication.NetworkConfigurationProperty( + subnet_ids=vpc.select_subnets().subnet_ids, + security_group_ids=[self.create_security_group(vpc).security_group_id], + ), + initial_capacity=[ + emrs.CfnApplication.InitialCapacityConfigKeyValuePairProperty( + key="Driver", + value=emrs.CfnApplication.InitialCapacityConfigProperty( + worker_count=2, + worker_configuration=emrs.CfnApplication.WorkerConfigurationProperty( + cpu="2vCPU", memory="4gb" + ), + ), + ), + emrs.CfnApplication.InitialCapacityConfigKeyValuePairProperty( + key="Executor", + value=emrs.CfnApplication.InitialCapacityConfigProperty( + worker_count=4, + worker_configuration=emrs.CfnApplication.WorkerConfigurationProperty( + cpu="2vCPU", memory="4gb" + ), + ), + ), + ], + auto_stop_configuration=emrs.CfnApplication.AutoStopConfigurationProperty( + enabled=True, idle_timeout_minutes=15 + ), + ) + + cdk.Tags.of(self.serverless_app).add("namespace", namespace) + + # Also create a CloudWatch Dashboard for the above application + dashboard = self.build_cloudwatch_dashboard( + self.serverless_app.attr_application_id, 
namespace + ) + + CfnOutput(self, "ApplicationID", value=self.serverless_app.attr_application_id) + CfnOutput(self, "CloudWatchDashboardName", value=dashboard.dashboard_name) + + def create_security_group(self, vpc: ec2.IVpc) -> ec2.SecurityGroup: + return ec2.SecurityGroup(self, "EMRServerlessSG", vpc=vpc) + + def build_cloudwatch_dashboard( + self, application_id: str, namespace: str + ) -> cw.Dashboard: + dashboard = cw.Dashboard( + self, + f"EMRServerlessDashboard_{namespace}", + dashboard_name=f"emr-serverless-{self.serverless_app.name.replace('_','-')}-{self.serverless_app.attr_application_id}", + ) + + # First we have a set of metrics for running workers broken down by the following: + # - WorkerType (Driver or Executor) + # - CapacityAllocationType (PreInitCapacity or OnDemandCapacity) + preinit_driver_running_workers = cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Driver", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized", + period=Duration.minutes(1), + ) + preinit_executor_running_workers = cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Executor", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized", + period=Duration.minutes(1), + ) + ondemand_driver_running_workers = cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Driver", + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="OnDemand", + period=Duration.minutes(1), + ) + ondemand_executor_running_workers = cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Executor", + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="OnDemand", + period=Duration.minutes(1), + ) + + idle_workers = cw.Metric( + metric_name="IdleWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + period=Duration.minutes(1), + ) + dashboard.add_widgets( + cw.GaugeWidget( + title="Pre-Initialized Capacity Worker Utilization %", + period=Duration.minutes(1), + width=12, + metrics=[ + cw.MathExpression( + expression="100*((m1+m2)/(m1+m2+m3))", + label="Pre-Init Worker Utilization %", + using_metrics={ + "m1": cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Driver", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + ), + "m2": cw.Metric( + metric_name="RunningWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Executor", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + ), + "m3": cw.Metric( + metric_name="IdleWorkerCount", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + ), + }, + ) + ], + ), + cw.SingleValueWidget( + title="Running Drivers", + width=12, + height=6, + metrics=[ + 
preinit_driver_running_workers, + ondemand_driver_running_workers, + ], + sparkline=True, + ), + ) + + dashboard.add_widgets( + cw.SingleValueWidget( + title="Available Workers", + width=12, + height=6, + sparkline=True, + metrics=[ + cw.MathExpression( + expression="m1+m2+m5", + label="Pre-Initialized", + period=Duration.minutes(1), + using_metrics={ + "m1": preinit_driver_running_workers, + "m2": preinit_executor_running_workers, + "m5": idle_workers, + }, + ), + cw.MathExpression( + expression="m3+m4", + label="OnDemand", + period=Duration.minutes(1), + using_metrics={ + "m3": ondemand_driver_running_workers, + "m4": ondemand_executor_running_workers, + }, + color="#ff7f0e", + ), + ], + ), + cw.SingleValueWidget( + title="Running Executors", + width=12, + height=6, + sparkline=True, + metrics=[ + preinit_executor_running_workers, + ondemand_executor_running_workers, + ], + ), + ) + + # Finally we have a whole row dedicate to job state + job_run_states = [ + "SubmittedJobs", + "PendingJobs", + "ScheduledJobs", + "RunningJobs", + "SuccessJobs", + "FailedJobs", + "CancellingJobs", + "CancelledJobs", + ] + dashboard.add_widgets( + cw.SingleValueWidget( + title="Job Runs", + width=24, + height=6, + sparkline=True, + metrics=[ + cw.Metric( + metric_name=metric, + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label=metric, + period=Duration.minutes(1), + ) + for metric in job_run_states + ], + ) + ) + + ## BEGIN: APPLICATION METRICS SECTION + dashboard.add_widgets( + cw.TextWidget( + markdown=f"Application Metrics\n---\nApplication metrics shows the capacity used by application **({application_id})**.", + height=2, + width=24, + ) + ) + + # Build up a list of metrics across different capacity metrics (cpu, memory, storage) + capacity_metric_names = ["CPUAllocated", "MemoryAllocated", "StorageAllocated"] + app_graph_widgets = [ + cw.GraphWidget( + title=name, + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.MathExpression( + expression="m1+m2", + label="Pre-Initialized", + period=Duration.minutes(1), + using_metrics={ + "m1": cw.Metric( + metric_name=name, + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Driver", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized Spark Driver", + period=Duration.minutes(1), + ), + "m2": cw.Metric( + metric_name=name, + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Executor", + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized Spark Executor", + period=Duration.minutes(1), + ), + }, + ), + cw.MathExpression( + expression="m3+m4", + label="OnDemand", + period=Duration.minutes(1), + color="#ff7f0e", + using_metrics={ + "m3": cw.Metric( + metric_name=name, + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Driver", + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="OnDemand Spark Driver", + period=Duration.minutes(1), + ), + "m4": cw.Metric( + metric_name=name, + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": "Spark_Executor", + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + 
label="OnDemand Spark Executor", + period=Duration.minutes(1), + ), + }, + ), + ], + ) + for name in capacity_metric_names + ] + + dashboard.add_widgets( + cw.GraphWidget( + title="Running Workers", + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.MathExpression( + expression="m1+m2+m5", + label="Pre-Initialized", + period=Duration.minutes(1), + using_metrics={ + "m1": preinit_driver_running_workers, + "m2": preinit_executor_running_workers, + "m5": idle_workers, + }, + ), + cw.MathExpression( + expression="m3+m4", + label="OnDemand", + period=Duration.minutes(1), + using_metrics={ + "m3": ondemand_driver_running_workers, + "m4": ondemand_executor_running_workers, + }, + color="#ff7f0e", + ), + ], + ), + app_graph_widgets[0], + ) + dashboard.add_widgets(*app_graph_widgets[1:]) + ## END: APPLICATION METRICS SECTION + + ## BEGIN: PRE-INITIALIZED CAPACITY METRICS + dashboard.add_widgets( + cw.TextWidget( + markdown=f"Pre-Initialized Capacity Metrics\n---\nShows you the Pre-Initialized Capacity metrics for an Application.", + height=2, + width=24, + ) + ) + dashboard.add_widgets( + cw.GraphWidget( + title="Pre-Initialized Capacity: Total Workers", + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.MathExpression( + expression="m1+m2+m3", + label="Pre-Initialized Total Workers", + period=Duration.minutes(1), + using_metrics={ + "m1": preinit_driver_running_workers, + "m2": preinit_executor_running_workers, + "m3": idle_workers, + }, + ) + ], + ), + cw.GraphWidget( + title="Pre-Initialized Capacity: Worker Utilization %", + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.MathExpression( + expression="100*((m1+m2)/(m1+m2+m3))", + label="Pre-Initialized Capacity Worker Utilization %", + period=Duration.minutes(1), + using_metrics={ + "m1": preinit_driver_running_workers, + "m2": preinit_executor_running_workers, + "m3": idle_workers, + }, + ) + ], + ), + ) + dashboard.add_widgets( + cw.GraphWidget( + title="Pre-Initialized Capacity: Idle Workers", + period=Duration.minutes(1), + width=12, + stacked=True, + left=[idle_workers], + ) + ) + ## END: PRE-INITIALIZED CAPACITY METRICS + + ## Dynamically generate metrics broken down by drivers, executors and capacity + for name, worker_type in zip( + ["Driver", "Executor"], ["Spark_Driver", "Spark_Executor"] + ): + dashboard.add_widgets( + cw.TextWidget( + markdown=f"{name} Metrics\n---\n{name} metrics shows you the capacity used by Spark {name}s for Pre-Initialized and On-Demand capacity pools.", + height=2, + width=24, + ) + ) + for row in [ + [ + {"metric": "RunningWorkerCount", "name": f"Running {name}s Count"}, + {"metric": "CPUAllocated", "name": "CPU Allocated"}, + ], + [ + {"metric": "MemoryAllocated", "name": "Memory Allocated"}, + {"metric": "StorageAllocated", "name": "Storage Allocated"}, + ], + ]: + dashboard.add_widgets( + cw.GraphWidget( + title=row[0]["name"], + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.Metric( + metric_name=row[0]["metric"], + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": worker_type, + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized", + period=Duration.minutes(1), + ), + cw.Metric( + metric_name=row[0]["metric"], + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": worker_type, + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + 
statistic="Sum", + label="OnDemand", + period=Duration.minutes(1), + ), + ], + ), + cw.GraphWidget( + title=row[1]["name"], + period=Duration.minutes(1), + width=12, + stacked=True, + left=[ + cw.Metric( + metric_name=row[1]["metric"], + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": worker_type, + "CapacityAllocationType": "PreInitCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="Pre-Initialized", + period=Duration.minutes(1), + ), + cw.Metric( + metric_name=row[1]["metric"], + namespace="AWS/EMRServerless", + dimensions_map={ + "WorkerType": worker_type, + "CapacityAllocationType": "OnDemandCapacity", + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + label="OnDemand", + period=Duration.minutes(1), + ), + ], + ), + ) + + ## END: DYNAMICALLY GENERATED METRICS + + ## BEGIN: JOB METRICS + dashboard.add_widgets( + cw.TextWidget( + markdown=f"Job Metrics\n---\nJob metrics shows you the aggregate number of jobs in each job state.", + height=2, + width=24, + ) + ) + + dashboard.add_widgets( + cw.GraphWidget( + title="Running Jobs", + width=12, + height=6, + stacked=True, + left=[ + cw.Metric( + metric_name="RunningJobs", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + period=Duration.minutes(1), + ) + ], + ), + cw.GraphWidget( + title="Successful Jobs", + width=12, + height=6, + stacked=True, + left=[ + cw.Metric( + metric_name="SuccessJobs", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + period=Duration.minutes(1), + color="#2ca02c", + ) + ], + ), + ) + + dashboard.add_widgets( + cw.GraphWidget( + title="Failed Jobs", + width=12, + height=6, + stacked=True, + left=[ + cw.Metric( + metric_name="FailedJobs", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + period=Duration.minutes(1), + color="#d62728", + ) + ], + ), + cw.GraphWidget( + title="Cancelled Jobs", + width=12, + height=6, + stacked=True, + left=[ + cw.Metric( + metric_name="CancelledJobs", + namespace="AWS/EMRServerless", + dimensions_map={ + "ApplicationId": self.serverless_app.attr_application_id, + }, + statistic="Sum", + period=Duration.minutes(1), + color="#c5b0d5", + ) + ], + ), + ) + ## END: JOB METRICS + return dashboard diff --git a/cdk/emr-serverless-with-sfn/stacks/emr_serverless_sm.py b/cdk/emr-serverless-with-sfn/stacks/emr_serverless_sm.py new file mode 100644 index 0000000..aab2486 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/stacks/emr_serverless_sm.py @@ -0,0 +1,196 @@ +from typing import Any, cast + +from aws_cdk import Aws, Duration, Tags +from aws_cdk import aws_iam as iam +from aws_cdk import aws_stepfunctions as sfn +from constructs import Construct, IConstruct + + +class EmrServerlessStateMachineConstruct(Construct): + """ + Construct for an EMR Serverless State Machine. + If asynchronous is set to True, a loop is implemented, waiting for the job to complete and check the exit code. + The loop can be extended with other steps while the job is running. 
+ """ + + def __init__( + self, + scope: Construct, + id: str, + *, + namespace: str, + sfn_label: str, + emr_serverless_application_id: str, + emr_serverless_application_arn: str, + emr_api_execution_timeout_min: int = 5, + spark_job_name: str, + spark_job_entry_point: str, + spark_job_arguments: list[Any], + spark_job_submit_parameters: str, + emr_execution_role_arn: str, + wait_duration_in_seconds: int = 30, + asynchronous: bool = False, + **kwargs, + ) -> None: + super().__init__(scope, id) + Tags.of(scope=cast(IConstruct, self)).add(key="namespace", value=namespace) + + state_machine_name = f"{sfn_label}-{namespace}" + + formatted_spark_job_submit_parameters = spark_job_submit_parameters.replace( + "\n", " " + ) + spark_args = ",".join( + [f"'{i}'" if i[0] != "$" else i for i in spark_job_arguments] + ) + start_job_run_task = sfn.CustomState( + self, + f"Submit Job ({'Sync' if not asynchronous else 'Async'})", + state_json={ + "Type": "Task", + "Resource": f"arn:aws:states:::emr-serverless:startJobRun{'.sync' if not asynchronous else ''}", + "Parameters": { + "ApplicationId": f"{emr_serverless_application_id}", + "ClientToken.$": "States.UUID()", + "ExecutionRoleArn": f"{emr_execution_role_arn}", + "ExecutionTimeoutMinutes": emr_api_execution_timeout_min, + "JobDriver": { + "SparkSubmit": { + "EntryPoint": f"{spark_job_entry_point}", + "EntryPointArguments.$": f"States.Array({spark_args})", + "SparkSubmitParameters": f"{formatted_spark_job_submit_parameters}", + } + }, + "Name": f"{spark_job_name}", + "Tags": { + "Region": Aws.REGION, + "Namespace": namespace, + "Label": state_machine_name, + }, + }, + }, + ) + + wait_step = sfn.Wait( + scope=self, + id=f"Wait {wait_duration_in_seconds} seconds", + time=sfn.WaitTime.duration(Duration.seconds(wait_duration_in_seconds)), + comment=f"Waiting for the EMR job to finish.", + ) + + check_job_status_step = sfn.CustomState( + scope=self, + id="Get job info", + state_json={ + "Type": "Task", + "Parameters": { + "ApplicationId.$": "$.ApplicationId", + "JobRunId.$": "$.JobRunId", + }, + "Resource": "arn:aws:states:::aws-sdk:emrserverless:getJobRun", + "ResultPath": "$.CheckJobStatusResult", + }, + ) + + job_killed = sfn.Fail( + self, + "Job Failed", + cause="EMR Job Failed", + error="The job failed (please retry)", + ) + job_succeed = sfn.Succeed(self, "Job Succeed") + + choice_step = ( + sfn.Choice(scope=self, id="Check job status") + .when( + sfn.Condition.string_equals( + "$.CheckJobStatusResult.JobRun.State", "CANCELLED" + ), + job_killed, + ) + .when( + sfn.Condition.string_equals( + "$.CheckJobStatusResult.JobRun.State", "CANCELLING" + ), + job_killed, + ) + .when( + sfn.Condition.string_equals( + "$.CheckJobStatusResult.JobRun.State", "FAILED" + ), + job_killed, + ) + .when( + sfn.Condition.string_equals( + "$.CheckJobStatusResult.JobRun.State", "SUCCESS" + ), + job_succeed, + ) + .otherwise(wait_step) + ) + + if asynchronous: + sfn_definition = ( + sfn.Chain.start(start_job_run_task) + .next(wait_step) + .next(check_job_status_step) + .next(choice_step) + ) + else: + sfn_definition = sfn.Chain.start(start_job_run_task) + + self.state_machine = sfn.StateMachine( + scope=self, + id=state_machine_name, + state_machine_name=state_machine_name, + definition_body=sfn.DefinitionBody.from_chainable(sfn_definition), + ) + + # IAM Permissions + sfn_policy = iam.Policy( + self, + f"{state_machine_name}-policy", + document=iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "events:PutTargets", + "events:PutRule", + 
"events:DescribeRule", + ], + effect=iam.Effect.ALLOW, + resources=[ + f"arn:{Aws.PARTITION}:events:{Aws.REGION}:{Aws.ACCOUNT_ID}:rule/StepFunctions*", + f"arn:aws:events:{Aws.REGION}:{Aws.ACCOUNT_ID}:rule/StepFunctions*", + ], + ), + iam.PolicyStatement( + actions=[ + "xray:PutTelemetryRecords", + "xray:GetSamplingRules", + "xray:GetSamplingTargets", + "xray:PutTraceSegments", + "emr-serverless:StartJobRun", + "emr-serverless:GetJobRun", + "emr-serverless:CancelJobRun", + "emr-serverless:TagResource", + "emr-serverless:GetDashboardForJobRun", + ], + resources=[ + emr_serverless_application_arn, + f"{emr_serverless_application_arn}/*", + ], + effect=iam.Effect.ALLOW, + ), + iam.PolicyStatement( + actions=[ + "iam:PassRole", + ], + resources=[emr_execution_role_arn], + effect=iam.Effect.ALLOW, + ), + ], + ), + ) + + self.state_machine.role.attach_inline_policy(sfn_policy) diff --git a/cdk/emr-serverless-with-sfn/stacks/emr_studio.py b/cdk/emr-serverless-with-sfn/stacks/emr_studio.py new file mode 100644 index 0000000..b8a10ba --- /dev/null +++ b/cdk/emr-serverless-with-sfn/stacks/emr_studio.py @@ -0,0 +1,344 @@ +import aws_cdk as cdk +from aws_cdk import Aws +from aws_cdk import CfnOutput as CfnOutput +from aws_cdk import Stack as Stack +from aws_cdk import aws_ec2 as ec2 +from aws_cdk import aws_emr as emr +from aws_cdk import aws_iam as iam +from aws_cdk import aws_s3 as s3 +from constructs import Construct + + +class EMRStudioStack(Stack): + studio: emr.CfnStudio + + def __init__( + self, + scope: Construct, + construct_id: str, + vpc: ec2.IVpc, + namespace: str, + **kwargs, + ) -> None: + """ + Creates the necessary security groups, asset bucket, and use roles and policies for EMR Studio. + + Studios require the following + - An engine security group + - A workspace security group + - An s3 bucket for notebook assets + - Service role and user roles + - Session policies to limit user access inside the Studio + + In addition, we create a Service Catalog item for cluster templates. + """ + super().__init__(scope, construct_id, **kwargs) + # Create security groups specifically for EMR Studio + [engine_sg, workspace_sg] = self.create_security_groups(vpc, namespace) + + # We also need to appropriately tag the VPC and subnets + self.tag_vpc_and_subnets(vpc, namespace) + + # This is where Studio assets live like ipynb notebooks and git repos + protected_namespaces = {"int", "prod"} + + removal_policy = ( + cdk.RemovalPolicy.RETAIN + if namespace in protected_namespaces + else cdk.RemovalPolicy.DESTROY + ) + self.bucket = s3.Bucket( + self, + f"EMRStudioAssets_{namespace}", + versioned=True, + auto_delete_objects=False if namespace in protected_namespaces else True, + bucket_name=f"emr-studio-assets-{Aws.REGION}-{namespace}", + removal_policy=removal_policy, + block_public_access=s3.BlockPublicAccess.BLOCK_ALL, # type: ignore + ) + cdk.Tags.of(self.bucket).add("namespace", namespace) + + # The service role provides a way for Amazon EMR Studio to interoperate with other AWS services. 
+ service_role = self.create_service_role(namespace) + + studio = emr.CfnStudio( + self, + construct_id, + name=f"EMRServerlessAdmin_{namespace}", + auth_mode="IAM", + vpc_id=vpc.vpc_id, + default_s3_location=self.bucket.s3_url_for_object(), + engine_security_group_id=engine_sg.security_group_id, + workspace_security_group_id=workspace_sg.security_group_id, + service_role=service_role.role_arn, + subnet_ids=vpc.select_subnets().subnet_ids, + ) + cdk.Tags.of(studio).add("namespace", namespace) + + CfnOutput(self, "EMRStudioURL", value=studio.attr_url) + CfnOutput( + self, + "EMRStudioServerlessURL", + value=f"{studio.attr_url}/#/serverless-applications", + ) + + def create_security_groups(self, vpc: ec2.Vpc, namespace: str): + engine_sg = ec2.SecurityGroup(self, f"EMRStudioEngine_{namespace}", vpc=vpc) + + # The workspace security group requires explicit egress access to the engine security group. + # For that reason, we disable the default allow all. + workspace_sg = ec2.SecurityGroup( + self, f"EMRWorkspaceEngine_{namespace}", vpc=vpc, allow_all_outbound=False + ) + engine_sg.add_ingress_rule( + workspace_sg, + ec2.Port.tcp(18888), + "Allow inbound traffic to EngineSecurityGroup ( from notebook to cluster for port 18888 )", + ) + workspace_sg.add_egress_rule( + engine_sg, + ec2.Port.tcp(18888), + "Allow outbound traffic from WorkspaceSecurityGroup ( from notebook to cluster for port 18888 )", + ) + workspace_sg.connections.allow_to_any_ipv4( + ec2.Port.tcp(443), "Required for outbound git access" + ) + + # We need to tag the security groups so EMR can make modifications + cdk.Tags.of(engine_sg).add("for-use-with-amazon-emr-managed-policies", "true") + cdk.Tags.of(engine_sg).add("namespace", namespace) + cdk.Tags.of(workspace_sg).add( + "for-use-with-amazon-emr-managed-policies", "true" + ) + cdk.Tags.of(workspace_sg).add("namespace", namespace) + + return [engine_sg, workspace_sg] + + @staticmethod + def tag_vpc_and_subnets(vpc: ec2.IVpc, namespace: str): + cdk.Tags.of(vpc).add("for-use-with-amazon-emr-managed-policies", "true") + cdk.Tags.of(vpc).add("namespace", namespace) + for subnet in vpc.public_subnets + vpc.private_subnets: + cdk.Tags.of(subnet).add("for-use-with-amazon-emr-managed-policies", "true") + cdk.Tags.of(subnet).add("namespace", namespace) + + def create_service_role(self, namespace: str) -> iam.Role: + return iam.Role( + self, + f"EMRStudioServiceRole_{namespace}", + assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com"), + managed_policies=[ + iam.ManagedPolicy( + self, + f"EMRStudioServiceRolePolicy_{namespace}", + statements=[ + iam.PolicyStatement( + sid="AllowEMRReadOnlyActions", + actions=[ + "elasticmapreduce:ListInstances", + "elasticmapreduce:DescribeCluster", + "elasticmapreduce:ListSteps", + ], + resources=["*"], + ), + iam.PolicyStatement( + sid="AllowEC2ENIActionsWithEMRTags", + actions=[ + "ec2:CreateNetworkInterfacePermission", + "ec2:DeleteNetworkInterface", + ], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="network-interface", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowEC2ENIAttributeAction", + actions=["ec2:ModifyNetworkInterfaceAttribute"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource=name, + resource_name="*", + ) + for name in [ + "instance", + "network-interface", + "security-group", + ] + ], + ), + iam.PolicyStatement( + 
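+                            # Security-group modifications are allowed only on resources
+                            # carrying the EMR managed-policies tag (enforced by the
+                            # StringEquals condition below).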
sid="AllowEC2SecurityGroupActionsWithEMRTags", + actions=[ + "ec2:AuthorizeSecurityGroupEgress", + "ec2:AuthorizeSecurityGroupIngress", + "ec2:RevokeSecurityGroupEgress", + "ec2:RevokeSecurityGroupIngress", + "ec2:DeleteNetworkInterfacePermission", + ], + resources=["*"], + conditions={ + "StringEquals": { + "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowDefaultEC2SecurityGroupsCreationWithEMRTags", + actions=["ec2:CreateSecurityGroup"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="security-group", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowDefaultEC2SecurityGroupsCreationInVPCWithEMRTags", + actions=["ec2:CreateSecurityGroup"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="vpc", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowAddingEMRTagsDuringDefaultSecurityGroupCreation", + actions=["ec2:CreateTags"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="security-group", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true", + "ec2:CreateAction": "CreateSecurityGroup", + } + }, + ), + iam.PolicyStatement( + sid="AllowEC2ENICreationWithEMRTags", + actions=["ec2:CreateNetworkInterface"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="network-interface", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowEC2ENICreationInSubnetAndSecurityGroupWithEMRTags", + actions=["ec2:CreateNetworkInterface"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource=name, + resource_name="*", + ) + for name in ["subnet", "security-group"] + ], + conditions={ + "StringEquals": { + "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="AllowAddingTagsDuringEC2ENICreation", + actions=["ec2:CreateTags"], + resources=[ + cdk.Stack.format_arn( + self, + service="ec2", + resource="network-interface", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "ec2:CreateAction": "CreateNetworkInterface" + } + }, + ), + iam.PolicyStatement( + sid="AllowEC2ReadOnlyActions", + actions=[ + "ec2:DescribeSecurityGroups", + "ec2:DescribeNetworkInterfaces", + "ec2:DescribeTags", + "ec2:DescribeInstances", + "ec2:DescribeSubnets", + "ec2:DescribeVpcs", + ], + resources=["*"], + ), + iam.PolicyStatement( + sid="AllowSecretsManagerReadOnlyActionsWithEMRTags", + actions=["secretsmanager:GetSecretValue"], + resources=[ + cdk.Stack.format_arn( + self, + service="secretsmanager", + resource="secret", + resource_name="*", + ) + ], + conditions={ + "StringEquals": { + "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true" + } + }, + ), + iam.PolicyStatement( + sid="S3permission", + actions=[ + "s3:PutObject", + "s3:GetObject", + "s3:GetEncryptionConfiguration", + "s3:ListBucket", + "s3:DeleteObject", + ], + resources=["arn:aws:s3:::*"], + ), + ], + ) + ], + ) diff --git a/cdk/emr-serverless-with-sfn/stacks/sfn.py b/cdk/emr-serverless-with-sfn/stacks/sfn.py new file mode 100644 index 
0000000..9832436 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/stacks/sfn.py @@ -0,0 +1,204 @@ +from typing import cast + +import aws_cdk as cdk +from aws_cdk import Aws +from aws_cdk import Stack as Stack +from aws_cdk import aws_ec2 as ec2 +from aws_cdk import aws_iam as iam +from aws_cdk import aws_s3 as s3 +from aws_cdk import aws_s3_deployment as s3deploy +from aws_cdk import aws_stepfunctions as sfn +from constructs import Construct, IConstruct + +from stacks.emr_serverless_sm import EmrServerlessStateMachineConstruct + + +class SfnEmrServerlessJobsStack(Stack): + vpc: ec2.Vpc + + def __init__( + self, + scope: Construct, + construct_id: str, + emr_serverless_app_id: str, + emr_serverless_app_arn: str, + bucket: s3.Bucket, + namespace: str, + **kwargs, + ) -> None: + f"""This stack creates a simple EMR Serverless demo within Step Functions. + + Two example State Machines are created to submit EMR Serverless jobs in the same + EMR Serverless application. The first writes an example file and the second reads it back. + Submitting the jobs, the writer invokes the sparkJobRun API asynchronously, while the reader + is synchronous (more details in the project README). Both State Machines are run in sequence + by a third one, called 'MainStateMachine_${namespace}', which in this example must be + executed manually from the AWS console or CLI, with no additional parameters. + + :param scope: The scope of the stack. + :param construct_id: The ID of the stack. + :param emr_serverless_app_id: The ID of the EMR Serverless app. + :param emr_serverless_app_arn: The ARN of the EMR Serverless app. + :param bucket: The S3 bucket to use for the EMR Serverless demo. + :param namespace: The namespace of the stack. + :param kwargs: other arguments. + """ + super().__init__( + scope, + construct_id, + description="This stack creates a simple EMR Serverless demo within Step Functions", + **kwargs, + ) + cdk.Tags.of(scope=cast(IConstruct, self)).add(key="namespace", value=namespace) + + # Uploading the PySpark jobs to S3 + s3deploy.BucketDeployment( + scope=self, + id="UploadPySparkJobsToS3", + destination_bucket=bucket, + sources=[s3deploy.Source.asset("./assets/jobs/")], + destination_key_prefix=f"jobs/{namespace}", + ) + + emr_execution_role = iam.Role( + scope=self, + id="EMRExecutionRole", + assumed_by=iam.ServicePrincipal("emr-serverless.amazonaws.com"), + inline_policies={ + "EmrExecutionRolePolicy": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "s3:PutObject", + "s3:GetObject", + "s3:ListBucket", + "s3:DeleteObject", + ], + resources=[ + bucket.bucket_arn, + f"{bucket.bucket_arn}/*", + ], + ), + ] + ), + }, + path="/service-role/", + ) + + writer_output_path = f"s3://{bucket.bucket_name}/data/" + + job1 = EmrServerlessStateMachineConstruct( + scope=self, + id="PySparkJob1", + namespace=namespace, + sfn_label="spark-writer", + emr_serverless_application_id=emr_serverless_app_id, + emr_serverless_application_arn=emr_serverless_app_arn, + spark_job_name="Writer", + spark_job_entry_point=f"s3://{bucket.bucket_name}/jobs/{namespace}/pyspark-writer-example.py", + spark_job_arguments=[writer_output_path], + spark_job_submit_parameters="--conf spark.dynamicAllocation.initialExecutors=1", + emr_execution_role_arn=emr_execution_role.role_arn, + asynchronous=True + ) + + job2 = EmrServerlessStateMachineConstruct( + scope=self, + id="PySparkJob2", + namespace=namespace, + sfn_label="spark-reader", + emr_serverless_application_id=emr_serverless_app_id, + 
emr_serverless_application_arn=emr_serverless_app_arn, + spark_job_name="Reader", + spark_job_entry_point=f"s3://{bucket.bucket_name}/jobs/{namespace}/pyspark-reader-example.py", + spark_job_arguments=[writer_output_path], + spark_job_submit_parameters="--conf spark.dynamicAllocation.initialExecutors=1", + emr_execution_role_arn=emr_execution_role.role_arn, + asynchronous=False + ) + + writer_task = sfn.CustomState( + scope=self, + id="WriteStateMachine", + state_json={ + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": job1.state_machine.state_machine_arn, + "Input.$": "$", + }, + "Retry": [ + { + "ErrorEquals": ["States.ALL"], + "BackoffRate": 1, + "IntervalSeconds": 60, + "MaxAttempts": 3, + "Comment": "Retry x3 if the Spark job fails", + } + ], + }, + ) + + reader_task = sfn.CustomState( + scope=self, + id="ReaderStateMachine", + state_json={ + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": job2.state_machine.state_machine_arn, + "Input.$": "$", + }, + "Retry": [ + { + "ErrorEquals": ["States.ALL"], + "BackoffRate": 1, + "IntervalSeconds": 60, + "MaxAttempts": 3, + "Comment": "Retry x3 if the Spark job fails", + } + ], + }, + ) + + # Main State Machine definition + sfn_definition = sfn.Chain.start(writer_task).next(reader_task) + + sfn_policy = iam.Policy( + self, + f"MainStateMachine-{namespace}-policy", + document=iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "events:PutTargets", + "events:PutRule", + "events:DescribeRule", + ], + effect=iam.Effect.ALLOW, + resources=[ + f"arn:{Aws.PARTITION}:events:{Aws.REGION}:{Aws.ACCOUNT_ID}:rule/StepFunctions*", + f"arn:aws:events:{Aws.REGION}:{Aws.ACCOUNT_ID}:rule/StepFunctions*", + ], + ), + iam.PolicyStatement( + actions=["states:StartExecution"], + effect=iam.Effect.ALLOW, + resources=[ + job1.state_machine.state_machine_arn, + job2.state_machine.state_machine_arn, + ], + ), + ], + ), + ) + + sm = sfn.StateMachine( + scope=self, + id=f"main-state-machine-{namespace}", + definition_body=sfn.DefinitionBody.from_chainable(sfn_definition), + timeout=cdk.Duration.minutes(60), + state_machine_name=f"MainStateMachine_{namespace}", + ) + + sm.role.attach_inline_policy(sfn_policy) diff --git a/cdk/emr-serverless-with-sfn/stacks/vpc.py b/cdk/emr-serverless-with-sfn/stacks/vpc.py new file mode 100644 index 0000000..18eeeb0 --- /dev/null +++ b/cdk/emr-serverless-with-sfn/stacks/vpc.py @@ -0,0 +1,17 @@ +from aws_cdk import Stack as Stack +from aws_cdk import aws_ec2 as ec2 +from constructs import Construct + + +class VPCStack(Stack): + vpc: ec2.Vpc + + def __init__( + self, scope: Construct, construct_id: str, namespace: str, **kwargs + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + # We create a simple VPC here + self.vpc = ec2.Vpc( + self, f"EMRServerlessDemo_{namespace}", max_azs=2 + ) # default is all AZs in region diff --git a/cdk/emr-serverless-with-sfn/tests/__init__.py b/cdk/emr-serverless-with-sfn/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cdk/emr-serverless-with-sfn/tests/unit/__init__.py b/cdk/emr-serverless-with-sfn/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cdk/emr-serverless-with-sfn/tests/unit/test_emr_serverless_with_sfn_stack.py b/cdk/emr-serverless-with-sfn/tests/unit/test_emr_serverless_with_sfn_stack.py new file mode 100644 index 0000000..73ba479 --- /dev/null +++ 
b/cdk/emr-serverless-with-sfn/tests/unit/test_emr_serverless_with_sfn_stack.py
@@ -0,0 +1,15 @@
+import aws_cdk as core
+import aws_cdk.assertions as assertions
+
+from stacks.vpc import VPCStack
+
+
+# Example test, adapted from the default `cdk init` template so that it runs
+# against a stack that exists in this project: it synthesizes the VPC stack
+# and checks that exactly one VPC resource is created.
+def test_vpc_created():
+    app = core.App()
+    stack = VPCStack(app, "emr-serverless-with-sfn", namespace="test")
+    template = assertions.Template.from_stack(stack)
+
+    template.resource_count_is("AWS::EC2::VPC", 1)
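+
+# To run the tests: `python -m pytest tests/` from the project root, with the
+# virtualenv active and requirements-dev.txt installed.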