From f2ca7e0075af0c87646d8fc680b0fb7863431fce Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 14 Sep 2023 20:04:44 -0700 Subject: [PATCH] Read side changes for V2 Checkpoints --- .../spark/sql/delta/CheckpointProvider.scala | 364 +++++++++++++++++- .../apache/spark/sql/delta/Checkpoints.scala | 64 ++- .../org/apache/spark/sql/delta/DeltaLog.scala | 9 + .../org/apache/spark/sql/delta/Snapshot.scala | 20 +- .../spark/sql/delta/SnapshotManagement.scala | 8 + .../sql/delta/actions/InMemoryLogReplay.scala | 1 + .../sql/delta/sources/DeltaSQLConf.scala | 8 + .../spark/sql/delta/util/DeltaEncoders.scala | 4 + .../spark/sql/delta/CheckpointsSuite.scala | 290 +++++++++++++- .../spark/sql/delta/DeltaTestUtils.scala | 25 ++ .../delta/stats/DataSkippingDeltaTests.scala | 23 ++ 11 files changed, 796 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala index 45741217c47..4e340d9f71a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala @@ -16,15 +16,24 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.actions.Protocol +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.FileNames._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.Path import org.apache.spark.sql.Dataset import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ThreadUtils /** * Represents basic information about a checkpoint. @@ -43,6 +52,14 @@ trait UninitializedCheckpointProvider { * These files could be reused again to initialize the [[CheckpointProvider]]. */ def topLevelFiles: Seq[FileStatus] + + /** + * File index which could help derive actions stored in top level files + * for the checkpoint. + * This could be used to get [[Protocol]], [[Metadata]] etc from a checkpoint. + * This could also be used if we want to shallow copy a checkpoint. + */ + def topLevelFileIndex: Option[DeltaLogFileIndex] } /** @@ -62,6 +79,49 @@ trait CheckpointProvider extends UninitializedCheckpointProvider { object CheckpointProvider extends DeltaLogging { + /** Converts an [[UninitializedCheckpointProvider]] into a [[CheckpointProvider]] */ + def apply( + spark: SparkSession, + snapshotDescriptor: SnapshotDescriptor, + checksumOpt: Option[VersionChecksum], + uninitializedCheckpointProvider: UninitializedCheckpointProvider) + : CheckpointProvider = uninitializedCheckpointProvider match { + // Note: snapshotDescriptor.protocol should be accessed as late as possible inside the futures + // as it might need I/O. 
+ case uninitializedV2CheckpointProvider: UninitializedV2CheckpointProvider => + new LazyCompleteCheckpointProvider(uninitializedV2CheckpointProvider) { + override def createCheckpointProvider(): CheckpointProvider = { + val (checkpointMetadata, sidecarFiles) = + uninitializedV2CheckpointProvider.finishIOAndGetActions() + require(isV2CheckpointEnabled(snapshotDescriptor)) + V2CheckpointProvider(uninitializedV2CheckpointProvider, checkpointMetadata, sidecarFiles) + } + } + case provider: UninitializedV1OrV2ParquetCheckpointProvider + if !isV2CheckpointEnabled(checksumOpt).contains(false) => + // Either v2 checkpoints are enabled, or we lack a Protocol to prove otherwise. + val checkpointReadFuture = SnapshotManagement.checkpointV2ThreadPool.submit(spark) { + readV2ActionsFromParquetCheckpoint( + spark, provider.logPath, provider.fileStatus, snapshotDescriptor.deltaLog.options) + } + new LazyCompleteCheckpointProvider(provider) { + override def createCheckpointProvider(): CheckpointProvider = { + val (checkpointMetadataOpt, sidecarFiles) = + ThreadUtils.awaitResult(checkpointReadFuture, Duration.Inf) + checkpointMetadataOpt match { + case Some(cm) => + require(isV2CheckpointEnabled(snapshotDescriptor)) + V2CheckpointProvider(provider, cm, sidecarFiles) + case None => + PreloadedCheckpointProvider(provider.topLevelFiles, provider.lastCheckpointInfoOpt) + } + } + } + case provider: UninitializedV1OrV2ParquetCheckpointProvider => + // V2 checkpoints are not enabled + PreloadedCheckpointProvider(provider.topLevelFiles, provider.lastCheckpointInfoOpt) + } + private[delta] def isV2CheckpointEnabled(protocol: Protocol): Boolean = protocol.isFeatureSupported(V2CheckpointTableFeature) @@ -71,6 +131,123 @@ object CheckpointProvider extends DeltaLogging { */ def isV2CheckpointEnabled(snapshotDescriptor: SnapshotDescriptor): Boolean = isV2CheckpointEnabled(snapshotDescriptor.protocol) + + /** + * Returns: + * - Some(true) if V2 Checkpoints are enabled for the snapshot corresponding to the given + * `checksumOpt`. + * - Some(false) if V2 Checkpoints are disabled for the snapshot + * - None if the given checksumOpt is not sufficient to identify if v2 checkpoints are enabled or + * not. 
+ */ + def isV2CheckpointEnabled(checksumOpt: Option[VersionChecksum]): Option[Boolean] = { + checksumOpt.flatMap(checksum => Option(checksum.protocol)).map(isV2CheckpointEnabled) + } + + private def sendEventForV2CheckpointRead( + startTimeMs: Long, + fileStatus: FileStatus, + fileType: String, + logPath: Path, + exception: Option[Throwable]): Unit = { + recordDeltaEvent( + deltaLog = null, + opType = "delta.checkpointV2.readV2ActionsFromCheckpoint", + data = Map( + "timeTakenMs" -> (System.currentTimeMillis() - startTimeMs), + "v2CheckpointPath" -> fileStatus.getPath.toString, + "v2CheckpointSize" -> fileStatus.getLen, + "errorMessage" -> exception.map(_.toString).getOrElse(""), + "fileType" -> fileType + ), + path = Some(logPath.getParent) + ) + } + + /** Reads and returns the [[CheckpointMetadata]] and [[SidecarFile]]s from a json v2 checkpoint */ + private[delta] def readV2ActionsFromJsonCheckpoint( + logStore: LogStore, + logPath: Path, + fileStatus: FileStatus, + hadoopConf: Configuration): (CheckpointMetadata, Seq[SidecarFile]) = { + val startTimeMs = System.currentTimeMillis() + try { + var checkpointMetadataOpt: Option[CheckpointMetadata] = None + val sidecarFileActions: ArrayBuffer[SidecarFile] = ArrayBuffer.empty + logStore + .readAsIterator(fileStatus, hadoopConf) + .map(Action.fromJson) + .foreach { + case cm: CheckpointMetadata if checkpointMetadataOpt.isEmpty => + checkpointMetadataOpt = Some(cm) + case cm: CheckpointMetadata => + throw new IllegalStateException( + "More than 1 CheckpointMetadata actions found in the checkpoint file") + case sidecarFile: SidecarFile => + sidecarFileActions.append(sidecarFile) + case _ => () + } + val checkpointMetadata = checkpointMetadataOpt.getOrElse { + throw new IllegalStateException("Json V2 Checkpoint has no CheckpointMetadata action") + } + sendEventForV2CheckpointRead(startTimeMs, fileStatus, "json", logPath, exception = None) + (checkpointMetadata, sidecarFileActions.toSeq) + } catch { + case NonFatal(e) => + sendEventForV2CheckpointRead(startTimeMs, fileStatus, "json", logPath, exception = Some(e)) + throw e + } + } + + /** + * Reads and returns the optional [[CheckpointMetadata]], [[SidecarFile]]s from a parquet + * checkpoint. + * The checkpoint metadata returned might be None if the underlying parquet file is not a v2 + * checkpoint. 
+ */ + private[delta] def readV2ActionsFromParquetCheckpoint( + spark: SparkSession, + logPath: Path, + fileStatus: FileStatus, + deltaLogOptions: Map[String, String]): (Option[CheckpointMetadata], Seq[SidecarFile]) = { + val startTimeMs = System.currentTimeMillis() + try { + val relation = DeltaLog.indexToRelation( + spark, + DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, Seq(fileStatus)).get, + additionalOptions = deltaLogOptions, + Action.logSchema + ) + import implicits._ + val rows = Dataset.ofRows(spark, relation) + .select("checkpointMetadata", "sidecar") + .where("checkpointMetadata.version is not null or sidecar.path is not null") + .as[(CheckpointMetadata, SidecarFile)] + .collect() + + var checkpointMetadata: Option[CheckpointMetadata] = None + val checkpointSidecarFiles = ArrayBuffer.empty[SidecarFile] + rows.foreach { + case (cm: CheckpointMetadata, _) if checkpointMetadata.isEmpty => + checkpointMetadata = Some(cm) + case (cm: CheckpointMetadata, _) => + throw new IllegalStateException( + "More than 1 CheckpointMetadata actions found in the checkpoint file") + case (_, sf: SidecarFile) => + checkpointSidecarFiles.append(sf) + } + if (checkpointMetadata.isEmpty && checkpointSidecarFiles.nonEmpty) { + throw new IllegalStateException( + "sidecar files present in checkpoint even when checkpoint metadata is missing") + } + sendEventForV2CheckpointRead(startTimeMs, fileStatus, "parquet", logPath, exception = None) + (checkpointMetadata, checkpointSidecarFiles.toSeq) + } catch { + case NonFatal(e) => + sendEventForV2CheckpointRead(startTimeMs, fileStatus, "parquet", logPath, Some(e)) + throw e + } + } } /** @@ -96,6 +273,8 @@ case class PreloadedCheckpointProvider( override def effectiveCheckpointSizeInBytes(): Long = fileIndex.sizeInBytes override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = Seq(fileIndex) + + override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = Some(fileIndex) } /** @@ -112,4 +291,187 @@ object EmptyCheckpointProvider extends CheckpointProvider { override def topLevelFiles: Seq[FileStatus] = Nil override def effectiveCheckpointSizeInBytes(): Long = 0L override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = Nil + override def topLevelFileIndex: Option[DeltaLogFileIndex] = None +} + +trait UninitializedV2LikeCheckpointProvider extends UninitializedCheckpointProvider { + def fileStatus: FileStatus + def logPath: Path + def lastCheckpointInfoOpt: Option[LastCheckpointInfo] + def v2CheckpointFormat: V2Checkpoint.Format + + override lazy val topLevelFiles: Seq[FileStatus] = Seq(fileStatus) + override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = + DeltaLogFileIndex(v2CheckpointFormat.fileFormat, topLevelFiles) +} + +/** + * An implementation of [[UninitializedCheckpointProvider]] to represent a parquet checkpoint + * which could be either a v1 checkpoint or v2 checkpoint. + * This needs to be resolved into a [[PreloadedCheckpointProvider]] or a [[V2CheckpointProvider]] + * depending on whether the [[CheckpointMetadata]] action is present or not in the underlying + * parquet file. 
+ */ +case class UninitializedV1OrV2ParquetCheckpointProvider( + override val version: Long, + override val fileStatus: FileStatus, + override val logPath: Path, + override val lastCheckpointInfoOpt: Option[LastCheckpointInfo] +) extends UninitializedV2LikeCheckpointProvider { + + override val v2CheckpointFormat: V2Checkpoint.Format = V2Checkpoint.Format.PARQUET + +} + +/** + * An implementation of [[UninitializedCheckpointProvider]] to for v2 checkpoints. + * This needs to be resolved into a [[V2CheckpointProvider]]. + * This class starts an I/O to fetch the V2 actions ([[CheckpointMetadata]], [[SidecarFile]]) as + * soon as the class is initialized so that the extra overhead could be parallelized with other + * operations like reading CRC. + */ +case class UninitializedV2CheckpointProvider( + override val version: Long, + override val fileStatus: FileStatus, + override val logPath: Path, + hadoopConf: Configuration, + deltaLogOptions: Map[String, String], + logStore: LogStore, + override val lastCheckpointInfoOpt: Option[LastCheckpointInfo] +) extends UninitializedV2LikeCheckpointProvider { + + override val v2CheckpointFormat: V2Checkpoint.Format = + V2Checkpoint.toFormat(fileStatus.getPath.getName) + + // Try to get the required actions from LastCheckpointInfo + private val v2ActionsFromLastCheckpointOpt: Option[(CheckpointMetadata, Seq[SidecarFile])] = { + lastCheckpointInfoOpt + .flatMap(_.v2Checkpoint) + .map(v2 => (v2.checkpointMetadataOpt, v2.sidecarFiles)) + .collect { + case (Some(checkpointMetadata), Some(sidecarFiles)) => + (checkpointMetadata, sidecarFiles) + } + } + + /** Helper method to do I/O and read v2 actions from the underlying v2 checkpoint file */ + private def readV2Actions( + spark: SparkSession, logStore: LogStore): (CheckpointMetadata, Seq[SidecarFile]) = { + v2CheckpointFormat match { + case V2Checkpoint.Format.JSON => + CheckpointProvider.readV2ActionsFromJsonCheckpoint( + logStore, logPath, fileStatus, hadoopConf) + case V2Checkpoint.Format.PARQUET => + val (checkpointMetadataOpt, sidecarFiles) = + CheckpointProvider.readV2ActionsFromParquetCheckpoint( + spark, logPath, fileStatus, deltaLogOptions) + val checkpointMetadata = checkpointMetadataOpt.getOrElse { + throw new IllegalStateException( + s"CheckpointMetadata action not found in v2 checkpoint ${fileStatus.getPath}") + } + (checkpointMetadata, sidecarFiles) + } + } + + private val checkpointReadFuture: Future[(CheckpointMetadata, Seq[SidecarFile])] = { + v2ActionsFromLastCheckpointOpt match { + case Some(result) => Future.successful(result) + case None => + val spark = SparkSession.getActiveSession.getOrElse { + throw DeltaErrors.sparkSessionNotSetException() + } + SnapshotManagement.checkpointV2ThreadPool.submit(spark) { readV2Actions(spark, logStore) } + } + } + + /** + * Finish any underlying I/O if needed and returns the v2 checkpoint actions + * i.e. [[CheckpointMetadata]] and Seq of [[SidecarFile]]s + */ + def finishIOAndGetActions( + timeout: Duration = Duration.Inf): (CheckpointMetadata, Seq[SidecarFile]) = + ThreadUtils.awaitResult(checkpointReadFuture, timeout) +} + +/** + * A wrapper implementation of [[CheckpointProvider]] which wraps + * `underlyingCheckpointProviderFuture` and `uninitializedCheckpointProvider` for implementing all + * the [[UninitializedCheckpointProvider]] and [[CheckpointProvider]] APIs. 
+ * + * @param uninitializedCheckpointProvider the underlying [[UninitializedCheckpointProvider]] + */ +abstract class LazyCompleteCheckpointProvider( + uninitializedCheckpointProvider: UninitializedCheckpointProvider) + extends CheckpointProvider { + + override def version: Long = uninitializedCheckpointProvider.version + override def topLevelFiles: Seq[FileStatus] = uninitializedCheckpointProvider.topLevelFiles + override def topLevelFileIndex: Option[DeltaLogFileIndex] = + uninitializedCheckpointProvider.topLevelFileIndex + + protected def createCheckpointProvider(): CheckpointProvider + + lazy val underlyingCheckpointProvider: CheckpointProvider = createCheckpointProvider() + + override def effectiveCheckpointSizeInBytes(): Long = + underlyingCheckpointProvider.effectiveCheckpointSizeInBytes() + + override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = + underlyingCheckpointProvider.allActionsFileIndexes() +} + +/** + * [[CheckpointProvider]] implementation for Json/Parquet V2 checkpoints. + * + * @param version checkpoint version for the underlying checkpoint + * @param v2CheckpointFile [[FileStatus]] for the json/parquet v2 checkpoint file + * @param v2CheckpointFormat format (json/parquet) for the v2 checkpoint + * @param checkpointMetadata [[CheckpointMetadata]] for the v2 checkpoint + * @param sidecarFiles seq of [[SidecarFile]] for the v2 checkpoint + * @param lastCheckpointInfoOpt optional last checkpoint info for the v2 checkpoint + * @param logPath delta log path for the underlying delta table + */ +case class V2CheckpointProvider( + override val version: Long, + v2CheckpointFile: FileStatus, + v2CheckpointFormat: V2Checkpoint.Format, + checkpointMetadata: CheckpointMetadata, + sidecarFiles: Seq[SidecarFile], + lastCheckpointInfoOpt: Option[LastCheckpointInfo], + logPath: Path + ) extends CheckpointProvider with DeltaLogging { + + private[delta] def sidecarFileStatuses: Seq[FileStatus] = + sidecarFiles.map(_.toFileStatus(logPath)) + protected lazy val fileIndexesForSidecarFiles: Seq[DeltaLogFileIndex] = + DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, sidecarFileStatuses).toSeq + protected lazy val fileIndexForV2Checkpoint: DeltaLogFileIndex = + DeltaLogFileIndex(v2CheckpointFormat.fileFormat, Seq(v2CheckpointFile)).head + + + override lazy val topLevelFiles: Seq[FileStatus] = Seq(v2CheckpointFile) + override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = Some(fileIndexForV2Checkpoint) + override def effectiveCheckpointSizeInBytes(): Long = + sidecarFiles.map(_.sizeInBytes).sum + v2CheckpointFile.getLen + override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = + topLevelFileIndex.toSeq ++ fileIndexesForSidecarFiles + +} + +object V2CheckpointProvider { + + /** Alternate constructor which uses [[UninitializedV2LikeCheckpointProvider]] */ + def apply( + uninitializedV2LikeCheckpointProvider: UninitializedV2LikeCheckpointProvider, + checkpointMetadata: CheckpointMetadata, + sidecarFiles: Seq[SidecarFile]): V2CheckpointProvider = { + V2CheckpointProvider( + uninitializedV2LikeCheckpointProvider.version, + uninitializedV2LikeCheckpointProvider.fileStatus, + uninitializedV2LikeCheckpointProvider.v2CheckpointFormat, + checkpointMetadata, + sidecarFiles, + uninitializedV2LikeCheckpointProvider.lastCheckpointInfoOpt, + uninitializedV2LikeCheckpointProvider.logPath) + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index 
924cec3419f..a080618d49e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -54,10 +54,16 @@ import org.apache.spark.util.Utils /** * A class to help with comparing checkpoints with each other, where we may have had concurrent * writers that checkpoint with different number of parts. + * The `numParts` field will be present only for multipart checkpoints (represented by + * Format.WITH_PARTS). + * The `fileName` field is present only for V2 Checkpoints (represented by Format.V2) + * These additional fields are used as a tie breaker when comparing multiple checkpoint + * instance of same Format for the same `version`. */ case class CheckpointInstance( version: Long, format: CheckpointInstance.Format, + fileName: Option[String] = None, numParts: Option[Int] = None) extends Ordered[CheckpointInstance] { // Assert that numParts are present when checkpoint format is Format.WITH_PARTS. @@ -65,6 +71,11 @@ case class CheckpointInstance( require((format == CheckpointInstance.Format.WITH_PARTS) == numParts.isDefined, s"numParts ($numParts) must be present for checkpoint format" + s" ${CheckpointInstance.Format.WITH_PARTS.name}") + // Assert that filePath is present only when checkpoint format is Format.V2. + // For other formats, filePath must be None. + require((format == CheckpointInstance.Format.V2) == fileName.isDefined, + s"fileName ($fileName) must be present for checkpoint format" + + s" ${CheckpointInstance.Format.V2.name}") /** * Returns a [[CheckpointProvider]] which can tell the files corresponding to this @@ -81,6 +92,25 @@ case class CheckpointInstance( val lastCheckpointInfo = lastCheckpointInfoHint.filter(cm => CheckpointInstance(cm) == this) val cpFiles = filterFiles(deltaLog, filesForCheckpointConstruction) format match { + // Treat single file checkpoints also as V2 Checkpoints because we don't know if it is + // actually a V2 checkpoint until we read it. + case CheckpointInstance.Format.V2 | CheckpointInstance.Format.SINGLE => + assert(cpFiles.size == 1) + val fileStatus = cpFiles.head + if (format == CheckpointInstance.Format.V2) { + val hadoopConf = deltaLog.newDeltaHadoopConf() + UninitializedV2CheckpointProvider( + version, + fileStatus, + logPath, + hadoopConf, + deltaLog.options, + deltaLog.store, + lastCheckpointInfo) + } else { + UninitializedV1OrV2ParquetCheckpointProvider( + version, fileStatus, logPath, lastCheckpointInfo) + } case CheckpointInstance.Format.WITH_PARTS | CheckpointInstance.Format.SINGLE => PreloadedCheckpointProvider(cpFiles, lastCheckpointInfo) case CheckpointInstance.Format.SENTINEL => @@ -93,6 +123,23 @@ case class CheckpointInstance( filesForCheckpointConstruction: Seq[FileStatus]) : Seq[FileStatus] = { val logPath = deltaLog.logPath format match { + // Treat Single File checkpoints also as V2 Checkpoints because we don't know if it is + // actually a V2 checkpoint until we read it. 
+ case format if format.usesSidecars => + val checkpointFileName = format match { + case CheckpointInstance.Format.V2 => fileName.get + case CheckpointInstance.Format.SINGLE => checkpointFileSingular(logPath, version).getName + case other => + throw new IllegalStateException(s"Unknown checkpoint format $other supporting sidecars") + } + val fileStatus = filesForCheckpointConstruction + .find(_.getPath.getName == checkpointFileName) + .getOrElse { + throw new IllegalStateException("Failed in getting the file information for:\n" + + fileName.get + "\namong\n" + + filesForCheckpointConstruction.map(_.getPath.getName).mkString(" -", "\n -", "")) + } + Seq(fileStatus) case CheckpointInstance.Format.WITH_PARTS | CheckpointInstance.Format.SINGLE => val filePaths = if (format == CheckpointInstance.Format.WITH_PARTS) { checkpointFileWithParts(logPath, version, numParts.get).toSet @@ -121,26 +168,33 @@ case class CheckpointInstance( * parts is greater than the one with less parts. */ override def compare(other: CheckpointInstance): Int = { - (version, format, numParts) compare (other.version, other.format, other.numParts) + (version, format, numParts, fileName) compare + (other.version, other.format, other.numParts, other.fileName) } } object CheckpointInstance { sealed abstract class Format(val ordinal: Int, val name: String) extends Ordered[Format] { override def compare(other: Format): Int = ordinal compare other.ordinal + def usesSidecars: Boolean = this.isInstanceOf[FormatUsesSidecars] } + trait FormatUsesSidecars object Format { def unapply(name: String): Option[Format] = name match { case SINGLE.name => Some(SINGLE) case WITH_PARTS.name => Some(WITH_PARTS) + case V2.name => Some(V2) case _ => None } /** single-file checkpoint format */ object SINGLE extends Format(0, "SINGLE") + with FormatUsesSidecars /** multi-file checkpoint format */ object WITH_PARTS extends Format(1, "WITH_PARTS") + /** V2 Checkpoint format */ + object V2 extends Format(2, "V2") with FormatUsesSidecars /** Sentinel, for internal use only */ object SENTINEL extends Format(Int.MaxValue, "SENTINEL") } @@ -150,6 +204,12 @@ object CheckpointInstance { // * .checkpoint.parquet // * .checkpoint...parquet path.getName.split("\\.") match { + case Array(v, "checkpoint", uniqueStr, format) if Seq("json", "parquet").contains(format) => + CheckpointInstance( + version = v.toLong, + format = Format.V2, + numParts = None, + fileName = Some(path.getName)) case Array(v, "checkpoint", "parquet") => CheckpointInstance(v.toLong, Format.SINGLE, numParts = None) case Array(v, "checkpoint", _, n, "parquet") => @@ -384,6 +444,8 @@ trait Checkpoints extends DeltaLogging { case CheckpointInstance.Format.WITH_PARTS => assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts") matchingCheckpointInstances.length == ci.numParts.get + case CheckpointInstance.Format.V2 => + matchingCheckpointInstances.length == 1 case CheckpointInstance.Format.SENTINEL => false } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 4e5d3362d1c..09afb11ef2e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.sources._ import org.apache.spark.sql.delta.storage.LogStoreProvider 
+import org.apache.spark.sql.delta.util.FileNames
 import com.google.common.cache.{CacheBuilder, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -87,6 +88,14 @@ class DeltaLog private(
   import org.apache.spark.sql.delta.files.TahoeFileIndex
   import org.apache.spark.sql.delta.util.FileNames._
 
+  /**
+   * Path to the sidecar directory.
+   * This is intentionally kept a `lazy val`: other constructor code paths in DeltaLog
+   * (e.g. SnapshotManagement) run before this line and would otherwise observe it as
+   * null.
+   */
+  lazy val sidecarDirPath: Path = FileNames.sidecarDirPath(logPath)
+
   protected def spark = SparkSession.active
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index 463303512e4..a2042568ca8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -207,12 +207,22 @@ class Snapshot(
    * Pulls the protocol and metadata of the table from the files that are used to compute the
    * Snapshot directly--without triggering a full state reconstruction. This is important, because
    * state reconstruction depends on protocol and metadata for correctness.
+   *
+   * Also, this method should only access the methods defined on
+   * [[UninitializedCheckpointProvider]], not the additional ones defined on
+   * [[CheckpointProvider]]. Initialization of [[Snapshot.checkpointProvider]] depends on
+   * [[Snapshot.protocolAndMetadataReconstruction()]], so if the latter started depending on
+   * [[Snapshot.checkpointProvider]] there would be a cyclic dependency.
*/ protected def protocolAndMetadataReconstruction(): Array[(Protocol, Metadata)] = { import implicits._ val schemaToUse = Action.logSchema(Set("protocol", "metaData")) - fileIndices.map(deltaLog.loadIndex(_, schemaToUse)) + val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index => + deltaLog.loadIndex(index, schemaToUse) + .withColumn(COMMIT_VERSION_COLUMN, lit(checkpointProvider.version)) + } + (checkpointOpt ++ deltaFileIndexOpt.map(deltaLog.loadIndex(_, schemaToUse)).toSeq) .reduceOption(_.union(_)).getOrElse(emptyDF) .select("protocol", "metaData", COMMIT_VERSION_COLUMN) .where("protocol.minReaderVersion is not null or metaData.id is not null") @@ -367,8 +377,12 @@ class Snapshot( /** The [[CheckpointProvider]] for the underlying checkpoint */ lazy val checkpointProvider: CheckpointProvider = logSegment.checkpointProvider match { - case cp: CheckpointProvider => cp - case o => throw new IllegalStateException(s"Unknown checkpoint provider: ${o.getClass.getName}") + case cp: CheckpointProvider => + cp + case uninitializedProvider: UninitializedCheckpointProvider => + CheckpointProvider(spark, this, checksumOpt, uninitializedProvider) + case o => + throw new IllegalStateException(s"Unknown checkpoint provider: ${o.getClass.getName}") } def redactedPath: String = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 9c7e2ff9928..28a86e4bb24 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -813,6 +813,14 @@ trait SnapshotManagement { self: DeltaLog => } object SnapshotManagement { + // A thread pool for reading checkpoint files and collecting checkpoint v2 actions like + // checkpointMetadata, sidecarFiles. 
+ private[delta] lazy val checkpointV2ThreadPool = { + val numThreads = SparkSession.active.sessionState.conf.getConf( + DeltaSQLConf.CHECKPOINT_V2_DRIVER_THREADPOOL_PARALLELISM) + DeltaThreadPool("checkpointV2-threadpool", numThreads) + } + protected[delta] lazy val deltaLogAsyncUpdateThreadPool = { val tpe = ThreadUtils.newDaemonCachedThreadPool("delta-state-update", 8) new DeltaThreadPool(tpe) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala index fcaa9d46fd7..f576c05d2fc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala @@ -59,6 +59,7 @@ class InMemoryLogReplay( domainMetadatas.remove(a.domain) case a: DomainMetadata if !a.removed => domainMetadatas(a.domain) = a + case _: CheckpointOnlyAction => // Ignore this while doing LogReplay case a: Metadata => currentMetaData = a case a: Protocol => diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 1f62ca45348..3c154809a14 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -576,6 +576,14 @@ trait DeltaSQLConfBase { // Checkpoint V2 Specific Configs //////////////////////////////////// + val CHECKPOINT_V2_DRIVER_THREADPOOL_PARALLELISM = + buildStaticConf("checkpointV2.threadpool.size") + .doc("The size of the threadpool for fetching CheckpointMetadata and SidecarFiles from a" + + " checkpoint.") + .internal() + .intConf + .createWithDefault(32) + val CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT = buildConf("checkpointV2.topLevelFileFormat") .internal() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala index 8f6fda91938..8f85847ab37 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala @@ -20,6 +20,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.delta.{DeltaHistory, DeltaHistoryManager, SerializableFileStatus, SnapshotState} import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, RemoveFile, SingleAction} +import org.apache.spark.sql.delta.actions.{CheckpointMetadata, SidecarFile} import org.apache.spark.sql.delta.commands.convert.ConvertTargetFile import org.apache.spark.sql.delta.sources.IndexedFile @@ -77,6 +78,9 @@ private[delta] trait DeltaEncoders { private lazy val _pmvEncoder = new DeltaEncoder[(Protocol, Metadata, Long)] implicit def pmvEncoder: Encoder[(Protocol, Metadata, Long)] = _pmvEncoder.get + private lazy val _v2CheckpointActionsEncoder = new DeltaEncoder[(CheckpointMetadata, SidecarFile)] + implicit def v2CheckpointActionsEncoder: Encoder[(CheckpointMetadata, SidecarFile)] = + _v2CheckpointActionsEncoder.get private lazy val _serializableFileStatusEncoder = new DeltaEncoder[SerializableFileStatus] implicit def serializableFileStatusEncoder: Encoder[SerializableFileStatus] = _serializableFileStatusEncoder.get diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 
38e9863c33d..f4f8084ca7c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -37,9 +37,43 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType +trait CheckpointsSuiteBase extends QueryTest with SharedSparkSession { + def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = { + for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) { + test(s"$testName [v2CheckpointFormat: $checkpointFormat]") { + withSQLConf( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat + ) { + f + } + } + } + } + + /** Get V2 [[CheckpointProvider]] from the underlying deltalog snapshot */ + def getV2CheckpointProvider( + deltaLog: DeltaLog, + update: Boolean = true): V2CheckpointProvider = { + val snapshot = if (update) deltaLog.update() else deltaLog.unsafeVolatileSnapshot + snapshot.checkpointProvider match { + case v2CheckpointProvider: V2CheckpointProvider => + v2CheckpointProvider + case lzProvider : LazyCompleteCheckpointProvider + if lzProvider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider] => + lzProvider.underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider] + case EmptyCheckpointProvider => + throw new Exception("underlying snapshot doesn't have a checkpoint") + case other => + throw new Exception(s"The underlying checkpoint is not a v2 checkpoint. " + + s"It is: ${other.getClass.getName}") + } + } +} -class CheckpointsSuite extends QueryTest - with SharedSparkSession +class CheckpointsSuite + extends CheckpointsSuiteBase + with DeltaCheckpointTestUtils with DeltaSQLCommandTest { protected override def sparkConf = { @@ -72,19 +106,6 @@ class CheckpointsSuite extends QueryTest } } - def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = { - for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) { - test(s"$testName [v2CheckpointFormat: $checkpointFormat]") { - withSQLConf( - DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, - DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat - ) { - f - } - } - } - } - testDifferentV2Checkpoints("checkpoint metadata - checkpoint schema not persisted in" + " json v2 checkpoints but persisted in parquet v2 checkpoints") { withTempDir { tempDir => @@ -268,6 +289,65 @@ class CheckpointsSuite extends QueryTest } } + testDifferentV2Checkpoints("multipart v2 checkpoint") { + withTempDir { tempDir => + val path = tempDir.getCanonicalPath + + withSQLConf( + DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "10", + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "1") { + // 1 file actions + spark.range(1).repartition(1).write.format("delta").save(path) + val deltaLog = DeltaLog.forTable(spark, path) + + def getNumFilesInSidecarDirectory(): Int = { + val fs = deltaLog.sidecarDirPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + fs.listStatus(deltaLog.sidecarDirPath).size + } + + // 2 file actions, 1 new file + spark.range(1).repartition(1).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 1) + 
assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 1) + assert(getNumFilesInSidecarDirectory() == 1) + + // 11 total file actions, 9 new files + spark.range(30).repartition(9).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 2) + assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 2) + assert(getNumFilesInSidecarDirectory() == 3) + + // 20 total actions, 9 new files + spark.range(100).repartition(9).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 3) + assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 2) + assert(getNumFilesInSidecarDirectory() == 5) + + // 31 total actions, 11 new files + spark.range(100).repartition(11).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 4) + assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 4) + assert(getNumFilesInSidecarDirectory() == 9) + + // Increase max actions + withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "100") { + // 100 total actions, 69 new files + spark.range(1000).repartition(69).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 5) + assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 1) + assert(getNumFilesInSidecarDirectory() == 10) + + // 101 total actions, 1 new file + spark.range(1).repartition(1).write.format("delta").mode("append").save(path) + assert(getV2CheckpointProvider(deltaLog).version == 6) + assert(getV2CheckpointProvider(deltaLog).sidecarFileStatuses.size == 2) + assert(getNumFilesInSidecarDirectory() == 12) + } + } + } + } + test("checkpoint does not contain CDC field") { withSQLConf( DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true" @@ -308,6 +388,75 @@ class CheckpointsSuite extends QueryTest } } + testDifferentV2Checkpoints("v2 checkpoint contains only addfile and removefile and" + + " remove file does not contain remove.tags and remove.numRecords") { + withSQLConf( + DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true" + ) { + val expectedCheckpointSchema = Seq("add", "remove") + val expectedRemoveFileSchema = Seq( + "path", + "deletionTimestamp", + "dataChange", + "extendedFileMetadata", + "partitionValues", + "size", + "deletionVector", + "baseRowId", + "defaultRowCommitVersion") + withTempDir { tempDir => + withTempView("src") { + val tablePath = tempDir.getAbsolutePath + // Append rows [0, 9] to table and merge tablePath. + spark.range(end = 10).write.format("delta").mode("overwrite").save(tablePath) + spark.range(5, 15).createOrReplaceTempView("src") + sql( + s""" + |MERGE INTO delta.`$tempDir` t USING src s ON t.id = s.id + |WHEN MATCHED THEN DELETE + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + spark.read.format("delta").load(tempDir.getAbsolutePath), + Seq(0, 1, 2, 3, 4, 10, 11, 12, 13, 14).map { i => Row(i) }) + + // CDC should exist in the log as seen through getChanges, but it shouldn't be in the + // snapshots and the checkpoint file shouldn't have a CDC column. 
+ val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile])) + assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null }) + deltaLog.checkpoint() + var sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses + assert(sidecarCheckpointFiles.size == 1) + var sidecarFile = sidecarCheckpointFiles.head.getPath.toString + var checkpointSchema = spark.read.format("parquet").load(sidecarFile).schema + var removeSchemaName = + checkpointSchema("remove").dataType.asInstanceOf[StructType].fieldNames + assert(checkpointSchema.fieldNames.toSeq == expectedCheckpointSchema) + assert(removeSchemaName.toSeq === expectedRemoveFileSchema) + + // Append rows [0, 9] to table and merge one more time. + spark.range(end = 10).write.format("delta").mode("append").save(tablePath) + sql( + s""" + |MERGE INTO delta.`$tempDir` t USING src s ON t.id = s.id + |WHEN MATCHED THEN DELETE + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + deltaLog.checkpoint() + sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses + sidecarFile = sidecarCheckpointFiles.head.getPath.toString + checkpointSchema = spark.read.format(source = "parquet").load(sidecarFile).schema + removeSchemaName = checkpointSchema("remove").dataType.asInstanceOf[StructType].fieldNames + assert(removeSchemaName.toSeq === expectedRemoveFileSchema) + checkAnswer( + spark.sql(s"select * from delta.`$tablePath`"), + Seq(0, 0, 1, 1, 2, 2, 3, 3, 4, 4).map { i => Row(i) }) + } + } + } + } + test("checkpoint does not contain remove.tags and remove.numRecords") { withTempDir { tempDir => val expectedRemoveFileSchema = Seq( @@ -360,6 +509,7 @@ class CheckpointsSuite extends QueryTest } test("checkpoint with DVs") { + for (v2Checkpoint <- Seq(true, false)) withTempDir { tempDir => val source = new File(DeletionVectorsSuite.table1Path) // this table has DVs in two versions val target = new File(tempDir, "insertTest") @@ -367,6 +517,11 @@ class CheckpointsSuite extends QueryTest // Copy the source2 DV table to a temporary directory, so that we do updates to it FileUtils.copyDirectory(source, target) + if (v2Checkpoint) { + spark.sql(s"ALTER TABLE delta.`${target.getAbsolutePath}` SET TBLPROPERTIES " + + s"('${DeltaConfigs.CHECKPOINT_POLICY.key}' = 'v2')") + } + sql(s"ALTER TABLE delta.`${target.getAbsolutePath}` " + s"SET TBLPROPERTIES (${DeltaConfigs.CHECKPOINT_INTERVAL.key} = 10)") def insertData(data: String): Unit = { @@ -393,6 +548,111 @@ class CheckpointsSuite extends QueryTest (DeletionVectorsSuite.expectedTable1DataV4 ++ newData).toSeq.toDF()) } } + + testDifferentCheckpoints("last checkpoint contains checkpoint schema for v1Checkpoints" + + " and not for v2Checkpoints") { (checkpointPolicy, v2CheckpointFormatOpt) => + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + deltaLog.checkpoint() + val lastCheckpointOpt = deltaLog.readLastCheckpointFile() + assert(lastCheckpointOpt.nonEmpty) + if (checkpointPolicy.needsV2CheckpointSupport) { + if (v2CheckpointFormatOpt.contains(V2Checkpoint.Format.JSON)) { + assert(lastCheckpointOpt.get.checkpointSchema.isEmpty) + } else { + assert(lastCheckpointOpt.get.checkpointSchema.nonEmpty) + assert(lastCheckpointOpt.get.checkpointSchema.get.fieldNames.toSeq === + Seq("txn", "add", "remove", "metaData", "protocol", + "domainMetadata", "checkpointMetadata", "sidecar")) + 
} + } else { + assert(lastCheckpointOpt.get.checkpointSchema.nonEmpty) + assert(lastCheckpointOpt.get.checkpointSchema.get.fieldNames.toSeq === + Seq("txn", "add", "remove", "metaData", "protocol", "domainMetadata")) + } + } + } + + test("last checkpoint - v2 checkpoint fields threshold") { + withTempDir { tempDir => + val tablePath = tempDir.getAbsolutePath + spark.range(1).write.format("delta").save(tablePath) + val deltaLog = DeltaLog.forTable(spark, tablePath) + // Enable v2Checkpoint table feature. + spark.sql(s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES " + + s"('${DeltaConfigs.CHECKPOINT_POLICY.key}' = 'v2')") + + def writeCheckpoint( + adds: Int, + nonFileActionThreshold: Int, + sidecarActionThreshold: Int): LastCheckpointInfo = { + withSQLConf( + DeltaSQLConf.LAST_CHECKPOINT_NON_FILE_ACTIONS_THRESHOLD.key -> s"$nonFileActionThreshold", + DeltaSQLConf.LAST_CHECKPOINT_SIDECARS_THRESHOLD.key -> s"$sidecarActionThreshold" + ) { + val addFiles = (1 to adds).map(_ => + AddFile( + path = java.util.UUID.randomUUID.toString, + partitionValues = Map(), + size = 128L, + modificationTime = 1L, + dataChange = true + )) + deltaLog.startTransaction().commit(addFiles, DeltaOperations.ManualUpdate) + deltaLog.checkpoint() + } + val lastCheckpointInfoOpt = deltaLog.readLastCheckpointFile() + assert(lastCheckpointInfoOpt.nonEmpty) + lastCheckpointInfoOpt.get + } + + // Append 1 AddFile [AddFile-2] + val lc1 = writeCheckpoint(adds = 1, nonFileActionThreshold = 10, sidecarActionThreshold = 10) + assert(lc1.v2Checkpoint.nonEmpty) + // 3 non file actions - protocol/metadata/checkpointMetadata, 1 sidecar + assert(lc1.v2Checkpoint.get.nonFileActions.get.size === 3) + assert(lc1.v2Checkpoint.get.sidecarFiles.get.size === 1) + + // Append 1 SetTxn, 8 more AddFiles [SetTxn-1, AddFile-10] + deltaLog.startTransaction() + .commit(Seq(SetTransaction("app-1", 2, None)), DeltaOperations.ManualUpdate) + val lc2 = writeCheckpoint(adds = 8, nonFileActionThreshold = 4, sidecarActionThreshold = 10) + assert(lc2.v2Checkpoint.nonEmpty) + // 4 non file actions - protocol/metadata/checkpointMetadata/setTxn, 1 sidecar + assert(lc2.v2Checkpoint.get.nonFileActions.get.size === 4) + assert(lc2.v2Checkpoint.get.sidecarFiles.get.size === 1) + + // Append 10 more AddFiles [SetTxn-1, AddFile-20] + val lc3 = writeCheckpoint(adds = 10, nonFileActionThreshold = 3, sidecarActionThreshold = 10) + assert(lc3.v2Checkpoint.nonEmpty) + // non-file actions exceeded threshold, 1 sidecar + assert(lc3.v2Checkpoint.get.nonFileActions.isEmpty) + assert(lc3.v2Checkpoint.get.sidecarFiles.get.size === 1) + + withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "5") { + // Append 10 more AddFiles [SetTxn-1, AddFile-30] + val lc4 = + writeCheckpoint(adds = 10, nonFileActionThreshold = 3, sidecarActionThreshold = 10) + assert(lc4.v2Checkpoint.nonEmpty) + // non-file actions exceeded threshold + // total 30 file actions, across 6 sidecar files (5 actions per file) + assert(lc4.v2Checkpoint.get.nonFileActions.isEmpty) + assert(lc4.v2Checkpoint.get.sidecarFiles.get.size === 6) + } + + withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "2") { + // Append 0 AddFiles [SetTxn-1, AddFile-30] + val lc5 = + writeCheckpoint(adds = 0, nonFileActionThreshold = 10, sidecarActionThreshold = 10) + assert(lc5.v2Checkpoint.nonEmpty) + // 4 non file actions - protocol/metadata/checkpointMetadata/setTxn + // total 30 file actions, across 15 sidecar files (2 actions per file) + assert(lc5.v2Checkpoint.get.nonFileActions.get.size === 4) + 
assert(lc5.v2Checkpoint.get.sidecarFiles.isEmpty) + } + } + } } /** diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index feb95959bec..d7be7f58fed 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -26,12 +26,14 @@ import scala.util.matching.Regex import org.apache.spark.sql.delta.DeltaTestUtils.Plans import org.apache.spark.sql.delta.actions.{AddFile, Protocol} +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import io.delta.tables.{DeltaTable => IODeltaTable} import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkContext +import org.apache.spark.SparkFunSuite import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd, SparkListenerJobStart} import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -225,6 +227,29 @@ trait DeltaTestUtilsBase { } } +trait DeltaCheckpointTestUtils + extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession => + + def testDifferentCheckpoints(testName: String) + (f: (CheckpointPolicy.Policy, Option[V2Checkpoint.Format]) => Unit): Unit = { + test(s"$testName [Checkpoint V1]") { + withSQLConf(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> + CheckpointPolicy.Classic.name) { + f(CheckpointPolicy.Classic, None) + } + } + for (checkpointFormat <- V2Checkpoint.Format.ALL) + test(s"$testName [Checkpoint V2, format: ${checkpointFormat.name}]") { + withSQLConf( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat.name + ) { + f(CheckpointPolicy.V2, Some(checkpointFormat)) + } + } + } +} + object DeltaTestUtils extends DeltaTestUtilsBase { sealed trait TableIdentifierOrPath diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala index 54d26b68a8e..739a8104cf5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.GivenWhenThen // scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.TableIdentifier @@ -2124,3 +2125,25 @@ class DataSkippingDeltaV1NameColumnMappingSuite with DataSkippingDeltaTestV1ColumnMappingMode { override protected def runAllTests: Boolean = true } + +class DataSkippingDeltaV1JsonCheckpointV2Suite extends DataSkippingDeltaV1Suite { + override def sparkConf: SparkConf = { + super.sparkConf.setAll( + Seq( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> V2Checkpoint.Format.JSON.name + ) + ) + } +} + +class DataSkippingDeltaV1ParquetCheckpointV2Suite extends DataSkippingDeltaV1Suite { + override def sparkConf: SparkConf = { + super.sparkConf.setAll( + Seq( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + 
+        DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> V2Checkpoint.Format.PARQUET.name
+      )
+    )
+  }
+}
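
A few illustrative sketches follow for reviewers; none of the code below is part of the patch. First, the new `Format.V2` branch added to `CheckpointInstance.apply(path)` in Checkpoints.scala keys off the file-name layout `<version>.checkpoint.<uniqueStr>.{json|parquet}`. The sketch shows how the three layouts resolve; the version numbers and the unique suffix are made-up placeholders, and the snippet is assumed to run in the org.apache.spark.sql.delta package where CheckpointInstance is visible.

import org.apache.hadoop.fs.Path

// Classic single-file checkpoint -> Format.SINGLE
val single = CheckpointInstance(new Path("00000000000000000010.checkpoint.parquet"))
assert(single.format == CheckpointInstance.Format.SINGLE)

// Multi-part checkpoint (part 3 of 5) -> Format.WITH_PARTS with numParts = Some(5)
val multiPart = CheckpointInstance(
  new Path("00000000000000000010.checkpoint.0000000003.0000000005.parquet"))
assert(multiPart.format == CheckpointInstance.Format.WITH_PARTS && multiPart.numParts.contains(5))

// V2 checkpoint with a unique suffix (json or parquet) -> Format.V2; fileName is retained so
// that filterFiles() can later pick the exact top-level file out of the candidate list.
val v2 = CheckpointInstance(new Path("00000000000000000010.checkpoint.3a0d65cd.json"))
assert(v2.format == CheckpointInstance.Format.V2 && v2.fileName.isDefined)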
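
Next, the CheckpointsSuite helper getV2CheckpointProvider above doubles as a usage sketch for the lazy read path: a snapshot whose checkpoint is a v2 checkpoint surfaces a LazyCompleteCheckpointProvider, and forcing its underlyingCheckpointProvider performs the deferred read of the CheckpointMetadata and SidecarFile actions. A condensed, illustrative version is below; the table path is arbitrary, a Delta-enabled SparkSession is assumed, and (as in the suite) the code is assumed to live in the org.apache.spark.sql.delta package since these providers are internal classes.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val path = "/tmp/v2_checkpoint_example"   // illustrative location

spark.range(100).write.format("delta").save(path)
// Opt the table into v2 checkpoints, the same way several tests above do.
spark.sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.checkpointPolicy' = 'v2')")

val deltaLog = DeltaLog.forTable(spark, path)
deltaLog.checkpoint()   // writes a v2 checkpoint, with sidecars under deltaLog.sidecarDirPath

deltaLog.update().checkpointProvider match {
  case lazyProvider: LazyCompleteCheckpointProvider =>
    // Forcing the lazy val resolves the wrapper into the concrete V2 provider.
    assert(lazyProvider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider])
  case v2: V2CheckpointProvider =>
    assert(v2.sidecarFiles.nonEmpty)
  case other =>
    sys.error(s"unexpected checkpoint provider: ${other.getClass.getName}")
}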
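
Finally, a note on the new CHECKPOINT_V2_DRIVER_THREADPOOL_PARALLELISM conf: it is declared with buildStaticConf, so it has to be set when the session is created rather than via spark.conf.set at runtime. A minimal sketch, assuming the usual spark.databricks.delta. prefix that DeltaSQLConf entries carry:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("v2-checkpoint-reads")
  // Sizes SnapshotManagement.checkpointV2ThreadPool (default 32); the full key is an assumption.
  .config("spark.databricks.delta.checkpointV2.threadpool.size", "8")
  .getOrCreate()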