diff --git a/.gitignore b/.gitignore index 84f6310dc..60815bdcf 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,8 @@ derby.log metastore_db/ out/ metorikku.iml -examples/output \ No newline at end of file +examples/output +e2e/*/output +e2e/*/*/output +e2e/*/warehouse +e2e/*/*/warehouse \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index f5eb36341..0794265c1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,9 +15,12 @@ branches: - /^v[\d\.]+$/ env: global: - - SPARK_VERSION=2.4.4 + - SPARK_VERSION=2.4.5 - HADOOP_VERSION=2.9.2 - - HIVE_VERSION=1.2.2 + - HIVE1_VERSION=1.2.2 + - HUDI_HIVE1_VERSION=0.4.7 + - HIVE_VERSION=2.3.3 + - HUDI_VERSION=0.5.1-incubating before_script: - docker pull $(grep -ioP '(?<=^from)\s+\S+' docker/spark/Dockerfile) - docker pull metorikku/spark:latest @@ -45,7 +48,8 @@ script: # Create all relevant docker images - travis_fold start "docker" - docker build -t metorikku/spark:$SPARK_VERSION --cache-from metorikku/spark:latest -t metorikku/spark:latest --build-arg SPARK_VERSION=$SPARK_VERSION --build-arg HADOOP_VERSION=$HADOOP_VERSION -f docker/spark/Dockerfile docker/spark || travis_terminate 1; - - docker build -t metorikku/hive:$HIVE_VERSION --cache-from metorikku/hive:latest -t metorikku/hive:latest --build-arg HIVE_VERSION=$HIVE_VERSION -f docker/hive/Dockerfile docker/hive || travis_terminate 1; + - docker build -t metorikku/hive:1 --cache-from metorikku/hive:1 -t metorikku/hive:$HIVE1_VERSION --build-arg HIVE_VERSION=$HIVE1_VERSION --build-arg HUDI_HIVE1_VERSION=$HUDI_HIVE1_VERSION -f docker/hive1/Dockerfile docker/hive1 || travis_terminate 1; + - docker build -t metorikku/hive --cache-from metorikku/hive -t metorikku/hive:$HIVE_VERSION --build-arg HIVE_VERSION=$HIVE_VERSION --build-arg HUDI_VERSION=$HUDI_VERSION -f docker/hive/Dockerfile docker/hive || travis_terminate 1; - docker build -t metorikku/metorikku:latest -f docker/metorikku/Dockerfile .
|| travis_terminate 1; - travis_fold end "docker" # Kafka to Kafka E2E @@ -56,10 +60,14 @@ script: - travis_fold start "file_to_influx_e2e" - (cd e2e/influxdb && ./test.sh) || travis_terminate 1; - travis_fold end "file_to_influx_e2e" - # File to hive E2E + # File to hive2 E2E - travis_fold start "file_to_hive_e2e" - (cd e2e/hive && ./test.sh) || travis_terminate 1; - travis_fold end "file_to_hive_e2e" + # File to hive1 E2E + - travis_fold start "file_to_hive1_e2e" + - (cd e2e/hive1 && ./test.sh) || travis_terminate 1; + - travis_fold end "file_to_hive1_e2e" # File to Elasticsearch E2E - travis_fold start "file_to_elasticsearch_e2e" - (cd e2e/elasticsearch && ./test.sh) || travis_terminate 1; @@ -81,7 +89,7 @@ deploy: condition: ($TRAVIS_BRANCH = master) || ($TRAVIS_TAG =~ ^v.*) - provider: script skip_cleanup: true - script: echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin && docker tag metorikku/metorikku metorikku/metorikku:"$TRAVIS_TAG"_spark_"$SPARK_VERSION" && docker push metorikku/spark && docker push metorikku/hive && docker push metorikku/metorikku + script: echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin && docker tag metorikku/metorikku metorikku/metorikku:"$TRAVIS_TAG"_spark_"$SPARK_VERSION" && docker push metorikku/spark && docker push metorikku/hive && docker push metorikku/hive:1 && docker push metorikku/metorikku on: all_branches: true condition: $TRAVIS_TAG =~ ^v.* diff --git a/README.md b/README.md index c50a731ee..0198b1e62 100644 --- a/README.md +++ b/README.md @@ -265,7 +265,7 @@ This will commit the offsets to kafka, as a new dummy consumer group. * we use ABRiS as a provided jar In order to deserialize your kafka stream messages (https://github.com/AbsaOSS/ABRiS), add the ```schemaRegistryUrl``` option to the kafka input config spark-submit command should look like so: -```spark-submit --repositories http://packages.confluent.io/maven/ --jars https://repo1.maven.org/maven2/za/co/absa/abris_2.11/3.1.1/abris_2.11-3.1.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,io.confluent:kafka-schema-registry-client:5.3.0,io.confluent:kafka-avro-serializer:5.3.0 --class com.yotpo.metorikku.Metorikku metorikku.jar``` +```spark-submit --repositories http://packages.confluent.io/maven/ --jars https://repo1.maven.org/maven2/za/co/absa/abris_2.11/3.1.1/abris_2.11-3.1.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,io.confluent:kafka-schema-registry-client:5.3.0,io.confluent:kafka-avro-serializer:5.3.0 --class com.yotpo.metorikku.Metorikku metorikku.jar``` * If your subject schema name is not ```-value``` (e.g. if the topic is a regex pattern) you can specify the schema subject in the ```schemaSubject``` section @@ -447,7 +447,7 @@ Metorikku supports reading/writing with [Apache Hudi](https://github.com/apache/ Hudi is a very exciting project that basically allows upserts and deletes directly on top of partitioned parquet data. 
In order to use Hudi with Metorikku you need to add to your classpath (via ```--jars``` or if running locally with ```-cp```) -an external JAR from here: https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-spark-bundle/0.4.7/hoodie-spark-bundle-0.4.7.jar +an external JAR from here: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.5.1-incubating/hudi-spark-bundle_2.11-0.5.1-incubating.jar To run Hudi jobs you also have to make sure you have the following spark configuration (pass with ```--conf``` or ```-D```): ```properties diff --git a/build.sbt b/build.sbt index 99879e1a2..3a9bb2b73 100644 --- a/build.sbt +++ b/build.sbt @@ -18,15 +18,16 @@ developers := List( ) scalaVersion := "2.11.12" -val sparkVersion = Option(System.getProperty("sparkVersion")).getOrElse("2.4.4") +val sparkVersion = Option(System.getProperty("sparkVersion")).getOrElse("2.4.5") val jacksonVersion = "2.9.9" lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4") lazy val excludeNetty = ExclusionRule(organization = "io.netty", name = "netty") lazy val excludeNettyAll = ExclusionRule(organization = "io.netty", name = "netty-all") -lazy val excludeHoodieTimeLineService = ExclusionRule(organization = "com.uber.hoodie", name = "hoodie-timeline-service") lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "avro") lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark") +lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12") +lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core") libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", @@ -45,7 +46,7 @@ libraryDependencies ++= Seq( "io.netty" % "netty" % "3.10.6.Final", "com.google.guava" % "guava" % "16.0.1", "com.typesafe.play" %% "play-json" % "2.6.2", - "com.databricks" %% "spark-redshift" % "3.0.0-preview1", + "com.databricks" %% "spark-redshift" % "3.0.0-preview1" excludeAll excludeAvro, "com.amazon.redshift" % "redshift-jdbc42" % "1.2.1.1001", "com.segment.analytics.java" % "analytics" % "2.0.0", "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.6", @@ -56,15 +57,15 @@ libraryDependencies ++= Seq( "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion, "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion, - "com.groupon.dse" % "spark-metrics" % "2.0.0", + "com.groupon.dse" % "spark-metrics" % "2.0.0" excludeAll excludeMetricsCore, "org.apache.commons" % "commons-text" % "1.6", "org.influxdb" % "influxdb-java" % "2.14", "org.apache.kafka" %% "kafka" % "2.2.0" % "provided", "za.co.absa" % "abris_2.11" % "3.1.1" % "provided" excludeAll(excludeAvro, excludeSpark), - "com.uber.hoodie" % "hoodie-spark" % "0.4.7" % "provided" excludeAll(excludeHoodieTimeLineService), - "com.uber.hoodie" % "hoodie-common" % "0.4.7" % "provided" excludeAll(excludeHoodieTimeLineService), + "org.apache.hudi" %% "hudi-spark-bundle" % "0.5.1-incubating" % "provided" excludeAll excludeFasterXML, + "org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided", "org.apache.avro" % "avro" % "1.8.2" % "provided", - "org.apache.hive" % "hive-jdbc" % "1.2.2" % "provided" excludeAll(excludeNetty, excludeNettyAll) + "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll) ) // Temporary fix for 
https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306 diff --git a/docker/hive/Dockerfile b/docker/hive/Dockerfile index 3af9aa3eb..7b777915d 100644 --- a/docker/hive/Dockerfile +++ b/docker/hive/Dockerfile @@ -5,13 +5,15 @@ ENV ATLAS_HOME=/opt/atlas RUN mkdir -p $ATLAS_HOME/hook/hive ENV HADOOP_HOME=/opt/hadoop ENV HADOOP_VERSION=2.7.4 +RUN apt-get update && apt-get install -y ant + RUN wget -q https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz \ && tar -xzf hadoop-$HADOOP_VERSION.tar.gz \ && mv hadoop-$HADOOP_VERSION $HADOOP_HOME \ && rm hadoop-$HADOOP_VERSION.tar.gz ENV HIVE_HOME=/opt/hive -ENV HIVE_VERSION=1.2.2 +ENV HIVE_VERSION=2.3.3 RUN wget -q https://archive.apache.org/dist/hive/hive-$HIVE_VERSION/apache-hive-$HIVE_VERSION-bin.tar.gz \ && tar -xzf apache-hive-$HIVE_VERSION-bin.tar.gz \ && mv apache-hive-$HIVE_VERSION-bin $HIVE_HOME \ @@ -21,10 +23,12 @@ ENV MYSQL_CONNECTOR_VERSION=5.1.47 RUN wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_CONNECTOR_VERSION/mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar \ && mv mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar $HIVE_HOME/lib -ENV HUDI_VERSION=0.4.5 -RUN apt-get update && apt-get install -y ant -RUN wget -q https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-hive-bundle/$HUDI_VERSION/hoodie-hive-bundle-$HUDI_VERSION.jar \ - && mv hoodie-hive-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib +ENV HUDI_VERSION=0.5.1-incubating +RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-bundle/$HUDI_VERSION/hudi-hive-bundle-$HUDI_VERSION.jar \ + && mv hudi-hive-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib +RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/$HUDI_VERSION/hudi-hadoop-mr-bundle-$HUDI_VERSION.jar \ + && mv hudi-hadoop-mr-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib + RUN wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/$HADOOP_VERSION/hadoop-aws-$HADOOP_VERSION.jar \ && mv hadoop-aws-$HADOOP_VERSION.jar $HIVE_HOME/lib @@ -86,4 +90,4 @@ RUN mkdir -p $ATLAS_HOME/hook-bin/ COPY atlas/import_hive.sh $ATLAS_HOME/hook-bin/ RUN chmod +x /$ATLAS_HOME/hook-bin/import_hive.sh -CMD /wait && /start-hive.sh +CMD /wait && /start-hive.sh \ No newline at end of file diff --git a/docker/hive/start-hive.sh b/docker/hive/start-hive.sh index 09a1859d5..160a908e0 100755 --- a/docker/hive/start-hive.sh +++ b/docker/hive/start-hive.sh @@ -72,6 +72,10 @@ cat >${HIVE_HOME}/conf/hive-site.xml <fs.s3n.awsSecretAccessKey ${AWS_SECRET_KEY} + + hive.security.authorization.enabled + false + EOL if [[ ! 
-z ${USE_ATLAS} ]] ; then @@ -109,4 +113,5 @@ fi $HIVE_HOME/bin/schematool -dbType ${DB_TYPE} -initSchema nohup ${HIVE_HOME}/bin/hive --service metastore -p ${METASTORE_PORT} & +sleep 10s ${HIVE_HOME}/bin/hiveserver2 --hiveconf hive.root.logger=INFO,console diff --git a/docker/hive1/Dockerfile b/docker/hive1/Dockerfile new file mode 100644 index 000000000..3783818fb --- /dev/null +++ b/docker/hive1/Dockerfile @@ -0,0 +1,89 @@ +FROM openjdk:8u212-b04-jre-stretch + +RUN mkdir /opt/atlas +ENV ATLAS_HOME=/opt/atlas +RUN mkdir -p $ATLAS_HOME/hook/hive +ENV HADOOP_HOME=/opt/hadoop +ENV HADOOP_VERSION=2.7.4 +RUN wget -q https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz \ + && tar -xzf hadoop-$HADOOP_VERSION.tar.gz \ + && mv hadoop-$HADOOP_VERSION $HADOOP_HOME \ + && rm hadoop-$HADOOP_VERSION.tar.gz + +ENV HIVE_HOME=/opt/hive +ENV HIVE_VERSION=1.2.2 +RUN wget -q https://archive.apache.org/dist/hive/hive-$HIVE_VERSION/apache-hive-$HIVE_VERSION-bin.tar.gz \ + && tar -xzf apache-hive-$HIVE_VERSION-bin.tar.gz \ + && mv apache-hive-$HIVE_VERSION-bin $HIVE_HOME \ + && rm apache-hive-$HIVE_VERSION-bin.tar.gz + +ENV MYSQL_CONNECTOR_VERSION=5.1.47 +RUN wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_CONNECTOR_VERSION/mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar \ + && mv mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar $HIVE_HOME/lib + +ENV HUDI_VERSION=0.4.7 +RUN apt-get update && apt-get install -y ant +RUN wget -q https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-hive-bundle/$HUDI_VERSION/hoodie-hive-bundle-$HUDI_VERSION.jar \ + && mv hoodie-hive-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib +RUN wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/$HADOOP_VERSION/hadoop-aws-$HADOOP_VERSION.jar \ + && mv hadoop-aws-$HADOOP_VERSION.jar $HIVE_HOME/lib + +ENV AWS_JAVA_SDK_VERSION=1.7.4 +RUN wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/$AWS_JAVA_SDK_VERSION/aws-java-sdk-$AWS_JAVA_SDK_VERSION.jar \ + && mv aws-java-sdk-$AWS_JAVA_SDK_VERSION.jar $HIVE_HOME/lib + +RUN wget -q https://repo1.maven.org/maven2/net/logstash/log4j/jsonevent-layout/1.7/jsonevent-layout-1.7.jar \ + && mv jsonevent-layout-1.7.jar $HIVE_HOME/lib +RUN wget -q https://repo1.maven.org/maven2/net/minidev/json-smart/1.1.1/json-smart-1.1.1.jar \ + && mv json-smart-1.1.1.jar $HIVE_HOME/lib + +# Apache Atlas HiveHook installation +ENV ATLAS_VERSION=2.0.0 +ENV HBASE_VERSION=2.0.2 +ENV JACKSON_VERSION=2.9.9 +ENV JERSEY_VERSION=1.19 +ENV JSR311_VERSION=1.1 +ENV KAFKA_2_1_1_VERSION=2.0.0 +ENV SCALA_LIBRARY_VERSION=2.11.12 +ENV COMMONS_CONFIG_VERSION=1.10 + +RUN mkdir -p $ATLAS_HOME/hook/hive/atlas-hive-plugin-impl + +RUN wget -P ${ATLAS_HOME}/hook/hive/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-plugin-classloader/$ATLAS_VERSION/atlas-plugin-classloader-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/ https://repo1.maven.org/maven2/org/apache/atlas/hive-bridge-shim/$ATLAS_VERSION/hive-bridge-shim-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-client-common/$ATLAS_VERSION/atlas-client-common-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-client-v1/$ATLAS_VERSION/atlas-client-v1-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ 
https://repo1.maven.org/maven2/org/apache/atlas/atlas-client-v2/$ATLAS_VERSION/atlas-client-v2-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-common/$ATLAS_VERSION/atlas-common-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-intg/$ATLAS_VERSION/atlas-intg-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/atlas-notification/$ATLAS_VERSION/atlas-notification-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/hdfs-model/$ATLAS_VERSION/hdfs-model-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/hive-bridge/$ATLAS_VERSION/hive-bridge-$ATLAS_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/hbase/hbase-common/$HBASE_VERSION/hbase-common-$HBASE_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/hbase/hbase-server/$HBASE_VERSION/hbase-server-$HBASE_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/$JACKSON_VERSION/jackson-annotations-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/$JACKSON_VERSION/jackson-core-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/$JACKSON_VERSION/jackson-databind-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/module/jackson-module-jaxb-annotations/$JACKSON_VERSION/jackson-module-jaxb-annotations-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-base/$JACKSON_VERSION/jackson-jaxrs-base-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-json-provider/$JACKSON_VERSION/jackson-jaxrs-json-provider-$JACKSON_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/sun/jersey/jersey-json/$JERSEY_VERSION/jersey-json-$JERSEY_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/com/sun/jersey/contribs/jersey-multipart/$JERSEY_VERSION/jersey-multipart-$JERSEY_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/$KAFKA_2_1_1_VERSION/kafka-clients-$KAFKA_2_1_1_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.12/$KAFKA_2_1_1_VERSION/kafka_2.12-$KAFKA_2_1_1_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/scala-lang/scala-library/$SCALA_LIBRARY_VERSION/scala-library-$SCALA_LIBRARY_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ 
https://repo1.maven.org/maven2/commons-configuration/commons-configuration/$COMMONS_CONFIG_VERSION/commons-configuration-$COMMONS_CONFIG_VERSION.jar +RUN wget -P ${ATLAS_HOME}/hook/hive/atlas-hive-plugin-impl/ https://repo1.maven.org/maven2/org/apache/atlas/hdfs-model/$ATLAS_VERSION/hdfs-model-$ATLAS_VERSION.jar + + +ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.5.0/wait /wait +RUN chmod +x /wait + +COPY start-hive.sh / +COPY log4j.json.properties . + +RUN mkdir -p $ATLAS_HOME/hook-bin/ +COPY atlas/import_hive.sh $ATLAS_HOME/hook-bin/ +RUN chmod +x /$ATLAS_HOME/hook-bin/import_hive.sh + +CMD /wait && /start-hive.sh \ No newline at end of file diff --git a/docker/hive1/atlas/import_hive.sh b/docker/hive1/atlas/import_hive.sh new file mode 100644 index 000000000..71408bf3b --- /dev/null +++ b/docker/hive1/atlas/import_hive.sh @@ -0,0 +1,156 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. +# +# resolve links - $0 may be a softlink +# Taken from Apache Atlas project repo, removed JAR_BIN and Removed Hoodie Jar from Classpath +PRG="${0}" + +[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true + +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +BASEDIR=`dirname ${PRG}` +BASEDIR=`cd ${BASEDIR}/..;pwd` + +if test -z "${JAVA_HOME}" +then + JAVA_BIN=`which java` + JAR_BIN=`which jar` +else + JAVA_BIN="${JAVA_HOME}/bin/java" + JAR_BIN="${JAVA_HOME}/bin/jar" +fi +export JAVA_BIN + +if [[ ! -e "${JAVA_BIN}" ]]; then + echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available." + exit 1 +fi + +# Construct Atlas classpath using jars from hook/hive/atlas-hive-plugin-impl/ directory. +for i in "${BASEDIR}/hook/hive/atlas-hive-plugin-impl/"*.jar; do + ATLASCPPATH="${ATLASCPPATH}:$i" +done + +# log dir for applications +ATLAS_LOG_DIR="${ATLAS_LOG_DIR:-$BASEDIR/logs}" +export ATLAS_LOG_DIR +LOGFILE="$ATLAS_LOG_DIR/import-hive.log" + +TIME=`date +%Y%m%d%H%M%s` + +#Add hive conf in classpath +if [ ! -z "$HIVE_CONF_DIR" ]; then + HIVE_CONF=$HIVE_CONF_DIR +elif [ ! -z "$HIVE_HOME" ]; then + HIVE_CONF="$HIVE_HOME/conf" +elif [ -e /etc/hive/conf ]; then + HIVE_CONF="/etc/hive/conf" +else + echo "Could not find a valid HIVE configuration" + exit 1 +fi + +echo Using Hive configuration directory ["$HIVE_CONF"] + + +if [ -f "${HIVE_CONF}/hive-env.sh" ]; then + . "${HIVE_CONF}/hive-env.sh" +fi + +if [ -z "$HIVE_HOME" ]; then + if [ -d "${BASEDIR}/../hive" ]; then + HIVE_HOME=${BASEDIR}/../hive + else + echo "Please set HIVE_HOME to the root of Hive installation" + exit 1 + fi +fi + +HIVE_CP="${HIVE_CONF}" + +for i in "${HIVE_HOME}/lib/"*.jar; do + if [[ $i == *"hoodie"* ]]; then + continue + fi + HIVE_CP="${HIVE_CP}:$i" +done + +#Add hadoop conf in classpath +if [ ! -z "$HADOOP_CLASSPATH" ]; then + HADOOP_CP=$HADOOP_CLASSPATH +elif [ ! 
-z "$HADOOP_HOME" ]; then + HADOOP_CP=`$HADOOP_HOME/bin/hadoop classpath` +elif [ $(command -v hadoop) ]; then + HADOOP_CP=`hadoop classpath` + echo $HADOOP_CP +else + echo "Environment variable HADOOP_CLASSPATH or HADOOP_HOME need to be set" + exit 1 +fi + +CP="${ATLASCPPATH}:${HIVE_CP}:${HADOOP_CP}" + +# If running in cygwin, convert pathnames and classpath to Windows format. +if [ "${CYGWIN}" == "true" ] +then + ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}` + LOGFILE=`cygpath -w ${LOGFILE}` + HIVE_CP=`cygpath -w ${HIVE_CP}` + HADOOP_CP=`cygpath -w ${HADOOP_CP}` + CP=`cygpath -w -p ${CP}` +fi + +JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=import-hive.log +-Dlog4j.configuration=atlas-hive-import-log4j.xml" + +IMPORT_ARGS= +JVM_ARGS= + +while true +do + option=$1 + shift + + case "$option" in + -d) IMPORT_ARGS="$IMPORT_ARGS -d $1"; shift;; + -t) IMPORT_ARGS="$IMPORT_ARGS -t $1"; shift;; + -f) IMPORT_ARGS="$IMPORT_ARGS -f $1"; shift;; + --database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;; + --table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;; + --filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;; + "") break;; + *) JVM_ARGS="$JVM_ARGS $option" + esac +done + +JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}" + +echo "Log file for import is $LOGFILE" + +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS + +RETVAL=$? +[ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully!!! +[ $RETVAL -ne 0 ] && echo Failed to import Hive Meta Data!!! + +exit $RETVAL \ No newline at end of file diff --git a/docker/hive1/log4j.json.properties b/docker/hive1/log4j.json.properties new file mode 100644 index 000000000..2b31b0112 --- /dev/null +++ b/docker/hive1/log4j.json.properties @@ -0,0 +1,4 @@ +log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=net.logstash.log4j.JSONEventLayoutV1 diff --git a/docker/hive1/start-hive.sh b/docker/hive1/start-hive.sh new file mode 100755 index 000000000..dcccb6d92 --- /dev/null +++ b/docker/hive1/start-hive.sh @@ -0,0 +1,116 @@ +#!/bin/bash + +CONNECTION_DRIVER_NAME=${CONNECTION_DRIVER_NAME:=com.mysql.jdbc.Driver} +HIVE_SERVER_PORT=${HIVE_SERVER_PORT:=10000} +SCHEMA_VERIFICATION=${SCHEMA_VERIFICATION:=false} +METASTORE_PORT=${METASTORE_PORT:=9083} +DEFAULT_FS=${DEFAULT_FS:=file:///} +DB_TYPE=${DB_TYPE:=mysql} +USE_ATLAS=${USE_ATLAS:=false} + +if [ ! 
-z ${JSON_LOG} ] ; then + echo "Setting Log type to JSON" + cat log4j.json.properties >> ${HIVE_HOME}/conf/hive-log4j.properties +fi + +cat >${HIVE_HOME}/conf/hive-site.xml < + + javax.jdo.option.ConnectionURL + ${CONNECTION_URL} + JDBC connect string for a JDBC metastore + + + javax.jdo.option.ConnectionDriverName + ${CONNECTION_DRIVER_NAME} + Driver class name for a JDBC metastore + + + javax.jdo.option.ConnectionUserName + ${CONNECTION_USER_NAME} + username to use against metastore database + + + javax.jdo.option.ConnectionPassword + ${CONNECTION_PASSWORD} + password to use against metastore database + + + hive.metastore.schema.verification + ${SCHEMA_VERIFICATION} + + + hive.metastore.warehouse.dir + ${WAREHOUSE_DIR} + + + + hive.metastore.uris + thrift://localhost:${METASTORE_PORT} + + + hive.server2.thrift.port + ${HIVE_SERVER_PORT} + + + fs.default.name + ${DEFAULT_FS} + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3n.awsAccessKeyId + ${AWS_ACCESS_KEY} + + + fs.s3n.awsSecretAccessKey + ${AWS_SECRET_KEY} + + + hive.security.authorization.enabled + false + +EOL + +if [[ ! -z ${USE_ATLAS} ]] ; then +cat >>${HIVE_HOME}/conf/hive-site.xml < + hive.exec.post.hooks + org.apache.atlas.hive.hook.HiveHook + + +EOL +# hive-env extra jars +cat >>${HIVE_HOME}/conf/hive-env.sh <${HIVE_HOME}/conf/atlas-application.properties <>${HIVE_HOME}/conf/hive-site.xml < +EOL +fi + +$HIVE_HOME/bin/schematool -dbType ${DB_TYPE} -initSchema + +nohup ${HIVE_HOME}/bin/hive --service metastore -p ${METASTORE_PORT} & +${HIVE_HOME}/bin/hiveserver2 --hiveconf hive.root.logger=INFO,console diff --git a/docker/spark/Dockerfile b/docker/spark/Dockerfile index 11ac5012a..0fde928df 100644 --- a/docker/spark/Dockerfile +++ b/docker/spark/Dockerfile @@ -1,6 +1,6 @@ FROM openjdk:8u212-b04-jre-stretch -ARG SPARK_VERSION=2.4.4 +ARG SPARK_VERSION=2.4.5 RUN wget -q https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop2.7.tgz \ && tar -xzf spark-${SPARK_VERSION}-bin-hadoop2.7.tgz \ @@ -22,6 +22,19 @@ RUN mkdir /opt/hadoop-${HADOOP_VERSION}/logs ENV HADOOP_PREFIX=/opt/hadoop-${HADOOP_VERSION} ENV HADOOP_CONF_DIR=/etc/hadoop +ENV HIVE_HOME=/opt/hive +ENV HIVE_VERSION=2.3.3 +RUN wget -q https://archive.apache.org/dist/hive/hive-$HIVE_VERSION/apache-hive-$HIVE_VERSION-bin.tar.gz \ + && tar -xzf apache-hive-$HIVE_VERSION-bin.tar.gz \ + && mv apache-hive-$HIVE_VERSION-bin $HIVE_HOME \ + && rm apache-hive-$HIVE_VERSION-bin.tar.gz + +ENV HUDI_VERSION=0.5.1-incubating +RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-bundle/$HUDI_VERSION/hudi-hive-bundle-$HUDI_VERSION.jar \ + && mv hudi-hive-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib +RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/$HUDI_VERSION/hudi-hadoop-mr-bundle-$HUDI_VERSION.jar \ + && mv hudi-hadoop-mr-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib + RUN apt-get update \ && apt-get install -y coreutils jq less inotify-tools python3 python3-setuptools \ && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \ diff --git a/docker/spark/scripts/entrypoint-submit.sh b/docker/spark/scripts/entrypoint-submit.sh index 16667adc8..c7417f8cc 100755 --- a/docker/spark/scripts/entrypoint-submit.sh +++ b/docker/spark/scripts/entrypoint-submit.sh @@ -8,6 +8,7 @@ MAX_RETRIES=${MAX_RETRIES:=300} MIN_WORKERS=${MIN_WORKERS:=1} SPARK_UI_PORT=${SPARK_UI_PORT:=4040} POST_SCRIPT=${POST_SCRIPT:=/scripts/finish-submit.sh} 
+USE_BUILTIN_HIVE_METASTORE=${USE_BUILTIN_HIVE_METASTORE:=true} # Atlas /scripts/add-atlas-integration.sh @@ -44,7 +45,13 @@ if [[ ! -z ${HIVE_METASTORE_URI} ]]; then echo -e " spark.sql.catalogImplementation=hive spark.hadoop.hive.metastore.uris=thrift://$HIVE_METASTORE_URI -spark.sql.hive.convertMetastoreParquet=false +" >> /spark/conf/spark-defaults.conf +fi + +if [[ "${USE_BUILTIN_HIVE_METASTORE}" == false ]]; then +echo -e " +spark.sql.hive.metastore.version=$HIVE_VERSION +spark.sql.hive.metastore.jars=/opt/hive/lib/* " >> /spark/conf/spark-defaults.conf fi diff --git a/docker/spark/spark-defaults.conf b/docker/spark/spark-defaults.conf index 83b625f0d..d51076262 100644 --- a/docker/spark/spark-defaults.conf +++ b/docker/spark/spark-defaults.conf @@ -1,15 +1,17 @@ + # Default system properties included when running spark-submit. # This is useful for setting default environmental settings. - -spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem -spark.hadoop.fs.s3a.fast.upload=true +spark.executor.logs.rolling.maxRetainedFiles=0 +spark.executor.logs.rolling.maxSize=104857600 +spark.executor.logs.rolling.strategy=size spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.fast.upload=true +spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem spark.hadoop.fs.s3a.multiobjectdelete.enable=false spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 +spark.port.maxRetries=0 spark.rdd.compress=true spark.serializer=org.apache.spark.serializer.KryoSerializer -spark.port.maxRetries=0 -spark.executor.logs.rolling.maxRetainedFiles=0 -spark.executor.logs.rolling.maxSize=104857600 -spark.executor.logs.rolling.strategy=size \ No newline at end of file +spark.sql.hive.convertMetastoreParquet=false +spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true \ No newline at end of file diff --git a/e2e/cdc/docker-compose.yml b/e2e/cdc/docker-compose.yml index 6a5293c65..12b2ca0aa 100644 --- a/e2e/cdc/docker-compose.yml +++ b/e2e/cdc/docker-compose.yml @@ -110,7 +110,9 @@ services: spark-submit: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --repositories http://packages.confluent.io/maven/ --jars https://github.com/YotpoLtd/incubator-hudi/releases/download/0.4.7-patch/hoodie-spark-bundle-0.4.7.jar,https://repo1.maven.org/maven2/za/co/absa/abris_2.11/3.1.1/abris_2.11-3.1.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,io.confluent:kafka-schema-registry-client:5.3.0,io.confluent:kafka-avro-serializer:5.3.0,org.apache.kafka:kafka_2.11:2.2.0 --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083 --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/kafka/kafka_example_cdc.yaml + - SUBMIT_COMMAND=spark-submit --repositories http://packages.confluent.io/maven/ --jars https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.5.1-incubating/hudi-spark-bundle_2.11-0.5.1-incubating.jar,https://repo1.maven.org/maven2/za/co/absa/abris_2.11/3.1.1/abris_2.11-3.1.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,io.confluent:kafka-schema-registry-client:5.3.0,io.confluent:kafka-avro-serializer:5.3.0,org.apache.kafka:kafka_2.11:2.2.0 --conf spark.sql.warehouse.dir=/warehouse --class 
com.yotpo.metorikku.Metorikku metorikku.jar -c examples/kafka/kafka_example_cdc.yaml + - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false entrypoint: - /scripts/entrypoint-submit.sh volumes: @@ -142,7 +144,9 @@ services: hive-tester: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --jars https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-spark-bundle/0.4.7/hoodie-spark-bundle-0.4.7.jar --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083 --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings /test_metrics/hive_test.yaml + - SUBMIT_COMMAND=spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.5 --jars https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.5.1-incubating/hudi-spark-bundle_2.11-0.5.1-incubating.jar --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings /test_metrics/hive_test.yaml + - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false volumes: - ./output/:/examples/output/ - ./test_metrics:/test_metrics diff --git a/e2e/hive/docker-compose.yml b/e2e/hive/docker-compose.yml index 1ac90eede..863978c9b 100644 --- a/e2e/hive/docker-compose.yml +++ b/e2e/hive/docker-compose.yml @@ -5,6 +5,7 @@ services: environment: - SUBMIT_COMMAND=spark-submit --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hive/movies.yaml - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false entrypoint: - /scripts/entrypoint-submit.sh volumes: @@ -18,6 +19,7 @@ services: environment: - SUBMIT_COMMAND=spark-submit --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings examples/hive/movies_test.yaml - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false volumes: - ./output/:/examples/output/ - ./warehouse:/warehouse diff --git a/e2e/hive1/docker-compose.yml b/e2e/hive1/docker-compose.yml new file mode 100644 index 000000000..caea9e9bb --- /dev/null +++ b/e2e/hive1/docker-compose.yml @@ -0,0 +1,58 @@ +version: '3' +services: + spark-submit: + image: metorikku/metorikku + environment: + - SUBMIT_COMMAND=spark-submit --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hive/movies.yaml + - HIVE_METASTORE_URI=hive:9083 + entrypoint: + - /scripts/entrypoint-submit.sh + volumes: + - ./output/:/examples/output/ + depends_on: + - spark-master + - spark-worker + - hive + hive-tester: + image: metorikku/metorikku + environment: + - SUBMIT_COMMAND=spark-submit --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings examples/hive/movies_test.yaml + - HIVE_METASTORE_URI=hive:9083 + volumes: + - ./output/:/examples/output/ + - ./warehouse:/warehouse + entrypoint: + - /scripts/entrypoint-submit.sh + depends_on: + - spark-master + - spark-worker + spark-master: + image: metorikku/metorikku + entrypoint: + - /scripts/entrypoint-master.sh + logging: + driver: none + spark-worker: + image: metorikku/metorikku + entrypoint: + - /scripts/entrypoint-worker.sh + volumes: + - ./output/:/examples/output/ + - ./warehouse:/warehouse + logging: + driver: none + hive: + image: metorikku/hive:1 + environment: + - 
CONNECTION_URL=jdbc:mysql://hive-db:3306/hive?useSSL=false + - CONNECTION_USER_NAME=root + - CONNECTION_PASSWORD=pass + - WAREHOUSE_DIR=file:///warehouse + - WAIT_HOSTS=hive-db:3306 + depends_on: + - hive-db + hive-db: + image: mysql:5.7.25 + environment: + - MYSQL_ROOT_PASSWORD=pass + - MYSQL_DATABASE=hive diff --git a/e2e/hive1/test.sh b/e2e/hive1/test.sh new file mode 100755 index 000000000..536f9c52c --- /dev/null +++ b/e2e/hive1/test.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -e + +docker-compose up -d hive-db +docker-compose up --exit-code-from spark-submit spark-submit +docker-compose up --exit-code-from hive-tester hive-tester +exit_code=$(docker ps -aq -f label=com.docker.compose.project=hive1 | xargs -I{} docker inspect {} --format='{{.State.ExitCode}}' | paste -sd+ - | bc) +docker-compose down +exit $exit_code diff --git a/e2e/hudi/docker-compose.yml b/e2e/hudi/docker-compose.yml index e281e59ed..b1b5f3f71 100644 --- a/e2e/hudi/docker-compose.yml +++ b/e2e/hudi/docker-compose.yml @@ -3,7 +3,9 @@ services: spark-submit: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --jars https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-spark-bundle/0.4.7/hoodie-spark-bundle-0.4.7.jar --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083 --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hudi/movies.yaml + - SUBMIT_COMMAND=spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.5 --jars https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.5.1-incubating/hudi-spark-bundle_2.11-0.5.1-incubating.jar --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hudi/movies.yaml + - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false volumes: - ./output/:/examples/output/ entrypoint: - /scripts/entrypoint-submit.sh @@ -14,7 +16,9 @@ hive-tester: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --jars https://repo1.maven.org/maven2/com/uber/hoodie/hoodie-spark-bundle/0.4.7/hoodie-spark-bundle-0.4.7.jar --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083 --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings examples/hudi/movies_test.yaml + - SUBMIT_COMMAND=spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.5 --jars https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.5.1-incubating/hudi-spark-bundle_2.11-0.5.1-incubating.jar --class com.yotpo.metorikku.MetorikkuTester metorikku.jar --test-settings examples/hudi/movies_test.yaml + - HIVE_METASTORE_URI=hive:9083 + - USE_BUILTIN_HIVE_METASTORE=false volumes: - ./output/:/examples/output/ entrypoint: diff --git a/e2e/kafka/docker-compose.yml b/e2e/kafka/docker-compose.yml index 729c21a0d..c468d11a5 100644 --- a/e2e/kafka/docker-compose.yml +++ b/e2e/kafka/docker-compose.yml @@ -3,7 +3,7 @@ services: spark-submit: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/kafka/kafka_example.yaml + - SUBMIT_COMMAND=spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 --class com.yotpo.metorikku.Metorikku metorikku.jar -c 
examples/kafka/kafka_example.yaml entrypoint: - /scripts/entrypoint-submit.sh depends_on: diff --git a/examples/atlas/docker-compose.yml b/examples/atlas/docker-compose.yml index 060abbb88..64fa664bc 100644 --- a/examples/atlas/docker-compose.yml +++ b/examples/atlas/docker-compose.yml @@ -16,7 +16,7 @@ services: atlas-tester: image: metorikku/metorikku environment: - - SUBMIT_COMMAND=spark-submit --jars https://github.com/YotpoLtd/spark-atlas-connector/releases/download/latest/spark-atlas-connector-assembly.jar --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083 --conf spark.sql.warehouse.dir=/warehouse --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hive/movies.yaml + - SUBMIT_COMMAND=spark-submit --jars https://github.com/YotpoLtd/spark-atlas-connector/releases/download/latest/spark-atlas-connector-assembly.jar --conf spark.sql.warehouse.dir=/warehouse --class com.yotpo.metorikku.Metorikku metorikku.jar -c examples/hive/movies.yaml - HIVE_METASTORE_URI=hive:9083 - USE_ATLAS=true - ATLAS_ZOOKEEPER_CONNECT=zookeeper:2181 diff --git a/examples/hive/movies.yaml b/examples/hive/movies.yaml index 632a9cdac..b2a7b8751 100644 --- a/examples/hive/movies.yaml +++ b/examples/hive/movies.yaml @@ -6,5 +6,5 @@ inputs: path: examples/file_inputs/movies.csv output: file: - dir: examples/output + dir: /examples/output showPreviewLines: 10 \ No newline at end of file diff --git a/examples/udf/build.sbt b/examples/udf/build.sbt index c73e8772d..84b43992a 100644 --- a/examples/udf/build.sbt +++ b/examples/udf/build.sbt @@ -4,7 +4,7 @@ version := "1.0" scalaVersion := "2.11.12" -val sparkVersion = "2.4.4" +val sparkVersion = "2.4.5" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index fa805dd14..817aa428f 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -38,4 +38,4 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR log4j.logger.com.yotpo.metorikku=INFO -log4j.logger.com.uber.hoodie=INFO \ No newline at end of file +log4j.logger.org.apache.hudi=INFO \ No newline at end of file diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala index f7889dd58..a4affd014 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/file/HudiOutputWriter.scala @@ -1,14 +1,14 @@ package com.yotpo.metorikku.output.writers.file -import java.util.concurrent.TimeUnit - -import com.codahale.metrics.{MetricFilter, ScheduledReporter} -import com.uber.hoodie.metrics.Metrics import com.yotpo.metorikku.configuration.job.output.Hudi import com.yotpo.metorikku.output.Writer +import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.metrics.Metrics import org.apache.log4j.LogManager -import org.apache.spark.sql.types.{DataType, StructField, StructType} -import org.apache.spark.sql.functions.{col, lit, max, when} import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{col, lit, max, when} +import 
org.apache.spark.sql.types.{DataType, StructField, StructType} +import java.util.concurrent.TimeUnit + // REQUIRED: -Dspark.serializer=org.apache.spark.serializer.KryoSerializer // http://hudi.incubator.apache.org/configurations.html @@ -64,7 +64,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext val writer = df.write - writer.format("com.uber.hoodie") + writer.format("org.apache.hudi") // Handle hudi job configuration hudiOutput match { @@ -107,15 +107,15 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext hudiOutputProperties.partitionBy match { case Some(partitionBy) => { writer.option("hoodie.datasource.write.partitionpath.field", partitionBy) - writer.option("hoodie.datasource.write.keygenerator.class", "com.uber.hoodie.SimpleKeyGenerator") + writer.option("hoodie.datasource.write.keygenerator.class", classOf[SimpleKeyGenerator].getName) } - case None => writer.option("hoodie.datasource.write.keygenerator.class", "com.uber.hoodie.NonpartitionedKeyGenerator") + case None => writer.option("hoodie.datasource.write.keygenerator.class", classOf[NonpartitionedKeyGenerator].getName) } hudiOutputProperties.hivePartitions match { case Some(hivePartitions) => { writer.option("hoodie.datasource.hive_sync.partition_fields", hivePartitions) - writer.option("hoodie.datasource.hive_sync.partition_extractor_class", "com.uber.hoodie.hive.MultiPartKeysValueExtractor") + writer.option("hoodie.datasource.hive_sync.partition_extractor_class", classOf[org.apache.hudi.hive.MultiPartKeysValueExtractor].getName) } case None => } @@ -133,27 +133,28 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext private def writeMetrics(): Unit = { try { - Metrics.getInstance().getReporter.asInstanceOf[ScheduledReporter].report() + Metrics.getInstance().getReporter.asInstanceOf[org.apache.hudi.com.codahale.metrics.ScheduledReporter].report() } catch { - case e: Throwable => log.info(s"Failed to report metrics ${e.getMessage}") + case e: Throwable => log.info(s"Failed to report metrics", e) } } private def resetMetrics(): Unit = { - val reporterScheduledPeriodInSeconds = 30 + val reporterScheduledPeriodInSeconds: java.lang.Long = 30L try { - Metrics.getInstance().getRegistry.removeMatching(MetricFilter.ALL) + Metrics.getInstance().getRegistry().removeMatching(org.apache.hudi.com.codahale.metrics.MetricFilter.ALL) } catch { - case e: Throwable => log.info(s"Failed to reset hudi metrics ${e.getMessage}") + case e: Throwable => log.info(s"Failed to reset hudi metrics", e) } try { - Metrics.getInstance().getReporter.asInstanceOf[ScheduledReporter].start(reporterScheduledPeriodInSeconds, TimeUnit.SECONDS) + Metrics.getInstance().getReporter.asInstanceOf[org.apache.hudi.com.codahale.metrics.ScheduledReporter] + .start(reporterScheduledPeriodInSeconds, TimeUnit.SECONDS) } catch { - case e: Throwable => log.info(s"Failed to start scheduled metrics ${e.getMessage}") + case e: Throwable => log.info(s"Failed to start scheduled metrics", e) } } @@ -171,7 +172,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext case None => } config.storageType match { - case Some(storageType) => writer.option("hoodie.datasource.write.storage.type", storageType) // MERGE_ON_READ/COPY_ON_WRITE + case Some(storageType) => writer.option("hoodie.datasource.write.table.type", storageType) // MERGE_ON_READ/COPY_ON_WRITE case None => } config.operation match { diff --git 
a/src/main/scala/com/yotpo/metorikku/output/writers/file/OverwriteWithLatestAvroPayloadWithDelete.java b/src/main/scala/com/yotpo/metorikku/output/writers/file/OverwriteWithLatestAvroPayloadWithDelete.java index 59258badb..565b4f189 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/file/OverwriteWithLatestAvroPayloadWithDelete.java +++ b/src/main/scala/com/yotpo/metorikku/output/writers/file/OverwriteWithLatestAvroPayloadWithDelete.java @@ -3,11 +3,12 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; import java.io.IOException; -import java.util.Optional; -public class OverwriteWithLatestAvroPayloadWithDelete extends com.uber.hoodie.OverwriteWithLatestAvroPayload { +public class OverwriteWithLatestAvroPayloadWithDelete extends OverwriteWithLatestAvroPayload { private GenericRecord record; public OverwriteWithLatestAvroPayloadWithDelete(GenericRecord record, Comparable orderingVal) { @@ -15,21 +16,19 @@ public OverwriteWithLatestAvroPayloadWithDelete(GenericRecord record, Comparable this.record = record; } - public OverwriteWithLatestAvroPayloadWithDelete(Optional<GenericRecord> record) { + public OverwriteWithLatestAvroPayloadWithDelete(Option<GenericRecord> record) { super(record); } private Boolean isDeleteRecord() { Object deleteField = record.get("_hoodie_delete"); - return (deleteField != null && - deleteField instanceof Boolean && - (Boolean)deleteField == true); + return (deleteField instanceof Boolean && (Boolean) deleteField); } @Override - public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException { + public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { if (isDeleteRecord()) - return Optional.empty(); + return Option.empty(); else return super.getInsertValue(schema); }