From d1c83792bb8a855afcb8ff20eeb0141ac592d674 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 16:01:51 -0700 Subject: [PATCH 1/7] workable version --- .../io/delta/sql/parser/DeltaSqlParser.scala | 2 +- .../apache/spark/sql/delta/DeltaTable.scala | 13 ++- .../sql/delta/PreprocessTimeTravel.scala | 91 +++++++++++++++---- .../DescribeDeltaDetailsCommand.scala | 3 +- .../sql/parser/DeltaSqlParserSuite.scala | 18 +++- 5 files changed, 100 insertions(+), 27 deletions(-) diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index 8afa78e726a..41c1251726b 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -402,7 +402,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { } override def visitRestore(ctx: RestoreContext): LogicalPlan = withOrigin(ctx) { - val tableRelation = UnresolvedRelation(visitTableIdentifier(ctx.table)) + val tableRelation = UnresolvedIdentifier(ctx.table.identifier.asScala.toSeq.map(_.getText)) val timeTravelTableRelation = maybeTimeTravelChild(ctx.clause, tableRelation) RestoreTableStatement(timeTravelTableRelation.asInstanceOf[TimeTravel]) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index e75fc64bdeb..f7a9c853beb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -18,12 +18,10 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import scala.util.{Failure, Success, Try} - import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedTable} @@ -33,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils -import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform} +import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -101,6 +100,14 @@ object DeltaTableUtils extends PredicateHelper tableIsNotTemporaryTable && tableExists && isDeltaTable(catalog.getTableMetadata(tableName)) } + /** + * Check whether the provided table identifier is a Delta table based on information from the Catalog. 
+ */ + def isDeltaTable(tableCatalog: TableCatalog, identifier: Identifier): Boolean = { + val tableExists = tableCatalog.tableExists(identifier) + tableExists && tableCatalog.loadTable(identifier).isInstanceOf[DeltaTableV2] + } + /** Check if the provided path is the root or the children of a Delta table. */ def isDeltaTable( spark: SparkSession, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala index a4b4f092e23..113d3a07fc5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala @@ -17,19 +17,20 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.catalyst.TimeTravel -import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2} import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, UnresolvedIdentifier, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.quoteIfNeeded -import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** * Resolves the [[UnresolvedRelation]] in command 's child [[TimeTravel]]. @@ -42,9 +43,13 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical override def conf: SQLConf = sparkSession.sessionState.conf + private val globalTempDB = conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { - case _ @ RestoreTableStatement(tt @ TimeTravel(ur @ UnresolvedRelation(_, _, _), _, _, _)) => - val sourceRelation = resolveTimeTravelTable(sparkSession, ur) + // Since the Delta's TimeTravel is a leaf node, we have to resolve the UnresolvedIdentifier. 
+ case _ @ RestoreTableStatement(tt @ TimeTravel(ui: UnresolvedIdentifier, _, _, _)) => + val sourceRelation = resolveTimetravelIdentifier(sparkSession, ui) return RestoreTableStatement( TimeTravel( sourceRelation, @@ -63,6 +68,51 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical tt.creationSource)) } + private def resolveTimetravelIdentifier( + sparkSession: SparkSession, + unresolvedIdentifier: UnresolvedIdentifier): DataSourceV2Relation = { + val (catalog, identifier) = + resolveCatalogAndIdentifier(sparkSession.sessionState.catalogManager, + unresolvedIdentifier.nameParts) + val tableId = identifier.asTableIdentifier + assert(catalog.isInstanceOf[TableCatalog], s"Catalog ${catalog.name()} must implement " + + s"TableCatalog to support loading Delta table.") + val tableCatalog = catalog.asInstanceOf[TableCatalog] + val deltaTableV2 = if (DeltaTableUtils.isDeltaTable(tableCatalog, identifier)) { + tableCatalog.loadTable(identifier).asInstanceOf[DeltaTableV2] + } else if (DeltaTableUtils.isValidPath(tableId)) { + DeltaTableV2(sparkSession, new Path(tableId.table)) + } else { + handleTableNotFound(sparkSession.sessionState.catalog, unresolvedIdentifier, tableId) + } + DataSourceV2Relation.create(deltaTableV2, None, Some(identifier)) + } + + private def resolveCatalogAndIdentifier( + catalogManager: CatalogManager, + nameParts: Seq[String]): (CatalogPlugin, Identifier) = { + assert(nameParts.nonEmpty) + if (nameParts.length == 1) { + (catalogManager.currentCatalog, + Identifier.of(catalogManager.currentNamespace, nameParts.head)) + } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { + // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog + // API does not support view yet, and we have to use v1 commands to deal with global temp + // views. To simplify the implementation, we put global temp views in a special namespace + // in the session catalog. The special namespace has higher priority during name resolution. + // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`, + // this custom catalog can't be accessed. + (catalogManager.v2SessionCatalog, nameParts.asIdentifier) + } else { + try { + (catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier) + } catch { + case _: CatalogNotFoundException => + (catalogManager.currentCatalog, nameParts.asIdentifier) + } + } + } + /** * Helper to resolve a [[TimeTravel]] logical plan to Delta DSv2 relation. 
*/ @@ -82,19 +132,26 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical } else if (DeltaTableUtils.isValidPath(tableId)) { DeltaTableV2(sparkSession, new Path(tableId.table)) } else { - if ( - (catalog.tableExists(tableId) && - catalog.getTableMetadata(tableId).tableType == CatalogTableType.VIEW) || - catalog.isTempView(ur.multipartIdentifier)) { - // If table exists and not found to be a view, throw not supported error - throw DeltaErrors.notADeltaTableException("RESTORE") - } else { - ur.tableNotFound(ur.multipartIdentifier) - } + handleTableNotFound(catalog, ur, tableId) } val identifier = deltaTableV2.getTableIdentifierIfExists.map( id => Identifier.of(id.database.toArray, id.table)) DataSourceV2Relation.create(deltaTableV2, None, identifier) } + + private def handleTableNotFound( + catalog: SessionCatalog, + inputPlan: LeafNode, + tableIdentifier: TableIdentifier): Nothing = { + if ( + (catalog.tableExists(tableIdentifier) && + catalog.getTableMetadata(tableIdentifier).tableType == CatalogTableType.VIEW) || + catalog.isTempView(tableIdentifier)) { + // If table exists and not found to be a view, throw not supported error + throw DeltaErrors.notADeltaTableException("RESTORE") + } else { + inputPlan.tableNotFound(tableIdentifier.nameParts) + } + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala index 863f4031df2..0f84f6b87f3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala @@ -19,17 +19,16 @@ package org.apache.spark.sql.delta.commands // scalastyle:off import.ordering.noEmptyLine import java.io.FileNotFoundException import java.sql.Timestamp - import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, Snapshot} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.fs.Path - import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.types.StructType diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 860065a9ec9..b39a4e2fc4e 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -17,17 +17,16 @@ package io.delta.sql.parser import io.delta.tables.execution.VacuumTableCommand - import org.apache.spark.sql.delta.CloneTableSQLTestUtils import org.apache.spark.sql.delta.UnresolvedPathBasedDeltaTable -import org.apache.spark.sql.delta.commands.{OptimizeTableCommand, DeltaReorgTable} +import org.apache.spark.sql.delta.commands.{DeltaReorgTable, OptimizeTableCommand} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel} -import 
org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedIdentifier, UnresolvedRelation, UnresolvedTable} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.SQLHelper -import org.apache.spark.sql.catalyst.plans.logical.CloneTableStatement +import org.apache.spark.sql.catalyst.plans.logical.{CloneTableStatement, RestoreTableStatement} class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { @@ -48,6 +47,17 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { VacuumTableCommand(None, Some(TableIdentifier("123BD_column", Some("a"))), None, false)) } + test("RESTORE command is parsed as expected") { + val parser = new DeltaSqlParser(null) + val parsedCmd = parser.parsePlan("RESTORE catalog_foo.db.tbl TO VERSION AS OF 1;") + assert(parsedCmd === + RestoreTableStatement(TimeTravel( + UnresolvedIdentifier(Seq("catalog_foo", "db", "tbl")), + None, + Some(1), + Some("sql")))) + } + test("OPTIMIZE command is parsed as expected") { val parser = new DeltaSqlParser(null) var parsedCmd = parser.parsePlan("OPTIMIZE tbl") From 013f964ceb03650de2ddc5871095303e1089d1cd Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 16:11:17 -0700 Subject: [PATCH 2/7] fix temp view --- .../org/apache/spark/sql/delta/PreprocessTimeTravel.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala index 113d3a07fc5..da34f186067 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala @@ -74,7 +74,12 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical val (catalog, identifier) = resolveCatalogAndIdentifier(sparkSession.sessionState.catalogManager, unresolvedIdentifier.nameParts) - val tableId = identifier.asTableIdentifier + // If the identifier is not a multipart identifier, we assume it is a table or view name. + val tableId = if (unresolvedIdentifier.nameParts.length == 1) { + TableIdentifier(unresolvedIdentifier.nameParts.head) + } else { + identifier.asTableIdentifier + } assert(catalog.isInstanceOf[TableCatalog], s"Catalog ${catalog.name()} must implement " + s"TableCatalog to support loading Delta table.") val tableCatalog = catalog.asInstanceOf[TableCatalog] From 90b616b069ed35db33794137d6c55c27e37272b5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 21:44:00 -0700 Subject: [PATCH 3/7] add CustomCatalogSuite.scala --- .../spark/sql/delta/CustomCatalogSuite.scala | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala new file mode 100644 index 00000000000..146688f02c9 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -0,0 +1,138 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import java.util +import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import io.delta.tables.DeltaTable +import org.apache.hadoop.fs.{FileSystem, Path} + +import java.nio.file.{Files, Paths} +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf + +import java.io.{File, IOException} + + +class CustomCatalogSuite extends QueryTest with SharedSparkSession + with DeltaSQLCommandTest { + + override def sparkConf: SparkConf = + super.sparkConf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) + + test("RESTORE a table from DummyCatalog") { + val tableName = "table1" + withTable(tableName) { + sql("SET CATALOG dummy") + val dummyCatalog = spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog] + val tablePath = dummyCatalog.getTablePath(tableName) + sql(f"CREATE TABLE $tableName (id bigint) USING delta") + sql(f"INSERT INTO delta.`$tablePath` VALUES (0)") + sql(f"INSERT INTO delta.`$tablePath` VALUES (1)") + sql(f"RESTORE TABLE $tableName VERSION AS OF 0") + checkAnswer(spark.table(tableName), Nil) + sql(f"RESTORE TABLE $tableName VERSION AS OF 1") + checkAnswer(spark.table(tableName), spark.range(1).toDF()) + // Test 3-part identifier + sql(f"RESTORE TABLE dummy.default.$tableName VERSION AS OF 2") + checkAnswer(spark.table(tableName), spark.range(2).toDF()) + + sql("SET CATALOG spark_catalog") + // Test 3-part identifier when the current catalog is the default catalog + sql(f"RESTORE TABLE dummy.default.$tableName VERSION AS OF 1") + checkAnswer(spark.table(f"dummy.default.$tableName"), spark.range(1).toDF()) + } + } +} + +class DummyCatalog extends TableCatalog { + private val spark: SparkSession = SparkSession.active + private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath) + private val fs: FileSystem = tempDir.getFileSystem(spark.sessionState.newHadoopConf()) + + override def name: String = "dummy" + + def getTablePath(tableName: String): Path = { + new Path(tempDir, tableName) + } + override def defaultNamespace(): Array[String] = Array("default") + + override def listTables(namespace: Array[String]): Array[Identifier] = { + val status = fs.listStatus(tempDir) + status.filter(_.isDirectory).map { dir => + Identifier.of(namespace, dir.getPath.getName) + } + } + + override def tableExists(ident: Identifier): Boolean = { + val tablePath = getTablePath(ident.name()) + 
fs.exists(tablePath) + } + override def loadTable(ident: Identifier): Table = { + val tablePath = getTablePath(ident.name()) + DeltaTableV2(spark, tablePath) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { + val tablePath = getTablePath(ident.name()) + // Create an empty Delta table on the tablePath + spark.range(0).write.format("delta").save(tablePath.toString) + DeltaTableV2(spark, tablePath) + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + throw new UnsupportedOperationException("Alter table operation is not supported.") + } + + override def dropTable(ident: Identifier): Boolean = { + val tablePath = getTablePath(ident.name()) + try { + fs.delete(tablePath, true) + true + } catch { + case _: Exception => false + } + } + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + throw new UnsupportedOperationException("Rename table operation is not supported.") + } + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + // Initialize tempDir here + if (!fs.exists(tempDir)) { + fs.mkdirs(tempDir) + } + } +} + + From be93ff5f5cf9a97cba95a4d8ace79df24a1709e7 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 21:57:24 -0700 Subject: [PATCH 4/7] move resolveCatalogAndIdentifier to DeltaTableUtils --- .../apache/spark/sql/delta/DeltaTable.scala | 35 +++++++++++++++++-- .../sql/delta/PreprocessTimeTravel.scala | 32 ++--------------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index f7a9c853beb..a570d69ad28 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -31,11 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform} import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -83,6 +83,8 @@ object DeltaTableUtils extends PredicateHelper // The valid hadoop prefixes passed through `DeltaTable.forPath` or DataFrame APIs. val validDeltaTableHadoopPrefixes: List[String] = List("fs.", "dfs.") + private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) + /** Check whether this table is a Delta table based on information from the Catalog. 
*/ def isDeltaTable(table: CatalogTable): Boolean = DeltaSourceUtils.isDeltaTable(table.provider) @@ -141,6 +143,35 @@ object DeltaTableUtils extends PredicateHelper } } + // Resolve the input name parts to a CatalogPlugin and Identifier. + // The code is from Apache Spark on org.apache.spark.sql.connector.catalog.CatalogAndIdentifier. + // We need to use this because the original one is private. + def resolveCatalogAndIdentifier( + catalogManager: CatalogManager, + nameParts: Seq[String]): (CatalogPlugin, Identifier) = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper + assert(nameParts.nonEmpty) + if (nameParts.length == 1) { + (catalogManager.currentCatalog, + Identifier.of(catalogManager.currentNamespace, nameParts.head)) + } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { + // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog + // API does not support view yet, and we have to use v1 commands to deal with global temp + // views. To simplify the implementation, we put global temp views in a special namespace + // in the session catalog. The special namespace has higher priority during name resolution. + // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`, + // this custom catalog can't be accessed. + (catalogManager.v2SessionCatalog, nameParts.asIdentifier) + } else { + try { + (catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier) + } catch { + case _: CatalogNotFoundException => + (catalogManager.currentCatalog, nameParts.asIdentifier) + } + } + } + /** * It's possible that checking whether database exists can throw an exception. In that case, * we want to surface the exception only if the provided tableIdentifier cannot be a path. 
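For illustration only (not lines of the diff): the resolution rules in `resolveCatalogAndIdentifier` above map a multi-part name to a catalog and identifier roughly as sketched below; the catalog name `cat_a` is hypothetical.

    // Assuming a catalog registered as "cat_a", spark_catalog as the current catalog,
    // and ["default"] as the current namespace:
    //
    //   Seq("tbl")                -> (currentCatalog,   Identifier.of(Array("default"), "tbl"))
    //   Seq("global_temp", "v")   -> (v2SessionCatalog, Identifier.of(Array("global_temp"), "v"))
    //   Seq("cat_a", "db", "tbl") -> (catalog("cat_a"), Identifier.of(Array("db"), "tbl"))
    //   Seq("db", "tbl")          -> (currentCatalog,   Identifier.of(Array("db"), "tbl"))
    //                                ("db" is not a registered catalog, so the full name
    //                                 stays in the current catalog)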
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala index da34f186067..4e8669ff949 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTimeTravel.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.catalyst.TimeTravel import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2} import org.apache.hadoop.fs.Path - import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, UnresolvedIdentifier, UnresolvedRelation} @@ -27,8 +26,9 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.quoteIfNeeded -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper} +import org.apache.spark.sql.delta.DeltaTableUtils.resolveCatalogAndIdentifier import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -43,9 +43,6 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical override def conf: SQLConf = sparkSession.sessionState.conf - private val globalTempDB = conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) - - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { // Since the Delta's TimeTravel is a leaf node, we have to resolve the UnresolvedIdentifier. case _ @ RestoreTableStatement(tt @ TimeTravel(ui: UnresolvedIdentifier, _, _, _)) => @@ -93,31 +90,6 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical DataSourceV2Relation.create(deltaTableV2, None, Some(identifier)) } - private def resolveCatalogAndIdentifier( - catalogManager: CatalogManager, - nameParts: Seq[String]): (CatalogPlugin, Identifier) = { - assert(nameParts.nonEmpty) - if (nameParts.length == 1) { - (catalogManager.currentCatalog, - Identifier.of(catalogManager.currentNamespace, nameParts.head)) - } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { - // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog - // API does not support view yet, and we have to use v1 commands to deal with global temp - // views. To simplify the implementation, we put global temp views in a special namespace - // in the session catalog. The special namespace has higher priority during name resolution. - // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`, - // this custom catalog can't be accessed. - (catalogManager.v2SessionCatalog, nameParts.asIdentifier) - } else { - try { - (catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier) - } catch { - case _: CatalogNotFoundException => - (catalogManager.currentCatalog, nameParts.asIdentifier) - } - } - } - /** * Helper to resolve a [[TimeTravel]] logical plan to Delta DSv2 relation. 
*/ From 32e66aaaf36636070c66483b4202199ea543f8cd Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 22:15:31 -0700 Subject: [PATCH 5/7] fix style --- .../apache/spark/sql/delta/CustomCatalogSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala index 146688f02c9..902f4f9a3d7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -49,7 +49,8 @@ class CustomCatalogSuite extends QueryTest with SharedSparkSession val tableName = "table1" withTable(tableName) { sql("SET CATALOG dummy") - val dummyCatalog = spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog] + val dummyCatalog = + spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog] val tablePath = dummyCatalog.getTablePath(tableName) sql(f"CREATE TABLE $tableName (id bigint) USING delta") sql(f"INSERT INTO delta.`$tablePath` VALUES (0)") @@ -73,7 +74,10 @@ class CustomCatalogSuite extends QueryTest with SharedSparkSession class DummyCatalog extends TableCatalog { private val spark: SparkSession = SparkSession.active private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath) - private val fs: FileSystem = tempDir.getFileSystem(spark.sessionState.newHadoopConf()) + // scalastyle:off deltahadoopconfiguration + private val fs: FileSystem = + tempDir.getFileSystem(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration override def name: String = "dummy" @@ -134,5 +138,3 @@ class DummyCatalog extends TableCatalog { } } } - - From 01e7d796fbcb58f673b374a379970c1c4f2c1fa1 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 22:21:04 -0700 Subject: [PATCH 6/7] fix style in DeltaTable.scala --- .../src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index a570d69ad28..bd3dc1881bb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -103,7 +103,8 @@ object DeltaTableUtils extends PredicateHelper } /** - * Check whether the provided table identifier is a Delta table based on information from the Catalog. + * Check whether the provided table identifier is a Delta table based on information + * from the Catalog. 
*/ def isDeltaTable(tableCatalog: TableCatalog, identifier: Identifier): Boolean = { val tableExists = tableCatalog.tableExists(identifier) From c0baf095451b3bff3f2f7238fa3fce6ffc1d9ae6 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 7 Sep 2023 23:08:37 -0700 Subject: [PATCH 7/7] remove unnecessary changes; add one comment --- .../spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala | 3 ++- .../scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala index 0f84f6b87f3..863f4031df2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaDetailsCommand.scala @@ -19,16 +19,17 @@ package org.apache.spark.sql.delta.commands // scalastyle:off import.ordering.noEmptyLine import java.io.FileNotFoundException import java.sql.Timestamp + import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, Snapshot} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.fs.Path + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.types.StructType diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala index 902f4f9a3d7..4dcb43a3516 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CustomCatalogSuite.scala @@ -53,6 +53,8 @@ class CustomCatalogSuite extends QueryTest with SharedSparkSession spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog] val tablePath = dummyCatalog.getTablePath(tableName) sql(f"CREATE TABLE $tableName (id bigint) USING delta") + // Insert some data into the table in the dummy catalog. + // To make it simple, here we insert data directly into the table path. sql(f"INSERT INTO delta.`$tablePath` VALUES (0)") sql(f"INSERT INTO delta.`$tablePath` VALUES (1)") sql(f"RESTORE TABLE $tableName VERSION AS OF 0")