Update Delta DESCRIBE HISTORY command to work with V1 and V2 tables
dtenedor committed Sep 2, 2023
1 parent 512562d commit d9ffe81
Showing 5 changed files with 185 additions and 50 deletions.
spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

@@ -400,6 +400,18 @@ class DeltaAnalysis(session: SparkSession)
throw DeltaErrors.notADeltaTableException("RESTORE")
}

// Resolve to a ResolvedTable if the path points to a Delta table; for a non-Delta table,
// keep the path and pass it along unchanged.
case u: UnresolvedPath =>
val table = getPathBasedDeltaTable(u.path)
if (!table.tableExists) {
u
} else {
val catalog = session.sessionState.catalogManager.currentCatalog.asTableCatalog
ResolvedTable.create(
catalog, Identifier.of(Array(DeltaSourceUtils.ALT_NAME), u.path), table)
}

case u: UnresolvedPathBasedDeltaTable =>
val table = getPathBasedDeltaTable(u.path)
if (!table.tableExists) {
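For context, a minimal sketch of how this new rule behaves during analysis. This is not part of the commit: it assumes Delta's analyzer extensions are installed, and the path literal and the `spark` session are illustrative.

import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.delta.UnresolvedPath

val plan = spark.sessionState.analyzer.execute(
  UnresolvedPath("/data/events", "DESCRIBE HISTORY"))
plan match {
  // A _delta_log exists at the path: the rule produces a ResolvedTable.
  case r: ResolvedTable => println(s"Delta table: ${r.identifier}")
  // No Delta log: the UnresolvedPath node passes through unchanged.
  case u: UnresolvedPath => println(s"non-Delta path: ${u.path}")
}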
35 changes: 34 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -26,7 +26,11 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{
+  NoSuchTableException,
+  UnresolvedLeafNode,
+  UnresolvedTable
+}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
@@ -565,6 +569,13 @@ case class UnresolvedPathBasedDeltaTableRelation(
path: String,
options: CaseInsensitiveStringMap) extends UnresolvedPathBasedDeltaTableBase(path)

/** Resolves to a [[ResolvedTable]] if the path is for a Delta table; left unchanged otherwise. */
case class UnresolvedPath(
path: String,
commandName: String) extends LeafNode {
override val output: Seq[Attribute] = Nil
}

/**
* A helper object with an apply method to transform a path or table identifier to a LogicalPlan.
* If the path is set, it will be resolved to an [[UnresolvedPathBasedDeltaTable]] whereas if the
@@ -585,3 +596,25 @@ object UnresolvedDeltaPathOrIdentifier {
}
}
}

/**
* A helper object with an apply method to transform a path or table identifier into a
* LogicalPlan. If the tableIdentifier is set, the LogicalPlan will be an [[UnresolvedTable]]
* regardless of whether the path is set. If only the path is set, it resolves to an
* [[UnresolvedPath]], since at this stage we cannot tell whether the path refers to a Delta
* table; if it does, it is resolved to a [[ResolvedTable]] during analysis. If neither is set,
* an exception is thrown.
*/
object UnresolvedPathOrIdentifier {
def apply(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
cmd: String): LogicalPlan = {
(path, tableIdentifier) match {
case (_, Some(t)) => UnresolvedTable(t.nameParts, cmd, None)
case (Some(p), None) => UnresolvedPath(p, cmd)
case _ => throw new IllegalArgumentException(
s"At least one of path or tableIdentifier must be provided to $cmd")
}
}
}
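For illustration, a hedged sketch of the three branches; the argument values are hypothetical.

import org.apache.spark.sql.catalyst.TableIdentifier

// An identifier wins, even when a path is also supplied:
UnresolvedPathOrIdentifier(
  Some("/data/events"), Some(TableIdentifier("events")), "DESCRIBE HISTORY")
// => UnresolvedTable(Seq("events"), "DESCRIBE HISTORY", None)

// A path alone is deferred to analysis, since it may or may not be a Delta table:
UnresolvedPathOrIdentifier(Some("/data/events"), None, "DESCRIBE HISTORY")
// => UnresolvedPath("/data/events", "DESCRIBE HISTORY")

// Neither set: throws an IllegalArgumentException.
UnresolvedPathOrIdentifier(None, None, "DESCRIBE HISTORY")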
spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala

@@ -21,7 +21,17 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.util.control.NonFatal

-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOptions, DeltaTableIdentifier, DeltaTableUtils, OptimisticTransaction}
+import com.databricks.sql.PathBasedDeltaTable
+
+import org.apache.spark.sql.delta.{
+  DeltaErrors,
+  DeltaLog,
+  DeltaOptions,
+  DeltaTableIdentifier,
+  DeltaTableUtils,
+  OptimisticTransaction,
+  UnresolvedPath
+}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.{DeltaTableV2, IcebergTablePlaceHolder}
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
@@ -33,7 +43,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, NoSuchTableException, ResolvedTable, UnresolvedAttribute, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -294,6 +304,56 @@ trait DeltaCommand extends DeltaLogging {
}
}


/**
* Extracts [[CatalogTable]] metadata from a LogicalPlan if the plan is a [[ResolvedTable]].
*/
def getTableCatalogTable(target: LogicalPlan): Option[CatalogTable] = {
target match {
case ResolvedTable(_, _, d: DeltaTableV2, _) => d.catalogTable
case ResolvedTable(_, _, t: V1Table, _) => Some(t.catalogTable)
case _ => None
}
}

/**
* Helper method to extract the table id or path from a LogicalPlan representing
* a Delta table. This uses [[DeltaCommand.getDeltaTable]] to convert the LogicalPlan
* to a [[DeltaTableV2]] and then extracts either the path or identifier from it. If
* the [[DeltaTableV2]] has a [[CatalogTable]], the table identifier will be returned.
* Otherwise, the table's path will be returned. Throws an exception if the LogicalPlan
* does not represent a Delta table.
*/
def getDeltaTablePathOrIdentifier(
target: LogicalPlan,
cmd: String): (Option[TableIdentifier], Option[String]) = {
val table = getDeltaTable(target, cmd)
table.catalogTable match {
case Some(catalogTable) if !catalogTable.isInstanceOf[PathBasedDeltaTable] =>
(Some(catalogTable.identifier), None)
case _ => (None, Some(table.path.toString))
}
}

/**
* Helper method to extract the table identifier or path from a LogicalPlan representing a
* resolved table or path. If the resolved table is a Delta table, this delegates to
* getDeltaTablePathOrIdentifier. For a non-Delta table with an identifier, the identifier is
* extracted; for a non-Delta table given by path, the plan arrives here as an
* [[UnresolvedPath]] and the path is extracted.
*/
def getTablePathOrIdentifier(
target: LogicalPlan,
cmd: String): (Option[TableIdentifier], Option[String]) = {
target match {
case ResolvedTable(_, _, t: DeltaTableV2, _) => getDeltaTablePathOrIdentifier(target, cmd)
case ResolvedTable(_, _, t: V1Table, _) if DeltaTableUtils.isDeltaTable(t.catalogTable) =>
getDeltaTablePathOrIdentifier(target, cmd)
case ResolvedTable(_, _, t: V1Table, _) => (Some(t.catalogTable.identifier), None)
case u: UnresolvedPath => (None, Some(u.path))
case _ => (None, None)
}
}
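A brief sketch of consuming this helper's result; the `child` plan is assumed to be a command's resolved child, as in DescribeDeltaHistoryCommand later in this commit, and the match arms are illustrative.

val (idOpt, pathOpt) = getTablePathOrIdentifier(child, "DESCRIBE HISTORY")
(idOpt, pathOpt) match {
  case (Some(id), _) => println(s"catalog table: ${id.quotedString}") // V1 or V2 catalog table
  case (_, Some(p))  => println(s"path-based table: $p")              // Delta path or raw path
  case _             => println("neither an identifier nor a path")
}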

/**
* Returns true if there is information in the spark session that indicates that this write
* has already been successfully written.
spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaHistoryCommand.scala

@@ -17,34 +17,74 @@
package org.apache.spark.sql.delta.commands

// scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaHistory, DeltaLog, DeltaTableIdentifier}
-import org.apache.spark.sql.delta.metering.DeltaLogging
+import com.databricks.sql.acl.CheckPermissions
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaHistory, DeltaLog, UnresolvedDeltaPathOrIdentifier, UnresolvedPathBasedDeltaTable}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.{ScalaReflection, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.UnaryRunnableCommand
import org.apache.spark.sql.types.StructType

object DescribeDeltaHistory {
/**
* Alternate constructor that converts a provided path or table identifier into the
* correct child LogicalPlan node. If both path and tableIdentifier are specified (or
* if both are None), this method will throw an exception. If a table identifier is
* specified, the child LogicalPlan will be an [[UnresolvedTable]] whereas if a path
* is specified, it will be an [[UnresolvedPathBasedDeltaTable]].
*
* Note that the returned command will have an *unresolved* child table and hence, the command
* needs to be analyzed before it can be executed.
*/
def apply(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
limit: Option[Int],
unusedOptions: Map[Nothing, Nothing],
output: Seq[Attribute] = schema.toAttributes
): DescribeDeltaHistoryCommand = {
val plan = UnresolvedDeltaPathOrIdentifier(path, tableIdentifier, commandName)
DescribeDeltaHistoryCommand(plan, limit, Map.empty[String, String])
}

val schema = ScalaReflection.schemaFor[DeltaHistory].dataType.asInstanceOf[StructType]
val commandName = "DESCRIBE HISTORY"
}

object DescribeDeltaHistoryCommand {
/** Same as above, but for the DescribeDeltaHistoryCommand class instead. */
def apply(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
limit: Option[Int],
unusedOptions: Map[Nothing, Nothing],
output: Seq[Attribute]
): DescribeDeltaHistoryCommand = {
val plan = UnresolvedDeltaPathOrIdentifier(
path, tableIdentifier, DescribeDeltaHistory.commandName)
DescribeDeltaHistoryCommand(plan, limit)
}
}
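As a usage sketch, either companion apply builds a command whose child still needs analysis; the table name and path here are hypothetical.

val byName = DescribeDeltaHistoryCommand(
  path = None,
  tableIdentifier = Some(TableIdentifier("events")),
  limit = Some(10),
  unusedOptions = Map.empty,
  output = DescribeDeltaHistory.schema.toAttributes)

val byPath = DescribeDeltaHistoryCommand(
  path = Some("/data/events"),
  tableIdentifier = None,
  limit = None,
  unusedOptions = Map.empty,
  output = DescribeDeltaHistory.schema.toAttributes)
// Both children are unresolved; the analyzer must run before execution.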

/**
- * A logical placeholder for describing a Delta table's history, so that the history can be
- * leveraged in subqueries. Replaced with `DescribeDeltaHistoryCommand` during planning.
- *
- * @param options: Hadoop file system options used for read and write.
+ * A logical placeholder for describing a Delta table's history. Currently unused; in the future
+ * this may be used so that the history can be leveraged in subqueries by replacing it with
+ * `DescribeDeltaHistoryCommand` during planning.
 */
case class DescribeDeltaHistory(
    path: Option[String],
    tableIdentifier: Option[TableIdentifier],
    limit: Option[Int],
-    options: Map[String, String],
-    output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
-  extends LeafNode with MultiInstanceRelation {
+    unusedOptions: Map[Nothing, Nothing],
+    output: Seq[Attribute])
+  extends LeafNode with MultiInstanceRelation {
override def computeStats(): Statistics = Statistics(sizeInBytes = conf.defaultSizeInBytes)

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
}

@@ -54,36 +94,28 @@ case class DescribeDeltaHistory(
* @param options: Hadoop file system options used for read and write.
*/
case class DescribeDeltaHistoryCommand(
-    path: Option[String],
-    tableIdentifier: Option[TableIdentifier],
+    override val child: LogicalPlan,
    limit: Option[Int],
-    options: Map[String, String],
-    override val output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
-  extends LeafRunnableCommand with DeltaLogging {
+    options: Map[String, String] = Map.empty,
+    override val output: Seq[Attribute] = DescribeDeltaHistory.schema.toAttributes)
+  extends UnaryRunnableCommand with DeltaCommand {

-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val basePath =
-      if (path.nonEmpty) {
-        new Path(path.get)
-      } else if (tableIdentifier.nonEmpty) {
-        val sessionCatalog = sparkSession.sessionState.catalog
-        lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)
-
-        DeltaTableIdentifier(sparkSession, tableIdentifier.get) match {
-          case Some(id) if id.path.nonEmpty =>
-            new Path(id.path.get)
-          case Some(id) if id.table.nonEmpty =>
-            new Path(metadata.location)
-          case _ =>
-            val isView = metadata.tableType == CatalogTableType.VIEW
-            if (isView) {
-              throw DeltaErrors.describeViewHistory
-            }
-            throw DeltaErrors.notADeltaTableException("DESCRIBE HISTORY")
-        }
-      } else {
-        throw DeltaErrors.missingTableIdentifierException("DESCRIBE HISTORY")
-      }
+  override protected def withNewChildInternal(newChild: LogicalPlan): DescribeDeltaHistoryCommand =
+    copy(child = newChild)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    import DescribeDeltaHistory.commandName
+    val tableMetadata: Option[CatalogTable] = getTableCatalogTable(child)
+    val path = getTablePathOrIdentifier(child, commandName)._2
+    val basePath = tableMetadata match {
+      case Some(metadata) =>
+        if (metadata.isMaterializedView) {
+          throw QueryCompilationErrors.unsupportedCmdForMaterializedViewError(commandName)
+        }
+        new Path(metadata.location)
+      case _ if path.isDefined => new Path(path.get)
+      case _ => throw DeltaErrors.missingTableIdentifierException(commandName)
+    }

// Max array size
if (limit.exists(_ > Int.MaxValue - 8)) {
@@ -93,9 +125,8 @@ case class DescribeDeltaHistoryCommand(
val deltaLog = DeltaLog.forTable(sparkSession, basePath, options)
recordDeltaOperation(deltaLog, "delta.ddl.describeHistory") {
if (!deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException("DESCRIBE HISTORY")
throw DeltaErrors.notADeltaTableException(commandName)
}

import org.apache.spark.sql.delta.implicits._
val commits = deltaLog.history.getHistory(limit)
sparkSession.implicits.localSeqToDatasetHolder(commits).toDF().collect().toSeq
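End to end, the reworked command can be reached through the usual SQL forms. A hedged example, with the table name and path hypothetical:

// Assuming a SparkSession `spark` with the Delta extensions configured.
spark.sql("DESCRIBE HISTORY events LIMIT 5").show()          // catalog table (V1 or V2)
spark.sql("DESCRIBE HISTORY delta.`/data/events`").show()    // path-based Delta table
spark.sql("DESCRIBE HISTORY '/data/events'").show()          // raw path, resolved during analysis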
spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala

@@ -224,7 +224,7 @@ trait DescribeDeltaHistorySuiteBase
val e = intercept[AnalysisException] {
sql(s"DESCRIBE HISTORY $viewName").collect()
}
assert(e.getMessage.contains("history of a view"))
assert(e.getMessage.contains("expects a table"))
}
}

@@ -237,8 +237,7 @@ trait DescribeDeltaHistorySuiteBase
val e = intercept[AnalysisException] {
sql(s"DESCRIBE HISTORY $viewName").collect()
}
assert(e.getMessage.contains("not found") ||
e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"))
assert(e.getMessage.contains("expects a table"))
}
}

@@ -579,7 +578,7 @@ trait DescribeDeltaHistorySuiteBase
val e = intercept[AnalysisException] {
sql(s"describe history $table").show()
}
Seq("DESCRIBE HISTORY", "only supported for Delta tables").foreach { msg =>
Seq("is not a Delta table").foreach { msg =>
assert(e.getMessage.contains(msg))
}
}