Working apps: Spark 3.5 and Spark 3.1 build and test
cbb330 committed Nov 24, 2024
1 parent b7a38eb commit ab39faa
Showing 38 changed files with 2,703 additions and 135 deletions.
68 changes: 68 additions & 0 deletions apps/spark-3.5/build.gradle
@@ -0,0 +1,68 @@
plugins {
// shadow plugin is also a dependency of the openhouse.apps-spark-conventions plugin below
id 'com.github.johnrengelman.shadow' version '7.1.2'
id 'openhouse.apps-spark-conventions'
}

ext {
log4jVersion = "2.20.0"

sparkVersion = '3.5.2'
icebergVersion = '1.5.2'
sparkVersionSuffix = "3.5"
openhouseSparkRuntimeModule = ":integrations:spark:spark-${sparkVersionSuffix}:openhouse-spark-3.5-runtime_2.12"
icebergSparkRuntimeModule = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:${icebergVersion}"
tablesTestFixturesModule = ":tables-test-fixtures:tables-test-fixtures-iceberg-1.5_2.12"
}

dependencies {
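// Netty and the Hadoop client jars are excluded from most of these dependencies so they do
// not clash with the versions already present on the Spark classpath.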
compileOnly (project(path: openhouseSparkRuntimeModule)) {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
exclude group: 'org.apache.hadoop', module: 'hadoop-client'
}

implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
implementation(project(':libs:datalayout')) {
exclude group: 'org.apache.iceberg', module: 'iceberg-spark-runtime-3.1_2.12'
}
implementation("org.apache.iceberg:iceberg-bundled-guava:${icebergVersion}")
implementation("org.apache.iceberg:iceberg-data:${icebergVersion}")
implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
implementation("org.apache.iceberg:iceberg-common:${icebergVersion}")
implementation ("org.apache.spark:spark-core_2.12:${sparkVersion}") {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
exclude group: 'org.apache.hadoop', module: 'hadoop-client'
}
implementation ("org.apache.spark:spark-sql_2.12:${sparkVersion}") {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
exclude group: 'org.apache.hadoop', module: 'hadoop-client'
}
implementation (icebergSparkRuntimeModule) {
exclude group: 'io.netty'
}

testImplementation (project(path: openhouseSparkRuntimeModule, configuration: 'shadow')) {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
exclude group: 'org.apache.hadoop', module: 'hadoop-client'
}
testImplementation(project(tablesTestFixturesModule)) {
exclude group: "io.netty"
}
}

sourceSets {
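// Compile the shared job sources from :apps:openhouse-spark-apps_2.12 directly into this
// module so the same application code is built and tested against Spark 3.5 / Iceberg 1.5.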
main {
java {
srcDirs = ['src/main/java', project(':apps:openhouse-spark-apps_2.12').sourceSets.main.java.srcDirs]
}
}
test {
java {
srcDirs = ['src/test/java', project(':apps:openhouse-spark-apps_2.12').sourceSets.test.java.srcDirs]
}
}
}
@@ -0,0 +1,224 @@
package com.linkedin.openhouse.catalog.e2e;

import static org.apache.iceberg.types.Types.NestedField.required;

import com.linkedin.openhouse.jobs.spark.Operations;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.junit.jupiter.api.Assertions;

public class MinimalSparkMoRTest extends OpenHouseSparkITest {
private static final Schema SCHEMA =
new Schema(
required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));

private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();

private static final DataFile FILE_A =
DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();
private static final DataFile FILE_B =
DataFiles.builder(SPEC)
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.build();
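// FILE_A and FILE_B are hand-built DataFile fixtures against SPEC; they are not referenced
// by the active test path below.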

private void setupTableWithTwoDataFiles(String tableName, Operations ops) {
final int numInserts = 2;
prepareTable(ops, tableName, false);
populateTable(ops, tableName, numInserts);
ops.spark()
.sql(
String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'write.delete.distribution-mode'='range')",
tableName))
.show();
ops.spark().sql(String.format("select * from %s", tableName)).drop("summary").show(80, false);
ops.spark()
.sql(String.format("select * from %s.snapshots", tableName))
.drop("summary")
.show(80, false);
}

private void createDeletes(Operations ops) {
ops.spark().sql("DELETE FROM db.test_data_compaction WHERE data = 'v6'").show();
}

// @Test
public void testDataCompactionPartialProgressNonPartitionedTable() throws Exception {
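// rewriteFunc below wraps Operations#rewriteDataFiles with fixed file-sizing thresholds
// (per the inline byte-size comments); the trailing arguments are rewrite-job tuning knobs.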
final String tableName = "db.test_data_compaction";

BiFunction<Operations, Table, RewriteDataFiles.Result> rewriteFunc =
(ops, table) ->
ops.rewriteDataFiles(
table,
1024 * 1024, // 1MB
1024, // 1KB
1024 * 1024 * 2, // 2MB
2,
1,
true,
10);

try (Operations ops = Operations.withCatalog(getSparkSession(), null)) {
// setupTestTable();
// // create table with two big datafiles and it creates two delete files
// assertDeleteFilesCreated();
//
// setupTestTable();
// // same as above, then run compaction, and two delete files exist
// assertCompactionCanCreateDanglingDeletes();
//
// setupTestTable();
// // a file in an old snapshot is expired / removable if it no longer represents the
// underlying data
// // meaning a newer snapshot has overwritten the data
// // so for delete file it means that the equality
//
//
// // cow will create new datafile
//
// // delete file to represent a new snapshot with one row edit
//
// // cow will create an entirely new datafile
//
// // then delete file will be unused, should expire.
//
//
// // then do the same but with compaction, it won't expire
// assertDanglingDeleteFilesCanNeverExpire();
//
// setupTestTable();
// assertRewritePositionDeleteRemovesDanglingDeletes();
//
// setupTestTable();
// assertEqualityDeletesNotCompactable();
//
// setupTestTable();
// assertProcessToCompactEqualityDeletes();
//
// setupTestTable();
// assertCompactionCanRemoveDeletes();

Table table = ops.getTable(tableName);
// log.info("Loaded table {}, location {}", table.name(), table.location());
RewriteDataFiles.Result result = rewriteFunc.apply(ops, table);
populateTable(ops, tableName, 3);
Dataset<Row> metadataTable =
SparkTableUtil.loadMetadataTable(ops.spark(), table, MetadataTableType.FILES)
.selectExpr("content", "file_path", "file_size_in_bytes")
.dropDuplicates(
"file_path",
"file_size_in_bytes"); // tOdo: can the same file_path have two diff sizes

// Aggregate counts and sums based on `content` values
Dataset<Row> contentStats =
metadataTable
.groupBy("content")
.agg(
functions.count("content").alias("count"),
functions.sum("file_size_in_bytes").alias("total_size"));
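// content here is the Iceberg file-content code: 0 = data files, 1 = position delete files,
// 2 = equality delete files.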

// Collect the result as a map for quick lookup of counts and sums by content value
Map<Integer, Row> statsMap =
contentStats.collectAsList().stream()
.collect(
Collectors.toMap(
row -> row.getInt(0), // content value (0, 1, or 2)
row -> row // Row containing count and total_size
));
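// statsMap is only built for inspection at this point; the assertions below check the
// RewriteDataFiles result counts, not the collected stats.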
// log.info(
// "Added {} data files, rewritten {} data files, rewritten {} bytes",
// result.addedDataFilesCount(),
// result.rewrittenDataFilesCount(),
// result.rewrittenBytesCount());
Assertions.assertEquals(0, result.addedDataFilesCount());
Assertions.assertEquals(0, result.rewrittenDataFilesCount());

populateTable(ops, tableName, 3);
ops.spark().sql("DELETE FROM db.test_data_compaction WHERE data = 'v6'").show();
populateTable(ops, tableName, 3);
RewriteDataFiles.Result result2 = rewriteFunc.apply(ops, table);
Assertions.assertEquals(0, result2.addedDataFilesCount());
Assertions.assertEquals(0, result2.rewrittenDataFilesCount());
}
}

private static void prepareTable(Operations ops, String tableName, boolean isPartitioned) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
if (isPartitioned) {
ops.spark()
.sql(
String.format(
"CREATE TABLE %s (data string, ts timestamp) partitioned by (days(ts))",
tableName))
.show();
} else {
ops.spark()
.sql(String.format("CREATE TABLE %s (data string, ts timestamp)", tableName))
.show();
}
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static List<Long> getSnapshotIds(Operations ops, String tableName) {
// log.info("Getting snapshot Ids");
List<Row> snapshots =
ops.spark().sql(String.format("SELECT * FROM %s.snapshots", tableName)).collectAsList();
// snapshots.forEach(s -> log.info(s.toString()));
return snapshots.stream()
.map(r -> r.getLong(r.fieldIndex("snapshot_id")))
.collect(Collectors.toList());
}
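// The populateTable overloads below insert numRows batches of 10 rows each, with data values
// v{row}..v{row+9} and a timestamp dayLag days before timestampSeconds.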

private static void populateTable(Operations ops, String tableName, int numRows) {
populateTable(ops, tableName, numRows, 0);
}

private static void populateTable(Operations ops, String tableName, int numRows, int dayLag) {
populateTable(ops, tableName, numRows, dayLag, System.currentTimeMillis() / 1000);
}

private static void populateTable(
Operations ops, String tableName, int numRows, int dayLag, long timestampSeconds) {
String timestampEntry =
String.format("date_sub(from_unixtime(%d), %d)", timestampSeconds, dayLag);
StringBuilder valuesBatch = new StringBuilder();
for (int row = 0; row < numRows; ++row) {
valuesBatch.setLength(0); // Clear the batch for each iteration
for (int i = 0; i < 10; i++) {
valuesBatch.append(String.format("('v%d', %s)", row + i, timestampEntry));
if (i < 9) {
valuesBatch.append(", ");
}
}

ops.spark()
.sql(String.format("INSERT INTO %s VALUES %s", tableName, valuesBatch.toString()))
.show();

// ops.spark().sql(String.format("DELETE FROM db.test_data_compaction WHERE data = 'v%d'",
// row)).show();
}
}
}