Skip to content

Commit

Permalink
Fix schema evolution issue with nested struct (within a map) and colu…
Browse files Browse the repository at this point in the history
…mn renaming

Resolved the issue described in [Bug delta-io#3227](delta-io#3227) where adding a field inside a struct (nested within a map) while renaming a top column caused the operation to fail.

The fix focuses on handling schema changes without affecting the integrity of existing data structures, specifically avoiding issues with nested fields within a map and renamed columns.

fix!:renamed the added DeltaWriteExample to EvolutionW
ithMap

fix!: Modified TypeWideningInsertSchemaEvolutionSuite to accommodate that schema evolution is now allowed for maps

Signed-off-by: Sola Richard Olorunfemi <[email protected]>

fix!: addCastToMap to handle complex types. Added tests to cover new abilities

fix: resolved scalaStyle error

fix: yet another scalaStyle issue

fix!:made some schema evolution for maps tests in DeltaInsertIntoTableSuite more flexible

fix: DeltaAnalysis
  • Loading branch information
Sola Richard Olorunfemi authored and Sola Richard Olorunfemi committed Nov 23, 2024
1 parent ec0ab0d commit b1ab9ef
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 26 deletions.
98 changes: 98 additions & 0 deletions examples/scala/src/main/scala/example/EvolutionWithMap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package example

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object EvolutionWithMap {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("EvolutionWithMap")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()

import spark.implicits._

val tableName = "insert_map_schema_evolution"

try {
// Define initial schema
val initialSchema = StructType(Seq(
StructField("key", IntegerType, nullable = false),
StructField("metrics", MapType(StringType, StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("value", IntegerType, nullable = false)
))))
))

val data = Seq(
Row(1, Map("event" -> Row(1, 1)))
)

val rdd = spark.sparkContext.parallelize(data)

val initialDf = spark.createDataFrame(rdd, initialSchema)

initialDf.write
.option("overwriteSchema", "true")
.mode("overwrite")
.format("delta")
.saveAsTable(s"$tableName")

// Define the schema with simulteneous change in a StructField name
// And additional field in a map column
val evolvedSchema = StructType(Seq(
StructField("renamed_key", IntegerType, nullable = false),
StructField("metrics", MapType(StringType, StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("value", IntegerType, nullable = false),
StructField("comment", StringType, nullable = true)
))))
))

val evolvedData = Seq(
Row(1, Map("event" -> Row(1, 1, "deprecated")))
)

val evolvedRDD = spark.sparkContext.parallelize(evolvedData)

val modifiedDf = spark.createDataFrame(evolvedRDD, evolvedSchema)

// The below would fail without schema evolution for map types
modifiedDf.write
.mode("append")
.option("mergeSchema", "true")
.format("delta")
.insertInto(s"$tableName")

spark.sql(s"SELECT * FROM $tableName").show(false)

} finally {

// Cleanup
spark.sql(s"DROP TABLE IF EXISTS $tableName")

spark.stop()
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
* Analysis rules for Delta. Currently, these rules enable schema enforcement / evolution with
* INSERT INTO.
Expand Down Expand Up @@ -913,8 +914,8 @@ class DeltaAnalysis(session: SparkSession)
}

private def addCastToColumn(
attr: Attribute,
targetAttr: Attribute,
attr: NamedExpression,
targetAttr: NamedExpression,
tblName: String,
allowTypeWidening: Boolean): NamedExpression = {
val expr = (attr.dataType, targetAttr.dataType) match {
Expand All @@ -930,6 +931,11 @@ class DeltaAnalysis(session: SparkSession)
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
case (s: MapType, t: MapType)
if !DataType.equalsStructurally(s, t, ignoreNullability = true) || allowTypeWidening =>
// only trigger addCastsToMaps if exits differences like extra fields, renaming
// Or allowTypeWidening is enabled
addCastsToMaps(tblName, attr, s, t, allowTypeWidening)
case _ =>
getCastFunction(attr, targetAttr.dataType, targetAttr.name)
}
Expand Down Expand Up @@ -1047,8 +1053,7 @@ class DeltaAnalysis(session: SparkSession)
}

/**
* Recursively casts structs in case it contains null types.
* TODO: Support other complex types like MapType and ArrayType
* Recursively casts struct data types in case the source/target type differs.
*/
private def addCastsToStructs(
tableName: String,
Expand Down Expand Up @@ -1124,6 +1129,65 @@ class DeltaAnalysis(session: SparkSession)
DeltaViewHelper.stripTempViewForMerge(plan, conf)
}

/**
* Recursively casts map data types in case the key/value type differs.
*/
private def addCastsToMaps(
tableName: String,
parent: NamedExpression,
sourceMapType: MapType,
targetMapType: MapType,
allowTypeWidening: Boolean): Expression = {

val transformedKeys =
if (sourceMapType.keyType != targetMapType.keyType) {
// Create a transformation for the keys
ArrayTransform(MapKeys(parent), {
val key = NamedLambdaVariable(
"key", sourceMapType.keyType, nullable = false)

val keyAttr = AttributeReference(
"key", targetMapType.keyType, nullable = false)()

val castedKey =
addCastToColumn(
key,
keyAttr,
tableName,
allowTypeWidening
)
LambdaFunction(castedKey, Seq(key))
})
} else {
MapKeys(parent)
}

val transformedValues =
if (sourceMapType.valueType != targetMapType.valueType) {
// Create a transformation for the values
ArrayTransform(MapValues(parent), {
val value = NamedLambdaVariable(
"value", sourceMapType.valueType, sourceMapType.valueContainsNull)

val valueAttr = AttributeReference(
"value", targetMapType.valueType, sourceMapType.valueContainsNull)()

val castedValue =
addCastToColumn(
value,
valueAttr,
tableName,
allowTypeWidening
)
LambdaFunction(castedValue, Seq(value))
})
} else {
MapValues(parent)
}
// Create new map from transformed keys and values
MapFromArrays(transformedKeys, transformedValues)
}

/**
* Verify the input plan for a SINGLE streaming query with the following:
* 1. Schema location must be under checkpoint location, if not lifted by flag
Expand Down
Loading

0 comments on commit b1ab9ef

Please sign in to comment.