Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
prakharjain09 committed Sep 22, 2023
1 parent f2ca7e0 commit 2e638a3
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ case class UninitializedV1OrV2ParquetCheckpointProvider(
) extends UninitializedV2LikeCheckpointProvider {

override val v2CheckpointFormat: V2Checkpoint.Format = V2Checkpoint.Format.PARQUET

}

/**
Expand Down Expand Up @@ -448,7 +447,6 @@ case class V2CheckpointProvider(
protected lazy val fileIndexForV2Checkpoint: DeltaLogFileIndex =
DeltaLogFileIndex(v2CheckpointFormat.fileFormat, Seq(v2CheckpointFile)).head


override lazy val topLevelFiles: Seq[FileStatus] = Seq(v2CheckpointFile)
override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = Some(fileIndexForV2Checkpoint)
override def effectiveCheckpointSizeInBytes(): Long =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ case class CheckpointInstance(
* Single part checkpoint.
* 3. For Multi-part [[CheckpointInstance]]s corresponding to same version, the one with more
* parts is greater than the one with less parts.
* 4. For V2 Checkpoints corresponding to same version, we use the fileName as tie breaker.
*/
override def compare(other: CheckpointInstance): Int = {
(version, format, numParts, fileName) compare
Expand All @@ -189,8 +190,7 @@ object CheckpointInstance {
}

/** single-file checkpoint format */
object SINGLE extends Format(0, "SINGLE")
with FormatUsesSidecars
object SINGLE extends Format(0, "SINGLE") with FormatUsesSidecars
/** multi-file checkpoint format */
object WITH_PARTS extends Format(1, "WITH_PARTS")
/** V2 Checkpoint format */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package org.apache.spark.sql.delta.util
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.delta.{DeltaHistory, DeltaHistoryManager, SerializableFileStatus, SnapshotState}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, RemoveFile, SingleAction}
import org.apache.spark.sql.delta.actions.{CheckpointMetadata, SidecarFile}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.convert.ConvertTargetFile
import org.apache.spark.sql.delta.sources.IndexedFile

Expand Down Expand Up @@ -81,6 +80,7 @@ private[delta] trait DeltaEncoders {
private lazy val _v2CheckpointActionsEncoder = new DeltaEncoder[(CheckpointMetadata, SidecarFile)]
implicit def v2CheckpointActionsEncoder: Encoder[(CheckpointMetadata, SidecarFile)] =
_v2CheckpointActionsEncoder.get

private lazy val _serializableFileStatusEncoder = new DeltaEncoder[SerializableFileStatus]
implicit def serializableFileStatusEncoder: Encoder[SerializableFileStatus] =
_serializableFileStatusEncoder.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ class CheckpointsSuite
}
}

testDifferentCheckpoints("last checkpoint contains checkpoint schema for v1Checkpoints" +
" and not for v2Checkpoints") { (checkpointPolicy, v2CheckpointFormatOpt) =>
testDifferentCheckpoints("last checkpoint contains correct schema for v1/v2" +
" Checkpoints") { (checkpointPolicy, v2CheckpointFormatOpt) =>
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
Expand Down

0 comments on commit 2e638a3

Please sign in to comment.