From 1a25041c6d53d39eb47ff76d145fc10131789358 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Tue, 19 Mar 2024 11:12:31 +0100 Subject: [PATCH] Update --- .../rowtracking/MaterializedColumnSuite.scala | 77 ++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/MaterializedColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/MaterializedColumnSuite.scala index 02b63ae8438..551dd9f3702 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/MaterializedColumnSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/MaterializedColumnSuite.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.delta.rowtracking -import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaRuntimeException, MaterializedRowCommitVersion, MaterializedRowId, RowTracking} +import org.apache.spark.sql.delta.{DeltaConfigs, DeltaIllegalStateException, DeltaLog, DeltaRuntimeException, MaterializedRowCommitVersion, MaterializedRowId, RowTracking} import org.apache.spark.sql.delta.rowid.RowIdTestUtils import org.apache.spark.sql.catalyst.TableIdentifier @@ -183,5 +183,80 @@ class MaterializedColumnSuite extends RowIdTestUtils } } } + + test(s"clone gives new materialized $name column name for existing empty target table") { + val sourceTableName = "source" + val targetTableName = "target" + + withTable(sourceTableName, targetTableName) { + withRowTrackingEnabled(enabled = true) { + spark.range(end = 1).toDF("col1").write.format("delta").saveAsTable(sourceTableName) + spark.range(end = 0).toDF("col2").write.format("delta").saveAsTable(targetTableName) + val oldMaterializedColumnName = getMaterializedColumnName(targetTableName).get + + sql(s"CREATE OR REPLACE TABLE $targetTableName SHALLOW CLONE $sourceTableName") + + val newMaterializedColumnName = getMaterializedColumnName(targetTableName).get + assert(oldMaterializedColumnName === newMaterializedColumnName) + val sourceMaterializedColName = getMaterializedColumnName(sourceTableName).get + assert(sourceMaterializedColName !== newMaterializedColumnName) + } + } + } + + test("double clone from an empty source table maintains the same " + + s"materialized $name column name") { + val sourceTableName = "source" + val targetTableName = "target" + + withTable(sourceTableName, targetTableName) { + withRowTrackingEnabled(enabled = true) { + spark.range(end = 0).toDF("col1").write.format("delta").saveAsTable(sourceTableName) + + sql(s"CREATE OR REPLACE TABLE $targetTableName SHALLOW CLONE $sourceTableName " + + s"TBLPROPERTIES ('${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true')") + val materializedColumnNameBefore = getMaterializedColumnName(targetTableName) + val sourceMaterializedColName = getMaterializedColumnName(sourceTableName) + assert(sourceMaterializedColName !== materializedColumnNameBefore) + + sql(s"CREATE OR REPLACE TABLE $targetTableName SHALLOW CLONE $sourceTableName") + val materializedColumnNameAfter = getMaterializedColumnName(targetTableName) + assert(materializedColumnNameBefore === materializedColumnNameAfter) + } + } + } + + test(s"self clone of an empty table maintains the same materialized $name column name") { + withRowTrackingEnabled(enabled = true) { + withTestTable { + spark.range(end = 0).toDF(testDataColumnName) + .write.format("delta").mode("overwrite").saveAsTable(testTableName) + + val materializedColumnNameBefore = getMaterializedColumnName(testTableName) + sql(s"CREATE OR REPLACE TABLE $testTableName SHALLOW CLONE $testTableName") + val materializedColumnNameAfter = getMaterializedColumnName(testTableName) + assert(materializedColumnNameBefore === materializedColumnNameAfter) + + val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTableName)) + assert(RowTracking.isEnabled(snapshot.protocol, snapshot.metadata)) + } + } + } + + test(s"can't clone materialized $name column name for existing non-empty target") { + val sourceTableName = "source" + val targetTableName = "target" + + withTable(sourceTableName, targetTableName) { + withRowTrackingEnabled(enabled = true) { + spark.range(end = 1).toDF("col1").write.format("delta").saveAsTable(sourceTableName) + spark.range(end = 1).toDF("col2").write.format("delta").saveAsTable(targetTableName) + + assert(intercept[DeltaIllegalStateException] { + sql(s"CREATE OR REPLACE TABLE $targetTableName SHALLOW CLONE $sourceTableName") + }.getErrorClass === "DELTA_UNSUPPORTED_NON_EMPTY_CLONE") + } + } + } } }