Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Read side changes for V2 Checkpoints #2056

Closed
wants to merge 2 commits into from

Conversation

prakharjain09
Copy link
Collaborator

@prakharjain09 prakharjain09 commented Sep 15, 2023

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Read side changes for v2 checkpoints of json/parquet type.
Design doc: https://docs.google.com/document/d/18D4SiI0_t7ak8sgvyQtH9lBUyGJIp8hyZi2-399uByQ/edit#heading=h.dd2cc57oc5wk

They are given higher priority as compared to multi-part/classic checkpoint -- in case multiple checkpoints exist for same version.

How was this patch tested?

Added UTs

Does this PR introduce any user-facing changes?

No

Comment on lines 30 to +31
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}

uninitializedCheckpointProvider: UninitializedCheckpointProvider)
: CheckpointProvider = uninitializedCheckpointProvider match {
// Note: snapshotDescriptor.protocol should be accessed as late as possible inside the futures
// as it might need I/O.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we access snapshotDescriptor.protocol? I don't see any direct access, so is it just when we require isV2CheckpointEnabled? Or are there other indirect accesses as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have require(isV2CheckpointEnabled(snapshotDescriptor) few lines below. Converting it to require(isV2CheckpointEnabled(snapshotDescriptor.protocol) to make it more explicit.

spark, provider.logPath, provider.fileStatus, snapshotDescriptor.deltaLog.options)
}
new LazyCompleteCheckpointProvider(provider) {
override def createCheckpointProvider(): CheckpointProvider = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I understanding correctly, that creating the lazy checkpoint provider here kicks off a future to start reading the manifest, but we won't wait on the future unless/until somebody calls a complete checkpoint provider method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also adding code comments for this.

new LazyCompleteCheckpointProvider(uninitializedV2CheckpointProvider) {
override def createCheckpointProvider(): CheckpointProvider = {
val (checkpointMetadata, sidecarFiles) =
uninitializedV2CheckpointProvider.finishIOAndGetActions()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're leveraging an existing future here, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you asking if uninitializedV2CheckpointProvider has already a future inside it which is doing I/O? Yes.

}
case provider: UninitializedV1OrV2ParquetCheckpointProvider
if !isV2CheckpointEnabled(checksumOpt).contains(false) =>
// Either v2 checkpoints are enabled, or we lack a Protocol to prove otherwise.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean negation is confusing (that took a while to parse). Can we instead do:

case provider: UninitializedV1OrV2ParquetCheckpointProvider
   if isV2CheckpointEnabled(checksumOpt).contains(false) => 
  // V2 checkpoints are specifically disabled, so it must be V1
  ...
case provider: UninitializedV1OrV2ParquetCheckpointProvider =>
  // We can't tell immediately whether it's V1 or V2, just by looking at the file name
  ...

@@ -119,28 +166,35 @@ case class CheckpointInstance(
* Single part checkpoint.
* 3. For Multi-part [[CheckpointInstance]]s corresponding to same version, the one with more
* parts is greater than the one with less parts.
* 4. For V2 Checkpoints corresponding to same version, we use the fileName as tie breaker.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the semantics of using file name to break ties? Put another way, when would that make a difference? Reaching step 4 of the comparison means that we're dealing with the same version, same format, and same number of parts. What else could differ in the name?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: .json vs. .parquet... tho I'm not sure we actually want to prefer json merely because it lexically precedes parquet?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessarily json vs parquet.
We could have two v2 checkpoints if someone runs this multiple times: deltaLog.checkpoint(deltaLog.getSnapshotAt(version = 44)).

Both are valid v2 checkpoints and any of them can be preferred one over the other. In long run, we can decide to go ahead with parquet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, uuid-named... right

@@ -150,6 +204,12 @@ object CheckpointInstance {
// * <version>.checkpoint.parquet
// * <version>.checkpoint.<i>.<n>.parquet
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is missing the v2 file format description? (it says "three formats" but only shows two)

Comment on lines +380 to +385
case cp: CheckpointProvider =>
cp
case uninitializedProvider: UninitializedCheckpointProvider =>
CheckpointProvider(spark, this, checksumOpt, uninitializedProvider)
case o =>
throw new IllegalStateException(s"Unknown checkpoint provider: ${o.getClass.getName}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any particular reason to churn the two existing cases?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted.

Comment on lines +62 to +64
case lzProvider : LazyCompleteCheckpointProvider
if lzProvider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider] =>
lzProvider.underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cryptic abbreviations don't really help IMO...

Suggested change
case lzProvider : LazyCompleteCheckpointProvider
if lzProvider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider] =>
lzProvider.underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider]
case provider : LazyCompleteCheckpointProvider
if provider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider] =>
provider.underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider]

(meanwhile, the cast is annoying, but I'm not sure it's annoying enough to be worth making an extractor for)

if lzProvider.underlyingCheckpointProvider.isInstanceOf[V2CheckpointProvider] =>
lzProvider.underlyingCheckpointProvider.asInstanceOf[V2CheckpointProvider]
case EmptyCheckpointProvider =>
throw new Exception("underlying snapshot doesn't have a checkpoint")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing a plain Exception seems really strange... IllegalArgumentException or IllegalStateException or even AssertionError would seem better?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants