[Spark] Support external DSV2 catalog in RESTORE & Vacuum command #2036

Closed

Conversation

gengliangwang (Contributor) commented Sep 9, 2023

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Support external DSV2 catalogs in the RESTORE and VACUUM commands. After the changes, both commands support tables from an external DSV2 catalog.
For example, with

spark.sql.catalog.customer_catalog=org.apache.spark.sql.CustomerCatalog

We can run:

SET CATALOG customer_catalog;
RESTORE TABLE t1 VERSION AS OF 0;
VACUUM t1;

Or simply

RESTORE TABLE customer_catalog.default.t1 VERSION AS OF 0;
VACUUM customer_catalog.default.t1;
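
For context, an external DSV2 catalog is a class implementing Spark's TableCatalog interface. A minimal skeleton might look like the following; CustomerCatalog is just the placeholder name from the config above, and the method bodies are elided:

package org.apache.spark.sql

import java.util
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Skeleton of a custom DSV2 catalog; only the interface is real,
// the implementation details are up to the catalog provider.
class CustomerCatalog extends TableCatalog {
  private var catalogName: String = _

  // Called once when Spark instantiates the catalog from the config entry.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit =
    catalogName = name

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] = ???
  override def loadTable(ident: Identifier): Table = ???
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = ???
  override def alterTable(ident: Identifier, changes: TableChange*): Table = ???
  override def dropTable(ident: Identifier): Boolean = ???
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = ???
}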


How was this patch tested?

  1. new end-to-end tests
  2. new parser test cases

Does this PR introduce any user-facing changes?

Yes, users can run the RESTORE and VACUUM commands on tables in their external DSV2 catalogs.

gengliangwang (Contributor Author) commented:

cc @ryan-johnson-databricks

gengliangwang (Contributor Author) commented Sep 9, 2023

Note: this PR is based on #2033.
To make review easier, we can merge #2033 first.
With the refactoring, this PR routes Delta identifier resolution through a single narrow waist: resolveDeltaIdentifier.
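
Illustratively, the narrow waist could have roughly this shape (a hedged sketch; the two-step ordering is described in the review discussion below, the exact signature is assumed):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.catalog.DeltaTableV2

// Sketch: every Delta command that accepts an identifier funnels through
// one resolution entry point.
def resolveDeltaIdentifier(
    spark: SparkSession,
    unresolvedId: UnresolvedDeltaIdentifier): DeltaTableV2 = {
  // 1. Ask the Spark catalogs first, so catalog tables win, even tables
  //    whose names look like filesystem paths (see the corner case below).
  resolveAsTable(unresolvedId).getOrElse {
    // 2. Fall back to treating the identifier as a path-based Delta table.
    DeltaTableV2(spark, new Path(unresolvedId.nameParts.last))
  }
}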

tt.timestamp,
tt.version,
tt.creationSource))

case ct @ CloneTableStatement(
gengliangwang (Contributor Author):

We can move the resolution of CloneTableStatement to ResolveDeltaIdentifier as well.

@@ -498,8 +498,8 @@ class DeltaVacuumSuite
     val e = intercept[AnalysisException] {
       vacuumSQLTest(tablePath, viewName)
     }
-    assert(e.getMessage.contains("not found") ||
-      e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"))
+    assert(e.getMessage.contains("VACUUM is only supported for Delta tables.") ||
gengliangwang (Contributor Author):

The error message is better now when vacuuming a view.

val optionalDeltaTableV2 = resolveAsTable(unresolvedId)

// If the identifier is not a Delta table, try to resolve it as a Delta file table.
val deltaTableV2 = optionalDeltaTableV2.getOrElse {
ryan-johnson-databricks (Collaborator):

Under what circumstances would resolveAsTable return None for a valid (path-based?) Delta table?
Or, put another way, what circumstances would mean ResolveRelations fails to handle an UnresolvedTable that references a path-based Delta table?

If such circumstances exist, can/should we fix table resolution directly, instead of compensating here?

gengliangwang (Contributor Author):

Yes, it is mentioned in the code comment on resolveAsTable. The short answer is that the table identifier doesn't exist in the Spark catalogs.

  // Resolve the identifier as a table in the Spark catalogs.
  // Return None if the table doesn't exist.
  // If the table exists but is not a Delta table, throw an exception.
  // If the identifier is a view, throw an exception.
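
A hedged sketch of how that contract could be implemented; the pattern-match arms are assumed for illustration (only the UnresolvedTable construction appears verbatim later in this thread), and `spark` is assumed to be the active SparkSession in scope:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
import org.apache.spark.sql.delta.catalog.DeltaTableV2

private def resolveAsTable(unresolvedId: UnresolvedDeltaIdentifier): Option[DeltaTableV2] = {
  val unresolvedTable =
    UnresolvedTable(unresolvedId.nameParts, unresolvedId.commandName, None)
  spark.sessionState.analyzer.execute(unresolvedTable) match {
    // Resolved to a Delta table: done.
    case ResolvedTable(_, _, d: DeltaTableV2, _) => Some(d)
    // Resolved to some other table: Delta commands cannot run on it.
    case ResolvedTable(_, ident, _, _) =>
      throw new AnalysisException(s"$ident is not a Delta table.")
    // Not found in any Spark catalog: let the caller try path-based resolution.
    case _: UnresolvedTable => None
    // Anything else (e.g. the name resolved to a view) is rejected.
    case _ =>
      throw new AnalysisException(
        s"${unresolvedId.nameParts.mkString(".")} is a view; " +
        s"${unresolvedId.commandName} expects a table.")
  }
}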

gengliangwang (Contributor Author):

> If such circumstances exist, can/should we fix table resolution directly, instead of compensating here?

What do you mean by "fix table resolution directly"?
I believe there is a corner case here.
In Spark, you can create a table named `/tmp/foo` under a database named delta:

use delta;
create table `/tmp/foo` using delta;

So we always have to look the identifier up as a catalog table first, and only try loading it as a path if no such table exists.
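
To make that ordering concrete, a small hedged example (assuming a local SparkSession with Delta enabled):

// Create a catalog table whose name happens to look like a filesystem path.
spark.sql("CREATE DATABASE IF NOT EXISTS delta")
spark.sql("USE delta")
spark.sql("CREATE TABLE `/tmp/foo` (id INT) USING delta")

// `/tmp/foo` must now resolve to the catalog table delta.`/tmp/foo`,
// not to a path-based Delta table at the filesystem location /tmp/foo,
// so catalog lookup has to run before the path fallback.
spark.sql("VACUUM `/tmp/foo`")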

gengliangwang (Contributor Author):

[screenshot omitted]

// If the identifier is a view, throw an exception.
private def resolveAsTable(unresolvedId: UnresolvedDeltaIdentifier): Option[DeltaTableV2] = {
val unresolvedTable =
UnresolvedTable(unresolvedId.nameParts, unresolvedId.commandName, None)
ryan-johnson-databricks (Collaborator):

Wasn't this an UnresolvedRelation before? Are we changing it to UnresolvedTable intentionally?
If so, why needed, and what are the side effects of that change?

gengliangwang (Contributor Author):

In this PR, I introduce a new logical plan, UnresolvedDeltaIdentifier. I didn't make it an UnresolvedRelation because the resolution order matters for the corner case I mentioned above in #2036 (comment).

The code here converts an UnresolvedDeltaIdentifier to an UnresolvedTable and uses the Spark analyzer rules to resolve the relation.
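
For context, a plausible shape for the new node; only its name and role come from the PR, the fields and base class are assumed:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Unresolved placeholder the parser emits; ResolveDeltaIdentifier rewrites it.
case class UnresolvedDeltaIdentifier(
    nameParts: Seq[String],
    commandName: String) extends LeafNode {
  override def output: Seq[Attribute] = Nil
  override lazy val resolved: Boolean = false
}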

@@ -32,28 +32,14 @@ import org.apache.spark.sql.types.StringType
 * }}}
 */
 case class VacuumTableCommand(
-    path: Option[String],
-    table: Option[TableIdentifier],
+    pathToVacuum: Path,
ryan-johnson-databricks (Collaborator):

Can't this just be a RunnableCommand with UnaryLike, whose child starts out as the UnresolvedDeltaPathOrIdentifier the SQL parser creates?
(See, e.g., how the OPTIMIZE command was recently upgraded to handle this situation; the SQL parser just has to call that helper method.)

If we go that route, we won't need the VacuumTableStatement any more.

If it doesn't work for some reason, then I suspect OptimizeCommand will also need whatever fix we come up with in this PR.
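
For illustration, the suggested shape might look roughly like this; the parameter names are assumed, not taken from the PR:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.execution.command.RunnableCommand

// The child starts out as the unresolved node the SQL parser creates
// (e.g. via the UnresolvedDeltaPathOrIdentifier helper) and is resolved
// by the analyzer before run() executes.
case class VacuumTableCommand(
    child: LogicalPlan,
    horizonHours: Option[Double],
    dryRun: Boolean) extends RunnableCommand with UnaryLike[LogicalPlan] {

  override def run(spark: SparkSession): Seq[Row] = {
    // By now `child` is resolved; extract the Delta table from it and
    // delegate to the vacuum implementation (elided in this sketch).
    ???
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): VacuumTableCommand =
    copy(child = newChild)
}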

gengliangwang (Contributor Author):

Looking at the implementation of VacuumTableCommand, it only cares about pathToVacuum.
I didn't use UnresolvedDeltaPathOrIdentifier because I introduced UnresolvedDeltaIdentifier in this PR.

gengliangwang (Contributor Author) commented:

@ryan-johnson-databricks Thanks for the review. We need to decide whether to handle the corner case I mentioned in the PR comment.
If we do, we need a new unresolved logical plan; otherwise, we can't guarantee the resolution order between catalog tables (Spark) and path-based tables (Delta).
If we don't, we can simply try resolving any unresolved relation as a path-based Delta table, but that could be a regression for anyone using a file path as a table name.
