Skip to content

Commit

Permalink
Protocol version downgrade support in Delta Tables
Browse files Browse the repository at this point in the history
Currently, we support Protocol downgrade in the form of feature removal but downgrading protocol versions is not possible. This PR adds support for protocol version downgrade. This is only allowed for tables that support either reader+writer table features or writer table features. The protocol downgrade takes place when the user removes a table feature and there are no non-legacy table features left in the table. The protocol is downgraded to the minimum reader/writer versions required to support all enabled legacy features.

For example, `Protocol(3, 7, readerFeatures=(DeletionVectors), writerFeatures=(DeletionVectors, ChangeDataFeed))` is downgraded to `Protocol(1, 4)` after removing the DeletionVectors table feature.

Closes #2061

GitOrigin-RevId: 76633f6a08ae747ea508ef84e4e4f62a7ad5609d
  • Loading branch information
andreaschat-db authored and vkorukanti committed Sep 25, 2023
1 parent 87f80ce commit f0a3864
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaOperations.Operation
import com.fasterxml.jackson.annotation.JsonIgnore

import org.apache.spark.sql.SparkSession

/**
* Trait to be mixed into the [[Protocol]] case class to enable Table Features.
*
Expand Down Expand Up @@ -179,6 +181,13 @@ trait TableFeatureSupport { this: Protocol =>
@JsonIgnore
lazy val readerAndWriterFeatureNames: Set[String] = readerFeatureNames ++ writerFeatureNames

/**
 * The [[TableFeature]] objects behind [[readerAndWriterFeatureNames]]. Each name is looked up
 * with `TableFeature.featureNameToFeature` and the lookups are flattened, so a name whose
 * lookup yields nothing contributes no entry to the result.
 */
@JsonIgnore
lazy val readerAndWriterFeatures: Seq[TableFeature] = {
  val featureNames = readerAndWriterFeatureNames.toSeq
  featureNames.flatMap(name => TableFeature.featureNameToFeature(name))
}

/**
* Get all features that are implicitly supported by this protocol, for example, `Protocol(1,2)`
* implicitly supports `appendOnly` and `invariants`. When this protocol is capable of requiring
Expand Down Expand Up @@ -235,18 +244,36 @@ trait TableFeatureSupport { this: Protocol =>
/**
* Determine whether this protocol can be safely downgraded to a new protocol `to`. This
* includes the following:
* - Protocol version cannot be downgraded.
* - The `to` protocol needs to support at least writer features.
* - The current protocol needs to support at least writer features. This is because protocol
* downgrade is only supported with table features.
* - The protocol version can only be downgraded when there are no non-legacy table features.
* - We can only remove one feature at a time.
* - When downgrading protocol versions, the resulting versions must support exactly the same
* set of legacy features supported by the current protocol.
*
* Note, this is not an exhaustive list of downgrade rules. Rather, we check the most important
* downgrade invariants. We also perform checks during feature removal at
* [[AlterTableDropFeatureDeltaCommand]].
*/
def canDowngradeTo(to: Protocol): Boolean = {
if (!to.supportsWriterFeatures) return false
def canDowngradeTo(to: Protocol, droppedFeatureName: String): Boolean = {
if (!supportsWriterFeatures) return false

// When the `to` protocol does not have any features, version downgrades are possible. However,
// the current protocol needs to contain one non-legacy feature. We also allow downgrade when
// there are only legacy features. This is to accommodate the case when the user attempts to
// remove a legacy feature in a table that only contains legacy features.
if (to.readerAndWriterFeatureNames.isEmpty) {
val featureNames = readerAndWriterFeatureNames - droppedFeatureName
val sameLegacyFeaturesSupported = featureNames == to.implicitlySupportedFeatures.map(_.name)
val minRequiredVersions = TableFeatureProtocolUtils.minimumRequiredVersions(
featureNames.flatMap(TableFeature.featureNameToFeature).toSeq)

return sameLegacyFeaturesSupported &&
(to.minReaderVersion, to.minWriterVersion) == minRequiredVersions &&
readerAndWriterFeatures.filterNot(_.isLegacyFeature).size <= 1
}

// We only support feature removal not protocol version downgrade.
// When `to` protocol contains table features we cannot downgrade the protocol version.
if (to.minReaderVersion != this.minReaderVersion) return false
if (to.minWriterVersion != this.minWriterVersion) return false

Expand All @@ -258,10 +285,9 @@ trait TableFeatureSupport { this: Protocol =>
* True if this protocol can be upgraded or downgraded to the 'to' protocol.
*/
def canTransitionTo(to: Protocol, op: Operation): Boolean = {
// NOTE(review): this span looks like a rendered diff that lists the replaced
// `isInstanceOf` branch followed by the `match` that supersedes it — confirm before
// treating both as live code.
if (op.isInstanceOf[DeltaOperations.DropTableFeature]) {
canDowngradeTo(to)
} else {
canUpgradeTo(to)
// Matching on the operation type binds the `DropTableFeature` instance so that its
// `featureName` can be passed to `canDowngradeTo`; every other operation is checked with
// `canUpgradeTo`.
op match {
case drop: DeltaOperations.DropTableFeature => canDowngradeTo(to, drop.featureName)
case _ => canUpgradeTo(to)
}
}

Expand Down Expand Up @@ -314,20 +340,51 @@ trait TableFeatureSupport { this: Protocol =>
/**
* Remove feature wrapper for removing either Reader/Writer or Writer features. We assume
* the feature exists in the protocol. There is a relevant validation at
* [[AlterTableDropFeatureDeltaCommand]].
* [[AlterTableDropFeatureDeltaCommand]]. We also require targetFeature is removable.
*
* Assumes targetFeature is removable.
* When the feature to remove is the last explicit table feature of the table we also remove the
* TableFeatures feature and downgrade the protocol.
*/
def removeFeature(targetFeature: TableFeature): Protocol = {
// Fails with an IllegalArgumentException when the feature is not marked as removable.
require(targetFeature.isRemovable)
// NOTE(review): the next two lines look like the old and the new header of the same `match`
// as rendered by a diff view — confirm which one is current.
targetFeature match {
val newProtocol = targetFeature match {
// Reader+writer features (legacy or not) go through removeReaderWriterFeature.
case f@(_: ReaderWriterFeature | _: LegacyReaderWriterFeature) =>
removeReaderWriterFeature(f)
// Writer-only features (legacy or not) go through removeWriterFeature.
case f@(_: WriterFeature | _: LegacyWriterFeature) =>
removeWriterFeature(f)
// Any other kind of feature is rejected.
case f =>
throw DeltaErrors.dropTableFeatureNonRemovableFeature(f.name)
}
// After the feature is removed, give the resulting protocol a chance to fall back to plain
// reader/writer versions.
newProtocol.downgradeProtocolVersionsIfNeeded
}

/**
 * Returns the legacy-versioned equivalent of this protocol when one exists, otherwise `this`.
 *
 * A downgrade is only attempted when every feature listed by this protocol is a legacy
 * feature. The candidate is `Protocol(r, w)`, where `(r, w)` are the minimum reader/writer
 * versions required by those features. The candidate is accepted only if it supports exactly
 * the same set of features (implicit and explicit) as the current protocol.
 *
 * Note, a protocol that lists no features at all yields the candidate (1, 1), because the
 * minimum required versions default to 1.
 */
def downgradeProtocolVersionsIfNeeded: Protocol = {
  if (readerAndWriterFeatures.exists(!_.isLegacyFeature)) {
    // A non-legacy feature is still present; it cannot be expressed with plain versions.
    this
  } else {
    val (requiredReaderVersion, requiredWriterVersion) =
      TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
    val newProtocol = Protocol(requiredReaderVersion, requiredWriterVersion)

    require(
      !(newProtocol.supportsReaderFeatures || newProtocol.supportsWriterFeatures),
      s"Downgraded protocol should not support table features, but got $newProtocol.")

    // Only switch when the legacy protocol supports features exactly as the current one does.
    val supportedNow = this.implicitlyAndExplicitlySupportedFeatures
    val supportedAfterDowngrade = newProtocol.implicitlyAndExplicitlySupportedFeatures
    if (supportedNow == supportedAfterDowngrade) newProtocol else this
  }
}

/**
Expand Down Expand Up @@ -435,4 +492,10 @@ object TableFeatureProtocolUtils {
key == DeltaConfigs.CREATE_TABLE_IGNORE_PROTOCOL_DEFAULTS.key ||
key.startsWith(TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
}

/**
 * Computes the lowest (reader, writer) protocol versions able to support every feature in
 * `features`. Both versions are floored at 1, which is also the result for an empty input.
 */
def minimumRequiredVersions(features: Seq[TableFeature]): (Int, Int) = {
  val readerVersion =
    features.foldLeft(1)((highest, feature) => math.max(highest, feature.minReaderVersion))
  val writerVersion =
    features.foldLeft(1)((highest, feature) => math.max(highest, feature.minWriterVersion))
  (readerVersion, writerVersion)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2081,7 +2081,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

// Writer feature is removed from the writer features set.
val snapshot = deltaLog.update()
assert(snapshot.protocol === emptyProtocolWithReaderFeatures)
assert(snapshot.protocol === Protocol(1, 1))
assert(!snapshot.metadata.configuration.contains(featurePropertyKey))
assertPropertiesAndShowTblProperties(deltaLog)
}
Expand Down Expand Up @@ -2173,7 +2173,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

// Reader+writer feature is removed from the features set.
val snapshot = deltaLog.update()
assert(snapshot.protocol === emptyProtocolWithReaderFeatures)
assert(snapshot.protocol === Protocol(1, 1))
assert(!snapshot.metadata.configuration.contains(featurePropertyKey))
assertPropertiesAndShowTblProperties(deltaLog)
} else {
Expand Down Expand Up @@ -2248,11 +2248,13 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
val deltaLog = DeltaLog.forTable(spark, dir)
sql(s"""CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta
|TBLPROPERTIES (
|delta.feature.${TestWriterFeature.name} = 'supported',
|delta.feature.${TestRemovableWriterFeature.name} = 'supported'
|)""".stripMargin)

val protocol = deltaLog.update().protocol
assert(protocol === protocolWithWriterFeature(TestRemovableWriterFeature))
assert(protocol === protocolWithFeatures(
writerFeatures = Seq(TestWriterFeature, TestRemovableWriterFeature)))

withSQLConf(DeltaSQLConf.TABLE_FEATURE_DROP_ENABLED.key -> true.toString) {
val command = AlterTableDropFeatureDeltaCommand(
Expand All @@ -2265,7 +2267,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
minReaderVersion = 1,
minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION,
readerFeatures = None,
writerFeatures = Some(Set.empty)))
writerFeatures = Some(Set(TestWriterFeature.name))))
}
}
}
Expand Down Expand Up @@ -2405,7 +2407,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
TestRemovableWriterFeature.name)
command.run(spark)
}
assert(deltaLog.update().protocol === emptyProtocolWithWriterFeatures)
assert(deltaLog.update().protocol === Protocol(1, 1))

sql(s"""ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES (
|delta.feature.${TestRemovableWriterFeature.name} = 'supported'
Expand Down Expand Up @@ -2604,7 +2606,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

// Reader+writer feature is removed from the features set.
val snapshot = deltaLog.update()
assert(snapshot.protocol === emptyProtocolWithReaderFeatures)
assert(snapshot.protocol === Protocol(1, 1))
assert(!snapshot.metadata.configuration
.contains(TestRemovableReaderWriterFeature.TABLE_PROP_KEY))
assertPropertiesAndShowTblProperties(deltaLog)
Expand Down Expand Up @@ -2752,7 +2754,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
sql(s"ALTER TABLE $table DROP FEATURE $featureName")
}

assert(deltaLog.update().protocol === emptyProtocolWithWriterFeatures)
assert(deltaLog.update().protocol === Protocol(1, 1))
}
// Test that the write downgrade command was invoked.
val expectedOpType = "delta.test.TestWriterFeaturePreDowngradeCommand"
Expand All @@ -2764,6 +2766,182 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
}
}

/**
 * Creates a Delta table at the given initial protocol versions, enables `featuresToAdd`
 * through `delta.feature.<name>` table properties, drops `featuresToRemove` one at a time (in
 * the given order) and asserts that the resulting protocol equals
 * `expectedDowngradedProtocol`.
 *
 * @param initialMinReaderVersion value of `delta.minReaderVersion` used at table creation.
 * @param initialMinWriterVersion value of `delta.minWriterVersion` used at table creation.
 * @param featuresToAdd features marked as 'supported' after creation; may be empty, in which
 *                      case no ALTER TABLE statement is issued.
 * @param featuresToRemove features dropped with [[AlterTableDropFeatureDeltaCommand]].
 * @param expectedDowngradedProtocol protocol expected after all drops.
 */
protected def testProtocolVersionDowngrade(
    initialMinReaderVersion: Int,
    initialMinWriterVersion: Int,
    featuresToAdd: Seq[TableFeature],
    featuresToRemove: Seq[TableFeature],
    expectedDowngradedProtocol: Protocol): Unit = {
  withTempDir { dir =>
    val deltaLog = DeltaLog.forTable(spark, dir)
    // Use one path accessor for every statement so CREATE and ALTER address the same table.
    val tablePath = dir.getCanonicalPath

    spark.sql(s"""CREATE TABLE delta.`$tablePath` (id bigint) USING delta
                 |TBLPROPERTIES (
                 |delta.minReaderVersion = $initialMinReaderVersion,
                 |delta.minWriterVersion = $initialMinWriterVersion
                 |)""".stripMargin)

    // Upgrade protocol to table features. Skipped when there is nothing to add, since an empty
    // property list is not a meaningful statement to issue.
    if (featuresToAdd.nonEmpty) {
      // mkString (unlike reduce) is defined for any number of elements.
      val newTBLProperties = featuresToAdd
        .map(f => s"delta.feature.${f.name}='supported'")
        .mkString(", ")
      spark.sql(
        s"""ALTER TABLE delta.`$tablePath`
           |SET TBLPROPERTIES (
           |$newTBLProperties
           |)""".stripMargin)
    }

    withSQLConf(DeltaSQLConf.TABLE_FEATURE_DROP_ENABLED.key -> true.toString) {
      for (feature <- featuresToRemove) {
        AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), feature.name)
          .run(spark)
      }
    }

    assert(deltaLog.update().protocol === expectedDowngradedProtocol)
  }
}

// The table starts at (1, 4). After adding and then dropping a single table feature the
// protocol is expected to be (1, 4) again.
test("Downgrade protocol version (1, 4)") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 4,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 4))
}

// The initial protocol version is (2, 4); however, there are no legacy features that require
// reader version 2. Therefore, the protocol version is downgraded to (1, 4).
test("Downgrade protocol version (2, 4)") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 2,
initialMinWriterVersion = 4,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 4))
}

// Version (2, 5) enables column mapping which is a reader+writer feature and requires (2, 5).
// Therefore, to downgrade from table features we need at least (2, 5).
test("Downgrade protocol version (2, 5)") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 2,
initialMinWriterVersion = 5,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(2, 5))
}


// The table starts at (1, 1). After adding and then dropping a single table feature the
// protocol is expected to be (1, 1) again.
test("Downgrade protocol version (1, 1)") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 1,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 1))
}

test("Downgrade protocol version on table created with table features") {
// When the table is initialized with table features there are no active (implicit) legacy
// features. After removing the last table feature we downgrade back to (1, 1).
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 1))
}

// Same scenario as the previous test, except that the table is created at (1, 7) instead of
// (3, 7). The expected protocol after dropping the only added feature is again (1, 1).
test("Downgrade protocol version on table created with writer features") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 1))
}

// Covers tables created with table features to which legacy features are added explicitly.
// Each call below is an independent scenario that runs on its own temporary table.
test("Protocol version downgrade on a table with table features and added legacy feature") {
// The legacy features implied by Protocol(2, 5) are added explicitly. After dropping the
// other added feature, the protocol should be downgraded to (2, 5).
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd =
Seq(TestRemovableWriterFeature) ++ Protocol(2, 5).implicitlySupportedFeatures,
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(2, 5))

// Added legacy feature should not be removed and the protocol should stay on (1, 7).
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature, TestRemovableLegacyWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 7)
.withFeature(TestRemovableLegacyWriterFeature))

// The legacy feature is dropped explicitly before the other feature. Protocol should be
// downgraded to (1, 1).
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature, TestRemovableLegacyWriterFeature),
featuresToRemove = Seq(TestRemovableLegacyWriterFeature, TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 1))

// Start with writer table features and add a legacy reader+writer feature. The expected
// protocol keeps that feature listed at (3, 7).
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature, ColumnMappingTableFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(3, 7).withFeature(ColumnMappingTableFeature))

// Remove reader+writer legacy feature as well. With nothing left, (1, 1) is expected.
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableLegacyReaderWriterFeature, TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableLegacyReaderWriterFeature, TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 1))
}

// Two features are added and only one is dropped. The expected protocol is the one built by
// protocolWithWriterFeature for the feature that stays (DomainMetadataTableFeature).
test("Protocol version is not downgraded when writer features exist") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature, DomainMetadataTableFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = protocolWithWriterFeature(DomainMetadataTableFeature))
}

// Two features are added and only one is dropped. The expected protocol is the one built by
// protocolWithReaderFeature for the feature that stays (DeletionVectorsTableFeature).
test("Protocol version is not downgraded when reader+writer features exist") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableReaderWriterFeature, DeletionVectorsTableFeature),
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
expectedDowngradedProtocol = protocolWithReaderFeature(DeletionVectorsTableFeature))
}

test("Protocol version is not downgraded when both reader+writer and writer features exist") {
// Drop the reader+writer feature and keep the writer feature: the expected protocol stays at
// (3, 7) with an empty reader feature set and the writer feature still listed.
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableReaderWriterFeature, TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
expectedDowngradedProtocol =
Protocol(3, 7, Some(Set.empty), Some(Set(TestRemovableWriterFeature.name))))

// Drop the writer feature and keep the reader+writer feature: the expected protocol is the
// one built by protocolWithReaderFeature for the feature that stays.
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableReaderWriterFeature, TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature))
}

private def assertPropertiesAndShowTblProperties(
deltaLog: DeltaLog,
tableHasFeatures: Boolean = false): Unit = {
Expand Down
Loading

0 comments on commit f0a3864

Please sign in to comment.