[SPARK] Managed Commit support for cold and hot snapshot update #2755
Conversation
Nice start!
I especially like that the logic backing `getSnapshotAtInit` and `updateInternal` has been unified. The redundancy was annoying.
// If the commit store has changed, we need to recursively invoke updateSnapshot so that we
// could get the latest commits from the new commit store.
while (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
For my understanding -- it seems like we need this loop for (at least) three reasons?
- On cold read, we have no way to guess the table's commit owner. But if the table has a commit owner, we can learn that from the snapshot we created. This seems like a non-trivial overhead for cold reads of managed-commit tables?
- On warm read, it's possible the table's commit owner changed since we last updated the snapshot. If so, the first update attempt will only get the last commit by the older commit owner (which points to the new commit owner). Thus, we need to contact the new commit owner if we want the latest snapshot. This should be an exceedingly rare scenario, because we don't expect O(1) commit owner changes during the entire lifetime of any one table?
- The commit owner could technically change even while the loop is running, tho this should be a vanishingly rare scenario.
Thus, the loop ensures we always get the latest snapshot (not merely the latest backfilled snapshot, or the latest snapshot some older commit owner knew about) -- even if there were multiple commit ownership changes after the last backfilled commit?
We don't actually need to capture case 3/, because the ownership change commits must have arrived after our snapshot update process started, and we still have a linearizable result even if we don't see them.
As for cases 1/ and 2/, I believe we could change the `while` to `if` IFF the managed commit spec requires atomic backfill of all commits that change ownership. Currently the RFC requires atomic backfill for both FS -> owned and owned -> FS cases, and doesn't say anything about transferring a table from one owner to another. Seems like we should update the spec to either forbid such direct ownership changes, or else define whether they require backfill or not?
(all of this is probably academic, tho -- it seems highly unlikely a table could change ownership a second time before the first ownership change backfills)
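To make the loop's behavior in cases 1 and 2 concrete, here is a minimal, self-contained sketch of the re-listing loop described above. `Snapshot`, `createSnapshot`, and the `table` map are hypothetical stand-ins, not the real `SnapshotManagement` API; the map models what snapshot each commit store (or the filesystem, keyed by `None`) would return when asked.

```scala
// Hypothetical model: a snapshot knows its version and which commit store
// (if any) owns the table, mirroring newSnapshot.commitStoreOpt above.
case class Snapshot(version: Long, commitStoreOpt: Option[String])

// Stand-in for building a snapshot after listing via a given commit store.
def createSnapshot(
    table: Map[Option[String], Snapshot],
    askedStore: Option[String]): Snapshot =
  table(askedStore)

// Keep re-listing until the snapshot we built was produced by the same
// commit store that the snapshot itself names as its owner. This covers:
//  - cold reads (we start with no known owner), and
//  - warm reads where the owner changed since the last update.
def updateUntilOwnerStable(table: Map[Option[String], Snapshot]): Snapshot = {
  var commitStoreUsed: Option[String] = None // cold read: owner unknown
  var snapshot = createSnapshot(table, commitStoreUsed)
  while (snapshot.version >= 0 && snapshot.commitStoreOpt != commitStoreUsed) {
    commitStoreUsed = snapshot.commitStoreOpt
    snapshot = createSnapshot(table, commitStoreUsed)
  }
  snapshot
}
```

On a cold read of a managed-commit table, the filesystem listing only reveals backfilled commits plus the owner; the second iteration then asks that owner for the true latest snapshot, which is exactly the extra round trip the comment calls a non-trivial overhead.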
      isAsync: Boolean): Snapshot = {
    segmentOpt.map { segment =>
-     if (segment == previousSnapshot.logSegment) {
+     if (previousSnapshotOpt.exists(_.logSegment == segment)) {
        // If no changes were detected, just refresh the timestamp
Stale comment? The timestamp manipulation code here was deleted?
    }
    currentSnapshot.snapshot
  }

  /** Replace the given snapshot with the provided one. */
Stale comment, now that we added the keep-same-snapshot semantics?
// There are two table owners: CS1 and CS2 both of which are pointing to same underlying
// in-memory implementation.
// 1. Make 3 commits on the table with CS1 as owner.
// 2. Modify the content of commit-2. Change the table owner from 1 to 2 as part of it.
AFAIK, the RFC doesn't say anything about transferring a table between commit owners?
We should add a section to the RFC about transferring commit ownership.
The RFC already talks about FS -> MC and MC -> FS. The two could be combined to change owners, i.e. MC1 -> FS -> MC2.
This is better than directly allowing MC1 -> MC2, as that would require us to define another protocol which the owners have to follow.
After a bunch of discussion offline, it turns out direct ownership transfers are really messy, and hard to do securely, because they require two commit owners to coordinate in some ad-hoc way. At this point it looks better to keep the RFC spec as-is:
- If the Delta client wants to propose an owner for a FS table, they must send a commit request to the proposed owner; if the owner agrees to take over the table, they arrange for a direct-backfilled commit that makes the owner change.
- If the Delta client wants to remove the owner of a table (making it FS based), they must send one last commit request to the current owner; if the owner agrees to disown the table, they arrange for a direct-backfilled commit that makes the owner change.
The direct-backfilled commit may be done client-side by the owner's commit store, or server-side by the owner itself (that's up to the owner to decide).
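The transitions the spec ends up allowing can be summarized in a small sketch. The `Owner` type and `transitionAllowed` function are hypothetical illustrations of the rule discussed above (FS <-> managed-commit transitions only, no direct owner-to-owner transfer), not anything in the Delta codebase:

```scala
// Hypothetical model of table ownership states.
sealed trait Owner
case object FS extends Owner                 // filesystem-based table
case class MC(name: String) extends Owner    // managed-commit owner

// Only FS <-> managed-commit transitions are legal; a direct MC1 -> MC2
// transfer is not, so changing owners must go MC1 -> FS -> MC2.
def transitionAllowed(from: Owner, to: Owner): Boolean = (from, to) match {
  case (FS, MC(_))      => true  // propose an owner for a FS table
  case (MC(_), FS)      => true  // owner agrees to disown the table
  case (a, b) if a == b => true  // no ownership change
  case _                => false // direct MC1 -> MC2 is forbidden
}
```

This keeps each ownership change a two-party agreement (client and one owner), avoiding any ad-hoc owner-to-owner coordination protocol.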
If we are in agreement about the above, we should update this test (and possibly others as well) to follow spec?
// 4. Write commit 3/4 using new commit owner.
// 5. Read the table again and make sure right APIs are called:
//    a) If read query is run in scala, we do listing 2 times. So CS2.getCommits will be called
// twice. We should not be contacting CS1 anymore. |
I guess we expect backfill somewhere along the way? But when does that occur, and wouldn't commit-2 and its backfilled counterpart disagree after we modify?
Update: Actually, batch size 10 means we don't backfill anything, and instead rely on CS2 to know about commits performed by CS1, and also rely on warm start snapshot update to start directly from CS2?
> Actually, batch size 10 means we don't backfill anything, and instead rely on CS2 to know about commits performed by CS1, and also rely on warm start snapshot update to start directly from CS2

Yes - this is correct. Even if we add a constraint in the RFC that commits must be backfilled on ownership change, our delta-spark code could still recursively update snapshots across different owners even when the ownership-changing commits are not backfilled. This test case is testing exactly that.
> Even if we add a constraint in the RFC that commits must be backfilled on ownership change, our delta-spark code could still recursively update snapshots across different owners even when the ownership-changing commits are not backfilled. This test case is testing exactly that.

Why would we need to validate that scenario, once the spec forbids it?
That scenario has been removed/simplified now to just test what spec allows.
Seq(2).toDF.write.format("delta").mode("append").save(tablePath) // version 2
DeltaLog.clearCache()
checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(0), Row(1), Row(2)))
def deltaLog(): DeltaLog = DeltaLog.forTable(spark, tablePath) |
I only see one call to this method (L350 below), and also see code reusing `log`. Maybe this method isn't needed?
// and create a snapshot out of it. Then it will contact cs2 and fail. So deltaLog.update()
// won't succeed and throw exception. But underlying DeltaLog object now has reference to v3.
// The recorded timestamp for this must be commit timestamp and not clock timestamp.
How could the `DeltaLog` instance have a reference to v3, when `getUpdatedSnapshot` only installs the new snapshot after the commit-owner-changed loop exits?
The description here is out-of-sync from the actual logic. Fixing this.
    .commitImpl(logStore, hadoopConf, logPath, commitVersion, commitFile, commitTimestamp)
}

override protected[delta] def registerBackfill(
nit: we could simplify the override since this class is anyway scoped to this specific test case?
- override protected[delta] def registerBackfill(
+ override def registerBackfill(
)
// If the commit store has changed, we need to recursively invoke updateSnapshot so that we
// could get the latest commits from the new commit store.
while (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
What commit owner does the initial snapshot use? If it's the default commit owner, attempting an `update` would fail because the default owner doesn't know about this (not yet existing) table? Seems like we need to force the commit owner to None for the initial snapshot, and let commit 0 install a commit owner if it wishes?
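The suggestion above can be sketched minimally. `Snapshot`, `initialSnapshot`, and `applyCommit` are hypothetical stand-ins (not the real `InitialSnapshot` machinery): the pre-creation snapshot carries no commit owner, and commit 0 may install one that later snapshots inherit.

```scala
// Hypothetical model: version -1 is the "table does not exist yet" snapshot.
case class Snapshot(version: Long, commitStoreOpt: Option[String])

// The initial snapshot never names a commit owner, so no owner is contacted
// for a table that doesn't exist yet.
def initialSnapshot(): Snapshot = Snapshot(-1, None)

// A commit may install a new owner via its metadata; otherwise the previous
// owner (possibly None, i.e. filesystem-based) carries forward.
def applyCommit(prev: Snapshot, newOwner: Option[String]): Snapshot =
  Snapshot(prev.version + 1, newOwner.orElse(prev.commitStoreOpt))
```

Under this model, commit 0 is the first chance to set an owner, so an `update` against the initial snapshot never consults any commit store.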
Looks great. Just needs some unit tests cleanup and we should be good to merge!
@@ -231,11 +217,11 @@ trait SnapshotManagement { self: DeltaLog =>
   * @return Some LogSegment to build a Snapshot if files do exist after the given
   *         startCheckpoint. None, if the directory was missing or empty.
   */
- protected def getLogSegmentForVersion(
+ protected def createLogSegment(
aside: This is a welcome change... the old name never really did make much sense.
@@ -253,6 +239,12 @@ trait SnapshotManagement { self: DeltaLog =>
    )
  }

+ private def createLogSegment(previousSnapshot: Snapshot): Option[LogSegment] = {
This is just a convenience overload to create a new log segment, using the previous snapshot as starting point, right? Otherwise nothing special?
(maybe worth a quick doc comment)
@@ -498,30 +490,20 @@ trait SnapshotManagement { self: DeltaLog =>
   * file as a hint on where to start listing the transaction log directory. If the _delta_log
   * directory doesn't exist, this method will return an `InitialSnapshot`.
   */
- protected def getSnapshotAtInit: CapturedSnapshot = {
+ protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
To make sure I'm understanding the new flow correctly:
- At construction time, `getSnapshotAtInit`:
  - calls `createLogSegment` with the last checkpoint (if any) as the starting point
  - passes the resulting log segment to `getUpdatedSnapshot` (which handles commit owner changes)
  - directly returns the resulting `CapturedSnapshot` so it can be assigned
- At update time, `updateInternal`:
  - calls `createLogSegment` with the current snapshot as the starting point
  - passes the resulting log segment to `getUpdatedSnapshot` (which handles commit owner changes)
  - calls `installSnapshot` to update or replace the captured snapshot as needed

Nice. Much cleaner than the previous flow, which was creating log segments in different ways for the init vs. update paths. Good-bye, `createSnapshotAtInitInternal`!
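The two unified paths can be condensed into a toy sketch. All names here (`LogSegment`, `createLogSegment`, `getUpdatedSnapshot`, the version arithmetic) are hypothetical stand-ins that only illustrate the shared shape, not the real `SnapshotManagement` members:

```scala
// Hypothetical stand-ins for the real log machinery.
case class LogSegment(version: Long)
case class Snapshot(segment: LogSegment)

// Build a segment starting from some hint version (checkpoint or current
// snapshot); here modeled as simply listing one version past the hint.
def createLogSegment(startVersionHint: Long): LogSegment =
  LogSegment(startVersionHint + 1)

// Shared back half of both paths (handles commit owner changes in the
// real code).
def getUpdatedSnapshot(segment: LogSegment): Snapshot = Snapshot(segment)

// Init path: start from the last checkpoint, or -1 when there is none.
def getSnapshotAtInit(lastCheckpointVersion: Long): Snapshot =
  getUpdatedSnapshot(createLogSegment(lastCheckpointVersion))

// Update path: start from the current snapshot instead.
def updateInternal(current: Snapshot): Snapshot =
  getUpdatedSnapshot(createLogSegment(current.segment.version))
```

The point of the refactor shows up directly: both entry points differ only in where the starting hint comes from, and everything after segment creation is shared.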
nicely summarized.
Which Delta project/connector is this regarding?
Description
This PR adds support for cold and hot snapshot update for managed-commits.
How was this patch tested?
UTs
Does this PR introduce any user-facing changes?
No