KAFKA-1720; Renamed Delayed Operations after KAFKA-1583; reviewed by Gwen Shapira and Joel Koshy
Guozhang Wang committed Dec 4, 2014
1 parent 7e9368b commit 3cc10d5
Showing 8 changed files with 116 additions and 79 deletions.
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -17,18 +17,18 @@
package kafka.cluster

import kafka.common._
-import kafka.admin.AdminUtils
import kafka.utils._
+import kafka.utils.Utils.{inReadLock,inWriteLock}
+import kafka.admin.AdminUtils
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet

import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.Utils.{inReadLock,inWriteLock}
import scala.collection.immutable.Set

import com.yammer.metrics.core.Gauge
@@ -232,7 +232,7 @@ class Partition(val topic: String,
/**
* Update the log end offset of a certain replica of this partition
*/
-def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = {
+def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) {
getReplica(replicaId) match {
case Some(replica) =>
replica.logEndOffset = offset
@@ -343,8 +343,8 @@ class Partition(val topic: String,
if(oldHighWatermark.precedes(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
-// some delayed requests may be unblocked after HW changed
-val requestKey = new TopicAndPartition(this.topic, this.partitionId)
+// some delayed operations may be unblocked after HW changed
+val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
} else {
@@ -414,7 +414,7 @@ class Partition(val topic: String,

val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
-replicaManager.tryCompleteDelayedFetch(new TopicAndPartition(this.topic, this.partitionId))
+replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
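The ReplicaManager side of these tryCompleteDelayedFetch/tryCompleteDelayedProduce calls is not among the diffs shown on this page. As a rough sketch only (the field name and the debug message are assumptions, not taken from the commit), such a helper would simply forward the key to the corresponding purgatory:

// Sketch only: an assumed ReplicaManager helper, not part of the diff above.
// fetchOperationPurgatory stands for a DelayedOperationPurgatory[DelayedFetch] field.
def tryCompleteDelayedFetch(key: DelayedOperationKey) {
  val completed = fetchOperationPurgatory.checkAndComplete(key)
  debug("Key %s unblocked %d delayed fetch operations.".format(key.keyLabel, completed))
}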
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -32,7 +32,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
}

/**
-* The fetch metadata maintained by the delayed produce request
+* The fetch metadata maintained by the delayed fetch operation
*/
case class FetchMetadata(fetchMinBytes: Int,
fetchOnlyLeader: Boolean,
@@ -45,17 +45,17 @@ case class FetchMetadata(fetchMinBytes: Int,
"partitionStatus: " + fetchPartitionStatus + "]"
}
/**
-* A delayed fetch request that can be created by the replica manager and watched
-* in the fetch request purgatory
+* A delayed fetch operation that can be created by the replica manager and watched
+* in the fetch operation purgatory
*/
class DelayedFetch(delayMs: Long,
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
-extends DelayedRequest(delayMs) {
+extends DelayedOperation(delayMs) {

/**
-* The request can be completed if:
+* The operation can be completed if:
*
* Case A: This broker is no longer the leader for some partitions it tries to fetch
* Case B: This broker does not know of some partitions it tries to fetch
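The rest of the completion check is cut off above. As a simplified sketch of its shape (not the code from this commit; availableBytes is a hypothetical helper), DelayedFetch sums the bytes already readable for the watched partitions and forces completion once fetchMinBytes is reached:

// Simplified sketch of the DelayedFetch completion pattern.
override def tryComplete(): Boolean = {
  val accumulated = fetchMetadata.fetchPartitionStatus.map { case (topicAndPartition, status) =>
    availableBytes(topicAndPartition, status.startOffsetMetadata)   // hypothetical per-partition check
  }.sum
  if (accumulated >= fetchMetadata.fetchMinBytes)
    forceComplete()
  else
    false
}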
core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -39,9 +39,9 @@ import com.yammer.metrics.core.Gauge
* or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
* forceComplete().
*
-* A subclass of DelayedRequest needs to provide an implementation of both onComplete() and tryComplete().
+* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*/
-abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
+abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
private val completed = new AtomicBoolean(false)

/*
@@ -51,7 +51,10 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
* 1. The operation has been verified to be completable inside tryComplete()
* 2. The operation has expired and hence needs to be completed right now
*
-* Return true iff the operation is completed by the caller
+* Return true iff the operation is completed by the caller: note that
+* concurrent threads can try to complete the same operation, but only
+* the first thread will succeed in completing the operation and return
+* true, others will still return false
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
@@ -68,8 +71,8 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
def isCompleted(): Boolean = completed.get()

/**
-* Process for completing an operation; This function needs to be defined in subclasses
-* and will be called exactly once in forceComplete()
+* Process for completing an operation; This function needs to be defined
+* in subclasses and will be called exactly once in forceComplete()
*/
def onComplete(): Unit

@@ -78,25 +81,21 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
-* Note that concurrent threads can check if an operation can be completed or not,
-* but only the first thread will succeed in completing the operation and return
-* true, others will still return false
-*
-* this function needs to be defined in subclasses
+* This function needs to be defined in subclasses
*/
def tryComplete(): Boolean
}

/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
-class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
+class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
extends Logging with KafkaMetricsGroup {

-/* a list of requests watching each key */
+/* a list of operation watching keys */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))

-/* background thread expiring requests that have been waiting too long */
+/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper

newGauge(
@@ -107,7 +106,7 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In
)

newGauge(
"NumDelayedRequests",
"NumDelayedOperations",
new Gauge[Int] {
def value = delayed()
}
@@ -153,10 +152,10 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In
}

/**
-* Check if some some delayed requests can be completed with the given watch key,
+* Check if some some delayed operations can be completed with the given watch key,
* and if yes complete them.
*
-* @return the number of completed requests during this process
+* @return the number of completed operations during this process
*/
def checkAndComplete(key: Any): Int = {
val watchers = watchersForKey.get(key)
@@ -194,26 +193,26 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In
* A linked list of watched delayed operations based on some key
*/
private class Watchers {
-private val requests = new util.LinkedList[T]
+private val operations = new util.LinkedList[T]

-def watched = requests.size()
+def watched = operations.size()

// add the element to watch
def watch(t: T) {
synchronized {
-requests.add(t)
+operations.add(t)
}
}

// traverse the list and try to complete some watched elements
def tryCompleteWatched(): Int = {
var completed = 0
synchronized {
-val iter = requests.iterator()
+val iter = operations.iterator()
while(iter.hasNext) {
val curr = iter.next
if (curr.isCompleted()) {
-// another thread has completed this request, just remove it
+// another thread has completed this operation, just remove it
iter.remove()
} else {
if(curr synchronized curr.tryComplete()) {
@@ -230,7 +229,7 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In
def purgeCompleted(): Int = {
var purged = 0
synchronized {
-val iter = requests.iterator()
+val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next
if(curr.isCompleted()) {
@@ -301,12 +300,12 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In
// try to get the next expired operation and force completing it
expireNext()
// see if we need to purge the watch lists
-if (RequestPurgatory.this.watched() >= purgeInterval) {
+if (DelayedOperationPurgatory.this.watched() >= purgeInterval) {
debug("Begin purging watch lists")
val purged = watchersForKey.values.map(_.purgeCompleted()).sum
debug("Purged %d elements from watch lists.".format(purged))
}
-// see if we need to purge the delayed request queue
+// see if we need to purge the delayed operation queue
if (delayed() >= purgeInterval) {
debug("Begin purging delayed queue")
val purged = purgeCompleted()
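Taken together, the contract above means a subclass only supplies tryComplete() and onComplete(), while forceComplete() guarantees that onComplete() runs at most once even when several threads race. A minimal illustrative subclass, not part of this commit and with made-up names:

// Illustration only: a delayed operation that completes once an external
// condition holds, or when the purgatory's expiration reaper times it out.
class DelayedCondition(delayMs: Long,
                       condition: () => Boolean,
                       callback: Boolean => Unit) extends DelayedOperation(delayMs) {

  // Runs exactly once, from the first successful forceComplete();
  // the argument reports whether the condition was actually met.
  override def onComplete(): Unit = callback(condition())

  // If the condition already holds, try to complete now; only the first
  // thread to win the CAS inside forceComplete() returns true.
  override def tryComplete(): Boolean =
    if (condition()) forceComplete() else false
}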
38 changes: 38 additions & 0 deletions core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.common.TopicAndPartition

/**
* Keys used for delayed operation metrics recording
*/
trait DelayedOperationKey {
def keyLabel: String
}

object DelayedOperationKey {
val globalLabel = "All"
}

case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {

def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)

override def keyLabel = "%s-%d".format(topic, partition)
}
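A small usage sketch of the new key type (the purgatory instance and topic name here are hypothetical; the constructor defaults and checkAndComplete() come from the diffs above):

// Illustration only: keys are plain case classes, so equal topic/partition
// pairs map to the same watcher list inside the purgatory.
val purgatory = new DelayedOperationPurgatory[DelayedFetch](brokerId = 1)
val key = TopicPartitionOperationKey("my-topic", 0)                    // keyLabel == "my-topic-0"
val sameKey = new TopicPartitionOperationKey(TopicAndPartition("my-topic", 0))
val completed = purgatory.checkAndComplete(key)                        // completes satisfied operations watching this key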
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -32,7 +32,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Producer
}

/**
-* The produce metadata maintained by the delayed produce request
+* The produce metadata maintained by the delayed produce operation
*/
case class ProduceMetadata(produceRequiredAcks: Short,
produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
@@ -42,14 +42,14 @@ case class ProduceMetadata(produceRequiredAcks: Short,
}

/**
-* A delayed produce request that can be created by the replica manager and watched
-* in the produce request purgatory
+* A delayed produce operation that can be created by the replica manager and watched
+* in the produce operation purgatory
*/
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
-extends DelayedRequest(delayMs) {
+extends DelayedOperation(delayMs) {

// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
@@ -65,13 +65,13 @@ class DelayedProduce(delayMs: Long,
}

/**
-* The delayed produce request can be completed if every partition
+* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: This broker is no longer the leader: set an error in response
* Case B: This broker is the leader:
* B.1 - If there was a local error thrown while checking if at least requiredAcks
-replicas have caught up to this request: set an error in response
+replicas have caught up to this operation: set an error in response
* B.2 - Otherwise, set the response with no error.
*/
override def tryComplete(): Boolean = {
@@ -117,4 +117,4 @@ class DelayedProduce(delayMs: Long,
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)
}
}
}
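The bookkeeping above follows the same pattern on the produce side: every partition starts with its acks-pending flag set, and tryComplete() clears it once enough replicas have caught up. A rough sketch of that loop (hasEnoughReplicas is a hypothetical stand-in for the leader-side ISR/offset check; this is not the commit's code):

// Simplified sketch of the DelayedProduce completion pattern.
override def tryComplete(): Boolean = {
  produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
    if (status.acksPending && hasEnoughReplicas(topicAndPartition, status.requiredOffset))
      status.acksPending = false                    // this partition is now satisfied
  }
  // the operation completes once no partition is still waiting for acks
  if (!produceMetadata.produceStatus.values.exists(_.acksPending))
    forceComplete()
  else
    false
}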