[Spark] Support external DSV2 catalog in Vacuum command #2039

Closed
@@ -78,6 +78,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}
extensions.injectResolutionRule { session =>
ResolveDeltaPathTable(session)
}
extensions.injectResolutionRule { session =>
new PreprocessTimeTravel(session)
}
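
For context, a minimal sketch of how this extension, and therefore the newly injected ResolveDeltaPathTable rule, is typically enabled on a session; the app name and table path below are placeholders:

    import org.apache.spark.sql.SparkSession

    // Registering the Delta extension installs the parser and resolution rules above,
    // including the ResolveDeltaPathTable rule added in this PR.
    val spark = SparkSession.builder()
      .appName("vacuum-example") // placeholder
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // With the rules installed, a path-based VACUUM can be parsed and resolved.
    spark.sql("VACUUM delta.`/tmp/some_table` DRY RUN") // placeholder path
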
@@ -307,9 +307,14 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
* }}}
*/
override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
VacuumTableCommand(
val child = UnresolvedDeltaPathOrIdentifier(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
"VACUUM"
)

VacuumTableCommand(
child,
Option(ctx.number).map(_.getText.toDouble),
ctx.RUN != null)
}
@@ -16,13 +16,14 @@

package io.delta.tables.execution

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.commands.VacuumCommand
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable
import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
import org.apache.spark.sql.types.StringType

/**
@@ -32,28 +33,15 @@ import org.apache.spark.sql.types.StringType
* }}}
*/
case class VacuumTableCommand(
path: Option[String],
table: Option[TableIdentifier],
override val child: LogicalPlan,
horizonHours: Option[Double],
dryRun: Boolean) extends LeafRunnableCommand {
dryRun: Boolean) extends RunnableCommand with UnaryLike[LogicalPlan] {

override val output: Seq[Attribute] =
Seq(AttributeReference("path", StringType, nullable = true)())

override def run(sparkSession: SparkSession): Seq[Row] = {
val pathToVacuum =
if (path.nonEmpty) {
new Path(path.get)
} else if (table.nonEmpty) {
DeltaTableIdentifier(sparkSession, table.get) match {
case Some(id) if id.path.nonEmpty =>
new Path(id.path.get)
case _ =>
new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location)
}
} else {
throw DeltaErrors.missingTableIdentifierException("VACUUM")
}
val pathToVacuum = getDeltaTable(child, "VACUUM").path
val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum)
Collaborator:

I don't think we need to find the table root any more with this change? If the child pointed to a subdirectory of the table, I would have expected an AnalysisException before now. Query resolution would not have been able to turn UnresolvedDeltaPathOrIdentifier into a Delta table (because no _delta_log directory is present in the expected location).

If we really need to support triggering VACUUM for a table by pointing at any subdirectory of that table (as the current code does), then we'd have to somehow delay the table resolution until this point so we can findDeltaTableRoot. But allowing users to specify subdirectories, as if they were the table itself, seems more like a bug than a feature, to be honest.

And actually, L60 below seems to corroborate that subdirectories aren't supported that way, because it blows up if the found root path mismatches the given table path?

Contributor Author:

Thanks for the suggestion. I removed the baseDeltaPath check

Collaborator:

Hmm, we should probably double check the existing behavior first -- if vacuuming a subdirectory was supported before, and our changes here would block it, then that's a breaking change and we need to proceed very carefully. I think it hinges off this code:

    val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum)
    if (baseDeltaPath.isDefined) {
      if (baseDeltaPath.get != pathToVacuum) {
        throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
      }
    }

If I'm not mistaken, it requires the given path to be the actual table path, which means the proposed change is not a breaking change. Even if findDeltaTableRoot were to find the table, starting from a subdirectory, the result would fail the equality check immediately after.

CC @tdas @zsxwing
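
To make the behavior under discussion concrete, a rough sketch, with hypothetical paths, of what the quoted check enforces:

    // Assumes an active SparkSession `spark` with the Delta extension enabled.
    // Hypothetical layout: /data/events is a Delta table root and
    // /data/events/date=2023-01-01 is a partition subdirectory inside it.
    spark.sql("VACUUM '/data/events'")
    // findDeltaTableRoot returns /data/events, which equals the given path, so VACUUM proceeds.

    spark.sql("VACUUM '/data/events/date=2023-01-01'")
    // findDeltaTableRoot walks up to /data/events, which differs from the given path,
    // so vacuumBasePathMissingException is thrown -- vacuuming a subdirectory was never accepted.
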

if (baseDeltaPath.isDefined) {
if (baseDeltaPath.get != pathToVacuum) {
@@ -68,4 +56,7 @@ case class VacuumTableCommand(
}
VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect()
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
}
@@ -0,0 +1,47 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
* Replaces [[UnresolvedTable]]s if the plan is for a direct query on files.
*/
case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {

private def maybeSQLFile(u: UnresolvedTable): Boolean = {
sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
Collaborator:

This seems to copy ideas from ResolveSQLOnFile in Spark? Is there a reason we can't leverage that here, and let DeltaDataSource.getTable produce the DeltaTableV2 we need?

Collaborator:

Ugh, UnresolvedTable != UnresolvedRelation, and it looks like the data source code uses UnresolvedRelation while UnresolvedPathBasedDeltaTable uses UnresolvedTable.

Contributor Author:

  1. In the parser, this PR uses UnresolvedDeltaPathOrIdentifier, which produces UnresolvedTable for table identifiers (including the file-path table delta.path).
  2. If we create UnresolvedRelation as the child of VacuumTableCommand, the resolved relation from Apache Spark will be a Parquet data source relation. There is some issue with my debugger and I haven't figured out the reason.
  3. In the analyzer rule ResolveRelations, both UnresolvedTable and UnresolvedRelation are processed. UnresolvedTable always results in ResolvedTable, while UnresolvedRelation results in a SubqueryAlias with various nodes. I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?
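
For readers following along, a rough sketch of the split described in point 1, based on the parser change above and the test expectations below; the exact helper in the Delta codebase may differ in detail, and UnresolvedPathBasedDeltaTable is the node introduced by this PR (imported from the Delta package):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch: a raw path literal becomes the path-based Delta node, while a (possibly
    // qualified) identifier -- including delta.`/some/path` -- becomes an UnresolvedTable
    // for the analyzer (ResolveRelations / ResolveDeltaPathTable) to resolve later.
    def deltaPathOrIdentifier(
        path: Option[String],
        table: Option[TableIdentifier],
        cmd: String): LogicalPlan = (path, table) match {
      case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, cmd)
      case (None, Some(t)) => UnresolvedTable(t.database.toSeq :+ t.table, cmd, None)
      case _ => throw new IllegalArgumentException(s"$cmd expects either a path or a table name")
    }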

Collaborator:

> I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?

Yeah, UnresolvedRelation only makes sense if it allows us to reuse existing machinery in some way. But:

> resolved relation from Apache Spark will be a Parquet data source relation

That's... awkward. Tho I've noticed that the file index for Delta is parquet source because that's the physical file format Delta reads. Is there no trace of Delta in the resulting scan node, tho?

}

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case u: UnresolvedTable if maybeSQLFile(u) =>
val tableId = u.multipartIdentifier.asTableIdentifier
if (DeltaTableUtils.isValidPath(tableId)) {
val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table))
DataSourceV2Relation.create(deltaTableV2, None, Some(u.multipartIdentifier.asIdentifier))
} else {
u
}
}
}
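
As a quick end-to-end illustration of what this rule accomplishes (placeholder path; the resolved plan shape is abbreviated):

    // Assumes an active SparkSession `spark` with the Delta extension enabled, a Delta
    // table at the placeholder path /tmp/table, and spark.sql.runSQLonFile enabled.
    val plan = spark.sql("VACUUM delta.`/tmp/table` DRY RUN").queryExecution.analyzed
    // The parser produced VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None), ...);
    // after this rule fires, the child is a DataSourceV2Relation wrapping DeltaTableV2,
    // from which VacuumTableCommand.run extracts the path via getDeltaTable(child, "VACUUM").path.
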
21 changes: 15 additions & 6 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -35,17 +35,26 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
// Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`.
val parser = new DeltaSqlParser(null)
assert(parser.parsePlan("vacuum 123_") ===
VacuumTableCommand(None, Some(TableIdentifier("123_")), None, false))
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum 1a.123_") ===
VacuumTableCommand(None, Some(TableIdentifier("123_", Some("1a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123A") ===
VacuumTableCommand(None, Some(TableIdentifier("123A", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123E3_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123E3_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123D_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123D_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None),
None, false))
assert(parser.parsePlan("vacuum a.123BD_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123BD_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None),
None, false))

assert(parser.parsePlan("vacuum delta.`/tmp/table`") ===
VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None),
None, false))

assert(parser.parsePlan("vacuum \"/tmp/table\"") ===
VacuumTableCommand(UnresolvedPathBasedDeltaTable("/tmp/table", "VACUUM"), None, false))
}

test("OPTIMIZE command is parsed as expected") {
@@ -0,0 +1,128 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta

import io.delta.tables.execution.VacuumTableCommand
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable

class CustomCatalogSuite extends QueryTest with SharedSparkSession
with DeltaSQLCommandTest {

override def sparkConf: SparkConf =
super.sparkConf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)

private def verifyVacuumPath(query: String, expected: Path): Unit = {
val plan = sql(query).queryExecution.analyzed
assert(plan.isInstanceOf[VacuumTableCommand])
val path = getDeltaTable(plan.asInstanceOf[VacuumTableCommand].child, "VACUUM").path
assert(path == expected)
}

test("Vacuum a table from DummyCatalog") {
val tableName = "vacuum_table"
withTable(tableName) {
sql("SET CATALOG dummy")
val dummyCatalog =
spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog]
sql(f"CREATE TABLE $tableName (id bigint) USING delta")
val tablePath = dummyCatalog.getTablePath(tableName)
verifyVacuumPath(s"VACUUM $tableName", tablePath)
verifyVacuumPath(s"VACUUM delta.`$tablePath`", tablePath)

sql("SET CATALOG spark_catalog")
verifyVacuumPath(s"VACUUM dummy.default.$tableName", tablePath)
}
}
}

class DummyCatalog extends TableCatalog {
private val spark: SparkSession = SparkSession.active
private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath)
// scalastyle:off deltahadoopconfiguration
private val fs: FileSystem =
tempDir.getFileSystem(spark.sessionState.newHadoopConf())
// scalastyle:on deltahadoopconfiguration

override def name: String = "dummy"

def getTablePath(tableName: String): Path = {
new Path(tempDir, tableName)
}
override def defaultNamespace(): Array[String] = Array("default")

override def listTables(namespace: Array[String]): Array[Identifier] = {
val status = fs.listStatus(tempDir)
status.filter(_.isDirectory).map { dir =>
Identifier.of(namespace, dir.getPath.getName)
}
}

override def tableExists(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
fs.exists(tablePath)
}
override def loadTable(ident: Identifier): Table = {
val tablePath = getTablePath(ident.name())
DeltaTableV2(spark, tablePath)
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val tablePath = getTablePath(ident.name())
// Create an empty Delta table on the tablePath
spark.range(0).write.format("delta").save(tablePath.toString)
DeltaTableV2(spark, tablePath)
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException("Alter table operation is not supported.")
}

override def dropTable(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
try {
fs.delete(tablePath, true)
true
} catch {
case _: Exception => false
}
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
throw new UnsupportedOperationException("Rename table operation is not supported.")
}

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
// Initialize tempDir here
if (!fs.exists(tempDir)) {
fs.mkdirs(tempDir)
}
}
}