From ca4e3fdc7d75a7e67ad1313e976283d702c71237 Mon Sep 17 00:00:00 2001
From: juhoautio-rovio <59083504+juhoautio-rovio@users.noreply.github.com>
Date: Fri, 6 Sep 2024 17:58:39 +0300
Subject: [PATCH] Update spark version to 3.4.1 (#54)

Extras:

- Updating some test dependencies
- Including postgres connector in the package

---------

Co-authored-by: fabricebaranski
---
 README.md                                   |  2 +-
 examples/rovio-ingest-maven-example/pom.xml |  2 +-
 pom.xml                                     | 63 ++++++++++---------
 python/notebooks/druid_ingestion_test.ipynb | 12 ++--
 .../druid_sketch_ingestion_test.ipynb       | 10 +--
 .../ingest/DruidDatasetExtensionsTest.java  | 16 ++---
 .../com/rovio/ingest/DruidSourceTest.java   | 16 +----
 .../ingest/DruidDatasetExtensionsSpec.scala |  6 +-
 8 files changed, 58 insertions(+), 69 deletions(-)

diff --git a/README.md b/README.md
index 1c9d09d..6f3ce8d 100644
--- a/README.md
+++ b/README.md
@@ -213,7 +213,7 @@ To use a snapshot version:
 
 ```scala
 ("spark.jars.repositories", "https://s01.oss.sonatype.org/content/repositories/snapshots"),
-("spark.jars.packages", "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.0.1-SNAPSHOT")
+("spark.jars.packages", "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT")
 ```
 
 ```scala
diff --git a/examples/rovio-ingest-maven-example/pom.xml b/examples/rovio-ingest-maven-example/pom.xml
index 731d85b..8285e6d 100644
--- a/examples/rovio-ingest-maven-example/pom.xml
+++ b/examples/rovio-ingest-maven-example/pom.xml
@@ -40,7 +40,7 @@
         rovio-ingest
         1.0.6_spark_3.0.1
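The README change above only bumps the artifact coordinates. For context, a minimal sketch of how those coordinates are consumed from Spark; the session config keys come from the README snippet, while the CSV path and the empty `options` map are illustrative placeholders, not part of this patch:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("rovio-ingest-example")
  // Snapshot repository and the updated coordinates from this patch
  .config("spark.jars.repositories",
    "https://s01.oss.sonatype.org/content/repositories/snapshots")
  .config("spark.jars.packages",
    "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT")
  .getOrCreate()

// Placeholder: fill in the DruidSource options documented in the README
// (Druid metadata database and deep storage settings).
val options: Map[String, String] = Map()

val dataset = spark.read.option("header", "true").csv("data.csv") // placeholder input

dataset.write
  .format("com.rovio.ingest.DruidSource")
  .mode(SaveMode.Overwrite)
  .options(options)
  .save()
```

The `format`/`mode`/`options`/`save` chain is the same one exercised by `DruidSourceTest` later in this patch.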
diff --git a/pom.xml b/pom.xml
index 752c366..9446b5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.rovio.ingest
     rovio-ingest
-    1.0.7_spark_3.0.1-SNAPSHOT
+    1.0.7_spark_3.4.1-SNAPSHOT
     jar
     rovio-ingest
     An implementation of the DatasourceV2 interface of Apache Spark™ for writing Spark Datasets to Apache Druid™
@@ -50,34 +50,30 @@
     29.0.0
     8.0.28
-    42.2.23
+    42.6.0
     1.12.129
     1.8
     4.9.3
     2.14.3
-    1.7.32
+    2.0.12
     5.7.2
     4.13.2
     1.0.1
     2.12
-    10
+    17
-    3.0.1
+    3.4.1
     3.0.0-M5
-    1.17.6
+    1.19.3
     UTF-8
     UTF-8
+    2.18.0
     org.apache.logging.log4j
     log4j-core
-    2.17.1
-
-
-    org.apache.logging.log4j
-    log4j-core
-    2.17.1
+    ${log4j.version}
     org.scala-lang
@@ -103,57 +99,57 @@
         log4j
-
+
         org.roaringbitmap
         RoaringBitmap
-
+
         com.fasterxml.jackson.core
         jackson-annotations
-
+
         com.fasterxml.jackson.core
         jackson-databind
-
+
         com.fasterxml.jackson.module
         jackson-module-scala_2.12
-
+
         com.fasterxml.jackson.core
         jackson-core
-
+
         com.fasterxml.jackson.datatype
         jackson-datatype-joda
-
+
         com.fasterxml.jackson.datatype
         jackson-datatype-guava
-
+
         com.fasterxml.jackson.jaxrs
         jackson-jaxrs-json-provider
-
+
         com.fasterxml.jackson.jaxrs
         jackson-jaxrs-smile-provider
-
+
         com.fasterxml.jackson.dataformat
         jackson-dataformat-smile
-
+
         org.antlr
         antlr4-runtime
@@ -250,13 +246,6 @@
         commons-compress
         1.21
-
-
-        org.junit.jupiter
-        junit-jupiter
-        ${junit-jupiter.version}
-        test
-
         org.mockito
         mockito-core
@@ -272,7 +261,7 @@
         com.holdenkarau
         spark-testing-base_${scala.version}
-        ${spark.version}_1.1.0
+        ${spark.version}_1.4.7
         test
@@ -305,6 +294,12 @@
         ${junit-jupiter.version}
         test
+
+        org.junit.jupiter
+        junit-jupiter-params
+        ${junit-jupiter.version}
+        test
+
         org.junit.vintage
@@ -428,6 +423,12 @@
             **
+
+            org.postgresql:postgresql
+
+            **
+
+
             org.apache.commons:commons-pool2
diff --git a/python/notebooks/druid_ingestion_test.ipynb b/python/notebooks/druid_ingestion_test.ipynb
index 4511e89..2ca6084 100644
--- a/python/notebooks/druid_ingestion_test.ipynb
+++ b/python/notebooks/druid_ingestion_test.ipynb
@@ -66,14 +66,14 @@
     "2. A) Copy the shaded jar to s3:\n",
     "\n",
     "       aws s3 --profile $AWS_PROFILE cp \\\n",
-    "         target/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-    "         s3://$JAR_BUCKET/tmp/juho/druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+    "         target/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+    "         s3://$JAR_BUCKET/tmp/juho/druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
     "\n",
     "2. B) Copy the plain jar to s3: \n",
     "\n",
     "       aws s3 --profile $AWS_PROFILE cp \\\n",
-    "         target/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-    "         s3://$JAR_BUCKET/tmp/juho/druid/jars/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+    "         target/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+    "         s3://$JAR_BUCKET/tmp/juho/druid/jars/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
     "\n",
     "Then invert the boolean in the cell below to use it in spark_conf."
    ]
@@ -200,7 +200,7 @@
     "    \"spark.sql.session.timeZone\": \"UTC\",\n",
     "    # alternative if using a snapshot version\n",
     "#   \"spark.jars.repositories\": \"https://s01.oss.sonatype.org/content/repositories/snapshots\",\n",
-    "#   \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.7_spark_3.0.1-SNAPSHOT\"\n",
+    "#   \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT\"\n",
     "    \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.6_spark_3.0.1\"\n",
     "  }\n",
     "}\n",
@@ -220,7 +220,7 @@
     "# Enable this to test with a manually built & copied jar instead of published package from maven\n",
     "if False:\n",
     "    spark_conf[\"conf\"][\"spark.jars\"] = \\\n",
-    "        f\"s3://{packages_bucket}/{PREFIX}druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\"\n",
+    "        f\"s3://{packages_bucket}/{PREFIX}druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\"\n",
    "    del spark_conf[\"conf\"][\"spark.jars.packages\"]\n",
     "\n",
     "set_spark_config(spark_conf)\n",
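The notebook cell patched above toggles between resolving the artifact from Maven (`spark.jars.packages`) and pointing `spark.jars` at a manually uploaded jar. A rough Scala equivalent of that toggle, with placeholder bucket and prefix values (not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Placeholders standing in for the notebook's packages_bucket / PREFIX
val jarsBucket = "my-bucket"
val prefix = "tmp/juho/"

val builder = SparkSession.builder().appName("rovio-ingest-jar-toggle")

// true: use a manually built & uploaded jar; false: resolve from Maven
val useLocalJar = false
val spark = (if (useLocalJar)
    builder.config("spark.jars",
      s"s3://$jarsBucket/${prefix}druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar")
  else
    builder.config("spark.jars.packages",
      "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT")
  ).getOrCreate()
```

The if/else plays the role of the notebook's `if False:` block plus the `del` of `spark.jars.packages`: only one of the two settings should be active at a time.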
diff --git a/python/notebooks/druid_sketch_ingestion_test.ipynb b/python/notebooks/druid_sketch_ingestion_test.ipynb
index 80bbcd1..8087c34 100644
--- a/python/notebooks/druid_sketch_ingestion_test.ipynb
+++ b/python/notebooks/druid_sketch_ingestion_test.ipynb
@@ -66,14 +66,14 @@
     "2. A) Copy the shaded jar to s3:\n",
     "\n",
     "       aws s3 --profile $AWS_PROFILE cp \\\n",
-    "         target/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-    "         s3://$JAR_BUCKET/tmp/vivek/druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+    "         target/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+    "         s3://$JAR_BUCKET/tmp/vivek/druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
     "\n",
     "2. B) Copy the plain jar to s3: \n",
     "\n",
     "       aws s3 --profile $AWS_PROFILE cp \\\n",
-    "         target/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-    "         s3://$JAR_BUCKET/tmp/vivek/druid/jars/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+    "         target/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+    "         s3://$JAR_BUCKET/tmp/vivek/druid/jars/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
     "\n",
     "Then invert the boolean in the cell below to use it in spark_conf."
    ]
@@ -166,7 +166,7 @@
     "if True:\n",
     "    jars_base_path = \"s3://{packages_bucket}/{PREFIX}druid/jars\"\n",
     "    jars = (\n",
-    "        f\"{jars_base_path}/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar,\"\n",
+    "        f\"{jars_base_path}/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar,\"\n",
     "        f\"{jars_base_path}/datasketches-hive-1.2.0.jar,\"\n",
     "        f\"{jars_base_path}/datasketches-java-4.1.0.jar,\"\n",
     "        f\"{jars_base_path}/datasketches-memory-2.0.0.jar\"\n",
diff --git a/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java b/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
index e26f96d..ad90258 100644
--- a/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
+++ b/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
@@ -268,7 +268,7 @@ public void shouldWriteDataSegmentsWithCorrectValues() throws IOException {
         ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
                 .put("string_column", "US")
                 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "true")
                 .build();
         Map<String, Object> data = parsed.get(0, dimensions);
@@ -278,7 +278,7 @@ public void shouldWriteDataSegmentsWithCorrectValues() throws IOException {
         dimensions = ImmutableMap.<String, Object>builder()
                 .put("string_column", "US")
                 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "false")
                 .build();
         data = parsed.get(0, dimensions);
@@ -326,7 +326,7 @@ public void shouldWriteDataSegmentsWithCorrectValuesUsingPartialMetricSpec() thr
         ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
                 .put("string_column", "US")
                 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "true")
                 .put("double_column", "100.0")
                 .build();
@@ -338,7 +338,7 @@ public void shouldWriteDataSegmentsWithCorrectValuesUsingPartialMetricSpec() thr
         dimensions = ImmutableMap.<String, Object>builder()
                 .put("string_column", "US")
                 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "false")
                 .put("double_column", "-1.0")
                 .build();
@@ -395,7 +395,7 @@ public void shouldWriteDataSegmentsWithSketchBuild() throws IOException {
         // String column is automatically excluded from dimensions as it is used for sketch aggregation.
         ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
                 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "true")
                 .put("long_column", "10")
                 .put("double_column", "100.0")
@@ -408,7 +408,7 @@ public void shouldWriteDataSegmentsWithSketchBuild() throws IOException {
         // String column is automatically excluded from dimensions as it is used for sketch aggregation.
         dimensions = ImmutableMap.<String, Object>builder()
                 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "false")
                 .put("long_column", "-1")
                 .put("double_column", "-1.0")
@@ -460,7 +460,7 @@ public void shouldWriteDataSegmentsWithThetaSketchAsInputColumn() throws IOExcep
         assertTrue(parsed.containsRow(0));
         ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
                 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "true")
                 .put("long_column", "10")
                 .put("double_column", "100.0")
@@ -471,7 +471,7 @@ public void shouldWriteDataSegmentsWithThetaSketchAsInputColumn() throws IOExcep
 
         dimensions = ImmutableMap.<String, Object>builder()
                 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-                .put("string_date_column", "2019-10-16 00:00:00")
+                .put("string_date_column", "2019-10-16")
                 .put("boolean_column", "false")
                 .put("long_column", "-1")
                 .put("double_column", "-1.0")
diff --git a/src/test/java/com/rovio/ingest/DruidSourceTest.java b/src/test/java/com/rovio/ingest/DruidSourceTest.java
index 7b4c4c2..e7678e8 100644
--- a/src/test/java/com/rovio/ingest/DruidSourceTest.java
+++ b/src/test/java/com/rovio/ingest/DruidSourceTest.java
@@ -71,21 +71,7 @@ public void failForNullTimestamp() {
                 .options(options)
                 .save());
         assertThat(thrown.getCause().getMessage(), containsString(
-                "java.lang.IllegalStateException: Null value for column 'date'."));
-    }
-
-    @Test
-    public void failWhenPartitionedByNonTimeStampColumn() {
-        Dataset<Row> dataset = loadCsv(spark, "/data.csv").repartition(column("country"));
-        dataset.show(false);
-        SparkException thrown = assertThrows(SparkException.class,
-                () -> dataset.write()
-                        .format("com.rovio.ingest.DruidSource")
-                        .mode(SaveMode.Overwrite)
-                        .options(options)
-                        .save());
-        assertThat(thrown.getCause().getMessage(), containsString(
-                "java.sql.BatchUpdateException: Duplicate entry"));
+                "Null value for column 'date'."));
     }
 
     @Test
diff --git a/src/test/scala/com/rovio/ingest/DruidDatasetExtensionsSpec.scala b/src/test/scala/com/rovio/ingest/DruidDatasetExtensionsSpec.scala
index edea226..bbbf0d2 100644
--- a/src/test/scala/com/rovio/ingest/DruidDatasetExtensionsSpec.scala
+++ b/src/test/scala/com/rovio/ingest/DruidDatasetExtensionsSpec.scala
@@ -21,7 +21,9 @@ import com.rovio.ingest.model.DbType
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
 import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.junit.JUnitRunner
+import org.scalatest.flatspec.AnyFlatSpec
 
 // must define classes outside of the actual test methods, otherwise spark can't find them
 case class KpiRow(date: String, country: String, dau: Integer, revenue: Double, is_segmented: Boolean)
@@ -31,7 +33,7 @@ case class ExpectedRow(`__PARTITION_TIME__`: String, `__PARTITION_NUM__`: Intege
 
 // This is needed for mvn test. It wouldn't find this test otherwise.
 @RunWith(classOf[JUnitRunner])
-class DruidDatasetExtensionsSpec extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterEach {
+class DruidDatasetExtensionsSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterEach {
 
   before {
     DruidSourceBaseTest.MYSQL.start()
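The hunk above tracks the ScalaTest 3 package reshuffle: `FlatSpec` is now `org.scalatest.flatspec.AnyFlatSpec`, `Matchers` lives in `org.scalatest.matchers.should`, and `JUnitRunner` moved to the separate ScalaTestPlus artifact. A minimal spec skeleton under the new imports (the class name and assertion are illustrative; assumes scalatest and scalatestplus-junit on the test classpath):

```scala
import org.junit.runner.RunWith
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner

// The @RunWith annotation keeps the spec discoverable by Maven Surefire,
// mirroring the note in DruidDatasetExtensionsSpec above.
@RunWith(classOf[JUnitRunner])
class ExampleSpec extends AnyFlatSpec with Matchers {
  "a migrated spec" should "run under the JUnit runner" in {
    Seq(1, 2, 3).sum shouldBe 6
  }
}
```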