Skip to content

Commit

Permalink
connector: monitor real q size every minute, to see if the estimation…
Browse files Browse the repository at this point in the history
… can drift
  • Loading branch information
ornicar committed Oct 23, 2024
1 parent fc97b05 commit f7a2f9c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ object Monitor:
val step = Kamon.gauge("connector.flush.config.step").withoutTags()
val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val qSizeReal = Kamon.histogram("connector.flush.qSize").withoutTags()
val qSizeEstimate = Kamon.histogram("connector.flush.qSize.estimate").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags()
10 changes: 6 additions & 4 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final private class ActorChannelConnector(
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())
monitor.qSizeReal.record(flushQ.realSizeWithLinearPerformance())

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
Expand Down Expand Up @@ -80,7 +81,7 @@ final private class ActorChannelConnector(
case _ =>
channelsToFlush = 0

monitor.qSize.record(qSize)
monitor.qSizeEstimate.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)
monitor.loopRuntimeMicroseconds.record((Deadline.now - entered).toMicros)

Expand All @@ -92,8 +93,8 @@ final private class ActorChannelConnector(

object ActorChannelConnector:
private class FlushQueue:
private val queue = new java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val size = new java.util.concurrent.atomic.AtomicInteger()
private val queue = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val size = java.util.concurrent.atomic.AtomicInteger()

def add(channel: Channel): Unit =
queue.add(channel)
Expand All @@ -104,4 +105,5 @@ object ActorChannelConnector:
if maybeChannel.nonEmpty then size.getAndDecrement()
maybeChannel

def estimateSize(): Int = size.get()
def estimateSize(): Int = size.get()
def realSizeWithLinearPerformance(): Int = queue.size()

0 comments on commit f7a2f9c

Please sign in to comment.