[Spark] Support external DSV2 catalog in Vacuum command #2039

Closed
wants to merge 9 commits
@@ -78,6 +78,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}
extensions.injectResolutionRule { session =>
ResolveDeltaPathTable(session)
}
extensions.injectResolutionRule { session =>
new PreprocessTimeTravel(session)
}
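
These injected rules (including the new ResolveDeltaPathTable) only take effect when the Delta extension and the relevant catalogs are configured on the SparkSession. A minimal sketch of that wiring, not part of this PR; "my_catalog", "com.example.MyCatalog", and "my_table" are placeholder names for an external DSV2 catalog and a table it manages:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("vacuum-external-catalog")
    // Delta's session extension injects the parser and resolution rules shown above.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    // Placeholder: an external DSV2 TableCatalog implementation (compare DummyCatalog in the test below).
    .config("spark.sql.catalog.my_catalog", "com.example.MyCatalog")
    .getOrCreate()

  // With the external catalog registered, VACUUM can target a table it resolves.
  spark.sql("VACUUM my_catalog.default.my_table RETAIN 168 HOURS")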
@@ -16,13 +16,15 @@

package io.delta.tables.execution

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils, UnresolvedDeltaPathOrIdentifier}
import org.apache.spark.sql.delta.commands.VacuumCommand
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable
import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
import org.apache.spark.sql.types.StringType

/**
@@ -32,40 +34,36 @@
* }}}
*/
case class VacuumTableCommand(
path: Option[String],
table: Option[TableIdentifier],
override val child: LogicalPlan,
horizonHours: Option[Double],
dryRun: Boolean) extends LeafRunnableCommand {
dryRun: Boolean) extends RunnableCommand with UnaryLike[LogicalPlan] {

override val output: Seq[Attribute] =
Seq(AttributeReference("path", StringType, nullable = true)())

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)

override def run(sparkSession: SparkSession): Seq[Row] = {
val pathToVacuum =
if (path.nonEmpty) {
new Path(path.get)
} else if (table.nonEmpty) {
DeltaTableIdentifier(sparkSession, table.get) match {
case Some(id) if id.path.nonEmpty =>
new Path(id.path.get)
case _ =>
new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location)
}
} else {
throw DeltaErrors.missingTableIdentifierException("VACUUM")
}
val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum)
if (baseDeltaPath.isDefined) {
if (baseDeltaPath.get != pathToVacuum) {
throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
}
}
val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum)
if (!deltaLog.tableExists) {
val deltaTable = getDeltaTable(child, "VACUUM")
// The VACUUM command is only supported on existing delta tables. If the target table doesn't
// exist or it is based on a partition directory, an exception will be thrown.
if (!deltaTable.tableExists || deltaTable.hasPartitionFilters) {
throw DeltaErrors.notADeltaTableException(
"VACUUM",
DeltaTableIdentifier(path = Some(pathToVacuum.toString)))
DeltaTableIdentifier(path = Some(deltaTable.path.toString)))
}
VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect()
VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect()
}
}

object VacuumTableCommand {
def apply(
path: Option[String],
table: Option[TableIdentifier],
horizonHours: Option[Double],
dryRun: Boolean): VacuumTableCommand = {
val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM")
VacuumTableCommand(child, horizonHours, dryRun)
}
}
@@ -0,0 +1,48 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper}
import org.apache.spark.sql.delta.catalog.DeltaTableV2

/**
* Replaces [[UnresolvedTable]]s if the plan is for direct query on files.
*/
case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {

private def maybeSQLFile(u: UnresolvedTable): Boolean = {
sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
Collaborator: This seems to copy ideas from ResolveSQLOnFile in Spark? Is there a reason we can't leverage that here, and let DeltaDataSource.getTable produce the DeltaTableV2 we need?

Collaborator: Ugh, UnresolvedTable != UnresolvedRelation, and it looks like the data source code uses UnresolvedRelation while UnresolvedPathBasedDeltaTable uses UnresolvedTable.

Contributor Author:

  1. In the parser, this PR uses UnresolvedDeltaPathOrIdentifier, which produces UnresolvedTable for table identifiers (including the file-path table delta.`path`).
  2. If we create UnresolvedRelation as the child of VacuumTableCommand, the relation resolved by Apache Spark will be a Parquet data source relation. There is some issue with my debugger and I haven't figured out the reason.
  3. In the analyzer rule ResolveRelations, both UnresolvedTable and UnresolvedRelation are processed. UnresolvedTable always resolves to a ResolvedTable, while UnresolvedRelation resolves to a SubqueryAlias over various nodes. I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?

Collaborator:

  > I think using UnresolvedTable is simpler here. Any reason why we should use UnresolvedRelation?

  Yeah, UnresolvedRelation only makes sense if it allows us to reuse existing machinery in some way. But:

  > the resolved relation from Apache Spark will be a Parquet data source relation

  That's... awkward. Though I've noticed that the file index for Delta is the Parquet source, because that's the physical file format Delta reads. Is there no trace of Delta in the resulting scan node, though?


}

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case u: UnresolvedTable if maybeSQLFile(u) =>
val tableId = u.multipartIdentifier.asTableIdentifier
if (DeltaTableUtils.isValidPath(tableId)) {
val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table))
val sessionCatalog =
sparkSession.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
ResolvedTable.create(sessionCatalog, u.multipartIdentifier.asIdentifier, deltaTableV2)
} else {
u
}
}
}
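
A minimal sketch of the resolution outcome discussed in the review thread above, following the same analyzed-plan pattern the suites below use. It assumes an active SparkSession `spark` with the Delta extension installed, SQL-on-file support enabled (conf.runSQLonFile, on by default), and an existing Delta table at the placeholder path /tmp/table:

  import io.delta.tables.execution.VacuumTableCommand
  import org.apache.spark.sql.catalyst.analysis.ResolvedTable
  import org.apache.spark.sql.delta.catalog.DeltaTableV2

  // DRY RUN keeps the example side-effect free; /tmp/table is a placeholder path.
  val analyzed = spark.sql("VACUUM delta.`/tmp/table` DRY RUN").queryExecution.analyzed
  val vacuum = analyzed.asInstanceOf[VacuumTableCommand]
  // The child was parsed as UnresolvedTable(Seq("delta", "/tmp/table"), ...) and
  // rewritten by ResolveDeltaPathTable into a ResolvedTable wrapping a DeltaTableV2,
  // rather than the SubqueryAlias an UnresolvedRelation would resolve to.
  assert(vacuum.child.isInstanceOf[ResolvedTable])
  assert(vacuum.child.asInstanceOf[ResolvedTable].table.isInstanceOf[DeltaTableV2])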
@@ -71,6 +71,7 @@ case class DeltaTableV2(
}
}

def hasPartitionFilters: Boolean = partitionFilters.nonEmpty

// This MUST be initialized before the deltaLog object is created, in order to accurately
// bound the creation time of the table.
21 changes: 15 additions & 6 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -35,17 +35,26 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
// Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`.
val parser = new DeltaSqlParser(null)
assert(parser.parsePlan("vacuum 123_") ===
VacuumTableCommand(None, Some(TableIdentifier("123_")), None, false))
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum 1a.123_") ===
VacuumTableCommand(None, Some(TableIdentifier("123_", Some("1a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123A") ===
VacuumTableCommand(None, Some(TableIdentifier("123A", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123E3_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123E3_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), None, false))
assert(parser.parsePlan("vacuum a.123D_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123D_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None),
None, false))
assert(parser.parsePlan("vacuum a.123BD_column") ===
VacuumTableCommand(None, Some(TableIdentifier("123BD_column", Some("a"))), None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None),
None, false))

assert(parser.parsePlan("vacuum delta.`/tmp/table`") ===
VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None),
None, false))

assert(parser.parsePlan("vacuum \"/tmp/table\"") ===
VacuumTableCommand(UnresolvedPathBasedDeltaTable("/tmp/table", "VACUUM"), None, false))
}

test("OPTIMIZE command is parsed as expected") {
@@ -0,0 +1,127 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta

import io.delta.tables.execution.VacuumTableCommand
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable

class CustomCatalogSuite extends QueryTest with SharedSparkSession
with DeltaSQLCommandTest {

override def sparkConf: SparkConf =
super.sparkConf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)

private def verifyVacuumPath(query: String, expected: Path): Unit = {
val plan = sql(query).queryExecution.analyzed
assert(plan.isInstanceOf[VacuumTableCommand])
val path = getDeltaTable(plan.asInstanceOf[VacuumTableCommand].child, "VACUUM").path
assert(path == expected)
}

test("Vacuum a table from DummyCatalog") {
val tableName = "vacuum_table"
withTable(tableName) {
sql("SET CATALOG dummy")
val dummyCatalog =
spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog]
sql(f"CREATE TABLE $tableName (id bigint) USING delta")
val tablePath = dummyCatalog.getTablePath(tableName)
verifyVacuumPath(s"VACUUM $tableName", tablePath)
verifyVacuumPath(s"VACUUM delta.`$tablePath`", tablePath)

sql("SET CATALOG spark_catalog")
verifyVacuumPath(s"VACUUM dummy.default.$tableName", tablePath)
}
}
}

class DummyCatalog extends TableCatalog {
private val spark: SparkSession = SparkSession.active
private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath)
// scalastyle:off deltahadoopconfiguration
private val fs: FileSystem = tempDir.getFileSystem(spark.sessionState.newHadoopConf())
// scalastyle:on deltahadoopconfiguration

override def name: String = "dummy"

def getTablePath(tableName: String): Path = {
new Path(tempDir, tableName)
}
override def defaultNamespace(): Array[String] = Array("default")

override def listTables(namespace: Array[String]): Array[Identifier] = {
val status = fs.listStatus(tempDir)
status.filter(_.isDirectory).map { dir =>
Identifier.of(namespace, dir.getPath.getName)
}
}

override def tableExists(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
fs.exists(tablePath)
}
override def loadTable(ident: Identifier): Table = {
val tablePath = getTablePath(ident.name())
DeltaTableV2(spark, tablePath)
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val tablePath = getTablePath(ident.name())
// Create an empty Delta table on the tablePath
spark.range(0).write.format("delta").save(tablePath.toString)
DeltaTableV2(spark, tablePath)
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException("Alter table operation is not supported.")
}

override def dropTable(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
try {
fs.delete(tablePath, true)
true
} catch {
case _: Exception => false
}
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
throw new UnsupportedOperationException("Rename table operation is not supported.")
}

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
// Initialize tempDir here
if (!fs.exists(tempDir)) {
fs.mkdirs(tempDir)
}
}
}
@@ -498,8 +498,7 @@ class DeltaVacuumSuite
val e = intercept[AnalysisException] {
vacuumSQLTest(tablePath, viewName)
}
assert(e.getMessage.contains("not found") ||
e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"))
assert(e.getMessage.contains("v is a temp view. 'VACUUM' expects a table."))
Contributor Author: The error message here is improved.

Collaborator: Shouldn't we be checking for an error class, rather than specific strings?

Contributor Author: The error comes from Spark's expectTableNotViewError:

  def expectTableNotViewError(
      nameParts: Seq[String],
      isTemp: Boolean,
      cmd: String,
      mismatchHint: Option[String],
      t: TreeNode[_]): Throwable = {
    val viewStr = if (isTemp) "temp view" else "view"
    val hintStr = mismatchHint.map(" " + _).getOrElse("")
    new AnalysisException(
      errorClass = "_LEGACY_ERROR_TEMP_1013",
      messageParameters = Map(
        "nameParts" -> nameParts.quoted,
        "viewStr" -> viewStr,
        "cmd" -> cmd,
        "hintStr" -> hintStr),
      origin = t.origin)
  }

The error class from Spark is a temporary one and it won't be displayed. We can check it once it is assigned a dedicated name.

}
}
}
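
A hedged sketch of what the error-class assertion suggested in the thread above could look like; today it would have to match the legacy placeholder class, which is why the suite keys on the message text instead. It assumes the same tablePath/viewName fixtures as the surrounding test:

  val e = intercept[AnalysisException] {
    vacuumSQLTest(tablePath, viewName)
  }
  // getErrorClass comes from SparkThrowable; the value below is the current legacy placeholder.
  assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_1013")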
@@ -788,7 +787,7 @@ class DeltaVacuumSuite
sql(s"vacuum '$path/v2=a' retain 0 hours")
}
assert(ex.getMessage.contains(
s"Please provide the base path ($path) when Vacuuming Delta tables."))
s"`$path/v2=a` is not a Delta table. VACUUM is only supported for Delta tables."))
}
}

@@ -993,9 +992,7 @@ class DeltaVacuumSuite
val e = intercept[AnalysisException] {
sql(s"vacuum $table")
}
Seq("VACUUM", "only supported for Delta tables").foreach { msg =>
assert(e.getMessage.contains(msg))
}
assert(e.getMessage.contains("is not a Delta table."))
}
}
withTempPath { tempDir =>