[Spark] Support external DSV2 catalog in RESTORE command #2033

Closed
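For context, the user-facing effect of this change can be exercised roughly as below. This is a sketch based on the CustomCatalogSuite test added by this PR; the "dummy" catalog, "default" namespace, and "table1" names are taken from that test and are purely illustrative.

// Assumes a DSv2 catalog has been registered, e.g. spark.sql.catalog.dummy=<some TableCatalog impl>.
spark.sql("SET CATALOG dummy")
spark.sql("RESTORE TABLE table1 VERSION AS OF 1")

// Three-part identifiers now also resolve through the external catalog,
// even when the current catalog is spark_catalog.
spark.sql("SET CATALOG spark_catalog")
spark.sql("RESTORE TABLE dummy.default.table1 VERSION AS OF 1")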
@@ -402,7 +402,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
}

override def visitRestore(ctx: RestoreContext): LogicalPlan = withOrigin(ctx) {
val tableRelation = UnresolvedRelation(visitTableIdentifier(ctx.table))
val tableRelation = UnresolvedIdentifier(ctx.table.identifier.asScala.toSeq.map(_.getText))
val timeTravelTableRelation = maybeTimeTravelChild(ctx.clause, tableRelation)
RestoreTableStatement(timeTravelTableRelation.asInstanceOf[TimeTravel])
}
47 changes: 43 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -18,12 +18,10 @@ package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedTable}
@@ -33,10 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -84,6 +83,8 @@ object DeltaTableUtils extends PredicateHelper
// The valid hadoop prefixes passed through `DeltaTable.forPath` or DataFrame APIs.
val validDeltaTableHadoopPrefixes: List[String] = List("fs.", "dfs.")

private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)

/** Check whether this table is a Delta table based on information from the Catalog. */
def isDeltaTable(table: CatalogTable): Boolean = DeltaSourceUtils.isDeltaTable(table.provider)

@@ -101,6 +102,15 @@ object DeltaTableUtils extends PredicateHelper
tableIsNotTemporaryTable && tableExists && isDeltaTable(catalog.getTableMetadata(tableName))
}

/**
* Check whether the provided table identifier is a Delta table based on information
* from the Catalog.
*/
def isDeltaTable(tableCatalog: TableCatalog, identifier: Identifier): Boolean = {
val tableExists = tableCatalog.tableExists(identifier)
tableExists && tableCatalog.loadTable(identifier).isInstanceOf[DeltaTableV2]
}

/** Check if the provided path is the root or the children of a Delta table. */
def isDeltaTable(
spark: SparkSession,
@@ -134,6 +144,35 @@ }
}
}

// Resolve the input name parts to a CatalogPlugin and Identifier.
// The code is copied from Apache Spark's org.apache.spark.sql.connector.catalog.CatalogAndIdentifier.
// We need a local copy because the original is private.
def resolveCatalogAndIdentifier(
catalogManager: CatalogManager,
nameParts: Seq[String]): (CatalogPlugin, Identifier) = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
assert(nameParts.nonEmpty)
if (nameParts.length == 1) {
(catalogManager.currentCatalog,
Identifier.of(catalogManager.currentNamespace, nameParts.head))
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
// views. To simplify the implementation, we put global temp views in a special namespace
// in the session catalog. The special namespace has higher priority during name resolution.
// For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
// this custom catalog can't be accessed.
(catalogManager.v2SessionCatalog, nameParts.asIdentifier)
} else {
try {
(catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier)
} catch {
case _: CatalogNotFoundException =>
(catalogManager.currentCatalog, nameParts.asIdentifier)
}
}
}
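To illustrate the resolution rules above (not part of the PR), name parts map to a catalog and identifier roughly as follows; the "dummy" catalog name and the table names are assumptions borrowed from the test suite further down.

// Minimal sketch, assuming a catalog registered as "dummy" and spark_catalog as the current catalog.
import org.apache.spark.sql.SparkSession
val catalogManager = SparkSession.active.sessionState.catalogManager

// Head matches a registered catalog: resolve against it,
// e.g. (catalog "dummy", Identifier.of(Array("default"), "tbl")).
val (dummy, ident1) =
  DeltaTableUtils.resolveCatalogAndIdentifier(catalogManager, Seq("dummy", "default", "tbl"))

// Single-part name: current catalog and current namespace.
val (current, ident2) =
  DeltaTableUtils.resolveCatalogAndIdentifier(catalogManager, Seq("tbl"))

// Head is not a registered catalog: fall back to the current catalog with the
// full name as the identifier, e.g. Identifier.of(Array("db"), "tbl").
val (fallback, ident3) =
  DeltaTableUtils.resolveCatalogAndIdentifier(catalogManager, Seq("db", "tbl"))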

/**
* It's possible that checking whether database exists can throw an exception. In that case,
* we want to surface the exception only if the provided tableIdentifier cannot be a path.
@@ -17,19 +17,20 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper}
import org.apache.spark.sql.delta.DeltaTableUtils.resolveCatalogAndIdentifier
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

/**
* Resolves the [[UnresolvedRelation]] in a command's child [[TimeTravel]].
@@ -43,8 +44,9 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical
override def conf: SQLConf = sparkSession.sessionState.conf

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case _ @ RestoreTableStatement(tt @ TimeTravel(ur @ UnresolvedRelation(_, _, _), _, _, _)) =>
val sourceRelation = resolveTimeTravelTable(sparkSession, ur)
// Since Delta's TimeTravel is a leaf node, we have to resolve the UnresolvedIdentifier ourselves.
case _ @ RestoreTableStatement(tt @ TimeTravel(ui: UnresolvedIdentifier, _, _, _)) =>
val sourceRelation = resolveTimetravelIdentifier(sparkSession, ui)
return RestoreTableStatement(
TimeTravel(
sourceRelation,
@@ -63,6 +65,31 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical
tt.creationSource))
}

private def resolveTimetravelIdentifier(
sparkSession: SparkSession,
unresolvedIdentifier: UnresolvedIdentifier): DataSourceV2Relation = {
val (catalog, identifier) =
resolveCatalogAndIdentifier(sparkSession.sessionState.catalogManager,
unresolvedIdentifier.nameParts)
// If the identifier is not a multipart identifier, we assume it is a table or view name.
val tableId = if (unresolvedIdentifier.nameParts.length == 1) {
TableIdentifier(unresolvedIdentifier.nameParts.head)
} else {
identifier.asTableIdentifier
}
assert(catalog.isInstanceOf[TableCatalog], s"Catalog ${catalog.name()} must implement " +
s"TableCatalog to support loading Delta table.")
val tableCatalog = catalog.asInstanceOf[TableCatalog]
val deltaTableV2 = if (DeltaTableUtils.isDeltaTable(tableCatalog, identifier)) {
tableCatalog.loadTable(identifier).asInstanceOf[DeltaTableV2]
} else if (DeltaTableUtils.isValidPath(tableId)) {
DeltaTableV2(sparkSession, new Path(tableId.table))
} else {
handleTableNotFound(sparkSession.sessionState.catalog, unresolvedIdentifier, tableId)
}
DataSourceV2Relation.create(deltaTableV2, None, Some(identifier))
}

/**
* Helper to resolve a [[TimeTravel]] logical plan to Delta DSv2 relation.
*/
@@ -82,19 +109,26 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical
} else if (DeltaTableUtils.isValidPath(tableId)) {
DeltaTableV2(sparkSession, new Path(tableId.table))
} else {
if (
(catalog.tableExists(tableId) &&
catalog.getTableMetadata(tableId).tableType == CatalogTableType.VIEW) ||
catalog.isTempView(ur.multipartIdentifier)) {
// If table exists and not found to be a view, throw not supported error
throw DeltaErrors.notADeltaTableException("RESTORE")
} else {
ur.tableNotFound(ur.multipartIdentifier)
}
handleTableNotFound(catalog, ur, tableId)
}

val identifier = deltaTableV2.getTableIdentifierIfExists.map(
id => Identifier.of(id.database.toArray, id.table))
DataSourceV2Relation.create(deltaTableV2, None, identifier)
}

private def handleTableNotFound(
catalog: SessionCatalog,
inputPlan: LeafNode,
tableIdentifier: TableIdentifier): Nothing = {
if (
(catalog.tableExists(tableIdentifier) &&
catalog.getTableMetadata(tableIdentifier).tableType == CatalogTableType.VIEW) ||
catalog.isTempView(tableIdentifier)) {
// If the table exists but is a view (or is a temp view), RESTORE is not supported
throw DeltaErrors.notADeltaTableException("RESTORE")
} else {
inputPlan.tableNotFound(tableIdentifier.nameParts)
}
}
}
18 changes: 14 additions & 4 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -17,17 +17,16 @@
package io.delta.sql.parser

import io.delta.tables.execution.VacuumTableCommand

import org.apache.spark.sql.delta.CloneTableSQLTestUtils
import org.apache.spark.sql.delta.UnresolvedPathBasedDeltaTable
import org.apache.spark.sql.delta.commands.{OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.sql.delta.commands.{DeltaReorgTable, OptimizeTableCommand}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedIdentifier, UnresolvedRelation, UnresolvedTable}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical.CloneTableStatement
import org.apache.spark.sql.catalyst.plans.logical.{CloneTableStatement, RestoreTableStatement}

class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {

@@ -48,6 +47,17 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
VacuumTableCommand(None, Some(TableIdentifier("123BD_column", Some("a"))), None, false))
}

test("RESTORE command is parsed as expected") {
val parser = new DeltaSqlParser(null)
val parsedCmd = parser.parsePlan("RESTORE catalog_foo.db.tbl TO VERSION AS OF 1;")
assert(parsedCmd ===
RestoreTableStatement(TimeTravel(
UnresolvedIdentifier(Seq("catalog_foo", "db", "tbl")),
None,
Some(1),
Some("sql"))))
}

test("OPTIMIZE command is parsed as expected") {
val parser = new DeltaSqlParser(null)
var parsedCmd = parser.parsePlan("OPTIMIZE tbl")
@@ -0,0 +1,142 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta

import java.util
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import io.delta.tables.DeltaTable
import org.apache.hadoop.fs.{FileSystem, Path}

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf

import java.io.{File, IOException}


class CustomCatalogSuite extends QueryTest with SharedSparkSession
with DeltaSQLCommandTest {

override def sparkConf: SparkConf =
super.sparkConf.set("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)

test("RESTORE a table from DummyCatalog") {
val tableName = "table1"
withTable(tableName) {
sql("SET CATALOG dummy")
val dummyCatalog =
spark.sessionState.catalogManager.catalog("dummy").asInstanceOf[DummyCatalog]
val tablePath = dummyCatalog.getTablePath(tableName)
sql(f"CREATE TABLE $tableName (id bigint) USING delta")
// Insert some data into the table in the dummy catalog.
// To make it simple, here we insert data directly into the table path.
sql(f"INSERT INTO delta.`$tablePath` VALUES (0)")
sql(f"INSERT INTO delta.`$tablePath` VALUES (1)")
sql(f"RESTORE TABLE $tableName VERSION AS OF 0")
checkAnswer(spark.table(tableName), Nil)
sql(f"RESTORE TABLE $tableName VERSION AS OF 1")
checkAnswer(spark.table(tableName), spark.range(1).toDF())
// Test 3-part identifier
sql(f"RESTORE TABLE dummy.default.$tableName VERSION AS OF 2")
checkAnswer(spark.table(tableName), spark.range(2).toDF())

sql("SET CATALOG spark_catalog")
// Test 3-part identifier when the current catalog is the default catalog
sql(f"RESTORE TABLE dummy.default.$tableName VERSION AS OF 1")
checkAnswer(spark.table(f"dummy.default.$tableName"), spark.range(1).toDF())
}
}
}

class DummyCatalog extends TableCatalog {
private val spark: SparkSession = SparkSession.active
private val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath)
// scalastyle:off deltahadoopconfiguration
private val fs: FileSystem =
tempDir.getFileSystem(spark.sessionState.newHadoopConf())
// scalastyle:on deltahadoopconfiguration

override def name: String = "dummy"

def getTablePath(tableName: String): Path = {
new Path(tempDir, tableName)
}
override def defaultNamespace(): Array[String] = Array("default")

override def listTables(namespace: Array[String]): Array[Identifier] = {
val status = fs.listStatus(tempDir)
status.filter(_.isDirectory).map { dir =>
Identifier.of(namespace, dir.getPath.getName)
}
}

override def tableExists(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
fs.exists(tablePath)
}
override def loadTable(ident: Identifier): Table = {
val tablePath = getTablePath(ident.name())
DeltaTableV2(spark, tablePath)
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val tablePath = getTablePath(ident.name())
// Create an empty Delta table on the tablePath
spark.range(0).write.format("delta").save(tablePath.toString)
Review thread on this line:

Collaborator: Shouldn't this use the passed-in schema? Otherwise it takes a DDL later to update the table with information we already had...

Collaborator: Alternatively, we can just create the DeltaTableV2 and call it good -- Delta knows how to handle the new/empty/missing directory case, though it won't let you read such tables. Which comes back to the first comment: if the table needs to be readable after this, it needs the correct schema, no?

Contributor (author):
> Shouldn't this use the passed-in schema?
Since this is a dummy catalog, I keep the schema fixed as "id: long".
> Alternatively, we can just create the DTV2
Could you show me some details of how to use it?

Collaborator: DeltaTableV2(spark, tablePath.toString) should suffice? See DeltaTableV2.scala.

(A hedged sketch of the schema-honoring variant the reviewers discuss appears after the end of this file's diff.)

DeltaTableV2(spark, tablePath)
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException("Alter table operation is not supported.")
}

override def dropTable(ident: Identifier): Boolean = {
val tablePath = getTablePath(ident.name())
try {
fs.delete(tablePath, true)
true
} catch {
case _: Exception => false
}
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
throw new UnsupportedOperationException("Rename table operation is not supported.")
}

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
// Initialize tempDir here
if (!fs.exists(tempDir)) {
fs.mkdirs(tempDir)
}
}
}
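Following up on the review thread in createTable above: a hedged sketch of what a schema-honoring variant could look like inside DummyCatalog, per the reviewers' suggestion. This is illustrative only and not what the PR ships; the PR keeps the fixed "id: long" table.

// Sketch: create the empty Delta table with the caller-supplied schema so the
// table is immediately readable, instead of using a fixed bigint column.
override def createTable(
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: java.util.Map[String, String]): Table = {
  val tablePath = getTablePath(ident.name())
  spark.createDataFrame(java.util.Collections.emptyList[org.apache.spark.sql.Row](), schema)
    .write.format("delta").save(tablePath.toString)
  DeltaTableV2(spark, tablePath)
}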