diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
index 4e340d9f71a..9f0ecc40a17 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
@@ -320,7 +320,6 @@ case class UninitializedV1OrV2ParquetCheckpointProvider(
 ) extends UninitializedV2LikeCheckpointProvider {
 
   override val v2CheckpointFormat: V2Checkpoint.Format = V2Checkpoint.Format.PARQUET
-
 }
 
 /**
@@ -448,7 +447,6 @@ case class V2CheckpointProvider(
 
   protected lazy val fileIndexForV2Checkpoint: DeltaLogFileIndex =
     DeltaLogFileIndex(v2CheckpointFormat.fileFormat, Seq(v2CheckpointFile)).head
-
   override lazy val topLevelFiles: Seq[FileStatus] = Seq(v2CheckpointFile)
   override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = Some(fileIndexForV2Checkpoint)
   override def effectiveCheckpointSizeInBytes(): Long =
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index a080618d49e..3e4c3c3cb93 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -166,6 +166,7 @@ case class CheckpointInstance(
    *    Single part checkpoint.
    * 3. For Multi-part [[CheckpointInstance]]s corresponding to same version, the one with more
    *    parts is greater than the one with less parts.
+   * 4. For V2 Checkpoints corresponding to same version, we use the fileName as a tie breaker.
    */
   override def compare(other: CheckpointInstance): Int = {
     (version, format, numParts, fileName) compare
@@ -189,8 +190,7 @@ object CheckpointInstance {
   }
 
   /** single-file checkpoint format */
-  object SINGLE extends Format(0, "SINGLE")
-    with FormatUsesSidecars
+  object SINGLE extends Format(0, "SINGLE") with FormatUsesSidecars
   /** multi-file checkpoint format */
   object WITH_PARTS extends Format(1, "WITH_PARTS")
   /** V2 Checkpoint format */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
index 8f85847ab37..4fb074ebde8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.delta.util
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.sql.delta.{DeltaHistory, DeltaHistoryManager, SerializableFileStatus, SnapshotState}
-import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, RemoveFile, SingleAction}
-import org.apache.spark.sql.delta.actions.{CheckpointMetadata, SidecarFile}
+import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.convert.ConvertTargetFile
 import org.apache.spark.sql.delta.sources.IndexedFile
 
@@ -81,6 +80,7 @@ private[delta] trait DeltaEncoders {
   private lazy val _v2CheckpointActionsEncoder = new DeltaEncoder[(CheckpointMetadata, SidecarFile)]
   implicit def v2CheckpointActionsEncoder: Encoder[(CheckpointMetadata, SidecarFile)] =
     _v2CheckpointActionsEncoder.get
+
   private lazy val _serializableFileStatusEncoder = new DeltaEncoder[SerializableFileStatus]
   implicit def serializableFileStatusEncoder: Encoder[SerializableFileStatus] =
     _serializableFileStatusEncoder.get
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index f4f8084ca7c..5d34c8292a3 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -549,8 +549,8 @@ class CheckpointsSuite
     }
   }
 
-  testDifferentCheckpoints("last checkpoint contains checkpoint schema for v1Checkpoints" +
-      " and not for v2Checkpoints") { (checkpointPolicy, v2CheckpointFormatOpt) =>
+  testDifferentCheckpoints("last checkpoint contains correct schema for v1/v2" +
+      " Checkpoints") { (checkpointPolicy, v2CheckpointFormatOpt) =>
     withTempDir { tempDir =>
       spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
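
Side note on the ordering rule added in the Checkpoints.scala hunk above: the sketch below is a standalone, hypothetical illustration of the lexicographic tuple comparison that makes the fileName act as a tie breaker only after version, format, and part count have all compared equal. The object CheckpointOrderingSketch, the Instance class, and its field names are stand-ins for illustration, not the real CheckpointInstance API.

// Standalone sketch (hypothetical names, not the Delta implementation): illustrates rule 4
// of the updated CheckpointInstance scaladoc, i.e. the fileName only breaks ties once
// version, format, and part count have all compared equal.
import scala.math.Ordered.orderingToOrdered

object CheckpointOrderingSketch {

  // Hypothetical stand-in mirroring the (version, format, numParts, fileName) tuple
  // that the patched compare() method orders by.
  case class Instance(
      version: Long,
      formatPriority: Int, // e.g. SINGLE = 0, WITH_PARTS = 1, V2 = 2
      numParts: Option[Int],
      fileName: Option[String]) extends Ordered[Instance] {

    // Lexicographic tuple comparison: the fileName is consulted last.
    override def compare(other: Instance): Int =
      (version, formatPriority, numParts.getOrElse(1), fileName.getOrElse("")).compare(
        (other.version, other.formatPriority, other.numParts.getOrElse(1),
          other.fileName.getOrElse("")))
  }

  def main(args: Array[String]): Unit = {
    val a = Instance(10L, 2, None, Some("00000010.checkpoint.aaaa.parquet"))
    val b = Instance(10L, 2, None, Some("00000010.checkpoint.zzzz.parquet"))

    // Same version, same format, same part count: the fileName decides the order.
    assert(a < b)
    println(Seq(b, a).sorted.map(_.fileName.get)) // the "aaaa" file sorts first
  }
}

Because the fileName sits last in the compared tuple, two V2 checkpoints written for the same table version still sort deterministically, which is what the new rule 4 in the scaladoc documents.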