Fix schema evolution issue with nested struct (within a map) and column renaming

Resolved the issue described in [Bug delta-io#3227](delta-io#3227), where adding a field inside a struct nested within a map while renaming a top-level column caused the operation to fail.

The fix handles this schema change without affecting the integrity of existing data: when the value struct of a map column evolves, each value is cast to the target struct type, so an added nested field inside a map and a renamed top-level column no longer break the write. A DataFrame-level sketch of this cast follows the changed-files summary below.

Signed-off-by: Sola Richard Olorunfemi <[email protected]>
Sola Richard Olorunfemi authored and Sola Richard Olorunfemi committed Nov 16, 2024
1 parent 8442bc6 commit 0b489db
Showing 2 changed files with 137 additions and 0 deletions.
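
Conceptually, the fix rebuilds a map column whose value struct has evolved by casting every value to the target struct type while leaving the keys untouched. Below is a minimal DataFrame-level sketch of that idea, using the metrics/id/value/comment schema from the example file in this commit; the object and helper names are hypothetical, and the actual change operates on Catalyst expressions (MapKeys, MapValues, ArrayTransform, MapFromArrays) inside DeltaAnalysis, shown further down.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

object MapCastSketch {
  // Hypothetical illustration: give every value struct of the map column
  // "metrics" the new nullable "comment" field, rebuilding the map from its
  // keys and transformed values (the same shape as the analyzer-level fix).
  def alignMetricsColumn(df: DataFrame): DataFrame = {
    val castedValues = transform(
      map_values(col("metrics")),
      (v: Column) => struct(
        v.getField("id").as("id"),
        v.getField("value").as("value"),
        lit(null).cast("string").as("comment")))
    df.withColumn("metrics", map_from_arrays(map_keys(col("metrics")), castedValues))
  }
}

Applied to data written with the old schema, this would simply surface a null comment for each map value, which is the effect of casting per value instead of rejecting the map column outright.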
98 changes: 98 additions & 0 deletions examples/scala/src/main/scala/example/DeltaWriteExample.scala
@@ -0,0 +1,98 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package example

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object DeltaWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaWriteExample")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    import spark.implicits._

    val tableName = "insert_map_schema_evolution"

    try {
      // Define the initial schema: a map column whose values are structs with two fields
      val initialSchema = StructType(Seq(
        StructField("key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false)
        ))))
      ))

      val data = Seq(
        Row(1, Map("event" -> Row(1, 1)))
      )

      val rdd = spark.sparkContext.parallelize(data)

      val initialDf = spark.createDataFrame(rdd, initialSchema)

      initialDf.write
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .format("delta")
        .saveAsTable(s"$tableName")

      // Define the evolved schema with a simultaneous rename of the top-level column
      // and an additional field inside the map's value struct
      val evolvedSchema = StructType(Seq(
        StructField("renamed_key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false),
          StructField("comment", StringType, nullable = true)
        ))))
      ))

      val evolvedData = Seq(
        Row(1, Map("event" -> Row(1, 1, "deprecated")))
      )

      val evolvedRDD = spark.sparkContext.parallelize(evolvedData)

      val modifiedDf = spark.createDataFrame(evolvedRDD, evolvedSchema)

      // The write below would fail without schema evolution support for map types
      modifiedDf.write
        .mode("append")
        .option("mergeSchema", "true")
        .format("delta")
        .insertInto(s"$tableName")

      spark.sql(s"SELECT * FROM $tableName").show(false)

    } finally {

      // Cleanup
      spark.sql(s"DROP TABLE IF EXISTS $tableName")

      spark.stop()
    }

  }
}
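
The example exercises the insertInto path with mergeSchema enabled, which is the code path handled by the DeltaAnalysis change below; before this change, map columns fell through to the generic getCastFunction case, which could not reconcile the evolved value struct inside the metrics map with the existing table schema.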
39 changes: 39 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -70,6 +70,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
* Analysis rules for Delta. Currently, these rules enable schema enforcement / evolution with
* INSERT INTO.
@@ -931,6 +932,10 @@ class DeltaAnalysis(session: SparkSession)
            // Keep the type from the query, the target schema will be updated to widen the existing
            // type to match it.
            attr
          case (s: MapType, t: MapType) if s != t =>
            addCastsToMaps(tblName, attr, s, t, allowTypeWidening)
          case (s: MapType, t: MapType) if s == t =>
            attr
          case _ =>
            getCastFunction(attr, targetAttr.dataType, targetAttr.name)
        }
@@ -1050,6 +1055,7 @@
  /**
   * Recursively casts structs in case it contains null types.
   * TODO: Support other complex types like MapType and ArrayType
   * addCastsToMaps below should address the MapType TODO
   */
  private def addCastsToStructs(
      tableName: String,
@@ -1125,6 +1131,39 @@
    DeltaViewHelper.stripTempViewForMerge(plan, conf)
  }

  /**
   * Recursively casts maps in case they contain null types.
   */
  private def addCastsToMaps(
      tableName: String,
      parent: NamedExpression,
      sourceMapType: MapType,
      targetMapType: MapType,
      allowTypeWidening: Boolean): Expression = {
    // First get keys from the map
    val keysExpr = MapKeys(parent)

    // Create a transformation for the values
    val transformLambdaFunc = {
      val elementVar = NamedLambdaVariable(
        "elementVar", sourceMapType.valueType, sourceMapType.valueContainsNull)
      val castedStruct = addCastsToStructs(
        tableName,
        Alias(elementVar, "")(),
        sourceMapType.valueType.asInstanceOf[StructType],
        targetMapType.valueType.asInstanceOf[StructType],
        allowTypeWidening
      )

      LambdaFunction(castedStruct, Seq(elementVar))
    }
    val transformedValues = ArrayTransform(
      MapValues(parent), transformLambdaFunc)

    // Create new map from keys and transformed values
    MapFromArrays(keysExpr, transformedValues)
  }

  /**
   * Verify the input plan for a SINGLE streaming query with the following:
   * 1. Schema location must be under checkpoint location, if not lifted by flag
