Skip to content

Commit

Permalink
[Spark] Column mapping removal: support tables with deletion vectors,…
Browse files Browse the repository at this point in the history
… column constraints and generated columns. (#2753)

## Description
Add additional tests for tables with deletion vectors, generated columns
and column constraints for column mapping removal.

## How was this patch tested?
New unit tests

## Does this PR introduce _any_ user-facing changes?
No
  • Loading branch information
sabir-akhadov authored Mar 18, 2024
1 parent 6ccbabb commit 72fad38
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ trait DeltaColumnMappingBase extends DeltaLogging {
// No change.
(oldMode == newMode) ||
// Downgrade allowed with a flag.
(removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
(removalAllowed && (oldMode != NoMapping && newMode == NoMapping)) ||
// Upgrade always allowed.
(oldMode == NoMapping && newMode == NameMapping)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@

package org.apache.spark.sql.delta.columnmapping

import org.apache.spark.sql.delta.DeltaAnalysisException
import org.apache.spark.sql.delta.DeltaColumnMappingUnsupportedException
import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.DeltaLog
import io.delta.tables.DeltaTable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.schema.DeltaInvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier

/**
* Test removing column mapping from a table.
*/
class RemoveColumnMappingSuite
extends RemoveColumnMappingSuiteUtils
{
class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {

test("column mapping cannot be removed without the feature flag") {
withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
Expand All @@ -48,6 +47,16 @@ class RemoveColumnMappingSuite
}
}

test("table without column mapping enabled") {
  // Build a table that never had column mapping turned on ('none' from the start).
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
       |AS SELECT 1 as a
       |""".stripMargin
  sql(createTableSql)

  // Unsetting the column mapping property on a no-mapping table must succeed (no-op).
  unsetColumnMappingProperty(useUnset = true)
}

test("invalid column names") {
val invalidColName1 = colName("col1")
val invalidColName2 = colName("col2")
Expand All @@ -58,7 +67,7 @@ class RemoveColumnMappingSuite
|""".stripMargin)
val e = intercept[DeltaAnalysisException] {
// Try to remove column mapping.
sql(s"ALTER TABLE $testTableName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')")
unsetColumnMappingProperty(useUnset = true)
}
assert(e.errorClass
.contains("DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING"))
Expand Down Expand Up @@ -217,4 +226,91 @@ class RemoveColumnMappingSuite
assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count()
== 0)
}

test("remove column mapping from a table with deletion vectors") {
  // Name-mapping table with deletion vector creation enabled.
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES (
       | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
       | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = true)
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  // Delete every other row so the table actually carries deletion vectors
  // before column mapping removal rewrites it.
  sql(s"DELETE FROM $testTableName WHERE $logicalColumnName % 2 = 0")
  testRemovingColumnMapping()
}

test("remove column mapping from a table with a generated column") {
  // Note: generate expressions are using logical column names and renaming referenced columns
  // is forbidden.
  // Create a name-mapping table where the second column is generated from the first.
  DeltaTable.create(spark)
    .tableName(testTableName)
    .addColumn(logicalColumnName, "LONG")
    .addColumn(
      DeltaTable.columnBuilder(secondColumn)
        .dataType("LONG")
        .generatedAlwaysAs(s"$logicalColumnName + 1")
        .build())
    .property(DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")
    .execute()
  // Insert data into the table.
  spark.range(totalRows)
    .selectExpr(s"id as $logicalColumnName")
    .writeTo(testTableName)
    .append()
  val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
  // Sanity check: the generated column is registered before removing column mapping.
  assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn)
  testRemovingColumnMapping()
  // Verify the generated column is still there.
  assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn)
  // Insert more rows.
  // The generate expression must still fire after the mapping removal rewrite.
  spark.range(totalRows)
    .selectExpr(s"id + $totalRows as $logicalColumnName")
    .writeTo(testTableName)
    .append()
  // Verify the generated column values are correct.
  checkAnswer(sql(s"SELECT $logicalColumnName, $secondColumn FROM $testTableName"),
    (0 until totalRows * 2).map(i => Row(i, i + 1)))
}

test("column constraints are preserved") {
  // Note: constraints are using logical column names and renaming is forbidden until
  // constraint is dropped.
  val constraintName = "secondcolumnaddone"
  val constraintExpr = s"$secondColumn = $logicalColumnName + 1"
  sql(
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES (
       | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin)
  sql(s"ALTER TABLE $testTableName ADD CONSTRAINT $constraintName CHECK ($constraintExpr)")
  val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
  // The constraint is stored in table metadata under delta.constraints.<name>.
  def storedConstraintExpr: String =
    deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName")
  assert(storedConstraintExpr == constraintExpr)
  testRemovingColumnMapping()
  // Verify the constraint is still there.
  assert(storedConstraintExpr == constraintExpr)
  // Verify the constraint is still enforced: (0, 0) violates secondColumn = col + 1.
  intercept[DeltaInvariantViolationException] {
    sql(s"INSERT INTO $testTableName VALUES (0, 0)")
  }
}

test("remove column mapping in id mode") {
  // Removal must also work for 'id' mapping mode, not just 'name' mode.
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES (
       | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  testRemovingColumnMapping()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf._
import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.col

Expand Down Expand Up @@ -52,12 +54,9 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui

import testImplicits._

protected def testRemovingColumnMapping(
unsetTableProperty: Boolean = false): Any = {
protected def testRemovingColumnMapping(unsetTableProperty: Boolean = false): Any = {
// Verify the input data is as expected.
checkAnswer(
spark.table(tableName = testTableName).select(logicalColumnName),
spark.range(totalRows).select(col("id").as(logicalColumnName)))
val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect()
// Add a schema comment and verify it is preserved after the rewrite.
val comment = "test comment"
sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'")
Expand All @@ -73,10 +72,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
unsetColumnMappingProperty(useUnset = unsetTableProperty)

verifyRewrite(
unsetTableProperty,
unsetTableProperty = unsetTableProperty,
deltaLog,
originalFiles,
startingVersion)
startingVersion,
originalData = originalData)
// Verify the schema comment is preserved after the rewrite.
assert(deltaLog.update().schema.head.getComment().get == comment,
"Should preserve the schema comment.")
Expand All @@ -90,10 +90,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
unsetTableProperty: Boolean,
deltaLog: DeltaLog,
originalFiles: Array[AddFile],
startingVersion: Long): Unit = {
startingVersion: Long,
originalData: Array[Row]): Unit = {
checkAnswer(
spark.table(tableName = testTableName).select(logicalColumnName),
spark.range(totalRows).select(col("id").as(logicalColumnName)))
originalData)

val newSnapshot = deltaLog.update()
assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")
Expand Down

0 comments on commit 72fad38

Please sign in to comment.