diff --git a/apps/spark-3.5/build.gradle b/apps/spark-3.5/build.gradle new file mode 100644 index 00000000..6dad002a --- /dev/null +++ b/apps/spark-3.5/build.gradle @@ -0,0 +1,68 @@ +plugins { + // dependency in apps-spark-conventions + id 'com.github.johnrengelman.shadow' version '7.1.2' + id 'openhouse.apps-spark-conventions' +} + +ext { + log4jVersion = "2.20.0" + + sparkVersion = '3.5.2' + icebergVersion = '1.5.2' + sparkVersionSuffix = "3.5" + openhouseSparkRuntimeModule = ":integrations:spark:spark-${sparkVersionSuffix}:openhouse-spark-3.5-runtime_2.12" + icebergSparkRuntimeModule = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:${icebergVersion}" + tablesTestFixturesModule = ":tables-test-fixtures:tables-test-fixtures-iceberg-1.5_2.12" +} + +dependencies { + compileOnly (project(path: openhouseSparkRuntimeModule)) { + exclude group: 'io.netty' + exclude group: 'org.apache.hadoop', module: 'hadoop-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-client' + } + + implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}" + implementation(project(':libs:datalayout')) { + exclude group: 'org.apache.iceberg', module: 'iceberg-spark-runtime-3.1_2.12' + } + implementation("org.apache.iceberg:iceberg-bundled-guava:${icebergVersion}") + implementation("org.apache.iceberg:iceberg-data:${icebergVersion}") + implementation("org.apache.iceberg:iceberg-core:${icebergVersion}") + implementation("org.apache.iceberg:iceberg-common:${icebergVersion}") + implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) { + exclude group: 'io.netty' + exclude group: 'org.apache.hadoop', module: 'hadoop-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-client' + } + implementation ('org.apache.spark:spark-sql_2.12:' + sparkVersion) { + exclude group: 'io.netty' + exclude group: 'org.apache.hadoop', module: 'hadoop-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-client' + } + implementation (icebergSparkRuntimeModule) { + exclude group: 'io.netty' + } + + testImplementation (project(path: openhouseSparkRuntimeModule, configuration: 'shadow')) { + exclude group: 'io.netty' + exclude group: 'org.apache.hadoop', module: 'hadoop-common' + exclude group: 'org.apache.hadoop', module: 'hadoop-client' + } + testImplementation(project(tablesTestFixturesModule)) { + exclude group: "io.netty" + } +} + +sourceSets { + main { + java { + srcDirs = ['src/main/java', project(':apps:openhouse-spark-apps_2.12').sourceSets.main.java.srcDirs] + } + } + test { + java { + srcDirs = ['src/test/java', project(':apps:openhouse-spark-apps_2.12').sourceSets.test.java.srcDirs] + } + } +} diff --git a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/MinimalSparkMoRTest.java b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/MinimalSparkMoRTest.java new file mode 100644 index 00000000..70b30911 --- /dev/null +++ b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/MinimalSparkMoRTest.java @@ -0,0 +1,224 @@ +package com.linkedin.openhouse.catalog.e2e; + +import static org.apache.iceberg.types.Types.NestedField.*; + +import com.linkedin.openhouse.jobs.spark.Operations; +import com.linkedin.openhouse.tablestest.OpenHouseSparkITest; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PartitionSpec; +import 
org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.functions; +import org.junit.jupiter.api.Assertions; + +public class MinimalSparkMoRTest extends OpenHouseSparkITest { + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + + private static final DataFile FILE_A = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + private static final DataFile FILE_B = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=1") + .withRecordCount(1) + .build(); + + private void setupTableWithTwoDataFiles(String tableName, Operations ops) { + final int numInserts = 2; + prepareTable(ops, tableName, false); + populateTable(ops, tableName, numInserts); + ops.spark() + .sql( + "ALTER TABLE db.test_data_compaction SET TBLPROPERTIES ('write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'write.delete.distribution-mode'='range');") + .show(); + ops.spark().sql("select * from db.test_data_compaction").drop("summary").show(80, false); + ops.spark() + .sql("select * from db.test_data_compaction.snapshots") + .drop("summary") + .show(80, false); + } + + private void createDeletes(Operations ops) { + ops.spark().sql("DELETE FROM db.test_data_compaction WHERE data = 'v6'").show(); + } + + // @Test + public void testDataCompactionPartialProgressNonPartitionedTable() throws Exception { + final String tableName = "db.test_data_compaction"; + + BiFunction rewriteFunc = + (ops, table) -> + ops.rewriteDataFiles( + table, + 1024 * 1024, // 1MB + 1024, // 1KB + 1024 * 1024 * 2, // 2MB + 2, + 1, + true, + 10); + + try (Operations ops = Operations.withCatalog(getSparkSession(), null)) { + // setupTestTable(); + // // create table with two big datafiles and it creates two delete files + // assertDeleteFilesCreated(); + // + // setupTestTable(); + // // same as above, then run compaction, and two delete files exist + // assertCompactionCanCreateDanglingDeletes(); + // + // setupTestTable(); + // // a file in an old snapshot is expired / removable if it no longer represents the + // underlying data + // // meaning a newer snapshot has overwritten the data + // // so for delete file it means that the equality + // + // + // // cow will create new datafile + // + // // delete file to represent a new snapshot with one row edit + // + // // cow will create an entirely new datafile + // + // // then delete file will be unused, should expire. 
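+      // Background on the plan above: a position delete file stops filtering any live row once
+      // every data file it references has been rewritten, i.e. it becomes "dangling". Plain
+      // data-file compaction (RewriteDataFiles) leaves such deletes behind, which is what the
+      // rewrite-position-deletes assertions sketched below are meant to probe.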
+ // + // + // // then do the same but with compaction, it won't expire + // assertDanglingDeleteFilesCanNeverExpire(); + // + // setupTestTable(); + // assertRewritePositionDeleteRemovesDanglingDeletes(); + // + // setupTestTable(); + // assertEqualityDeletesNotCompactable(); + // + // setupTestTable(); + // assertProcessToCompactEqualityDeletes(); + // + // setupTestTable(); + // assertCompactionCanRemoveDeletes(); + + Table table = ops.getTable(tableName); + // log.info("Loaded table {}, location {}", table.name(), table.location()); + RewriteDataFiles.Result result = rewriteFunc.apply(ops, table); + populateTable(ops, tableName, 3); + Dataset metadataTable = + SparkTableUtil.loadMetadataTable(ops.spark(), table, MetadataTableType.FILES) + .selectExpr("content", "file_path", "file_size_in_bytes") + .dropDuplicates( + "file_path", + "file_size_in_bytes"); // tOdo: can the same file_path have two diff sizes + + // Aggregate counts and sums based on `content` values + Dataset contentStats = + metadataTable + .groupBy("content") + .agg( + functions.count("content").alias("count"), + functions.sum("file_size_in_bytes").alias("total_size")); + + // Collect the result as a map for quick lookup of counts and sums by content value + Map statsMap = + contentStats.collectAsList().stream() + .collect( + Collectors.toMap( + row -> row.getInt(0), // content value (0, 1, or 2) + row -> row // Row containing count and total_size + )); + // log.info( + // "Added {} data files, rewritten {} data files, rewritten {} bytes", + // result.addedDataFilesCount(), + // result.rewrittenDataFilesCount(), + // result.rewrittenBytesCount()); + Assertions.assertEquals(0, result.addedDataFilesCount()); + Assertions.assertEquals(0, result.rewrittenDataFilesCount()); + + populateTable(ops, tableName, 3); + ops.spark().sql("DELETE FROM db.test_data_compaction WHERE data = 'v6'").show(); + populateTable(ops, tableName, 3); + RewriteDataFiles.Result result2 = rewriteFunc.apply(ops, table); + Assertions.assertEquals(0, result2.addedDataFilesCount()); + Assertions.assertEquals(0, result2.rewrittenDataFilesCount()); + } + } + + private static void prepareTable(Operations ops, String tableName, boolean isPartitioned) { + ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show(); + if (isPartitioned) { + ops.spark() + .sql( + String.format( + "CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts))", + tableName)) + .show(); + } else { + ops.spark() + .sql(String.format("CREATE TABLE %s (data string, ts timestamp)", tableName)) + .show(); + } + ops.spark().sql(String.format("DESCRIBE %s", tableName)).show(); + } + + private static List getSnapshotIds(Operations ops, String tableName) { + // log.info("Getting snapshot Ids"); + List snapshots = + ops.spark().sql(String.format("SELECT * FROM %s.snapshots", tableName)).collectAsList(); + // snapshots.forEach(s -> log.info(s.toString())); + return snapshots.stream() + .map(r -> r.getLong(r.fieldIndex("snapshot_id"))) + .collect(Collectors.toList()); + } + + private static void populateTable(Operations ops, String tableName, int numRows) { + populateTable(ops, tableName, numRows, 0); + } + + private static void populateTable(Operations ops, String tableName, int numRows, int dayLag) { + populateTable(ops, tableName, numRows, dayLag, System.currentTimeMillis() / 1000); + } + + private static void populateTable( + Operations ops, String tableName, int numRows, int dayLag, long timestampSeconds) { + String timestampEntry = + 
String.format("date_sub(from_unixtime(%d), %d)", timestampSeconds, dayLag); + StringBuilder valuesBatch = new StringBuilder(); + for (int row = 0; row < numRows; ++row) { + valuesBatch.setLength(0); // Clear the batch for each iteration + for (int i = 0; i < 10; i++) { + valuesBatch.append(String.format("('v%d', %s)", row + i, timestampEntry)); + if (i < 9) { + valuesBatch.append(", "); + } + } + + ops.spark() + .sql(String.format("INSERT INTO %s VALUES %s", tableName, valuesBatch.toString())) + .show(); + + // ops.spark().sql(String.format("DELETE FROM db.test_data_compaction WHERE data = 'v%d'", + // row)).show(); + } + } +} diff --git a/apps/spark/build.gradle b/apps/spark/build.gradle index ec6781ad..c0bf4393 100644 --- a/apps/spark/build.gradle +++ b/apps/spark/build.gradle @@ -1,38 +1,49 @@ plugins { - id 'openhouse.java-conventions' - id 'openhouse.hadoop-conventions' - id 'openhouse.iceberg-conventions-1.2' - id 'openhouse.maven-publish' + // dependency in apps-spark-conventions id 'com.github.johnrengelman.shadow' version '7.1.2' -} - -configurations { - // Excluding these libraries avoids competing implementations for LoggerFactory - // Standardizing on slf4j + log4j2 as implementation. - all*.exclude module : 'spring-boot-starter-logging' - all*.exclude module : 'logback-classic' - shadow.extendsFrom implementation + id 'openhouse.apps-spark-conventions' } ext { + log4jVersion = "2.18.0" + sparkVersion = '3.1.1' icebergVersion = '1.2.0' + sparkVersionSuffix = "3.1" + openhouseSparkRuntimeModule = ":integrations:spark:spark-${sparkVersionSuffix}:openhouse-spark-runtime_2.12" + icebergSparkRuntimeModule = "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:${icebergVersion}" + tablesTestFixturesModule = ":tables-test-fixtures:tables-test-fixtures_2.12" } dependencies { - implementation project(':iceberg:openhouse:internalcatalog') - implementation project(':client:hts') - implementation project(':client:jobsclient') - implementation project(':client:tableclient') - implementation project(':client:secureclient') - implementation project(':services:common') - implementation project(':cluster:storage') - compileOnly (project(path: ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12')) { + compileOnly (project(path: openhouseSparkRuntimeModule)) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' exclude group: 'org.apache.hadoop', module: 'hadoop-client' } + + implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}" implementation project(':libs:datalayout') + implementation("org.apache.iceberg:iceberg-bundled-guava") { + version { + strictly("${icebergVersion}") + } + } + implementation("org.apache.iceberg:iceberg-data") { + version { + strictly("${icebergVersion}") + } + } + implementation("org.apache.iceberg:iceberg-core") { + version { + strictly("${icebergVersion}") + } + } + implementation("org.apache.iceberg:iceberg-common") { + version { + strictly("${icebergVersion}") + } + } implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' @@ -43,116 +54,16 @@ dependencies { exclude group: 'org.apache.hadoop', module: 'hadoop-common' exclude group: 'org.apache.hadoop', module: 'hadoop-client' } - implementation ('org.apache.hadoop:hadoop-common:2.10.0') { - exclude group: 'io.netty' - exclude group: 'org.apache.curator', module: 'curator-client' - exclude group: 'org.apache.commons', module: 'commons-lang3' - - } - 
implementation ('org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:' + icebergVersion) { + implementation (icebergSparkRuntimeModule) { exclude group: 'io.netty' } - implementation 'commons-cli:commons-cli:1.5.0' - implementation 'org.reflections:reflections:0.10.2' - implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' - implementation 'io.netty:netty-resolver-dns-native-macos:4.1.75.Final:osx-x86_64' - implementation 'org.springframework.retry:spring-retry:1.3.3' - implementation 'org.apache.logging.log4j:log4j-core:2.18.0' - implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0' - implementation 'org.apache.logging.log4j:log4j-api:2.18.0' - implementation 'com.fasterxml.jackson.core:jackson-core:2.13.3' - implementation 'com.fasterxml.jackson.core:jackson-annotations:2.13.3' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3' - implementation 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.13.3' - implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3' - implementation 'com.fasterxml.woodstox:woodstox-core:6.2.7' - // open telemetry related classed. Latest Okhttp version is 4.10.0, pinning to 4.9.3 to avoid dependency issues - implementation 'com.squareup.okhttp3:okhttp:' + ok_http3_version - implementation 'com.squareup.okhttp:okhttp:2.7.5' - implementation 'com.squareup.okio:okio:3.2.0' - implementation 'com.squareup.okio:okio-jvm:3.2.0' - implementation 'org.jetbrains.kotlin:kotlin-stdlib:2.0.20' - implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk7:2.0.20' - implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:2.0.20' - implementation 'io.opentelemetry:opentelemetry-api:1.18.0' - implementation 'io.opentelemetry:opentelemetry-exporter-otlp:1.18.0' - implementation 'io.opentelemetry:opentelemetry-sdk:1.18.0' - implementation 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.14.0-alpha' - implementation 'io.opentelemetry:opentelemetry-semconv:1.14.0-alpha' - implementation 'org.apache.commons:commons-lang3:3.12.0' - - testImplementation (project(path: ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12', configuration: 'shadow')) { + testImplementation (project(path: openhouseSparkRuntimeModule, configuration: 'shadow')) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' exclude group: 'org.apache.hadoop', module: 'hadoop-client' } - // Otherwise throws the error: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0 - testImplementation 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.13.1' - testImplementation 'org.mockito:mockito-inline:4.11.0' - testImplementation 'org.powermock:powermock-module-junit4:2.0.9' - testImplementation 'org.powermock:powermock-api-mockito2:2.0.9' - testImplementation(project(':tables-test-fixtures:tables-test-fixtures_2.12')) { + testImplementation(project(tablesTestFixturesModule)) { exclude group: "io.netty" } - testRuntimeOnly("org.eclipse.jetty:jetty-server:11.0.2") - -} - -// Need spark runtime to be built before this test for this project to run successfully because compileOnly and -// testImplementation dependencies are not triggering it. 
-test.dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:build' - -shadowJar { - zip64 = true - archiveClassifier.set('uber') - mergeServiceFiles() // merge META-INF/services configuration files to allow FileSystem to be discovered - dependencies { - // unnecessary dependencies from iceberg-spark3-runtime - exclude(dependency('org.apache.iceberg::')) - // this dependency will be provided at runtime - exclude(dependency('org.apache.iceberg:iceberg-spark3-runtime::')) - - relocate('io', 'openhouse.relocated.io') { - exclude 'io.netty.resolver.dns.macos.**' // dynamically loaded classes - } - relocate('com', 'openhouse.relocated.com') { - exclude 'com.linkedin.openhouse.**' // don't want our classes to be shaded - exclude 'com.ctc.wstx.**' // dynamically loaded classes - exclude 'com.squareup.**' - exclude '%regex[com.sun.security.*]' - } - relocate 'okhttp3', 'openhouse.relocated.okhttp3' - relocate 'okio', 'openhouse.relocated.okio' - relocate 'reactor', 'openhouse.relocated.reactor' - relocate('org','openhouse.relocated.org') { - exclude 'org.apache.iceberg.**' // these are runtime classes, we shouldn't relocate them unless we shade them - exclude '%regex[org.apache.hadoop.*]' // these are runtime classes too, use regex to exclude string literals - exclude 'org.apache.commons.**' // these are part of method signatures reused in sub-classes - exclude 'org.apache.avro.**' // these runtime classes too - exclude 'org.apache.spark.**' // these runtime classes too - exclude 'org.springframework.**' // otherwise fails with ClassNotFoundException: org.springframework.http.codec.ClientCodecConfigurer - exclude 'org.log4j.**' - exclude 'org.slf4j.**' - exclude 'org.apache.log4j.**' - exclude 'org.apache.logging.**' // otherwise fails with add log4j-core to the classpath - exclude 'org.xml.sax.**' // otherwise fails with NoClassDefFoundError: org/xml/sax/ContentHandler - exclude '%regex[org.w3c.*]' - exclude '%regex[org.ietf.*]' - } - } -} - -// https://github.com/johnrengelman/shadow/issues/335 -// By default shadow doesn't configure the build task to depend on the shadowJar task. 
-tasks.build.dependsOn tasks.shadowJar - -test { - if (JavaVersion.current() >= JavaVersion.VERSION_1_9) { - jvmArgs \ - '--add-opens=java.base/java.nio=ALL-UNNAMED', - '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED', - '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', - '--add-exports=java.base/sun.util.calendar=ALL-UNNAMED' - } -} +} \ No newline at end of file diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index a4c955b8..5c881d2d 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -1,6 +1,5 @@ package com.linkedin.openhouse.jobs.spark; -import avro.shaded.com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.jobs.util.SparkJobUtil; @@ -29,6 +28,7 @@ import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.spark.sql.SparkSession; import scala.collection.JavaConverters; diff --git a/build.gradle b/build.gradle index 6bcad697..516a504e 100644 --- a/build.gradle +++ b/build.gradle @@ -60,8 +60,6 @@ allprojects { if (it.path != ':integrations:spark:spark-3.5:openhouse-spark-3.5-itest') { configurations.all { resolutionStrategy { - force 'com.fasterxml.jackson:jackson-bom:2.13.4' - force 'com.fasterxml.jackson.core:jackson-databind:2.13.4' force 'org.apache.orc:orc-core:1.8.3' force 'com.google.guava:guava:31.1-jre' } @@ -128,5 +126,4 @@ tasks.register('CopyGitHooksTask', Copy) { println 'Make the git hook available in .git/hooks directory.' from file('scripts/git-hooks') into file('.git/hooks/') -} - +} \ No newline at end of file diff --git a/buildSrc/src/main/groovy/openhouse.apps-spark-conventions.gradle b/buildSrc/src/main/groovy/openhouse.apps-spark-conventions.gradle new file mode 100644 index 00000000..864c4080 --- /dev/null +++ b/buildSrc/src/main/groovy/openhouse.apps-spark-conventions.gradle @@ -0,0 +1,123 @@ +plugins { + id 'openhouse.java-conventions' + id 'openhouse.hadoop-conventions' + id 'openhouse.maven-publish' + id 'com.github.johnrengelman.shadow' +} + +configurations { + // Excluding these libraries avoids competing implementations for LoggerFactory + // Standardizing on slf4j + log4j2 as implementation. 
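+  // The excludes below leave log4j-slf4j-impl (declared per Spark version in apps/spark and
+  // apps/spark-3.5) as the only SLF4J binding on the classpath.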
+ all*.exclude module : 'spring-boot-starter-logging' + all*.exclude module : 'logback-classic' + shadow.extendsFrom implementation +} + +dependencies { + implementation project(':iceberg:openhouse:internalcatalog') + implementation project(':client:hts') + implementation project(':client:jobsclient') + implementation project(':client:tableclient') + implementation project(':client:secureclient') + implementation project(':services:common') + implementation project(':cluster:storage') + + implementation("org.apache.iceberg:iceberg-bundled-guava") + implementation("org.apache.iceberg:iceberg-data") + implementation("org.apache.iceberg:iceberg-core") + implementation("org.apache.iceberg:iceberg-common") + + implementation ('org.apache.hadoop:hadoop-common:2.10.0') { + exclude group: 'io.netty' + exclude group: 'org.apache.curator', module: 'curator-client' + exclude group: 'org.apache.commons', module: 'commons-lang3' + } + implementation 'commons-cli:commons-cli:1.5.0' + implementation 'org.reflections:reflections:0.10.2' + implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'io.netty:netty-resolver-dns-native-macos:4.1.75.Final:osx-x86_64' + implementation 'org.springframework.retry:spring-retry:1.3.3' + implementation "org.apache.logging.log4j:log4j-slf4j-impl" + implementation 'com.fasterxml.woodstox:woodstox-core:6.2.7' + implementation('org.testcontainers:testcontainers:1.19.8') + + // open telemetry related classed. Latest Okhttp version is 4.10.0, pinning to 4.9.3 to avoid dependency issues + implementation 'com.squareup.okhttp3:okhttp:' + ok_http3_version + implementation 'com.squareup.okhttp:okhttp:2.7.5' + implementation 'com.squareup.okio:okio:3.2.0' + implementation 'com.squareup.okio:okio-jvm:3.2.0' + implementation 'org.jetbrains.kotlin:kotlin-stdlib:2.0.20' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk7:2.0.20' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:2.0.20' + implementation 'io.opentelemetry:opentelemetry-api:1.18.0' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp:1.18.0' + implementation 'io.opentelemetry:opentelemetry-sdk:1.18.0' + implementation 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.14.0-alpha' + implementation 'io.opentelemetry:opentelemetry-semconv:1.14.0-alpha' + implementation 'org.apache.commons:commons-lang3:3.12.0' + + // Otherwise throws the error: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0 + testImplementation 'org.mockito:mockito-inline:4.11.0' + testImplementation 'org.powermock:powermock-module-junit4:2.0.9' + testImplementation 'org.powermock:powermock-api-mockito2:2.0.9' + testRuntimeOnly("org.eclipse.jetty:jetty-server:11.0.2") + +} + +shadowJar { + zip64 = true + archiveClassifier.set('uber') + mergeServiceFiles() // merge META-INF/services configuration files to allow FileSystem to be discovered + dependencies { + // unnecessary dependencies from iceberg-spark3-runtime + exclude(dependency('org.apache.iceberg::')) + // this dependency will be provided at runtime + exclude(dependency('org.apache.iceberg:iceberg-spark3-runtime::')) + + // can possibly be removed with gradle8 and shadow upgrade + // https://github.com/GradleUp/shadow/issues/877#issuecomment-2472922043 + exclude 'META-INF/versions/19/' + + relocate('io', 'openhouse.relocated.io') { + exclude 'io.netty.resolver.dns.macos.**' // dynamically loaded classes + } + relocate('com', 'openhouse.relocated.com') { + exclude 'com.linkedin.openhouse.**' // don't 
want our classes to be shaded + exclude 'com.ctc.wstx.**' // dynamically loaded classes + exclude 'com.squareup.**' + exclude '%regex[com.sun.security.*]' + } + relocate 'okhttp3', 'openhouse.relocated.okhttp3' + relocate 'okio', 'openhouse.relocated.okio' + relocate 'reactor', 'openhouse.relocated.reactor' + relocate('org','openhouse.relocated.org') { + exclude 'org.apache.iceberg.**' // these are runtime classes, we shouldn't relocate them unless we shade them + exclude '%regex[org.apache.hadoop.*]' // these are runtime classes too, use regex to exclude string literals + exclude 'org.apache.commons.**' // these are part of method signatures reused in sub-classes + exclude 'org.apache.avro.**' // these runtime classes too + exclude 'org.apache.spark.**' // these runtime classes too + exclude 'org.springframework.**' // otherwise fails with ClassNotFoundException: org.springframework.http.codec.ClientCodecConfigurer + exclude 'org.log4j.**' + exclude 'org.slf4j.**' + exclude 'org.apache.log4j.**' + exclude 'org.apache.logging.**' // otherwise fails with add log4j-core to the classpath + exclude 'org.xml.sax.**' // otherwise fails with NoClassDefFoundError: org/xml/sax/ContentHandler + exclude '%regex[org.w3c.*]' + exclude '%regex[org.ietf.*]' + } + } +} + +// https://github.com/johnrengelman/shadow/issues/335 +// By default shadow doesn't configure the build task to depend on the shadowJar task. +tasks.build.dependsOn tasks.shadowJar + +test { + if (JavaVersion.current() >= JavaVersion.VERSION_1_9) { + jvmArgs \ + '--add-opens=java.base/java.nio=ALL-UNNAMED', + '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', + '--add-exports=java.base/sun.util.calendar=ALL-UNNAMED' + } +} diff --git a/integrations/java-iceberg-1.5/openhouse-java-itest/build.gradle b/integrations/java-iceberg-1.5/openhouse-java-itest/build.gradle new file mode 100644 index 00000000..7cbd7e38 --- /dev/null +++ b/integrations/java-iceberg-1.5/openhouse-java-itest/build.gradle @@ -0,0 +1,24 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 'openhouse.maven-publish' +} + +ext { + icebergVersion = '1.5.2' +} + +sourceSets { + test { + java { + srcDirs += project(':integrations:java:openhouse-java-itest').sourceSets.test.java.srcDirs + } + } +} + +dependencies { + testImplementation(project(path: ':integrations:java-iceberg-1.5:openhouse-java-iceberg-1.5-runtime', configuration: 'shadow')) + + testImplementation "com.squareup.okhttp3:okhttp:4.9.3" + testImplementation "com.squareup.okhttp3:mockwebserver:4.9.3" + testImplementation "org.apache.iceberg:iceberg-bundled-guava:" + icebergVersion +} \ No newline at end of file diff --git a/integrations/java-iceberg-1.5/openhouse-java-runtime/build.gradle b/integrations/java-iceberg-1.5/openhouse-java-runtime/build.gradle new file mode 100644 index 00000000..2c42ab3a --- /dev/null +++ b/integrations/java-iceberg-1.5/openhouse-java-runtime/build.gradle @@ -0,0 +1,94 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 'openhouse.maven-publish' + id 'com.github.johnrengelman.shadow' version '7.1.2' +} + +configurations { + fatJarPackagedDependencies { + // Following exclusions are not needed during runtime + // and often cause conflict with existing classpath. 
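+    // Only this configuration is passed to shadowJar (configurations = [fatJarPackagedDependencies]
+    // further down), so these excludes determine what gets bundled into the uber jar.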
+ exclude(group: 'org.slf4j') // logging libraries + exclude(group: 'org.apache.log4j') + exclude(group: 'org.apache.logging.log4j') + exclude(group: 'org.mapstruct') + exclude(group: 'io.micrometer') // not used in client + exclude(group: 'ch.qos.logback') + } + shadow.extendsFrom implementation +} + +ext { + icebergVersion = '1.5.2' + sparkVersion = '3.5.2' + springVersion = '2.7.8' + hadoopVersion = '2.10.0' +} + +sourceSets { + main { + java { + srcDirs += project(':integrations:java:openhouse-java-runtime').sourceSets.main.java.srcDirs + } + resources { + srcDirs += project(':integrations:java:openhouse-java-runtime').sourceSets.main.resources.srcDirs + } + } +} + +dependencies { + compileOnly project(':client:secureclient') + compileOnly project(':client:tableclient') + compileOnly("org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:" + icebergVersion) + compileOnly ("org.springframework.boot:spring-boot-starter-webflux:" + springVersion) + + implementation 'org.apache.commons:commons-lang3:3.12.0' + fatJarPackagedDependencies(project(':client:secureclient')) + implementation("org.apache.iceberg:iceberg-core:" + icebergVersion) + implementation("org.apache.hadoop:hadoop-client:" + hadoopVersion) { + exclude group: 'junit', module: 'junit' + exclude group: 'javax', module: 'servlet-api' + exclude group: "io.netty" + exclude group: 'com.zaxxer', module: 'HikariCP-java7' + exclude group: 'org.apache.commons', module: 'commons-lang3' + } +} + +// Following codeblock completely relocates contents of the jar +// except for source code written in the module. As a result, +// we remove chances of classpath conflicts during runtime/compiletime. +shadowJar { + dependencies { + exclude("org/springframework/http/codec/CodecConfigurer.properties") + exclude("javax/**") + exclude("okio/**") + exclude("kotlin/**") + + relocate ('io.', 'com.linkedin.openhouse.relocated.io.') + relocate('org','com.linkedin.openhouse.relocated.org') { + exclude 'org.xml.sax.**' + exclude 'org.apache.hadoop.**' + exclude 'org.apache.iceberg.**' + exclude 'org.slf4j.**' + } + relocate('reactor', 'com.linkedin.openhouse.relocated.reactor') + relocate('com.linkedin.openhouse.jobs.client', 'com.linkedin.openhouse.gen.job.client') + relocate('com.linkedin.openhouse.tables.client', 'com.linkedin.openhouse.gen.tables.client') + relocate('com.linkedin.openhouse.client.ssl', 'com.linkedin.openhouse.gen.client.ssl') + relocate('com.linkedin.openhouse.housetables.client', 'com.linkedin.openhouse.gen.housetables.client') + relocate('com', 'com.linkedin.openhouse.relocated.com') { + exclude 'com.linkedin.openhouse.**' + } + } + // Jackson file not needed in newer version + exclude 'module-info.class' + // service file not needed for client + exclude 'log4j2.springboot' + + configurations = [project.configurations.fatJarPackagedDependencies] + mergeServiceFiles() + archiveClassifier.set('uber') + zip64 true +} + +jar.enabled=true diff --git a/integrations/spark-3.5/openhouse-spark-itest/build.gradle b/integrations/spark-3.5/openhouse-spark-itest/build.gradle new file mode 100644 index 00000000..13a2b161 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/build.gradle @@ -0,0 +1,84 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 'openhouse.maven-publish' +} + +ext { + sparkVersion = '3.5.2' +} + +sourceSets { + test { + java { + srcDirs = ['src/test/java', project(':integrations:spark:openhouse-spark-itest').sourceSets.test.java.srcDirs] + exclude '**/PartitionTest.java', '**/AlterTable*.java', 
'**/CreateTable*.java', '**/DescribeTableTest.java', '**/DropTableTest.java', '**/ShowTablesTest.java', '**/InsertIntoTableTest.java' + } + } +} + +dependencies { + + testImplementation 'com.google.code.gson:gson:2.8.9' + + testImplementation(project(path: ':integrations:spark-3.5:openhouse-spark-3.5-runtime_2.12', configuration: 'shadow')) { + exclude group: 'org.apache.commons', module: 'commons-lang3' + } + + testImplementation("org.apache.spark:spark-sql_2.12:" + sparkVersion){ + // These classes are available from `client-codegen-convention.gradle` + exclude group: "io.netty" + } + + testImplementation project(':tables-test-fixtures-iceberg-1.5_2.12') + testImplementation 'org.junit.platform:junit-platform-runner:1.8.2' + // Required to test /tables mockserver + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation "com.squareup.okhttp3:okhttp:4.9.3" + testImplementation "com.squareup.okhttp3:mockwebserver:4.9.3" + testImplementation "io.netty:netty-resolver-dns-native-macos:4.1.70.Final:osx-x86_64" +} + +// Adding testing resources from :services:tables module +// this is to avoid duplicating schema.json files used for evolution tests. +sourceSets { + test { + resources { + srcDirs += [ + project(':services:tables').sourceSets.test.resources + ] + } + } +} + +// Specify order of tests to avoid spark session conflicts +// Non-openhouse catalog tests should run before e2e tests start +task statementTest(type: Test) { + filter { + includeTestsMatching 'com.linkedin.openhouse.spark.statementtest.*' + } + jvmArgs \ + '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED' +} + +task catalogTest(type: Test) { + filter { + includeTestsMatching 'com.linkedin.openhouse.spark.catalogtest.*' + } + jvmArgs \ + '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED' +} + +test { + filter { + excludeTestsMatching 'com.linkedin.openhouse.spark.statementtest.*' + excludeTestsMatching 'com.linkedin.openhouse.spark.catalogtest.*' + } + jvmArgs \ + '--add-opens=java.base/java.nio=ALL-UNNAMED', + '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', + '--add-exports=java.base/sun.util.calendar=ALL-UNNAMED' +} + +test.dependsOn statementTest +statementTest.dependsOn catalogTest diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/MockHelpersSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/MockHelpersSpark3_5.java new file mode 100644 index 00000000..f44fc2ef --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/MockHelpersSpark3_5.java @@ -0,0 +1,323 @@ +package com.linkedin.openhouse.spark; + +import static com.linkedin.openhouse.spark.SparkTestBase.*; +import static org.apache.iceberg.CatalogUtil.*; + +import com.linkedin.openhouse.gen.tables.client.api.SnapshotApi; +import com.linkedin.openhouse.gen.tables.client.api.TableApi; +import com.linkedin.openhouse.gen.tables.client.invoker.ApiClient; +import com.linkedin.openhouse.gen.tables.client.model.AclPolicy; +import com.linkedin.openhouse.gen.tables.client.model.ClusteringColumn; +import com.linkedin.openhouse.gen.tables.client.model.GetAclPoliciesResponseBody; +import com.linkedin.openhouse.gen.tables.client.model.GetAllDatabasesResponseBody; +import com.linkedin.openhouse.gen.tables.client.model.GetAllTablesResponseBody; +import com.linkedin.openhouse.gen.tables.client.model.GetDatabaseResponseBody; +import 
com.linkedin.openhouse.gen.tables.client.model.GetTableResponseBody; +import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec; +import com.linkedin.openhouse.relocated.com.fasterxml.jackson.databind.JsonNode; +import com.linkedin.openhouse.relocated.com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.relocated.com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.stream.Collectors; +import lombok.SneakyThrows; +import okhttp3.mockwebserver.MockResponse; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class MockHelpersSpark3_5 { + + private static final ObjectMapper mapper = + ApiClient.createDefaultObjectMapper(ApiClient.createDefaultDateFormat()); + + /** + * Helper method to create {@link GetAllDatabasesResponseBody} from list of {@link + * GetDatabaseResponseBody} + */ + public static GetAllDatabasesResponseBody mockGetAllDatabasesResponseBody( + GetDatabaseResponseBody... drs) { + Map hashmap = new HashMap<>(); + hashmap.put("results", Arrays.asList(drs)); + return mapper.convertValue(hashmap, GetAllDatabasesResponseBody.class); + } + + /** Helper method to create {@link GetDatabaseResponseBody} from table required fields. */ + public static GetDatabaseResponseBody mockGetDatabaseResponseBody( + String databaseId, String clusterId) { + Map hashmap = new HashMap<>(); + hashmap.put("databaseId", databaseId); + hashmap.put("clusterId", clusterId); + return mapper.convertValue(hashmap, GetDatabaseResponseBody.class); + } + + /** + * Helper method to create {@link GetAllTablesResponseBody} from list of {@link + * GetTableResponseBody} + */ + public static GetAllTablesResponseBody mockGetAllTableResponseBody(GetTableResponseBody... trs) { + Map hashmap = new HashMap<>(); + hashmap.put("results", Arrays.asList(trs)); + return mapper.convertValue(hashmap, GetAllTablesResponseBody.class); + } + + /** Helper method to create {@link GetTableResponseBody} from table required fields. 
*/ + public static GetTableResponseBody mockGetTableResponseBody( + String databaseId, + String tableId, + String clusterId, + String tableUri, + String tableUUID, + String tableLocation, + String tableVersion, + String schema, + TimePartitionSpec timePartitionSpec, + List clustering) { + Map hashmap = new HashMap<>(); + hashmap.put("databaseId", databaseId); + hashmap.put("tableId", tableId); + hashmap.put("clusterId", clusterId); + hashmap.put("tableUri", tableUri); + hashmap.put("tableUUID", tableUUID); + hashmap.put("tableLocation", tableLocation); + hashmap.put("tableVersion", tableVersion); + hashmap.put("schema", schema); + hashmap.put("timePartitioning", timePartitionSpec); + hashmap.put("clustering", clustering); + hashmap.put("policies", null); + return mapper.convertValue(hashmap, GetTableResponseBody.class); + } + + /** Helper method to create {@link GetTableResponseBody} from table optional fields. */ + @SneakyThrows + public static GetTableResponseBody decorateResponse( + GetTableResponseBody getTableResponseBody, Map tblProps) { + JsonNode jsonNode = mapper.valueToTree(getTableResponseBody); + ((ObjectNode) jsonNode).put("tableProperties", mapper.convertValue(tblProps, ObjectNode.class)); + return mapper.treeToValue(jsonNode, GetTableResponseBody.class); + } + + public static GetAclPoliciesResponseBody mockGetAclPoliciesResponseBody(AclPolicy... aclPolicy) { + Map hashmap = new HashMap<>(); + hashmap.put("results", Arrays.asList(aclPolicy)); + return mapper.convertValue(hashmap, GetAclPoliciesResponseBody.class); + } + + public static AclPolicy mockAclPolicy(String role, String principal) { + Map hashmap = new HashMap<>(); + hashmap.put("role", role); + hashmap.put("principal", principal); + return mapper.convertValue(hashmap, AclPolicy.class); + } + + /** Helper method to create {@link MockResponse} that plugs in nicely to mockWebServer. */ + @SneakyThrows + public static MockResponse mockResponse(int status, Object jsonObj) { + ; + return new MockResponse() + .setResponseCode(status) + .setBody(mapper.writeValueAsString(jsonObj)) + .addHeader("Content-Type", "application/json"); + } + + /** + * Helper method to get a valid metadata.json path. Provides an option to also insert data into + * that table. + * + * @param tableIdentifier + * @param createData set to true if data needs to be inserted + * @return the metadata_json path for the table + */ + @SneakyThrows + public static String mockTableLocationDefaultSchema( + TableIdentifier tableIdentifier, Boolean createData) { + String tableName = + String.format( + "testhelper.%s.%s", tableIdentifier.namespace().toString(), tableIdentifier.name()); + spark.sql( + String.format( + "CREATE OR REPLACE TABLE %s (col1 string, col2 string) USING iceberg", tableName)); + if (createData) { + spark.sql(String.format("INSERT INTO %s VALUES ('1', 'a'), ('2', 'b')", tableName)); + } + return craftMetadataLocation(tableIdentifier, "testhelper"); + } + + /** + * Helper method to get a valid metadata.json path after running an SQL operation. + * + * @param tableIdentifier + * @param sql sql should have %t as the table identifier, for example: "insert into %t values.." 
+ * @return the metadata_json path for the table after the operation + */ + @SneakyThrows + public static String mockTableLocationAfterOperation( + TableIdentifier tableIdentifier, String sql) { + String tableName = + String.format( + "testhelper.%s.%s", tableIdentifier.namespace().toString(), tableIdentifier.name()); + sql = sql.replace("%t", tableName); + spark.sql(sql); + return craftMetadataLocation(tableIdentifier, "testhelper"); + } + + /** + * Helper method to get a valid metadata.json path. Compare to the method + * com.linkedin.openhouse.spark.MockHelpersSpark3_5#mockTableLocationDefaultSchema(org.apache.iceberg.catalog.TableIdentifier, + * java.lang.Boolean) this method doesn't provide option to load data but provide API to specify + * schema or specify partitionedByString. + * + * @param tableIdentifier + * @param ddlSchema schema of the mocking table. + * @return the metadata_json path for the table + */ + @SneakyThrows + public static String mockTableLocation( + TableIdentifier tableIdentifier, String ddlSchema, String partitionedByString) { + String tableName = + String.format( + "testhelper.%s.%s", tableIdentifier.namespace().toString(), tableIdentifier.name()); + spark.sql( + String.format( + "CREATE TABLE %s (%s) USING iceberg %s", tableName, ddlSchema, partitionedByString)); + return craftMetadataLocation(tableIdentifier, "testhelper"); + } + + public static DataFile createDummyDataFile(String dataPath, PartitionSpec partitionSpec) + throws IOException { + Files.write(Paths.get(dataPath), Lists.newArrayList(), StandardCharsets.UTF_8); + return DataFiles.builder(partitionSpec) + .withPath(dataPath) + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + } + + public static Snapshot mockDummySnapshot( + TableIdentifier tableIdentifier, + String dataPath, + PartitionSpec partitionSpec, + String catalogName) + throws IOException { + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + catalogName, + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, + spark.conf().get("spark.sql.catalog.testhelper.warehouse"), + ICEBERG_CATALOG_TYPE, + "hadoop"), + new Configuration()); + Table table = catalog.loadTable(tableIdentifier); + return table.newAppend().appendFile(createDummyDataFile(dataPath, partitionSpec)).apply(); + } + + private static String craftMetadataLocation(TableIdentifier tableIdentifier, String catalogName) + throws IOException { + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + catalogName, + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, + spark.conf().get("spark.sql.catalog.testhelper.warehouse"), + ICEBERG_CATALOG_TYPE, + "hadoop"), + new Configuration()); + Table table = catalog.loadTable(tableIdentifier); + Path metadataLocation = + Paths.get(((BaseTable) table).operations().current().metadataFileLocation()); + // HadoopCatalog created metadata file has name format v1.metadata.json, which is not compatible + // with BaseMetastoreTableOperations + return Files.copy( + metadataLocation, + metadataLocation.resolveSibling( + new Random().nextInt(Integer.MAX_VALUE) + "-.metadata.json")) + .toString(); + } + + public static String mockTableLocationDefaultSchema(TableIdentifier tableIdentifier) { + return mockTableLocationDefaultSchema(tableIdentifier, false); + } + + /** + * This method converts schema in Iceberg literal format to something that SQL DDL can incorporate + * as part of it. Functionality limitation: IT DOESN'T SUPPORT nested schema. 
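+   *
+   * <p>Illustration: a flat schema with an int field "id" and a string field "name" is rendered as
+   * the DDL fragment "id int, name string".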
+ */ + public static String convertSchemaToDDLComponent(String icebergSchema) { + return SchemaParser.fromJson(icebergSchema).columns().stream() + .map(x -> x.name() + " " + x.type().toString()) + .collect(Collectors.joining(", ")); + } + + /** + * This method converts each top-level field in Iceberg literal format to an array of + * {, , ""} in which the third element is supposed to be document. It is + * being placed with empty string for simplicity of tests. Functionality limitation: IT DOESN'T + * SUPPORT nested schema. + * + *

<p>Spark SQL shows "long" as a "bigint", but their semantics are the same.
+   *
+   * @see Spark SQL datatypes
+   */
+  public static List<String[]> convertSchemaToFieldArray(String icebergSchema) {
+    return SchemaParser.fromJson(icebergSchema).columns().stream()
+        .map(
+            x ->
+                x.type().toString().equals("long")
+                    ? Arrays.asList(x.name(), "bigint", null).toArray(new String[3])
+                    : Arrays.asList(x.name(), x.type().toString(), null).toArray(new String[3]))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Check if the given field is a timestamp type.
+   *
+   *

returns empty optional if type is not timestamp. + */ + private static Optional getTimestampType(Schema.Field f) { + if (f.schema().getLogicalType() != null + && f.schema().getLogicalType().getName().contains("timestamp")) { + return Optional.of("timestamp"); + } else { + return Optional.empty(); + } + } + + /** Helper method to get ApiClient for the running mockWebServer Instance. */ + public static TableApi getTableApiClient() { + ApiClient apiClient = new ApiClient(); + apiClient.setBasePath( + String.format("http://%s:%s", mockTableService.getHostName(), mockTableService.getPort())); + return new TableApi(apiClient); + } + + /** Helper method to get ApiClient for the running mockWebServer Instance. */ + public static SnapshotApi getSnapshotApiClient() { + ApiClient apiClient = new ApiClient(); + apiClient.setBasePath( + String.format("http://%s:%s", mockTableService.getHostName(), mockTableService.getPort())); + return new SnapshotApi(apiClient); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/PartitionTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/PartitionTestSpark3_5.java new file mode 100644 index 00000000..6529d1dd --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/PartitionTestSpark3_5.java @@ -0,0 +1,43 @@ +package com.linkedin.openhouse.spark.catalogtest; + +import static org.junit.jupiter.api.Assertions.*; + +import com.linkedin.openhouse.tablestest.OpenHouseSparkITest; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Test; + +public class PartitionTestSpark3_5 extends OpenHouseSparkITest { + @Test + public void testCreateTablePartitionedWithNestedColumn2() throws Exception { + try (SparkSession spark = getSparkSession()) { + List transformList = + Arrays.asList("days(time)", "header.time", "truncate(10, header.time)"); + List expectedResult = + Arrays.asList("days(time)", "bigint", "truncate(10, header.time)"); + for (int i = 0; i < transformList.size(); i++) { + String transform = transformList.get(i); + String tableName = + transform + .replaceAll("\\.", "_") + .replaceAll("\\(", "_") + .replaceAll("\\)", "") + .replaceAll(", ", "_"); + spark.sql( + String.format( + "CREATE TABLE openhouse.d1.%s (time timestamp, header struct) partitioned by (%s)", + tableName, transform)); + // verify that partition spec is correct + List description = + spark.sql(String.format("DESCRIBE TABLE openhouse.d1.%s", tableName)) + .select("data_type").collectAsList().stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList()); + assertTrue(description.contains(expectedResult.get(i))); + spark.sql(String.format("DROP TABLE openhouse.d1.%s", tableName)); + } + } + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableSchemaTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableSchemaTestSpark3_5.java new file mode 100644 index 00000000..a1cf3f58 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableSchemaTestSpark3_5.java @@ -0,0 +1,56 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; 
+import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.linkedin.openhouse.spark.ResourceIoHelper; +import com.linkedin.openhouse.spark.SparkTestBase; +import lombok.SneakyThrows; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class AlterTableSchemaTestSpark3_5 { + + @SneakyThrows + @Test + public void testAlterTableUpdateSchema() { + String mockTableLocation = + mockTableLocationDefaultSchema(TableIdentifier.of("dbAlterS", "tb1")); + Object existingTable = + mockGetTableResponseBody( + "dbAlterS", + "tb1", + "c1", + "dbAlterS.tb1.c1", + "UUID", + mockTableLocation, + "v1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue( // doCommit() + mockResponse( + 200, + mockGetTableResponseBody( + "dbAlterS", + "tb1", + "c1", + "dbAlterS.tb1.c1", + "UUID", + mockTableLocation, + "v1", + ResourceIoHelper.getSchemaJsonFromResource("evolved_dummy_healthy_schema.json"), + null, + null))); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> spark.sql("ALTER TABLE openhouse.dbAlterS.tb1 ADD columns (favorite_number int)")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableTestSpark3_5.java new file mode 100644 index 00000000..ad127d05 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/AlterTableTestSpark3_5.java @@ -0,0 +1,81 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.linkedin.openhouse.gen.tables.client.model.GetTableResponseBody; +import com.linkedin.openhouse.spark.SparkTestBase; +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class AlterTableTestSpark3_5 { + + @Test + public void testSetTableProps() { + final String key = "key"; + final String value = "value"; + + GetTableResponseBody existingTable = + mockGetTableResponseBody( + "dbAlter", + "tb1", + "c1", + "dbAlter.tb1.c1", + "UUID", + mockTableLocationDefaultSchema(TableIdentifier.of("dbAlter", "tb1")), + "v1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + + Map tblProps = new HashMap<>(); + tblProps.put(key, value); + mockTableService.enqueue( + mockResponse(200, decorateResponse(existingTable, tblProps))); // doCommit() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + String.format( + "ALTER TABLE 
openhouse.dbAlter.tb1 SET TBLPROPERTIES('%s'='%s')", key, value))); + } + + @Test + public void testUnsetTableProps() { + final String key = "key"; + final String value = "value"; + Map tblProps = new HashMap<>(); + tblProps.put(key, value); + + GetTableResponseBody existingTable = + mockGetTableResponseBody( + "dbUnset", + "tb1", + "c1", + "dbUnset.tb1.c1", + "UUID", + mockTableLocationDefaultSchema(TableIdentifier.of("dbUnset", "tb1")), + "v1", + baseSchema, + null, + null); + GetTableResponseBody decoratedTable = decorateResponse(existingTable, tblProps); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(200, decoratedTable)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + String.format("ALTER TABLE openhouse.dbUnset.tb1 UNSET TBLPROPERTIES('%s')", key))); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTablePartitionedTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTablePartitionedTestSpark3_5.java new file mode 100644 index 00000000..0514ad2b --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTablePartitionedTestSpark3_5.java @@ -0,0 +1,203 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.google.common.collect.ImmutableList; +import com.linkedin.openhouse.gen.tables.client.model.GetTableResponseBody; +import com.linkedin.openhouse.spark.SparkTestBase; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class CreateTablePartitionedTestSpark3_5 { + + @Test + public void testSimpleCreateTimePartitionAndClusteredTable() { + String tbName = "tbpartitionedclustered"; + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody getTableResponseBody = + mockGetTableResponseBody( + "dbCreate", + tbName, + "c1", + "dbCreate.tbpartitionedclustered.c1", + "UUID", + mockTableLocation( + TableIdentifier.of("dbDesc", tbName), + convertSchemaToDDLComponent(baseSchema), + "PARTITIONED BY (days(timestampCol), name, id, count)"), + "v1", + baseSchema, + null, + null); + + mockTableService.enqueue(mockResponse(201, getTableResponseBody)); // doCommit() + mockTableService.enqueue(mockResponse(200, getTableResponseBody)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.$TB_NAME ($SCHEMA) USING ICEBERG PARTITIONED BY (days(timestampCol), name, id, count)" + .replace("$TB_NAME", tbName) + .replace("$SCHEMA", convertSchemaToDDLComponent(baseSchema)))); + } + + @Test + public void testCreateTimePartitionAndTransformClusteredTable() { + String tbName = "tbpartitionedtransformclustered"; + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, 
mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody getTableResponseBody = + mockGetTableResponseBody( + "dbCreate", + tbName, + "c1", + "dbCreate.tbpartitionedclustered.c1", + "UUID", + mockTableLocation( + TableIdentifier.of("dbDesc", tbName), + convertSchemaToDDLComponent(baseSchema), + "PARTITIONED BY (days(timestampCol), truncate(100, name))"), + "v1", + baseSchema, + null, + null); + + mockTableService.enqueue(mockResponse(201, getTableResponseBody)); // doCommit() + mockTableService.enqueue(mockResponse(200, getTableResponseBody)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.$TB_NAME ($SCHEMA) USING ICEBERG PARTITIONED BY (days(timestampCol), truncate(100, name))" + .replace("$TB_NAME", tbName) + .replace("$SCHEMA", convertSchemaToDDLComponent(baseSchema)))); + } + + @Test + public void testCreateTimePartitionedTableSuccessful() { + for (String transform : ImmutableList.of("days", "months", "hours", "years")) { + String tbName = "tbpartitioned" + transform; + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody getTableResponseBody = + mockGetTableResponseBody( + "dbCreate", + tbName, + "c1", + "dbCreate.tbpartitioned.c1", + "UUID", + mockTableLocation( + TableIdentifier.of("dbDesc", tbName), + convertSchemaToDDLComponent(baseSchema), + String.format("PARTITIONED BY (%s(timestampCol))", transform)), + "v1", + baseSchema, + null, + null); + + mockTableService.enqueue(mockResponse(201, getTableResponseBody)); // doCommit() + mockTableService.enqueue(mockResponse(200, getTableResponseBody)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.$TB_NAME ($SCHEMA) PARTITIONED BY ($TRANSFORM(timestampCol))" + .replace("$TB_NAME", tbName) + .replace("$SCHEMA", convertSchemaToDDLComponent(baseSchema)) + .replace("$TRANSFORM", transform))); + } + } + + @Test + public void testCreateStringClusteringTableSuccessful() { + for (String transform : + ImmutableList.of( + "identity(name)", "name", "identity(count)", "count", "truncate(10, timeLong)")) { + String tbName = + "tbclustered_" + transform.replace('(', '_').replace(')', '_').replace(", ", "_"); + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody getTableResponseBody = + mockGetTableResponseBody( + "dbCreate", + tbName, + "c1", + "dbCreate.tbpartitioned.c1", + "UUID", + mockTableLocation( + TableIdentifier.of("dbDesc", tbName), + convertSchemaToDDLComponent(baseSchema), + String.format("PARTITIONED BY (%s)", transform)), + "v1", + baseSchema, + null, + null); + + mockTableService.enqueue(mockResponse(201, getTableResponseBody)); // doCommit() + mockTableService.enqueue(mockResponse(200, getTableResponseBody)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.$TB_NAME ($SCHEMA) PARTITIONED BY ($TRANSFORM)" + .replace("$TB_NAME", tbName) + .replace("$SCHEMA", 
convertSchemaToDDLComponent(baseSchema)) + .replace("$TRANSFORM", transform))); + } + } + + @Test + public void testCreatePartitionedTableUnsupported() { + for (String transform : ImmutableList.of("bucket(2, name)", "bucket(2, count)")) { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.tb_bad_partitioned ($SCHEMA) PARTITIONED BY ($TRANSFORM)" + .replace("$SCHEMA", convertSchemaToDDLComponent(baseSchema)) + .replace("$TRANSFORM", transform))); + Assertions.assertTrue( + exception + .getMessage() + .contains( + "please provide one of the following transforms (hour,day,month,year), for example: PARTITIONED BY category")); + } + } + + @Test + public void testCreatePartitionedTableMultipleTimePartitioning() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.tb_bad_partitioned (timestampCol1 timestamp, timestampCol2 timestamp) PARTITIONED BY (days(timestampCol1), months(timestampCol2))")); + Assertions.assertTrue( + exception + .getMessage() + .contains( + "OpenHouse only supports 1 timestamp-based column partitioning, 2 were provided: timestampCol1_day, timestampCol2_month")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableTestSpark3_5.java new file mode 100644 index 00000000..8b6fd33a --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableTestSpark3_5.java @@ -0,0 +1,85 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.baseSchema; +import static com.linkedin.openhouse.spark.SparkTestBase.mockTableService; +import static com.linkedin.openhouse.spark.SparkTestBase.spark; + +import com.linkedin.openhouse.gen.tables.client.model.GetTableResponseBody; +import com.linkedin.openhouse.relocated.org.springframework.web.reactive.function.client.WebClientResponseException; +import com.linkedin.openhouse.spark.SparkTestBase; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class CreateTableTestSpark3_5 { + @Test + public void testCreateTable() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, 
mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody getTableResponseBody = + mockGetTableResponseBody( + "dbCreate", + "tb1", + "c1", + "dbCreate.tb1.c1", + "UUID", + mockTableLocationDefaultSchema(TableIdentifier.of("dbCreate", "tb1")), + "v1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(201, getTableResponseBody)); // doCommit() + mockTableService.enqueue(mockResponse(200, getTableResponseBody)); // doRefresh() + + String ddlWithSchema = + "CREATE TABLE openhouse.dbCreate.tb1 (" + convertSchemaToDDLComponent(baseSchema) + ")"; + Assertions.assertDoesNotThrow(() -> spark.sql(ddlWithSchema)); + // TODO: When we are out of mock, we should verify the created schema as well. + } + + @Test + public void testCreateTableAlreadyExists() { + mockTableService.enqueue( + mockResponse( + 201, + mockGetTableResponseBody( + "dbCreate", + "tbExists", + "c1", + "dbCreate.tbExists.c1", + "UUID", + mockTableLocationDefaultSchema(TableIdentifier.of("dbCreate", "tbExists")), + "v1", + baseSchema, + null, + null))); // doRefresh() + + Assertions.assertThrows( + TableAlreadyExistsException.class, + () -> spark.sql("CREATE TABLE openhouse.dbCreate.tbExists (col1 string, col2 string)")); + } + + @Test + public void testAuthZFailure() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue( + mockResponse( + 403, + "{\"status\":\"FORBIDDEN\",\"error\":\"forbidden\",\"message\":\"Operation on database dbCreate failed as user sraikar is unauthorized\"}")); + WebClientResponseException exception = + Assertions.assertThrows( + WebClientResponseException.class, + () -> spark.sql("CREATE TABLE openhouse.dbCreate.tbError (col1 string, col2 string)")); + Assertions.assertTrue( + exception + .getMessage() + .contains("Operation on database dbCreate failed as user sraikar is unauthorized")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableWithPropsTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableWithPropsTestSpark3_5.java new file mode 100644 index 00000000..4bd25ef8 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/CreateTableWithPropsTestSpark3_5.java @@ -0,0 +1,49 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.baseSchema; +import static com.linkedin.openhouse.spark.SparkTestBase.mockTableService; +import static com.linkedin.openhouse.spark.SparkTestBase.spark; + +import com.linkedin.openhouse.gen.tables.client.model.GetTableResponseBody; +import com.linkedin.openhouse.spark.SparkTestBase; +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class CreateTableWithPropsTestSpark3_5 { + @Test + public void testCreateTableWithPropsSuccessful() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + 
mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + GetTableResponseBody mockResponse = + mockGetTableResponseBody( + "dbCreate", + "tbprop", + "c1", + "dbCreate.tbprop", + "UUID", + mockTableLocationDefaultSchema(TableIdentifier.of("dbCreate", "tbprop")), + "v1", + baseSchema, + null, + null); + + Map tblProps = new HashMap<>(); + tblProps.put("k", "v"); + GetTableResponseBody responseWithProp = decorateResponse(mockResponse, tblProps); + mockTableService.enqueue(mockResponse(201, responseWithProp)); // doCommit() + mockTableService.enqueue(mockResponse(200, responseWithProp)); // doRefresh() + + Assertions.assertDoesNotThrow( + () -> + spark.sql( + "CREATE TABLE openhouse.dbCreate.tbprop (col1 string, col2 string) USING iceberg")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DescribeTableTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DescribeTableTestSpark3_5.java new file mode 100644 index 00000000..e9798446 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DescribeTableTestSpark3_5.java @@ -0,0 +1,108 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.google.common.collect.ImmutableList; +import com.linkedin.openhouse.spark.SparkTestBase; +import java.util.List; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class DescribeTableTestSpark3_5 { + + @Test + public void testDescribeTable() { + String schemaForDdl = convertSchemaToDDLComponent(baseSchema); + mockTableService.enqueue( + mockResponse( + 200, + mockGetTableResponseBody( + "dbDesc", + "tb1", + "c1", + "dbDesc.tb1.c1", + "UUID", + mockTableLocation(TableIdentifier.of("dbDesc", "tb1"), schemaForDdl, ""), + "v1", + baseSchema, + null, + null))); + + Dataset rows = spark.sql("DESCRIBE TABLE openhouse.dbDesc.tb1"); + spark.sql("DESCRIBE TABLE openhouse.dbDesc.tb1").show(false); + validateSchema(rows, baseSchema); + } + + @Test + public void testDescribeTableDoesNotExist() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); + + AnalysisException ex = + Assertions.assertThrows( + AnalysisException.class, + () -> spark.sql("DESCRIBE TABLE openhouse.dbDesc.tbNotExist").show()); + + Assertions.assertTrue( + ex.getMessage() + .contains( + "[TABLE_OR_VIEW_NOT_FOUND] The table or view `openhouse`.`dbDesc`.`tbNotExist` cannot be found. 
Verify the spelling and correctness of the schema and catalog.")); + } + + @Test + public void testDescribeTableOnValidPartitionedTable() { + for (String transform : ImmutableList.of("days", "months", "hours", "years")) { + String tbName = "tbDescPartitioned" + transform; + String transformedPartitioned = "$TRANSFORM(timestampCol)".replace("$TRANSFORM", transform); + mockTableService.enqueue( + mockResponse( + 201, + mockGetTableResponseBody( + "dbDesc", + tbName, + "c1", + "dbDesc.tbpartitioned.c1", + "UUID", + mockTableLocation( + TableIdentifier.of("dbDesc", tbName), + convertSchemaToDDLComponent(baseSchema), + String.format("PARTITIONED BY (%s(timestampCol))", transform)), + "v1", + baseSchema, + null, + null))); // doRefresh() + + Dataset rows = + spark.sql("DESCRIBE TABLE openhouse.dbDesc.$TB_NAME".replace("$TB_NAME", tbName)); + validateSchema(rows, baseSchema); + validatePartitioning(rows, transformedPartitioned); + } + } + + /** Validating the collect rows contains expected partitioning. */ + private static void validatePartitioning(Dataset rows, String transformedPartitioned) { + List rowsCollected = rows.collectAsList(); + Assertions.assertTrue( + rowsCollected.contains( + new GenericRowWithSchema( + new String[] {"Part 0", transformedPartitioned, ""}, rows.schema()))); + } + + /** Validating the collect rows contains expected schema. */ + public static void validateSchema(Dataset rows, String expectedSchema) { + List rowsCollected = rows.collectAsList(); + + for (String[] fieldInArray : convertSchemaToFieldArray(expectedSchema)) { + Assertions.assertTrue( + rowsCollected.contains(new GenericRowWithSchema(fieldInArray, rows.schema()))); + } + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DropTableTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DropTableTestSpark3_5.java new file mode 100644 index 00000000..7bf47729 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/DropTableTestSpark3_5.java @@ -0,0 +1,127 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.linkedin.openhouse.javaclient.exception.WebClientResponseWithMessageException; +import com.linkedin.openhouse.spark.SparkTestBase; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.spark.sql.AnalysisException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class DropTableTestSpark3_5 { + @Test + public void testDropTable() { + Object existingTable = + mockGetTableResponseBody( + "dbDrop", + "t1", + "c1", + "dbDrop.t1", + "u1", + mockTableLocation( + TableIdentifier.of("dbDrop", "t1"), convertSchemaToDDLComponent(baseSchema), ""), + "V1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh() + mockTableService.enqueue(mockResponse(204, null)); // doRefresh() + mockTableService.enqueue( + mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() for describe + mockTableService.enqueue( + mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() for describe + + String ddl = "DROP TABLE openhouse.dbDrop.t1"; + Assertions.assertDoesNotThrow(() -> spark.sql(ddl)); + + 
AnalysisException ex = + Assertions.assertThrows( + AnalysisException.class, () -> spark.sql("DESCRIBE TABLE openhouse.dbDrop.t1").show()); + + Assertions.assertTrue( + ex.getMessage() + .contains( + "[TABLE_OR_VIEW_NOT_FOUND] The table or view `openhouse`.`dbDrop`.`t1` cannot be found. Verify the spelling and correctness of the schema and catalog.")); + } + + @Test + public void testDropTableNotExist() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + String ddl = "DROP TABLE openhouse.dbDrop.t1"; + AnalysisException ex = + Assertions.assertThrows(AnalysisException.class, () -> spark.sql(ddl).show()); + + Assertions.assertTrue( + ex.getMessage() + .contains( + "[TABLE_OR_VIEW_NOT_FOUND] The table or view `openhouse`.`dbDrop`.`t1` cannot be found. Verify the spelling and correctness of the schema and catalog.")); + } + + @Test + public void testDropTableCheckExist() { + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh() + + String ddl = "DROP TABLE IF EXISTS openhouse.dbDrop.t1"; + Assertions.assertDoesNotThrow(() -> spark.sql(ddl)); + } + + @Test + public void test503Error() { + Object existingTable = + mockGetTableResponseBody( + "dbDrop", + "t2", + "c1", + "dbDrop.t2", + "u1", + mockTableLocation( + TableIdentifier.of("dbDrop", "t2"), convertSchemaToDDLComponent(baseSchema), ""), + "V1", + baseSchema, + null, + null); + mockTableService.enqueue( + mockResponse(200, existingTable)); // doRefresh() initially returns the table + mockTableService.enqueue( + mockResponse( + 503, + "{\"status\":\"SERVICE_UNAVAILABLE\",\"error\":\"Service Unavailable\",\"message\":\"Drop table failed as service is unavailable\"}")); + WebClientResponseWithMessageException exception = + Assertions.assertThrows( + WebClientResponseWithMessageException.class, + () -> spark.sql("DROP TABLE openhouse.dbDrop.t2")); + Assertions.assertTrue( + exception.getMessage().contains("\"Drop table failed as service is unavailable")); + } + + @Test + public void testConcurrentDropError() { + Object existingTable = + mockGetTableResponseBody( + "dbDrop", + "t3", + "c1", + "dbDrop.t3", + "u1", + mockTableLocation( + TableIdentifier.of("dbDrop", "t3"), convertSchemaToDDLComponent(baseSchema), ""), + "V1", + baseSchema, + null, + null); + mockTableService.enqueue( + mockResponse(200, existingTable)); // doRefresh() initially returns the table + mockTableService.enqueue( + mockResponse(404, null)); // returns 404 as concurrent deletion has happened + + String ddl = "DROP TABLE IF EXISTS openhouse.dbDrop.t3"; + Assertions.assertDoesNotThrow(() -> spark.sql(ddl)); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/ShowTablesTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/ShowTablesTestSpark3_5.java new file mode 100644 index 00000000..78d85f64 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/ddl/ShowTablesTestSpark3_5.java @@ -0,0 +1,57 @@ +package com.linkedin.openhouse.spark.e2e.ddl; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.google.common.collect.ImmutableList; +import 
com.linkedin.openhouse.spark.SparkTestBase; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.ValidationException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class ShowTablesTestSpark3_5 { + + @Test + public void testShowTables() { + mockTableService.enqueue( + mockResponse( + 200, + mockGetAllTableResponseBody( + mockGetTableResponseBody( + "dbShow", "tb1", null, null, null, null, null, null, null, null), + mockGetTableResponseBody( + "dbShow", "tb2", null, null, null, null, null, null, null, null)))); + + List actualRows = + spark.sql("SHOW TABLES IN openhouse.dbShow").collectAsList().stream() + .map(row -> row.mkString(".")) + .collect(Collectors.toList()); + + Assertions.assertTrue( + actualRows.containsAll(ImmutableList.of("dbShow.tb1.false", "dbShow.tb2.false"))); + } + + @Test + public void testShowTablesEmpty() { + mockTableService.enqueue(mockResponse(200, mockGetAllTableResponseBody())); + + Assertions.assertTrue(spark.sql("SHOW TABLES IN openhouse.dbShow").collectAsList().isEmpty()); + } + + @Test + public void testShowTablesValidationFailure() { + ValidationException validationException = + Assertions.assertThrows( + ValidationException.class, () -> spark.sql("SHOW TABLES in openhouse")); + Assertions.assertTrue(validationException.getMessage().contains("DatabaseId was not provided")); + + spark.sql("Use openhouse"); + validationException = + Assertions.assertThrows(ValidationException.class, () -> spark.sql("SHOW TABLES")); + Assertions.assertTrue(validationException.getMessage().contains("DatabaseId was not provided")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/InsertIntoTableTestSpark3_5.java b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/InsertIntoTableTestSpark3_5.java new file mode 100644 index 00000000..3f7c9c81 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/InsertIntoTableTestSpark3_5.java @@ -0,0 +1,125 @@ +package com.linkedin.openhouse.spark.e2e.dml; + +import static com.linkedin.openhouse.spark.MockHelpersSpark3_5.*; +import static com.linkedin.openhouse.spark.SparkTestBase.*; + +import com.linkedin.openhouse.javaclient.exception.WebClientResponseWithMessageException; +import com.linkedin.openhouse.spark.SparkTestBase; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SparkTestBase.class) +public class InsertIntoTableTestSpark3_5 { + + @Test + public void testInsertIntoFreshTable() { + Object existingTable = + mockGetTableResponseBody( + "dbInsert", + "tbl", + "testCluster", + "dbInsert.tb1", + "ABCD", + mockTableLocationDefaultSchema(TableIdentifier.of("dbInsert", "tbl")), + "V1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + Object tableAfterInsert = + mockGetTableResponseBody( + "dbInsert", + "tbl", + "testCluster", + "dbInsert.tb1", + "ABCD", + mockTableLocationAfterOperation( + TableIdentifier.of("dbInsert", "tbl"), + "INSERT INTO %t VALUES ('1', 'a'), ('2', 
'b')"), + "V2", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doCommit + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doRefresh + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doRefresh + + Assertions.assertDoesNotThrow( + () -> spark.sql("INSERT INTO openhouse.dbInsert.tbl VALUES ('1', 'a'), ('2', 'b')")); + } + + @Test + public void testInsertIntoTableWithData() { + Object existingTable = + mockGetTableResponseBody( + "dbInsert", + "tbl2", + "testCluster", + "dbInsert.tbl2", + "ABCD", + mockTableLocationDefaultSchema( + TableIdentifier.of("dbInsert", "tbl2"), true), // Set true + "V1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + Object tableAfterInsert = + mockGetTableResponseBody( + "dbInsert", + "tbl2", + "testCluster", + "dbInsert.tbl2", + "ABCD", + mockTableLocationAfterOperation( + TableIdentifier.of("dbInsert", "tbl2"), + "INSERT INTO %t VALUES ('3', 'c'), ('4', 'd')"), + "V2", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doCommit + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doRefresh + mockTableService.enqueue(mockResponse(200, tableAfterInsert)); // doRefresh + + Assertions.assertDoesNotThrow( + () -> spark.sql("INSERT INTO openhouse.dbInsert.tbl2 VALUES ('3', 'c'), ('4', 'd')")); + } + + @Test + public void testAuthZFailure() { + Object existingTable = + mockGetTableResponseBody( + "dbInsert", + "tblError", + "testCluster", + "dbInsert.tblError", + "ABCD", + mockTableLocationDefaultSchema( + TableIdentifier.of("dbInsert", "tblError"), true), // Set true + "V1", + baseSchema, + null, + null); + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + mockTableService.enqueue(mockResponse(200, existingTable)); // doRefresh + mockTableService.enqueue( + mockResponse( + 403, + "{\"status\":\"FORBIDDEN\",\"error\":\"forbidden\",\"message\":\"Operation on table dbInsert.tblError failed as user sraikar is unauthorized\"}")); + WebClientResponseWithMessageException exception = + Assertions.assertThrows( + WebClientResponseWithMessageException.class, + () -> + spark.sql("INSERT INTO openhouse.dbInsert.tblError VALUES ('3', 'c'), ('4', 'd')")); + Assertions.assertTrue( + ExceptionUtils.getStackTrace(exception) + .contains( + "Operation on table dbInsert.tblError failed as user sraikar is unauthorized")); + } +} diff --git a/integrations/spark-3.5/openhouse-spark-itest/src/test/resources/dummy.token b/integrations/spark-3.5/openhouse-spark-itest/src/test/resources/dummy.token new file mode 100644 index 00000000..21ad79d4 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-itest/src/test/resources/dummy.token @@ -0,0 +1 @@ +eyJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2NTk2ODI4MDAsImp0aSI6IkRVTU1ZX0FOT05ZTU9VU19VU0VSZGJjNDk3MTMtMzM5ZC00Y2ZkLTkwMDgtZDY4NzlhZDQwZjE2Iiwic3ViIjoie1wiQ09ERVwiOlwiRFVNTVlfQ09ERVwiLFwiVVNFUi1JRFwiOlwiRFVNTVlfQU5PTllNT1VTX1VTRVJcIn0ifQ.W2WVBrMacFrXS8Xa29k_V_yD0yca2nEet5mSYq27Ayo \ No newline at end of file diff --git a/integrations/spark-3.5/openhouse-spark-runtime/build.gradle b/integrations/spark-3.5/openhouse-spark-runtime/build.gradle new file mode 100644 index 00000000..45c70937 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/build.gradle @@ -0,0 +1,74 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 
'openhouse.maven-publish' + id 'com.github.johnrengelman.shadow' version '7.1.2' + id 'scala' +} + +ext { + icebergVersion = '1.5.2' + sparkVersion = '3.5.2' +} + +configurations { + fatJarPackagedDependencies { + exclude(group: 'org.antlr') // included in spark + exclude(group: 'org.mapstruct') + } + shadow.extendsFrom implementation +} + +// Set source for antlr generated directory +sourceSets { + main { + java { + srcDirs += "${project(':integrations:spark:openhouse-spark-runtime_2.12').buildDir}/generated-src/antlr/main" + } + } +} + +dependencies { + compileOnly(project(path: ':integrations:java-iceberg-1.5:openhouse-java-iceberg-1.5-runtime', configuration: 'shadow')) + compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.apache.arrow' + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'org.roaringbitmap' + exclude group: 'com.zaxxer', module: 'HikariCP' + exclude group: 'org.apache.hadoop', module: 'hadoop-client' + } + + testImplementation("org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:" + icebergVersion) { + exclude group: "io.netty" + } + testImplementation(project(':tables-test-fixtures-iceberg-1.5_2.12')) { + exclude group: "io.netty" + } + testImplementation('org.apache.spark:spark-sql_2.12:' + sparkVersion){ + // These classes are available from `client-codegen-convention.gradle` + } + testImplementation(project(path: ':integrations:java-iceberg-1.5:openhouse-java-iceberg-1.5-runtime', configuration: 'shadow')) + + fatJarPackagedDependencies(project(path: ':integrations:java-iceberg-1.5:openhouse-java-iceberg-1.5-runtime', configuration: 'shadow')) { + transitive = false + } + fatJarPackagedDependencies(project(path: ':integrations:spark:openhouse-spark-runtime_2.12', configuration: 'shadow')) { + transitive = false + } + implementation(project(path: ':integrations:spark:openhouse-spark-runtime_2.12', configuration: 'shadow')) + implementation("org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:" + icebergVersion) +} + + +shadowJar { + dependencies { + exclude("javax/**") + } + + configurations = [project.configurations.fatJarPackagedDependencies] + mergeServiceFiles() + archiveClassifier.set('uber') + zip64 true +} + +jar.enabled=true diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSparkSqlExtensionsParser.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSparkSqlExtensionsParser.scala new file mode 100644 index 00000000..7f85b9e5 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSparkSqlExtensionsParser.scala @@ -0,0 +1,141 @@ +package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions + +import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser.QuotedIdentifierContext +import org.antlr.v4.runtime.misc.Interval +import org.antlr.v4.runtime.tree.TerminalNodeImpl +import org.antlr.v4.runtime.{BaseErrorListener, CharStream, CharStreams, CodePointCharStream, CommonToken, CommonTokenStream, IntStream, ParserRuleContext, RecognitionException, Recognizer, Token} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} 
+import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsPostProcessor +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +import java.util.Locale + +class OpenhouseSparkSqlExtensionsParser (delegate: ParserInterface) extends ParserInterface { + private lazy val astBuilder = new OpenhouseSqlExtensionsAstBuilder(delegate) + + override def parsePlan(sqlText: String): LogicalPlan = { + if (isOpenhouseCommand(sqlText)) { + parse(sqlText) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan] + } else { + delegate.parsePlan(sqlText) + } + } + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText); + } + + def parseRawDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + private def isOpenhouseCommand(sqlText: String): Boolean = { + val normalized = sqlText.toLowerCase(Locale.ROOT) + // Strip simple SQL comments that terminate a line, e.g. comments starting with `--` . + .replaceAll("--.*?\\n", " ") + // Strip newlines. + .replaceAll("\\s+", " ") + // Strip comments of the form /* ... */. This must come after stripping newlines so that + // comments that span multiple lines are caught. + .replaceAll("/\\*.*?\\*/", " ") + // Strip doubles spaces post comments extraction, ex: ALTER /* ... 
\n ...*/ TABLE + .replaceAll("\\s+", " ") + .trim() + (normalized.startsWith("alter table") && + (normalized.contains("set policy")) || + (normalized.contains("modify column") && + normalized.contains("set tag"))) || + normalized.startsWith("grant") || + normalized.startsWith("revoke") || + normalized.startsWith("show grants") + + } + + protected def parse[T](command: String)(toResult: OpenhouseSqlExtensionsParser => T): T = { + val lexer = new OpenhouseSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command))) + lexer.removeErrorListeners() + lexer.addErrorListener(OpenhouseParseErrorListener) + + val tokenStream = new CommonTokenStream(lexer) + val parser = new OpenhouseSqlExtensionsParser(tokenStream) + parser.addParseListener(OpenhouseSqlExtensionsPostProcessor) + parser.removeErrorListeners() + parser.addErrorListener(OpenhouseParseErrorListener) + + toResult(parser) + } + + override def parseQuery(sqlText: String): LogicalPlan = { + parsePlan(sqlText) + } +} + +/* Copied from Apache Spark's to avoid dependency on Spark Internals */ +class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { + override def consume(): Unit = wrapped.consume + override def getSourceName(): String = wrapped.getSourceName + override def index(): Int = wrapped.index + override def mark(): Int = wrapped.mark + override def release(marker: Int): Unit = wrapped.release(marker) + override def seek(where: Int): Unit = wrapped.seek(where) + override def size(): Int = wrapped.size + + override def getText(interval: Interval): String = wrapped.getText(interval) + + // scalastyle:off + override def LA(i: Int): Int = { + val la = wrapped.LA(i) + if (la == 0 || la == IntStream.EOF) la + else Character.toUpperCase(la) + } + // scalastyle:on +} + +case object OpenhouseParseErrorListener extends BaseErrorListener { + override def syntaxError( + recognizer: Recognizer[_, _], + offendingSymbol: scala.Any, + line: Int, + charPositionInLine: Int, + msg: String, + e: RecognitionException): Unit = { + throw new OpenhouseParseException(msg, line, charPositionInLine) + } +} + +/* Extends AnalysisException to access protected constructor */ +class OpenhouseParseException( + message: String, + line: Int, + startPosition: Int) extends AnalysisException(message, Some(line), Some(startPosition)) {} + +case object OpenhouseSqlExtensionsPostProcessor extends OpenhouseSqlExtensionsBaseListener { + override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = { + val token = ctx.BACKQUOTED_IDENTIFIER.getSymbol().asInstanceOf[CommonToken] + token.setText(token.getText.replace("`", "")) + } +} \ No newline at end of file diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/GrantRevokeStatement.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/GrantRevokeStatement.scala new file mode 100644 index 00000000..4eb8f125 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/GrantRevokeStatement.scala @@ -0,0 +1,10 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class GrantRevokeStatement(isGrant: Boolean, resourceType: GrantableResourceType, resourceName: Seq[String], 
privilege: String, principal: String) extends LeafCommand { + override def simpleString(maxFields: Int): String = { + s"GrantRevokeStatement: isGrant ${isGrant}, ${resourceType} ${resourceName} ${privilege} ${principal}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetColumnPolicyTag.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetColumnPolicyTag.scala new file mode 100644 index 00000000..598c25e6 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetColumnPolicyTag.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class SetColumnPolicyTag(tableName: Seq[String], colName: String, policyTags: Seq[String]) extends LeafCommand { + override def simpleString(maxFields: Int): String = { + s"SetColumnPolicyTag: ${tableName} ${colName} ${policyTags}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala new file mode 100644 index 00000000..674762a7 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class SetReplicationPolicy(tableName: Seq[String], replicationPolicies: Seq[(String, Option[String])]) extends LeafCommand { + override def simpleString(maxFields: Int): String = { + s"SetReplicationPolicy: ${tableName} ${replicationPolicies}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetRetentionPolicy.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetRetentionPolicy.scala new file mode 100644 index 00000000..3d627523 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetRetentionPolicy.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class SetRetentionPolicy(tableName: Seq[String], granularity: String, count: Int, colName: Option[String], colPattern: Option[String]) extends LeafCommand { + override def simpleString(maxFields: Int): String = { + s"SetRetentionPolicy: ${tableName} ${count} ${granularity} ${colName.getOrElse("")} ${colPattern.getOrElse("")}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSharingPolicy.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSharingPolicy.scala new file mode 100644 index 00000000..c7554217 --- /dev/null +++ 
b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSharingPolicy.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class SetSharingPolicy(tableName: Seq[String], sharing: String) extends LeafCommand { + override def simpleString(maxFields: Int): String = { + s"SetSharingPolicy: ${tableName} ${sharing}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/ShowGrantsStatement.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/ShowGrantsStatement.scala new file mode 100644 index 00000000..3e515fef --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/ShowGrantsStatement.scala @@ -0,0 +1,18 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.types.StringType + + +case class ShowGrantsStatement(resourceType: GrantableResourceType, resourceName: Seq[String]) extends LeafCommand { + + override lazy val output: Seq[Attribute] = Seq( + AttributeReference("privilege", StringType, nullable = false)(), + AttributeReference("principal", StringType, nullable = false)() + ) + override def simpleString(maxFields: Int): String = { + s"ShowGrantsStatement: ${resourceType} ${resourceName}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/GrantRevokeStatementExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/GrantRevokeStatementExec.scala new file mode 100644 index 00000000..7addc1a3 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/GrantRevokeStatementExec.scala @@ -0,0 +1,50 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + + +import com.linkedin.openhouse.javaclient.api.SupportsGrantRevoke +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType +import com.linkedin.openhouse.spark.sql.execution.datasources.v2.mapper.IcebergCatalogMapper +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec + +case class GrantRevokeStatementExec( + isGrant: Boolean, + resourceType: GrantableResourceType, + catalog: TableCatalog, + ident: Identifier, + privilege: String, + principal: String) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + /** Extract {@link OpenHouseCatalog} from {@link TableCatalog} */ + 
IcebergCatalogMapper.toIcebergCatalog(catalog) match { + /** Call {@link SupportsGrantRevoke#updateTableAclPolicies} or {@link SupportsGrantRevoke#updateDatabaseAclPolicies} */ + case grantRevokableCatalog: SupportsGrantRevoke => + resourceType match { + case GrantableResourceTypes.TABLE => + grantRevokableCatalog.updateTableAclPolicies(Spark3Util.identifierToTableIdentifier(ident), isGrant, privilege, principal) + case GrantableResourceTypes.DATABASE => + grantRevokableCatalog.updateDatabaseAclPolicies(toNamespace(ident), isGrant, privilege, principal) + } + case _ => + throw new UnsupportedOperationException(s"Catalog '${catalog.name()}' does not support Grant Revoke Statements") + } + Nil + } + + override def simpleString(maxFields: Int): String = { + s"GrantRevokeStatementExec: ${catalog.name()} $isGrant $ident $privilege $principal" + } + + private def toNamespace(ident: Identifier): Namespace = { + val dbArray: Array[String] = ident.namespace() :+ ident.name() + Namespace.of(dbArray:_*) + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetColumnPolicyTagExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetColumnPolicyTagExec.scala new file mode 100644 index 00000000..4fc7f559 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetColumnPolicyTagExec.scala @@ -0,0 +1,37 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec + +case class SetColumnPolicyTagExec( + catalog: TableCatalog, + ident: Identifier, + colName: String, + policyTags: Seq[String]) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => + val key = "updated.openhouse.policy" + val value = s"""{"columnTags":{"${colName}": {"tags": [${policyTags.mkString(", ")}]}}}""" + + iceberg.table().updateProperties() + .set(key, value) + .commit() + + case table => + throw new UnsupportedOperationException(s"Cannot set column policy tags for non-Openhouse table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"SetColumnPolicyTagExec: ${catalog} ${ident} ${colName} ${policyTags}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala new file mode 100644 index 00000000..3e1459d6 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala @@ -0,0 +1,26 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import 
org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec + +case class SetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replicationPolicies: Seq[(String, Option[String])]) extends LeafV2CommandExec{ + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => + val key = "updated.openhouse.policy" + val value = s"""{"replication":{"config":[${replicationPolicies.map(replication => s"""{"destination":"${replication._1}","interval":"${replication._2.getOrElse("")}"}""").mkString(",")}]}}""" + iceberg.table().updateProperties() + .set(key, value) + .commit() + + case table => + throw new UnsupportedOperationException(s"Cannot set replication policy for non-Openhouse table: $table") + } + Nil + } + + override def output: Seq[Attribute] = Nil +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetRetentionPolicyExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetRetentionPolicyExec.scala new file mode 100644 index 00000000..6073e0e4 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetRetentionPolicyExec.scala @@ -0,0 +1,52 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec + +case class SetRetentionPolicyExec( + catalog: TableCatalog, + ident: Identifier, + granularity: String, + count: Int, + colName: Option[String], + colPattern: Option[String] + ) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => + val key = "updated.openhouse.policy" + val value = { + (colName, colPattern) match { + case (None, None) => s"""{"retention":{"count":${count},"granularity":"${granularity}"}}""" + case (Some(nameVal), Some(patternVal)) => { + val columnPattern = s"""{"columnName":"${nameVal}","pattern": "${patternVal}"}""" + s"""{"retention":{"count":${count},"granularity":"${granularity}", "columnPattern":${columnPattern}}}""" + } + case (Some(nameVal), None) => { + val columnPattern = s"""{"columnName":"${nameVal}","pattern": ""}""" + s"""{"retention":{"count":${count},"granularity":"${granularity}", "columnPattern":${columnPattern}}}""" + } + } + } + + iceberg.table().updateProperties() + .set(key, value) + .commit() + + case table => + throw new UnsupportedOperationException(s"Cannot set retention policy for non-Openhouse table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"SetRetentionPolicyExec: ${catalog} ${ident} ${count} ${granularity} ${colName.getOrElse("")} ${colPattern.getOrElse("")}" + } +} diff --git 
a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSharingPolicyExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSharingPolicyExec.scala new file mode 100644 index 00000000..b017fad1 --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSharingPolicyExec.scala @@ -0,0 +1,36 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec + +case class SetSharingPolicyExec( + catalog: TableCatalog, + ident: Identifier, + sharing: String) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => + val key = "updated.openhouse.policy" + val value = s"""{"sharingEnabled": ${sharing}}""" + + iceberg.table().updateProperties() + .set(key, value) + .commit() + + case table => + throw new UnsupportedOperationException(s"Cannot set sharing policy for non-Openhouse table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"SetSharingPolicyExec: ${catalog} ${ident} ${sharing}" + } +} diff --git a/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/ShowGrantsStatementExec.scala b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/ShowGrantsStatementExec.scala new file mode 100644 index 00000000..ff65f41d --- /dev/null +++ b/integrations/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/ShowGrantsStatementExec.scala @@ -0,0 +1,52 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import com.linkedin.openhouse.javaclient.api.SupportsGrantRevoke +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes +import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType +import com.linkedin.openhouse.spark.sql.execution.datasources.v2.mapper.IcebergCatalogMapper +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + + +case class ShowGrantsStatementExec( + output: Seq[Attribute], + resourceType: GrantableResourceType, + catalog: TableCatalog, + ident: Identifier) extends LeafV2CommandExec with LeafExecNode { + + override protected def run(): Seq[InternalRow] = { + /** Extract {@link OpenHouseCatalog} from {@link TableCatalog} */ + IcebergCatalogMapper.toIcebergCatalog(catalog) match { + /** Call 
{@link SupportsGrantRevoke#updateTableAclPolicies} or {@link SupportsGrantRevoke#updateDatabaseAclPolicies} */ + case grantRevokableCatalog: SupportsGrantRevoke => + (resourceType match { + case GrantableResourceTypes.TABLE => + grantRevokableCatalog.getTableAclPolicies(Spark3Util.identifierToTableIdentifier(ident)) + case GrantableResourceTypes.DATABASE => + grantRevokableCatalog.getDatabaseAclPolicies(toNamespace(ident)) + }).asScala.map { aclPolicy => + val row: Array[Any] = Array(UTF8String.fromString(aclPolicy.getPrivilege), UTF8String.fromString(aclPolicy.getPrincipal)) + new GenericInternalRow(row) + } + case _ => + throw new UnsupportedOperationException(s"Catalog '${catalog.name()}' does not support Grant Revoke Statements") + } + } + + override def simpleString(maxFields: Int): String = { + s"ShowGrantsStatementExec: ${catalog.name()} $ident" + } + + private def toNamespace(ident: Identifier): Namespace = { + val dbArray: Array[String] = ident.namespace() :+ ident.name() + Namespace.of(dbArray:_*) + } +} diff --git a/libs/datalayout/build.gradle b/libs/datalayout/build.gradle index 541ce47a..f7f6f39c 100644 --- a/libs/datalayout/build.gradle +++ b/libs/datalayout/build.gradle @@ -1,7 +1,6 @@ plugins { id 'openhouse.java-conventions' id 'openhouse.hadoop-conventions' - id 'openhouse.iceberg-conventions-1.2' id 'openhouse.maven-publish' } @@ -10,10 +9,18 @@ ext { sparkVersion = '3.1.1' springVersion = '2.7.8' hadoopVersion = '2.10.0' + sparkVersionSuffix = "3.1" + openhouseSparkRuntimeModule = ":integrations:spark:spark-${sparkVersionSuffix}:openhouse-spark-runtime_2.12" + tablesTestFixturesModule = ":tables-test-fixtures:tables-test-fixtures_2.12" } dependencies { - compileOnly project(':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12') + compileOnly project(openhouseSparkRuntimeModule) + implementation('org.apache.iceberg:iceberg-bundled-guava:' + icebergVersion) + implementation('org.apache.iceberg:iceberg-data:' + icebergVersion) + implementation('org.apache.iceberg:iceberg-core:' + icebergVersion) + implementation('org.apache.iceberg:iceberg-common:' + icebergVersion) + implementation('org.testcontainers:testcontainers:1.19.8') implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' @@ -27,7 +34,7 @@ dependencies { implementation 'org.apache.hadoop:hadoop-common:' + hadoopVersion implementation 'org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:' + icebergVersion - testImplementation (project(path: ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12', configuration: 'shadow')) { + testImplementation (project(path: openhouseSparkRuntimeModule, configuration: 'shadow')) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' exclude group: 'org.apache.hadoop', module: 'hadoop-client' @@ -38,7 +45,7 @@ dependencies { testImplementation 'org.mockito:mockito-inline:4.11.0' testImplementation 'org.powermock:powermock-module-junit4:2.0.9' testImplementation 'org.powermock:powermock-api-mockito2:2.0.9' - testImplementation(project(':tables-test-fixtures:tables-test-fixtures_2.12')) + testImplementation(project(tablesTestFixturesModule)) testRuntimeOnly("org.eclipse.jetty:jetty-server:11.0.2") } @@ -50,4 +57,4 @@ test { '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', '--add-exports=java.base/sun.util.calendar=ALL-UNNAMED' } -} +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index c9a5baa1..e883b172 
100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ pluginManagement { rootProject.name = 'openhouse' include ':apps:spark' +include ':apps:spark-3.5' include ':client:common' include ':client:hts' @@ -68,4 +69,4 @@ project(':integrations:java:iceberg-1.5:openhouse-java-runtime').name = 'openhou project(':integrations:java:iceberg-1.5:openhouse-java-itest').name = 'openhouse-java-iceberg-1.5-itest' project(':integrations:spark:spark-3.5:openhouse-spark-runtime').name = 'openhouse-spark-3.5-runtime_2.12' project(':integrations:spark:spark-3.5:openhouse-spark-itest').name = 'openhouse-spark-3.5-itest' -project(':tables-test-fixtures:tables-test-fixtures-iceberg-1.5').name = 'tables-test-fixtures-iceberg-1.5_2.12' +project(':tables-test-fixtures:tables-test-fixtures-iceberg-1.5').name = 'tables-test-fixtures-iceberg-1.5_2.12' \ No newline at end of file diff --git a/tables-test-fixtures-iceberg-1.5/build.gradle b/tables-test-fixtures-iceberg-1.5/build.gradle new file mode 100644 index 00000000..9baad4d8 --- /dev/null +++ b/tables-test-fixtures-iceberg-1.5/build.gradle @@ -0,0 +1,245 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 'com.github.johnrengelman.shadow' version '7.1.2' + id 'openhouse.maven-publish' + id 'openhouse.iceberg-conventions-1.5.2' +} + +import com.github.jengelman.gradle.plugins.shadow.transformers.PropertiesFileTransformer + +ext { + sparkVersion = '3.5.2' +} + +configurations { + + all { + exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' + } + shadow.extendsFrom implementation +} + +project.evaluationDependsOn(':tables-test-fixtures_2.12') + +sourceSets { + main { + java { + srcDirs += project(':tables-test-fixtures_2.12').sourceSets.main.java.srcDirs + } + resources { + srcDirs += project(':tables-test-fixtures_2.12').sourceSets.main.resources.srcDirs + } + } + test { + java { + srcDirs += project(':tables-test-fixtures_2.12').sourceSets.test.java.srcDirs + } + } +} + +dependencies { + implementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version + implementation 'org.springframework.boot:spring-boot-starter-test:' + spring_web_version + implementation(project(':services:tables')) { + exclude group: 'org.apache.hadoop' + exclude group: 'org.testcontainers' + exclude group: 'org.ow2.asm' + exclude group: 'org.xerial' + exclude group: 'javax' + } + + + compileOnly 'org.springframework.boot:spring-boot-starter-tomcat:' + spring_web_version + compileOnly('org.apache.spark:spark-sql_2.12:' + sparkVersion){ + // These classes are available from `client-codegen-convention.gradle` + exclude group: "io.netty" + } + + implementation 'org.springframework.boot:spring-boot-starter-webflux:' + spring_web_version + implementation ('org.springframework.boot:spring-boot-starter-data-jpa:' + spring_web_version) { + exclude(group: 'org.springframework', module: 'spring-aspects') + exclude(group: 'org.springframework.boot', module: 'spring-boot-starter-aop') + } + + implementation ('org.springframework.boot:spring-boot-starter-actuator:2.7.8') { + exclude(group: 'io.micrometer', module: 'micrometer-core') + } + implementation group: 'com.zaxxer', name: 'HikariCP', version: '4.0.3' + compileOnly('org.projectlombok:lombok:' + '1.18.20') + implementation (project(':cluster:storage')) { + exclude group: 'org.testcontainers' + exclude group: 'javax' + exclude group: 'org.xerial' + + } + implementation (project(':iceberg:openhouse:internalcatalog')) { + exclude group: 'org.apache.hadoop' + exclude group: 'org.ow2.asm' + 
exclude group: 'org.testcontainers' + exclude group: 'javax' + exclude group: 'org.xerial' + exclude group: 'com.h2database' + exclude group: 'io.micrometer' + } + + testImplementation('org.apache.hadoop:hadoop-common:3.2.0'){ + transitive = false + exclude group: "io.netty" + } + testRuntimeOnly("org.eclipse.jetty:jetty-server:11.0.2") +} + +shadowJar { + zip64 true + + archiveClassifier.set('uber') + + mergeServiceFiles() + + append 'META-INF/spring.handlers' + append 'META-INF/spring.schemas' + append 'META-INF/spring.tooling' + transform(PropertiesFileTransformer) { + paths = ['META-INF/spring.factories' ] + mergeStrategy = "merge" + } + + dependencies { + exclude(dependency('com.fasterxml.jackson.module::')) + exclude(dependency('com.fasterxml.jackson.core::')) + + relocate('com.google', 'openhouse.relocated.com.google') + relocate('com.jayway', 'openhouse.relocated.com.jayway') + + // TODO: relocate zaxxer, ehcache, h2, hibernate. With relocation it fails by either picking up HouseTableRepositoryImp or with a missing validatorBean + + relocate('org.jboss', 'openhouse.relocated.org.jboss') + relocate('com.ctc', 'openhouse.relocated.com.ctc') + relocate('com.azure', 'openhouse.relocated.com.azure') + + + relocate('ch.qos.logback', 'openhouse.relocated.ch.qos.logback') + exclude(dependency('org.apache.hadoop::')) + exclude(dependency('org.apache.logging.log4j::')) + exclude(dependency('org.slf4j::')) + exclude(dependency('org.log4j::')) + exclude(dependency('org.apache.log4j::')) + exclude(dependency('org.apache.spark::')) + + relocate('org.codehaus', 'openhouse.relocated.org.codehaus') + relocate('org.reactivestreams', 'openhouse.relocated.org.reactivestreams') + relocate('org.junit', 'openhouse.relocated.org.junit') + relocate ('org.yaml.snakeyaml', 'openhouse.relocated.org.yaml.snakeyaml') + relocate ('org.json', 'openhouse.relocated.org.json') + relocate('org.openapitools', 'openhouse.relocated.org.openapitools') + relocate ('org.apache.iceberg', 'openhouse.relocated.org.apache.iceberg') { + exclude 'org.apache.iceberg.spark.SparkCatalog' + } + + + + relocate('org.apache.avro', 'openhouse.relocated.org.apache.avro') + relocate('org.apache.commons', 'openhouse.relocated.org.apache.commons') + relocate('org.apache.curator', 'openhouse.relocated.org.apache.curator') + relocate('org.apache.directory', 'openhouse.relocated.org.apache.directory') + relocate('org.apache.el', 'openhouse.relocated.org.apache.el') + relocate('org.apache.hc', 'openhouse.relocated.org.apache.hc') + relocate('org.apache.htrace', 'openhouse.relocated.org.apache.htrace') + relocate('org.apache.http', 'openhouse.relocated.org.apache.http') + relocate('org.apache.juli', 'openhouse.relocated.org.apache.juli') + relocate('org.apache.jute', 'openhouse.relocated.org.apache.jute') + relocate('org.apache.log4j', 'openhouse.relocated.org.apache.log4j') + relocate('org.apache.naming', 'openhouse.relocated.org.apache.naming') + relocate('org.apache.parquet', 'openhouse.relocated.org.apache.parquet') + relocate('org.apache.orc', 'openhouse.relocated.org.apache.orc') + relocate('org.apache.yetus', 'openhouse.relocated.org.apache.yetus') + relocate('org.apache.zookeeper', 'openhouse.relocated.org.apache.zookeeper') + relocate('org.aspectj', 'openhouse.relocated.org.aspectj') + relocate('org.HdrHistogram', 'openhouse.relocated.org.HdrHistogram') + + + relocate('com.fasterxml', 'openhouse.relocated.com.fasterxml') { + exclude 'com.fasterxml.jackson.databind.**' + exclude 'com.fasterxml.jackson.core.**' + exclude 
'com.fasterxml.jackson.module.**' + exclude 'com.fasterxml.jackson.annotation.**' + } + + relocate('reactor', 'openhouse.relocated.reactor') + + relocate('io.airlift', 'openhouse.relocated.io.airlift') + relocate('io.swagger', 'openhouse.relocated.io.swagger') + relocate('io.jsonwebtoken', 'openhouse.relocated.io.jsonwebtoken') + relocate('io.prometheus', 'openhouse.relocated.io.prometheus') + + relocate('net', 'openhouse.relocated.net') + relocate('junit', 'openhouse.relocated.junit') + relocate('antlr', 'openhouse.relocated.antlr') + relocate('aj', 'openhouse.relocated.aj') + relocate('edu', 'openhouse.relocated.edu') + relocate('software', 'openhouse.relocated.software') + + exclude("com/sun/**") + exclude("com/microsoft/**") + relocate('com.github', 'openhouse.relocated.com.github') + exclude("com/jamesmurty/**") + exclude("com/jcraft/**") + exclude("com/nimbusds/**") + + exclude("org/aopalliance/**") + exclude("org/atteo/**") + exclude("org/assertj/**") + exclude("org/checkerframework/**") + exclude("org/fusesource/**") + + + exclude("org/hamcrest/**") + exclude("org/iq80/**") + exclude("org/jets3t/**") + exclude("org/intellij/**") + exclude("org/jetbrains/**") + exclude("org/jvnet/**") + exclude("org/jets3t/**") + exclude("org/LatencyUtils/**") + exclude("org/mapstruct/**") + exclude("org/mockito/**") + exclude("org/mortbay/**") + exclude("org/objenesis/**") + exclude("org/objectweb/**") + + exclude("org/opentest4j/**") + exclude("org/rnorth/**") + exclude("org/roaringbitmap/**") + exclude("org/skyscreamer/**") + + //TODO: Relocate org.springframework if required. + exclude("org/terracotta/**") + exclude("org/testcontainers/**") + exclude("org/threeten/**") + exclude("org/xerial/**") + exclude("org/xmlunit/**") + exclude("org/znerd/**") + + exclude("jline/**") + exclude("linux/**") + exclude("contribs/**") + exclude("microsoft/**") + } + + exclude '**/public-suffix-list.txt' + exclude 'assets/**/**/**/**/**/**/*.properties' + exclude '**/*.dll' + exclude '**/*.ico' + exclude '**/*.dylib' + exclude '**/**/*.so' +} + +// https://github.com/johnrengelman/shadow/issues/335 +// By default shadow doesn't configure the build task to depend on the shadowJar task. +tasks.build.dependsOn tasks.shadowJar + +test { + if (JavaVersion.current() >= JavaVersion.VERSION_1_9){ + jvmArgs '--add-opens=java.base/java.net=ALL-UNNAMED' + } +}
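The relocate rules above rewrite the bundled Iceberg classes to the openhouse.relocated prefix while deliberately leaving org.apache.iceberg.spark.SparkCatalog at its original coordinates, presumably so that Spark catalog settings referencing that class by its fully qualified name still resolve against the uber jar. A minimal, hypothetical audit of that behavior is sketched below: the RelocationAudit class, its package, and the jar path passed as args[0] are illustrative and not part of this change, and the check assumes the Iceberg classes really are bundled into the '*-uber.jar' produced by shadowJar.

package com.linkedin.openhouse.tablestest;

import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Sketch only: scans a built uber jar and prints any entry under org/apache/iceberg/ that
// escaped relocation, other than SparkCatalog, which the relocate block excludes on purpose.
public final class RelocationAudit {
  public static void main(String[] args) throws Exception {
    try (JarFile jar = new JarFile(args[0])) { // args[0]: path to the *-uber.jar (caller-supplied)
      jar.stream()
          .map(JarEntry::getName)
          .filter(name -> name.startsWith("org/apache/iceberg/"))
          .filter(name -> !name.equals("org/apache/iceberg/spark/SparkCatalog.class"))
          .forEach(name -> System.out.println("un-relocated Iceberg entry: " + name));
    }
  }
}

Relocated classes, by contrast, should appear under openhouse/relocated/org/apache/iceberg/ when listing the same jar.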