[Spark] Support external DSV2 catalog in Vacuum command #2039
Conversation
"Support external DSV2 catalog in Vacuum command. After the changes, the restore command supports tables from external DSV2 catalog."

"... the vacuum command supports ..."?
} else {
  throw DeltaErrors.missingTableIdentifierException("VACUUM")
}
val pathToVacuum = getDeltaTable(child, "VACUUM").path
val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum)
I don't think we need to find the table root any more with this change? If the child pointed to a subdirectory of the table, I would have expected an AnalysisException before now. Query resolution would not have been able to turn UnresolvedDeltaPathOrIdentifier into a Delta table (because no _delta_log directory is present in the expected location).
If we really need to support triggering VACUUM for a table by pointing at any subdirectory of that table (as the current code does), then we'd have to somehow delay the table resolution until this point so we can findDeltaTableRoot. But allowing users to specify subdirectories, as if they were the table itself, seems more like a bug than a feature, to be honest.
And actually, L60 below seems to corroborate that subdirectories aren't supported that way, because it blows up if the found root path mismatches the given table path?
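For reference, once resolution does succeed, the command side only needs to unwrap the Delta table from the resolved child. A minimal sketch of that unwrapping (an assumed shape, not necessarily the PR's exact helper):

import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.catalog.DeltaTableV2

// Sketch: pull the DeltaTableV2 out of the analyzer's ResolvedTable; anything else means
// the child did not resolve to a Delta table.
def getDeltaTable(child: LogicalPlan, cmd: String): DeltaTableV2 = child match {
  case ResolvedTable(_, _, table: DeltaTableV2, _) => table
  case _ => throw DeltaErrors.notADeltaTableException(cmd)
}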
Thanks for the suggestion. I removed the baseDeltaPath check
Hmm, we should probably double-check the existing behavior first -- if vacuuming a subdirectory was supported before, and our changes here would block it, then that's a breaking change and we need to proceed very carefully. I think it hinges on this code:
val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum)
if (baseDeltaPath.isDefined) {
  if (baseDeltaPath.get != pathToVacuum) {
    throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
  }
}
If I'm not mistaken, it requires the given path to be the actual table path, which means the proposed change is not a breaking change. Even if findDeltaTableRoot were to find the table, starting from a subdirectory, the result would fail the equality check immediately after.
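To make that concrete, a small illustration with made-up paths (the directory names are hypothetical):

import org.apache.hadoop.fs.Path

// A partition directory under the table root, passed as the path to vacuum.
val pathToVacuum = new Path("/data/events/date=2023-01-01")
// findDeltaTableRoot walks up to the directory that contains _delta_log.
val baseDeltaPath = Some(new Path("/data/events"))
// baseDeltaPath.get != pathToVacuum, so vacuumBasePathMissingException would be thrown.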
case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {

  private def maybeSQLFile(u: UnresolvedTable): Boolean = {
    sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
This seems to copy ideas from ResolveSQLOnFile from Spark? Is there a reason we can't leverage that here, and let DeltaDataSource.getTable produce the DeltaTableV2 we need?
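For context, the two-part identifier that maybeSQLFile checks for is the "SQL on file" form, where the first name part is the data source and the second is a path. A hypothetical example, assuming an active SparkSession named spark:

// VACUUM over a path-based Delta table (the path below is made up).
spark.sql("VACUUM delta.`/tmp/delta/events` RETAIN 168 HOURS")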
Ugh, UnresolvedTable != UnresolvedRelation, and it looks like the data source code uses UnresolvedRelation while UnresolvedPathBasedDeltaTable uses UnresolvedTable.
- In the parser, this PR uses UnresolvedDeltaPathOrIdentifier and it will produce UnresolvedTable on table identifiers (including the file path table delta.path).
- If we create UnresolvedRelation as the child of VacuumTableCommand, the resolved relation from Apache Spark will be a Parquet data source relation. There is some issue with my debugger and I haven't figured out the reason.
- In the analyzer rule ResolveRelations, both UnresolvedTable and UnresolvedRelation are processed. UnresolvedTable always results in ResolvedTable, while UnresolvedRelation results in SubqueryAlias with various nodes. I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?
"I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?"

Yeah, UnresolvedRelation only makes sense if it allows us to reuse existing machinery in some way. But:
"resolved relation from Apache Spark will be a Parquet data source relation"
That's... awkward. Tho I've noticed that the file index for Delta is parquet source because that's the physical file format Delta reads. Is there no trace of Delta in the resulting scan node, tho?
val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum)
if (!deltaLog.tableExists) {
Now that we no longer have to search subdirectories, we don't need the DeltaLog.forTable call any more:
val deltaTable = getDeltaTable(child, "VACUUM")
if (!deltaTable.tableExists) {
  throw DeltaErrors.notADeltaTableException(
    "VACUUM",
    DeltaTableIdentifier(path = Some(deltaTable.path.toString)))
}
VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect()
Thanks, updated.
This failed the following test:
test("vacuum for a partition path") {
withEnvironment { (tempDir, _) =>
import testImplicits._
val path = tempDir.getCanonicalPath
Seq((1, "a"), (2, "b")).toDF("v1", "v2")
.write
.format("delta")
.partitionBy("v2")
.save(path)
val ex = intercept[AnalysisException] {
sql(s"vacuum '$path/v2=a' retain 0 hours")
}
assert(ex.getMessage.contains(
s"`$path/v2=a` is not a Delta table. VACUUM is only supported for Delta tables."))
}
}
No AnalysisException is thrown.
I brought the code for checking the deltaLog back.
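Roughly, the restored check (a sketch inside VacuumTableCommand.run, reusing the command's own child, sparkSession, dryRun and horizonHours fields, and assuming deltaTable.path is still the user-supplied path rather than the table root):

val deltaTable = getDeltaTable(child, "VACUUM")
// Re-check through DeltaLog so that partition/sub-directory paths are still rejected.
val deltaLog = DeltaLog.forTable(sparkSession, deltaTable.path)
if (!deltaLog.tableExists) {
  throw DeltaErrors.notADeltaTableException(
    "VACUUM",
    DeltaTableIdentifier(path = Some(deltaTable.path.toString)))
}
VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect()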
Should be resolved in the latest code
@@ -498,8 +498,7 @@ class DeltaVacuumSuite
      val e = intercept[AnalysisException] {
        vacuumSQLTest(tablePath, viewName)
      }
-     assert(e.getMessage.contains("not found") ||
-       e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"))
+     assert(e.getMessage.contains("v is a temp view. 'VACUUM' expects a table."))
The error message here is improved.
Shouldn't we be checking for an error class, rather than specific strings?
def expectTableNotViewError(
    nameParts: Seq[String],
    isTemp: Boolean,
    cmd: String,
    mismatchHint: Option[String],
    t: TreeNode[_]): Throwable = {
  val viewStr = if (isTemp) "temp view" else "view"
  val hintStr = mismatchHint.map(" " + _).getOrElse("")
  new AnalysisException(
    errorClass = "_LEGACY_ERROR_TEMP_1013",
    messageParameters = Map(
      "nameParts" -> nameParts.quoted,
      "viewStr" -> viewStr,
      "cmd" -> cmd,
      "hintStr" -> hintStr),
    origin = t.origin)
}
The error class from Spark is a temporary one and it won't be displayed. We can check it once it is assigned a proper name.
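Once the error class does get a stable name, the test could assert on the class instead of the message text. A sketch (assuming a Spark version where AnalysisException exposes getErrorClass; the class name below is the temporary one quoted above and only stands in for the eventual name):

val e = intercept[AnalysisException] {
  vacuumSQLTest(tablePath, viewName)
}
// "_LEGACY_ERROR_TEMP_1013" is the current temporary class; replace it once Spark assigns
// a permanent name.
assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_1013")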
… consistency with existing code
The test failure is about Flink. It should not be related to the code changes.
Which Delta project/connector is this regarding?
Description
Support external DSV2 catalog in Vacuum command. After the changes, the vacuum command supports tables from an external DSV2 catalog.
For example, with
We can query
Or simply
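An illustrative sketch of the kind of statements this enables; the catalog, database, and table names are hypothetical, and the DSV2 catalog plugin is assumed to be registered under spark.sql.catalog.customCatalog:

// VACUUM a Delta table that lives in an external DSV2 catalog (names are made up).
spark.sql("VACUUM customCatalog.db.events RETAIN 168 HOURS")
// Or simply:
spark.sql("VACUUM customCatalog.db.events")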
This PR also introduces a new analyzer rule ResolveDeltaPathTable so that external DSV2 catalogs won't need to implement the resolution of Delta file path tables.
How was this patch tested?
Does this PR introduce any user-facing changes?
Yes, users can use the Vacuum command on tables in their external DSV2 catalogs.