From 755bf30dca326c43643a65fa81f1eedb8c97f0dd Mon Sep 17 00:00:00 2001
From: Ryan Johnson
Date: Thu, 14 Sep 2023 06:49:44 -0700
Subject: [PATCH] [Spark] Rename DeltaTableV2.snapshot as initialSnapshot

---
 .../spark/sql/delta/DeltaAnalysis.scala       | 24 +++++++--------
 .../PreDowngradeTableFeatureCommand.scala     |  8 ++---
 .../sql/delta/catalog/DeltaCatalog.scala      |  2 +-
 .../sql/delta/catalog/DeltaTableV2.scala      | 30 +++++++++++++------
 .../delta/commands/CloneTableCommand.scala    |  2 +-
 .../commands/alterDeltaTableCommands.scala    |  4 +--
 .../spark/sql/delta/CloneTableSuiteBase.scala |  2 +-
 7 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 48f1b7a87fb..cfc46bf12ea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -145,8 +145,9 @@ class DeltaAnalysis(session: SparkSession)
           // used on the source delta table, then the corresponding fields would be set for the
           // sourceTable and needs to be removed from the targetTable's configuration. The fields
           // will then be set in the targetTable's configuration internally after.
+          val sourceMetadata = deltaLogSrc.initialSnapshot.metadata
           val config =
-            deltaLogSrc.snapshot.metadata.configuration.-("delta.columnMapping.maxColumnId")
+            sourceMetadata.configuration.-("delta.columnMapping.maxColumnId")
               .-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
               .-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)
 
@@ -154,11 +155,11 @@ class DeltaAnalysis(session: SparkSession)
             identifier = targetTableIdentifier,
             tableType = tblType,
             storage = newStorage,
-            schema = deltaLogSrc.snapshot.metadata.schema,
+            schema = sourceMetadata.schema,
             properties = config,
-            partitionColumnNames = deltaLogSrc.snapshot.metadata.partitionColumns,
+            partitionColumnNames = sourceMetadata.partitionColumns,
             provider = Some("delta"),
-            comment = Option(deltaLogSrc.snapshot.metadata.description)
+            comment = Option(sourceMetadata.description)
           )
         } else { // Source table is not delta format
           new CatalogTable(
@@ -181,7 +182,7 @@ class DeltaAnalysis(session: SparkSession)
       val protocol =
         if (src.provider.exists(DeltaSourceUtils.isDeltaDataSourceName)) {
-          Some(DeltaTableV2(session, new Path(src.location)).snapshot.protocol)
+          Some(DeltaTableV2(session, new Path(src.location)).initialSnapshot.protocol)
         } else {
           None
         }
@@ -368,7 +369,7 @@ class DeltaAnalysis(session: SparkSession)
       }
       // restoring to same version as latest should be a no-op.
       val sourceSnapshot = try {
-        traveledTable.snapshot
+        traveledTable.initialSnapshot
       } catch {
         case v: VersionNotFoundException =>
           throw DeltaErrors.restoreVersionNotExistException(v.userVersion, v.earliest, v.latest)
@@ -439,8 +440,7 @@ class DeltaAnalysis(session: SparkSession)
     case dsv2 @ DataSourceV2Relation(d: DeltaTableV2, _, _, _, options) =>
       DeltaRelation.fromV2Relation(d, dsv2, options)
 
-    case ResolvedTable(_, _, d: DeltaTableV2, _)
-        if d.catalogTable.isEmpty && d.snapshot.version < 0 =>
+    case ResolvedTable(_, _, d: DeltaTableV2, _) if d.catalogTable.isEmpty && !d.tableExists =>
       // This is DDL on a path based table that doesn't exist. CREATE will not hit this path, most
       // SHOW / DESC code paths will hit this
       throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(d.path.toString)))
@@ -716,7 +716,7 @@ class DeltaAnalysis(session: SparkSession)
       val cloneSourceTable = sourceTbl match {
         case source: CloneIcebergSource =>
           // Reuse the existing schema so that the physical name of columns are consistent
-          source.copy(tableSchema = Some(deltaTableV2.snapshot.metadata.schema))
+          source.copy(tableSchema = Some(deltaTableV2.initialSnapshot.metadata.schema))
         case other => other
       }
       val catalogTable = createCatalogTableForCloneCommand(
@@ -868,17 +868,17 @@ class DeltaAnalysis(session: SparkSession)
     } else {
       CaseInsensitiveMap(query.output.map(a => (a.name, a)).toMap)
     }
-    val tableSchema = deltaTable.snapshot.metadata.schema
+    val tableSchema = deltaTable.initialSnapshot.metadata.schema
     if (tableSchema.length != targetAttrs.length) {
       // The target attributes may contain the metadata columns by design. Throwing an exception
       // here in case target attributes may have the metadata columns for Delta in future.
       throw DeltaErrors.schemaNotConsistentWithTarget(s"$tableSchema", s"$targetAttrs")
     }
     val nullAsDefault = deltaTable.spark.sessionState.conf.useNullsForMissingDefaultColumnValues
-    deltaTable.snapshot.metadata.schema.foreach { col =>
+    deltaTable.initialSnapshot.metadata.schema.foreach { col =>
       if (!userSpecifiedNames.contains(col.name) &&
         !ColumnWithDefaultExprUtils.columnHasDefaultExpr(
-          deltaTable.snapshot.protocol, col, nullAsDefault)) {
+          deltaTable.initialSnapshot.protocol, col, nullAsDefault)) {
         throw DeltaErrors.missingColumnsInInsertInto(col.name)
       }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
index 455be3632b3..c894eba0092 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
@@ -42,7 +42,7 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   // To remove the feature we only need to remove the table property.
   override def removeFeatureTracesIfNeeded(): Boolean = {
     // Make sure feature data/metadata exist before proceeding.
-    if (TestRemovableWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     recordDeltaEvent(table.deltaLog, "delta.test.TestWriterFeaturePreDowngradeCommand")
     val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY)
@@ -56,7 +56,7 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   // To remove the feature we only need to remove the table property.
   override def removeFeatureTracesIfNeeded(): Boolean = {
     // Make sure feature data/metadata exist before proceeding.
-    if (TestRemovableReaderWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
@@ -68,7 +68,7 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   extends PreDowngradeTableFeatureCommand {
   /** Return true if we removed the property, false if no action was needed. */
   override def removeFeatureTracesIfNeeded(): Boolean = {
-    if (TestRemovableLegacyWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
@@ -80,7 +80,7 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
   extends PreDowngradeTableFeatureCommand {
   /** Return true if we removed the property, false if no action was needed. */
   override def removeFeatureTracesIfNeeded(): Boolean = {
-    if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.snapshot)) return false
+    if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false
 
     val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY)
     AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
index 562091ee95c..0b30c189803 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -629,7 +629,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
     def getColumn(fieldNames: Seq[String]): (StructField, Option[ColumnPosition]) = {
       columnUpdates.getOrElseUpdate(fieldNames, {
         // TODO: Theoretically we should be able to fetch the snapshot from a txn.
-        val schema = table.snapshot.schema
+        val schema = table.initialSnapshot.schema
         val colName = UnresolvedAttribute(fieldNames).name
         val fieldOpt = schema.findNestedField(fieldNames, includeCollections = true,
           spark.sessionState.conf.resolver)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index 053dc9c09ed..4bc643a2cdb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -103,7 +103,18 @@ case class DeltaTableV2(
   private lazy val caseInsensitiveOptions = new CaseInsensitiveStringMap(options.asJava)
 
-  lazy val snapshot: Snapshot = {
+  /**
+   * The snapshot initially associated with this table. It is captured on first access, usually
+   * (but not always) shortly after the table was first created, and is immutable once captured.
+   *
+   * WARNING: This snapshot could be arbitrarily stale for long-lived [[DeltaTableV2]] instances,
+   * such as the ones [[DeltaTable]] uses internally. Callers who cannot tolerate this potential
+   * staleness should use [[getFreshSnapshot]] instead.
+   *
+   * WARNING: Because the snapshot is captured lazily, callers should explicitly access the
+   * snapshot if they want to be certain it has been captured.
+   */
+  lazy val initialSnapshot: Snapshot = {
     timeTravelSpec.map { spec =>
       // By default, block using CDF + time-travel
       if (CDCReader.isCDCRead(caseInsensitiveOptions) &&
@@ -136,7 +147,8 @@ case class DeltaTableV2(
       recordDeltaEvent(deltaLog, "delta.cdf.read",
         data = caseInsensitiveOptions.asCaseSensitiveMap())
       Some(CDCReader.getCDCRelation(
-        spark, snapshot, timeTravelSpec.nonEmpty, spark.sessionState.conf, caseInsensitiveOptions))
+        spark, initialSnapshot, timeTravelSpec.nonEmpty, spark.sessionState.conf,
+        caseInsensitiveOptions))
     } else {
       None
     }
@@ -144,7 +156,7 @@ case class DeltaTableV2(
 
   private lazy val tableSchema: StructType = {
     val baseSchema = cdcRelation.map(_.schema).getOrElse {
-      DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema)
+      DeltaTableUtils.removeInternalMetadata(spark, initialSnapshot.schema)
     }
     DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
   }
@@ -152,13 +164,13 @@ case class DeltaTableV2(
   override def schema(): StructType = tableSchema
 
   override def partitioning(): Array[Transform] = {
-    snapshot.metadata.partitionColumns.map { col =>
+    initialSnapshot.metadata.partitionColumns.map { col =>
       new IdentityTransform(new FieldReference(Seq(col)))
     }.toArray
   }
 
   override def properties(): ju.Map[String, String] = {
-    val base = snapshot.getProperties
+    val base = initialSnapshot.getProperties
     base.put(TableCatalog.PROP_PROVIDER, "delta")
     base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
     catalogTable.foreach { table =>
@@ -172,7 +184,7 @@ case class DeltaTableV2(
         base.put(TableCatalog.PROP_EXTERNAL, "true")
       }
     }
-    Option(snapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
+    Option(initialSnapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
     base.asJava
   }
@@ -195,7 +207,7 @@ case class DeltaTableV2(
    */
   def toBaseRelation: BaseRelation = {
     // force update() if necessary in DataFrameReader.load code
-    snapshot
+    initialSnapshot
     if (!tableExists) {
       // special error handling for path based tables
       if (catalogTable.isEmpty
@@ -208,11 +220,11 @@ case class DeltaTableV2(
       throw DeltaErrors.nonExistentDeltaTable(id)
     }
     val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
-      path.toString, snapshot, partitionFilters)
+      path.toString, initialSnapshot, partitionFilters)
 
     cdcRelation.getOrElse {
       deltaLog.createRelation(
-        partitionPredicates, Some(snapshot), timeTravelSpec.isDefined)
+        partitionPredicates, Some(initialSnapshot), timeTravelSpec.isDefined)
     }
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
index 2514dd6f374..5da35112ddd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
@@ -149,7 +149,7 @@ class CloneDeltaSource(
     sourceTable: DeltaTableV2) extends CloneSource {
 
   private val deltaLog = sourceTable.deltaLog
-  private val sourceSnapshot = sourceTable.snapshot
+  private val sourceSnapshot = sourceTable.initialSnapshot
 
   def format: String = CloneSourceFormat.DELTA
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 19a1cbeefdc..97b834b8b55 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -257,7 +257,7 @@ case class AlterTableDropFeatureDeltaCommand(
 
     // Check whether the protocol contains the feature in either the writer features list or
     // the reader+writer features list.
-    if (!table.snapshot.protocol.readerAndWriterFeatureNames.contains(featureName)) {
+    if (!table.initialSnapshot.protocol.readerAndWriterFeatureNames.contains(featureName)) {
       throw DeltaErrors.dropTableFeatureFeatureNotSupportedByProtocol(featureName)
     }
 
@@ -289,7 +289,7 @@ case class AlterTableDropFeatureDeltaCommand(
         // certainly still contain traces of the feature. We don't have to run an expensive
         // explicit check, but instead we fail straight away.
         throw DeltaErrors.dropTableFeatureWaitForRetentionPeriod(
-          featureName, table.snapshot.metadata)
+          featureName, table.initialSnapshot.metadata)
       }
 
       val txn = startTransaction(sparkSession)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
index b19fd9def4e..6cab79522a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
@@ -177,7 +177,7 @@ trait CloneTableSuiteBase extends QueryTest
     val sourceData = Dataset.ofRows(
       spark,
       LogicalRelation(sourceLog.createRelation(
-        snapshotToUseOpt = Some(deltaTable.snapshot),
+        snapshotToUseOpt = Some(deltaTable.initialSnapshot),
         isTimeTravelQuery = sourceVersion.isDefined || sourceTimestamp.isDefined)))
     (new CloneDeltaSource(deltaTable), sourceData)
   }
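Note (not part of the patch): a minimal usage sketch of the staleness the new name warns about, assuming a Delta table already exists at the hypothetical path /tmp/delta/events and a SparkSession named spark is in scope (e.g. in spark-shell). `initialSnapshot` is captured once per DeltaTableV2 instance and never refreshed, whereas DeltaLog.update() re-reads the log and returns the latest snapshot.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.catalog.DeltaTableV2

    val table = DeltaTableV2(spark, new Path("/tmp/delta/events"))
    val pinned = table.initialSnapshot      // captured on first access, immutable afterwards
    // ... other writers may commit new versions to the table here ...
    val fresh = table.deltaLog.update()     // re-reads the log; reflects the latest commits
    assert(fresh.version >= pinned.version) // pinned can lag arbitrarily far behind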