Skip to content

Commit

Permalink
[Spark] Rename DeltaSource.isStartingVersion to DeltaSource.isInitial…
Browse files Browse the repository at this point in the history
…Snapshot

`DeltaSourceOffset.isStartingVersion` means "is this offset part of an initial snapshot", which is the exact opposite of `"startingVersion"` which is the user specified option that means "no initial snapshot, just changes starting at this version". This PR renames `isStartingVersion` to `isInitialSnapshot`, keeping the serialized name as "isStartingVersion".

Closes delta-io#2139

GitOrigin-RevId: edcb79d942c597d5b4fd61820624951a717891a3
  • Loading branch information
bart-samwel authored and xupefei committed Oct 31, 2023
1 parent c21d491 commit 2546502
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,18 +260,17 @@ trait DeltaSourceBase extends Source
protected def getFileChangesWithRateLimit(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean,
limits: Option[AdmissionLimits] = Some(AdmissionLimits())):
ClosableIterator[IndexedFile] = {
isInitialSnapshot: Boolean,
limits: Option[AdmissionLimits] = Some(AdmissionLimits())): ClosableIterator[IndexedFile] = {
val iter = if (options.readChangeFeed) {
// In this CDC use case, we need to consider RemoveFile and AddCDCFiles when getting the
// offset.

// This method is only used to get the offset so we need to return an iterator of IndexedFile.
getFileChangesForCDC(fromVersion, fromIndex, isStartingVersion, limits, None).flatMap(_._2)
getFileChangesForCDC(fromVersion, fromIndex, isInitialSnapshot, limits, None).flatMap(_._2)
.toClosable
} else {
val changes = getFileChanges(fromVersion, fromIndex, isStartingVersion)
val changes = getFileChanges(fromVersion, fromIndex, isInitialSnapshot)

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count towards
Expand All @@ -295,22 +294,22 @@ trait DeltaSourceBase extends Source
* get the changes from startVersion, startIndex to the end
* @param startVersion - calculated starting version
* @param startIndex - calculated starting index
* @param isStartingVersion - whether the stream has to return the initial snapshot or not
* @param isInitialSnapshot - whether the stream has to return the initial snapshot or not
* @param endOffset - Offset that signifies the end of the stream.
* @return
*/
protected def getFileChangesAndCreateDataFrame(
startVersion: Long,
startIndex: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
endOffset: DeltaSourceOffset): DataFrame = {
if (options.readChangeFeed) {
getCDCFileChangesAndCreateDataFrame(startVersion, startIndex, isStartingVersion, endOffset)
getCDCFileChangesAndCreateDataFrame(startVersion, startIndex, isInitialSnapshot, endOffset)
} else {
val fileActionsIter = getFileChanges(
startVersion,
startIndex,
isStartingVersion,
isInitialSnapshot,
endOffset = Some(endOffset)
)
try {
Expand Down Expand Up @@ -373,12 +372,12 @@ trait DeltaSourceBase extends Source
* called when starting a new stream query.
*
* @param fromVersion The version of the delta table to calculate the offset from.
* @param isStartingVersion Whether the delta version is for the initial snapshot or not.
* @param isInitialSnapshot Whether the delta version is for the initial snapshot or not.
* @param limits Indicates how much data can be processed by a micro batch.
*/
protected def getStartingOffsetFromSpecificDeltaVersion(
fromVersion: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
limits: Option[AdmissionLimits]): Option[DeltaSourceOffset] = {
// Initialize schema tracking log if possible, no-op if already initialized
// This is one of the two places can initialize schema tracking.
Expand All @@ -390,7 +389,7 @@ trait DeltaSourceBase extends Source
val changes = getFileChangesWithRateLimit(
fromVersion,
fromIndex = DeltaSourceOffset.BASE_INDEX,
isStartingVersion = isStartingVersion,
isInitialSnapshot = isInitialSnapshot,
limits)

val lastFileChange = DeltaSource.iteratorLast(changes)
Expand All @@ -401,7 +400,7 @@ trait DeltaSourceBase extends Source
// Block latestOffset() from generating an invalid offset by proactively verifying
// incompatible schema changes under column mapping. See more details in the method doc.
checkReadIncompatibleSchemaChangeOnStreamStartOnce(fromVersion)
buildOffsetFromIndexedFile(lastFileChange.get, fromVersion, isStartingVersion)
buildOffsetFromIndexedFile(lastFileChange.get, fromVersion, isInitialSnapshot)
}
}

Expand All @@ -423,7 +422,7 @@ trait DeltaSourceBase extends Source
val changes = getFileChangesWithRateLimit(
previousOffset.reservoirVersion,
previousOffset.index,
previousOffset.isStartingVersion,
previousOffset.isInitialSnapshot,
limits)

val lastFileChange = DeltaSource.iteratorLast(changes)
Expand All @@ -436,7 +435,7 @@ trait DeltaSourceBase extends Source
// method scala doc.
checkReadIncompatibleSchemaChangeOnStreamStartOnce(previousOffset.reservoirVersion)
buildOffsetFromIndexedFile(lastFileChange.get, previousOffset.reservoirVersion,
previousOffset.isStartingVersion)
previousOffset.isInitialSnapshot)
}
}

Expand All @@ -445,12 +444,12 @@ trait DeltaSourceBase extends Source
* version is valid by comparing with previous version.
* @param indexedFile The last indexed file used to build offset from.
* @param version Previous offset reservoir version.
* @param isStartingVersion Whether previous offset is starting version or not.
* @param isInitialSnapshot Whether previous offset is starting version or not.
*/
private def buildOffsetFromIndexedFile(
indexedFile: IndexedFile,
version: Long,
isStartingVersion: Boolean): Option[DeltaSourceOffset] = {
isInitialSnapshot: Boolean): Option[DeltaSourceOffset] = {
val (v, i, isLastFileInVersion) = (indexedFile.version, indexedFile.index, indexedFile.isLast)
assert(v >= version,
s"buildOffsetFromIndexedFile returns an invalid version: $v (expected: >= $version), " +
Expand All @@ -459,18 +458,18 @@ trait DeltaSourceBase extends Source
// If the last file in previous batch is the last file of that version, automatically bump
// to next version to skip accessing that version file altogether.
val offset = if (isLastFileInVersion) {
// isStartingVersion must be false here as we have bumped the version.
// isInitialSnapshot must be false here as we have bumped the version.
Some(DeltaSourceOffset(
tableId,
v + 1,
index = DeltaSourceOffset.BASE_INDEX,
isStartingVersion = false))
isInitialSnapshot = false))
} else {
// isStartingVersion will be true only if previous isStartingVersion is true and the next file
// isInitialSnapshot will be true only if previous isInitialSnapshot is true and the next file
// is still at the same version (i.e v == version).
Some(DeltaSourceOffset(
tableId, v, i,
isStartingVersion = v == version && isStartingVersion
isInitialSnapshot = v == version && isInitialSnapshot
))
}
offset
Expand All @@ -482,10 +481,10 @@ trait DeltaSourceBase extends Source
protected def createDataFrameBetweenOffsets(
startVersion: Long,
startIndex: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
startOffsetOption: Option[DeltaSourceOffset],
endOffset: DeltaSourceOffset): DataFrame = {
getFileChangesAndCreateDataFrame(startVersion, startIndex, isStartingVersion, endOffset)
getFileChangesAndCreateDataFrame(startVersion, startIndex, isInitialSnapshot, endOffset)
}

protected def cleanUpSnapshotResources(): Unit = {
Expand Down Expand Up @@ -730,7 +729,7 @@ case class DeltaSource(
protected def getFileChanges(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
endOffset: Option[DeltaSourceOffset] = None,
verifyMetadataAction: Boolean = true
): ClosableIterator[IndexedFile] = {
Expand Down Expand Up @@ -764,7 +763,7 @@ case class DeltaSource(
}
}

var iter = if (isStartingVersion) {
var iter = if (isInitialSnapshot) {
Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy
case 1 => getSnapshotAt(fromVersion).toClosable
case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
Expand Down Expand Up @@ -850,15 +849,15 @@ case class DeltaSource(

private def getStartingOffset(limits: Option[AdmissionLimits]): Option[DeltaSourceOffset] = {

val (version, isStartingVersion) = getStartingVersion match {
val (version, isInitialSnapshot) = getStartingVersion match {
case Some(v) => (v, false)
case None => (snapshotAtSourceInit.version, true)
}
if (version < 0) {
return None
}

getStartingOffsetFromSpecificDeltaVersion(version, isStartingVersion, limits)
getStartingOffsetFromSpecificDeltaVersion(version, isInitialSnapshot, limits)
}

override def getDefaultReadLimit: ReadLimit = {
Expand Down Expand Up @@ -1010,7 +1009,7 @@ case class DeltaSource(
val endOffset = toDeltaSourceOffset(end)
val startDeltaOffsetOption = startOffsetOption.map(toDeltaSourceOffset)

val (startVersion, startIndex, isStartingVersion) =
val (startVersion, startIndex, isInitialSnapshot) =
extractStartingState(startDeltaOffsetOption, endOffset)

if (startOffsetOption.contains(endOffset)) {
Expand Down Expand Up @@ -1052,7 +1051,7 @@ case class DeltaSource(
validateAndInitMetadataLogForPlannedBatchesDuringStreamStart(startVersion, endOffset)

val createdDf = createDataFrameBetweenOffsets(
startVersion, startIndex, isStartingVersion, startDeltaOffsetOption, endOffset)
startVersion, startIndex, isInitialSnapshot, startDeltaOffsetOption, endOffset)

createdDf
}
Expand All @@ -1072,18 +1071,18 @@ case class DeltaSource(
* @param endOffset The end offset for a batch.
* @return (start commit version to scan from,
* start offset index to scan from,
* whether this version is still considered part of initial snapshot)
* whether this version is part of the initial snapshot)
*/
private def extractStartingState(
startOffsetOption: Option[DeltaSourceOffset],
endOffset: DeltaSourceOffset): (Long, Long, Boolean) = {
val (startVersion, startIndex, isStartingVersion) = if (startOffsetOption.isEmpty) {
val (startVersion, startIndex, isInitialSnapshot) = if (startOffsetOption.isEmpty) {
getStartingVersion match {
case Some(v) =>
(v, DeltaSourceOffset.BASE_INDEX, false)

case None =>
if (endOffset.isStartingVersion) {
if (endOffset.isInitialSnapshot) {
(endOffset.reservoirVersion, DeltaSourceOffset.BASE_INDEX, true)
} else {
assert(
Expand All @@ -1098,13 +1097,13 @@ case class DeltaSource(
}
} else {
val startOffset = startOffsetOption.get
if (!startOffset.isStartingVersion) {
if (!startOffset.isInitialSnapshot) {
// unpersist `snapshot` because it won't be used any more.
cleanUpSnapshotResources()
}
(startOffset.reservoirVersion, startOffset.index, startOffset.isStartingVersion)
(startOffset.reservoirVersion, startOffset.index, startOffset.isInitialSnapshot)
}
(startVersion, startIndex, isStartingVersion)
(startVersion, startIndex, isInitialSnapshot)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,17 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
*
* @param startVersion - calculated starting version
* @param startIndex - calculated starting index
* @param isStartingVersion - whether the stream has to return the initial snapshot or not
* @param isInitialSnapshot - whether the stream has to return the initial snapshot or not
* @param endOffset - Offset that signifies the end of the stream.
* @return the DataFrame containing the file changes (AddFile, RemoveFile, AddCDCFile)
*/
protected def getCDCFileChangesAndCreateDataFrame(
startVersion: Long,
startIndex: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
endOffset: DeltaSourceOffset): DataFrame = {
val changes: Iterator[(Long, Iterator[IndexedFile])] =
getFileChangesForCDC(startVersion, startIndex, isStartingVersion, None, Some(endOffset))
getFileChangesForCDC(startVersion, startIndex, isInitialSnapshot, None, Some(endOffset))

val groupedFileActions: Iterator[(Long, Seq[FileAction])] =
changes.map { case (v, indexFiles) =>
Expand Down Expand Up @@ -227,7 +227,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
protected def getFileChangesForCDC(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean,
isInitialSnapshot: Boolean,
limits: Option[AdmissionLimits],
endOffset: Option[DeltaSourceOffset],
verifyMetadataAction: Boolean = true): Iterator[(Long, Iterator[IndexedFile])] = {
Expand Down Expand Up @@ -278,7 +278,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
}
}

val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isStartingVersion) {
val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
// If we are reading change data from the start of the table we need to
// get the latest snapshot of the table as well.
val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
import org.apache.spark.sql.delta.util.JsonUtils
import com.fasterxml.jackson.annotation.JsonProperty
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

Expand All @@ -40,17 +41,22 @@ import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
* @param index The index in the sequence of AddFiles in this version. Used to
* break large commits into multiple batches. This index is created by
* sorting on modificationTimestamp and path.
* @param isStartingVersion Whether this offset denotes a query that is starting rather than
* processing changes. When starting a new query, we first process
* all data present in the table at the start and then move on to
* processing new data that has arrived.
* @param isInitialSnapshot Whether this offset points into an initial full table snapshot at the
* provided reservoir version rather than into the changes at that version.
* When starting a new query, we first process all data present in the
* table at the start and then move on to processing new data that has
* arrived.
*/
case class DeltaSourceOffset private(
sourceVersion: Long,
reservoirId: String,
reservoirVersion: Long,
index: Long,
isStartingVersion: Boolean
// This was confusingly called "starting version" in earlier versions, even though enabling the
// option "starting version" actually causes this to be disabled. We still have to
// serialize it using the old name for backward compatibility.
@JsonProperty("isStartingVersion")
isInitialSnapshot: Boolean
) extends Offset with Comparable[DeltaSourceOffset] {

import DeltaSourceOffset._
Expand Down Expand Up @@ -132,21 +138,21 @@ object DeltaSourceOffset extends Logging {
* @param reservoirId Table id
* @param reservoirVersion Table commit version
* @param index File action index in the commit version
* @param isStartingVersion Whether this offset is still in initial snapshot
* @param isInitialSnapshot Whether this offset is still in initial snapshot
*/
def apply(
reservoirId: String,
reservoirVersion: Long,
index: Long,
isStartingVersion: Boolean
isInitialSnapshot: Boolean
): DeltaSourceOffset = {
// TODO should we detect `reservoirId` changes when a query is running?
new DeltaSourceOffset(
CURRENT_VERSION,
reservoirId,
reservoirVersion,
index,
isStartingVersion
isInitialSnapshot
)
}

Expand Down Expand Up @@ -177,7 +183,7 @@ object DeltaSourceOffset extends Logging {
o.reservoirId,
o.reservoirVersion,
offsetIndex,
o.isStartingVersion
o.isInitialSnapshot
)
}
}
Expand Down Expand Up @@ -218,9 +224,9 @@ object DeltaSourceOffset extends Logging {
* re-process data and cause data duplication.
*/
def validateOffsets(previousOffset: DeltaSourceOffset, currentOffset: DeltaSourceOffset): Unit = {
if (!previousOffset.isStartingVersion && currentOffset.isStartingVersion) {
if (!previousOffset.isInitialSnapshot && currentOffset.isInitialSnapshot) {
throw new IllegalStateException(
s"Found invalid offsets: 'isStartingVersion' fliped incorrectly. " +
s"Found invalid offsets: 'isInitialSnapshot' fliped incorrectly. " +
s"Previous: $previousOffset, Current: $currentOffset")
}
if (previousOffset.reservoirVersion > currentOffset.reservoirVersion) {
Expand Down
Loading

0 comments on commit 2546502

Please sign in to comment.