diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index 053dc9c09ed..43100bdc3aa 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaOptions, DeltaTableIdentifier, DeltaTableUtils, DeltaTimeTravelSpec, GeneratedColumn, Snapshot}
+import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.commands.WriteIntoDelta
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -32,10 +32,13 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.V1Table
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1Write, WriteBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -189,6 +192,23 @@
       deltaLog, info.options, spark.sessionState.conf.useNullsForMissingDefaultColumnValues)
   }
 
+  /**
+   * Starts a transaction for this table, using the snapshot captured during table resolution.
+   *
+   * WARNING: The caller must ensure that table resolution was recent (e.g. with the [[DataFrame]]
+   * or [[DeltaTable]] API, the table could have been resolved long ago).
+   */
+  def startTransactionWithInitialSnapshot(): OptimisticTransaction =
+    startTransaction(Some(snapshot))
+
+  /**
+   * Starts a transaction for this table, using the provided snapshot, or a fresh snapshot if none
+   * was provided.
+   */
+  def startTransaction(snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
+    deltaLog.startTransaction(snapshotOpt)
+  }
+
   /**
    * Creates a V1 BaseRelation from this Table to allow read APIs to go through V1 DataSource code
    * paths.
@@ -266,6 +286,31 @@ case class DeltaTableV2(
   }
 }
 
+object DeltaTableV2 {
+  /** Resolves a path into a DeltaTableV2, leveraging standard v2 table resolution. */
+  def apply(spark: SparkSession, tablePath: Path, cmd: String): DeltaTableV2 =
+    resolve(spark, UnresolvedPathBasedDeltaTable(tablePath.toString, cmd), cmd)
+
+  /** Resolves a table identifier into a DeltaTableV2, leveraging standard v2 table resolution. */
+  def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 =
+    resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None), cmd)
+
+  /** Applies standard v2 table resolution to an unresolved Delta table plan node. */
+  def resolve(spark: SparkSession, unresolved: LogicalPlan, cmd: String): DeltaTableV2 =
+    extractFrom(spark.sessionState.analyzer.ResolveRelations(unresolved), cmd)
+
+  /**
+   * Extracts the DeltaTableV2 from a resolved Delta table plan node, throwing "table not found" if
+   * the node does not actually represent a resolved Delta table.
+   */
+  def extractFrom(plan: LogicalPlan, cmd: String): DeltaTableV2 = plan match {
+    case ResolvedTable(_, _, d: DeltaTableV2, _) => d
+    case ResolvedTable(_, _, t: V1Table, _) if DeltaTableUtils.isDeltaTable(t.catalogTable) =>
+      DeltaTableV2(SparkSession.active, new Path(t.v1Table.location), Some(t.v1Table))
+    case _ => throw DeltaErrors.notADeltaTableException(cmd)
+  }
+}
+
 private class WriteIntoDeltaBuilder(
    log: DeltaLog,
    writeOptions: CaseInsensitiveStringMap,
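To make the intent of the new helpers concrete, here is a rough usage sketch (not part of the change itself): the table name, path, and command string below are invented for illustration, and only DeltaTableV2.apply, startTransaction, and startTransactionWithInitialSnapshot come from the code above. It assumes a SparkSession named spark is in scope.

    // Hypothetical caller; identifiers are illustrative only.
    val byPath = DeltaTableV2(spark, new Path("/tmp/delta/events"), "VACUUM")
    val byName = DeltaTableV2(spark, TableIdentifier("events"), "VACUUM")

    // Safe right after resolution: reuse the snapshot captured while resolving the table.
    val txnFresh = byName.startTransactionWithInitialSnapshot()

    // If the DeltaTableV2 may have been resolved long ago (DataFrame / DeltaTable APIs),
    // ask for a fresh snapshot at transaction start instead.
    val txnLatest = byName.startTransaction()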
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
index d6541eb0891..e07294a5ab0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
@@ -286,12 +286,8 @@ trait DeltaCommand extends DeltaLogging {
    * other cases this method will throw a "Table not found" exception.
    */
   def getDeltaTable(target: LogicalPlan, cmd: String): DeltaTableV2 = {
-    target match {
-      case ResolvedTable(_, _, d: DeltaTableV2, _) => d
-      case ResolvedTable(_, _, t: V1Table, _) if DeltaTableUtils.isDeltaTable(t.catalogTable) =>
-        DeltaTableV2(SparkSession.active, new Path(t.v1Table.location), Some(t.v1Table))
-      case _ => throw DeltaErrors.notADeltaTableException(cmd)
-    }
+    // TODO: Remove this wrapper and let former callers invoke DeltaTableV2.extractFrom directly.
+    DeltaTableV2.extractFrom(target, cmd)
   }
 
   /**
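The TODO above describes the intended end state: a caller that already holds a resolved logical plan can use DeltaTableV2.extractFrom directly instead of going through the getDeltaTable wrapper. A hypothetical command (the class and command name below are invented for illustration) would then look roughly like this:

    // Sketch only; assumes the usual Spark command imports (RunnableCommand, Row, etc.).
    case class MyDeltaCommand(target: LogicalPlan) extends RunnableCommand {
      override def run(sparkSession: SparkSession): Seq[Row] = {
        // Throws the "not a Delta table" error if `target` is not a resolved Delta table.
        val table = DeltaTableV2.extractFrom(target, "MY COMMAND")
        val txn = table.startTransaction()
        // ... validate and commit against txn ...
        Nil
      }
    }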
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 19a1cbeefdc..0c7a17f7ce8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -49,15 +49,16 @@ trait AlterDeltaTableCommand extends DeltaCommand {
 
   def table: DeltaTableV2
 
-  protected def startTransaction(spark: SparkSession): OptimisticTransaction = {
-    val txn = table.deltaLog.startTransaction()
+  protected def startTransaction(): OptimisticTransaction = {
+    // WARNING: It's not safe to use startTransactionWithInitialSnapshot here. Some commands call
+    // this method more than once, and some commands can be created with a stale table.
+    val txn = table.startTransaction()
     if (txn.readVersion == -1) {
       throw DeltaErrors.notADeltaTableException(table.name())
     }
     txn
   }
 
-
   /**
    * Check if the column to change has any dependent expressions:
    * - generated column expressions
@@ -106,7 +107,7 @@ case class AlterTableSetPropertiesDeltaCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
       val metadata = txn.metadata
 
       val filteredConfs = configuration.filterKeys {
@@ -154,7 +155,7 @@ case class AlterTableUnsetPropertiesDeltaCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
       val metadata = txn.metadata
 
       val normalizedKeys = DeltaConfigs.normalizeConfigKeys(propKeys)
@@ -292,7 +293,7 @@ case class AlterTableDropFeatureDeltaCommand(
         featureName, table.snapshot.metadata)
     }
 
-    val txn = startTransaction(sparkSession)
+    val txn = table.startTransaction()
     val snapshot = txn.snapshot
 
     // Verify whether all requirements hold before performing the protocol downgrade.
@@ -348,7 +349,7 @@ case class AlterTableAddColumnsDeltaCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.addColumns") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
 
       if (SchemaUtils.filterRecursively(
             StructType(colsToAddWithPosition.map {
@@ -444,7 +445,7 @@ case class AlterTableDropColumnsDeltaCommand(
     }
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.dropColumns") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
       val metadata = txn.metadata
       if (txn.metadata.columnMappingMode == NoMapping) {
         throw DeltaErrors.dropColumnNotSupported(suggestUpgrade = true)
@@ -504,7 +505,7 @@ case class AlterTableChangeColumnDeltaCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.changeColumns") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
       val metadata = txn.metadata
       val oldSchema = metadata.schema
       val resolver = sparkSession.sessionState.conf.resolver
@@ -708,7 +709,7 @@ case class AlterTableReplaceColumnsDeltaCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     recordDeltaOperation(table.deltaLog, "delta.ddl.alter.replaceColumns") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
       val metadata = txn.metadata
 
       val existingSchema = metadata.schema
@@ -837,7 +838,7 @@ case class AlterTableAddConstraintDeltaCommand(
       throw DeltaErrors.invalidConstraintName(name)
     }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.addConstraint") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
 
       getConstraintWithName(table, name, txn.metadata, sparkSession).foreach { oldExpr =>
         throw DeltaErrors.constraintAlreadyExists(name, oldExpr)
@@ -889,7 +890,7 @@ case class AlterTableDropConstraintDeltaCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.alter.dropConstraint") {
-      val txn = startTransaction(sparkSession)
+      val txn = startTransaction()
 
       val oldExprText = Constraints.getExprTextByName(name, txn.metadata, sparkSession)
       if (oldExprText.isEmpty && !ifExists && !sparkSession.sessionState.conf.getConf(
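The WARNING in the shared startTransaction() above exists because an alter command can be constructed from a DeltaTableV2 that was resolved much earlier, and some commands start more than one transaction, so pinning the resolution-time snapshot could operate on stale metadata. A contrived sketch of the hazard (the table name is invented; only the two methods come from the diff):

    val table = DeltaTableV2(spark, TableIdentifier("events"), "ALTER TABLE")
    // ... time passes; other writers commit new versions of the table ...

    // Unsafe here: would pin the possibly stale snapshot captured at resolution time.
    // val txn = table.startTransactionWithInitialSnapshot()

    // Safe: reads the latest snapshot when the transaction actually starts.
    val txn = table.startTransaction()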
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
index dd7ad12e7f7..3692948926a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
@@ -21,8 +21,10 @@ import java.util.UUID
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 
 import org.apache.hadoop.fs.Path
@@ -357,12 +359,14 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
            | tblproperties
            | ('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled')
            |""".stripMargin)
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+      val deltaTable = DeltaTableV2(spark, TableIdentifier(table))
+      val deltaLog = deltaTable.deltaLog
       val domainMetadatas = DomainMetadata(
         domain = "testDomain",
         configuration = JsonUtils.toJson(Map("key1" -> "value1")),
         removed = false) :: Nil
-      val version = deltaLog.startTransaction().commit(domainMetadatas, ManualUpdate)
+      val version = deltaTable.startTransactionWithInitialSnapshot()
+        .commit(domainMetadatas, ManualUpdate)
       val committedActions = deltaLog.store.read(
         FileNames.deltaFile(deltaLog.logPath, version),
         deltaLog.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
index e07ef36620b..3d7d722dbba 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata => MetadataAction, Protocol, SetTransaction}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -1587,8 +1588,8 @@ class DeltaColumnMappingSuite extends QueryTest
         "t1",
         props = Map(DeltaConfigs.CHANGE_DATA_FEED.key -> cdfEnabled.toString))
 
-      val log = DeltaLog.forTable(spark, TableIdentifier("t1"))
-      val currMetadata = log.snapshot.metadata
+      val table = DeltaTableV2(spark, TableIdentifier("t1"))
+      val currMetadata = table.snapshot.metadata
       val upgradeMetadata = currMetadata.copy(
         configuration = currMetadata.configuration ++ Map(
           DeltaConfigs.MIN_READER_VERSION.key -> "2",
@@ -1597,7 +1598,7 @@ class DeltaColumnMappingSuite extends QueryTest
         )
       )
 
-      val txn = log.startTransaction()
+      val txn = table.startTransactionWithInitialSnapshot()
       txn.updateMetadata(upgradeMetadata)
 
       if (shouldBlock) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala
index 9cd99685306..bb59d06fb74 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala
@@ -23,8 +23,10 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Truncate}
 import org.apache.spark.sql.delta.actions.{DomainMetadata, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 
 import org.junit.Assert._
@@ -64,12 +66,13 @@ class DomainMetadataSuite
            |""".stripMargin)
       (1 to 100).toDF("id").write.format("delta").mode("append").saveAsTable(table)
 
-      var deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
-      assert(deltaLog.unsafeVolatileSnapshot.domainMetadata.isEmpty)
+      var deltaTable = DeltaTableV2(spark, TableIdentifier(table))
+      def deltaLog = deltaTable.deltaLog
+      assert(deltaTable.snapshot.domainMetadata.isEmpty)
 
       val domainMetadata = DomainMetadata("testDomain1", "", false) ::
         DomainMetadata("testDomain2", "{\"key1\":\"value1\"", false) :: Nil
 
-      deltaLog.startTransaction().commit(domainMetadata, Truncate())
+      deltaTable.startTransactionWithInitialSnapshot().commit(domainMetadata, Truncate())
       assertEquals(sortByDomain(domainMetadata), sortByDomain(deltaLog.update().domainMetadata))
       assert(deltaLog.update().logSegment.checkpointProvider.version === -1)
@@ -78,12 +81,12 @@ class DomainMetadataSuite
 
       // Clear the DeltaLog cache to force creating a new DeltaLog instance which will build
       // the Snapshot from the checkpoint file.
       DeltaLog.clearCache()
-      deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
-      assert(!deltaLog.unsafeVolatileSnapshot.logSegment.checkpointProvider.isEmpty)
+      deltaTable = DeltaTableV2(spark, TableIdentifier(table))
+      assert(!deltaTable.snapshot.logSegment.checkpointProvider.isEmpty)
 
       assertEquals(
         sortByDomain(domainMetadata),
-        sortByDomain(deltaLog.unsafeVolatileSnapshot.domainMetadata))
+        sortByDomain(deltaTable.snapshot.domainMetadata))
     }
   }
@@ -106,18 +109,19 @@ class DomainMetadataSuite
       (1 to 100).toDF("id").write.format("delta").mode("append").saveAsTable(table)
 
       DeltaLog.clearCache()
-      var deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
-      assert(deltaLog.unsafeVolatileSnapshot.domainMetadata.isEmpty)
+      val deltaTable = DeltaTableV2(spark, TableIdentifier(table))
+      val deltaLog = deltaTable.deltaLog
+      assert(deltaTable.snapshot.domainMetadata.isEmpty)
 
       val domainMetadata = DomainMetadata("testDomain1", "", false) ::
         DomainMetadata("testDomain2", "{\"key1\":\"value1\"}", false) :: Nil
 
-      deltaLog.startTransaction().commit(domainMetadata, Truncate())
+      deltaTable.startTransactionWithInitialSnapshot().commit(domainMetadata, Truncate())
       assertEquals(sortByDomain(domainMetadata), sortByDomain(deltaLog.update().domainMetadata))
       assert(deltaLog.update().logSegment.checkpointProvider.version === -1)
 
       // Delete testDomain1.
-      deltaLog.startTransaction().commit(
+      deltaTable.startTransaction().commit(
         DomainMetadata("testDomain1", "", true) :: Nil, Truncate())
       val domainMetadatasAfterDeletion = DomainMetadata(
         "testDomain2",
@@ -128,13 +132,11 @@ class DomainMetadataSuite
 
       // Create a new commit and validate the incrementally built snapshot state respects the
       // DomainMetadata deletion.
-      deltaLog.startTransaction().commit(Nil, ManualUpdate)
-      deltaLog.update()
-      assertEquals(
-        sortByDomain(domainMetadatasAfterDeletion),
-        deltaLog.unsafeVolatileSnapshot.domainMetadata)
+      deltaTable.startTransaction().commit(Nil, ManualUpdate)
+      var snapshot = deltaLog.update()
+      assertEquals(sortByDomain(domainMetadatasAfterDeletion), snapshot.domainMetadata)
       if (doCheckpoint) {
-        deltaLog.checkpoint(deltaLog.unsafeVolatileSnapshot)
+        deltaLog.checkpoint(snapshot)
         assertEquals(
           sortByDomain(domainMetadatasAfterDeletion),
           deltaLog.update().domainMetadata)
@@ -142,7 +144,7 @@ class DomainMetadataSuite
 
       // force state reconstruction and validate it respects the DomainMetadata retention.
       DeltaLog.clearCache()
-      val snapshot = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))._2
+      snapshot = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))._2
       assertEquals(sortByDomain(domainMetadatasAfterDeletion), snapshot.domainMetadata)
     }
   }
@@ -191,12 +193,12 @@ class DomainMetadataSuite
            | ('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled')
            |""".stripMargin)
       (1 to 100).toDF("id").write.format("delta").mode("append").saveAsTable(table)
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+      val deltaTable = DeltaTableV2(spark, TableIdentifier(table))
 
       val domainMetadata = DomainMetadata("testDomain1", "", false) ::
         DomainMetadata("testDomain1", "", false) :: Nil
       val e = intercept[DeltaIllegalArgumentException] {
-        deltaLog.startTransaction().commit(domainMetadata, Truncate())
+        deltaTable.startTransactionWithInitialSnapshot().commit(domainMetadata, Truncate())
       }
       assertEquals(e.getMessage,
         "Internal error: two DomainMetadata actions within the same transaction have " +
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
index 8fbf6ab3cbc..b8715423d2c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta.{CheckConstraintsTableFeature, DeltaLog, DeltaOperations}
 import org.apache.spark.sql.delta.actions.{Metadata, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, Invariants}
 import org.apache.spark.sql.delta.constraints.Constraints.NotNull
 import org.apache.spark.sql.delta.constraints.Invariants.PersistedExpression
@@ -400,8 +401,8 @@ class InvariantEnforcementSuite extends QueryTest
     withTable("constraint") {
       spark.range(10).selectExpr("id AS valueA", "id AS valueB", "id AS valueC")
         .write.format("delta").saveAsTable("constraint")
-      val log = DeltaLog.forTable(spark, TableIdentifier("constraint", None))
-      val txn = log.startTransaction()
+      val table = DeltaTableV2(spark, TableIdentifier("constraint", None))
+      val txn = table.startTransactionWithInitialSnapshot()
       val newMetadata = txn.metadata.copy(
         configuration =
           txn.metadata.configuration + ("delta.constraints.mychk" -> "valueA < valueB"))
@@ -412,7 +413,7 @@ class InvariantEnforcementSuite extends QueryTest
       } else {
         CheckConstraintsTableFeature.minWriterVersion
       }
-      assert(log.snapshot.protocol.minWriterVersion === upVersion)
+      assert(table.deltaLog.unsafeVolatileSnapshot.protocol.minWriterVersion === upVersion)
       spark.sql("INSERT INTO constraint VALUES (50, 100, null)")
       val e = intercept[InvariantViolationException] {
         spark.sql("INSERT INTO constraint VALUES (100, 50, null)")
@@ -438,8 +439,8 @@ class InvariantEnforcementSuite extends QueryTest
     withTable("constraint") {
       spark.range(10).selectExpr("id AS valueA", "id AS valueB")
         .write.format("delta").saveAsTable("constraint")
-      val log = DeltaLog.forTable(spark, TableIdentifier("constraint", None))
-      val txn = log.startTransaction()
+      val table = DeltaTableV2(spark, TableIdentifier("constraint", None))
+      val txn = table.startTransactionWithInitialSnapshot()
       val newMetadata = txn.metadata.copy(
         configuration =
           txn.metadata.configuration + ("delta.constraints.mychk" -> "valueA < valueB"))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
index d91f24454f6..9b9ec009e36 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.delta.test
 import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction, Snapshot}
 import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Operation, Write}
 import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
 
 import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 /**
  * Additional method definitions for Delta classes that are intended for use only in testing.
@@ -88,4 +90,10 @@ object DeltaTestImplicits {
       deltaLog.enableExpiredLogCleanup(snapshot.metadata)
     }
   }
+
+  implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) {
+    /** Convenience overload that omits the cmd arg (which is not helpful in tests). */
+    def apply(spark: SparkSession, id: TableIdentifier): DeltaTableV2 =
+      dt.apply(spark, id, "test")
+  }
 }
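With the test-only implicit above in scope, a suite can build a DeltaTableV2 without threading a command name through every call. Roughly (the table name is illustrative, and a SparkSession named spark is assumed):

    import org.apache.spark.sql.delta.test.DeltaTestImplicits._

    // The two-argument apply delegates to DeltaTableV2(spark, id, cmd = "test").
    val deltaTable = DeltaTableV2(spark, TableIdentifier("my_test_table"))
    val version = deltaTable.startTransactionWithInitialSnapshot()
      .commit(Nil, DeltaOperations.ManualUpdate)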