From 8efdbce5231f3b5ef61deb827c41b0a8c50aa84a Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 18 May 2022 20:58:20 -0700 Subject: [PATCH] KAFKA-13837; Return an error from Fetch if follower is not a valid replica (#12150) When a partition leader receives a `Fetch` request from a replica which is not in the current replica set, the behavior today is to return a successful fetch response, but with empty data. This causes the follower to retry until metadata converges without updating any state on the leader side. It is clearer in this case to return an error, so that the metadata inconsistency is visible in logging and so that the follower backs off before retrying. In this patch, we use `UNKNOWN_LEADER_EPOCH` when the `Fetch` request includes the current leader epoch. The way we see this is that the leader is validating the (replicaId, leaderEpoch) tuple. When the leader returns `UNKNOWN_LEADER_EPOCH`, it means that the leader does not expect the given leaderEpoch from that replica. If the request does not include a leader epoch, then we use `NOT_LEADER_OR_FOLLOWER`. We can take a similar interpretation for this case: the leader is rejecting the request because it does not think it should be the leader for that replica. But mainly these errors ensure that the follower will retry the request. As a part of this patch, I have refactored the way that the leader updates follower fetch state. Previously, the process is a little convoluted. We send the fetch from `ReplicaManager` down to `Partition.readRecords`, then we iterate over the results and call `Partition.updateFollowerFetchState`. It is more straightforward to update state directly as a part of `readRecords`. All we need to do is pass through the `FetchParams`. This also prevents an unnecessary copy of the read results. Reviewers: David Jacot --- .../main/scala/kafka/cluster/Partition.scala | 234 +++++++--- .../src/main/scala/kafka/log/UnifiedLog.scala | 2 +- .../scala/kafka/server/DelayedFetch.scala | 17 +- .../scala/kafka/server/FetchDataInfo.scala | 13 +- .../scala/kafka/server/ReplicaManager.scala | 116 ++--- .../kafka/server/DelayedFetchTest.scala | 28 +- .../kafka/cluster/AbstractPartitionTest.scala | 5 +- .../kafka/cluster/PartitionLockTest.scala | 90 +++- .../unit/kafka/cluster/PartitionTest.scala | 435 +++++++++++------- ...FetchRequestDownConversionConfigTest.scala | 145 ++++-- .../server/ReplicaManagerQuotasTest.scala | 68 +-- .../kafka/server/ReplicaManagerTest.scala | 32 +- .../scala/unit/kafka/utils/TestUtils.scala | 74 ++- .../UpdateFollowerFetchStateBenchmark.java | 13 +- 14 files changed, 760 insertions(+), 512 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9864480e78b2b..61d5f707dcaf0 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -440,8 +440,10 @@ class Partition(val topicPartition: TopicPartition, leaderReplicaIdOpt.filter(_ == localBrokerId) } - private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer], - requireLeader: Boolean): UnifiedLog = { + private def localLogWithEpochOrThrow( + currentLeaderEpoch: Optional[Integer], + requireLeader: Boolean + ): UnifiedLog = { getLocalLog(currentLeaderEpoch, requireLeader) match { case Left(localLog) => localLog case Right(error) => @@ -719,55 +721,51 @@ class Partition(val topicPartition: TopicPartition, * Update the follower's state in the leader based on the last fetch request. 
See * [[Replica.updateFetchState()]] for details. * - * @return true if the follower's fetch state was updated, false if the followerId is not recognized + * This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`) */ - def updateFollowerFetchState(followerId: Int, - followerFetchOffsetMetadata: LogOffsetMetadata, - followerStartOffset: Long, - followerFetchTimeMs: Long, - leaderEndOffset: Long): Boolean = { - getReplica(followerId) match { - case Some(followerReplica) => - // No need to calculate low watermark if there is no delayed DeleteRecordsRequest - val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L - val prevFollowerEndOffset = followerReplica.stateSnapshot.logEndOffset - followerReplica.updateFetchState( - followerFetchOffsetMetadata, - followerStartOffset, - followerFetchTimeMs, - leaderEndOffset) - - val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L - // check if the LW of the partition has incremented - // since the replica's logStartOffset may have incremented - val leaderLWIncremented = newLeaderLW > oldLeaderLW - - // Check if this in-sync replica needs to be added to the ISR. - maybeExpandIsr(followerReplica) - - // check if the HW of the partition can now be incremented - // since the replica may already be in the ISR and its LEO has just incremented - val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.stateSnapshot.logEndOffset) { - // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of - // leaderIsrUpdateLock to prevent adding new hw to invalid log. - inReadLock(leaderIsrUpdateLock) { - leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs)) - } - } else { - false - } - - // some delayed operations may be unblocked after HW or LW changed - if (leaderLWIncremented || leaderHWIncremented) - tryCompleteDelayedRequests() + def updateFollowerFetchState( + replica: Replica, + followerFetchOffsetMetadata: LogOffsetMetadata, + followerStartOffset: Long, + followerFetchTimeMs: Long, + leaderEndOffset: Long + ): Unit = { + // No need to calculate low watermark if there is no delayed DeleteRecordsRequest + val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L + val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset + replica.updateFetchState( + followerFetchOffsetMetadata, + followerStartOffset, + followerFetchTimeMs, + leaderEndOffset + ) + + val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L + // check if the LW of the partition has incremented + // since the replica's logStartOffset may have incremented + val leaderLWIncremented = newLeaderLW > oldLeaderLW + + // Check if this in-sync replica needs to be added to the ISR. + maybeExpandIsr(replica) + + // check if the HW of the partition can now be incremented + // since the replica may already be in the ISR and its LEO has just incremented + val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) { + // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of + // leaderIsrUpdateLock to prevent adding new hw to invalid log. 
+ inReadLock(leaderIsrUpdateLock) { + leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs)) + } + } else { + false + } - debug(s"Recorded replica $followerId log end offset (LEO) position " + - s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.") - true + // some delayed operations may be unblocked after HW or LW changed + if (leaderLWIncremented || leaderHWIncremented) + tryCompleteDelayedRequests() - case None => - false - } + debug(s"Recorded replica ${replica.brokerId} log end offset (LEO) position " + + s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.") } /** @@ -1145,15 +1143,112 @@ class Partition(val topicPartition: TopicPartition, info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same) } - def readRecords(lastFetchedEpoch: Optional[Integer], - fetchOffset: Long, - currentLeaderEpoch: Optional[Integer], - maxBytes: Int, - fetchIsolation: FetchIsolation, - fetchOnlyFromLeader: Boolean, - minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) { - // decide whether to only fetch from leader - val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader) + /** + * Fetch records from the partition. + * + * @param fetchParams parameters of the corresponding `Fetch` request + * @param fetchPartitionData partition-level parameters of the `Fetch` (e.g. the fetch offset) + * @param fetchTimeMs current time in milliseconds on the broker of this fetch request + * @param maxBytes the maximum bytes to return + * @param minOneMessage whether to ensure that at least one complete message is returned + * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches) + * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present + * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]] + * is enabled, or if this is a follower fetch with an older request version + * and the replicaId is not recognized among the current valid replicas + * @throws FencedLeaderEpochException if the leader epoch in the `Fetch` request is lower than the current + * leader epoch + * @throws UnknownLeaderEpochException if the leader epoch in the `Fetch` request is higher than the current + * leader epoch, or if this is a follower fetch and the replicaId is not + * recognized among the current valid replicas + * @throws OffsetOutOfRangeException if the fetch offset is smaller than the log start offset or larger than + * the log end offset (or high watermark depending on [[FetchParams.isolation]]), + * or if the end offset for the last fetched epoch in [[FetchRequest.PartitionData]] + * cannot be determined from the local epoch cache (e.g. 
if it is larger than + * any cached epoch value) + */ + def fetchRecords( + fetchParams: FetchParams, + fetchPartitionData: FetchRequest.PartitionData, + fetchTimeMs: Long, + maxBytes: Int, + minOneMessage: Boolean, + updateFetchState: Boolean + ): LogReadInfo = { + def readFromLocalLog(): LogReadInfo = { + readRecords( + fetchPartitionData.lastFetchedEpoch, + fetchPartitionData.fetchOffset, + fetchPartitionData.currentLeaderEpoch, + maxBytes, + fetchParams.isolation, + minOneMessage, + fetchParams.fetchOnlyLeader + ) + } + + if (fetchParams.isFromFollower) { + // Check that the request is from a valid replica before doing the read + val replica = followerReplicaOrThrow(fetchParams.replicaId, fetchPartitionData) + val logReadInfo = readFromLocalLog() + + if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) { + updateFollowerFetchState( + replica, + followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata, + followerStartOffset = fetchPartitionData.logStartOffset, + followerFetchTimeMs = fetchTimeMs, + leaderEndOffset = logReadInfo.logEndOffset + ) + } + + logReadInfo + } else { + readFromLocalLog() + } + } + + private def followerReplicaOrThrow( + replicaId: Int, + fetchPartitionData: FetchRequest.PartitionData + ): Replica = { + getReplica(replicaId).getOrElse { + debug(s"Leader $localBrokerId failed to record follower $replicaId's position " + + s"${fetchPartitionData.fetchOffset}, and last sent high watermark since the replica is " + + s"not recognized to be one of the assigned replicas ${assignmentState.replicas.mkString(",")} " + + s"for leader epoch $leaderEpoch with partition epoch $partitionEpoch") + + val error = if (fetchPartitionData.currentLeaderEpoch.isPresent) { + // The leader epoch is present in the request and matches the local epoch, but + // the replica is not in the replica set. This case is possible in KRaft, + // for example, when new replicas are added as part of a reassignment. + // We return UNKNOWN_LEADER_EPOCH to signify that the tuple (replicaId, leaderEpoch) + // is not yet recognized as valid, which causes the follower to retry. + Errors.UNKNOWN_LEADER_EPOCH + } else { + // The request has no leader epoch, which means it is an older version. We cannot + // say if the follower's state is stale or the local state is. In this case, we + // return `NOT_LEADER_OR_FOLLOWER` for lack of a better error so that the follower + // will retry. + Errors.NOT_LEADER_OR_FOLLOWER + } + + throw error.exception(s"Replica $replicaId is not recognized as a " + + s"valid replica of $topicPartition in leader epoch $leaderEpoch with " + + s"partition epoch $partitionEpoch") + } + } + + private def readRecords( + lastFetchedEpoch: Optional[Integer], + fetchOffset: Long, + currentLeaderEpoch: Optional[Integer], + maxBytes: Int, + fetchIsolation: FetchIsolation, + minOneMessage: Boolean, + fetchOnlyFromLeader: Boolean + ): LogReadInfo = inReadLock(leaderIsrUpdateLock) { + val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader) // Note we use the log end offset prior to the read. This ensures that any appends following // the fetch do not prevent a follower from coming into sync. 
@@ -1181,18 +1276,12 @@ class Partition(val topicPartition: TopicPartition, } if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) { - val emptyFetchData = FetchDataInfo( - fetchOffsetMetadata = LogOffsetMetadata(fetchOffset), - records = MemoryRecords.EMPTY, - abortedTransactions = None - ) - val divergingEpoch = new FetchResponseData.EpochEndOffset() .setEpoch(epochEndOffset.leaderEpoch) .setEndOffset(epochEndOffset.endOffset) return LogReadInfo( - fetchedData = emptyFetchData, + fetchedData = FetchDataInfo.empty(fetchOffset), divergingEpoch = Some(divergingEpoch), highWatermark = initialHighWatermark, logStartOffset = initialLogStartOffset, @@ -1201,14 +1290,21 @@ class Partition(val topicPartition: TopicPartition, } } - val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage) + val fetchedData = localLog.read( + fetchOffset, + maxBytes, + fetchIsolation, + minOneMessage + ) + LogReadInfo( fetchedData = fetchedData, divergingEpoch = None, highWatermark = initialHighWatermark, logStartOffset = initialLogStartOffset, logEndOffset = initialLogEndOffset, - lastStableOffset = initialLastStableOffset) + lastStableOffset = initialLastStableOffset + ) } def fetchOffsetForTimestamp(timestamp: Long, @@ -1216,7 +1312,7 @@ class Partition(val topicPartition: TopicPartition, currentLeaderEpoch: Optional[Integer], fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) { // decide whether to only fetch from leader - val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader) + val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader) val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset @@ -1277,7 +1373,7 @@ class Partition(val topicPartition: TopicPartition, def fetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer], fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = inReadLock(leaderIsrUpdateLock) { // decide whether to only fetch from leader - val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader) + val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader) localLog.fetchOffsetSnapshot } @@ -1285,7 +1381,7 @@ class Partition(val topicPartition: TopicPartition, maxNumOffsets: Int, isFromConsumer: Boolean, fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) { - val localLog = localLogWithEpochOrException(Optional.empty(), fetchOnlyFromLeader) + val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader) val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets) if (!isFromConsumer) { diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 99524385fb047..ddd66eb160fcb 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -147,7 +147,7 @@ case class LogOffsetSnapshot(logStartOffset: Long, lastStableOffset: LogOffsetMetadata) /** - * Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]]. + * Another container which is used for lower level reads using [[kafka.cluster.Partition.fetchRecords()]]. 
*/ case class LogReadInfo(fetchedData: FetchDataInfo, divergingEpoch: Option[FetchResponseData.EpochEndOffset], diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 3eb8eedf4c2f4..55a15682b64e0 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -158,15 +158,16 @@ class DelayedFetch( * Upon completion, read whatever data is available and pass to the complete callback */ override def onComplete(): Unit = { + val fetchInfos = fetchPartitionStatus.map { case (tp, status) => + tp -> status.fetchInfo + } + val logReadResults = replicaManager.readFromLocalLog( - replicaId = params.replicaId, - fetchOnlyFromLeader = params.fetchOnlyLeader, - fetchIsolation = params.isolation, - fetchMaxBytes = params.maxBytes, - hardMaxBytesLimit = params.hardMaxBytesLimit, - readPartitionInfo = fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, - clientMetadata = params.clientMetadata, - quota = quota) + params, + fetchInfos, + quota, + readFromPurgatory = true + ) val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = params.isFromFollower && diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala index 82e8092c10d1f..95b68c0839576 100644 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -20,7 +20,7 @@ package kafka.server import kafka.api.Request import org.apache.kafka.common.IsolationLevel import org.apache.kafka.common.message.FetchResponseData -import org.apache.kafka.common.record.Records +import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest @@ -75,6 +75,17 @@ case class FetchParams( } } +object FetchDataInfo { + def empty(fetchOffset: Long): FetchDataInfo = { + FetchDataInfo( + fetchOffsetMetadata = LogOffsetMetadata(fetchOffset), + records = MemoryRecords.EMPTY, + firstEntryIncomplete = false, + abortedTransactions = None + ) + } +} + case class FetchDataInfo( fetchOffsetMetadata: LogOffsetMetadata, records: Records, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e84abbe5f4359..190f80f36c4b7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -994,29 +994,14 @@ class ReplicaManager(val config: KafkaConfig, quota: ReplicaQuota, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit ): Unit = { - // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata) - def readFromLog(): Seq[(TopicIdPartition, LogReadResult)] = { - val result = readFromLocalLog( - replicaId = params.replicaId, - fetchOnlyFromLeader = params.fetchOnlyLeader, - fetchIsolation = params.isolation, - fetchMaxBytes = params.maxBytes, - hardMaxBytesLimit = params.hardMaxBytesLimit, - readPartitionInfo = fetchInfos, - quota = quota, - clientMetadata = params.clientMetadata) - if (params.isFromFollower) updateFollowerFetchState(params.replicaId, result) - else result - } - - val logReadResults = readFromLog() - // check if this fetch request can be satisfied right away + val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false) var 
bytesReadable: Long = 0 var errorReadingData = false var hasDivergingEpoch = false var hasPreferredReadReplica = false val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] + logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() @@ -1073,14 +1058,12 @@ class ReplicaManager(val config: KafkaConfig, /** * Read from multiple topic partitions at the given offset up to maxSize bytes */ - def readFromLocalLog(replicaId: Int, - fetchOnlyFromLeader: Boolean, - fetchIsolation: FetchIsolation, - fetchMaxBytes: Int, - hardMaxBytesLimit: Boolean, - readPartitionInfo: Seq[(TopicIdPartition, PartitionData)], - quota: ReplicaQuota, - clientMetadata: Option[ClientMetadata]): Seq[(TopicIdPartition, LogReadResult)] = { + def readFromLocalLog( + params: FetchParams, + readPartitionInfo: Seq[(TopicIdPartition, PartitionData)], + quota: ReplicaQuota, + readFromPurgatory: Boolean + ): Seq[(TopicIdPartition, LogReadResult)] = { val traceEnabled = isTraceEnabled def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { @@ -1104,13 +1087,13 @@ class ReplicaManager(val config: KafkaConfig, throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.") // If we are the leader, determine the preferred read-replica - val preferredReadReplica = clientMetadata.flatMap( - metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs)) + val preferredReadReplica = params.clientMetadata.flatMap( + metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs)) if (preferredReadReplica.isDefined) { replicaSelectorOpt.foreach { selector => debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " + - s"${preferredReadReplica.get} for $clientMetadata") + s"${preferredReadReplica.get} for ${params.clientMetadata}") } // If a preferred read-replica is set, skip the read val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false) @@ -1126,20 +1109,19 @@ class ReplicaManager(val config: KafkaConfig, exception = None) } else { // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition - val readInfo: LogReadInfo = partition.readRecords( - lastFetchedEpoch = fetchInfo.lastFetchedEpoch, - fetchOffset = fetchInfo.fetchOffset, - currentLeaderEpoch = fetchInfo.currentLeaderEpoch, + val readInfo: LogReadInfo = partition.fetchRecords( + fetchParams = params, + fetchPartitionData = fetchInfo, + fetchTimeMs = fetchTimeMs, maxBytes = adjustedMaxBytes, - fetchIsolation = fetchIsolation, - fetchOnlyFromLeader = fetchOnlyFromLeader, - minOneMessage = minOneMessage) - val isFromFollower = Request.isValidBrokerId(replicaId) + minOneMessage = minOneMessage, + updateFetchState = !readFromPurgatory + ) - val fetchDataInfo = if (isFromFollower && shouldLeaderThrottle(quota, partition, replicaId)) { + val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) { // If the partition is being throttled, simply return an empty set. 
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) - } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { + } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make // progress in such cases and don't need to report a `RecordTooLargeException` FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) @@ -1156,7 +1138,8 @@ class ReplicaManager(val config: KafkaConfig, fetchTimeMs = fetchTimeMs, lastStableOffset = Some(readInfo.lastStableOffset), preferredReadReplica = preferredReadReplica, - exception = None) + exception = None + ) } } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it @@ -1182,7 +1165,7 @@ class ReplicaManager(val config: KafkaConfig, brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() - val fetchSource = Request.describeReplicaId(replicaId) + val fetchSource = Request.describeReplicaId(params.replicaId) error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " + s"on partition $tp: $fetchInfo", e) @@ -1194,13 +1177,14 @@ class ReplicaManager(val config: KafkaConfig, followerLogStartOffset = UnifiedLog.UnknownOffset, fetchTimeMs = -1L, lastStableOffset = None, - exception = Some(e)) + exception = Some(e) + ) } } - var limitBytes = fetchMaxBytes + var limitBytes = params.maxBytes val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)] - var minOneMessage = !hardMaxBytesLimit + var minOneMessage = !params.hardMaxBytesLimit readPartitionInfo.foreach { case (tp, fetchInfo) => val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) val recordBatchSize = readResult.info.records.sizeInBytes @@ -1802,52 +1786,6 @@ class ReplicaManager(val config: KafkaConfig, } } - /** - * Update the follower's fetch state on the leader based on the last fetch request and update `readResult`. - * If the follower replica is not recognized to be one of the assigned replicas, do not update - * `readResult` so that log start/end offset and high watermark is consistent with - * records in fetch response. Log start/end offset and high watermark may change not only due to - * this fetch request, e.g., rolling new log segment and removing old log segment may move log - * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. If follower has a diverging epoch or if read - * fails with any error, follower fetch state is not updated. 
- */ - private def updateFollowerFetchState(followerId: Int, - readResults: Seq[(TopicIdPartition, LogReadResult)]): Seq[(TopicIdPartition, LogReadResult)] = { - readResults.map { case (topicIdPartition, readResult) => - val updatedReadResult = if (readResult.error != Errors.NONE) { - debug(s"Skipping update of fetch state for follower $followerId since the " + - s"log read returned error ${readResult.error}") - readResult - } else if (readResult.divergingEpoch.nonEmpty) { - debug(s"Skipping update of fetch state for follower $followerId since the " + - s"log read returned diverging epoch ${readResult.divergingEpoch}") - readResult - } else { - onlinePartition(topicIdPartition.topicPartition) match { - case Some(partition) => - if (partition.updateFollowerFetchState(followerId, - followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata, - followerStartOffset = readResult.followerLogStartOffset, - followerFetchTimeMs = readResult.fetchTimeMs, - leaderEndOffset = readResult.leaderLogEndOffset)) { - readResult - } else { - warn(s"Leader $localBrokerId failed to record follower $followerId's position " + - s"${readResult.info.fetchOffsetMetadata.messageOffset}, and last sent HW since the replica " + - s"is not recognized to be one of the assigned replicas ${partition.assignmentState.replicas.mkString(",")} " + - s"for partition $topicIdPartition. Empty records will be returned for this partition.") - readResult.withEmptyFetchInfo - } - case None => - warn(s"While recording the replica LEO, the partition $topicIdPartition hasn't been created.") - readResult - } - } - topicIdPartition -> updatedReadResult - } - } - private def leaderPartitionsIterator: Iterator[Partition] = onlinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined) diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 940968f411b12..dce5a2eaee75f 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -73,7 +73,7 @@ class DelayedFetchTest { .thenThrow(new FencedLeaderEpochException("Requested epoch has been fenced")) when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) - expectReadFromReplica(replicaId, topicIdPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) @@ -111,7 +111,7 @@ class DelayedFetchTest { when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available")) - expectReadFromReplica(replicaId, topicIdPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) assertTrue(delayedFetch.tryComplete()) @@ -160,7 +160,7 @@ class DelayedFetchTest { .setLeaderEpoch(lastFetchedEpoch.get) .setEndOffset(fetchOffset - 1)) when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) - expectReadFromReplica(replicaId, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) assertTrue(delayedFetch.tryComplete()) 
assertTrue(delayedFetch.isCompleted) @@ -182,20 +182,18 @@ class DelayedFetchTest { ) } - private def expectReadFromReplica(replicaId: Int, - topicIdPartition: TopicIdPartition, - fetchPartitionData: FetchRequest.PartitionData, - error: Errors): Unit = { + private def expectReadFromReplica( + fetchParams: FetchParams, + topicIdPartition: TopicIdPartition, + fetchPartitionData: FetchRequest.PartitionData, + error: Errors + ): Unit = { when(replicaManager.readFromLocalLog( - replicaId = replicaId, - fetchOnlyFromLeader = true, - fetchIsolation = FetchLogEnd, - fetchMaxBytes = maxBytes, - hardMaxBytesLimit = false, + fetchParams, readPartitionInfo = Seq((topicIdPartition, fetchPartitionData)), - clientMetadata = None, - quota = replicaQuota)) - .thenReturn(Seq((topicIdPartition, buildReadResult(error)))) + quota = replicaQuota, + readFromPurgatory = true + )).thenReturn(Seq((topicIdPartition, buildReadResult(error)))) } private def buildReadResult(error: Errors): LogReadResult = { diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index 969f8a2e793a1..147743a77d097 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -43,6 +43,7 @@ object AbstractPartitionTest { class AbstractPartitionTest { val brokerId = AbstractPartitionTest.brokerId + val remoteReplicaId = brokerId + 1 val topicPartition = new TopicPartition("test-topic", 0) val time = new MockTime() var tmpDir: File = _ @@ -115,7 +116,7 @@ class AbstractPartitionTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val controllerEpoch = 0 - val replicas = List[Integer](brokerId, brokerId + 1).asJava + val replicas = List[Integer](brokerId, remoteReplicaId).asJava val isr = replicas if (isLeader) { @@ -131,7 +132,7 @@ class AbstractPartitionTest { } else { assertTrue(partition.makeFollower(new LeaderAndIsrPartitionState() .setControllerEpoch(controllerEpoch) - .setLeader(brokerId + 1) + .setLeader(remoteReplicaId) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 39a2edb504eee..1cf66a9b4c5d2 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -17,7 +17,7 @@ package kafka.cluster -import java.util.Properties +import java.util.{Optional, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean @@ -29,7 +29,9 @@ import kafka.server.epoch.LeaderEpochFileCache import kafka.server.metadata.MockConfigRepository import kafka.utils._ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState +import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} +import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion @@ -61,7 +63,6 @@ class PartitionLockTest extends Logging { val executorService = Executors.newFixedThreadPool(numReplicaFetchers + numProducers + 1) val appendSemaphore = new Semaphore(0) val shrinkIsrSemaphore = new Semaphore(0) - val followerQueues = (0 until 
numReplicaFetchers).map(_ => new ArrayBlockingQueue[MemoryRecords](2)) var logManager: LogManager = _ var partition: Partition = _ @@ -181,14 +182,16 @@ class PartitionLockTest extends Logging { * Then release the permit for the final append and verify that all appends and follower updates complete. */ private def concurrentProduceFetchWithReadLockOnly(): Unit = { + val leaderEpoch = partition.getLeaderEpoch + val appendFutures = scheduleAppends() - val stateUpdateFutures = scheduleUpdateFollowers(numProducers * numRecordsPerProducer - 1) + val stateUpdateFutures = scheduleFollowerFetches(leaderEpoch, numRecords = numProducers * numRecordsPerProducer - 1) appendSemaphore.release(numProducers * numRecordsPerProducer - 1) stateUpdateFutures.foreach(_.get(15, TimeUnit.SECONDS)) appendSemaphore.release(1) - scheduleUpdateFollowers(1).foreach(_.get(15, TimeUnit.SECONDS)) // just to make sure follower state update still works + scheduleFollowerFetches(leaderEpoch, numRecords = 1).foreach(_.get(15, TimeUnit.SECONDS)) // just to make sure follower state update still works appendFutures.foreach(_.get(15, TimeUnit.SECONDS)) } @@ -199,9 +202,10 @@ class PartitionLockTest extends Logging { * permits for all appends to complete before verifying state updates. */ private def concurrentProduceFetchWithWriteLock(): Unit = { + val leaderEpoch = partition.getLeaderEpoch val appendFutures = scheduleAppends() - val stateUpdateFutures = scheduleUpdateFollowers(numProducers * numRecordsPerProducer) + val stateUpdateFutures = scheduleFollowerFetches(leaderEpoch, numRecords = numProducers * numRecordsPerProducer) assertFalse(stateUpdateFutures.exists(_.isDone)) appendSemaphore.release(numProducers * numRecordsPerProducer) @@ -216,7 +220,7 @@ class PartitionLockTest extends Logging { (0 until numProducers).map { _ => executorService.submit((() => { try { - append(partition, numRecordsPerProducer, followerQueues) + append(partition, numRecordsPerProducer) } catch { case e: Throwable => error("Exception during append", e) @@ -226,11 +230,11 @@ class PartitionLockTest extends Logging { } } - private def scheduleUpdateFollowers(numRecords: Int): Seq[Future[_]] = { + private def scheduleFollowerFetches(leaderEpoch: Int, numRecords: Int): Seq[Future[_]] = { (1 to numReplicaFetchers).map { index => executorService.submit((() => { try { - updateFollowerFetchState(partition, index, numRecords, followerQueues(index - 1)) + fetchFollower(partition, index, leaderEpoch, numRecords) } catch { case e: Throwable => error("Exception during updateFollowerFetchState", e) @@ -352,30 +356,68 @@ class PartitionLockTest extends Logging { logProps } - private def append(partition: Partition, numRecords: Int, followerQueues: Seq[ArrayBlockingQueue[MemoryRecords]]): Unit = { + private def append( + partition: Partition, + numRecords: Int + ): Unit = { val requestLocal = RequestLocal.withThreadConfinedCaching (0 until numRecords).foreach { _ => val batch = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes))) partition.appendRecordsToLeader(batch, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal) - followerQueues.foreach(_.put(batch)) } } - private def updateFollowerFetchState(partition: Partition, followerId: Int, numRecords: Int, followerQueue: ArrayBlockingQueue[MemoryRecords]): Unit = { - (1 to numRecords).foreach { i => - val batch = followerQueue.poll(15, TimeUnit.SECONDS) - if (batch == null) - throw new RuntimeException(s"Timed out waiting for 
next batch $i") - val batches = batch.batches.iterator.asScala.toList - assertEquals(1, batches.size) - val recordBatch = batches.head - partition.updateFollowerFetchState( - followerId, - followerFetchOffsetMetadata = LogOffsetMetadata(recordBatch.lastOffset + 1), - followerStartOffset = 0L, - followerFetchTimeMs = mockTime.milliseconds(), - leaderEndOffset = partition.localLogOrException.logEndOffset) + private def fetchFollower( + partition: Partition, + followerId: Int, + leaderEpoch: Int, + numRecords: Int + ): Unit = { + val logStartOffset = 0L + var fetchOffset = 0L + var lastFetchedEpoch = Optional.empty[Integer] + val maxBytes = 1 + + while (fetchOffset < numRecords) { + val fetchParams = FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, + replicaId = followerId, + maxWaitMs = 0, + minBytes = 1, + maxBytes = maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + val fetchPartitionData = new FetchRequest.PartitionData( + Uuid.ZERO_UUID, + fetchOffset, + logStartOffset, + maxBytes, + Optional.of(Int.box(leaderEpoch)), + lastFetchedEpoch + ) + + val logReadInfo = partition.fetchRecords( + fetchParams, + fetchPartitionData, + mockTime.milliseconds(), + maxBytes, + minOneMessage = true, + updateFetchState = true + ) + + assertTrue(logReadInfo.divergingEpoch.isEmpty) + + val batches = logReadInfo.fetchedData.records.batches.asScala + if (batches.nonEmpty) { + assertEquals(1, batches.size) + + val batch = batches.head + lastFetchedEpoch = Optional.of(Int.box(batch.partitionLeaderEpoch)) + fetchOffset = batch.lastOffset + 1 + } } } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 7d45617fec94a..04d2b15c6034b 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -16,6 +16,7 @@ */ package kafka.cluster +import java.net.InetAddress import com.yammer.metrics.core.Metric import kafka.common.UnexpectedAppendOffsetException import kafka.log.{Defaults => _, _} @@ -24,13 +25,13 @@ import kafka.server.checkpoints.OffsetCheckpoints import kafka.server.epoch.EpochEntry import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.errors.{ApiException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException} +import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException} import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.ListOffsetsRequest +import org.apache.kafka.common.requests.{FetchRequest, ListOffsetsRequest} import org.apache.kafka.common.utils.SystemTime import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState @@ -45,13 +46,56 @@ import java.nio.ByteBuffer import java.util.Optional import java.util.concurrent.{CountDownLatch, Semaphore} import kafka.server.epoch.LeaderEpochFileCache +import org.apache.kafka.common.network.ListenerName +import 
org.apache.kafka.common.replica.ClientMetadata +import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.KafkaYammerMetrics +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +object PartitionTest { + def followerFetchParams( + replicaId: Int, + maxWaitMs: Long = 0L, + minBytes: Int = 1, + maxBytes: Int = Int.MaxValue + ): FetchParams = { + FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, + replicaId = replicaId, + maxWaitMs = maxWaitMs, + minBytes = minBytes, + maxBytes = maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + } + + def consumerFetchParams( + maxWaitMs: Long = 0L, + minBytes: Int = 1, + maxBytes: Int = Int.MaxValue, + clientMetadata: Option[ClientMetadata] = None, + isolation: FetchIsolation = FetchHighWatermark + ): FetchParams = { + FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, + replicaId = FetchRequest.CONSUMER_REPLICA_ID, + maxWaitMs = maxWaitMs, + minBytes = minBytes, + maxBytes = maxBytes, + isolation = isolation, + clientMetadata = clientMetadata + ) + } +} + class PartitionTest extends AbstractPartitionTest { + import PartitionTest._ @Test def testLastFetchedOffsetValidation(): Unit = { @@ -74,6 +118,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(17L, log.logEndOffset) val leaderEpoch = 10 + val logStartOffset = 0L val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true) def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = { @@ -83,14 +128,13 @@ class PartitionTest extends AbstractPartitionTest { } def read(lastFetchedEpoch: Int, fetchOffset: Long): LogReadInfo = { - partition.readRecords( - Optional.of(lastFetchedEpoch), + fetchFollower( + partition, + remoteReplicaId, fetchOffset, - currentLeaderEpoch = Optional.of(leaderEpoch), - maxBytes = Int.MaxValue, - fetchIsolation = FetchLogEnd, - fetchOnlyFromLeader = true, - minOneMessage = true + logStartOffset, + leaderEpoch = Some(leaderEpoch), + lastFetchedEpoch = Some(lastFetchedEpoch) ) } @@ -192,6 +236,84 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(None, partition.futureLog) } + @Test + def testFetchFromUnrecognizedFollower(): Unit = { + val controllerEpoch = 3 + val leader = brokerId + val validReplica = brokerId + 1 + val addingReplica1 = brokerId + 2 + val addingReplica2 = brokerId + 3 + val replicas = List(leader, validReplica) + val isr = List[Integer](leader, validReplica).asJava + val leaderEpoch = 8 + val partitionEpoch = 1 + + assertTrue(partition.makeLeader(new LeaderAndIsrPartitionState() + .setControllerEpoch(controllerEpoch) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(partitionEpoch) + .setReplicas(replicas.map(Int.box).asJava) + .setIsNew(true), + offsetCheckpoints, None + )) + + assertThrows(classOf[UnknownLeaderEpochException], () => { + fetchFollower( + partition, + replicaId = addingReplica1, + fetchOffset = 0L, + leaderEpoch = Some(leaderEpoch) + ) + }) + assertEquals(None, partition.getReplica(addingReplica1).map(_.stateSnapshot.logEndOffset)) + + assertThrows(classOf[NotLeaderOrFollowerException], () => { + fetchFollower( + partition, + replicaId = addingReplica2, + fetchOffset = 0L, + leaderEpoch = None + ) + }) + 
assertEquals(None, partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset)) + + // The replicas are added as part of a reassignment + val newReplicas = List(leader, validReplica, addingReplica1, addingReplica2) + val newPartitionEpoch = partitionEpoch + 1 + val addingReplicas = List(addingReplica1, addingReplica2) + + assertFalse(partition.makeLeader(new LeaderAndIsrPartitionState() + .setControllerEpoch(controllerEpoch) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(newPartitionEpoch) + .setReplicas(newReplicas.map(Int.box).asJava) + .setAddingReplicas(addingReplicas.map(Int.box).asJava) + .setIsNew(true), + offsetCheckpoints, None + )) + + // Now the fetches are allowed + assertEquals(0L, fetchFollower( + partition, + replicaId = addingReplica1, + fetchOffset = 0L, + leaderEpoch = Some(leaderEpoch) + ).logEndOffset) + assertEquals(Some(0L), partition.getReplica(addingReplica1).map(_.stateSnapshot.logEndOffset)) + + assertEquals(0L, fetchFollower( + partition, + replicaId = addingReplica2, + fetchOffset = 0L, + leaderEpoch = None + ).logEndOffset) + assertEquals(Some(0L), partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset)) + } + // Verify that partition.makeFollower() and partition.appendRecordsToFollowerOrFutureReplica() can run concurrently @Test def testMakeFollowerWithWithFollowerAppendRecords(): Unit = { @@ -405,69 +527,59 @@ class PartitionTest extends AbstractPartitionTest { } @Test - def testReadRecordEpochValidationForLeader(): Unit = { + def testLeaderEpochValidationOnLeader(): Unit = { val leaderEpoch = 5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) - def assertReadRecordsError(error: Errors, - currentLeaderEpochOpt: Optional[Integer]): Unit = { - try { - partition.readRecords( - lastFetchedEpoch = Optional.empty(), - fetchOffset = 0L, - currentLeaderEpoch = currentLeaderEpochOpt, - maxBytes = 1024, - fetchIsolation = FetchLogEnd, - fetchOnlyFromLeader = true, - minOneMessage = false) - if (error != Errors.NONE) - fail(s"Expected readRecords to fail with error $error") - } catch { - case e: Exception => - assertEquals(error, Errors.forException(e)) - } + def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = { + fetchFollower( + partition, + remoteReplicaId, + fetchOffset = 0L, + leaderEpoch = leaderEpoch + ) } - assertReadRecordsError(Errors.NONE, Optional.empty()) - assertReadRecordsError(Errors.NONE, Optional.of(leaderEpoch)) - assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1)) - assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1)) + assertEquals(0L, sendFetch(leaderEpoch = None).logEndOffset) + assertEquals(0L, sendFetch(leaderEpoch = Some(leaderEpoch)).logEndOffset) + assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1))) + assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1))) } @Test - def testReadRecordEpochValidationForFollower(): Unit = { + def testLeaderEpochValidationOnFollower(): Unit = { val leaderEpoch = 5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false) - def assertReadRecordsError(error: Errors, - currentLeaderEpochOpt: Optional[Integer], - fetchOnlyLeader: Boolean): Unit = { - try { - partition.readRecords( - lastFetchedEpoch = Optional.empty(), - fetchOffset = 0L, - currentLeaderEpoch = currentLeaderEpochOpt, - maxBytes = 1024, - fetchIsolation = FetchLogEnd, - fetchOnlyFromLeader = fetchOnlyLeader, - 
minOneMessage = false) - if (error != Errors.NONE) - fail(s"Expected readRecords to fail with error $error") - } catch { - case e: Exception => - assertEquals(error, Errors.forException(e)) - } + def sendFetch( + leaderEpoch: Option[Int], + clientMetadata: Option[ClientMetadata] + ): LogReadInfo = { + fetchConsumer( + partition, + fetchOffset = 0L, + leaderEpoch = leaderEpoch, + clientMetadata = clientMetadata + ) } - assertReadRecordsError(Errors.NONE, Optional.empty(), fetchOnlyLeader = false) - assertReadRecordsError(Errors.NONE, Optional.of(leaderEpoch), fetchOnlyLeader = false) - assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false) - assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false) - - assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true) - assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true) - assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true) - assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true) + // Follower fetching is only allowed when the client provides metadata + assertThrows(classOf[NotLeaderOrFollowerException], () => sendFetch(None, None)) + assertThrows(classOf[NotLeaderOrFollowerException], () => sendFetch(Some(leaderEpoch), None)) + assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1), None)) + assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1), None)) + + val clientMetadata = new DefaultClientMetadata( + "rack", + "clientId", + InetAddress.getLoopbackAddress, + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value + ) + assertEquals(0L, sendFetch(leaderEpoch = None, Some(clientMetadata)).logEndOffset) + assertEquals(0L, sendFetch(leaderEpoch = Some(leaderEpoch), Some(clientMetadata)).logEndOffset) + assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1), Some(clientMetadata))) + assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1), Some(clientMetadata))) } @Test @@ -588,16 +700,6 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(partition.localLogOrException.logStartOffset, partition.localLogOrException.highWatermark, "Expected leader's HW not move") - // let the follower in ISR move leader's HW to move further but below LEO - def updateFollowerFetchState(followerId: Int, fetchOffsetMetadata: LogOffsetMetadata): Unit = { - partition.updateFollowerFetchState( - followerId, - followerFetchOffsetMetadata = fetchOffsetMetadata, - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = partition.localLogOrException.logEndOffset) - } - def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = { try { Right(partition.fetchOffsetForTimestamp( @@ -611,11 +713,12 @@ class PartitionTest extends AbstractPartitionTest { } } - updateFollowerFetchState(follower1, LogOffsetMetadata(0)) - updateFollowerFetchState(follower1, LogOffsetMetadata(2)) + // let the follower in ISR move leader's HW to move further but below LEO + fetchFollower(partition, replicaId = follower1, fetchOffset = 0L) + fetchFollower(partition, replicaId = follower1, fetchOffset = 2L) - 
updateFollowerFetchState(follower2, LogOffsetMetadata(0)) - updateFollowerFetchState(follower2, LogOffsetMetadata(2)) + fetchFollower(partition, replicaId = follower2, fetchOffset = 0L) + fetchFollower(partition, replicaId = follower2, fetchOffset = 2L) // Simulate successful ISR update alterPartitionManager.completeIsrUpdate(2) @@ -704,8 +807,8 @@ class PartitionTest extends AbstractPartitionTest { } // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO) - updateFollowerFetchState(follower1, LogOffsetMetadata(5)) - updateFollowerFetchState(follower2, LogOffsetMetadata(5)) + fetchFollower(partition, replicaId = follower1, fetchOffset = 5L) + fetchFollower(partition, replicaId = follower2, fetchOffset = 5L) // Simulate successful ISR update alterPartitionManager.completeIsrUpdate(6) @@ -919,17 +1022,8 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(partition.localLogOrException.logStartOffset, partition.log.get.highWatermark, "Expected leader's HW not move") // let the follower in ISR move leader's HW to move further but below LEO - def updateFollowerFetchState(followerId: Int, fetchOffsetMetadata: LogOffsetMetadata): Unit = { - partition.updateFollowerFetchState( - followerId, - followerFetchOffsetMetadata = fetchOffsetMetadata, - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = partition.localLogOrException.logEndOffset) - } - - updateFollowerFetchState(follower2, LogOffsetMetadata(0)) - updateFollowerFetchState(follower2, LogOffsetMetadata(lastOffsetOfFirstBatch)) + fetchFollower(partition, replicaId = follower2, fetchOffset = 0) + fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch) assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark, "Expected leader's HW") // current leader becomes follower and then leader again (without any new records appended) @@ -959,13 +1053,13 @@ class PartitionTest extends AbstractPartitionTest { partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0, requestLocal) // fetch from follower not in ISR from log start offset should not add this follower to ISR - updateFollowerFetchState(follower1, LogOffsetMetadata(0)) - updateFollowerFetchState(follower1, LogOffsetMetadata(lastOffsetOfFirstBatch)) + fetchFollower(partition, replicaId = follower1, fetchOffset = 0) + fetchFollower(partition, replicaId = follower1, fetchOffset = lastOffsetOfFirstBatch) assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") // fetch from the follower not in ISR from start offset of the current leader epoch should // add this follower to ISR - updateFollowerFetchState(follower1, LogOffsetMetadata(currentLeaderEpochStartOffset)) + fetchFollower(partition, replicaId = follower1, fetchOffset = currentLeaderEpochStartOffset) // Expansion does not affect the ISR assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") @@ -1057,12 +1151,7 @@ class PartitionTest extends AbstractPartitionTest { time.sleep(500) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(3), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 6L) - + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = initializeTimeMs, logStartOffset = 0L, @@ -1071,12 +1160,7 @@ class PartitionTest extends AbstractPartitionTest { time.sleep(500) - 
partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(6L), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 6L) - + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 6L) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = time.milliseconds(), logStartOffset = 0L, @@ -1114,11 +1198,7 @@ class PartitionTest extends AbstractPartitionTest { logEndOffset = UnifiedLog.UnknownOffset ) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) // Check that the isr didn't change and alter update is scheduled assertEquals(Set(brokerId), partition.inSyncReplicaIds) @@ -1169,12 +1249,7 @@ class PartitionTest extends AbstractPartitionTest { logEndOffset = UnifiedLog.UnknownOffset ) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(3), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 6L) - + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L) assertEquals(Set(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = 0L, @@ -1182,12 +1257,7 @@ class PartitionTest extends AbstractPartitionTest { logEndOffset = 3L ) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 6L) - + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) assertEquals(alterPartitionManager.isrUpdates.size, 1) val isrItem = alterPartitionManager.isrUpdates.head assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId)) @@ -1238,11 +1308,7 @@ class PartitionTest extends AbstractPartitionTest { logEndOffset = UnifiedLog.UnknownOffset ) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) // Follower state is updated, but the ISR has not expanded assertEquals(Set(brokerId), partition.inSyncReplicaIds) @@ -1465,11 +1531,7 @@ class PartitionTest extends AbstractPartitionTest { // There is a short delay before the first fetch. The follower is not yet caught up to the log end. time.sleep(5000) val firstFetchTimeMs = time.milliseconds() - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(5), - followerStartOffset = 0L, - followerFetchTimeMs = firstFetchTimeMs, - leaderEndOffset = 10L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 5L, fetchTimeMs = firstFetchTimeMs) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = initializeTimeMs, logStartOffset = 0L, @@ -1481,11 +1543,7 @@ class PartitionTest extends AbstractPartitionTest { // The total elapsed time from initialization is larger than the max allowed replica lag. 
time.sleep(5001) seedLogData(log, numRecords = 5, leaderEpoch = leaderEpoch) - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 15L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L, fetchTimeMs = time.milliseconds()) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = firstFetchTimeMs, logStartOffset = 0L, @@ -1530,11 +1588,7 @@ class PartitionTest extends AbstractPartitionTest { ) // The follower catches up to the log end immediately. - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = time.milliseconds(), logStartOffset = 0L, @@ -1658,11 +1712,7 @@ class PartitionTest extends AbstractPartitionTest { // This will attempt to expand the ISR val firstFetchTimeMs = time.milliseconds() - partition.updateFollowerFetchState(remoteBrokerId, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10L) + fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L, fetchTimeMs = firstFetchTimeMs) // Follower state is updated, but the ISR has not expanded assertEquals(Set(brokerId), partition.inSyncReplicaIds) @@ -1706,13 +1756,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(0L, partition.localLogOrException.highWatermark) // Expand ISR - partition.updateFollowerFetchState( - followerId = follower3, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10 - ) + fetchFollower(partition, replicaId = follower3, fetchOffset = 10L) assertEquals(Set(brokerId, follower1, follower2), partition.partitionState.isr) assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.maximalIsr) @@ -1776,13 +1820,7 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(0L, partition.localLogOrException.highWatermark) // Expand ISR - partition.updateFollowerFetchState( - followerId = follower3, - followerFetchOffsetMetadata = LogOffsetMetadata(10), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = 10 - ) + fetchFollower(partition, replicaId = follower3, fetchOffset = 10L) // Try avoiding a race TestUtils.waitUntilTrue(() => !partition.partitionState.isInflight, "Expected ISR state to be committed", 100) @@ -2170,14 +2208,7 @@ class PartitionTest extends AbstractPartitionTest { ) // Follower fetches and updates its replica state. 
- partition.updateFollowerFetchState( - followerId = followerId, - followerFetchOffsetMetadata = LogOffsetMetadata(0L), - followerStartOffset = 0L, - followerFetchTimeMs = time.milliseconds(), - leaderEndOffset = partition.localLogOrException.logEndOffset - ) - + fetchFollower(partition, replicaId = followerId, fetchOffset = 0L) assertReplicaState(partition, followerId, lastCaughtUpTimeMs = time.milliseconds(), logStartOffset = 0L, @@ -2473,4 +2504,76 @@ class PartitionTest extends AbstractPartitionTest { fail(s"Replica $replicaId not found.") } } + + private def fetchConsumer( + partition: Partition, + fetchOffset: Long, + leaderEpoch: Option[Int], + clientMetadata: Option[ClientMetadata], + maxBytes: Int = Int.MaxValue, + lastFetchedEpoch: Option[Int] = None, + fetchTimeMs: Long = time.milliseconds(), + topicId: Uuid = Uuid.ZERO_UUID, + isolation: FetchIsolation = FetchHighWatermark + ): LogReadInfo = { + val fetchParams = consumerFetchParams( + maxBytes = maxBytes, + clientMetadata = clientMetadata, + isolation = isolation + ) + + val fetchPartitionData = new FetchRequest.PartitionData( + topicId, + fetchOffset, + FetchRequest.INVALID_LOG_START_OFFSET, + maxBytes, + leaderEpoch.map(Int.box).asJava, + lastFetchedEpoch.map(Int.box).asJava + ) + + partition.fetchRecords( + fetchParams, + fetchPartitionData, + fetchTimeMs, + maxBytes, + minOneMessage = true, + updateFetchState = false + ) + } + + private def fetchFollower( + partition: Partition, + replicaId: Int, + fetchOffset: Long, + logStartOffset: Long = 0L, + maxBytes: Int = Int.MaxValue, + leaderEpoch: Option[Int] = None, + lastFetchedEpoch: Option[Int] = None, + fetchTimeMs: Long = time.milliseconds(), + topicId: Uuid = Uuid.ZERO_UUID + ): LogReadInfo = { + val fetchParams = followerFetchParams( + replicaId, + maxBytes = maxBytes + ) + + val fetchPartitionData = new FetchRequest.PartitionData( + topicId, + fetchOffset, + logStartOffset, + maxBytes, + leaderEpoch.map(Int.box).asJava, + lastFetchedEpoch.map(Int.box).asJava + ) + + partition.fetchRecords( + fetchParams, + fetchPartitionData, + fetchTimeMs, + maxBytes, + minOneMessage = true, + updateFetchState = true + ) + } + } diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala index 6efa37b11728a..0cf7c1d8e2ee4 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala @@ -18,21 +18,25 @@ package kafka.server import java.util import java.util.{Optional, Properties} + import kafka.log.LogConfig -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ class FetchRequestDownConversionConfigTest extends BaseRequestTest { private var producer: KafkaProducer[String, String] = null - override def brokerCount: 
Int = 1 + override def brokerCount: Int = 2 @BeforeEach override def setUp(testInfo: TestInfo): Unit = { @@ -64,8 +68,12 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString) configs.foreach { case (k, v) => topicConfig.setProperty(k, v) } topics.flatMap { topic => - val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 1, - topicConfig = topicConfig) + val partitionToLeader = createTopic( + topic, + numPartitions = numPartitions, + replicationFactor = 2, + topicConfig = topicConfig + ) partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader } }.toMap } @@ -140,56 +148,101 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { * Tests that "message.downconversion.enable" can be set at topic level, and its configuration is obeyed for client * fetch requests. */ - @Test - def testV1FetchWithTopicLevelOverrides(): Unit = { - // create topics with default down-conversion configuration (i.e. conversion disabled) - val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0) - val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq - - // create topics with down-conversion configuration enabled - val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true") - val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5) - val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq - - val allTopics = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions - val leaderId = conversionDisabledTopicsMap.head._2 - val topicIds = servers.head.kafkaController.controllerContext.topicIds - val topicNames = topicIds.map(_.swap) - - allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get()) - val fetchRequest = FetchRequest.Builder.forConsumer(1, Int.MaxValue, 0, createPartitionMap(1024, - allTopics, topicIds.toMap)).build(1) - val fetchResponse = sendFetchRequest(leaderId, fetchRequest) - - val fetchResponseData = fetchResponse.responseData(topicNames.asJava, 1) - conversionDisabledTopicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, Errors.forCode(fetchResponseData.get(tp).errorCode))) - conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE, Errors.forCode(fetchResponseData.get(tp).errorCode))) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testV1FetchFromConsumer(quorum: String): Unit = { + testV1Fetch(isFollowerFetch = false) } /** * Tests that "message.downconversion.enable" has no effect on fetch requests from replicas. */ - @Test - def testV1FetchFromReplica(): Unit = { - // create topics with default down-conversion configuration (i.e. 
conversion disabled) - val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0) - val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testV1FetchFromReplica(quorum: String): Unit = { + testV1Fetch(isFollowerFetch = true) + } - // create topics with down-conversion configuration enabled - val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true") - val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5) - val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq + def testV1Fetch(isFollowerFetch: Boolean): Unit = { + val topicWithDownConversionEnabled = "foo" + val topicWithDownConversionDisabled = "bar" + val replicaIds = brokers.map(_.config.brokerId) + val leaderId = replicaIds.head + val followerId = replicaIds.last - val allTopicPartitions = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions - val topicIds = servers.head.kafkaController.controllerContext.topicIds - val topicNames = topicIds.map(_.swap) - val leaderId = conversionDisabledTopicsMap.head._2 + val admin = createAdminClient() + + val topicWithDownConversionDisabledId = TestUtils.createTopicWithAdminRaw( + admin, + topicWithDownConversionDisabled, + replicaAssignment = Map(0 -> replicaIds) + ) + + val topicConfig = new Properties + topicConfig.put(LogConfig.MessageDownConversionEnableProp, "true") + val topicWithDownConversionEnabledId = TestUtils.createTopicWithAdminRaw( + admin, + topicWithDownConversionEnabled, + replicaAssignment = Map(0 -> replicaIds), + topicConfig = topicConfig + ) + + val partitionWithDownConversionEnabled = new TopicPartition(topicWithDownConversionEnabled, 0) + val partitionWithDownConversionDisabled = new TopicPartition(topicWithDownConversionDisabled, 0) + + val allTopicPartitions = Seq( + partitionWithDownConversionEnabled, + partitionWithDownConversionDisabled + ) + + allTopicPartitions.foreach { tp => + producer.send(new ProducerRecord(tp.topic, "key", "value")).get() + } + + val topicIdMap = Map( + topicWithDownConversionEnabled -> topicWithDownConversionEnabledId, + topicWithDownConversionDisabled -> topicWithDownConversionDisabledId + ) + + val fetchResponseData = sendFetch( + leaderId, + allTopicPartitions, + topicIdMap, + fetchVersion = 1, + replicaIdOpt = if (isFollowerFetch) Some(followerId) else None + ) + + def error(tp: TopicPartition): Errors = { + Errors.forCode(fetchResponseData.get(tp).errorCode) + } + + assertEquals(Errors.NONE, error(partitionWithDownConversionEnabled)) + if (isFollowerFetch) { + assertEquals(Errors.NONE, error(partitionWithDownConversionDisabled)) + } else { + assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled)) + } + } + + private def sendFetch( + leaderId: Int, + partitions: Seq[TopicPartition], + topicIdMap: Map[String, Uuid], + fetchVersion: Short, + replicaIdOpt: Option[Int] + ): util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] = { + val topicNameMap = topicIdMap.map(_.swap) + val partitionMap = createPartitionMap(1024, partitions, topicIdMap) + + val fetchRequest = replicaIdOpt.map { replicaId => + FetchRequest.Builder.forReplica(fetchVersion, replicaId, Int.MaxValue, 0, partitionMap) + .build(fetchVersion) + }.getOrElse { + FetchRequest.Builder.forConsumer(fetchVersion, Int.MaxValue, 0, 
partitionMap) + .build(fetchVersion) + } - allTopicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic, "key", "value")).get()) - val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0, - createPartitionMap(1024, allTopicPartitions, topicIds.toMap)).build() val fetchResponse = sendFetchRequest(leaderId, fetchRequest) - val fetchResponseData = fetchResponse.responseData(topicNames.asJava, 1) - allTopicPartitions.foreach(tp => assertEquals(Errors.NONE, Errors.forCode(fetchResponseData.get(tp).errorCode))) + fetchResponse.responseData(topicNameMap.asJava, fetchVersion) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 49ac23ec23223..18e810bb88def 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -18,8 +18,7 @@ package kafka.server import java.io.File import java.util.{Collections, Optional, Properties} - -import kafka.cluster.Partition +import kafka.cluster.{Partition, PartitionTest} import kafka.log.{LogManager, LogOffsetSnapshot, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers import kafka.utils._ @@ -32,9 +31,9 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} -import org.mockito.{AdditionalMatchers, ArgumentMatchers} import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} import org.mockito.Mockito.{mock, when} +import org.mockito.{AdditionalMatchers, ArgumentMatchers} import scala.jdk.CollectionConverters._ @@ -65,18 +64,10 @@ class ReplicaManagerQuotasTest { .thenReturn(false) .thenReturn(true) - val fetch = replicaManager.readFromLocalLog( - replicaId = followerReplicaId, - fetchOnlyFromLeader = true, - fetchIsolation = FetchHighWatermark, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - readPartitionInfo = fetchInfo, - quota = quota, - clientMetadata = None) + val fetchParams = PartitionTest.followerFetchParams(followerReplicaId) + val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false) assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size, "Given two partitions, with only one throttled, we should get the first") - assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size, "But we shouldn't get the second") } @@ -91,15 +82,8 @@ class ReplicaManagerQuotasTest { .thenReturn(true) .thenReturn(true) - val fetch = replicaManager.readFromLocalLog( - replicaId = followerReplicaId, - fetchOnlyFromLeader = true, - fetchIsolation = FetchHighWatermark, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - readPartitionInfo = fetchInfo, - quota = quota, - clientMetadata = None) + val fetchParams = PartitionTest.followerFetchParams(followerReplicaId) + val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false) assertEquals(0, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size, "Given two partitions, with both throttled, we should get no messages") assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size, @@ -116,15 +100,8 @@ class ReplicaManagerQuotasTest { .thenReturn(false) .thenReturn(false) - val fetch 
= replicaManager.readFromLocalLog( - replicaId = followerReplicaId, - fetchOnlyFromLeader = true, - fetchIsolation = FetchHighWatermark, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - readPartitionInfo = fetchInfo, - quota = quota, - clientMetadata = None) + val fetchParams = PartitionTest.followerFetchParams(followerReplicaId) + val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false) assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size, "Given two partitions, with both non-throttled, we should get both messages") assertEquals(1, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size, @@ -141,15 +118,8 @@ class ReplicaManagerQuotasTest { .thenReturn(false) .thenReturn(true) - val fetch = replicaManager.readFromLocalLog( - replicaId = followerReplicaId, - fetchOnlyFromLeader = true, - fetchIsolation = FetchHighWatermark, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - readPartitionInfo = fetchInfo, - quota = quota, - clientMetadata = None) + val fetchParams = PartitionTest.followerFetchParams(followerReplicaId) + val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false) assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size, "Given two partitions, with only one throttled, we should get the first") @@ -164,19 +134,10 @@ class ReplicaManagerQuotasTest { val quota = mockQuota() when(quota.isQuotaExceeded).thenReturn(true) - val fetch = replicaManager.readFromLocalLog( - replicaId = FetchRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = true, - fetchIsolation = FetchHighWatermark, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - readPartitionInfo = fetchInfo, - quota = quota, - clientMetadata = None).toMap - + val fetchParams = PartitionTest.consumerFetchParams() + val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false).toMap assertEquals(1, fetch(topicIdPartition1).info.records.batches.asScala.size, "Replication throttled partitions should return data for consumer fetch") - assertEquals(1, fetch(topicIdPartition2).info.records.batches.asScala.size, "Replication throttled partitions should return data for consumer fetch") } @@ -315,6 +276,10 @@ class ReplicaManagerQuotasTest { MemoryRecords.EMPTY )) + when(log.maybeIncrementHighWatermark( + any[LogOffsetMetadata] + )).thenReturn(None) + //Create log manager val logManager: LogManager = mock(classOf[LogManager]) @@ -367,4 +332,5 @@ class ReplicaManagerQuotasTest { when(quota.isThrottled(any[TopicPartition])).thenReturn(true) quota } + } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 977da6c69c79e..80e611251831b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -973,13 +973,14 @@ class ReplicaManagerTest { val partition0Replicas = Seq[Integer](0, 1).asJava val partition1Replicas = Seq[Integer](0, 2).asJava val topicIds = Map(tp0.topic -> topicId, tp1.topic -> topicId).asJava + val leaderEpoch = 0 val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq( new LeaderAndIsrPartitionState() .setTopicName(tp0.topic) .setPartitionIndex(tp0.partition) .setControllerEpoch(0) - .setLeader(0) + .setLeader(leaderEpoch) 
.setLeaderEpoch(0) .setIsr(partition0Replicas) .setPartitionEpoch(0) @@ -990,7 +991,7 @@ class ReplicaManagerTest { .setPartitionIndex(tp1.partition) .setControllerEpoch(0) .setLeader(0) - .setLeaderEpoch(0) + .setLeaderEpoch(leaderEpoch) .setIsr(partition1Replicas) .setPartitionEpoch(0) .setReplicas(partition1Replicas) @@ -1024,20 +1025,17 @@ class ReplicaManagerTest { assertEquals(Errors.NONE, tp0Status.get.error) assertTrue(tp0Status.get.records.batches.iterator.hasNext) + // Replica 1 is not a valid replica for partition 1 val tp1Status = responseStatusMap.get(tidp1) - assertTrue(tp1Status.isDefined) - assertEquals(0, tp1Status.get.highWatermark) - assertEquals(Some(0), tp0Status.get.lastStableOffset) - assertEquals(Errors.NONE, tp1Status.get.error) - assertFalse(tp1Status.get.records.batches.iterator.hasNext) + assertEquals(Errors.UNKNOWN_LEADER_EPOCH, tp1Status.get.error) } fetchPartitions( replicaManager, replicaId = 1, fetchInfos = Seq( - tidp0 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()), - tidp1 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()) + tidp0 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.of[Integer](leaderEpoch)), + tidp1 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.of[Integer](leaderEpoch)) ), responseCallback = fetchCallback, maxWaitMs = 1000, @@ -1354,13 +1352,14 @@ class ReplicaManagerTest { ).toMap) // Make this replica the leader + val leaderEpoch = 1 val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(new LeaderAndIsrPartitionState() .setTopicName(topic) .setPartitionIndex(0) .setControllerEpoch(0) .setLeader(0) - .setLeaderEpoch(1) + .setLeaderEpoch(leaderEpoch) .setIsr(brokerList) .setPartitionEpoch(0) .setReplicas(brokerList) @@ -1368,15 +1367,22 @@ class ReplicaManagerTest { Collections.singletonMap(topic, topicId), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) - // Avoid the replica selector ignore the follower replica if it not have the data that need to fetch - replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(followerBrokerId, new LogOffsetMetadata(0), 0, 0, 0) + + // The leader must record the follower's fetch offset to make it eligible for follower fetch selection + val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue, Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer]) + fetchPartitionAsFollower( + replicaManager, + tidp0, + followerFetchData, + replicaId = followerBrokerId + ) val metadata = new DefaultClientMetadata("rack-b", "client-id", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") // If a preferred read replica is selected, the fetch response returns immediately, even if min bytes and timeout conditions are not met. 
val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, - new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), + new PartitionData(topicId, 0, 0, 100000, Optional.empty()), minBytes = 1, clientMetadata = Some(metadata), maxWaitMs = 5000) // Fetch from leader succeeds diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 31ba10f79c34b..e097dbd620035 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -32,7 +32,7 @@ import java.util.{Arrays, Collections, Optional, Properties} import com.yammer.metrics.core.{Gauge, Meter} import javax.net.ssl.X509TrustManager import kafka.api._ -import kafka.cluster.{Broker, EndPoint, AlterPartitionListener} +import kafka.cluster.{AlterPartitionListener, Broker, EndPoint} import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch} import kafka.log._ import kafka.network.RequestChannel @@ -66,7 +66,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer} import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaFuture, TopicPartition} +import org.apache.kafka.common.{KafkaFuture, TopicPartition, Uuid} import org.apache.kafka.controller.QuorumController import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.apache.kafka.server.common.MetadataVersion @@ -382,6 +382,34 @@ object TestUtils extends Logging { Admin.create(adminClientProperties) } + def createTopicWithAdminRaw[B <: KafkaBroker]( + admin: Admin, + topic: String, + numPartitions: Int = 1, + replicationFactor: Int = 1, + replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty, + topicConfig: Properties = new Properties, + ): Uuid = { + val configsMap = new util.HashMap[String, String]() + topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString)) + + val result = if (replicaAssignment.isEmpty) { + admin.createTopics(Collections.singletonList(new NewTopic( + topic, numPartitions, replicationFactor.toShort).configs(configsMap))) + } else { + val assignment = new util.HashMap[Integer, util.List[Integer]]() + replicaAssignment.forKeyValue { case (k, v) => + val replicas = new util.ArrayList[Integer] + v.foreach(r => replicas.add(r.asInstanceOf[Integer])) + assignment.put(k.asInstanceOf[Integer], replicas) + } + admin.createTopics(Collections.singletonList(new NewTopic( + topic, assignment).configs(configsMap))) + } + + result.topicId(topic).get() +} + def createTopicWithAdmin[B <: KafkaBroker]( admin: Admin, topic: String, @@ -397,23 +425,15 @@ object TestUtils extends Logging { replicaAssignment.size } - val configsMap = new util.HashMap[String, String]() - topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString)) try { - val result = if (replicaAssignment.isEmpty) { - admin.createTopics(Collections.singletonList(new NewTopic( - topic, numPartitions, replicationFactor.toShort).configs(configsMap))) - } else { - val assignment = new util.HashMap[Integer, util.List[Integer]]() - replicaAssignment.forKeyValue { case (k, v) => - val replicas = new util.ArrayList[Integer] - v.foreach(r => replicas.add(r.asInstanceOf[Integer])) - assignment.put(k.asInstanceOf[Integer], replicas) - } - 
admin.createTopics(Collections.singletonList(new NewTopic( - topic, assignment).configs(configsMap))) - } - result.all().get() + createTopicWithAdminRaw( + admin, + topic, + numPartitions, + replicationFactor, + replicaAssignment, + topicConfig + ) } catch { case e: ExecutionException => if (!(e.getCause != null && e.getCause.isInstanceOf[TopicExistsException] && @@ -432,16 +452,24 @@ object TestUtils extends Logging { }.toMap } + def describeTopic( + admin: Admin, + topic: String + ): TopicDescription = { + val describedTopics = admin.describeTopics( + Collections.singleton(topic) + ).allTopicNames().get() + describedTopics.get(topic) + } + def topicHasSameNumPartitionsAndReplicationFactor(adminClient: Admin, topic: String, numPartitions: Int, replicationFactor: Int): Boolean = { - val describedTopics = adminClient.describeTopics(Collections. - singleton(topic)).allTopicNames().get() - val description = describedTopics.get(topic) - (description != null && + val description = describeTopic(adminClient, topic) + description != null && description.partitions().size() == numPartitions && - description.partitions().iterator().next().replicas().size() == replicationFactor) + description.partitions().iterator().next().replicas().size() == replicationFactor } def createOffsetsTopicWithAdmin[B <: KafkaBroker]( diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index f1f3d76ba73c4..b2cf1ac5569bb 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -20,6 +20,7 @@ import kafka.cluster.DelayedOperations; import kafka.cluster.AlterPartitionListener; import kafka.cluster.Partition; +import kafka.cluster.Replica; import kafka.log.CleanerConfig; import kafka.log.Defaults; import kafka.log.LogConfig; @@ -79,6 +80,8 @@ public class UpdateFollowerFetchStateBenchmark { private long nextOffset = 0; private LogManager logManager; private Partition partition; + private Replica replica1; + private Replica replica2; @Setup(Level.Trial) public void setUp() { @@ -127,6 +130,8 @@ public void setUp() { alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); partition.makeLeader(partitionState, offsetCheckpoints, topicId); + replica1 = partition.getReplica(1).get(); + replica2 = partition.getReplica(2).get(); } // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results @@ -166,9 +171,9 @@ private LogConfig createLogConfig() { @OutputTimeUnit(TimeUnit.NANOSECONDS) public void updateFollowerFetchStateBench() { // measure the impact of two follower fetches on the leader - partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0), + partition.updateFollowerFetchState(replica1, new LogOffsetMetadata(nextOffset, nextOffset, 0), 0, 1, nextOffset); - partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0), + partition.updateFollowerFetchState(replica2, new LogOffsetMetadata(nextOffset, nextOffset, 0), 0, 1, nextOffset); nextOffset++; } @@ -178,9 +183,9 @@ public void updateFollowerFetchStateBench() { public void updateFollowerFetchStateBenchNoChange() { // measure the impact of two follower fetches on the leader when the follower didn't // end up fetching anything 
-        partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+        partition.updateFollowerFetchState(replica1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
                 0, 1, 100);
-        partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+        partition.updateFollowerFetchState(replica2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
                 0, 1, 100);
     }
 }
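
Note for reviewers: a minimal sketch of how a PartitionTest case can now drive follower state through the public fetch path using the fetchFollower helper added in this patch. It assumes the existing PartitionTest fixture (partition, remoteBrokerId, time, assertReplicaState); the offsets are illustrative only, not taken from any specific test.

    // First fetch registers the follower's position on the leader via Partition.fetchRecords,
    // instead of calling updateFollowerFetchState directly.
    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 0L)

    // A later fetch at the leader's log end offset marks the follower as caught up,
    // which is what makes it eligible for ISR expansion.
    val leo = partition.localLogOrException.logEndOffset
    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = leo, fetchTimeMs = time.milliseconds())

    assertReplicaState(partition, remoteBrokerId,
      lastCaughtUpTimeMs = time.milliseconds(),
      logStartOffset = 0L,
      logEndOffset = leo
    )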