From 3ec4741acd86346f5564a6b1280212c9c83246ea Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 11 Sep 2023 15:35:48 -0700 Subject: [PATCH 1/9] compilable --- .../sql/DeltaSparkSessionExtension.scala | 3 + .../io/delta/sql/parser/DeltaSqlParser.scala | 7 +- .../tables/execution/VacuumTableCommand.scala | 29 ++-- .../sql/delta/ResolveDeltaPathTable.scala | 47 +++++++ .../sql/parser/DeltaSqlParserSuite.scala | 21 ++- .../spark/sql/delta/CustomCatalogSuite.scala | 128 ++++++++++++++++++ 6 files changed, 209 insertions(+), 26 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala diff --git a/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index 1d423068498..3c91a5d97ae 100644 --- a/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -78,6 +78,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) { extensions.injectParser { (session, parser) => new DeltaSqlParser(parser) } + extensions.injectResolutionRule { session => + ResolveDeltaPathTable(session) + } extensions.injectResolutionRule { session => new PreprocessTimeTravel(session) } diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index 8afa78e726a..def3993d631 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -307,9 +307,14 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { * }}} */ override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) { - VacuumTableCommand( + val child = UnresolvedDeltaPathOrIdentifier( Option(ctx.path).map(string), Option(ctx.table).map(visitTableIdentifier), + "VACUUM" + ) + + VacuumTableCommand( + child, Option(ctx.number).map(_.getText.toDouble), ctx.RUN != null) } diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 6165ee09af2..e34f0b487d7 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -16,13 +16,14 @@ package io.delta.tables.execution -import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.UnaryLike import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils} import org.apache.spark.sql.delta.commands.VacuumCommand -import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable +import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand} import org.apache.spark.sql.types.StringType /** @@ -32,28 +33,15 @@ import org.apache.spark.sql.types.StringType * }}} */ case class VacuumTableCommand( - path: Option[String], - table: Option[TableIdentifier], + override val child: LogicalPlan, horizonHours: Option[Double], - 
dryRun: Boolean) extends LeafRunnableCommand { + dryRun: Boolean) extends RunnableCommand with UnaryLike[LogicalPlan]{ override val output: Seq[Attribute] = Seq(AttributeReference("path", StringType, nullable = true)()) override def run(sparkSession: SparkSession): Seq[Row] = { - val pathToVacuum = - if (path.nonEmpty) { - new Path(path.get) - } else if (table.nonEmpty) { - DeltaTableIdentifier(sparkSession, table.get) match { - case Some(id) if id.path.nonEmpty => - new Path(id.path.get) - case _ => - new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location) - } - } else { - throw DeltaErrors.missingTableIdentifierException("VACUUM") - } + val pathToVacuum = getDeltaTable(child, "VACUUM").path val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum) if (baseDeltaPath.isDefined) { if (baseDeltaPath.get != pathToVacuum) { @@ -68,4 +56,7 @@ case class VacuumTableCommand( } VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect() } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala new file mode 100644 index 00000000000..050a4d75781 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala @@ -0,0 +1,47 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedTable +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +/** + * Replaces [[UnresolvedTable]]s if the plan is for direct query on files. 
+ */ +case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + private def maybeSQLFile(u: UnresolvedTable): Boolean = { + sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2 + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case u: UnresolvedTable if maybeSQLFile(u) => + val tableId = u.multipartIdentifier.asTableIdentifier + if (DeltaTableUtils.isValidPath(tableId)) { + val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table)) + DataSourceV2Relation.create(deltaTableV2, None, Some(u.multipartIdentifier.asIdentifier)) + } else { + u + } + } +} diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 860065a9ec9..90090d452f2 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -35,17 +35,26 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`. val parser = new DeltaSqlParser(null) assert(parser.parsePlan("vacuum 123_") === - VacuumTableCommand(None, Some(TableIdentifier("123_")), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, false)) assert(parser.parsePlan("vacuum 1a.123_") === - VacuumTableCommand(None, Some(TableIdentifier("123_", Some("1a"))), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), None, false)) assert(parser.parsePlan("vacuum a.123A") === - VacuumTableCommand(None, Some(TableIdentifier("123A", Some("a"))), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), None, false)) assert(parser.parsePlan("vacuum a.123E3_column") === - VacuumTableCommand(None, Some(TableIdentifier("123E3_column", Some("a"))), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), None, false)) assert(parser.parsePlan("vacuum a.123D_column") === - VacuumTableCommand(None, Some(TableIdentifier("123D_column", Some("a"))), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None), + None, false)) assert(parser.parsePlan("vacuum a.123BD_column") === - VacuumTableCommand(None, Some(TableIdentifier("123BD_column", Some("a"))), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None), + None, false)) + + assert(parser.parsePlan("vacuum delta.`/tmp/table`") === + VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None), + None, false)) + + assert(parser.parsePlan("vacuum \"/tmp/table\"") === + VacuumTableCommand(UnresolvedPathBasedDeltaTable("/tmp/table", "VACUUM"), None, false)) } test("OPTIMIZE command is parsed as expected") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala new file mode 100644 index 00000000000..307f5e46452 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -0,0 +1,128 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import io.delta.tables.execution.VacuumTableCommand +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf +import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable + +class CustomCatalogSuite extends QueryTest with SharedSparkSession + with DeltaSQLCommandTest { + + override def sparkConf: SparkConf = + super.sparkConf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) + + private def verifyVacuumPath(query: String, expected: Path): Unit = { + val plan = sql(query).queryExecution.analyzed + assert(plan.isInstanceOf[VacuumTableCommand]) + val path = getDeltaTable(plan.asInstanceOf[VacuumTableCommand].child, "VACUUM").path + assert(path == expected) + } + + test("Vacuum a table from DummyCatalog") { + val tableName = "vacuum_table" + withTable(tableName) { + sql("SET CATALOG dummy") + val dummyCatalog = + spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog] + sql(f"CREATE TABLE $tableName (id bigint) USING delta") + val tablePath = dummyCatalog.getTablePath(tableName) + verifyVacuumPath(s"VACUUM $tableName", tablePath) + verifyVacuumPath(s"VACUUM delta.`$tablePath`", tablePath) + + sql("SET CATALOG spark_catalog") + verifyVacuumPath(s"VACUUM dummy.default.$tableName", tablePath) + } + } +} + +class DummyCatalog extends TableCatalog { + private val spark: SparkSession = SparkSession.active + private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath) + // scalastyle:off deltahadoopconfiguration + private val fs: FileSystem = + tempDir.getFileSystem(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration + + override def name: String = "dummy" + + def getTablePath(tableName: String): Path = { + new Path(tempDir, tableName) + } + override def defaultNamespace(): Array[String] = Array("default") + + override def listTables(namespace: Array[String]): Array[Identifier] = { + val status = fs.listStatus(tempDir) + status.filter(_.isDirectory).map { dir => + Identifier.of(namespace, dir.getPath.getName) + } + } + + override def tableExists(ident: Identifier): Boolean = { + val tablePath = getTablePath(ident.name()) + fs.exists(tablePath) + } + override def loadTable(ident: Identifier): Table = { + val tablePath = getTablePath(ident.name()) + DeltaTableV2(spark, tablePath) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { + val tablePath = getTablePath(ident.name()) + // Create an empty Delta table on 
the tablePath + spark.range(0).write.format("delta").save(tablePath.toString) + DeltaTableV2(spark, tablePath) + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + throw new UnsupportedOperationException("Alter table operation is not supported.") + } + + override def dropTable(ident: Identifier): Boolean = { + val tablePath = getTablePath(ident.name()) + try { + fs.delete(tablePath, true) + true + } catch { + case _: Exception => false + } + } + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + throw new UnsupportedOperationException("Rename table operation is not supported.") + } + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + // Initialize tempDir here + if (!fs.exists(tempDir)) { + fs.mkdirs(tempDir) + } + } +} From 8e81699cbd7f6070196ddfd84d72370e133b1ddb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 11 Sep 2023 21:17:50 -0700 Subject: [PATCH 2/9] address comments --- .../delta/tables/execution/VacuumTableCommand.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index e34f0b487d7..5d840200176 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -40,14 +40,11 @@ case class VacuumTableCommand( override val output: Seq[Attribute] = Seq(AttributeReference("path", StringType, nullable = true)()) + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) + override def run(sparkSession: SparkSession): Seq[Row] = { val pathToVacuum = getDeltaTable(child, "VACUUM").path - val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToVacuum) - if (baseDeltaPath.isDefined) { - if (baseDeltaPath.get != pathToVacuum) { - throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get) - } - } val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum) if (!deltaLog.tableExists) { throw DeltaErrors.notADeltaTableException( @@ -56,7 +53,4 @@ case class VacuumTableCommand( } VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect() } - - override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = - copy(child = newChild) } From 1ae8c85d90e267d49ef6cd202ffbaabd37be1f06 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 12 Sep 2023 10:47:09 -0700 Subject: [PATCH 3/9] fix test failures --- .../org/apache/spark/sql/delta/DeltaVacuumSuite.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala index 957399534d9..969eb825173 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala @@ -498,8 +498,7 @@ class DeltaVacuumSuite val e = intercept[AnalysisException] { vacuumSQLTest(tablePath, viewName) } - assert(e.getMessage.contains("not found") || - e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND")) + assert(e.getMessage.contains("v is a temp view. 
'VACUUM' expects a table.")) } } } @@ -788,7 +787,7 @@ class DeltaVacuumSuite sql(s"vacuum '$path/v2=a' retain 0 hours") } assert(ex.getMessage.contains( - s"Please provide the base path ($path) when Vacuuming Delta tables.")) + s"`$path/v2=a` is not a Delta table. VACUUM is only supported for Delta tables.")) } } @@ -993,9 +992,7 @@ class DeltaVacuumSuite val e = intercept[AnalysisException] { sql(s"vacuum $table") } - Seq("VACUUM", "only supported for Delta tables").foreach { msg => - assert(e.getMessage.contains(msg)) - } + assert(e.getMessage.contains("is not a Delta table.")) } } withTempPath { tempDir => From 9c986eecd1113da2f0a2acda4f14f6a625747b43 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 12 Sep 2023 10:55:54 -0700 Subject: [PATCH 4/9] address Ryan's comment --- .../io/delta/tables/execution/VacuumTableCommand.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 5d840200176..98e8f7a0687 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -44,13 +44,12 @@ case class VacuumTableCommand( copy(child = newChild) override def run(sparkSession: SparkSession): Seq[Row] = { - val pathToVacuum = getDeltaTable(child, "VACUUM").path - val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum) - if (!deltaLog.tableExists) { + val deltaTable = getDeltaTable(child, "VACUUM") + if (!deltaTable.tableExists) { throw DeltaErrors.notADeltaTableException( "VACUUM", - DeltaTableIdentifier(path = Some(pathToVacuum.toString))) + DeltaTableIdentifier(path = Some(deltaTable.path.toString))) } - VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect() + VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect() } } From 4f3946fc02a565639d5979162d703d94b1038431 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 12 Sep 2023 11:15:35 -0700 Subject: [PATCH 5/9] update parser; fix test regression --- .../io/delta/sql/parser/DeltaSqlParser.scala | 7 +----- .../tables/execution/VacuumTableCommand.scala | 23 +++++++++++++++---- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index def3993d631..8afa78e726a 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -307,14 +307,9 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { * }}} */ override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) { - val child = UnresolvedDeltaPathOrIdentifier( + VacuumTableCommand( Option(ctx.path).map(string), Option(ctx.table).map(visitTableIdentifier), - "VACUUM" - ) - - VacuumTableCommand( - child, Option(ctx.number).map(_.getText.toDouble), ctx.RUN != null) } diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 98e8f7a0687..192360e8a03 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -20,7 +20,8 @@ import org.apache.spark.sql.{Row, SparkSession} import 
org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.UnaryLike -import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils, UnresolvedDeltaPathOrIdentifier} import org.apache.spark.sql.delta.commands.VacuumCommand import org.apache.spark.sql.delta.commands.VacuumCommand.getDeltaTable import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand} @@ -44,12 +45,24 @@ case class VacuumTableCommand( copy(child = newChild) override def run(sparkSession: SparkSession): Seq[Row] = { - val deltaTable = getDeltaTable(child, "VACUUM") - if (!deltaTable.tableExists) { + val pathToVacuum = getDeltaTable(child, "VACUUM").path + val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum) + if (!deltaLog.tableExists) { throw DeltaErrors.notADeltaTableException( "VACUUM", - DeltaTableIdentifier(path = Some(deltaTable.path.toString))) + DeltaTableIdentifier(path = Some(pathToVacuum.toString))) } - VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect() + VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect() + } +} + +object VacuumTableCommand { + def apply( + path: Option[String], + table: Option[TableIdentifier], + horizonHours: Option[Double], + dryRun: Boolean): VacuumTableCommand = { + val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM") + VacuumTableCommand(child, horizonHours, dryRun) } } From 5d9bf8810db1cda245f4088ce0c6e0b7a20735c2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 12 Sep 2023 11:16:58 -0700 Subject: [PATCH 6/9] address nits --- .../org/apache/spark/sql/delta/ResolveDeltaPathTable.scala | 2 +- .../scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala index 050a4d75781..6ff0541c6ce 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala @@ -38,7 +38,7 @@ case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[Logica case u: UnresolvedTable if maybeSQLFile(u) => val tableId = u.multipartIdentifier.asTableIdentifier if (DeltaTableUtils.isValidPath(tableId)) { - val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table)) + val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table)) DataSourceV2Relation.create(deltaTableV2, None, Some(u.multipartIdentifier.asIdentifier)) } else { u diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala index 307f5e46452..3ac94e44ee0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -63,8 +63,7 @@ class DummyCatalog extends TableCatalog { private val spark: SparkSession = SparkSession.active private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath) // scalastyle:off deltahadoopconfiguration - private val fs: FileSystem = - 
tempDir.getFileSystem(spark.sessionState.newHadoopConf()) + private val fs: FileSystem = tempDir.getFileSystem(spark.sessionState.newHadoopConf()) // scalastyle:on deltahadoopconfiguration override def name: String = "dummy" From d8a88467388d974a8f7cccb4ac85c06f084e14b1 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 12 Sep 2023 12:58:33 -0700 Subject: [PATCH 7/9] improve delta log check; resolve file path table as ResolvedTable for consistency with existing code --- .../delta/tables/execution/VacuumTableCommand.scala | 12 +++++++----- .../spark/sql/delta/ResolveDeltaPathTable.scala | 9 +++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 192360e8a03..580a3c56cef 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -45,14 +45,16 @@ case class VacuumTableCommand( copy(child = newChild) override def run(sparkSession: SparkSession): Seq[Row] = { - val pathToVacuum = getDeltaTable(child, "VACUUM").path - val deltaLog = DeltaLog.forTable(sparkSession, pathToVacuum) - if (!deltaLog.tableExists) { + val deltaTable = getDeltaTable(child, "VACUUM") + if (!deltaTable.tableExists || + // The table path can be of a partition directory and the deltaLog data path is + // the actual table path. In such cases, we should not allow vacuuming the table. + !deltaTable.path.toUri.getPath.equals(deltaTable.deltaLog.dataPath.toUri.getPath)) { throw DeltaErrors.notADeltaTableException( "VACUUM", - DeltaTableIdentifier(path = Some(pathToVacuum.toString))) + DeltaTableIdentifier(path = Some(deltaTable.path.toString))) } - VacuumCommand.gc(sparkSession, deltaLog, dryRun, horizonHours).collect() + VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect() } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala index 6ff0541c6ce..b10ea108612 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaPathTable.scala @@ -18,12 +18,11 @@ package org.apache.spark.sql.delta import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.UnresolvedTable +import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper} import org.apache.spark.sql.delta.catalog.DeltaTableV2 -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** * Replaces [[UnresolvedTable]]s if the plan is for direct query on files. 
@@ -39,7 +38,9 @@ case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[Logica
       val tableId = u.multipartIdentifier.asTableIdentifier
       if (DeltaTableUtils.isValidPath(tableId)) {
         val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table))
-        DataSourceV2Relation.create(deltaTableV2, None, Some(u.multipartIdentifier.asIdentifier))
+        val sessionCatalog =
+          sparkSession.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
+        ResolvedTable.create(sessionCatalog, u.multipartIdentifier.asIdentifier, deltaTableV2)
       } else {
         u
       }

From 12d89a675d6338afe2057c395a989110a440b2ef Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 12 Sep 2023 13:07:57 -0700
Subject: [PATCH 8/9] introduce a new method hasPartitionFilters

---
 .../io/delta/tables/execution/VacuumTableCommand.scala | 7 +++----
 .../org/apache/spark/sql/delta/catalog/DeltaTableV2.scala | 1 +
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala
index 580a3c56cef..dce1f2d7ac1 100644
--- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala
+++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala
@@ -46,10 +46,9 @@ case class VacuumTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaTable = getDeltaTable(child, "VACUUM")
-    if (!deltaTable.tableExists ||
-      // The table path can be of a partition directory and the deltaLog data path is
-      // the actual table path. In such cases, we should not allow vacuuming the table.
-      !deltaTable.path.toUri.getPath.equals(deltaTable.deltaLog.dataPath.toUri.getPath)) {
+    // The VACUUM command is only supported on existing Delta tables. If the target table doesn't
+    // exist or resolves to a partition directory, an exception will be thrown.
+    if (!deltaTable.tableExists || deltaTable.hasPartitionFilters) {
       throw DeltaErrors.notADeltaTableException(
         "VACUUM",
         DeltaTableIdentifier(path = Some(deltaTable.path.toString)))
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index 87b6915087d..79b8cb3940b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -71,6 +71,7 @@ case class DeltaTableV2(
     }
   }
 
+  def hasPartitionFilters: Boolean = partitionFilters.nonEmpty
 
   // This MUST be initialized before the deltaLog object is created, in order to accurately
   // bound the creation time of the table.

From 2d7264b119172220e18ebc6dbd36021f5079b581 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 12 Sep 2023 19:46:04 -0700
Subject: [PATCH 9/9] retrigger tests
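
---
Appendix: illustrative sketches of the behavior this series introduces. These are
editor-added examples, not part of the patches; they assume the Delta Lake classes
touched above are on the classpath, and all table and path names are hypothetical.

The refactor replaces VacuumTableCommand's (path, table) pair with a single
resolvable `child` plan. Both spellings of VACUUM now build the same kind of
unresolved node (the companion apply added in PATCH 5 shows the construction),
which the analyzer resolves to a DeltaTableV2 before run() calls
getDeltaTable(child, "VACUUM"):

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.delta.UnresolvedDeltaPathOrIdentifier

  // VACUUM '/tmp/table'  -> path-based unresolved node
  val byPath = UnresolvedDeltaPathOrIdentifier(Some("/tmp/table"), None, "VACUUM")
  // VACUUM db.tbl        -> identifier-based unresolved node
  val byName = UnresolvedDeltaPathOrIdentifier(
    None, Some(TableIdentifier("tbl", Some("db"))), "VACUUM")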
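
The guard introduced in PATCH 7 and simplified in PATCH 8 can be read as follows: a
DeltaTableV2 built from a path below the table root carries partition filters, so
rejecting hasPartitionFilters reproduces the old findDeltaTableRoot base-path check.
A sketch under that assumption, mirroring the DeltaVacuumSuite expectation changed
in PATCH 3 (the /tmp/t layout is hypothetical):

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.delta.catalog.DeltaTableV2

  val spark = SparkSession.active
  // For a Delta table at /tmp/t partitioned by v2, a partition-directory path
  // resolves to the root /tmp/t plus the filter v2=a, not to a separate table:
  val t = DeltaTableV2(spark, new Path("/tmp/t/v2=a"))
  // After PATCH 8, VACUUM rejects this target:
  //   !t.tableExists || t.hasPartitionFilters  =>  notADeltaTableException("VACUUM", ...)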
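
A usage sketch of the CustomCatalogSuite flow, showing what the new resolution rule
enables end to end (catalog and table names taken from the test in PATCH 1):

  // Register the test catalog, then VACUUM through it; the table is resolved
  // via DummyCatalog.loadTable -> DeltaTableV2 instead of the session catalog.
  spark.conf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
  spark.sql("SET CATALOG dummy")
  spark.sql("CREATE TABLE vacuum_table (id bigint) USING delta")
  spark.sql("VACUUM vacuum_table")
  spark.sql("SET CATALOG spark_catalog")
  spark.sql("VACUUM dummy.default.vacuum_table")  // fully qualified across catalogs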