[Feat] Add DynamoDB Reader/Writer #497

Open · wants to merge 2 commits into master
12 changes: 10 additions & 2 deletions README.md
@@ -62,10 +62,10 @@ Also make sure to check out all our [examples](examples).
#### Supported input/output:

Currently Metorikku supports the following inputs:
**CSV, JSON, parquet, JDBC, Kafka, Cassandra, Elasticsearch**
**CSV, JSON, parquet, JDBC, Kafka, Cassandra, Elasticsearch, DynamoDB**

And the following outputs:
**CSV, JSON, parquet, Redshift, Cassandra, Segment, JDBC, Kafka, Elasticsearch**<br />
**CSV, JSON, parquet, Redshift, Cassandra, Segment, JDBC, Kafka, Elasticsearch, DynamoDB**<br />

### Running Metorikku
There are currently 3 options to run Metorikku.
@@ -410,6 +410,14 @@ We use elasticsearch-hadoop as a provided jar - spark-submit command should look like so:

Check out the [example](examples/elasticsearch) and also the [Elasticsearch E2E test](e2e/elasticsearch) for further details.

##### DynamoDB output
DynamoDB output allows bulk writing to DynamoDB.
We use spark-dynamodb as a provided jar - spark-submit command should look like so:

```spark-submit --packages com.audienceproject:spark-dynamodb_2.12:1.1.2 --class com.yotpo.metorikku.Metorikku metorikku.jar```

Check out the [example](examples/dynamodb) for further details.
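
For reference, a metric directs its output to DynamoDB through an `output` block of type `Dynamodb`, mirroring [examples/dynamodb/movies_metric.yaml](examples/dynamodb/movies_metric.yaml) (table name and endpoint below are illustrative):

```yaml
output:
- dataFrameName: RatedMovies
  outputType: Dynamodb
  outputOptions:
    tableName: movies                    # target DynamoDB table
    options:
      dynamodb.endpoint: dynamodb:8000   # e.g. DynamoDB Local, as in the e2e stack
```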

#### Docker
Metorikku is provided with a [docker image](https://hub.docker.com/r/metorikku/metorikku).

46 changes: 46 additions & 0 deletions e2e/dynamodb/docker-compose-spark-3.yml
@@ -0,0 +1,46 @@
version: '3'
services:
spark-submit:
image: metorikku/metorikku:standalone
environment:
- SUBMIT_COMMAND=spark-submit --packages com.audienceproject:spark-dynamodb_2.12:1.1.2 --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/dynamodb/movies.yaml
entrypoint:
- /scripts/entrypoint-submit.sh
depends_on:
- spark-master
- spark-worker
spark-master:
image: metorikku/metorikku:standalone
entrypoint:
- /scripts/entrypoint-master.sh
logging:
driver: none
spark-worker:
image: metorikku/metorikku:standalone
entrypoint:
- /scripts/entrypoint-worker.sh
logging:
driver: none
dynamodb:
image: amazon/dynamodb-local
ports:
- 8000:8000
ulimits:
memlock:
soft: -1
hard: -1
dynamodb-tester:
image: "circleci/buildpack-deps:stretch-curl"
volumes:
- ./scripts:/scripts
- ./mocks:/mocks
command: /scripts/dynamodb_test.sh
environment:
- MOCK_OUTPUT=/mocks/movies.json
- DEBUG=true
dynamodb-init:
image: "circleci/buildpack-deps:stretch-curl"
volumes:
- ./scripts:/scripts
command: /scripts/dynamodb_init.sh
environment: {}
46 changes: 46 additions & 0 deletions e2e/dynamodb/docker-compose.yml
@@ -0,0 +1,46 @@
version: '3'
services:
spark-submit:
image: metorikku/metorikku:spark2_standalone
environment:
- SUBMIT_COMMAND=spark-submit --packages com.audienceproject:spark-dynamodb_2.12:1.1.2 --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/dynamodb/movies.yaml
entrypoint:
- /scripts/entrypoint-submit.sh
depends_on:
- spark-master
- spark-worker
spark-master:
image: metorikku/metorikku:spark2_standalone
entrypoint:
- /scripts/entrypoint-master.sh
logging:
driver: none
spark-worker:
image: metorikku/metorikku:spark2_standalone
entrypoint:
- /scripts/entrypoint-worker.sh
logging:
driver: none
dynamodb:
image: amazon/dynamodb-local
ports:
- 8000:8000
ulimits:
memlock:
soft: -1
hard: -1
dynamodb-tester:
image: "circleci/buildpack-deps:stretch-curl"
volumes:
- ./scripts:/scripts
- ./mocks:/mocks
command: /scripts/dynamodb_test.sh
environment:
- MOCK_OUTPUT=/mocks/movies.json
- DEBUG=true
dynamodb-init:
image: "circleci/buildpack-deps:stretch-curl"
volumes:
- ./scripts:/scripts
command: /scripts/dynamodb_init.sh
environment: {}
13 changes: 13 additions & 0 deletions e2e/dynamodb/mocks/movies.json
@@ -0,0 +1,13 @@
{
"userid": 1,
"movieid": 1
}
{
"movieid": 2
}
{
"movieid": 3
}
{
"movieid": 4
}
12 changes: 12 additions & 0 deletions e2e/dynamodb/scripts/dynamodb_init.sh
@@ -0,0 +1,12 @@
#!/bin/bash
DYNAMODB=${DYNAMODB:=dynamodb:8000}
TABLE_NAME=${TABLE_NAME:=movies}

# Wait for DynamoDB to accept connections (nc takes host and port as separate
# arguments, and the check must be re-run on every loop iteration)
until nc -z "${DYNAMODB%:*}" "${DYNAMODB#*:}" > /dev/null; do
  echo "Waiting for DynamoDB to be ready..."
  sleep 2
done
echo "DynamoDB is ready!"
21 changes: 21 additions & 0 deletions e2e/dynamodb/scripts/dynamodb_test.sh
@@ -0,0 +1,21 @@
#!/bin/bash
DYNAMODB=${DYNAMODB:=dynamodb:8000}
TABLE_NAME=${TABLE_NAME:=movies}

runQuery() {
  # DynamoDB exposes no per-table REST endpoint; a scan must go through the
  # DynamoDB JSON protocol (POST / with an X-Amz-Target header). DynamoDB Local
  # accepts a well-formed but unsigned Authorization header.
  curl -s -X POST "http://${DYNAMODB}/" \
    -H 'Content-Type: application/x-amz-json-1.0' \
    -H 'X-Amz-Target: DynamoDB_20120810.Scan' \
    -H 'Authorization: AWS4-HMAC-SHA256 Credential=dummy/20210101/us-east-1/dynamodb/aws4_request, SignedHeaders=host, Signature=dummy' \
    -d "{\"TableName\": \"${TABLE_NAME}\"}"
}

results=$(runQuery)

# Compare results
if [[ ! -z ${DEBUG} ]]; then
echo -e "Results:\n$results"
fi
echo "$results" > /tmp/test_results

jq -c . ${MOCK_OUTPUT} > /tmp/mock_unpretty
jq -c . /tmp/test_results > /tmp/test_results_unpretty

echo "Comparing mock and test output..."
diff -w /tmp/mock_unpretty /tmp/test_results_unpretty
10 changes: 10 additions & 0 deletions e2e/dynamodb/test-spark-3.sh
@@ -0,0 +1,10 @@
#!/bin/bash
set -e

docker-compose -f docker-compose-spark-3.yml up -d dynamodb
docker-compose -f docker-compose-spark-3.yml up --exit-code-from dynamodb-init dynamodb-init
docker-compose -f docker-compose-spark-3.yml up --exit-code-from spark-submit spark-submit
docker-compose -f docker-compose-spark-3.yml up --exit-code-from dynamodb-tester dynamodb-tester
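# Sum the exit codes of every container in the compose project (named after
# this directory); the sum is non-zero if any step failed.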
exit_code=$(docker ps -aq -f label=com.docker.compose.project=dynamodb | xargs -I{} docker inspect {} --format='{{.State.ExitCode}}' | paste -sd+ - | bc)
docker-compose -f docker-compose-spark-3.yml down
exit $exit_code
10 changes: 10 additions & 0 deletions e2e/dynamodb/test.sh
@@ -0,0 +1,10 @@
#!/bin/bash
set -e

docker-compose -f docker-compose.yml up -d dynamodb
docker-compose -f docker-compose.yml up --exit-code-from dynamodb-init dynamodb-init
docker-compose -f docker-compose.yml up --exit-code-from spark-submit spark-submit
docker-compose -f docker-compose.yml up --exit-code-from dynamodb-tester dynamodb-tester
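# Sum the exit codes of every container in the compose project (named after
# this directory); the sum is non-zero if any step failed.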
exit_code=$(docker ps -aq -f label=com.docker.compose.project=dynamodb | xargs -I{} docker inspect {} --format='{{.State.ExitCode}}' | paste -sd+ - | bc)
docker-compose -f docker-compose.yml down
exit $exit_code
21 changes: 21 additions & 0 deletions e2e/elasticsearch/scripts/dynamodb_test.sh
@@ -0,0 +1,21 @@
#!/bin/bash
DYNAMODB=${DYNAMODB:=dynamodb:8000}
TABLE_NAME=${TABLE_NAME:=movies}

runQuery() {
  # the aws CLI requires a scheme on --endpoint-url
  aws dynamodb scan --table-name "$TABLE_NAME" --endpoint-url "http://${DYNAMODB}"
}

results=$(runQuery)

# Compare results
if [[ ! -z ${DEBUG} ]]; then
echo -e "Results:\n$results"
fi
echo "$results" > /tmp/test_results

jq -c . ${MOCK_OUTPUT} > /tmp/mock_unpretty
jq -c . /tmp/test_results > /tmp/test_results_unpretty

echo "Comparing mock and test output..."
diff -w /tmp/mock_unpretty /tmp/test_results_unpretty
7 changes: 2 additions & 5 deletions e2e/elasticsearch/scripts/elasticsearch_test.sh
@@ -1,11 +1,8 @@
#!/bin/bash
ELASTICSEARCH=${ELASTICSEARCH:=elasticsearch:9200}
MAX_RETRIES=${MAX_RETRIES:=60}
INDEX_NAME=${INDEX_NAME:=}
SORTBY=${SORTBY:=}
DYNAMODB=${DYNAMODB:=dynamodb:8000}

runQuery() {
curl -X GET "http://${ELASTICSEARCH}/${INDEX_NAME}/_search" -H 'Content-Type: application/json' -d "{\"sort\": [\"$SORTBY\"]}" | jq '.hits["hits"][]["_source"]'
curl -X GET "http://${DYNAMODB}/movies"
}

results=$(runQuery)
12 changes: 12 additions & 0 deletions examples/dynamodb/movies.yaml
@@ -0,0 +1,12 @@
metrics:
- examples/dynamodb/movies_metric.yaml

inputs:
movies:
file:
path: examples/mocks/movies.jsonl
ratings:
file:
path: examples/mocks/ratings.jsonl
output:
dynamodb: {}
16 changes: 16 additions & 0 deletions examples/dynamodb/movies_metric.yaml
@@ -0,0 +1,16 @@
steps:
- dataFrameName: RatedMovies
sql:
SELECT userid,
movies.movieid,
AVG(rating)
FROM movies
LEFT JOIN ratings ON ratings.movieid = movies.movieid
GROUP BY 1,2
output:
- dataFrameName: RatedMovies
outputType: Dynamodb
outputOptions:
tableName: movies
options:
dynamodb.endpoint: dynamodb:8000
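
The `options` map under `outputOptions` is presumably handed to the spark-dynamodb writer as-is; here `dynamodb.endpoint` points it at the DynamoDB Local instance from the e2e compose stack rather than the real AWS endpoint.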
@@ -11,9 +11,10 @@ case class Input(file: Option[File],
kafka: Option[Kafka],
cassandra: Option[Cassandra],
elasticsearch: Option[Elasticsearch],
mongo: Option[MongoDB]) extends InputConfig {
mongo: Option[MongoDB],
dynamodb: Option[Dynamodb]) extends InputConfig {
def getReader(name: String): Reader = {
Seq(file, fileDateRange, jdbc, kafka, cassandra, elasticsearch, mongo).find(
Seq(file, fileDateRange, jdbc, kafka, cassandra, elasticsearch, mongo, dynamodb).find(
x => x.isDefined
).get.get.getReader(name)
}
@@ -11,4 +11,5 @@ case class Output(cassandra: Option[Cassandra] = None,
file: Option[File] = None,
kafka: Option[Kafka] = None,
elasticsearch: Option[Elasticsearch] = None,
hudi: Option[Hudi] = None)
hudi: Option[Hudi] = None,
dynamodb: Option[Dynamodb] = None)
@@ -0,0 +1,35 @@
package com.yotpo.metorikku.configuration.job.input

import com.yotpo.metorikku.configuration.job.InputConfig
import com.yotpo.metorikku.input.Reader
import com.yotpo.metorikku.input.readers.dynamodb.DynamodbInput

case class Dynamodb(tableName: String,
region: Option[String],
roleArn: Option[String],
readPartitions: Option[Int],
maxPartitionBytes: Option[Int],
defaultParallelism: Option[Int],
targetCapacity: Option[Int],
stronglyConsistentReads: Option[Boolean],
bytesPerRCU: Option[Int],
filterPushdown: Option[Boolean],
throughput: Option[Int],
options: Option[Map[String, String]]
) extends InputConfig {
override def getReader(name: String): Reader = DynamodbInput(
name=name,
tableName=tableName,
region=region,
roleArn=roleArn,
readPartitions=readPartitions,
maxPartitionBytes=maxPartitionBytes,
defaultParallelism=defaultParallelism,
targetCapacity=targetCapacity,
stronglyConsistentReads=stronglyConsistentReads,
bytesPerRCU=bytesPerRCU,
filterPushdown=filterPushdown,
throughput=throughput,
options=options
)
}
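
The job-file parser binds this through the new `dynamodb` key on `Input`, so a DynamoDB input definition would look along these lines (a sketch with illustrative values; the free-form `options` map is passed through to the connector as-is):

```yaml
inputs:
  movies:
    dynamodb:
      tableName: movies
      region: us-east-1
      readPartitions: 4
      stronglyConsistentReads: true
      options:
        dynamodb.endpoint: dynamodb:8000
```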
@@ -0,0 +1,5 @@
package com.yotpo.metorikku.configuration.job.output

case class Dynamodb(region: Option[String], roleArn: Option[String])
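
At the job level the writer is enabled by the (possibly empty) `dynamodb` key under `output`, as in the bundled example job; both fields are optional. A sketch with illustrative values:

```yaml
output:
  dynamodb:
    region: us-east-1                                          # optional
    roleArn: arn:aws:iam::123456789012:role/metorikku-writer   # optional, illustrative ARN
```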
@@ -31,7 +31,8 @@ object OutputType extends Enumeration {
File,
Kafka,
Catalog,
Hudi = Value
Hudi,
Dynamodb = Value
}

class OutputTypeReference extends TypeReference[OutputType.type]
@@ -0,0 +1,57 @@
package com.yotpo.metorikku.input.readers.dynamodb

import com.yotpo.metorikku.input.Reader
import org.apache.spark.sql.{DataFrame, SparkSession}

case class DynamodbInput(name: String,
tableName: String,
region: Option[String],
roleArn: Option[String],
readPartitions: Option[Int],
maxPartitionBytes: Option[Int],
defaultParallelism: Option[Int],
targetCapacity: Option[Int],
stronglyConsistentReads: Option[Boolean],
bytesPerRCU: Option[Int],
filterPushdown: Option[Boolean],
throughput: Option[Int],
options: Option[Map[String, String]]) extends Reader {
  def read(sparkSession: SparkSession): DataFrame = {
    // Start from the mandatory table name, keep only the options that were set,
    // and append the free-form options map last so it can override the typed settings.
    val dynamodbOptions = Map("tableName" -> tableName) ++
      Seq(
        "region" -> region,
        "roleArn" -> roleArn,
        "readPartitions" -> readPartitions,
        "maxPartitionBytes" -> maxPartitionBytes,
        "defaultParallelism" -> defaultParallelism,
        "targetCapacity" -> targetCapacity,
        "stronglyConsistentReads" -> stronglyConsistentReads,
        "bytesPerRCU" -> bytesPerRCU,
        "filterPushdown" -> filterPushdown,
        "throughput" -> throughput
      ).collect { case (key, Some(value)) => key -> value.toString } ++
      options.getOrElse(Map())

    sparkSession.read.options(dynamodbOptions).format("dynamodb").load()
  }
}
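
`format("dynamodb")` resolves to the spark-dynamodb connector supplied at submit time via `--packages`, which registers that short name as a Spark data source; because the free-form `options` map is appended last, its entries override the typed fields.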
