diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 2748153eaa..1587c51eb8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -78,9 +78,7 @@ trait RecordChecksum extends DeltaLogging { private lazy val writer = CheckpointFileManager.create(deltaLog.logPath, deltaLog.newDeltaHadoopConf()) - private def getChecksum(snapshot: Snapshot): VersionChecksum = { - snapshot.checksumOpt.getOrElse(snapshot.computeChecksum) - } + private def getChecksum(snapshot: Snapshot): VersionChecksum = snapshot.computeChecksum protected def writeChecksumFile(txnId: String, snapshot: Snapshot): Unit = { if (!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) { @@ -277,6 +275,12 @@ trait RecordChecksum extends DeltaLogging { case _ => } + val setTransactions = incrementallyComputeSetTransactions( + oldSnapshot, oldVersionChecksum, attemptVersion, actions) + + val domainMetadata = incrementallyComputeDomainMetadatas( + oldSnapshot, oldVersionChecksum, attemptVersion, actions) + Right(VersionChecksum( txnId = txnIdOpt, tableSizeBytes = tableSizeBytes, @@ -286,13 +290,106 @@ trait RecordChecksum extends DeltaLogging { inCommitTimestampOpt = inCommitTimestamp, metadata = metadata, protocol = protocol, - setTransactions = None, - domainMetadata = None, + setTransactions = setTransactions, + domainMetadata = domainMetadata, histogramOpt = None, allFiles = None )) } + /** + * Incrementally compute [[Snapshot.setTransactions]] for the commit `attemptVersion`. + * + * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1 + * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1 + * @param attemptVersion - version which we want to commit + * @param actionsToCommit - actions for commit `attemptVersion` + * @return Optional sequence of incrementally computed [[SetTransaction]]s for commit + * `attemptVersion`. + */ + private def incrementallyComputeSetTransactions( + oldSnapshot: Option[Snapshot], + oldVersionChecksum: VersionChecksum, + attemptVersion: Long, + actionsToCommit: Seq[Action]): Option[Seq[SetTransaction]] = { + // Check-1: check conf + if (!spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) { + return None + } + + // Check-2: check `minSetTransactionRetentionTimestamp` is not set + val newMetadataToCommit = actionsToCommit.collectFirst { case m: Metadata => m } + // TODO: Add support for incrementally computing [[SetTransaction]]s even when + // `minSetTransactionRetentionTimestamp` is set. + // We don't incrementally compute [[SetTransaction]]s when user has configured + // `minSetTransactionRetentionTimestamp` as it makes verification non-deterministic. + // Check all places to figure out whether `minSetTransactionRetentionTimestamp` is set: + // 1. oldSnapshot corresponding to `attemptVersion - 1` + // 2. old VersionChecksum's MetaData (corresponding to `attemptVersion-1`) + // 3. new VersionChecksum's MetaData (corresponding to `attemptVersion`) + val setTransactionRetentionTimestampConfigured = + (oldSnapshot.map(_.metadata) ++ Option(oldVersionChecksum.metadata) ++ newMetadataToCommit) + .exists(DeltaLog.minSetTransactionRetentionInterval(_).nonEmpty) + if (setTransactionRetentionTimestampConfigured) return None + + // Check-3: Check old setTransactions are available so that we can incrementally compute new. 
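+    // (They may be absent if tracking was disabled, the threshold was crossed earlier,
+    // or the CRC was written by an older writer; in that case we cannot be incremental.)
+    // Example of the merge performed below: old CRC = [app-1 -> 1]; committing
+    // SetTransaction("app-1", 3, _) and SetTransaction("app-2", 100, _) yields
+    // [app-1 -> 3, app-2 -> 100].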
+    val oldSetTransactions = oldVersionChecksum.setTransactions
+      .getOrElse { return None }
+
+    // Check-4: old/new setTransactions are within the threshold.
+    val setTransactionsToCommit = actionsToCommit.filter(_.isInstanceOf[SetTransaction])
+    val threshold = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC)
+    if (Math.max(setTransactionsToCommit.size, oldSetTransactions.size) > threshold) return None
+
+    // We currently don't attempt incremental [[SetTransaction]] computation when
+    // `minSetTransactionRetentionTimestamp` is set, so we explicitly pass None here.
+    // We can also ignore file retention because that only affects [[RemoveFile]] actions.
+    val logReplay = new InMemoryLogReplay(
+      minFileRetentionTimestamp = 0,
+      minSetTransactionRetentionTimestamp = None)
+
+    logReplay.append(attemptVersion - 1, oldSetTransactions.toIterator)
+    logReplay.append(attemptVersion, setTransactionsToCommit.toIterator)
+    Some(logReplay.getTransactions.toSeq).filter(_.size <= threshold)
+  }
+
+  /**
+   * Incrementally compute [[Snapshot.domainMetadata]] for the commit `attemptVersion`.
+   *
+   * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1
+   * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1
+   * @param attemptVersion - version which we want to commit
+   * @param actionsToCommit - actions for commit `attemptVersion`
+   * @return Optional sequence of incrementally computed [[DomainMetadata]]s for commit
+   *         `attemptVersion`.
+   */
+  private def incrementallyComputeDomainMetadatas(
+      oldSnapshot: Option[Snapshot],
+      oldVersionChecksum: VersionChecksum,
+      attemptVersion: Long,
+      actionsToCommit: Seq[Action]): Option[Seq[DomainMetadata]] = {
+    // Check that old DomainMetadatas are available so that we can incrementally compute new.
+    val oldDomainMetadatas = oldVersionChecksum.domainMetadata
+      .getOrElse { return None }
+    val newDomainMetadatas = actionsToCommit.filter(_.isInstanceOf[DomainMetadata])
+
+    // We only work with DomainMetadata, so RemoveFile and SetTransaction retention don't matter.
+    val logReplay = new InMemoryLogReplay(
+      minFileRetentionTimestamp = 0,
+      minSetTransactionRetentionTimestamp = None)
+
+    val threshold = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC)
+
+    logReplay.append(attemptVersion - 1, oldDomainMetadatas.iterator)
+    logReplay.append(attemptVersion, newDomainMetadatas.iterator)
+    // We don't truncate the set of DomainMetadata actions. Instead, we either store all of them
+    // or none of them. The advantage is that presence can then be determined from the checksum
+    // alone: if the checksum contains domain metadatas but not the one you are looking for,
+    // then it's not there.
+    //
+    // It's also worth noting that we can distinguish "no domain metadatas" versus
+    // "domain metadatas not stored" as [[Some]] vs. [[None]].
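+    // Hence the `filter` below drops the whole Option instead of truncating the Seq.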
+ Some(logReplay.getDomainMetadatas.toSeq).filter(_.size <= threshold) + } } object RecordChecksum { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 2b85b2c73d..e2eb511cbb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -513,17 +513,29 @@ class Snapshot( */ def computeChecksum: VersionChecksum = VersionChecksum( txnId = None, - tableSizeBytes = sizeInBytes, - numFiles = numOfFiles, - numMetadata = numOfMetadata, - numProtocol = numOfProtocol, inCommitTimestampOpt = getInCommitTimestampOpt, - setTransactions = checksumOpt.flatMap(_.setTransactions), - domainMetadata = checksumOpt.flatMap(_.domainMetadata), metadata = metadata, protocol = protocol, - histogramOpt = checksumOpt.flatMap(_.histogramOpt), - allFiles = checksumOpt.flatMap(_.allFiles)) + allFiles = checksumOpt.flatMap(_.allFiles), + tableSizeBytes = checksumOpt.map(_.tableSizeBytes).getOrElse(sizeInBytes), + numFiles = checksumOpt.map(_.numFiles).getOrElse(numOfFiles), + numMetadata = checksumOpt.map(_.numMetadata).getOrElse(numOfMetadata), + numProtocol = checksumOpt.map(_.numProtocol).getOrElse(numOfProtocol), + // Only return setTransactions and domainMetadata if they are either already present + // in the checksum or if they have already been computed in the current snapshot. + setTransactions = checksumOpt.flatMap(_.setTransactions) + .orElse { + Option.when(_computedStateTriggered && + // Only extract it from the current snapshot if set transaction + // writes are enabled. + spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) { + setTransactions + } + }, + domainMetadata = checksumOpt.flatMap(_.domainMetadata) + .orElse(Option.when(_computedStateTriggered)(domainMetadata)), + histogramOpt = checksumOpt.flatMap(_.histogramOpt) + ) /** Returns the data schema of the table, used for reading stats */ def tableSchema: StructType = metadata.dataSchema @@ -704,6 +716,7 @@ class DummySnapshot( override protected lazy val computedState: SnapshotState = initialState(metadata, protocol) override protected lazy val getInCommitTimestampOpt: Option[Long] = None + _computedStateTriggered = true // The [[InitialSnapshot]] is not backed by any external commit-coordinator. override val tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 3340422f05..059c3c5400 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -1271,6 +1271,7 @@ trait SnapshotManagement { self: DeltaLog => // a checksum based on state reconstruction. Disable incremental commit to avoid // further error triggers in this session. 
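+        // Incremental set-transaction tracking in the CRC depends on incremental commits
+        // (see DELTA_WRITE_SET_TRANSACTIONS_IN_CRC), so disable it as well.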
spark.sessionState.conf.setConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED, false) + spark.sessionState.conf.setConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC, false) return createSnapshotAfterCommit( initSegment, newChecksumOpt = None, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala index 3a80bdb278..33fb47993d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala @@ -65,6 +65,9 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => // For implicits which re-use Encoder: import implicits._ + /** Whether computedState is already computed or not */ + @volatile protected var _computedStateTriggered: Boolean = false + /** A map to look up transaction version by appId. */ lazy val transactions: Map[String, Long] = setTransactions.map(t => t.appId -> t.version).toMap @@ -114,6 +117,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => throw DeltaErrors.actionNotFoundException("metadata", version) } + _computedStateTriggered = true _computedState } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index fffd59e968..8983053f7f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1073,6 +1073,31 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_WRITE_SET_TRANSACTIONS_IN_CRC = + buildConf("setTransactionsInCrc.writeOnCommit") + .internal() + .doc("When enabled, each commit will incrementally compute and cache all SetTransaction" + + " actions in the .crc file. Note that this only happens when incremental commits" + + s" are enabled (${INCREMENTAL_COMMIT_ENABLED.key})") + .booleanConf + .createWithDefault(true) + + val DELTA_MAX_SET_TRANSACTIONS_IN_CRC = + buildConf("setTransactionsInCrc.maxAllowed") + .internal() + .doc("Threshold of the number of SetTransaction actions below which this optimization" + + " should be enabled") + .longConf + .createWithDefault(100) + + val DELTA_MAX_DOMAIN_METADATAS_IN_CRC = + buildConf("domainMetadatasInCrc.maxAllowed") + .internal() + .doc("Threshold of the number of DomainMetadata actions below which this optimization" + + " should be enabled") + .longConf + .createWithDefault(10) + val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED = buildConf("checkpoint.exceptionThrowing.enabled") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala new file mode 100644 index 0000000000..c780f2bb5f --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala @@ -0,0 +1,450 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import java.io.File
+import java.util.UUID
+
+// scalastyle:off import.ordering.noEmptyLine
+import com.databricks.spark.util.UsageRecord
+import org.apache.spark.sql.delta.DeltaTestUtils.{collectUsageLogs, createTestAddFile, BOOLEAN_DOMAIN}
+import org.apache.spark.sql.delta.actions.{AddFile, SetTransaction, SingleAction}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.DeltaTestImplicits._
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+
+import org.apache.spark.sql.{QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DeltaIncrementalSetTransactionsSuite
+  extends QueryTest
+  with DeltaSQLCommandTest
+  with SharedSparkSession {
+
+  protected override def sparkConf = super.sparkConf
+    .set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, "true")
+    .set(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key, "true")
+    // needed for DELTA_WRITE_SET_TRANSACTIONS_IN_CRC
+    .set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true")
+    // This test suite is sensitive to the state reconstruction we do in different places, so we
+    // disable [[INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS]] to simulate prod behaviour.
+    .set(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key, "false")
+
+  /**
+   * Validates the result of the [[Snapshot.setTransactions]] API for the latest snapshot of this
+   * [[DeltaLog]].
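+   *
+   * @param deltaLog [[DeltaLog]] of the table under test
+   * @param expectedTxns expected (appId -> version) entries
+   * @param viaCRC when true, also require that the entries are present in the checksum file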
+   */
+  private def assertSetTransactions(
+      deltaLog: DeltaLog,
+      expectedTxns: Map[String, Long],
+      viaCRC: Boolean = false
+  ): Unit = {
+    val snapshot = deltaLog.update()
+    if (viaCRC) {
+      assert(snapshot.checksumOpt.flatMap(_.setTransactions).isDefined)
+      snapshot.checksumOpt.flatMap(_.setTransactions).foreach { setTxns =>
+        assert(setTxns.map(txn => (txn.appId, txn.version)).toMap === expectedTxns)
+      }
+    }
+    assert(snapshot.setTransactions.map(txn => (txn.appId, txn.version)).toMap === expectedTxns)
+    assert(snapshot.numOfSetTransactions === expectedTxns.size)
+    assert(expectedTxns === snapshot.transactions)
+  }
+
+  /** Commit given [[SetTransaction]] to `deltaLog` */
+  private def commitSetTxn(
+      deltaLog: DeltaLog, appId: String, version: Long, lastUpdated: Long): Unit = {
+    commitSetTxn(deltaLog, Seq(SetTransaction(appId, version, Some(lastUpdated))))
+  }
+
+  /** Commit given [[SetTransaction]]s to `deltaLog` */
+  private def commitSetTxn(
+      deltaLog: DeltaLog,
+      setTransactions: Seq[SetTransaction]): Unit = {
+    deltaLog.startTransaction().commit(
+      setTransactions :+
+        AddFile(
+          s"file-${UUID.randomUUID().toString}",
+          partitionValues = Map.empty,
+          size = 1L,
+          modificationTime = 1L,
+          dataChange = true),
+      DeltaOperations.Write(SaveMode.Append)
+    )
+  }
+
+  test("set-transaction tracking starts from 0th commit in CRC") {
+    withSQLConf(
+      DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true"
+    ) {
+      val tbl = "test_table"
+      withTable(tbl) {
+        sql(s"CREATE TABLE $tbl USING delta as SELECT 1 as value") // 0th commit
+        val log = DeltaLog.forTable(spark, TableIdentifier(tbl))
+        log.update()
+        // CRC for the 0th commit has setTransactions defined, as an empty Seq.
+        assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).isDefined)
+        assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).get.isEmpty)
+        assertSetTransactions(log, expectedTxns = Map())
+
+        commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) // 1st commit
+        assertSetTransactions(log, expectedTxns = Map("app-1" -> 1))
+        commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) // 2nd commit
+        assertSetTransactions(log, expectedTxns = Map("app-1" -> 3))
+        commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) // 3rd commit
+        assertSetTransactions(log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100))
+        commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) // 4th commit
+        assertSetTransactions(log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100))
+
+        // 5th commit - commit multiple [[SetTransaction]]s in a single commit
+        commitSetTxn(
+          log,
+          setTransactions = Seq(
+            SetTransaction("app-1", version = 100, lastUpdated = Some(4)),
+            SetTransaction("app-3", version = 300, lastUpdated = Some(4))
+          ))
+        assertSetTransactions(
+          log,
+          expectedTxns = Map("app-1" -> 100, "app-2" -> 100, "app-3" -> 300))
+      }
+    }
+  }
+
+  test("set-transaction tracking starts for old tables after new commits") {
+    withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false") {
+      val tbl = "test_table"
+      withTable(tbl) {
+        // Create a table with the feature disabled, so the 0th/1st commit won't do
+        // SetTransaction tracking in CRC.
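+        // (Tracking starts later in this test, after the conf is re-enabled and
+        // computeState has been materialized.)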
+ sql(s"CREATE TABLE $tbl USING delta as SELECT 1 as value") // 0th commit + + def deltaLog: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) + commitSetTxn(deltaLog, "app-1", version = 1, lastUpdated = 1) // 1st commit + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) + + // Enable the SetTransaction tracking config and do more commits in the table. + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + DeltaLog.clearCache() + commitSetTxn(deltaLog, "app-1", version = 2, lastUpdated = 2) // 2nd commit + // By default, commit doesn't trigger stateReconstruction and so the + // incremental CRC won't have setTransactions present until `setTransactions` API is + // explicitly invoked before the commit. + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) // crc has no set-txn + assertSetTransactions(deltaLog, expectedTxns = Map("app-1" -> 2), viaCRC = false) + DeltaLog.clearCache() + + // Do commit after forcing computeState. Now SetTransaction tracking will start. + deltaLog.snapshot.setTransactions // This triggers computeState. + commitSetTxn(deltaLog, "app-2", version = 100, lastUpdated = 3) // 3rd commit + assert(deltaLog.update().checksumOpt.get.setTransactions.nonEmpty) // crc has set-txn + } + } + } + } + + test("validate that crc doesn't contain SetTransaction when tracking is disabled") { + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + val log = DeltaLog.forTable(spark, TableIdentifier(tbl)) + // CRC for 0th commit should not have SetTransactions defined if conf is disabled. + assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map(), viaCRC = false) + + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 1), viaCRC = false) + + commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3), viaCRC = false) + + commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100), viaCRC = false) + + commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100), viaCRC = false) + } + } + } + + for(computeStatePreloaded <- BOOLEAN_DOMAIN) { + test("set-transaction tracking should start if computeState is pre-loaded before" + + s" commit [computeState preloaded: $computeStatePreloaded]") { + + // Enable INCREMENTAL COMMITS and disable verification - to make sure that we + // don't trigger state reconstruction after a commit. 
+ withSQLConf( + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false", + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "false" + ) { + val tbl = "test_table" + + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + // After 0th commit - CRC shouldn't have SetTransactions as feature is disabled. + assertSetTransactions(log, expectedTxns = Map(), viaCRC = false) + + DeltaLog.clearCache() + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + // During 1st commit, the feature is enabled. But still the new commit crc shouldn't + // contain the [[SetTransaction]] actions as we don't have an estimate of how many + // [[SetTransaction]] actions might be already part of this table till now. + // So incremental computation of [[SetTransaction]] won't trigger. + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + + if (computeStatePreloaded) { + // Calling `validateChecksum` will pre-load the computeState + log.update().validateChecksum() + } + // During 2nd commit, we have following 2 cases: + // 1. If `computeStatePreloaded` is set, then the Snapshot has already calculated + // computeState and so we have estimate of number of SetTransactions till this point. + // So next commit will trigger incremental computation of [[SetTransaction]]. + // 2. If `computeStatePreloaded` is not set, then Snapshot doesn't have computeState + // pre-computed. So next commit will not trigger incremental computation of + // [[SetTransaction]]. + commitSetTxn(log, "app-1", version = 100, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).nonEmpty === + computeStatePreloaded) + } + } + } + } + } + + test("set-transaction tracking in CRC should stop once threshold is crossed") { + withSQLConf( + DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC.key -> "2", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + assertSetTransactions(log, expectedTxns = Map()) + + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 1)) + + commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3)) + + commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100)) + + commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100)) + + commitSetTxn(log, "app-3", version = 1000, lastUpdated = 5) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100, "app-3" -> 1000), viaCRC = false) + } + } + } + + test("set-transaction tracking in CRC should stop once setTxn retention conf is set") { + withSQLConf( + 
DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC.key -> "2", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Do 1 commit to table - set-transaction tracking continue to happen. + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Set any random table property - set-transaction tracking continue to happen. + sql(s"ALTER TABLE $tbl SET TBLPROPERTIES ('randomProp1' = 'value1')") + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Set the `setTransactionRetentionDuration` table property - set-transaction tracking will + // stop. + sql(s"ALTER TABLE $tbl SET TBLPROPERTIES " + + s"('delta.setTransactionRetentionDuration' = 'interval 1 days')") + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + log.update().setTransactions + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + + } + } + } + + for(checksumVerificationFailureIsFatal <- BOOLEAN_DOMAIN) { + // In this test we check that verification failed usage-logs are triggered when + // there is an issue in incremental computation and verification is explicitly enabled. + test("incremental set-transaction verification failures" + + s" [checksumVerificationFailureIsFatal: $checksumVerificationFailureIsFatal]") { + withSQLConf( + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true", + // Enable verification explicitly as it is disabled by default. + DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> true.toString, + DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> s"$checksumVerificationFailureIsFatal" + ) { + withTempDir { tempDir => + // Procedure: + // 1. Populate the table with 2 [[SetTransaction]]s and create a checkpoint, validate that + // CRC has setTransactions present. + // 2. Intentionally corrupt the checkpoint - Remove one SetTransaction from it. + // 3. Clear the delta log cache so we pick up the checkpoint + // 4. Start a new transaction and attempt to commit the transaction + // a. Incremental SetTransaction verification should fail + // b. Post-commit snapshot should have checksumOpt with no [[SetTransaction]]s + + // Step-1 + val txn0 = SetTransaction("app-0", version = 1, lastUpdated = Some(1)) + val txn1 = SetTransaction("app-1", version = 888, lastUpdated = Some(2)) + + def log: DeltaLog = DeltaLog.forTable(spark, tempDir) + + // commit-0 + val actions0 = + (1 to 10).map(i => createTestAddFile(encodedPath = i.toString)) :+ txn0 + log.startTransaction().commitWriteAppend(actions0: _*) + // commit-1 + val actions1 = + (11 to 20).map(i => createTestAddFile(encodedPath = i.toString)) :+ txn1 + log.startTransaction().commitWriteAppend(actions1: _*) + assert(log.readChecksum(version = 1).get.setTransactions.nonEmpty) + log.checkpoint() + + // Step-2 + dropOneSetTransactionFromCheckpoint(log) + + // Step-3 + DeltaLog.clearCache() + assert(!log.update().logSegment.checkpointProvider.isEmpty) + + // Step-4 + // Create the txn with [[DELTA_CHECKSUM_MISMATCH_IS_FATAL]] as false so that pre-commit + // CRC validation doesn't fail. 
Our goal is to capture that post-commit verification + // catches any issues. + var txn: OptimisticTransactionImpl = null + withSQLConf(DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false") { + txn = log.startTransaction() + } + val Seq(corruptionReport) = collectSetTransactionCorruptionReport { + if (checksumVerificationFailureIsFatal) { + val e = intercept[DeltaIllegalStateException] { + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true") { + txn.commit(Seq(), DeltaOperations.Write(SaveMode.Append)) + } + } + assert(e.getMessage.contains("SetTransaction mismatch")) + } else { + txn.commit(Seq(), DeltaOperations.Write(SaveMode.Append)) + } + } + val eventData = JsonUtils.fromJson[Map[String, Any]](corruptionReport.blob) + + val expectedErrorEventData = Map( + "unmatchedSetTransactionsCRC" -> Seq(txn1), + "unmatchedSetTransactionsComputedState" -> Seq.empty, + "version" -> 2, + "minSetTransactionRetentionTimestamp" -> None, + "repeatedEntriesForSameAppId" -> Seq.empty, + "exactMatchFailed" -> true) + + val observedMismatchingFields = eventData("mismatchingFields").asInstanceOf[Seq[String]] + val observedErrorMessage = eventData("error").asInstanceOf[String] + val observedDetailedErrorMap = + eventData("detailedErrorMap").asInstanceOf[Map[String, String]] + assert(observedMismatchingFields === Seq("setTransactions")) + assert(observedErrorMessage.contains("SetTransaction mismatch")) + assert(observedDetailedErrorMap("setTransactions") === + JsonUtils.toJson(expectedErrorEventData)) + + if (checksumVerificationFailureIsFatal) { + // Due to failure, post-commit snapshot couldn't be updated + assert(log.snapshot.version === 1) + assert(log.readChecksum(version = 2).isEmpty) + } else { + assert(log.snapshot.version === 2) + assert(log.readChecksum(version = 2).get.setTransactions.isEmpty) + } + } + } + } + } + + /** Drops one [[SetTransaction]] operation from checkpoint - the one with max appId */ + private def dropOneSetTransactionFromCheckpoint(log: DeltaLog): Unit = { + import testImplicits._ + val checkpointPath = FileNames.checkpointFileSingular(log.logPath, log.snapshot.version) + withTempDir { tmpCheckpoint => + // count total rows in checkpoint + val checkpointDf = spark.read + .schema(SingleAction.encoder.schema) + .parquet(checkpointPath.toString) + val initialActionCount = checkpointDf.count().toInt + val corruptedCheckpointData = checkpointDf + .orderBy(col("txn.appId").asc_nulls_first) // force non setTransaction actions to front + .as[SingleAction].take(initialActionCount - 1) // Drop 1 action + + corruptedCheckpointData.toSeq.toDS().coalesce(1).write + .mode("overwrite").parquet(tmpCheckpoint.toString) + assert(spark.read.parquet(tmpCheckpoint.toString).count() === initialActionCount - 1) + val writtenCheckpoint = + tmpCheckpoint.listFiles().toSeq.filter(_.getName.startsWith("part")).head + val checkpointFile = new File(checkpointPath.toUri) + new File(log.logPath.toUri).listFiles().toSeq.foreach { file => + if (file.getName.startsWith(".0")) { + // we need to delete checksum files, otherwise trying to replace our incomplete + // checkpoint file fails due to the LocalFileSystem's checksum checks. 
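+          // (Hadoop's LocalFileSystem keeps a hidden ".<name>.crc" next to each file.)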
+ assert(file.delete(), "Failed to delete checksum file") + } + } + assert(checkpointFile.delete(), "Failed to delete old checkpoint") + assert(writtenCheckpoint.renameTo(checkpointFile), + "Failed to rename corrupt checkpoint") + val newCheckpoint = spark.read.parquet(checkpointFile.toString) + assert(newCheckpoint.count() === initialActionCount - 1, + "Checkpoint file incorrect:\n" + newCheckpoint.collect().mkString("\n")) + } + } + + private def collectSetTransactionCorruptionReport(f: => Unit): Seq[UsageRecord] = { + collectUsageLogs("delta.checksum.invalid")(f).toSeq + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala index e431e08081..027c9c3eb3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala @@ -89,6 +89,16 @@ class DomainMetadataSuite sortByDomain(deltaTable.snapshot.domainMetadata)) } + DeltaLog.clearCache() + deltaTable = DeltaTableV2(spark, TableIdentifier(table)) + val checksumOpt = deltaTable.snapshot.checksumOpt + if (doChecksum) { + assertEquals( + sortByDomain(checksumOpt.get.domainMetadata.get), sortByDomain(domainMetadata)) + } else { + assert(checksumOpt.isEmpty) + } + assert(deltaLog.update().validateChecksum()) } } } @@ -150,6 +160,108 @@ class DomainMetadataSuite } } + test("DomainMetadata actions tracking in CRC should stop once threshold is crossed") { + def assertDomainMetadatas( + deltaLog: DeltaLog, + expectedDomainMetadatas: Seq[DomainMetadata], + expectedInCrc: Boolean): Unit = { + val snapshot = deltaLog.update() + assert(snapshot.validateChecksum()) + assertEquals(sortByDomain(expectedDomainMetadatas), sortByDomain(snapshot.domainMetadata)) + assert(snapshot.checksumOpt.nonEmpty) + if (expectedInCrc) { + assert(snapshot.checksumOpt.get.domainMetadata.nonEmpty) + assertEquals( + sortByDomain(expectedDomainMetadatas), + sortByDomain(snapshot.checksumOpt.get.domainMetadata.get)) + } else { + assert(snapshot.checksumOpt.get.domainMetadata.isEmpty) + } + } + + val table = "testTable" + withSQLConf( + DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC.key -> "2") { + withTable(table) { + sql( + s""" + | CREATE TABLE $table(id int) USING delta + | tblproperties + | ('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled') + |""".stripMargin) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) + assertDomainMetadatas(deltaLog, Seq.empty, true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain1", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, DomainMetadata("testDomain1", "", false) :: Nil, true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain2", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, + DomainMetadata("testDomain1", "", false) :: + DomainMetadata("testDomain2", "", false) :: Nil, + true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain3", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, + DomainMetadata("testDomain1", "", false) :: + DomainMetadata("testDomain2", "", false) :: + DomainMetadata("testDomain3", "", false) :: Nil, + false) + } + } + } + + test("Validate crc can be read when domainMetadata is missing") { + val table = "testTable" + withTable(table) { + sql( + s""" + | CREATE TABLE $table(id int) USING delta + | tblproperties + | 
('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled') + |""".stripMargin) + val deltaTable = DeltaTableV2(spark, TableIdentifier(table)) + val deltaLog = deltaTable.deltaLog + val version = + deltaTable + .startTransactionWithInitialSnapshot() + .commit(DomainMetadata("testDomain1", "", false) :: Nil, Truncate()) + val snapshot = deltaLog.update() + assert(snapshot.checksumOpt.nonEmpty) + assert(snapshot.checksumOpt.get.domainMetadata.nonEmpty) + val originalChecksum = snapshot.checksumOpt.get + + // Write out a checksum without domainMetadata. + val checksumWithoutDomainMetadata = originalChecksum.copy(domainMetadata = None) + val writer = CheckpointFileManager.create(deltaLog.logPath, deltaLog.newDeltaHadoopConf()) + val toWrite = JsonUtils.toJson(checksumWithoutDomainMetadata) + "\n" + val stream = writer.createAtomic( + FileNames.checksumFile(deltaLog.logPath, version + 1), + overwriteIfPossible = false) + stream.write(toWrite.getBytes(UTF_8)) + stream.close() + + // Make sure the read is not broken. + val content = + deltaLog + .store + .read( + FileNames.checksumFile(deltaLog.logPath, version + 1), + deltaLog.newDeltaHadoopConf()) + val checksumFromFile = JsonUtils.mapper.readValue[VersionChecksum](content.head) + assert(checksumWithoutDomainMetadata == checksumFromFile) + } + } + test("DomainMetadata action survives state reconstruction [w/o checkpoint, w/o checksum]") { validateStateReconstructionHelper(doCheckpoint = false, doChecksum = false)
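For intuition about the incremental computation added in Checksum.scala above, here is a minimal, self-contained sketch of the last-writer-wins merge that the log replay performs per appId. `SetTxn`, `replay`, and `IncrementalSetTxnModel` are illustrative stand-ins, not the real Delta Lake classes (the patch uses `actions.SetTransaction` and `InMemoryLogReplay`), and the threshold and Option handling from the patch are omitted:

```scala
import scala.collection.mutable

object IncrementalSetTxnModel {
  // Illustrative stand-in for org.apache.spark.sql.delta.actions.SetTransaction.
  final case class SetTxn(appId: String, version: Long, lastUpdated: Option[Long])

  // Replays old entries first, then the newly committed ones; for a given appId the
  // entry appended last wins, mirroring log-replay semantics.
  def replay(old: Seq[SetTxn], committed: Seq[SetTxn]): Seq[SetTxn] = {
    val merged = mutable.LinkedHashMap.empty[String, SetTxn]
    (old.iterator ++ committed.iterator).foreach(t => merged(t.appId) = t)
    merged.values.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the expectations in DeltaIncrementalSetTransactionsSuite.
    val afterV1 = Seq(SetTxn("app-1", 1, Some(1L)))
    val afterV2 = replay(afterV1, Seq(SetTxn("app-1", 3, Some(2L))))
    val afterV3 = replay(afterV2, Seq(SetTxn("app-2", 100, Some(3L))))
    assert(afterV3.map(t => t.appId -> t.version).toMap ==
      Map("app-1" -> 3L, "app-2" -> 100L))
    println(afterV3)
  }
}
```

Because each commit only merges the previous checksum's entries with the actions it is writing, the cost stays proportional to the commit rather than to a full state reconstruction.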