From d9a18f7cc1bcb115307bb5c85fdbe558c1d06584 Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Mon, 11 Nov 2024 10:49:20 -0800 Subject: [PATCH] Parallel calls --- build.sbt | 1 + .../CommitCoordinatorClient.scala | 5 + .../InMemoryUCCommitCoordinator.scala | 291 ++++++ .../UCCommitCoordinatorBuilder.scala | 242 +++++ .../sql/delta/logging/DeltaLogKeys.scala | 1 + .../coordinatedcommits/InMemoryUCClient.scala | 94 ++ .../UCCommitCoordinatorBuilderSuite.scala | 289 ++++++ .../UCCommitCoordinatorClientSuite.scala | 360 +++++++ .../CommitLimitReachedException.java | 28 + .../InvalidTargetTableException.java | 30 + .../commit/uccommitcoordinator/UCClient.java | 129 +++ .../UCCommitCoordinatorClient.java | 937 ++++++++++++++++++ .../UCCommitCoordinatorException.java | 26 + .../UCCoordinatedCommitsUsageLogs.java | 39 + .../UCRestClientPayload.java | 238 +++++ .../UCTokenBasedRestClient.java | 293 ++++++ .../UpgradeNotAllowedException.java | 28 + 17 files changed, 3031 insertions(+) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCCommitCoordinator.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/CommitLimitReachedException.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/InvalidTargetTableException.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorException.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCoordinatedCommitsUsageLogs.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCRestClientPayload.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UpgradeNotAllowedException.java diff --git a/build.sbt b/build.sbt index 54d8503686e..65a6a0a9a93 100644 --- a/build.sbt +++ b/build.sbt @@ -435,6 +435,7 @@ lazy val spark = (project in file("spark")) "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.mockito" % "mockito-core" % "4.11.0" % "test", ), Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++ listPythonFiles(baseDirectory.value.getParentFile / "python"), diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CommitCoordinatorClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CommitCoordinatorClient.scala index 8149c1d7b37..6b21b7a52ef 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CommitCoordinatorClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CommitCoordinatorClient.scala @@ -100,7 +100,12 @@ object CommitCoordinatorProvider { nameToBuilderMapping.retain((k, _) => initialCommitCoordinatorNames.contains(k)) } + private[delta] def clearAllBuilders(): Unit = synchronized { + nameToBuilderMapping.clear() + } + private val initialCommitCoordinatorBuilders = Seq[CommitCoordinatorBuilder]( + UCCommitCoordinatorBuilder, new DynamoDBCommitCoordinatorClientBuilder() ) initialCommitCoordinatorBuilders.foreach(registerBuilder) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCCommitCoordinator.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCCommitCoordinator.scala new file mode 100644 index 00000000000..b27d053a9bf --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCCommitCoordinator.scala @@ -0,0 +1,291 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.coordinatedcommits + +import java.io.IOException +import java.net.URI +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.delta.DeltaTableUtils +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.util.FileNames +import io.delta.storage.commit.{ + Commit => JCommit, + CommitFailedException => JCommitFailedException, + GetCommitsResponse => JGetCommitsResponse +} +import io.delta.storage.commit.uccommitcoordinator.{CommitLimitReachedException => JCommitLimitReachedException, InvalidTargetTableException => JInvalidTargetTableException} +import org.apache.hadoop.fs.{FileStatus, Path} + + +final object UCCoordinatedCommitsRequestType extends Enumeration { + type UCCoordinatedCommitsRequestType = Value + val COMMIT = Value + val GET_COMMITS = Value +} + +/** + * A mock UC commit coordinator for testing purposes. + */ +class InMemoryUCCommitCoordinator { + + /** + * Represents the data associated with a table. + * `ucCommits` mimics the underlying list for the commit files. + */ + private class PerTableData(val path: URI) { + + /** + * Represents a UC commit record. + * @param commit represents the commit itself. + * @param isBackfilled represents whether the commit is backfilled or not. 
+ */ + private case class UCCommit( + commit: JCommit, + isBackfilled: Boolean = false) { + /** Version of the underlying commit file */ + val version: Long = commit.getVersion + } + + /** Underlying storage of UC commit records */ + private val ucCommits: mutable.ArrayBuffer[UCCommit] = mutable.ArrayBuffer.empty + + /** RWLock to protect `ucCommits` */ + val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock() + + /** + * Returns the last ratified commit version for the table. + * If no commits have been ratified through the commit coordinator yet, returns -1. + */ + def lastRatifiedCommitVersion: Long = ucCommits.lastOption.map(_.version).getOrElse(-1L) + + /** + * Returns true if the table has ratified any commit. + */ + def isActive: Boolean = { + ucCommits.nonEmpty + } + + /** Appends a commit to the table's commit history */ + def appendCommit( + commit: JCommit + ): Unit = { + ucCommits += UCCommit(commit) + } + + /** Removes all commits until the given version (inclusive) */ + def removeCommitsUntilVersion(version: Long): Unit = { + val toRemove = ucCommits.takeWhile(_.version <= version) + ucCommits --= toRemove + } + + /** Marks the last commit as backfilled */ + def markLastCommitBackfilled(): Unit = { + ucCommits.lastOption.foreach { lastUCCommit => + ucCommits.update(ucCommits.size - 1, lastUCCommit.copy(isBackfilled = true)) + } + } + + /** + * Returns the unbackfilled commits in the given range. + * If `startVersion` is not provided, the first commit is used. + * If `endVersion` is not provided, the last commit is used. + */ + def getCommits(startVersion: Option[Long], endVersion: Option[Long]): Seq[JCommit] = { + val effectiveStartVersion = startVersion.getOrElse(0L) + val effectiveEndVersion = endVersion.getOrElse( + ucCommits.lastOption.map(_.version).getOrElse(return Seq.empty)) + // Collect unbackfilled `Commit`s from the `UCCommit`s in the range. + ucCommits.filter(c => + effectiveStartVersion <= c.version && c.version <= effectiveEndVersion && !c.isBackfilled + ).map(_.commit).toSeq + } + } + + /** + * Variable used to control the behavior of the InMemoryUCCommitCoordinator + * externally. If set to true, the coordinator will throw an IOException after + * a successful commit. This will be reset to false once the exception has been + * thrown. + */ + var throwIOExceptionAfterCommit: Boolean = false + + /** + * Variable used to control the behavior of the InMemoryUCCommitCoordinator + * externally. If set to true, the coordinator will throw an IOException before + * persisting a commit to the in-memory map. This will be reset to false once the + * exception has been thrown. + */ + var throwIOExceptionBeforeCommit: Boolean = false + + /** The maximum number of unbackfilled commits this commit coordinator can store at a time */ + private val MAX_NUM_COMMITS = 10 + + /** + * Map from table UUID to the data associated with the table. + * Mimics the underlying storage for the commit files of different tables.
+ */ + private val perTableMap = new ConcurrentHashMap[UUID, PerTableData]() + + /** Performs the given operation with lock acquired on the table entry */ + private def withLock[T](tableUUID: UUID, writeLock: Boolean = false)(operation: => T): T = { + val tableData = Option(perTableMap.get(tableUUID)).getOrElse { + throw new IllegalArgumentException(s"Unknown table $tableUUID.") + } + val lock = if (writeLock) tableData.lock.writeLock() else tableData.lock.readLock() + lock.lock() + try { + operation + } finally { + lock.unlock() + } + } + + private def validateTableURI( + srcTable: URI, + targetTable: URI, + request: UCCoordinatedCommitsRequestType.UCCoordinatedCommitsRequestType): Unit = { + if (srcTable != targetTable) { + val errorMsg = s"Source table $srcTable and targetTable $targetTable do not match for " + + s"$request" + throw new JInvalidTargetTableException(errorMsg) + } + } + + // scalastyle:off argcount + def commitToCoordinator( + tableId: String, + tableUri: URI, + commitFileName: Option[String] = None, + commitVersion: Option[Long] = None, + commitFileSize: Option[Long] = None, + commitFileModTime: Option[Long] = None, + commitTimestamp: Option[Long] = None, + lastKnownBackfilledVersion: Option[Long] = None, + isDisownCommit: Boolean = false, + protocolOpt: Option[Protocol] = None, + metadataOpt: Option[Metadata] = None): Unit = { + // either commitFileName or lastKnownBackfilledVersion (or both) need to be set + require(commitFileName.nonEmpty || lastKnownBackfilledVersion.nonEmpty) + val tableUUID = UUID.fromString(tableId) + // if this is the first commit, we just accept it + val path = Option(perTableMap.get(tableUUID)).map(_.path).getOrElse { + // The first commit has to be an actual commit and not just a backfill-only + // request so commitVersion.get is accessible. + require(commitVersion.nonEmpty) + // Register the table with the commit coordinator. + perTableMap.putIfAbsent(tableUUID, new PerTableData(tableUri)) + tableUri + } + + commitFileName.foreach { fileName => + // ensure that all other necessary parameters are provided + require(commitVersion.nonEmpty) + require(commitFileSize.nonEmpty) + require(commitFileModTime.nonEmpty) + require(commitTimestamp.nonEmpty) + validateTableURI(path, tableUri, UCCoordinatedCommitsRequestType.COMMIT) + + // Check that there is still space in the commit coordinator. + val currentNumCommits = getCommitsFromCoordinator( + tableId, tableUri, startVersion = None, endVersion = None).getCommits.size + if (currentNumCommits == MAX_NUM_COMMITS) { + val errorMsg = s"Too many unbackfilled commits for $tableId. Cannot " + + s"store more than $MAX_NUM_COMMITS commits" + throw new JCommitLimitReachedException(errorMsg) + } + + if (throwIOExceptionBeforeCommit) { + throwIOExceptionBeforeCommit = false + throw new IOException("Problem before committing") + } + // Store the commit. For the InMemoryUCCommitCoordinator, we concatenate the full commit path + // here already so that we don't have to do it during getCommits. + val basePath = FileNames.commitDirPath( + DeltaTableUtils.safeConcatPaths(new Path(tableUri), "_delta_log")) + val commitFilePath = new Path(basePath, fileName) + val fileStatus = new FileStatus( + commitFileSize.get, false, 0, 0, commitFileModTime.get, commitFilePath) + withLock(tableUUID, writeLock = true) { + val tableData = perTableMap.get(tableUUID) + // We only check that the expected version matches the commit version if the table is active. + // If the table was just registered, the check is not necessary.
+ if (tableData.isActive) { + val expectedVersion = tableData.lastRatifiedCommitVersion + 1 + if (commitVersion.get != expectedVersion) { + throw new JCommitFailedException( + commitVersion.get < expectedVersion, + commitVersion.get < expectedVersion, + s"Commit version ${commitVersion.get} is not valid. " + + s"Expected version: $expectedVersion.") + } + } + tableData.appendCommit( + new JCommit(commitVersion.get, fileStatus, commitTimestamp.get) + ) + } + } + if (throwIOExceptionAfterCommit) { + throwIOExceptionAfterCommit = false + throw new IOException("Problem after committing") + } + + // Register any backfills. + lastKnownBackfilledVersion.foreach { backfilledUntil => + withLock(tableUUID, writeLock = true) { + val tableData = perTableMap.get(tableUUID) + val maxVersionToRemove = if (backfilledUntil == tableData.lastRatifiedCommitVersion) { + // If the backfill version is the last ratified commit version, we remove all but the + // last commit, and mark the last commit as backfilled. This is to ensure that every + // active table keeps track of at least one commit record. + tableData.markLastCommitBackfilled() + backfilledUntil - 1 + } else if (backfilledUntil < tableData.lastRatifiedCommitVersion) { + backfilledUntil + } else { + throw new IllegalArgumentException( + s"Unexpected backfill version: $backfilledUntil. " + + s"Max backfill version: ${tableData.lastRatifiedCommitVersion}") + } + tableData.removeCommitsUntilVersion(maxVersionToRemove) + } + } + } + + def getCommitsFromCoordinator( + tableId: String, + tableUri: URI, + startVersion: Option[Long], + endVersion: Option[Long]): JGetCommitsResponse = { + val tableUUID = UUID.fromString(tableId) + val path = Option(perTableMap.get(tableUUID)).map(_.path).getOrElse { + return new JGetCommitsResponse(Seq.empty.asJava, -1) + } + validateTableURI(path, tableUri, UCCoordinatedCommitsRequestType.GET_COMMITS) + withLock[JGetCommitsResponse](tableUUID) { + val tableData = perTableMap.get(tableUUID) + val commits = tableData.getCommits(startVersion, endVersion) + new JGetCommitsResponse(commits.asJava, tableData.lastRatifiedCommitVersion) + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala new file mode 100644 index 00000000000..937483d934b --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -0,0 +1,242 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ + +package org.apache.spark.sql.delta.coordinatedcommits + +import java.net.{URI, URISyntaxException} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.metering.DeltaLogging +import io.delta.storage.commit.CommitCoordinatorClient +import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient} + +import org.apache.spark.internal.MDC +import org.apache.spark.sql.SparkSession + +/** + * Builder for Unity Catalog Commit Coordinator Clients. + * + * This builder is responsible for creating and caching UCCommitCoordinatorClient instances + * based on the provided metastore IDs and catalog configurations. + * + * It caches the UCCommitCoordinatorClient instance for a given metastore ID upon its first access. + */ +object UCCommitCoordinatorBuilder extends CommitCoordinatorBuilder with DeltaLogging { + + /** Prefix for Spark SQL catalog configurations. */ + final private val SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog." + + /** Connector class name for filtering relevant Unity Catalog catalogs. */ + final private val UNITY_CATALOG_CONNECTOR_CLASS: String = "io.unitycatalog.spark.UCSingleCatalog" + + /** Suffix for the URI configuration of a catalog. */ + final private val URI_SUFFIX = "uri" + + /** Suffix for the token configuration of a catalog. */ + final private val TOKEN_SUFFIX = "token" + + /** Cache for UCCommitCoordinatorClient instances. */ + private val commitCoordinatorClientCache = + new ConcurrentHashMap[String, UCCommitCoordinatorClient]() + + /** + * Helper cache from (uri, token) to metastoreId to avoid redundant getMetastoreId calls to the + * catalog. + */ + private val uriTokenToMetastoreIdCache = new ConcurrentHashMap[(String, String), String]() + + // Use a var instead of val for ease of testing by injecting a different UCClientFactory. + private[delta] var ucClientFactory: UCClientFactory = UCTokenBasedRestClientFactory + + override def getName: String = "unity-catalog" + + override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient = { + val metastoreId = conf.getOrElse( + UCCommitCoordinatorClient.UC_METASTORE_ID_KEY, + throw new IllegalArgumentException( + s"UC metastore ID not found in the provided coordinator conf: $conf")) + + commitCoordinatorClientCache.computeIfAbsent( + metastoreId, + _ => new UCCommitCoordinatorClient(conf.asJava, getMatchingUCClient(spark, metastoreId)) + ) + } + + /** + * Finds and returns a UCClient that matches the given metastore ID. + * + * This method iterates through all catalogs configured in the SparkSession, creates a UCClient + * for each, retrieves its metastore ID, and returns a client for the catalog whose metastore ID + * matches the provided one. + * If no matching catalog is found or if multiple matching catalogs are found, it throws an + * appropriate exception.
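+ *
+ * A minimal sketch of a matching setup (the catalog name, URI, and token below are hypothetical;
+ * the connector class and the config-key layout are the ones this builder scans for):
+ * {{{
+ *   spark.conf.set("spark.sql.catalog.uc", "io.unitycatalog.spark.UCSingleCatalog")
+ *   spark.conf.set("spark.sql.catalog.uc.uri", "https://uc.example.com")
+ *   spark.conf.set("spark.sql.catalog.uc.token", "example-token")
+ *   // If the metastore reachable via this (uri, token) pair reports the requested metastore ID,
+ *   // ucClientFactory.createUCClient("https://uc.example.com", "example-token") is used.
+ * }}}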
+ */ + private def getMatchingUCClient(spark: SparkSession, metastoreId: String): UCClient = { + val matchingClients: List[(String, String)] = getCatalogConfigs(spark) + .map { case (name, uri, token) => (uri, token) } + .distinct // Remove duplicates since multiple catalogs can have the same uri and token + .filter { case (uri, token) => getMetastoreId(uri, token).contains(metastoreId) } + + matchingClients match { + case Nil => throw noMatchingCatalogException(metastoreId) + case (uri, token) :: Nil => ucClientFactory.createUCClient(uri, token) + case multiple => throw multipleMatchingCatalogs(metastoreId, multiple.map(_._1)) + } + } + + /** + * Retrieves the metastore ID for a given URI and token. + * + * This method creates a UCClient using the provided URI and token, then retrieves its metastore + * ID. The result is cached to avoid unnecessary getMetastoreId requests in future calls. If + * there's an error, it returns None and logs a warning. + */ + private def getMetastoreId(uri: String, token: String): Option[String] = { + try { + val metastoreId = uriTokenToMetastoreIdCache.computeIfAbsent( + (uri, token), + _ => { + val ucClient = ucClientFactory.createUCClient(uri, token) + try { + ucClient.getMetastoreId + } finally { + safeClose(ucClient, uri) + } + }) + Some(metastoreId) + } catch { + case NonFatal(e) => + logWarning(log"Failed to getMetastoreSummary with ${MDC(DeltaLogKeys.URI, uri)}", e) + None + } + } + + private def noMatchingCatalogException(metastoreId: String) = { + new IllegalStateException( + s"No matching catalog found for UC metastore ID $metastoreId. " + + "Please ensure the catalog is configured correctly by setting " + + "`spark.sql.catalog.`, `spark.sql.catalog..uri` and " + + "`spark.sql.catalog..token`. Note that the matching process involves " + + "retrieving the metastoreId using the provided `` pairs in Spark " + + "Session configs.") + } + + private def multipleMatchingCatalogs(metastoreId: String, uris: List[String]) = { + new IllegalStateException( + s"Found multiple catalogs for UC metastore ID $metastoreId at $uris. " + + "Please ensure the catalog is configured correctly by setting " + + "`spark.sql.catalog.`, `spark.sql.catalog..uri` and " + + "`spark.sql.catalog..token`. Note that the matching process involves " + + "retrieving the metastoreId using the provided `` pairs in Spark " + + "Session configs.") + } + + /** + * Retrieves the catalog configurations from the SparkSession. 
+ * + * Example; Given Spark configurations: + * spark.sql.catalog.catalog1 = "io.unitycatalog.spark.UCSingleCatalog" + * spark.sql.catalog.catalog1.uri = "https://dbc-123abc.databricks.com" + * spark.sql.catalog.catalog1.token = "dapi1234567890" + * + * spark.sql.catalog.catalog2 = "io.unitycatalog.spark.UCSingleCatalog" + * spark.sql.catalog.catalog2.uri = "https://dbc-456def.databricks.com" + * spark.sql.catalog.catalog2.token = "dapi0987654321" + * + * spark.sql.catalog.catalog3 = "io.unitycatalog.spark.UCSingleCatalog" + * spark.sql.catalog.catalog3.uri = "https://dbc-789ghi.databricks.com" + * + * spark.sql.catalog.catalog4 = "com.someeothercatalog.sql.lakehouse.catalog3" + * spark.sql.catalog.catalog4.uri = "https://dbc-456def.someothercatalog.com" + * spark.sql.catalog.catalog4.token = "dapi0987654321" + * + * spark.sql.catalog.catalog5 = "io.unitycatalog.spark.UCSingleCatalog" + * spark.sql.catalog.catalog5.uri = "random-string" + * spark.sql.catalog.catalog5.token = "dapi0987654321" + * + * This method would return: + * List( + * ("catalog1", "https://dbc-123abc.databricks.com", "dapi1234567890"), + * ("catalog2", "https://dbc-456def.databricks.com", "dapi0987654321") + * ) + * + * Note: catalog3 is not included in the result because it's missing the token configuration. + * Note: catalog4 is not included in the result because it's not a UCSingleCatalog connector. + * Note: catalog5 is not included in the result because its URI is not a valid URI. + * + * @return + * A list of tuples containing (catalogName, uri, token) for each properly configured catalog + */ + private[delta] def getCatalogConfigs(spark: SparkSession): List[(String, String, String)] = { + val catalogConfigs = spark.conf.getAll.filterKeys(_.startsWith(SPARK_SQL_CATALOG_PREFIX)) + + catalogConfigs + .keys + .map(_.split("\\.")) + .filter(_.length == 4) + .map(_(3)) + .filter { catalogName: String => + val connector = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName") + connector.contains(UNITY_CATALOG_CONNECTOR_CLASS)} + .flatMap { catalogName: String => + val uri = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName.$URI_SUFFIX") + val token = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName.$TOKEN_SUFFIX") + (uri, token) match { + case (Some(u), Some(t)) => + try { + new URI(u) // Validate the URI + Some((catalogName, u, t)) + } catch { + case _: URISyntaxException => + logWarning(log"Skipping catalog ${MDC(DeltaLogKeys.CATALOG, catalogName)} as it " + + log"does not have a valid URI ${MDC(DeltaLogKeys.URI, u)}.") + None + } + case _ => + logWarning(log"Skipping catalog ${MDC(DeltaLogKeys.CATALOG, catalogName)} as it does " + + "not have both uri and token configured in Spark Session.") + None + }} + .toList + } + + private def safeClose(ucClient: UCClient, uri: String): Unit = { + try { + ucClient.close() + } catch { + case NonFatal(e) => + logWarning(log"Failed to close UCClient for uri ${MDC(DeltaLogKeys.URI, uri)}", e) + } + } + + def clearCache(): Unit = { + commitCoordinatorClientCache.clear() + uriTokenToMetastoreIdCache.clear() + } +} + +trait UCClientFactory { + def createUCClient(uri: String, token: String): UCClient +} + +object UCTokenBasedRestClientFactory extends UCClientFactory { + override def createUCClient(uri: String, token: String): UCClient = + new UCTokenBasedRestClient(uri, token) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala index 
ce8b41fdbc9..f0bcd7589c4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala @@ -48,6 +48,7 @@ trait DeltaLogKeysBase { case object APP_ID extends LogKeyShims case object BATCH_ID extends LogKeyShims case object BATCH_SIZE extends LogKeyShims + case object CATALOG extends LogKeyShims case object CLONE_SOURCE_DESC extends LogKeyShims case object CONFIG extends LogKeyShims case object CONFIG_KEY extends LogKeyShims diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala new file mode 100644 index 00000000000..273ed29c701 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala @@ -0,0 +1,94 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.coordinatedcommits + +import java.lang.{Long => JLong} +import java.net.URI +import java.util.Optional + +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import io.delta.storage.commit.{Commit => JCommit, GetCommitsResponse => JGetCommitsResponse} +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import io.delta.storage.commit.uccommitcoordinator.UCClient + +/** + * An in-memory implementation of [[UCClient]] for testing purposes. + * This implementation simulates Unity Catalog operations without actually connecting to a remote + * service. It maintains all state in memory in [[InMemoryUCCommitCoordinator]] + * + * This class provides a lightweight way to test Delta table operations that would + * normally require interaction with the Unity Catalog. 
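+ *
+ * Besides the read path shown in the example below, commits can also be registered fully in
+ * memory. A rough sketch, reusing the `client` from the example below and assuming a
+ * hypothetical `commitFileStatus` that describes the UUID-based commit file of version 0:
+ * {{{
+ *   client.commit(
+ *     "tableId",
+ *     new URI("tableUri"),
+ *     Optional.of(new JCommit(0L, commitFileStatus, 0L)),
+ *     Optional.empty(),
+ *     false,
+ *     Optional.empty(),
+ *     Optional.empty())
+ * }}}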
+ * + * Example usage: + * {{{ + * val metastoreId = "test-metastore" + * val ucCommitCoordinator = new InMemoryUCCommitCoordinator() + * val client = new InMemoryUCClient(metastoreId, ucCommitCoordinator) + * + * // Use the client for testing + * val getCommitsResponse = client.getCommits( + * "tableId", + * new URI("tableUri"), + * Optional.empty(), + * Optional.empty()) + * }}} + * + * @param metastoreId The identifier for the simulated metastore + * @param ucCommitCoordinator The in-memory coordinator to handle commit operations + */ +class InMemoryUCClient( + metastoreId: String, + ucCommitCoordinator: InMemoryUCCommitCoordinator) extends UCClient { + + override def getMetastoreId: String = metastoreId + + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[JCommit], + lastKnownBackfilledVersion: Optional[JLong], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol]): Unit = { + ucCommitCoordinator.commitToCoordinator( + tableId, + tableUri, + Option(commit.orElse(null)).map(_.getFileStatus.getPath.getName), + Option(commit.orElse(null)).map(_.getVersion), + Option(commit.orElse(null)).map(_.getFileStatus.getLen), + Option(commit.orElse(null)).map(_.getFileStatus.getModificationTime), + Option(commit.orElse(null)).map(_.getCommitTimestamp), + Option(lastKnownBackfilledVersion.orElse(null)).map(_.toLong), + disown, + Option(newProtocol.orElse(null)).map(_.asInstanceOf[Protocol]), + Option(newMetadata.orElse(null)).map(_.asInstanceOf[Metadata])) + } + + override def getCommits( + tableId: String, + tableUri: URI, + startVersion: Optional[JLong], + endVersion: Optional[JLong]): JGetCommitsResponse = { + ucCommitCoordinator.getCommitsFromCoordinator( + tableId, + tableUri, + Option(startVersion.orElse(null)).map(_.toLong), + Option(endVersion.orElse(null)).map(_.toLong)) + } + + override def close(): Unit = {} +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala new file mode 100644 index 00000000000..b9df4990dfb --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala @@ -0,0 +1,289 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.coordinatedcommits + +import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient} +import org.mockito.{Mock, Mockito} +import org.mockito.Mockito.{mock, never, times, verify, when} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSparkSession + +class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSession { + + @Mock + private val mockFactory: UCClientFactory = mock(classOf[UCClientFactory]) + + override def beforeEach(): Unit = { + super.beforeEach() + Mockito.reset(mockFactory) + CommitCoordinatorProvider.clearAllBuilders() + UCCommitCoordinatorBuilder.ucClientFactory = mockFactory + UCCommitCoordinatorBuilder.clearCache() + CommitCoordinatorProvider.registerBuilder(UCCommitCoordinatorBuilder) + } + + case class CatalogTestConfig( + name: String, + uri: Option[String] = None, + token: Option[String] = None, + metastoreId: Option[String] = None, + path: Option[String] = Some("io.unitycatalog.spark.UCSingleCatalog") + ) + + def setupCatalogs(configs: CatalogTestConfig*)(testCode: => Unit): Unit = { + val allConfigs = configs.flatMap { config => + Seq( + config.path.map(p => s"spark.sql.catalog.${config.name}" -> p), + config.uri.map(uri => s"spark.sql.catalog.${config.name}.uri" -> uri), + config.token.map(token => s"spark.sql.catalog.${config.name}.token" -> token) + ).flatten + } + + withSQLConf(allConfigs: _*) { + configs.foreach { config => + (config.uri, config.token, config.metastoreId) match { + case (Some(uri), Some(token), Some(id)) => + registerMetastoreId(uri, token, id) + case (Some(uri), Some(token), None) => + registerMetastoreIdException(uri, token, new RuntimeException("Invalid metastore ID")) + case _ => // Do nothing for incomplete configs + } + } + testCode + } + } + + test("build with valid configuration") { + val expectedMetastoreId = "test-metastore-id" + val catalog1 = CatalogTestConfig( + name = "catalog1", + uri = Some("https://test-uri-1.com"), + token = Some("test-token-1"), + metastoreId = Some(expectedMetastoreId) + ) + val catalog2 = CatalogTestConfig( + name = "catalog2", + uri = Some("https://test-uri-2.com"), + token = Some("test-token-2"), + metastoreId = Some("different-metastore-id") + ) + + setupCatalogs(catalog1, catalog2) { + val result = getCommitCoordinatorClient(expectedMetastoreId) + + assert(result.isInstanceOf[UCCommitCoordinatorClient]) + verify(mockFactory, times(2)).createUCClient(catalog1.uri.get, catalog1.token.get) + verify(mockFactory).createUCClient(catalog2.uri.get, catalog2.token.get) + verify(mockFactory.createUCClient(catalog1.uri.get, catalog1.token.get)) + .getMetastoreId + verify(mockFactory.createUCClient(catalog2.uri.get, catalog2.token.get)) + .getMetastoreId + verify(mockFactory.createUCClient(catalog2.uri.get, catalog2.token.get)).close() + verify(mockFactory.createUCClient(catalog1.uri.get, catalog1.token.get)).close() + } + } + + test("build with missing metastore ID") { + val exception = intercept[IllegalArgumentException] { + CommitCoordinatorProvider.getCommitCoordinatorClient( + UCCommitCoordinatorBuilder.getName, + Map.empty, + spark) + } + assert(exception.getMessage.contains("UC metastore ID not found")) + } + + test("build with no matching catalog") { + val metastoreId = "test-metastore-id" + val catalog = CatalogTestConfig( + name = "catalog", + uri = Some("https://test-uri.com"), + token = Some("test-token"), + metastoreId = Some("different-metastore-id") + ) + + setupCatalogs(catalog) { + 
val exception = intercept[IllegalStateException] { + getCommitCoordinatorClient(metastoreId) + } + assert(exception.getMessage.contains("No matching catalog found")) + verify(mockFactory).createUCClient(catalog.uri.get, catalog.token.get) + verify(mockFactory.createUCClient(catalog.uri.get, catalog.token.get)).getMetastoreId + verify(mockFactory.createUCClient(catalog.uri.get, catalog.token.get)).close() + } + } + + test("build with multiple matching catalogs") { + val metastoreId = "test-metastore-id" + val catalog1 = CatalogTestConfig( + name = "catalog1", + uri = Some("https://test-uri1.com"), + token = Some("test-token-1"), + metastoreId = Some(metastoreId) + ) + val catalog2 = CatalogTestConfig( + name = "catalog2", + uri = Some("https://test-uri2.com"), + token = Some("test-token-2"), + metastoreId = Some(metastoreId) + ) + + setupCatalogs(catalog1, catalog2) { + val exception = intercept[IllegalStateException] { + getCommitCoordinatorClient(metastoreId) + } + assert(exception.getMessage.contains("Found multiple catalogs")) + verify(mockFactory).createUCClient(catalog1.uri.get, catalog1.token.get) + verify(mockFactory).createUCClient(catalog2.uri.get, catalog2.token.get) + verify(mockFactory.createUCClient(catalog1.uri.get, catalog1.token.get)) + .getMetastoreId + verify(mockFactory.createUCClient(catalog2.uri.get, catalog2.token.get)) + .getMetastoreId + verify(mockFactory.createUCClient(catalog1.uri.get, catalog1.token.get)).close() + verify(mockFactory.createUCClient(catalog2.uri.get, catalog2.token.get)).close() + } + } + + test("build with mixed valid and invalid catalog configurations") { + val expectedMetastoreId = "test-metastore-id" + val validCatalog = CatalogTestConfig( + name = "valid-catalog", + uri = Some("https://valid-uri.com"), + token = Some("valid-token"), + metastoreId = Some(expectedMetastoreId) + ) + val invalidCatalog1 = CatalogTestConfig( + name = "invalid-catalog-1", + uri = Some("https://invalid-uri.com"), + token = Some("invalid-token"), + metastoreId = None + ) + val invalidCatalog2 = CatalogTestConfig( + name = "invalid-catalog-2", + uri = Some("random-uri"), + token = Some("invalid-token") + ) + val incompleteCatalog = CatalogTestConfig( + name = "incomplete-catalog", + path = None + ) + + setupCatalogs(validCatalog, invalidCatalog1, invalidCatalog2, incompleteCatalog) { + val result = getCommitCoordinatorClient(expectedMetastoreId) + + assert(result.isInstanceOf[UCCommitCoordinatorClient]) + verify(mockFactory, times(2)).createUCClient( + validCatalog.uri.get, + validCatalog.token.get + ) + verify(mockFactory.createUCClient(validCatalog.uri.get, validCatalog.token.get), + times(1)).close() + } + } + + test("build caching behavior") { + val metastoreId = "test-metastore-id" + val catalog = CatalogTestConfig( + name = "catalog", + uri = Some("https://test-uri.com"), + token = Some("test-token"), + metastoreId = Some(metastoreId) + ) + + setupCatalogs(catalog) { + val result1 = getCommitCoordinatorClient(metastoreId) + val result2 = getCommitCoordinatorClient(metastoreId) + assert(result1 eq result2) + } + } + + test("build with multiple catalogs pointing to the same URI, token, and metastore") { + val metastoreId = "shared-metastore-id" + val sharedUri = "https://shared-test-uri.com" + val sharedToken = "shared-test-token" + val catalog1 = CatalogTestConfig( + name = "catalog1", + uri = Some(sharedUri), + token = Some(sharedToken), + metastoreId = Some(metastoreId) + ) + val catalog2 = CatalogTestConfig( + name = "catalog2", + uri = Some(sharedUri), + 
token = Some(sharedToken), + metastoreId = Some(metastoreId) + ) + val catalog3 = CatalogTestConfig( + name = "catalog3", + uri = Some(sharedUri), + token = Some(sharedToken), + metastoreId = Some(metastoreId) + ) + + setupCatalogs(catalog1, catalog2, catalog3) { + val result = getCommitCoordinatorClient(metastoreId) + + assert(result.isInstanceOf[UCCommitCoordinatorClient]) + verify(mockFactory, times(2)).createUCClient(sharedUri, sharedToken) + verify(mockFactory.createUCClient(sharedUri, sharedToken)).getMetastoreId + verify(mockFactory.createUCClient(sharedUri, sharedToken)).close() + } + } + + test("build with a catalog having invalid path but valid URI and token") { + val metastoreId = "test-metastore-id" + val catalog = CatalogTestConfig( + name = "invalid-path-catalog", + uri = Some("https://test-uri.com"), + token = Some("test-token"), + metastoreId = Some(metastoreId), + path = Some("invalid-catalog-path") + ) + + setupCatalogs(catalog) { + assert(UCCommitCoordinatorBuilder.getCatalogConfigs(spark).isEmpty) + val e = intercept[IllegalStateException] { + getCommitCoordinatorClient(metastoreId) + } + assert(e.getMessage.contains("No matching catalog found")) + verify(mockFactory, never()).createUCClient(catalog.uri.get, catalog.token.get) + } + } + + private def registerMetastoreId(uri: String, token: String, metastoreId: String): Unit = { + val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) + when(mockClient.getMetastoreId).thenReturn(metastoreId) + when(mockFactory.createUCClient(uri, token)).thenReturn(mockClient) + } + + private def registerMetastoreIdException( + uri: String, + token: String, + exception: Throwable): Unit = { + val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) + when(mockClient.getMetastoreId).thenThrow(exception) + when(mockFactory.createUCClient(uri, token)).thenReturn(mockClient) + } + + private def getCommitCoordinatorClient(metastoreId: String) = { + CommitCoordinatorProvider.getCommitCoordinatorClient( + UCCommitCoordinatorBuilder.getName, + Map(UCCommitCoordinatorClient.UC_METASTORE_ID_KEY -> metastoreId), + spark) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala new file mode 100644 index 00000000000..1218f73213b --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala @@ -0,0 +1,360 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.coordinatedcommits + +import java.io.IOException +import java.lang.{Long => JLong} +import java.net.URI +import java.util.{List => JList, Optional, UUID} + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +// scalastyle:off import.ordering.noEmptyLine +import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord} +import org.apache.spark.sql.delta.{DeltaConfigs, DeltaIllegalArgumentException, DeltaLog, LogSegment, Snapshot} +import org.apache.spark.sql.delta.CommitCoordinatorGetCommitsFailedException +import org.apache.spark.sql.delta.DeltaConfigs.{COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME, COORDINATED_COMMITS_TABLE_CONF} +import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile +import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol} +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.LogStoreInverseAdaptor +import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import io.delta.storage.LogStore +import io.delta.storage.commit.{ + Commit => JCommit, + CommitFailedException => JCommitFailedException, + CoordinatedCommitsUtils => JCoordinatedCommitsUtils, + GetCommitsResponse => JGetCommitsResponse, + TableDescriptor, + UpdatedActions +} +import io.delta.storage.commit.uccommitcoordinator.{ + UCClient, + UCCommitCoordinatorClient, + UCCoordinatedCommitsUsageLogs} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, LocalFileSystem, Path} +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.Mockito +import org.mockito.Mockito.{mock, when} +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StringType, StructField} +import org.apache.spark.util.SystemClock + +class UCCommitCoordinatorClientSuite extends CommitCoordinatorClientImplSuiteBase +{ + + /** + * A unique table ID for each test. + */ + private var tableUUID = UUID.randomUUID().toString + + /** + * A unique metastore ID for each test. 
+ */ + private var metastoreId = UUID.randomUUID().toString + + @Mock + private val mockFactory: UCClientFactory = mock(classOf[UCClientFactory]) + + private var ucClient: UCClient = _ + + private var ucCommitCoordinator: InMemoryUCCommitCoordinator = _ + + protected override def sparkConf = super.sparkConf + .set("spark.sql.catalog.main", "io.unitycatalog.spark.UCSingleCatalog") + .set("spark.sql.catalog.main.uri", "https://test-uri.com") + .set("spark.sql.catalog.main.token", "test-token") + .set("spark.hadoop.fs.file.impl", classOf[LocalFileSystem].getCanonicalName) + + override def beforeEach(): Unit = { + super.beforeEach() + tableUUID = UUID.randomUUID().toString + UCCommitCoordinatorClient.BACKFILL_LISTING_OFFSET = 100 + metastoreId = UUID.randomUUID().toString + DeltaLog.clearCache() + Mockito.reset(mockFactory) + CommitCoordinatorProvider.clearAllBuilders() + UCCommitCoordinatorBuilder.ucClientFactory = mockFactory + UCCommitCoordinatorBuilder.clearCache() + CommitCoordinatorProvider.registerBuilder(UCCommitCoordinatorBuilder) + ucCommitCoordinator = new InMemoryUCCommitCoordinator() + ucClient = new InMemoryUCClient(metastoreId, ucCommitCoordinator) + when(mockFactory.createUCClient(anyString(), anyString())).thenReturn(ucClient) + } + + override protected def commit( + version: Long, + timestamp: Long, + tableCommitCoordinatorClient: TableCommitCoordinatorClient): JCommit = { + val commitResult = super.commit(version, timestamp, tableCommitCoordinatorClient) + // As backfilling for UC happens after every commit asynchronously, we block here until + // the current in-progress backfill has completed in order to make tests deterministic. + eventually(timeout(10.seconds)) { + val logPath = tableCommitCoordinatorClient.logPath + val log = DeltaLog.forTable(spark, JCoordinatedCommitsUtils.getTablePath(logPath)) + val fs = logPath.getFileSystem(log.newDeltaHadoopConf()) + assert(fs.exists(FileNames.unsafeDeltaFile(logPath, version))) + } + commitResult + } + + override protected def createTableCommitCoordinatorClient( + deltaLog: DeltaLog): TableCommitCoordinatorClient = { + var commitCoordinatorClient = UCCommitCoordinatorBuilder + .build(spark, Map(UCCommitCoordinatorClient.UC_METASTORE_ID_KEY -> metastoreId)) + .asInstanceOf[UCCommitCoordinatorClient] + commitCoordinatorClient = new UCCommitCoordinatorClient( + commitCoordinatorClient.conf, + commitCoordinatorClient.ucClient) with DeltaLogging { + override def recordDeltaEvent(opType: String, data: Any, path: Path): Unit = { + data match { + case ref: AnyRef => recordDeltaEvent(null, opType = opType, data = ref, path = Some(path)) + case _ => super.recordDeltaEvent(opType, data, path) + } + } + } + commitCoordinatorClient.registerTable( + deltaLog.logPath, Optional.empty(), -1L, initMetadata(), Protocol(1, 1)) + TableCommitCoordinatorClient( + commitCoordinatorClient, + deltaLog, + Map(UCCommitCoordinatorClient.UC_TABLE_ID_KEY -> tableUUID) + ) + } + + override protected def registerBackfillOp( + tableCommitCoordinatorClient: TableCommitCoordinatorClient, + deltaLog: DeltaLog, + version: Long): Unit = { + ucClient.commit( + tableUUID, + JCoordinatedCommitsUtils.getTablePath(deltaLog.logPath).toUri, + Optional.empty(), + Optional.of(version), + false, + Optional.empty(), + Optional.empty()) + } + + override protected def validateBackfillStrategy( + tableCommitCoordinatorClient: TableCommitCoordinatorClient, + logPath: Path, + version: Long): Unit = { + val response = tableCommitCoordinatorClient.getCommits() + 
assert(response.getCommits.size == 1) + assert(response.getCommits.asScala.head.getVersion == version) + assert(response.getLatestTableVersion == version) + } + + protected def validateGetCommitsResult( + response: JGetCommitsResponse, + startVersion: Option[Long], + endVersion: Option[Long], + maxVersion: Long): Unit = { + val expectedVersions = endVersion.map { _ => Seq.empty }.getOrElse(Seq(maxVersion)) + assert(response.getCommits.asScala.map(_.getVersion) == expectedVersions) + assert(response.getLatestTableVersion == maxVersion) + } + + override protected def initMetadata(): Metadata = { + // Ensure that the metadata that is passed to registerTable has the + // correct table conf set. + Metadata(configuration = Map( + COORDINATED_COMMITS_TABLE_CONF.key -> + JsonUtils.toJson(Map(UCCommitCoordinatorClient.UC_TABLE_ID_KEY -> tableUUID)), + COORDINATED_COMMITS_COORDINATOR_NAME.key -> UCCommitCoordinatorBuilder.getName, + COORDINATED_COMMITS_COORDINATOR_CONF.key -> + JsonUtils.toJson(Map(UCCommitCoordinatorClient.UC_METASTORE_ID_KEY -> metastoreId)))) + } + + protected def assertUsageLogsContains(usageLogs: Seq[UsageRecord], opType: String): Unit = { + assert(usageLogs.exists { record => + record.tags.get("opType").contains(opType) + }) + } + + test("incorrect last known backfilled version") { + withTempTableDir { tempDir => + val log = DeltaLog.forTable(spark, tempDir.toString) + val logPath = log.logPath + val tableCommitCoordinatorClient = createTableCommitCoordinatorClient(log) + tableCommitCoordinatorClient.commitCoordinatorClient.registerTable( + logPath, Optional.empty(), -1L, initMetadata, Protocol(1, 1)) + // Write 11 commits. + writeCommitZero(logPath) + (1 to 10).foreach(i => commit(i, i, tableCommitCoordinatorClient)) + // Now delete some backfilled versions + val fs = logPath.getFileSystem(log.newDeltaHadoopConf()) + fs.delete(FileNames.unsafeDeltaFile(logPath, 8), false) + fs.delete(FileNames.unsafeDeltaFile(logPath, 9), false) + fs.delete(FileNames.unsafeDeltaFile(logPath, 10), false) + // Backfill with the wrong specified last version + val e = intercept[IllegalStateException] { + tableCommitCoordinatorClient.backfillToVersion(10L, Some(9L)) + } + assert(e.getMessage.contains("Last known backfilled version 9 doesn't exist")) + // Backfill with the correct version + tableCommitCoordinatorClient.backfillToVersion(10L, Some(7L)) + // Everything should be backfilled now + validateBackfillStrategy(tableCommitCoordinatorClient, logPath, 10) + } + } + + test("test getLastKnownBackfilledVersion") { + withTempTableDir { tempDir => + val backfillListingOffset = 5 + val log = DeltaLog.forTable(spark, tempDir.toString) + val logPath = log.logPath + UCCommitCoordinatorClient.BACKFILL_LISTING_OFFSET = backfillListingOffset + val tableCommitCoordinatorClient = createTableCommitCoordinatorClient(log) + tableCommitCoordinatorClient.commitCoordinatorClient.registerTable( + logPath, Optional.empty(), -1L, initMetadata, Protocol(1, 1)) + val hadoopConf = log.newDeltaHadoopConf() + val fs = logPath.getFileSystem(hadoopConf) + + writeCommitZero(logPath) + val backfillThreshold = 5 + (1 to backfillThreshold + backfillListingOffset + 5).foreach { + commitVersion => + commit(commitVersion, commitVersion, tableCommitCoordinatorClient) + if (commitVersion > backfillThreshold) { + // After x = backfillThreshold commits, delete all backfilled files to simulate + // backfill failing. This means UC should keep track of all commits starting + // from x and nothing >= x should be backfilled. 
+ (backfillThreshold + 1 to commitVersion).foreach { deleteVersion => + fs.delete(FileNames.unsafeDeltaFile(logPath, deleteVersion), false) + } + val tableDesc = new TableDescriptor( + logPath, Optional.empty(), tableCommitCoordinatorClient.tableConf.asJava) + + val ucCommitCoordinatorClient = tableCommitCoordinatorClient.commitCoordinatorClient + .asInstanceOf[UCCommitCoordinatorClient] + assert( + ucCommitCoordinatorClient.getLastKnownBackfilledVersion( + commitVersion, + hadoopConf, + LogStoreInverseAdaptor(log.store, hadoopConf), + tableDesc + ) == backfillThreshold + ) + } + } + } + } + + test("commit-limit-reached exception handling") { + withTempTableDir { tempDir => + val log = DeltaLog.forTable(spark, tempDir.toString) + val logPath = log.logPath + // Create a client that does not register backfills to keep accumulating + // commits in the commit coordinator. + val noBackfillRegistrationClient = + new UCCommitCoordinatorClient(Map.empty[String, String].asJava, ucClient) + with DeltaLogging { + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + tableDesc: TableDescriptor, + version: Long, + lastKnownBackfilledVersion: JLong): Unit = { + throw new IOException("Simulated exception") + } + + override protected def recordDeltaEvent(opType: String, data: Any, path: Path): Unit = { + data match { + case ref: AnyRef => + recordDeltaEvent(null, opType = opType, data = ref, path = Some(path)) + } + } + } + // Client 1 performs backfills correctly. + val tcc1 = createTableCommitCoordinatorClient(log) + // Client 2 does not backfill. + val tcc2 = tcc1.copy(commitCoordinatorClient = noBackfillRegistrationClient) + + // Write 10 commits to fill up the commit coordinator (MAX_NUM_COMMITS is set to 10 + // in the InMemoryUCCommitCoordinator). + writeCommitZero(logPath) + // We use super.commit here because tco2 does not backfill so the local override of + // commit would fail waiting for the commits to be backfilled. This also applies + // to the retry of commit 11 with tco2 below. + (1 to 10).foreach(i => + super.commit(version = i, timestamp = i, tableCommitCoordinatorClient = tcc2) + ) + // Commit 11 should trigger an exception and a full backfill should be attempted. + // With tcc2, this backfill attempt should again fail, leading to a user facing + // CommitLimitReachedException, along with the usage logs. + var usageLogs = Log4jUsageLogger.track { + val e1 = intercept[JCommitFailedException] { + super.commit(version = 11, timestamp = 11, tableCommitCoordinatorClient = tcc2) + } + val tableId = tcc2.tableConf(UCCommitCoordinatorClient.UC_TABLE_ID_KEY) + assert(e1.getMessage.contains(s"Too many unbackfilled commits for $tableId.")) + assert(e1.getMessage.contains(s"A full backfill attempt failed due to: " + + "java.io.IOException: Simulated exception")) + } + assertUsageLogsContains( + usageLogs, UCCoordinatedCommitsUsageLogs.UC_FULL_BACKFILL_ATTEMPT_FAILED) + // Retry commit 11 with tcc1. This should again trigger an exception and a full + // backfill should be attempted but the backfill should succeed this time. The + // commit is then retried automatically and should succeed. We use the local + // override of commit here to ensure that we only return once commit 11 has + // been backfilled and the remaining asserts pass. 
+ usageLogs = Log4jUsageLogger.track { + commit(version = 11, timestamp = 11, tableCommitCoordinatorClient = tcc1) + } + assertUsageLogsContains(usageLogs, UCCoordinatedCommitsUsageLogs.UC_ATTEMPT_FULL_BACKFILL) + validateBackfillStrategy(tcc1, logPath, version = 11) + } + } + + test("usage logs in commit calls are emitted correctly") { + withTempTableDir { tempDir => + val log = DeltaLog.forTable(spark, tempDir.toString) + val eventLoggerClient = + new UCCommitCoordinatorClient(Map.empty[String, String].asJava, ucClient) + with DeltaLogging { + override protected def recordDeltaEvent(opType: String, data: Any, path: Path): Unit = { + data match { + case ref: AnyRef => + recordDeltaEvent(null, opType = opType, data = ref, path = Some(path)) + } + } + } + val logPath = log.logPath + val tableCommitCoordinatorClient = createTableCommitCoordinatorClient(log) + .copy(commitCoordinatorClient = eventLoggerClient) + writeCommitZero(logPath) + // A normal commit should emit one usage log. + val usageLogs = Log4jUsageLogger.track { + commit(version = 1, timestamp = 1, tableCommitCoordinatorClient) + } + assertUsageLogsContains(usageLogs, UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS) + } + } + +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/CommitLimitReachedException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/CommitLimitReachedException.java new file mode 100644 index 00000000000..60d6269c7f8 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/CommitLimitReachedException.java @@ -0,0 +1,28 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +/** + * This exception is thrown by the UC client in case UC has reached the maximum + * number of commits that it is allowed to track (50 by default). Upon receiving + * this exception, the client should run a backfill. + */ +public class CommitLimitReachedException extends UCCommitCoordinatorException { + public CommitLimitReachedException(String message) { + super(message); + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/InvalidTargetTableException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/InvalidTargetTableException.java new file mode 100644 index 00000000000..1e42052ae90 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/InvalidTargetTableException.java @@ -0,0 +1,30 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +/** + * This exception is thrown by the UC client in case a commit attempt tried to add + * a UUID-based commit to the wrong table. The table is wrong if the path prefixes + * of the table and the UUID commit do not match. + * For example, adding /path/to/table1/_commits/01-uuid.json to the table at + * /path/to/table2 is not allowed. + */ +public class InvalidTargetTableException extends UCCommitCoordinatorException { + public InvalidTargetTableException(String message) { + super(message); + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java new file mode 100644 index 00000000000..4b3d4baf83f --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java @@ -0,0 +1,129 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.GetCommitsResponse; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; + +import java.io.IOException; +import java.net.URI; +import java.util.Optional; + +/** + * Interface for interacting with the Unity Catalog. + * + * This interface defines the contract for operations related to the Unity Catalog, + * including retrieving the metastore ID, and adding new commits to Delta tables where UC + * acts as the Commit Coordinator and similarly retrieving unbackfilled commits. + * + * Implementations of this interface should handle the specifics of connecting to and + * communicating with the Unity Catalog, including any necessary authentication and + * request handling. + */ +public interface UCClient extends AutoCloseable { + + /** + * Retrieves the metastore ID associated with this Unity Catalog instance. + * + * @return A String representing the unique identifier of the metastore + * @throws IOException if there's an error in retrieving the metastore ID + */ + String getMetastoreId() throws IOException; + + /** + * Commits new changes to a Delta table using the Unity Catalog as the Commit Coordinator. + * + * This method is responsible for committing changes to a Delta table, including new data, + * metadata updates, and protocol changes. 
It interacts with the Unity Catalog to ensure + * proper coordination and consistency of the commit process. + * Note: At least one of commit or lastKnownBackfilledVersion must be present. + * + * @param tableId The unique identifier of the Delta table. + * @param tableUri The URI of the storage location of the table. e.g. s3://bucket/path/to/table + * (and not s3://bucket/path/to/table/_delta_log). + * If the tableId exists but the tableUri is different from the one previously + * registered (e.g., if the table as moved), the request will fail. + * @param commit An Optional containing the Commit object with the changes to be committed. + * If empty, it indicates that no new data is being added in this commit. + * @param lastKnownBackfilledVersion An Optional containing the last known backfilled version + * of the table. This value serves as a hint to the UC about the + * most recent version that has been successfully backfilled. + * UC can use this information to optimize its internal state + * management by cleaning up tracking information for backfilled + * commits up to this version. + * If not provided (Optional.empty()), UC will rely on its + * current state without any additional cleanup hints. + * @param disown A boolean flag indicating whether to disown the table after commit. + * If true, the coordinator will release ownership of the table after the commit. + * @param newMetadata An Optional containing new metadata to be applied to the table. + * If present, the table's metadata will be updated atomically with the commit. + * @param newProtocol An Optional containing a new protocol version to be applied to the table. + * If present, the table's protocol will be updated atomically with the commit. + * @throws IOException if there's an error during the commit process, such as network issues. + * @throws CommitFailedException if the commit fails due to conflicts or other logical errors. + * @throws UCCommitCoordinatorException if there's an error specific to the Unity Catalog + * commit coordination process. + */ + void commit( + String tableId, + URI tableUri, + Optional commit, + Optional lastKnownBackfilledVersion, + boolean disown, + Optional newMetadata, + Optional newProtocol + ) throws IOException, CommitFailedException, UCCommitCoordinatorException; + + /** + * Retrieves the unbackfilled commits for a Delta table within a specified version range. + * + * @param tableId The unique identifier of the Delta table. + * @param tableUri The URI of the storage location of the table. e.g. s3://bucket/path/to/table + * (and not s3://bucket/path/to/table/_delta_log). + * If the tableId exists but the tableUri is different from the one previously + * registered (e.g., if the table as moved), the request will fail. + * @param startVersion An Optional containing the start version of the range of commits to + * retrieve. + * @param endVersion An Optional containing the end version of the range of commits to retrieve. + * @return A GetCommitsResponse object containing the unbackfilled commits within the specified + * version range. If all commits are backfilled, the response will contain an empty list. + * The response also contains the last known backfilled version of the table. If no + * commits are ratified via UC, the lastKnownBackfilledVersion will be -1. + * @throws IOException if there's an error during the commit process, such as network issues. + * @throws UCCommitCoordinatorException if there's an error specific to the Unity Catalog such as + * the table not being found. 
+ */ + GetCommitsResponse getCommits( + String tableId, + URI tableUri, + Optional startVersion, + Optional endVersion) throws IOException, UCCommitCoordinatorException; + + /** + * Closes any resources used by this client. + * This method should be called to properly release resources such as network + * connections (e.g., HTTPClient) when the client is no longer needed. + * Once this method is called, the client should not be used to perform any operations. + * + * @throws IOException if there's an error while closing resources + */ + @Override + void close() throws IOException; +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java new file mode 100644 index 00000000000..7ba620cd585 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java @@ -0,0 +1,937 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.file.FileAlreadyExistsException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + +import io.delta.storage.CloseableIterator; +import io.delta.storage.LogStore; +import io.delta.storage.commit.*; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; +import io.delta.storage.internal.FileNameUtils; +import io.delta.storage.internal.LogStoreErrors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A commit coordinator client that uses unity-catalog as the commit coordinator. + * TODO: Add a cross-link to the UC Commit Coordinator Protocol spec. + */ +public class UCCommitCoordinatorClient implements CommitCoordinatorClient { + public UCCommitCoordinatorClient(Map conf, UCClient ucClient) { + this.conf = conf; + this.ucClient = ucClient; + } + + /** + * Logger for UCCommitCoordinatorClient class operations and diagnostics. + */ + private static final Logger LOG = LoggerFactory.getLogger(UCCommitCoordinatorClient.class); + + // UC Protocol Version Control Constants + /** Supported version for read operations in the Unity Catalog protocol. */ + private static final int SUPPORTED_READ_VERSION = 0; + + /** Supported version for write operations in the Unity Catalog protocol. 
*/ + private static final int SUPPORTED_WRITE_VERSION = 0; + + /** Key used to identify the read version in protocol communications with the UC server. */ + private static final String READ_VERSION_KEY = "readVersion"; + + /** Key used to identify the write version in protocol communications with the UC server. */ + private static final String WRITE_VERSION_KEY = "writeVersion"; + + // Unity Catalog Identifiers + /** + * Key for identifying Unity Catalog table ID in `delta.coordinatedCommits.tableConf{-preview}`. + */ + final static public String UC_TABLE_ID_KEY = "ucTableId"; + + /** + * Key for identifying Unity Catalog metastore ID in + * `delta.coordinatedCommits.commitCoordinatorConf{-preview}`. + */ + final static public String UC_METASTORE_ID_KEY = "ucMetastoreId"; + + // Backfill and Retry Configuration + /** + * Offset from current commit version for backfill listing optimization. + * Used to prevent expensive listings from version 0. + */ + public static int BACKFILL_LISTING_OFFSET = 100; + + /** Maximum number of retry attempts for transient errors. */ + protected static final int MAX_RETRIES_ON_TRANSIENT_ERROR = 15; + + /** Initial wait time in milliseconds before retrying after a transient error. */ + protected static final long TRANSIENT_ERROR_RETRY_INITIAL_WAIT_MS = 100; + + /** Maximum wait time in milliseconds between retries for transient errors. */ + protected static final long TRANSIENT_ERROR_RETRY_MAX_WAIT_MS = 1000 * 60; // 1 minute + + // Thread Pool Configuration + /** Size of the thread pool for handling asynchronous operations. */ + static protected int THREAD_POOL_SIZE = 20; + + /** + * Thread pool executor for handling asynchronous tasks like backfilling. + * Configured with daemon threads and custom naming pattern. + */ + private static final ThreadPoolExecutor asyncExecutor; + + // Static Initializer Block + static { + asyncExecutor = new ThreadPoolExecutor( + THREAD_POOL_SIZE, + THREAD_POOL_SIZE, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(Integer.MAX_VALUE), + new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread t = defaultFactory.newThread(r); + // Set the thread name to uc-commit-coordinator-pool-1-thread-1 + t.setName("uc-commit-coordinator-" + t.getName()); + t.setDaemon(true); + return t; + } + }); + asyncExecutor.allowCoreThreadTimeOut(true); + } + + // Instance Variables + /** Unity Catalog client instance for interacting with UC services. */ + public final UCClient ucClient; + + /** Configuration map containing settings for the coordinator client. */ + public final Map conf; + + /** + * Runs a task asynchronously using the backfillThreadPool. + * + * @param task The task to be executed asynchronously + * @return A Future representing pending completion of the task + */ + protected Future executeAsync(Callable task) { + return asyncExecutor.submit(task); + } + + protected String extractUCTableId(TableDescriptor tableDesc) { + Map tableConf = tableDesc.getTableConf(); + if (!tableConf.containsKey(UC_TABLE_ID_KEY)) { + throw new IllegalStateException("UC Table ID not found in " + tableConf); + } + return tableConf.get(UC_TABLE_ID_KEY); + } + + /** + * For UC, table registration is a no-op because we already contacted UC during table + * creation and that already obtained the necessary table config and added + * it to the metadata (this is for performance reasons and ease of use). 
As a result, + * this method only verifies that the metadata has been added correct and is present. + * Otherwise, it throws an exception. + */ + @Override + public Map registerTable( + Path logPath, + Optional tableIdentifier, + long currentVersion, + AbstractMetadata currentMetadata, + AbstractProtocol currentProtocol) { + Map tableConf = CoordinatedCommitsUtils.getTableConf(currentMetadata); + checkVersionSupported(tableConf, false /* compareRead */); + + // The coordinatedCommitsTableConf must have been instantiated prior to this call + // with the UC table ID. + if (!tableConf.containsKey(UC_TABLE_ID_KEY)) { + throw new IllegalStateException("Could not verify if the table is registered with the " + + "UC commit coordinator because the table ID is missing from the table metadata."); + } + // The coordinatedCommitsCoordinatorConf must have been instantiated prior to this call + // with the metastore ID of the metastore, which stores the table. + if (!CoordinatedCommitsUtils.getCoordinatorConf(currentMetadata).containsKey( + UC_METASTORE_ID_KEY)) { + throw new IllegalStateException("Could not verify if the table is registered with the UC " + + "commit coordinator because the metastore ID is missing from the table metadata."); + } + return tableConf; + } + + /** + * Find the last known backfilled version by doing a listing of the last + * [[BACKFILL_LISTING_OFFSET]] commits. If no backfilled commits are found + * among those, a UC call is made to get the oldest tracked commit in UC. + */ + public long getLastKnownBackfilledVersion( + long commitVersion, + Configuration hadoopConf, + LogStore logStore, + TableDescriptor tableDesc + ) { + Path logPath = tableDesc.getLogPath(); + long listFromVersion = Math.max(0, commitVersion - BACKFILL_LISTING_OFFSET); + Optional lastKnownBackfilledVersion = + listAndGetLastKnownBackfilledVersion(listFromVersion, logStore, hadoopConf, logPath); + if (!lastKnownBackfilledVersion.isPresent()) { + // In case we don't find anything in the last 100 commits (should not happen) + // we go to UC to find the earliest commit it is tracking as the commit prior + // to that must have been backfilled. 
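+ // For example: if the earliest commit UC still tracks is version v, then version
+ // v - 1 must already be backfilled, which is why the second listing below starts
+ // at (minVersion - 1) and is expected to find a backfilled delta file.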
+ recordDeltaEvent( + UCCoordinatedCommitsUsageLogs.UC_LAST_KNOWN_BACKFILLED_VERSION_NOT_FOUND, + new HashMap() {{ + put("commitVersion", commitVersion); + put("conf", conf); + put("listFromVersion", listFromVersion); + put("tableConf", tableDesc.getTableConf()); + }}, + logPath.getParent() + ); + long minVersion = + getCommits(tableDesc, null, null) + .getCommits() + .stream() + .min(Comparator.comparingLong(Commit::getVersion)) + .map(Commit::getVersion) + .orElseThrow(() -> new IllegalStateException("Couldn't find any unbackfilled commit " + + "for table at " + logPath + " at version " + commitVersion)); + lastKnownBackfilledVersion = listAndGetLastKnownBackfilledVersion( + minVersion - 1, logStore, hadoopConf, logPath); + if (!lastKnownBackfilledVersion.isPresent()) { + throw new IllegalStateException("Couldn't find any backfilled commit for table at " + + logPath + " at version " + commitVersion); + } + } + return lastKnownBackfilledVersion.get(); + } + + protected Iterator listFrom( + LogStore logStore, + long listFromVersion, + Configuration hadoopConf, + Path logPath) { + Path listingPath = CoordinatedCommitsUtils.getBackfilledDeltaFilePath(logPath, listFromVersion); + try { + return logStore.listFrom(listingPath, hadoopConf); + } catch (IOException e) { + LOG.error("Failed to list files from {} due to: {}", listingPath, exceptionString(e)); + throw new IllegalStateException(e); + } + } + + protected Optional listAndGetLastKnownBackfilledVersion( + long listFromVersion, + LogStore logStore, + Configuration hadoopConf, + Path logPath) { + Optional lastKnownBackfilledVersion = Optional.empty(); + Iterator deltaLogFileIt = + listFrom(logStore, listFromVersion, hadoopConf, logPath); + while (deltaLogFileIt.hasNext()) { + FileStatus fileStatus = deltaLogFileIt.next(); + if (FileNameUtils.isDeltaFile(fileStatus.getPath())) { + lastKnownBackfilledVersion = + Optional.of(FileNameUtils.deltaVersion(fileStatus.getPath())); + } + } + return lastKnownBackfilledVersion; + } + + @Override + public CommitResponse commit( + LogStore logStore, + Configuration hadoopConf, + TableDescriptor tableDesc, + long commitVersion, + Iterator actions, + UpdatedActions updatedActions) throws CommitFailedException { + return commitImpl( + logStore, + hadoopConf, + tableDesc, + commitVersion, + actions, + updatedActions); + } + + /** + * Commits the provided actions as the specified version. The steps are as follows. + * + * 1. Write the actions to a UUID-based commit file + * 2. In parallel to 1. determine the last known backfilled version. + * If a backfill hint is provided, we verify that it exists via a single HEAD call. Otherwise, + * the last known backfilled version is determined via a listing. + * 3. Send commit request to UC to commit the version and register backfills up to the + * found last known backfilled version. + * 4. Backfill all unbackfilled commits (including the latest one made in this call) + * asynchronously. + * A getCommits call is made to UC to retrieve all currently unbackfilled commits. 
+ */ + protected CommitResponse commitImpl( + LogStore logStore, + Configuration hadoopConf, + TableDescriptor tableDesc, + long commitVersion, + Iterator actions, + UpdatedActions updatedActions) throws CommitFailedException { + Path logPath = tableDesc.getLogPath(); + Map coordinatedCommitsTableConf = tableDesc.getTableConf(); + checkVersionSupported(coordinatedCommitsTableConf, false /* compareRead */); + // Writes may also have to perform reads to determine the last known backfilled + // version/the commits to backfill in case we don't have a backfill hint. To + // prevent to write to succeed but then fail the read, we do the read protocol + // version check here. + checkVersionSupported(coordinatedCommitsTableConf, true /* compareRead */); + + if (commitVersion == 0) { + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + "Commit version 0 must go via filesystem."); + } + + long startTimeMs = System.currentTimeMillis(); + Map eventData = new HashMap<>(); + eventData.put("commitVersion", commitVersion); + eventData.put("coordinatedCommitsTableConf", coordinatedCommitsTableConf); + eventData.put("updatedActions", updatedActions); + + BiConsumer, String> recordUsageLog = (exception, opType) -> { + exception.ifPresent(throwable -> { + eventData.put("exceptionClass", throwable.getClass().getName()); + eventData.put("exceptionString", exceptionString(throwable)); + }); + eventData.put("totalTimeTakenMs", System.currentTimeMillis() - startTimeMs); + recordDeltaEvent(opType, eventData, logPath.getParent()); + }; + + // After commit 0, the table ID must exist in UC + String tableId = extractUCTableId(tableDesc); + LOG.info("Attempting to commit version " + commitVersion + " to table " + tableId); + + // Asynchronously verify/retrieve the last known backfilled version + // Using AtomicLong instead of Long because we need to update the value in the lambda + // and "Variable used in lambda expression should be final or effectively final". + AtomicLong timeSpentInGettingLastKnownBackfilledVersion = + new AtomicLong(System.currentTimeMillis()); + Future lastKnownBackfilledVersionFuture; + try { + lastKnownBackfilledVersionFuture = executeAsync(() -> { + long foundVersion = getLastKnownBackfilledVersion( + commitVersion, + hadoopConf, + logStore, + tableDesc); + timeSpentInGettingLastKnownBackfilledVersion.getAndUpdate(start -> + System.currentTimeMillis() - start); + return foundVersion; + }); + } catch (Exception e) { + // Synchronously verify/retrieve last known backfilled version. + LOG.warn("Error while submitting task to verify/retrieve last known backfilled version " + + "due to: " + exceptionString(e) + ". Verifying/retrieving synchronously"); + recordUsageLog.accept( + Optional.of(e), + UCCoordinatedCommitsUsageLogs.UC_BACKFILL_VALIDATION_FALLBACK_TO_SYNC); + long foundVersion = getLastKnownBackfilledVersion( + commitVersion, + hadoopConf, + logStore, + tableDesc); + timeSpentInGettingLastKnownBackfilledVersion.getAndUpdate(start -> + System.currentTimeMillis() - start);; + lastKnownBackfilledVersionFuture = CompletableFuture.completedFuture(foundVersion); + } + + // In parallel to verifying/getting the last known backfilled version, write the commit file. 
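+ // Note: the write below does not depend on the result of that listing; the future is
+ // only awaited after the commit file has been written, so the two steps can safely overlap.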
+ long writeStartTimeMs = System.currentTimeMillis(); + FileStatus commitFile; + try { + commitFile = CoordinatedCommitsUtils.writeUnbackfilledCommitFile( + logStore, + hadoopConf, + logPath.toString(), + commitVersion, + actions, + UUID.randomUUID().toString() + ); + } catch (IOException e) { + throw new CommitFailedException( + true /* retryable */, + false /* conflict */, + "Failed to write commit file due to: " + e.getMessage(), + e); + } + eventData.put("writeCommitFileTimeTakenMs", System.currentTimeMillis() - writeStartTimeMs); + + // Using AtomicLong instead of Long because we need to access the value in the lambda + // and "Variable used in lambda expression should be final or effectively final". + AtomicLong lastKnownBackfilledVersion = new AtomicLong(); + try { + lastKnownBackfilledVersion.set(lastKnownBackfilledVersionFuture.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + long commitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp(); + boolean disown = isDisownCommit( + updatedActions.getOldMetadata(), + updatedActions.getNewMetadata()); + eventData.put("tableId", tableId); + eventData.put("lastKnownBackfilledVersion", lastKnownBackfilledVersion.get()); + eventData.put("commitTimestamp", commitTimestamp); + eventData.put("disown", disown); + eventData.put( + "timeSpentInGettingLastKnownBackfilledVersion", + timeSpentInGettingLastKnownBackfilledVersion); + + int transientErrorRetryCount = 0; + while (transientErrorRetryCount <= MAX_RETRIES_ON_TRANSIENT_ERROR) { + try { + commitToUC( + tableDesc, + logPath, + Optional.of(commitFile), + Optional.of(commitVersion), + Optional.of(commitTimestamp), + Optional.of(lastKnownBackfilledVersion.get()), + disown, + updatedActions.getNewMetadata() == updatedActions.getOldMetadata() ? + Optional.empty() : + Optional.of(updatedActions.getNewMetadata()), + updatedActions.getNewProtocol() == updatedActions.getOldProtocol() ? + Optional.empty() : + Optional.of(updatedActions.getNewProtocol()) + ); + break; + } catch (CommitFailedException cfe) { + if (transientErrorRetryCount > 0 && cfe.getConflict() && cfe.getRetryable() && + hasSameContent( + logStore, + hadoopConf, + logPath, + CoordinatedCommitsUtils.getBackfilledDeltaFilePath(logPath, commitVersion), + commitFile.getPath())) { + // The commit was persisted in UC, but we did not get a response. Continue + // because the commit was successful + eventData.put("alreadyBackfilledCommitCausedConflict", true); + break; + } else { + // Rethrow the exception here as is because the caller needs to handle it. + recordUsageLog.accept(Optional.of(cfe), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + throw cfe; + } + } catch (IOException ioe) { + if (transientErrorRetryCount == MAX_RETRIES_ON_TRANSIENT_ERROR) { + // Rethrow exception in case we've reached the retry limit. + recordUsageLog.accept(Optional.of(ioe), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + throw new CommitFailedException( + true /* retryable */, + false /* conflict */, + ioe.getMessage(), + ioe); + } + // Exponentially back off. The initial wait time is set to 100ms and the max retry count + // is 15. The max wait time is 1 min so overall, we'll be waiting for a max of ~8 min. 
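+ // Worked example of the schedule above: waits of 100ms << n for n = 0..9 sum to ~1.7 min,
+ // and the remaining retries are capped at 1 min each (~5 min more), i.e. on the order of
+ // the ~8 min estimate noted above.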
+ long sleepTime = Math.min( + TRANSIENT_ERROR_RETRY_INITIAL_WAIT_MS << transientErrorRetryCount, + TRANSIENT_ERROR_RETRY_MAX_WAIT_MS + ); + LOG.info("Sleeping for " + sleepTime + "ms before retrying commit after transient error " + + ioe.getMessage()); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + transientErrorRetryCount++; + eventData.put("transientErrorRetryCount", transientErrorRetryCount); + } catch (UpgradeNotAllowedException + unae) { + // This is translated to a non-retryable, non-conflicting commit failure. + recordUsageLog.accept(Optional.of(unae), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + unae.getMessage(), + unae); + } catch (InvalidTargetTableException + itte) { + // Just rethrow, this will propagate to the user. + recordUsageLog.accept(Optional.of(itte), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + itte.getMessage(), + itte); + } catch (CommitLimitReachedException + clre) { + // We attempt a full backfill and then retry the commit. + try { + AtomicReference caughtException = new AtomicReference<>(null); + lastKnownBackfilledVersion.getAndUpdate(lastKnownBackfilledVersionVal -> { + try { + return attemptFullBackfill( + logStore, + hadoopConf, + tableDesc, + commitVersion, + tableId, + lastKnownBackfilledVersionVal, + eventData + ); + } catch (Exception e) { + caughtException.set(e); + return lastKnownBackfilledVersionVal; // Return unchanged value on exception + } + }); + if (caughtException.get() != null) { + throw caughtException.get(); + } + } catch (Throwable e) { + recordUsageLog.accept( + Optional.of(e), UCCoordinatedCommitsUsageLogs.UC_FULL_BACKFILL_ATTEMPT_FAILED); + String message = String.format( + "Commit limit reached (%s) for table %s. A full backfill attempt failed due to: %s", + exceptionString(clre), + tableId, + exceptionString(e)); + throw new CommitFailedException( + true /* retryable */, + false /* conflict */, + message, + clre); + } + eventData.put("lastKnownBackfilledVersion", lastKnownBackfilledVersion.get()); + eventData.put("encounteredCommitLimitReachedException", true); + // Retry the commit as there should be space in UC now. We set isCommitLimitReachedRetry + // to true so that in case the full backfill attempt was unsuccessful in freeing up space + // in UC, we don't indefinitely retry but rather throw the CommitLimitReachedException. + // Don't increase transientErrorRetryCount as this is not a transient error. + } catch (UCCommitCoordinatorException + ucce) { + // Just rethrow, this will propagate to the user. + recordUsageLog.accept(Optional.of(ucce), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + ucce.getMessage(), + ucce); + } + } + + LOG.info("Successfully wrote " + commitFile.getPath() + " as commit " + commitVersion + + " to table " + tableId); + + // Asynchronously backfill everything up to the latest commit. + Callable doBackfill = () -> { + backfillToVersion( + logStore, + hadoopConf, + tableDesc, + commitVersion, + lastKnownBackfilledVersion.get() + ); + return null; + }; + + try { + executeAsync(doBackfill); + } catch (Throwable e) { + if (LogStoreErrors.isFatal(e)) { + throw e; + } + // attempt a synchronous backfill + LOG.warn("Error while submitting backfill task: " + exceptionString(e) + + ". 
Performing synchronous backfill now."); + recordUsageLog.accept( + Optional.of(e), + UCCoordinatedCommitsUsageLogs.UC_BACKFILL_FALLBACK_TO_SYNC); + try { + doBackfill.call(); + } catch (Throwable t) { + if (LogStoreErrors.isFatal(t)) { + throw new RuntimeException(t); + } + } + } + + recordUsageLog.accept(Optional.empty(), UCCoordinatedCommitsUsageLogs.UC_COMMIT_STATS); + return new CommitResponse(new Commit(commitVersion, commitFile, commitTimestamp)); + } + + /** + * Attempts a full backfill of all currently unbackfilled versions in order to free + * up space in UC. After the attempt, will do a listing to find the new last known + * backfilled version and returns it. + */ + protected long attemptFullBackfill( + LogStore logStore, + Configuration hadoopConf, + TableDescriptor tableDesc, + long commitVersion, + String tableId, + long lastKnownBackfilledVersion, + Map eventData) throws IOException, + UCCommitCoordinatorException, + CommitFailedException { + Path logPath = tableDesc.getLogPath(); + LOG.info("Too many unbackfilled commits in UC at version {} for table at {} " + + "and ID {}. Last known backfill version is {}. Attempting a full backfill.", + commitVersion, logPath, tableId, lastKnownBackfilledVersion); + + long backfillStartTime = System.currentTimeMillis(); + backfillToVersion( + logStore, + hadoopConf, + tableDesc, + commitVersion, + lastKnownBackfilledVersion + ); + long backfillDuration = System.currentTimeMillis() - backfillStartTime; + + long updatedLastKnownBackfilledVersion = getLastKnownBackfilledVersion( + commitVersion, + hadoopConf, + logStore, + tableDesc); + + long commitStartTime = System.currentTimeMillis(); + commitToUC( + tableDesc, + logPath, + Optional.empty() /* commitFile */, + Optional.empty() /* commitVersion */, + Optional.empty() /* commitTimestamp */, + Optional.of(updatedLastKnownBackfilledVersion), + true /* disown */, + Optional.empty() /* newMetadata */, + Optional.empty() /* newProtocol */ + ); + long commitDuration = System.currentTimeMillis() - commitStartTime; + + recordDeltaEvent( + UCCoordinatedCommitsUsageLogs.UC_ATTEMPT_FULL_BACKFILL, + new HashMap(eventData) {{ + put("commitVersion", commitVersion); + put("coordinatedCommitsTableConf", tableDesc.getTableConf()); + put("lastKnownBackfilledVersion", lastKnownBackfilledVersion); + put("updatedLastKnownBackfilledVersion", updatedLastKnownBackfilledVersion); + put("tableId", tableId); + put("backfillTime", backfillDuration); + put("ucCommitTime", commitDuration); + }}, + logPath.getParent() + ); + return updatedLastKnownBackfilledVersion; + } + + protected void commitToUC( + TableDescriptor tableDesc, + Path logPath, + Optional commitFile, + Optional commitVersion, + Optional commitTimestamp, + Optional lastKnownBackfilledVersion, + boolean disown, + Optional newMetadata, + Optional newProtocol + ) throws IOException, CommitFailedException, UCCommitCoordinatorException + { + Optional commit = commitFile.map(f -> new Commit( + commitVersion.orElseThrow(() -> new IllegalArgumentException( + "Commit version should be specified when commitFile is present")), + f, + commitTimestamp.orElseThrow(() -> new IllegalArgumentException( + "Commit timestamp should be specified when commitFile is present")) + )); + ucClient.commit( + extractUCTableId(tableDesc), + CoordinatedCommitsUtils.getTablePath(logPath).toUri(), + commit, + lastKnownBackfilledVersion, + disown, + newMetadata, + newProtocol + ); + } + + /** + * Detects whether the current commit is a downgrade (disown) commit by checking + * that the 
UC commit coordinator name is present in the old metadata but removed from + * the new metadata. + */ + protected boolean isDisownCommit(AbstractMetadata oldMetadata, AbstractMetadata newMetadata) { + return CoordinatedCommitsUtils + .getCoordinatorName(oldMetadata) + .filter("unity-catalog"::equals).isPresent() && + !CoordinatedCommitsUtils.getCoordinatorName(newMetadata).isPresent(); + } + + /** + * This method is used to verify, whether the currently attempted commit already + * exists as a backfilled commit. This is possible in the following scenario: + * + * 1. Client attempts to make commit v. + * 2. UC persists the commit in its database but then the connection to the client breaks. + * 3. The client receives a transient error. + * 4. Before retrying, a concurrent client commits v + 1 and backfills v. + * 5. Another subsequent commit registers the backfill of v with UC, leading to UC. + * deleting the commit for v from its database. + * 6. Now this client retries commit v. + * 7. UC does not store v anymore and so cannot determine whether v has already been made + * or not and so returns a retryable conflict. + * + * Now commit v exists as a backfilled commit but Delta would try to rebase v and recommit, + * which could lead to duplicate data being written. To avoid this scenario, we need to + * verify, that a commit does not already exist as a backfilled commit after the corresponding + * commit attempt failed with an IOException and the subsequent retry resulted in a conflict. + */ + protected boolean hasSameContent( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Path backfilledCommit, + Path unbackfilledCommit) { + try { + FileSystem fs = logPath.getFileSystem(hadoopConf); + if (fs.getFileStatus(backfilledCommit).getLen() != + fs.getFileStatus(unbackfilledCommit).getLen()) { + return false; + } + } catch (FileNotFoundException e) { + // If we get a FileNotFoundException, it should be for the backfilled + // commit because we are only calling this method from commit() at the moment, + // which means we just wrote the unbackfilled commit. + return false; + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Compare content. + try (CloseableIterator contentBackfilled = logStore.read(backfilledCommit, hadoopConf); + CloseableIterator contentUnbackfilled = + logStore.read(unbackfilledCommit, hadoopConf)) { + while (contentUnbackfilled.hasNext() && contentBackfilled.hasNext()) { + if (!contentUnbackfilled.next().equals(contentBackfilled.next())) { + return false; + } + } + return !contentBackfilled.hasNext() && !contentUnbackfilled.hasNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public GetCommitsResponse getCommits( + TableDescriptor tableDesc, + Long startVersion, + Long endVersion) { + checkVersionSupported(tableDesc.getTableConf(), true /* compareRead */); + GetCommitsResponse resp = getCommitsFromUCImpl( + tableDesc, + Optional.ofNullable(startVersion), + Optional.ofNullable(endVersion)); + // Sort by version just in case commits in the response from UC aren't sorted. 
+ List sortedCommits = + resp + .getCommits() + .stream() + .sorted(Comparator.comparingLong(Commit::getVersion)) + .collect(Collectors.toList()); + return new GetCommitsResponse(sortedCommits, resp.getLatestTableVersion()); + } + + protected GetCommitsResponse getCommitsFromUCImpl( + TableDescriptor tableDesc, + Optional startVersion, + Optional endVersion) { + try { + return ucClient.getCommits( + extractUCTableId(tableDesc), + CoordinatedCommitsUtils.getTablePath(tableDesc.getLogPath()).toUri(), + startVersion, + endVersion); + } catch (IOException | UCCommitCoordinatorException e) { + throw new RuntimeException(e); + } + } + + @Override + public void backfillToVersion( + LogStore logStore, + Configuration hadoopConf, + TableDescriptor tableDesc, + long version, + Long lastKnownBackfilledVersion) throws IOException { + // backfillToVersion currently does not depend on write. However, it is + // technically a write operation, so we also add a write version check here + // in case we ever introduce a write dependency. + checkVersionSupported(tableDesc.getTableConf(), true /* compareRead */); + checkVersionSupported(tableDesc.getTableConf(), false /* compareRead */); + + Path logPath = tableDesc.getLogPath(); + String tableId = extractUCTableId(tableDesc); + long startVersion = (lastKnownBackfilledVersion == null) ? 0L : lastKnownBackfilledVersion; + long startTimeMs = System.currentTimeMillis(); + LOG.info("Backfilling {}: startVersion {} to endVersion {}", tableId, startVersion, version); + + // Check that the last known backfilled version actually exists if it + // has been specified. If it doesn't exist, we fail the backfill. If it + // hasn't been specified backfill everything that hasn't been backfilled yet. + if (lastKnownBackfilledVersion != null) { + FileSystem fs = logPath.getFileSystem(hadoopConf); + // Check that the last known backfilled version actually exists. + if (!fs.exists(CoordinatedCommitsUtils + .getBackfilledDeltaFilePath(logPath, lastKnownBackfilledVersion))) { + LOG.error("Specified last known backfilled version {} does not exist for table {}", + lastKnownBackfilledVersion, tableId); + recordDeltaEvent( + UCCoordinatedCommitsUsageLogs.UC_BACKFILL_DOES_NOT_EXIST, + new HashMap() {{ + put("lastKnownBackfilledVersion", lastKnownBackfilledVersion); + put("version", version); + put("tableConf", tableDesc.getTableConf()); + }}, + logPath.getParent() + ); + throw new IllegalStateException("Last known backfilled version " + + lastKnownBackfilledVersion + " doesn't exist for table at " + logPath); + } + } + GetCommitsResponse commitsResponse = getCommits(tableDesc, lastKnownBackfilledVersion, version); + for (Commit commit : commitsResponse.getCommits()) { + boolean backfillResult = backfillSingleCommit( + logStore, + hadoopConf, + logPath, + commit.getVersion(), + commit.getFileStatus(), + false /* failOnException */); + if (!backfillResult) { + break; + } + } + + recordDeltaEvent( + UCCoordinatedCommitsUsageLogs.UC_BACKFILL_TO_VERSION, + new HashMap() {{ + put("coordinatedCommitsTableConf", tableDesc.getTableConf()); + put("totalTimeTakenMs", System.currentTimeMillis() - startTimeMs); + put("lastKnownBackfilledVersion", lastKnownBackfilledVersion); + put("tableId", tableId); + put("version", version); + }}, + logPath.getParent() + ); + } + + /** + * Backfill the specified commit as the target version. Returns true if the + * backfill was successful (or the backfilled file already existed) and false + * in case the backfill failed. 
+ */ + protected boolean backfillSingleCommit( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + long version, + FileStatus fileStatus, + Boolean failOnException) { + Path targetFile = CoordinatedCommitsUtils.getBackfilledDeltaFilePath(logPath, version); + try (CloseableIterator commitContentIterator = + logStore.read(fileStatus.getPath(), hadoopConf)) { + // Use put-if-absent for backfills so that files are not overwritten and the + // modification time does not change for already backfilled files. + logStore.write(targetFile, commitContentIterator, false /* overwrite */, hadoopConf); + } catch (FileAlreadyExistsException e) { + LOG.info("The backfilled file {} already exists.", targetFile); + } catch (Exception e) { + if (LogStoreErrors.isFatal(e) || failOnException) { + throw new RuntimeException(e); + } + LOG.warn("Backfill for table at {} failed for version {} due to: {}", + logPath, version, exceptionString(e)); + recordDeltaEvent( + UCCoordinatedCommitsUsageLogs.UC_BACKFILL_FAILED, + new HashMap() {{ + put("version", version); + put("exceptionClass", e.getClass().getName()); + put("exceptionString", exceptionString(e)); + }}, + logPath.getParent() + ); + return false; + } + return true; + } + + @Override + public boolean semanticEquals(CommitCoordinatorClient other) { + if (!(other instanceof UCCommitCoordinatorClient)) { + return false; + } + UCCommitCoordinatorClient otherStore = (UCCommitCoordinatorClient) other; + return this.conf == otherStore.conf; + } + + protected void recordDeltaEvent(String opType, Object data, Path path) { + LOG.info("Delta event recorded with opType={}, data={}, and path={}", opType, data, path); + } + + protected String exceptionString(Throwable e) { + if (e == null) { + return ""; + } else { + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + return stringWriter.toString(); + } + } + + protected void checkVersionSupported(Map tableConf, boolean compareRead) { + int readVersion = Integer.parseInt(tableConf.getOrDefault(READ_VERSION_KEY, "0")); + int writeVersion = Integer.parseInt(tableConf.getOrDefault(WRITE_VERSION_KEY, "0")); + int targetVersion = compareRead ? readVersion : writeVersion; + int supportedVersion = compareRead ? SUPPORTED_READ_VERSION : SUPPORTED_WRITE_VERSION; + String op = compareRead ? "read" : "write"; + if (supportedVersion != targetVersion) { + throw new UnsupportedOperationException("The version of the UC commit coordinator protocol" + + " is not supported by this version of the UC commit coordinator client. Please upgrade" + + " the commit coordinator client to " + op + " this table."); + } + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorException.java new file mode 100644 index 00000000000..08c32c2dfbf --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorException.java @@ -0,0 +1,26 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +/** + * Base class for all exceptions thrown by the UC client from coordinated commits-related APIs. + */ +public abstract class UCCommitCoordinatorException extends Exception { + public UCCommitCoordinatorException(String message) { + super(message); + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCoordinatedCommitsUsageLogs.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCoordinatedCommitsUsageLogs.java new file mode 100644 index 00000000000..748e9d3afaf --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCoordinatedCommitsUsageLogs.java @@ -0,0 +1,39 @@ +package io.delta.storage.commit.uccommitcoordinator; + +/** Class containing usage logs emitted by Coordinated Commits. */ +public class UCCoordinatedCommitsUsageLogs { + + // Common prefix for all coordinated-commits usage logs. + private static final String PREFIX = "delta.coordinatedCommits"; + + // Usage log emitted after backfilling to a version. + public static final String UC_BACKFILL_TO_VERSION = PREFIX + ".uc.backfillToVersion"; + + // Usage log emitted if the specified last known backfilled version does not exist. + public static final String UC_BACKFILL_DOES_NOT_EXIST = PREFIX + ".uc.backfillDoesNotExist"; + + // Usage log emitted if a backfill attempt for a single file failed. + public static final String UC_BACKFILL_FAILED = PREFIX + ".uc.backfillFailed"; + + // Usage log emitted when the last known backfilled version cannot be determined from the last + // `BACKFILL_LISTING_OFFSET` commits. + public static final String UC_LAST_KNOWN_BACKFILLED_VERSION_NOT_FOUND = + PREFIX + ".uc.lastKnownBackfilledVersionNotFound"; + + // Usage log emitted if UC commit coordinator client falls back to synchronous backfill. + public static final String UC_BACKFILL_VALIDATION_FALLBACK_TO_SYNC = + PREFIX + ".uc.backfillValidation.fallbackToSync"; + + // Usage log emitted when commit limit is reached, and we attempt a full backfill. + public static final String UC_ATTEMPT_FULL_BACKFILL = PREFIX + ".uc.attemptFullBackfill"; + + // Usage log emitted if UC commit coordinator client falls back to synchronous backfill. + public static final String UC_BACKFILL_FALLBACK_TO_SYNC = PREFIX + ".uc.backfill.fallbackToSync"; + + // Usage log emitted as part of [[UCCommitCoordinatorClient.commit]] call. + public static final String UC_COMMIT_STATS = PREFIX + ".uc.commitStats"; + + // Usage log emitted when a full backfill attempt has failed + public static final String UC_FULL_BACKFILL_ATTEMPT_FAILED = + PREFIX + ".uc.fullBackfillAttemptFailed"; +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCRestClientPayload.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCRestClientPayload.java new file mode 100644 index 00000000000..90dc058fed1 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCRestClientPayload.java @@ -0,0 +1,238 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import com.fasterxml.jackson.annotation.JsonInclude; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Container for internal REST classes used by UCTokenBasedRestClient. + * Encapsulates all necessary classes for JSON serialization/deserialization. + */ +class UCRestClientPayload { + + // ============================== + // CommitInfo Class + // ============================== + static class CommitInfo { + Long version; + Long timestamp; + String fileName; + Long fileSize; + Long fileModificationTimestamp; + Boolean isDisownCommit; + + static CommitInfo fromCommit(Commit externalCommit, boolean isDisownCommit) { + if (externalCommit == null) { + throw new IllegalArgumentException("externalCommit cannot be null"); + } + if (externalCommit.getFileStatus() == null) { + throw new IllegalArgumentException("externalCommit.getFileStatus() cannot be null"); + } + + CommitInfo commitInfo = new CommitInfo(); + commitInfo.version = externalCommit.getVersion(); + commitInfo.timestamp = externalCommit.getCommitTimestamp(); + commitInfo.fileName = externalCommit.getFileStatus().getPath().getName(); + commitInfo.fileSize = externalCommit.getFileStatus().getLen(); + commitInfo.fileModificationTimestamp = externalCommit.getFileStatus().getModificationTime(); + commitInfo.isDisownCommit = isDisownCommit; + return commitInfo; + } + + static Commit toCommit(CommitInfo commitInfo, Path basePath) { + FileStatus fileStatus = new FileStatus( + commitInfo.fileSize, + false /* isdir */, + 0 /* block_replication */, + 0 /* blocksize */, + commitInfo.fileModificationTimestamp, + new Path(basePath, commitInfo.fileName)); + return new Commit(commitInfo.version, fileStatus, commitInfo.timestamp); + } + } + + // ============================== + // Protocol Class + // ============================== + static class Protocol { + Integer minReaderVersion; + Integer minWriterVersion; + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List readerFeatures; + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List writerFeatures; + + static Protocol fromAbstractProtocol(AbstractProtocol externalProtocol) { + if (externalProtocol == null) { + throw new IllegalArgumentException("externalProtocol cannot be null"); + } + + Protocol protocol = new Protocol(); + protocol.minReaderVersion = externalProtocol.getMinReaderVersion(); + protocol.minWriterVersion = externalProtocol.getMinWriterVersion(); + protocol.readerFeatures = new ArrayList<>(externalProtocol.getReaderFeatures()); + protocol.writerFeatures = new ArrayList<>(externalProtocol.getWriterFeatures()); + + return protocol; + } + } + + // ============================== + // Metadata Class + // 
==============================
+  static class Metadata {
+    String deltaTableId;
+    String name;
+    String description;
+    String provider;
+    OptionsKVPairs formatOptions;
+    ColumnInfos schema;
+    List<String> partitionColumns;
+    PropertiesKVPairs properties;
+    String createdTime;
+
+    static Metadata fromAbstractMetadata(AbstractMetadata externalMetadata) {
+      if (externalMetadata == null) {
+        throw new IllegalArgumentException("externalMetadata cannot be null");
+      }
+
+      Metadata metadata = new Metadata();
+      metadata.deltaTableId = externalMetadata.getId();
+      metadata.name = externalMetadata.getName();
+      metadata.description = externalMetadata.getDescription();
+      metadata.provider = externalMetadata.getProvider();
+      metadata.formatOptions = OptionsKVPairs.fromFormatOptions(
+        externalMetadata.getFormatOptions());
+      metadata.schema = ColumnInfos.fromSchemaString(externalMetadata.getSchemaString());
+      metadata.partitionColumns = externalMetadata.getPartitionColumns();
+      metadata.properties = PropertiesKVPairs.fromProperties(externalMetadata.getConfiguration());
+      metadata.createdTime = externalMetadata.getCreatedTime().toString(); // Assuming ISO format
+
+      return metadata;
+    }
+  }
+
+  // ==============================
+  // OptionsKVPairs Class
+  // ==============================
+  static class OptionsKVPairs {
+    Map<String, String> options;
+
+    static OptionsKVPairs fromFormatOptions(Map<String, String> externalOptions) {
+      if (externalOptions == null) {
+        throw new IllegalArgumentException("externalOptions cannot be null");
+      }
+
+      OptionsKVPairs kvPairs = new OptionsKVPairs();
+      kvPairs.options = externalOptions;
+      return kvPairs;
+    }
+  }
+
+  // ==============================
+  // PropertiesKVPairs Class
+  // ==============================
+  static class PropertiesKVPairs {
+    Map<String, String> properties;
+
+    static PropertiesKVPairs fromProperties(Map<String, String> externalProperties) {
+      if (externalProperties == null) {
+        throw new IllegalArgumentException("externalProperties cannot be null");
+      }
+
+      PropertiesKVPairs kvPairs = new PropertiesKVPairs();
+      kvPairs.properties = externalProperties;
+      return kvPairs;
+    }
+  }
+
+  // ==============================
+  // ColumnInfos Class
+  // ==============================
+  static class ColumnInfos {
+    List<ColumnInfo> columns;
+
+    static ColumnInfos fromSchemaString(String schemaString) {
+      // TODO: Implement actual schema parsing logic based on schema format
+      return null;
+    }
+
+    static class ColumnInfo {
+      String name;
+      String type;
+      Boolean nullable;
+
+      static ColumnInfo fromColumnDetails(String name, String type, Boolean nullable) {
+        if (name == null || type == null || nullable == null) {
+          throw new IllegalArgumentException("Column details cannot be null");
+        }
+
+        ColumnInfo columnInfo = new ColumnInfo();
+        columnInfo.name = name;
+        columnInfo.type = type;
+        columnInfo.nullable = nullable;
+        return columnInfo;
+      }
+    }
+  }
+
+  // ==============================
+  // CommitRequest Class
+  // ==============================
+  static class CommitRequest {
+    String tableId;
+    String tableUri;
+    CommitInfo commitInfo;
+    Long latestBackfilledVersion;
+    Metadata metadata;
+    Protocol protocol;
+  }
+
+  // ==============================
+  // GetCommitsRequest Class
+  // ==============================
+  static class GetCommitsRequest {
+    String tableId;
+    String tableUri;
+    Long startVersion;
+    Long endVersion;
+  }
+
+  // ==============================
+  // RestGetCommitsResponse Class
+  // ==============================
+  static class RestGetCommitsResponse {
+    public List<CommitInfo> commits;
+    public Long latestTableVersion;
+  }
+
+  // ==============================
+  // GetMetastoreSummaryResponse Class
+  // ==============================
+  static class GetMetastoreSummaryResponse {
+    String metastoreId;
+  }
+}
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
new file mode 100644
index 00000000000..3c5dfc75e57
--- /dev/null
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.storage.commit.uccommitcoordinator;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import io.delta.storage.commit.Commit;
+import io.delta.storage.commit.CommitFailedException;
+import io.delta.storage.commit.CoordinatedCommitsUtils;
+import io.delta.storage.commit.GetCommitsResponse;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.CommitInfo;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.CommitRequest;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.GetCommitsRequest;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.GetMetastoreSummaryResponse;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.RestGetCommitsResponse;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.Metadata;
+import io.delta.storage.commit.uccommitcoordinator.UCRestClientPayload.Protocol;
+import io.delta.storage.commit.actions.AbstractMetadata;
+import io.delta.storage.commit.actions.AbstractProtocol;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.entity.ContentType;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.http.message.BasicHeader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A REST client implementation of {@link UCClient} for interacting with Unity Catalog's commit
+ * coordination service. This client uses token-based authentication to make HTTP requests to the
+ * Unity Catalog API for managing Delta table commits and metadata.
+ *
+ * <p>The client handles the following primary operations:
+ * <ul>
+ *   <li>Retrieving metastore information</li>
+ *   <li>Committing changes to Delta tables</li>
+ *   <li>Fetching unbackfilled commit histories</li>
+ * </ul>
+ *
+ * <p>All requests are authenticated using a Bearer token and communicate using JSON payloads.
+ * The client automatically handles JSON serialization/deserialization and HTTP header management.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * try (UCTokenBasedRestClient client = new UCTokenBasedRestClient(baseUri, token)) {
+ *     String metastoreId = client.getMetastoreId();
+ *     // Perform operations with the client...
+ * }
+ * }</pre>
+ *
+ * @see UCClient
+ * @see Commit
+ * @see GetCommitsResponse
+ */
+public class UCTokenBasedRestClient implements UCClient {
+  private final String baseUri;
+  private final ObjectMapper mapper;
+  private final CloseableHttpClient httpClient;
+
+  public UCTokenBasedRestClient(String baseUri, String token) {
+    this.baseUri = resolve(baseUri, "/api/2.1/unity-catalog");
+    this.mapper = new ObjectMapper()
+      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+      .setSerializationInclusion(JsonInclude.Include.NON_ABSENT)
+      .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+      .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
+    this.httpClient =
+      HttpClientBuilder
+        .create()
+        .setDefaultHeaders(Arrays.asList(
+          // Authorization header: Provides the Bearer token for authentication
+          new BasicHeader(HttpHeaders.AUTHORIZATION, "Bearer " + token),
+          // Accept header: Indicates that the client expects JSON responses from the server
+          new BasicHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType()),
+          // Content-Type header: Indicates that the client sends JSON payloads to the server
+          new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType())))
+        .build();
+  }
+
+  private static String resolve(String baseUri, String child) {
+    // Ensures baseUri doesn't end with '/'
+    if (baseUri.endsWith("/")) {
+      baseUri = baseUri.substring(0, baseUri.length() - 1);
+    }
+    // Ensures child starts with '/'
+    if (!child.startsWith("/")) {
+      child = "/" + child;
+    }
+    return baseUri + child;
+  }
+
+  private <T> String toJson(T object) throws JsonProcessingException {
+    return mapper.writeValueAsString(object);
+  }
+
+  private <T> T fromJson(String json, Class<T> clazz) throws JsonProcessingException {
+    return mapper.readValue(json, clazz);
+  }
+
+  private static class HttpError extends Throwable {
+    final int statusCode;
+    final String responseBody;
+
+    HttpError(int statusCode, String responseBody) {
+      super("HTTP Error " + statusCode + ": " + responseBody);
+      this.statusCode = statusCode;
+      this.responseBody = responseBody;
+    }
+  }
+
+  private <T> T executeHttpRequest(
+      HttpUriRequest request,
+      Object payload,
+      Class<T> responseClass) throws IOException, HttpError {
+
+    if (payload != null && request instanceof HttpEntityEnclosingRequestBase) {
+      String jsonPayload = toJson(payload);
+      ((HttpEntityEnclosingRequestBase) request).setEntity(
+        new StringEntity(jsonPayload, ContentType.APPLICATION_JSON));
+    }
+
+    try (CloseableHttpResponse response = httpClient.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      String responseBody = EntityUtils.toString(response.getEntity());
+
+      if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_CREATED) {
+        return fromJson(responseBody, responseClass);
+      } else {
+        throw new HttpError(statusCode, responseBody);
+      }
+    } catch (JsonProcessingException e) {
+      throw new IOException("Failed to parse response", e);
+    }
+  }
+
+  @Override
+  public String getMetastoreId() throws IOException {
+    URI uri = URI.create(resolve(baseUri, "/metastore_summary"));
+    HttpGet request = new HttpGet(uri);
+
+    try {
+      GetMetastoreSummaryResponse response =
+        executeHttpRequest(request, null, GetMetastoreSummaryResponse.class);
+      return response.metastoreId;
+    } catch (HttpError e) {
+      throw new IOException("Failed to get metastore ID (HTTP " + e.statusCode + "): "
+        + e.responseBody);
+    }
+  }
+
+  @Override
+  public void commit(
+      String tableId,
+      URI tableUri,
+      Optional<Commit> commit,
+      Optional<Long> lastKnownBackfilledVersion,
+      boolean disown,
+      Optional<AbstractMetadata> newMetadata,
+      Optional<AbstractProtocol> newProtocol
+  ) throws IOException, CommitFailedException, UCCommitCoordinatorException {
+    // Validate required parameters
+    Objects.requireNonNull(tableId, "tableId must not be null.");
+    Objects.requireNonNull(tableUri, "tableUri must not be null.");
+
+    // Create commit request payload
+    CommitRequest commitRequest = new CommitRequest();
+    commitRequest.tableId = tableId;
+    commitRequest.tableUri = tableUri.toString();
+    commit.ifPresent(c -> commitRequest.commitInfo = CommitInfo.fromCommit(c, disown));
+    lastKnownBackfilledVersion.ifPresent(version ->
+      commitRequest.latestBackfilledVersion = version);
+    newMetadata.ifPresent(m -> commitRequest.metadata = Metadata.fromAbstractMetadata(m));
+    newProtocol.ifPresent(p -> commitRequest.protocol = Protocol.fromAbstractProtocol(p));
+
+    URI uri = URI.create(resolve(baseUri, "/delta/commits"));
+    HttpPost request = new HttpPost(uri);
+
+    try {
+      executeHttpRequest(request, commitRequest, Void.class);
+    } catch (HttpError e) {
+      switch (e.statusCode) {
+        case HttpStatus.SC_BAD_REQUEST:
+          throw new CommitFailedException(
+            false /* retryable */,
+            false /* conflict */,
+            "Invalid commit parameters: " + e.responseBody,
+            e);
+        case HttpStatus.SC_NOT_FOUND:
+          throw new InvalidTargetTableException("Invalid Target Table: " + e.responseBody);
+        case HttpStatus.SC_CONFLICT:
+          throw new CommitFailedException(
+            true /* retryable */,
+            true /* conflict */,
+            "Commit conflict: " + e.responseBody,
+            e);
+        case 429: // Too Many Requests
+          throw new CommitLimitReachedException("Backfilled commits limit reached: "
+            + e.responseBody);
+        default:
+          throw new CommitFailedException(
+            true /* retryable */,
+            false /* conflict */,
+            "Unexpected commit failure (HTTP " + e.statusCode + "): " + e.responseBody,
+            e);
+      }
+    }
+  }
+
+  @Override
+  public GetCommitsResponse getCommits(
+      String tableId,
+      URI tableUri,
+      Optional<Long> startVersion,
+      Optional<Long> endVersion) throws IOException, UCCommitCoordinatorException {
+    // Validate required parameters
+    Objects.requireNonNull(tableId, "tableId must not be null.");
+    Objects.requireNonNull(tableUri, "tableUri must not be null.");
+
+    GetCommitsRequest getCommitsRequest = new GetCommitsRequest();
+    getCommitsRequest.tableId = tableId;
+    getCommitsRequest.tableUri = tableUri.toString();
+    getCommitsRequest.startVersion = startVersion.orElse(0L);
+    endVersion.ifPresent(v -> getCommitsRequest.endVersion = v);
+
+    // Create a custom HttpGet that allows a request body
+    URI uri = URI.create(resolve(baseUri, "/delta/commits"));
+    HttpEntityEnclosingRequestBase httpRequest = new HttpEntityEnclosingRequestBase() {
+      @Override
+      public String getMethod() {
+        return "GET";
+      }
+    };
+    httpRequest.setURI(uri);
+
+    try {
+      RestGetCommitsResponse restGetCommitsResponse =
+        executeHttpRequest(httpRequest, getCommitsRequest, RestGetCommitsResponse.class);
+      Path basePath = CoordinatedCommitsUtils.commitDirPath(CoordinatedCommitsUtils.logDirPath(
+        new Path(tableUri)));
+      List<Commit> commits = new ArrayList<>();
+      if (restGetCommitsResponse.commits != null) {
+        for (CommitInfo commitInfo : restGetCommitsResponse.commits) {
+          commits.add(CommitInfo.toCommit(commitInfo, basePath));
+        }
+      }
+      return new GetCommitsResponse(commits, restGetCommitsResponse.latestTableVersion);
+    } catch (HttpError e) {
+      // Handle response based on status code
+      switch (e.statusCode) {
+        case HttpStatus.SC_NOT_FOUND:
+          throw new InvalidTargetTableException("Invalid Target Table due to: " + e.responseBody);
+        default:
+          throw new IOException("Unexpected getCommits failure (HTTP " + e.statusCode
+            + "): " + e.responseBody);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    httpClient.close();
+  }
+}
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UpgradeNotAllowedException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UpgradeNotAllowedException.java
new file mode 100644
index 00000000000..48def51fe32
--- /dev/null
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UpgradeNotAllowedException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.storage.commit.uccommitcoordinator;
+
+/**
+ * This exception is thrown by the UC client when an attempted upgrade of a table to a
+ * UC-managed table is not allowed because the previous commit was a downgrade.
+ */
+public class UpgradeNotAllowedException extends UCCommitCoordinatorException {
+  public UpgradeNotAllowedException(String message) {
+    super(message);
+  }
+}
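
Note for reviewers: a minimal end-to-end sketch of how a caller might exercise this client, using only the API introduced in this patch (the constructor, getMetastoreId, and getCommits). The endpoint URI, token source, table ID, and table location below are hypothetical placeholders, not values from this change.

// Illustrative sketch only; all values marked "hypothetical" are placeholders.
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;

import java.net.URI;
import java.util.Optional;

public class UCTokenBasedRestClientExample {
  public static void main(String[] args) throws Exception {
    String baseUri = "https://uc.example.com";     // hypothetical Unity Catalog endpoint
    String token = System.getenv("UC_TOKEN");      // hypothetical token source
    try (UCTokenBasedRestClient client = new UCTokenBasedRestClient(baseUri, token)) {
      // Resolve the metastore backing this endpoint.
      String metastoreId = client.getMetastoreId();

      // Fetch unbackfilled commits for a table, starting at version 0.
      GetCommitsResponse response = client.getCommits(
          "00000000-0000-0000-0000-000000000000",  // hypothetical table ID
          URI.create("s3://bucket/path/to/table"), // hypothetical table location
          Optional.of(0L),
          Optional.empty());
      System.out.println("Metastore " + metastoreId
          + ", latest table version " + response.getLatestTableVersion());
    }
  }
}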