Skip to content

Commit

Permalink
KAFKA-13837; Return an error from Fetch if follower is not a valid re…
Browse files Browse the repository at this point in the history
…plica (apache#12150)

When a partition leader receives a `Fetch` request from a replica which is not in the current replica set, the behavior today is to return a successful fetch response, but with empty data. This causes the follower to retry until metadata converges without updating any state on the leader side. It is clearer in this case to return an error, so that the metadata inconsistency is visible in logging and so that the follower backs off before retrying. 

In this patch, we use `UNKNOWN_LEADER_EPOCH` when the `Fetch` request includes the current leader epoch. The way we see this is that the leader is validating the (replicaId, leaderEpoch) tuple. When the leader returns `UNKNOWN_LEADER_EPOCH`, it means that the leader does not expect the given leaderEpoch from that replica. If the request does not include a leader epoch, then we use `NOT_LEADER_OR_FOLLOWER`. We can take a similar interpretation for this case: the leader is rejecting the request because it does not think it should be the leader for that replica. But mainly these errors ensure that the follower will retry the request.

As a part of this patch, I have refactored the way that the leader updates follower fetch state. Previously, the process is a little convoluted. We send the fetch from `ReplicaManager` down to `Partition.readRecords`, then we iterate over the results and call `Partition.updateFollowerFetchState`. It is more straightforward to update state directly as a part of `readRecords`. All we need to do is pass through the `FetchParams`. This also prevents an unnecessary copy of the read results.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
Jason Gustafson authored May 19, 2022
1 parent b4f35c9 commit 8efdbce
Show file tree
Hide file tree
Showing 14 changed files with 760 additions and 512 deletions.
234 changes: 165 additions & 69 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,10 @@ class Partition(val topicPartition: TopicPartition,
leaderReplicaIdOpt.filter(_ == localBrokerId)
}

private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): UnifiedLog = {
private def localLogWithEpochOrThrow(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean
): UnifiedLog = {
getLocalLog(currentLeaderEpoch, requireLeader) match {
case Left(localLog) => localLog
case Right(error) =>
Expand Down Expand Up @@ -719,55 +721,51 @@ class Partition(val topicPartition: TopicPartition,
* Update the follower's state in the leader based on the last fetch request. See
* [[Replica.updateFetchState()]] for details.
*
* @return true if the follower's fetch state was updated, false if the followerId is not recognized
* This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
*/
def updateFollowerFetchState(followerId: Int,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long): Boolean = {
getReplica(followerId) match {
case Some(followerReplica) =>
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
val prevFollowerEndOffset = followerReplica.stateSnapshot.logEndOffset
followerReplica.updateFetchState(
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
leaderEndOffset)

val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW

// Check if this in-sync replica needs to be added to the ISR.
maybeExpandIsr(followerReplica)

// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.stateSnapshot.logEndOffset) {
// the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
// leaderIsrUpdateLock to prevent adding new hw to invalid log.
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
}
} else {
false
}

// some delayed operations may be unblocked after HW or LW changed
if (leaderLWIncremented || leaderHWIncremented)
tryCompleteDelayedRequests()
def updateFollowerFetchState(
replica: Replica,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long
): Unit = {
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
replica.updateFetchState(
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
leaderEndOffset
)

val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW

// Check if this in-sync replica needs to be added to the ISR.
maybeExpandIsr(replica)

// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
// the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
// leaderIsrUpdateLock to prevent adding new hw to invalid log.
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
}
} else {
false
}

debug(s"Recorded replica $followerId log end offset (LEO) position " +
s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
true
// some delayed operations may be unblocked after HW or LW changed
if (leaderLWIncremented || leaderHWIncremented)
tryCompleteDelayedRequests()

case None =>
false
}
debug(s"Recorded replica ${replica.brokerId} log end offset (LEO) position " +
s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
}

/**
Expand Down Expand Up @@ -1145,15 +1143,112 @@ class Partition(val topicPartition: TopicPartition,
info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}

def readRecords(lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
fetchOnlyFromLeader: Boolean,
minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
/**
* Fetch records from the partition.
*
* @param fetchParams parameters of the corresponding `Fetch` request
* @param fetchPartitionData partition-level parameters of the `Fetch` (e.g. the fetch offset)
* @param fetchTimeMs current time in milliseconds on the broker of this fetch request
* @param maxBytes the maximum bytes to return
* @param minOneMessage whether to ensure that at least one complete message is returned
* @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches)
* @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present
* @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]]
* is enabled, or if this is a follower fetch with an older request version
* and the replicaId is not recognized among the current valid replicas
* @throws FencedLeaderEpochException if the leader epoch in the `Fetch` request is lower than the current
* leader epoch
* @throws UnknownLeaderEpochException if the leader epoch in the `Fetch` request is higher than the current
* leader epoch, or if this is a follower fetch and the replicaId is not
* recognized among the current valid replicas
* @throws OffsetOutOfRangeException if the fetch offset is smaller than the log start offset or larger than
* the log end offset (or high watermark depending on [[FetchParams.isolation]]),
* or if the end offset for the last fetched epoch in [[FetchRequest.PartitionData]]
* cannot be determined from the local epoch cache (e.g. if it is larger than
* any cached epoch value)
*/
def fetchRecords(
fetchParams: FetchParams,
fetchPartitionData: FetchRequest.PartitionData,
fetchTimeMs: Long,
maxBytes: Int,
minOneMessage: Boolean,
updateFetchState: Boolean
): LogReadInfo = {
def readFromLocalLog(): LogReadInfo = {
readRecords(
fetchPartitionData.lastFetchedEpoch,
fetchPartitionData.fetchOffset,
fetchPartitionData.currentLeaderEpoch,
maxBytes,
fetchParams.isolation,
minOneMessage,
fetchParams.fetchOnlyLeader
)
}

if (fetchParams.isFromFollower) {
// Check that the request is from a valid replica before doing the read
val replica = followerReplicaOrThrow(fetchParams.replicaId, fetchPartitionData)
val logReadInfo = readFromLocalLog()

if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) {
updateFollowerFetchState(
replica,
followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
followerStartOffset = fetchPartitionData.logStartOffset,
followerFetchTimeMs = fetchTimeMs,
leaderEndOffset = logReadInfo.logEndOffset
)
}

logReadInfo
} else {
readFromLocalLog()
}
}

private def followerReplicaOrThrow(
replicaId: Int,
fetchPartitionData: FetchRequest.PartitionData
): Replica = {
getReplica(replicaId).getOrElse {
debug(s"Leader $localBrokerId failed to record follower $replicaId's position " +
s"${fetchPartitionData.fetchOffset}, and last sent high watermark since the replica is " +
s"not recognized to be one of the assigned replicas ${assignmentState.replicas.mkString(",")} " +
s"for leader epoch $leaderEpoch with partition epoch $partitionEpoch")

val error = if (fetchPartitionData.currentLeaderEpoch.isPresent) {
// The leader epoch is present in the request and matches the local epoch, but
// the replica is not in the replica set. This case is possible in KRaft,
// for example, when new replicas are added as part of a reassignment.
// We return UNKNOWN_LEADER_EPOCH to signify that the tuple (replicaId, leaderEpoch)
// is not yet recognized as valid, which causes the follower to retry.
Errors.UNKNOWN_LEADER_EPOCH
} else {
// The request has no leader epoch, which means it is an older version. We cannot
// say if the follower's state is stale or the local state is. In this case, we
// return `NOT_LEADER_OR_FOLLOWER` for lack of a better error so that the follower
// will retry.
Errors.NOT_LEADER_OR_FOLLOWER
}

throw error.exception(s"Replica $replicaId is not recognized as a " +
s"valid replica of $topicPartition in leader epoch $leaderEpoch with " +
s"partition epoch $partitionEpoch")
}
}

private def readRecords(
lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
minOneMessage: Boolean,
fetchOnlyFromLeader: Boolean
): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)

// Note we use the log end offset prior to the read. This ensures that any appends following
// the fetch do not prevent a follower from coming into sync.
Expand Down Expand Up @@ -1181,18 +1276,12 @@ class Partition(val topicPartition: TopicPartition,
}

if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
val emptyFetchData = FetchDataInfo(
fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
abortedTransactions = None
)

val divergingEpoch = new FetchResponseData.EpochEndOffset()
.setEpoch(epochEndOffset.leaderEpoch)
.setEndOffset(epochEndOffset.endOffset)

return LogReadInfo(
fetchedData = emptyFetchData,
fetchedData = FetchDataInfo.empty(fetchOffset),
divergingEpoch = Some(divergingEpoch),
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
Expand All @@ -1201,22 +1290,29 @@ class Partition(val topicPartition: TopicPartition,
}
}

val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
val fetchedData = localLog.read(
fetchOffset,
maxBytes,
fetchIsolation,
minOneMessage
)

LogReadInfo(
fetchedData = fetchedData,
divergingEpoch = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset)
lastStableOffset = initialLastStableOffset
)
}

def fetchOffsetForTimestamp(timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)

val lastFetchableOffset = isolationLevel match {
case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
Expand Down Expand Up @@ -1277,15 +1373,15 @@ class Partition(val topicPartition: TopicPartition,
def fetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)
localLog.fetchOffsetSnapshot
}

def legacyFetchOffsetsForTimestamp(timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrException(Optional.empty(), fetchOnlyFromLeader)
val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader)
val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)

if (!isFromConsumer) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ case class LogOffsetSnapshot(logStartOffset: Long,
lastStableOffset: LogOffsetMetadata)

/**
* Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]].
* Another container which is used for lower level reads using [[kafka.cluster.Partition.fetchRecords()]].
*/
case class LogReadInfo(fetchedData: FetchDataInfo,
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,16 @@ class DelayedFetch(
* Upon completion, read whatever data is available and pass to the complete callback
*/
override def onComplete(): Unit = {
val fetchInfos = fetchPartitionStatus.map { case (tp, status) =>
tp -> status.fetchInfo
}

val logReadResults = replicaManager.readFromLocalLog(
replicaId = params.replicaId,
fetchOnlyFromLeader = params.fetchOnlyLeader,
fetchIsolation = params.isolation,
fetchMaxBytes = params.maxBytes,
hardMaxBytesLimit = params.hardMaxBytesLimit,
readPartitionInfo = fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
clientMetadata = params.clientMetadata,
quota = quota)
params,
fetchInfos,
quota,
readFromPurgatory = true
)

val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower &&
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/kafka/server/FetchDataInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.server
import kafka.api.Request
import org.apache.kafka.common.IsolationLevel
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest

Expand Down Expand Up @@ -75,6 +75,17 @@ case class FetchParams(
}
}

object FetchDataInfo {
def empty(fetchOffset: Long): FetchDataInfo = {
FetchDataInfo(
fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
firstEntryIncomplete = false,
abortedTransactions = None
)
}
}

case class FetchDataInfo(
fetchOffsetMetadata: LogOffsetMetadata,
records: Records,
Expand Down
Loading

0 comments on commit 8efdbce

Please sign in to comment.