Add delta lake and emr-cli example #50

Merged 3 commits on Jan 26, 2024
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@ dist/
# macOS detritus
.DS_Store

# config files
.env
70 changes: 70 additions & 0 deletions examples/pyspark/delta-lake/Dockerfile
@@ -0,0 +1,70 @@
# This is a multi-stage Dockerfile that can be used to build many different types of
# bundled dependencies for PySpark projects.
# The `base` stage installs generic tools necessary for packaging.
#
# There are `export-` and `build-` stages for the different types of projects.
# - python-packages - Generic support for Python projects with pyproject.toml
# - poetry - Support for Poetry projects
#
# This Dockerfile is generated automatically as part of the emr-cli tool.
# Feel free to modify it for your needs, but leave the `build-` and `export-`
# stages related to your project.
#
# To build manually, you can use the following command, assuming
# the Docker BuildKit backend is enabled. https://docs.docker.com/build/buildkit/
#
# Example for building a poetry project and saving the output to dist/ folder
# docker build --target export-poetry --output dist .


## ----------------------------------------------------------------------------
## Base stage for python development
## ----------------------------------------------------------------------------
FROM --platform=linux/amd64 amazonlinux:2 AS base

RUN yum install -y python3 tar gzip

ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# EMR 6.x uses Python 3.7 - limit Poetry version to 1.5.1
ENV POETRY_VERSION=1.5.1
RUN python3 -m pip install --upgrade pip
RUN curl -sSL https://install.python-poetry.org | python3 -

ENV PATH="$PATH:/root/.local/bin"

WORKDIR /app

COPY . .

# Test stage - installs test dependencies defined in pyproject.toml
FROM base as test
RUN python3 -m pip install .[test]

## ----------------------------------------------------------------------------
## Build and export stages for standard Python projects
## ----------------------------------------------------------------------------
# Build stage - installs required dependencies and creates a venv package
FROM base as build-python
RUN python3 -m pip install venv-pack==0.2.0 && \
    python3 -m pip install .
RUN mkdir /output && venv-pack -o /output/pyspark_deps.tar.gz

# Export stage - used to copy packaged venv to local filesystem
FROM scratch AS export-python
COPY --from=build-python /output/pyspark_deps.tar.gz /

## ----------------------------------------------------------------------------
## Build and export stages for Poetry Python projects
## ----------------------------------------------------------------------------
# Build stage for poetry
FROM base as build-poetry
RUN poetry self add poetry-plugin-bundle && \
    poetry bundle venv dist/bundle --without dev && \
    tar -czvf dist/pyspark_deps.tar.gz -C dist/bundle . && \
    rm -rf dist/bundle

FROM scratch as export-poetry
COPY --from=build-poetry /app/dist/pyspark_deps.tar.gz /
78 changes: 78 additions & 0 deletions examples/pyspark/delta-lake/README.md
@@ -0,0 +1,78 @@
# EMR Serverless Delta Lake with Poetry example

This example shows how to use the [`emr-cli`](https://github.com/awslabs/amazon-emr-cli) to deploy a Poetry-based project with Delta Lake to EMR Serverless.

As of EMR 6.9.0, Delta Lake jars are provided on the EMR Serverless image. This means you can use the `spark.jars` Spark configuration item to specify the path to the local Delta Lake jars. If you use a different version than what's provided with EMR Serverless, you can still use the `--packages` option to specify your version.
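
For example, either of the following spark-submit options (passed via the `--spark-submit-opts` flag shown in the Deploying section below) will make Delta Lake available to the job. The Maven coordinate is only an example sketch; match it to the `delta-spark` version in your project.

```bash
# Use the jars bundled with EMR Serverless 6.9.0+ (as done in the Deploying section below)
--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar

# Or pull a specific release from Maven instead (example coordinate, adjust the version)
--packages io.delta:delta-core_2.12:2.1.0
```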

## Getting Started

> [!NOTE]
> This assumes you already have an EMR Serverless 6.9.0 application or have completed the pre-requisites in this repo's [README](/README.md).

To create an EMR Serverless application compatible with this code, use the following command:

```bash
aws emr-serverless create-application \
  --release-label emr-6.9.0 \
  --type SPARK
```

- Define some environment variables to be used later

```shell
export APPLICATION_ID=<APPLICATION_ID>
export S3_BUCKET=<YOUR_BUCKET_NAME>
export JOB_ROLE_ARN=arn:aws:iam::<ACCOUNT_ID>:role/emr-serverless-job-role
```
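
Rather than pasting the application ID by hand, you could also capture it when creating the application. The following is only a sketch that combines the earlier `create-application` command with the AWS CLI's `--query` option:

```bash
# Sketch: create the application and capture its ID in one step
export APPLICATION_ID=$(aws emr-serverless create-application \
  --release-label emr-6.9.0 \
  --type SPARK \
  --query applicationId --output text)
```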

You can either `git clone` this project or use the `emr init` command to create a Poetry project and add the `delta-spark` dependency yourself.

- Option 1: `git clone`

```bash
git clone https://github.com/aws-samples/emr-serverless-samples.git
cd emr-serverless-samples/examples/pyspark/delta-lake
poetry install
```

- Option 2: `emr init`

```bash
emr init --project-type poetry delta-lake
cd delta-lake
poetry add delta-spark==2.1.0
```

If you used Option 2, copy `main.py` from this directory into your new project folder.

## Deploying

```bash
emr run \
  --application-id ${APPLICATION_ID} \
  --job-role ${JOB_ROLE_ARN} \
  --s3-code-uri s3://${S3_BUCKET}/tmp/emr-cli-delta-lake/ \
  --s3-logs-uri s3://${S3_BUCKET}/logs/ \
  --entry-point main.py \
  --job-args ${S3_BUCKET} \
  --spark-submit-opts "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar" \
  --build --wait --show-stdout
```

> [!NOTE]
> Because of how `delta-spark` is packaged, this will include `pyspark` as a dependency. The `--build` flag packages and deploys a virtualenv with `delta-spark` and related dependencies.

You should see the following output:

```
[emr-cli]: Job submitted to EMR Serverless (Job Run ID: 00fgj5hq9e4le80m)
[emr-cli]: Waiting for job to complete...
[emr-cli]: Job state is now: SCHEDULED
[emr-cli]: Job state is now: RUNNING
[emr-cli]: Job state is now: SUCCESS
[emr-cli]: stdout for 00fgj5hq9e4le80m
--------------------------------------
Itsa Delta!
[emr-cli]: Job completed successfully!
```
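
If you want to double-check the run afterwards, one option is the AWS CLI's `get-job-run` call. This is just a sketch; replace the job run ID with the one printed by `emr-cli`:

```bash
# Sketch: confirm the final state of the job run reported above
aws emr-serverless get-job-run \
  --application-id ${APPLICATION_ID} \
  --job-run-id 00fgj5hq9e4le80m \
  --query 'jobRun.state' --output text
```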
26 changes: 26 additions & 0 deletions examples/pyspark/delta-lake/main.py
@@ -0,0 +1,26 @@
import sys
import uuid

from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("DeltaExample")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

bucket_name = sys.argv[1]

url = f"s3://{bucket_name}/tmp/delta-lake/output/1.0.1/{uuid.uuid4()}/"

# creates a Delta table and outputs to target S3 bucket
spark.range(0, 5).write.format("delta").save(url)

if DeltaTable.isDeltaTable(spark, url):
print("Itsa Delta!")