Skip to content

Commit

Permalink
[Kernel] Add test utilities like checkAnswer and checkTable (#2034)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

Improves the testing infrastructure for Scala tests in Delta Kernel.

For now adds it to `kernel-defaults` but if we have tests with `ColumnarBatch`s in `kernel-api` we can move it there.

## How was this patch tested?

Refactors existing tests to use the new infra.
  • Loading branch information
allisonport-db authored Sep 15, 2023
1 parent a9f8c8d commit 19b6c9e
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,59 @@ package io.delta.kernel.defaults

import io.delta.golden.GoldenTableUtils.goldenTablePath

import io.delta.kernel.Table
import io.delta.kernel.defaults.client.DefaultTableClient
import io.delta.kernel.defaults.utils.DefaultKernelTestUtils
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
import io.delta.kernel.defaults.utils.DefaultKernelTestUtils.getTestResourceFilePath
import org.apache.hadoop.conf.Configuration
import org.scalatest.funsuite.AnyFunSuite

class DeletionVectorSuite extends AnyFunSuite with TestUtils {

test("end-to-end usage: reading a table with dv") {
val path = DefaultKernelTestUtils.getTestResourceFilePath("basic-dv-no-checkpoint")
val expectedResult = Seq.range(start = 2, end = 10).toSet

val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)
val result = readSnapshot(snapshot).map { row =>
row.getLong(0)
}

assert(result.toSet === expectedResult)
checkTable(
path = getTestResourceFilePath("basic-dv-no-checkpoint"),
expectedAnswer = (2L until 10L).map(TestRow(_))
)
}

test("end-to-end usage: reading a table with dv with checkpoint") {
val path = DefaultKernelTestUtils.getTestResourceFilePath("basic-dv-with-checkpoint")
val expectedResult = Seq.range(start = 0, end = 500).filter(_ % 11 != 0).toSet

val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)
val result = readSnapshot(snapshot).map { row =>
row.getLong(0)
}

assert(result.toSet === expectedResult)
checkTable(
path = getTestResourceFilePath("basic-dv-with-checkpoint"),
expectedAnswer = (0L until 500L).filter(_ % 11 != 0).map(TestRow(_))
)
}

test("end-to-end usage: reading partitioned dv table with checkpoint") {
// kernel expects a fully qualified path
val path = "file:" + goldenTablePath("dv-partitioned-with-checkpoint")
val expectedResult = (0 until 50).map(x => (x%10, x, s"foo${x % 5}"))
.filter{ case (_, col1, _) =>
!(col1 % 2 == 0 && col1 < 30)
}.toSet

val conf = new Configuration()
// Set the batch size small enough so there will be multiple batches
conf.setInt("delta.kernel.default.parquet.reader.batch-size", 2)
val tableClient = DefaultTableClient.create(conf)

val snapshot = Table.forPath(path).getLatestSnapshot(tableClient)
val result = readSnapshot(snapshot, tableClient = tableClient).map { row =>
(row.getInt(0), row.getInt(1), row.getString(2))
}
val expectedResult = (0 until 50).map(x => (x%10, x, s"foo${x % 5}"))
.filter{ case (_, col1, _) =>
!(col1 % 2 == 0 && col1 < 30)
}

assert (result.toSet == expectedResult)
checkTable(
path = "file:" + goldenTablePath("dv-partitioned-with-checkpoint"),
expectedAnswer = expectedResult.map(TestRow.fromTuple(_)),
tableClient = tableClient
)
}

// TODO: update to use goldenTables once bug is fixed in delta-spark see issue #1886
test(
"end-to-end usage: reading partitioned dv table with checkpoint with columnMappingMode=name") {
val path = DefaultKernelTestUtils.getTestResourceFilePath("dv-with-columnmapping")
val expectedResult = (0 until 50).map(x => (x%10, x, s"foo${x % 5}"))
.filter{ case (_, col1, _) =>
!(col1 % 2 == 0 && col1 < 30)
}.toSet

val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)
val result = readSnapshot(snapshot).map { row =>
(row.getInt(0), row.getInt(1), row.getString(2))
}

assert (result.toSet == expectedResult)
}
checkTable(
path = getTestResourceFilePath("dv-with-columnmapping"),
expectedAnswer = expectedResult.map(TestRow.fromTuple(_))
)
}


// TODO detect corrupted DV checksum
// TODO detect corrupted dv size
// TODO multiple dvs in one file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import org.scalatest.funsuite.AnyFunSuite
import io.delta.golden.GoldenTableUtils.goldenTablePath

import io.delta.kernel.Table

import io.delta.kernel.defaults.internal.DefaultKernelUtils
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}

class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {

Expand Down Expand Up @@ -54,52 +54,44 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
4 | null | null
*/

def row0: (Int, Option[Long]) = (
def row0: TestRow = TestRow(
0,
Some(1580544550000000L) // 2020-02-01 08:09:10 UTC to micros since the epoch
1580544550000000L // 2020-02-01 08:09:10 UTC to micros since the epoch
)

def row1: (Int, Option[Long]) = (
def row1: TestRow = TestRow(
1,
Some(915181200000000L) // 1999-01-01 09:00:00 UTC to micros since the epoch
915181200000000L // 1999-01-01 09:00:00 UTC to micros since the epoch
)

def row2: (Int, Option[Long]) = (
def row2: TestRow = TestRow(
2,
Some(946717200000000L) // 2000-01-01 09:00:00 UTC to micros since the epoch
946717200000000L // 2000-01-01 09:00:00 UTC to micros since the epoch
)

def row3: (Int, Option[Long]) = (
def row3: TestRow = TestRow(
3,
Some(-31536000000000L) // 1969-01-01 00:00:00 UTC to micros since the epoch
-31536000000000L // 1969-01-01 00:00:00 UTC to micros since the epoch
)

def row4: (Int, Option[Long]) = (
def row4: TestRow = TestRow(
4,
None
null
)

// TODO: refactor this once testing utilities have support for Rows/ColumnarBatches
def utcTableExpectedResult: Set[(Int, Option[Long])] = Set(row0, row1, row2, row3, row4)
def utcTableExpectedResult: Seq[TestRow] = Seq(row0, row1, row2, row3, row4)

def testTimestampTable(
goldenTableName: String,
timeZone: String,
expectedResult: Set[(Int, Option[Long])]): Unit = {
expectedResult: Seq[TestRow]): Unit = {
withTimeZone(timeZone) {
// kernel expects a fully qualified path
val path = "file:" + goldenTablePath(goldenTableName)
val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)

// for now omit "part" column since we don't support reading timestamp partition values
val readSchema = snapshot.getSchema(defaultTableClient)
.withoutField("part")

val result = readSnapshot(snapshot, readSchema).map { row =>
(row.getInt(0), if (row.isNullAt(1)) Option.empty[Long] else Some(row.getLong(1)))
}

assert(result.toSet == expectedResult)
checkTable(
path = "file:" + goldenTablePath(goldenTableName),
expectedAnswer = expectedResult,
// for now omit "part" column since we don't support reading timestamp partition values
readCols = Seq("id", "time")
)
}
}

Expand All @@ -113,9 +105,16 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
}

// PST table - all the "time" col timestamps are + 8 hours
def pstTableExpectedResult: Set[(Int, Option[Long])] = utcTableExpectedResult.map {
case (id, col) =>
(id, col.map(_ + DefaultKernelUtils.DateTimeConstants.MICROS_PER_HOUR * 8))
def pstTableExpectedResult: Seq[TestRow] = utcTableExpectedResult.map { testRow =>
val values = testRow.toSeq
TestRow(
values(0),
if (values(1) == null) {
null
} else {
values(1).asInstanceOf[Long] + DefaultKernelUtils.DateTimeConstants.MICROS_PER_HOUR * 8
}
)
}

for (timeZone <- Seq("UTC", "Iceland", "PST", "America/Los_Angeles")) {
Expand All @@ -138,30 +137,23 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
).map { tup =>
(new BigDecimal(tup._1), new BigDecimal(tup._2), new BigDecimal(tup._3),
new BigDecimal(tup._4))
}.toSet

// kernel expects a fully qualified path
val path = "file:" + goldenTablePath(tablePath)
val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)

val result = readSnapshot(snapshot).map { row =>
(row.getDecimal(0), row.getDecimal(1), row.getDecimal(2), row.getDecimal(3))
}

assert(expectedResult == result.toSet)
checkTable(
path = "file:" + goldenTablePath(tablePath),
expectedAnswer = expectedResult.map(TestRow.fromTuple(_))
)
}
}

test("end to end: multi-part checkpoint") {
val expectedResult = Seq(0) ++ (0 until 30)

// kernel expects a fully qualified path
val path = "file:" + goldenTablePath("multi-part-checkpoint")
val snapshot = Table.forPath(path).getLatestSnapshot(defaultTableClient)
val result = readSnapshot(snapshot).map { row =>
row.getLong(0)
}
//////////////////////////////////////////////////////////////////////////////////
// Misc tests
//////////////////////////////////////////////////////////////////////////////////

assert(result.toSet == expectedResult.toSet)
test("end to end: multi-part checkpoint") {
checkTable(
path = "file:" + goldenTablePath("multi-part-checkpoint"),
expectedAnswer = (Seq(0L) ++ (0L until 30L)).map(TestRow(_))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatest.funsuite.AnyFunSuite

// scalastyle:off println
class LogReplaySuite extends AnyFunSuite {
// TODO: refactor to use TestUtils

private val tableClient = DefaultTableClient.create(new Configuration() {{
// Set the batch sizes to small so that we get to test the multiple batch scenarios.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import java.io.File
import java.math.BigDecimal

import org.scalatest.funsuite.AnyFunSuite

import org.apache.hadoop.conf.Configuration
import io.delta.golden.GoldenTableUtils.goldenTableFile

import io.delta.kernel.types.{DecimalType, IntegerType, StructType}

import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}

class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {

Expand All @@ -47,8 +46,8 @@ class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {

test("decimals encoded using dictionary encoding ") {
val expectedResult = (0 until 1000000).map { i =>
(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
}.toSet
TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
}

val readSchema = new StructType()
.add("id", IntegerType.INSTANCE)
Expand All @@ -59,12 +58,8 @@ class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {
val batchReader = new ParquetBatchReader(new Configuration())
for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) {
val batches = batchReader.read(file, readSchema)

val result = batches.toSeq.flatMap(_.getRows.toSeq).map { row =>
(row.getInt(0), row.getDecimal(1), row.getDecimal(2), row.getDecimal(3))
}

assert(expectedResult == result.toSet)
val result = batches.toSeq.flatMap(_.getRows.toSeq)
checkAnswer(result, expectedResult)
}
}

Expand All @@ -80,22 +75,22 @@ class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {
val expectedResult = (0 until 99998).map { i =>
if (i % 85 == 0) {
val n = BigDecimal.valueOf(i)
(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5))
TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5))
} else {
val negation = if (i % 33 == 0) {
-1
} else {
1
}
val n = BigDecimal.valueOf(i*negation)
(
TestRow(
i,
n.movePointLeft(1),
expand(n).movePointLeft(5),
expand(expand(expand(n))).movePointLeft(5)
)
}
}.toSet
}

val readSchema = new StructType()
.add("id", IntegerType.INSTANCE)
Expand All @@ -106,11 +101,8 @@ class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {
val batchReader = new ParquetBatchReader(new Configuration())
val batches = batchReader.read(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema)

val result = batches.toSeq.flatMap(_.getRows.toSeq).map { row =>
(row.getInt(0), row.getDecimal(1), row.getDecimal(2), row.getDecimal(3))
}

assert(expectedResult == result.toSet)
val result = batches.toSeq.flatMap(_.getRows.toSeq)
checkAnswer(result, expectedResult)
}

//////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 19b6c9e

Please sign in to comment.