Update spark version to 3.4.1 (#54)
Extras:
- Updating some test dependencies
- Including postgres connector in the package

---------

Co-authored-by: fabricebaranski <[email protected]>
juhoautio-rovio and fabricebaranski authored Sep 6, 2024
1 parent 64487a3 commit ca4e3fd
Showing 8 changed files with 58 additions and 69 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -213,7 +213,7 @@ To use a snapshot version:
 
 ```scala
 ("spark.jars.repositories", "https://s01.oss.sonatype.org/content/repositories/snapshots"),
-("spark.jars.packages", "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.0.1-SNAPSHOT")
+("spark.jars.packages", "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT")
 ```
 
 ```scala
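For context, these two settings are all it takes to pull the snapshot build at session startup. A minimal Scala sketch of applying them when constructing a SparkSession (the builder pattern is standard Spark API; the coordinates are the ones from the diff above):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: resolve the snapshot build at session startup.
// Repository URL and artifact coordinates are taken from the README diff above.
val spark = SparkSession.builder()
  .appName("rovio-ingest-snapshot-example")
  .config("spark.jars.repositories",
    "https://s01.oss.sonatype.org/content/repositories/snapshots")
  .config("spark.jars.packages",
    "com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT")
  .getOrCreate()
```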
2 changes: 1 addition & 1 deletion examples/rovio-ingest-maven-example/pom.xml
@@ -40,7 +40,7 @@
 <artifactId>rovio-ingest</artifactId>
 <version>1.0.6_spark_3.0.1</version>
 <!-- NOTE: This requires the sonatype snapshot repository. See <repositories>.
-<version>1.0.7_spark_3.0.1-SNAPSHOT</version>
+<version>1.0.7_spark_3.4.1-SNAPSHOT</version>
 -->
 </dependency>
 <dependency>
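For sbt users, an equivalent of the Maven dependency above might look like this — a sketch; the coordinates come from the diff, and the resolver is only needed for the -SNAPSHOT version:

```scala
// build.sbt sketch (assumption: a plain sbt project; coordinates from the pom.xml diff above).
resolvers += "sonatype-snapshots" at
  "https://s01.oss.sonatype.org/content/repositories/snapshots"

libraryDependencies += "com.rovio.ingest" % "rovio-ingest" % "1.0.7_spark_3.4.1-SNAPSHOT"
```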
63 changes: 32 additions & 31 deletions pom.xml
@@ -23,7 +23,7 @@
 
 <groupId>com.rovio.ingest</groupId>
 <artifactId>rovio-ingest</artifactId>
-<version>1.0.7_spark_3.0.1-SNAPSHOT</version>
+<version>1.0.7_spark_3.4.1-SNAPSHOT</version>
 <packaging>jar</packaging>
 <name>rovio-ingest</name>
 <description>An implementation of the DatasourceV2 interface of Apache Spark™ for writing Spark Datasets to Apache Druid™</description>
@@ -50,34 +50,30 @@
 <properties>
 <druid.version>29.0.0</druid.version>
 <mysql.connector.version>8.0.28</mysql.connector.version>
-<postgresql.version>42.2.23</postgresql.version>
+<postgresql.version>42.6.0</postgresql.version>
 <aws.sdk.version>1.12.129</aws.sdk.version>
 <java.version>1.8</java.version>
 <antlr4.version>4.9.3</antlr4.version>
 <jackson.version>2.14.3</jackson.version>
-<slf4j.version>1.7.32</slf4j.version>
+<slf4j.version>2.0.12</slf4j.version>
 <junit-jupiter.version>5.7.2</junit-jupiter.version>
 <junit.version>4.13.2</junit.version>
 <junit-platform.version>1.0.1</junit-platform.version>
 <scala.version>2.12</scala.version>
-<scala.minor.version>10</scala.minor.version>
-<spark.version>3.0.1</spark.version>
+<scala.minor.version>17</scala.minor.version>
+<spark.version>3.4.1</spark.version>
 <surefire.version>3.0.0-M5</surefire.version>
-<testcontainers.version>1.17.6</testcontainers.version>
+<testcontainers.version>1.19.3</testcontainers.version>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+<log4j.version>2.18.0</log4j.version>
 </properties>
 
 <dependencies>
-<dependency>
-<groupId>org.apache.logging.log4j</groupId>
-<artifactId>log4j-core</artifactId>
-<version>2.17.1</version>
-</dependency>
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-core</artifactId>
-<version>2.17.1</version>
+<version>${log4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.scala-lang</groupId>
@@ -103,57 +99,57 @@
 <artifactId>log4j</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a newer version than what spark 3.0.1 brings -->
+<!-- Druid library needs a newer version than what spark 3.4.1 brings -->
 <groupId>org.roaringbitmap</groupId>
 <artifactId>RoaringBitmap</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-annotations</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-databind</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.module</groupId>
 <artifactId>jackson-module-scala_2.12</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-core</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.datatype</groupId>
 <artifactId>jackson-datatype-joda</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.datatype</groupId>
 <artifactId>jackson-datatype-guava</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.jaxrs</groupId>
 <artifactId>jackson-jaxrs-json-provider</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.jaxrs</groupId>
 <artifactId>jackson-jaxrs-smile-provider</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>com.fasterxml.jackson.dataformat</groupId>
 <artifactId>jackson-dataformat-smile</artifactId>
 </exclusion>
 <exclusion>
-<!-- Druid library needs a older version than what spark 3.0.1 brings -->
+<!-- Druid library needs a older version than what spark 3.4.1 brings -->
 <groupId>org.antlr</groupId>
 <artifactId>antlr4-runtime</artifactId>
 </exclusion>
@@ -250,13 +246,6 @@
 <artifactId>commons-compress</artifactId>
 <version>1.21</version>
 </dependency>
-
-<dependency>
-<groupId>org.junit.jupiter</groupId>
-<artifactId>junit-jupiter</artifactId>
-<version>${junit-jupiter.version}</version>
-<scope>test</scope>
-</dependency>
 <dependency>
 <groupId>org.mockito</groupId>
 <artifactId>mockito-core</artifactId>
@@ -272,7 +261,7 @@
 <dependency>
 <groupId>com.holdenkarau</groupId>
 <artifactId>spark-testing-base_${scala.version}</artifactId>
-<version>${spark.version}_1.1.0</version>
+<version>${spark.version}_1.4.7</version>
 <scope>test</scope>
 </dependency>
 <dependency>
@@ -305,6 +294,12 @@
 <version>${junit-jupiter.version}</version>
 <scope>test</scope>
 </dependency>
+<dependency>
+<groupId>org.junit.jupiter</groupId>
+<artifactId>junit-jupiter-params</artifactId>
+<version>${junit-jupiter.version}</version>
+<scope>test</scope>
+</dependency>
 <!-- JUnit Jupiter Engine to depend on the JUnit4 engine and JUnit 4 API -->
 <dependency>
 <groupId>org.junit.vintage</groupId>
@@ -428,6 +423,12 @@
 <include>**</include>
 </includes>
 </filter>
+<filter>
+<artifact>org.postgresql:postgresql</artifact>
+<includes>
+<include>**</include>
+</includes>
+</filter>
 <filter>
 <artifact>org.apache.commons:commons-pool2</artifact>
 <includes>
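The new shade filter above is what delivers the commit's "postgres connector" extra: the PostgreSQL JDBC driver is now bundled into the shaded jar, so a Druid metadata store running on PostgreSQL is reachable without shipping the driver separately. A rough Scala sketch of such a write, following the DruidSource write pattern visible in the test diffs below; the metastore option keys here are illustrative assumptions, not authoritative names (check the project README for the real ones):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("druid-ingest-postgres").getOrCreate()

// Hypothetical input; any Dataset with a time column would do.
val dataset = spark.read.option("header", "true").csv("s3://my-bucket/kpis.csv")

dataset.write
  .format("com.rovio.ingest.DruidSource")           // format string as used in DruidSourceTest
  .mode(SaveMode.Overwrite)
  .option("druid.metastore.db.uri",                 // assumed key name
    "jdbc:postgresql://metastore-host:5432/druid")  // the postgres driver now ships in the shaded jar
  .option("druid.metastore.db.username", "druid")   // assumed key name
  .option("druid.metastore.db.password", "secret")  // assumed key name
  .save()
```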
12 changes: 6 additions & 6 deletions python/notebooks/druid_ingestion_test.ipynb
@@ -66,14 +66,14 @@
 "2. A) Copy the shaded jar to s3:\n",
 "\n",
 " aws s3 --profile $AWS_PROFILE cp \\\n",
-" target/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-" s3://$JAR_BUCKET/tmp/juho/druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+" target/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+" s3://$JAR_BUCKET/tmp/juho/druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
 "\n",
 "2. B) Copy the plain jar to s3: \n",
 "\n",
 " aws s3 --profile $AWS_PROFILE cp \\\n",
-" target/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-" s3://$JAR_BUCKET/tmp/juho/druid/jars/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+" target/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+" s3://$JAR_BUCKET/tmp/juho/druid/jars/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
 "\n",
 "Then invert the boolean in the cell below to use it in spark_conf."
@@ -200,7 +200,7 @@
 " \"spark.sql.session.timeZone\": \"UTC\",\n",
 " # alternative if using a snapshot version\n",
 "# \"spark.jars.repositories\": \"https://s01.oss.sonatype.org/content/repositories/snapshots\",\n",
-"# \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.7_spark_3.0.1-SNAPSHOT\"\n",
+"# \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.7_spark_3.4.1-SNAPSHOT\"\n",
 " \"spark.jars.packages\": \"com.rovio.ingest:rovio-ingest:1.0.6_spark_3.0.1\"\n",
 " }\n",
 "}\n",
@@ -220,7 +220,7 @@
 "# Enable this to test with a manually built & copied jar instead of published package from maven\n",
 "if False:\n",
 " spark_conf[\"conf\"][\"spark.jars\"] = \\\n",
-" f\"s3://{packages_bucket}/{PREFIX}druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\"\n",
+" f\"s3://{packages_bucket}/{PREFIX}druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\"\n",
 " del spark_conf[\"conf\"][\"spark.jars.packages\"]\n",
 "\n",
 "set_spark_config(spark_conf)\n",
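Outside a notebook, the same manual-jar override can be expressed directly on the session builder — a sketch, with the bucket and prefix as placeholders mirroring the notebook cell above:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: prefer a manually uploaded jar over the published package.
// Setting spark.jars replaces spark.jars.packages for this artifact,
// just as the notebook deletes its "spark.jars.packages" entry.
val spark = SparkSession.builder()
  .appName("manual-jar-override")
  .config("spark.jars",
    "s3://JAR_BUCKET/tmp/juho/druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar")
  .getOrCreate()
```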
10 changes: 5 additions & 5 deletions python/notebooks/druid_sketch_ingestion_test.ipynb
@@ -66,14 +66,14 @@
 "2. A) Copy the shaded jar to s3:\n",
 "\n",
 " aws s3 --profile $AWS_PROFILE cp \\\n",
-" target/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-" s3://$JAR_BUCKET/tmp/vivek/druid/jars/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+" target/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+" s3://$JAR_BUCKET/tmp/vivek/druid/jars/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
 "\n",
 "2. B) Copy the plain jar to s3: \n",
 "\n",
 " aws s3 --profile $AWS_PROFILE cp \\\n",
-" target/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar \\\n",
-" s3://$JAR_BUCKET/tmp/vivek/druid/jars/original-rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar\n",
+" target/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar \\\n",
+" s3://$JAR_BUCKET/tmp/vivek/druid/jars/original-rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar\n",
 "\n",
 "Then invert the boolean in the cell below to use it in spark_conf."
@@ -166,7 +166,7 @@
 "if True:\n",
 " jars_base_path = \"s3://{packages_bucket}/{PREFIX}druid/jars\"\n",
 " jars = (\n",
-" f\"{jars_base_path}/rovio-ingest-1.0.7_spark_3.0.1-SNAPSHOT.jar,\"\n",
+" f\"{jars_base_path}/rovio-ingest-1.0.7_spark_3.4.1-SNAPSHOT.jar,\"\n",
 " f\"{jars_base_path}/datasketches-hive-1.2.0.jar,\"\n",
 " f\"{jars_base_path}/datasketches-java-4.1.0.jar,\"\n",
 " f\"{jars_base_path}/datasketches-memory-2.0.0.jar\"\n",
16 changes: 8 additions & 8 deletions src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
@@ -268,7 +268,7 @@ public void shouldWriteDataSegmentsWithCorrectValues() throws IOException {
 ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
 .put("string_column", "US")
 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "true")
 .build();
 Map<String, Object> data = parsed.get(0, dimensions);
@@ -278,7 +278,7 @@
 dimensions = ImmutableMap.<String, Object>builder()
 .put("string_column", "US")
 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "false")
 .build();
 data = parsed.get(0, dimensions);
@@ -326,7 +326,7 @@ public void shouldWriteDataSegmentsWithCorrectValuesUsingPartialMetricSpec() throws IOException {
 ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
 .put("string_column", "US")
 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "true")
 .put("double_column", "100.0")
 .build();
@@ -338,7 +338,7 @@
 dimensions = ImmutableMap.<String, Object>builder()
 .put("string_column", "US")
 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "false")
 .put("double_column", "-1.0")
 .build();
@@ -395,7 +395,7 @@ public void shouldWriteDataSegmentsWithSketchBuild() throws IOException {
 // String column is automatically excluded from dimensions as it is used for sketch aggregation.
 ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "true")
 .put("long_column", "10")
 .put("double_column", "100.0")
@@ -408,7 +408,7 @@
 // String column is automatically excluded from dimensions as it is used for sketch aggregation.
 dimensions = ImmutableMap.<String, Object>builder()
 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "false")
 .put("long_column", "-1")
 .put("double_column", "-1.0")
@@ -460,7 +460,7 @@ public void shouldWriteDataSegmentsWithThetaSketchAsInputColumn() throws IOException {
 assertTrue(parsed.containsRow(0));
 ImmutableMap<String, Object> dimensions = ImmutableMap.<String, Object>builder()
 .put("__time", DateTime.parse("2019-10-16T00:01:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "true")
 .put("long_column", "10")
 .put("double_column", "100.0")
@@ -471,7 +471,7 @@
 
 dimensions = ImmutableMap.<String, Object>builder()
 .put("__time", DateTime.parse("2019-10-16T00:02:00Z"))
-.put("string_date_column", "2019-10-16 00:00:00")
+.put("string_date_column", "2019-10-16")
 .put("boolean_column", "false")
 .put("long_column", "-1")
 .put("double_column", "-1.0")
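The repeated `string_date_column` changes above track a formatting difference in the expected segment values: the column now serializes as a plain date rather than a midnight timestamp. A plausible reading is that under Spark 3.4 the column renders via a date-to-string cast rather than a timestamp-to-string cast. A small self-contained Spark sketch of the two renderings (assumption only: that the test values follow Spark's standard casts; the sample date is from the diff):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object DateRendering extends App {
  val spark = SparkSession.builder().master("local[*]").appName("date-vs-timestamp").getOrCreate()
  import spark.implicits._

  val df = Seq("2019-10-16").toDF("raw")
  df.select(
    col("raw").cast("date").cast("string").as("as_date"),           // "2019-10-16" (new expectation)
    col("raw").cast("timestamp").cast("string").as("as_timestamp")  // "2019-10-16 00:00:00" (old expectation)
  ).show(false)

  spark.stop()
}
```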
16 changes: 1 addition & 15 deletions src/test/java/com/rovio/ingest/DruidSourceTest.java
@@ -71,21 +71,7 @@ public void failForNullTimestamp() {
 .options(options)
 .save());
 assertThat(thrown.getCause().getMessage(), containsString(
-"java.lang.IllegalStateException: Null value for column 'date'."));
-}
-
-@Test
-public void failWhenPartitionedByNonTimeStampColumn() {
-Dataset<Row> dataset = loadCsv(spark, "/data.csv").repartition(column("country"));
-dataset.show(false);
-SparkException thrown = assertThrows(SparkException.class,
-() -> dataset.write()
-.format("com.rovio.ingest.DruidSource")
-.mode(SaveMode.Overwrite)
-.options(options)
-.save());
-assertThat(thrown.getCause().getMessage(), containsString(
-"java.sql.BatchUpdateException: Duplicate entry"));
+"Null value for column 'date'."));
 }
 
 @Test
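Note how the surviving test now matches the bare message instead of the `java.lang.IllegalStateException:`-prefixed form, which suggests Spark 3.4 no longer flattens the cause's `toString()` into the wrapper message. A small self-contained ScalaTest sketch of the assertion shape (the exception nesting is simulated; only the matcher pattern mirrors the test):

```scala
import org.apache.spark.SparkException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class CauseMessageSpec extends AnyFlatSpec with Matchers {
  "a wrapped failure" should "expose the bare cause message" in {
    // Simulated stand-in for the failure raised by DruidSource on a null time column.
    val thrown = intercept[SparkException] {
      throw new SparkException("Writing job aborted",
        new IllegalStateException("Null value for column 'date'."))
    }
    // Match on the message text only; any exception-class prefix is version-dependent.
    thrown.getCause.getMessage should include ("Null value for column 'date'.")
  }
}
```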
@@ -21,7 +21,9 @@ import com.rovio.ingest.model.DbType
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
 import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.junit.JUnitRunner
+import org.scalatest.flatspec.AnyFlatSpec
 
 // must define classes outside of the actual test methods, otherwise spark can't find them
 case class KpiRow(date: String, country: String, dau: Integer, revenue: Double, is_segmented: Boolean)
@@ -31,7 +33,7 @@ case class ExpectedRow(`__PARTITION_TIME__`: String, `__PARTITION_NUM__`: Intege
 
 // This is needed for mvn test. It wouldn't find this test otherwise.
 @RunWith(classOf[JUnitRunner])
-class DruidDatasetExtensionsSpec extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterEach {
+class DruidDatasetExtensionsSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterEach {
 
 before {
 DruidSourceBaseTest.MYSQL.start()
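The import changes above follow ScalaTest's 3.1+ reorganization: `FlatSpec` became `AnyFlatSpec` under `org.scalatest.flatspec`, `Matchers` moved to `org.scalatest.matchers.should`, and the JUnit runner moved to the separate `scalatestplus` artifact. A minimal skeleton in the new style (class and test names are illustrative):

```scala
import org.junit.runner.RunWith
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner

// @RunWith keeps the spec discoverable by Maven Surefire, as noted in the diff above.
@RunWith(classOf[JUnitRunner])
class ExampleSpec extends AnyFlatSpec with Matchers {
  "the new style" should "behave like the old FlatSpec" in {
    (1 + 1) shouldBe 2
  }
}
```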

