[Spark] Define OptimisticTransaction.catalogTable #2083

ryan-johnson-databricks
Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

As part of implementing #2052, OptimisticTransaction needs the ability to track a CatalogTable for the table it updates. That way, post-commit hooks can reliably identify catalog-based tables and make appropriate catalog calls in response to table changes.
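As a rough illustration of the motivation above, a post-commit hook could branch on whether the transaction carries a catalog entry. This is a minimal sketch, not Delta's actual hook API: `PostCommitHookSketch`, `Txn`, `PostCommitHook`, and `CatalogSyncHook` are hypothetical names, and the hook only records the action it would take instead of calling a real catalog.

```scala
object PostCommitHookSketch {
  // Hypothetical stand-in for Spark's CatalogTable; only the identifier is modeled.
  final case class CatalogTable(identifier: String)

  // Hypothetical stand-in for a transaction that tracks its catalog entry.
  final class Txn(val catalogTable: Option[CatalogTable])

  // Hypothetical hook interface, invoked after a successful commit.
  trait PostCommitHook {
    def run(txn: Txn, committedVersion: Long): Unit
  }

  // Records the catalog call it would make; a real hook would call the catalog instead.
  final class CatalogSyncHook extends PostCommitHook {
    val actions = scala.collection.mutable.ArrayBuffer.empty[String]

    def run(txn: Txn, committedVersion: Long): Unit = txn.catalogTable match {
      case Some(table) =>
        actions += s"update ${table.identifier} to version $committedVersion"
      case None =>
        () // path-based table: there is no catalog entry to update
    }
  }
}
```

With this shape, a path-based table produces no catalog action, while a catalog-based table is identified directly from the transaction.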

For now, we just define the new field, and add a new catalog-aware overload of DeltaLog.startTransaction that leverages it. Future work will start updating call sites to actually pass catalog information when starting a transaction.
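The shape of the change can be sketched roughly as below. The classes are simplified stand-ins wrapped in a hypothetical `StartTransactionSketch` object, not the real Delta or Spark types, and the catalog-aware overload is shown with a single parameter as an assumption; only the delegation from the no-argument overload mirrors the diff in this PR.

```scala
object StartTransactionSketch {
  // Simplified stand-in for Spark's CatalogTable.
  final case class CatalogTable(identifier: String)

  // The transaction remembers the catalog entry (if any) of the table it updates.
  final class OptimisticTransaction(
      val deltaLog: DeltaLog,
      val catalogTable: Option[CatalogTable])

  final class DeltaLog(val dataPath: String) {
    // Existing entry point: delegates to the catalog-aware overload with no catalog info.
    def startTransaction(): OptimisticTransaction = startTransaction(None)

    // New overload: callers that resolved a catalog entry can pass it along.
    // Assumption: any further parameters of the real overload are omitted here.
    def startTransaction(catalogTableOpt: Option[CatalogTable]): OptimisticTransaction =
      new OptimisticTransaction(this, catalogTableOpt)
  }
}
```

Existing call sites keep compiling against the no-argument overload and simply get a transaction whose `catalogTable` is `None`.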

How was this patch tested?

The new field is currently not used, so nothing really to test.
Existing unit tests verify the existing overloads are not broken by the change.

Does this PR introduce any user-facing changes?

No

ryan-johnson-databricks commented Sep 19, 2023

Flaky Flink test in Delta Spark Tests / test (2.12.15)...

[error] Test io.delta.flink.sink.DeltaSinkStreamingExecutionITCase.testCheckpointLikeASavepointRecovery failed: org.opentest4j.AssertionFailedError: Seems there was a duplicated AddFile in Delta log, took 28.436s
[error]     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[error]     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[error]     at io.delta.flink.utils.DeltaTableAsserts$DeltaLogAsserter.hasNoDuplicateAddFiles(DeltaTableAsserts.java:124)
[error]     at io.delta.flink.sink.DeltaSinkStreamingExecutionITCase.testCheckpointLikeASavepointRecovery(DeltaSinkStreamingExecutionITCase.java:388)
[error]     ...
[info] Test io.delta.flink.sink.DeltaSinkStreamingExecutionITCase#shouldResumeSink_savepointNoDrainState() ignored: This test is flaky, for some runs it fails reporting duplicated piles committed into the delta log. We should investigate if this is issue with test of Delta Connector.

*/
def startTransaction(): OptimisticTransaction = startTransaction(None)
def startTransaction(
    catalogTableOpt: Option[CatalogTable],

I know this is already merged, but DeltaLog has a forTable overload that accepts a CatalogTable. Couldn't we store it in a DeltaLog field?

Great question!

DeltaLog is shared between all threads/sessions on the cluster, and is also a cached object that can go away at any time. Further, one physical Delta table can be registered multiple times with multiple catalogs, e.g. if two users create different external table entries for the same path. So if we rely on the DeltaLog to carry the correct catalog entry, we risk finding somebody else's catalog entry (because they updated it more recently than we did), or even not finding one at all (because of a cache eviction since we last looked). The latter is especially a risk for some of the older V1-style commands, which repeatedly look up the DeltaLog by path instead of remembering the original catalog lookup.

Because of all that, I went with OptimisticTransaction as the safer choice, because it's session-private. From there, we just have to plumb through the catalog table that was found during analysis, usually from a DeltaTableV2 or LogicalRelation.
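The two failure modes described in this reply can be reproduced with a toy model. Everything here is hypothetical and heavily simplified: `SharedLog` stands in for a cached DeltaLog with the proposed catalog field, `LogCache` for the path-keyed cache, and `Txn` for a session-private transaction. None of it is the real Delta code.

```scala
import scala.collection.mutable

object SharedLogHazard {
  final case class CatalogTable(identifier: String)

  // Hypothetical shared log: one instance per path, visible to every session.
  final class SharedLog(val path: String) {
    // The field proposed in the review comment above.
    @volatile var catalogTable: Option[CatalogTable] = None
  }

  // Hypothetical cache keyed by path; entries can be evicted at any time.
  object LogCache {
    private val cache = mutable.Map.empty[String, SharedLog]

    def forTable(path: String, table: Option[CatalogTable] = None): SharedLog = synchronized {
      val log = cache.getOrElseUpdate(path, new SharedLog(path))
      table.foreach(t => log.catalogTable = Some(t)) // last writer wins
      log
    }

    def evict(path: String): Unit = synchronized { cache.remove(path); () }
  }

  // Session-private alternative: the transaction captures the entry once.
  final class Txn(val log: SharedLog, val catalogTable: Option[CatalogTable])

  // Returns (entry seen via shared log, entry seen after eviction, entry seen via txn).
  def demo(): (Option[CatalogTable], Option[CatalogTable], Option[CatalogTable]) = {
    val a = CatalogTable("catalog_a.events")
    val b = CatalogTable("catalog_b.events_copy")
    val path = "/data/events"
    val txnA = new Txn(LogCache.forTable(path, Some(a)), Some(a))
    LogCache.forTable(path, Some(b)) // another session registers its own entry
    val seenViaLog = LogCache.forTable(path).catalogTable    // session A now sees b
    LogCache.evict(path)
    val afterEviction = LogCache.forTable(path).catalogTable // nothing after eviction
    (seenViaLog, afterEviction, txnA.catalogTable)
  }
}
```

In this model the shared log first reports the other session's entry and then none at all, while the transaction still holds the entry it was started with.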
