[Spark] Rename DeltaTableV2.snapshot as initialSnapshot
ryan-johnson-databricks committed Sep 15, 2023
1 parent 19b6c9e commit 755bf30
Showing 7 changed files with 42 additions and 30 deletions.
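The diff below is almost entirely a mechanical rename, moving every internal call site from `snapshot` to `initialSnapshot`, plus a new doc comment spelling out the staleness contract and one existence check rewritten as `!tableExists`. A minimal sketch of what the new name signals, using stand-in types rather than the Delta sources:

```scala
// Stand-in types, for illustration only -- not the Delta sources.
final case class Snapshot(version: Long)

final class TableSketch(load: () => Snapshot) {
  // Pinned on first access and immutable afterwards: later commits to the
  // underlying table are never reflected here.
  lazy val initialSnapshot: Snapshot = load()
}

object RenameDemo extends App {
  var latest = Snapshot(0)
  val table = new TableSketch(() => latest)
  println(table.initialSnapshot.version) // 0: captured now
  latest = Snapshot(5)                   // the table moves on...
  println(table.initialSnapshot.version) // still 0: never advances
}
```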
spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala (24 changes: 12 additions & 12 deletions)
@@ -145,20 +145,21 @@ class DeltaAnalysis(session: SparkSession)
       // used on the source delta table, then the corresponding fields would be set for the
       // sourceTable and needs to be removed from the targetTable's configuration. The fields
       // will then be set in the targetTable's configuration internally after.
+      val sourceMetadata = deltaLogSrc.initialSnapshot.metadata
       val config =
-        deltaLogSrc.snapshot.metadata.configuration.-("delta.columnMapping.maxColumnId")
+        sourceMetadata.configuration.-("delta.columnMapping.maxColumnId")
          .-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
          .-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)
 
       new CatalogTable(
         identifier = targetTableIdentifier,
         tableType = tblType,
         storage = newStorage,
-        schema = deltaLogSrc.snapshot.metadata.schema,
+        schema = sourceMetadata.schema,
         properties = config,
-        partitionColumnNames = deltaLogSrc.snapshot.metadata.partitionColumns,
+        partitionColumnNames = sourceMetadata.partitionColumns,
         provider = Some("delta"),
-        comment = Option(deltaLogSrc.snapshot.metadata.description)
+        comment = Option(sourceMetadata.description)
       )
     } else { // Source table is not delta format
       new CatalogTable(
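An annotation on the hunk above: the chained `.-` calls strip per-table internal properties from the source configuration before the clone target's `CatalogTable` is built, so the target regenerates its own values. An equivalent, self-contained formulation (the two row-tracking keys are illustrative placeholders for the `MATERIALIZED_COLUMN_NAME_PROP` constants):

```scala
object CloneConfigDemo extends App {
  // Sketch: drop internal, table-specific properties before cloning.
  val sourceConfig: Map[String, String] = Map(
    "delta.columnMapping.maxColumnId" -> "7",
    "delta.appendOnly" -> "false")

  val internalProps = Seq(
    "delta.columnMapping.maxColumnId",
    "delta.rowTracking.materializedRowIdColumnName",            // placeholder key
    "delta.rowTracking.materializedRowCommitVersionColumnName") // placeholder key

  val cloneConfig = internalProps.foldLeft(sourceConfig)(_ - _)
  assert(cloneConfig == Map("delta.appendOnly" -> "false"))
}
```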
@@ -181,7 +182,7 @@ class DeltaAnalysis(session: SparkSession)
 
     val protocol =
       if (src.provider.exists(DeltaSourceUtils.isDeltaDataSourceName)) {
-        Some(DeltaTableV2(session, new Path(src.location)).snapshot.protocol)
+        Some(DeltaTableV2(session, new Path(src.location)).initialSnapshot.protocol)
       } else {
         None
       }
@@ -368,7 +369,7 @@ class DeltaAnalysis(session: SparkSession)
     }
     // restoring to same version as latest should be a no-op.
     val sourceSnapshot = try {
-      traveledTable.snapshot
+      traveledTable.initialSnapshot
     } catch {
       case v: VersionNotFoundException =>
         throw DeltaErrors.restoreVersionNotExistException(v.userVersion, v.earliest, v.latest)
@@ -439,8 +440,7 @@ class DeltaAnalysis(session: SparkSession)
     case dsv2 @ DataSourceV2Relation(d: DeltaTableV2, _, _, _, options) =>
       DeltaRelation.fromV2Relation(d, dsv2, options)
 
-    case ResolvedTable(_, _, d: DeltaTableV2, _)
-        if d.catalogTable.isEmpty && d.snapshot.version < 0 =>
+    case ResolvedTable(_, _, d: DeltaTableV2, _) if d.catalogTable.isEmpty && !d.tableExists =>
      // This is DDL on a path based table that doesn't exist. CREATE will not hit this path, most
      // SHOW / DESC code paths will hit this
      throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(d.path.toString)))
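This hunk is the one semantic cleanup in the commit: `d.snapshot.version < 0` becomes `!d.tableExists`, which states the intent directly. A sketch of the presumed equivalence, assuming the convention that an uninitialized Delta log yields a sentinel snapshot at version -1:

```scala
// Stand-in types, for illustration only.
final case class Snapshot(version: Long)

final case class TableSketch(initialSnapshot: Snapshot) {
  // A path-based table "exists" iff at least one commit has been loaded;
  // an empty log is represented by the sentinel snapshot at version -1.
  def tableExists: Boolean = initialSnapshot.version >= 0
}

object TableExistsDemo extends App {
  assert(!TableSketch(Snapshot(-1)).tableExists) // no commits: DDL should fail
  assert(TableSketch(Snapshot(0)).tableExists)   // a real table at version 0
}
```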
@@ -716,7 +716,7 @@ class DeltaAnalysis(session: SparkSession)
     val cloneSourceTable = sourceTbl match {
       case source: CloneIcebergSource =>
         // Reuse the existing schema so that the physical name of columns are consistent
-        source.copy(tableSchema = Some(deltaTableV2.snapshot.metadata.schema))
+        source.copy(tableSchema = Some(deltaTableV2.initialSnapshot.metadata.schema))
       case other => other
     }
     val catalogTable = createCatalogTableForCloneCommand(
@@ -868,17 +868,17 @@ class DeltaAnalysis(session: SparkSession)
     } else {
       CaseInsensitiveMap(query.output.map(a => (a.name, a)).toMap)
     }
-    val tableSchema = deltaTable.snapshot.metadata.schema
+    val tableSchema = deltaTable.initialSnapshot.metadata.schema
     if (tableSchema.length != targetAttrs.length) {
       // The target attributes may contain the metadata columns by design. Throwing an exception
       // here in case target attributes may have the metadata columns for Delta in future.
       throw DeltaErrors.schemaNotConsistentWithTarget(s"$tableSchema", s"$targetAttrs")
     }
     val nullAsDefault = deltaTable.spark.sessionState.conf.useNullsForMissingDefaultColumnValues
-    deltaTable.snapshot.metadata.schema.foreach { col =>
+    deltaTable.initialSnapshot.metadata.schema.foreach { col =>
       if (!userSpecifiedNames.contains(col.name) &&
         !ColumnWithDefaultExprUtils.columnHasDefaultExpr(
-          deltaTable.snapshot.protocol, col, nullAsDefault)) {
+          deltaTable.initialSnapshot.protocol, col, nullAsDefault)) {
         throw DeltaErrors.missingColumnsInInsertInto(col.name)
       }
     }
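The block above enforces completeness for INSERTs: every table column must either be named by the query or be fillable from a default (or from NULL, when the session config allows it). A self-contained model of the rule, with a hypothetical `hasDefault` flag standing in for `ColumnWithDefaultExprUtils.columnHasDefaultExpr`:

```scala
object InsertCheckDemo extends App {
  // Hypothetical model of the missing-column check above.
  final case class Col(name: String, hasDefault: Boolean)

  def missingColumns(
      tableSchema: Seq[Col],
      userSpecifiedNames: Set[String],
      nullAsDefault: Boolean): Seq[String] =
    tableSchema.collect {
      // A column the query does not supply and that has no usable default
      // triggers DeltaErrors.missingColumnsInInsertInto in the real code.
      case col if !userSpecifiedNames.contains(col.name) &&
          !(col.hasDefault || nullAsDefault) => col.name
    }

  val schema = Seq(Col("id", hasDefault = false), Col("ts", hasDefault = true))
  assert(missingColumns(schema, Set("ts"), nullAsDefault = false) == Seq("id"))
  assert(missingColumns(schema, Set("id"), nullAsDefault = false).isEmpty)
}
```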
@@ -42,7 +42,7 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   // To remove the feature we only need to remove the table property.
   override def removeFeatureTracesIfNeeded(): Boolean = {
     // Make sure feature data/metadata exist before proceeding.
-    if (TestRemovableWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     recordDeltaEvent(table.deltaLog, "delta.test.TestWriterFeaturePreDowngradeCommand")
     val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY)
@@ -56,7 +56,7 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   // To remove the feature we only need to remove the table property.
   override def removeFeatureTracesIfNeeded(): Boolean = {
     // Make sure feature data/metadata exist before proceeding.
-    if (TestRemovableReaderWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
@@ -68,7 +68,7 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   extends PreDowngradeTableFeatureCommand {
   /** Return true if we removed the property, false if no action was needed. */
   override def removeFeatureTracesIfNeeded(): Boolean = {
-    if (TestRemovableLegacyWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
@@ -80,7 +80,7 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   extends PreDowngradeTableFeatureCommand {
   /** Return true if we removed the property, false if no action was needed. */
   override def removeFeatureTracesIfNeeded(): Boolean = {
-    if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
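All four test commands above follow the same guard-then-unset shape, now validating against the pinned `initialSnapshot`. An abridged, stand-in sketch of the pattern (the trailing `true` is implied by each doc comment, "Return true if we removed the property...", but elided by the diff view):

```scala
// Stand-in interfaces, for illustration only.
trait SnapshotLike
trait TableLike {
  def initialSnapshot: SnapshotLike
  def unsetProperty(key: String): Unit
}
trait FeatureLike {
  def propKey: String
  def validateRemoval(s: SnapshotLike): Boolean
}

object PreDowngradePattern {
  // Common shape of the four pre-downgrade commands above.
  def removeFeatureTracesIfNeeded(table: TableLike, feature: FeatureLike): Boolean = {
    // Nothing to do if the feature left no traces in the pinned snapshot.
    if (feature.validateRemoval(table.initialSnapshot)) return false
    // Otherwise drop the feature's table property and report that we acted.
    table.unsetProperty(feature.propKey)
    true
  }
}
```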
@@ -629,7 +629,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
     def getColumn(fieldNames: Seq[String]): (StructField, Option[ColumnPosition]) = {
       columnUpdates.getOrElseUpdate(fieldNames, {
         // TODO: Theoretically we should be able to fetch the snapshot from a txn.
-        val schema = table.snapshot.schema
+        val schema = table.initialSnapshot.schema
         val colName = UnresolvedAttribute(fieldNames).name
         val fieldOpt = schema.findNestedField(fieldNames, includeCollections = true,
           spark.sessionState.conf.resolver)
@@ -103,7 +103,18 @@ case class DeltaTableV2(
 
   private lazy val caseInsensitiveOptions = new CaseInsensitiveStringMap(options.asJava)
 
-  lazy val snapshot: Snapshot = {
+  /**
+   * The snapshot initially associated with this table. It is captured on first access, usually
+   * (but not always) shortly after the table was first created, and is immutable once captured.
+   *
+   * WARNING: This snapshot could be arbitrarily stale for long-lived [[DeltaTableV2]] instances,
+   * such as the ones [[DeltaTable]] uses internally. Callers who cannot tolerate this potential
+   * staleness should use [[getFreshSnapshot]] instead.
+   *
+   * WARNING: Because the snapshot is captured lazily, callers should explicitly access the
+   * snapshot if they want to be certain it has been captured.
+   */
+  lazy val initialSnapshot: Snapshot = {
     timeTravelSpec.map { spec =>
       // By default, block using CDF + time-travel
       if (CDCReader.isCDCRead(caseInsensitiveOptions) &&
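The new doc comment is the heart of the commit. A caller-side sketch of the hazard it warns about, using the `DeltaTableV2(spark, new Path(...))` constructor seen earlier in this diff; `deltaLog.update()` is assumed here as the refresh path that `getFreshSnapshot` would wrap:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.catalog.DeltaTableV2

object StalenessDemo {
  // Sketch only: assumes a Delta table already exists at `location`.
  def demonstrate(spark: SparkSession, location: String): Unit = {
    val table = DeltaTableV2(spark, new Path(location))

    val pinned = table.initialSnapshot  // captured on first access, then frozen
    // ... other writers may commit new versions from here on ...
    val fresh = table.deltaLog.update() // re-reads the log for the latest state

    // For a long-lived DeltaTableV2 the gap can grow without bound.
    assert(fresh.version >= pinned.version)
  }
}
```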
@@ -136,29 +147,30 @@ case class DeltaTableV2(
       recordDeltaEvent(deltaLog, "delta.cdf.read",
         data = caseInsensitiveOptions.asCaseSensitiveMap())
       Some(CDCReader.getCDCRelation(
-        spark, snapshot, timeTravelSpec.nonEmpty, spark.sessionState.conf, caseInsensitiveOptions))
+        spark, initialSnapshot, timeTravelSpec.nonEmpty, spark.sessionState.conf,
+        caseInsensitiveOptions))
     } else {
       None
     }
   }
 
   private lazy val tableSchema: StructType = {
     val baseSchema = cdcRelation.map(_.schema).getOrElse {
-      DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema)
+      DeltaTableUtils.removeInternalMetadata(spark, initialSnapshot.schema)
     }
     DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
   }
 
   override def schema(): StructType = tableSchema
 
   override def partitioning(): Array[Transform] = {
-    snapshot.metadata.partitionColumns.map { col =>
+    initialSnapshot.metadata.partitionColumns.map { col =>
       new IdentityTransform(new FieldReference(Seq(col)))
     }.toArray
   }
 
   override def properties(): ju.Map[String, String] = {
-    val base = snapshot.getProperties
+    val base = initialSnapshot.getProperties
     base.put(TableCatalog.PROP_PROVIDER, "delta")
     base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
     catalogTable.foreach { table =>
@@ -172,7 +184,7 @@
         base.put(TableCatalog.PROP_EXTERNAL, "true")
       }
     }
-    Option(snapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
+    Option(initialSnapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
     base.asJava
   }
 
@@ -195,7 +207,7 @@
    */
   def toBaseRelation: BaseRelation = {
     // force update() if necessary in DataFrameReader.load code
-    snapshot
+    initialSnapshot
     if (!tableExists) {
       // special error handling for path based tables
       if (catalogTable.isEmpty
@@ -208,11 +220,11 @@
         throw DeltaErrors.nonExistentDeltaTable(id)
     }
     val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
-      path.toString, snapshot, partitionFilters)
+      path.toString, initialSnapshot, partitionFilters)
 
     cdcRelation.getOrElse {
       deltaLog.createRelation(
-        partitionPredicates, Some(snapshot), timeTravelSpec.isDefined)
+        partitionPredicates, Some(initialSnapshot), timeTravelSpec.isDefined)
     }
   }

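A note on the bare `initialSnapshot` reference at the top of `toBaseRelation` above: per the second WARNING in the new doc comment, simply referencing the lazy val forces the capture, so the relation is built against a snapshot pinned at load time. The idiom in miniature:

```scala
object ForceLazyDemo extends App {
  lazy val captured: Long = { println("capturing now"); System.nanoTime() }
  captured // a bare reference forces evaluation; the value is discarded here
}
```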
@@ -149,7 +149,7 @@ class CloneDeltaSource(
     sourceTable: DeltaTableV2) extends CloneSource {
 
   private val deltaLog = sourceTable.deltaLog
-  private val sourceSnapshot = sourceTable.snapshot
+  private val sourceSnapshot = sourceTable.initialSnapshot
 
   def format: String = CloneSourceFormat.DELTA
 
@@ -257,7 +257,7 @@ case class AlterTableDropFeatureDeltaCommand(
 
     // Check whether the protocol contains the feature in either the writer features list or
     // the reader+writer features list.
-    if (!table.snapshot.protocol.readerAndWriterFeatureNames.contains(featureName)) {
+    if (!table.initialSnapshot.protocol.readerAndWriterFeatureNames.contains(featureName)) {
       throw DeltaErrors.dropTableFeatureFeatureNotSupportedByProtocol(featureName)
     }
 
@@ -289,7 +289,7 @@ case class AlterTableDropFeatureDeltaCommand(
       // certainly still contain traces of the feature. We don't have to run an expensive
       // explicit check, but instead we fail straight away.
       throw DeltaErrors.dropTableFeatureWaitForRetentionPeriod(
-        featureName, table.snapshot.metadata)
+        featureName, table.initialSnapshot.metadata)
     }
 
     val txn = startTransaction(sparkSession)
@@ -177,7 +177,7 @@ trait CloneTableSuiteBase extends QueryTest
     val sourceData = Dataset.ofRows(
       spark,
       LogicalRelation(sourceLog.createRelation(
-        snapshotToUseOpt = Some(deltaTable.snapshot),
+        snapshotToUseOpt = Some(deltaTable.initialSnapshot),
         isTimeTravelQuery = sourceVersion.isDefined || sourceTimestamp.isDefined)))
     (new CloneDeltaSource(deltaTable), sourceData)
   }