Skip to content

Commit

Permalink
basic implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
lmr3796 committed Oct 6, 2023
1 parent d144fda commit 1899ca2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
)

val listOffsetsRequestInstrumentation = new ListOffsetsRequestInstrumentation()

def close(): Unit = {
aclApis.close()
info("Shutdown complete.")
Expand Down Expand Up @@ -1141,6 +1143,10 @@ class KafkaApis(val requestChannel: RequestChannel,
else
None

if (isClientRequest) { // Brokers send listOffset requests too to check if truncation needed --- ignore those.
listOffsetsRequestInstrumentation.logUsage(request.context.principal, topic)
}

val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
partition.timestamp,
isolationLevelOpt,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package kafka.server

import com.yammer.metrics.core.Meter
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.security.auth.KafkaPrincipal

import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

/**
* A short term solution for tracking
* 1. what are the services/topics using by timestamp.
* 2. what's the usage size of this
*/
class ListOffsetsRequestInstrumentation extends KafkaMetricsGroup {
private val metricName = "ListOffsetsPartitionsRequested"
private val eventType = "partitions"
private val listBy = "listBy"

private val unknownTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "UNKNOWN"))
private val earliestTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "EARLIEST"))
private val latestTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "LATEST"))
private val liEarliestLocalTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "LI_EARLIEST_LOCAL"))
private val maxTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "MAX"))
private val byTimestamp: Meter = newMeter(metricName, eventType, TimeUnit.SECONDS, Map(listBy -> "BY_TIMESTAMP"))

// The object would be periodically dumped to log and cleared on kafka-server wrapper
var listOffsetsByTimestampApiClientUsers: mutable.Map[String, mutable.Set[String]] = _
snapshotAndResetListOffsetByTimeStampApiUsers()

/**
* A helper method for the external wrapper to obtain the tracked requesters and refresh the tracking map
*/
def snapshotAndResetListOffsetByTimeStampApiUsers(): mutable.Map[String, mutable.Set[String]] = {
val old = listOffsetsByTimestampApiClientUsers
listOffsetsByTimestampApiClientUsers = mutable.Map[String, mutable.Set[String]]()
old
}

def logUsage(principal: KafkaPrincipal, topic: ListOffsetsTopic): Unit = {
topic.partitions().asScala.foreach { partition =>
partition.timestamp() match {
// special types like EARLIEST are constants < 0
case ListOffsetsRequest.EARLIEST_TIMESTAMP => earliestTimestamp.mark()
case ListOffsetsRequest.LATEST_TIMESTAMP => latestTimestamp.mark()
case ListOffsetsRequest.LI_EARLIEST_LOCAL_TIMESTAMP => liEarliestLocalTimestamp.mark()
case ListOffsetsRequest.MAX_TIMESTAMP => maxTimestamp.mark()
// Negative, not by actual timestamp, but also not yet defined constant type
case t if t < 0 => unknownTimestamp.mark()
// When > 0, it's specifying search by an actual timestamp
case t if t >= 0 =>
byTimestamp.mark()
// For by timestamp, we also want to know who are the ones sending
(listOffsetsByTimestampApiClientUsers.get(principal.getName) match {
case Some(v) => v
case None =>
val newSet: mutable.Set[String] = mutable.Set()
listOffsetsByTimestampApiClientUsers(principal.getName) = newSet
newSet
}).add(topic.name)
}
}
}
}

0 comments on commit 1899ca2

Please sign in to comment.