Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gank a thread and sleep instead of scheduling, cope with broken assumptions about queue size and timing #616

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,7 @@ object Monitor:
val step = Kamon.gauge("connector.flush.config.step").withoutTags()
val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val qSizeReal = Kamon.histogram("connector.flush.qSize").withoutTags()
val qSizeEstimate = Kamon.histogram("connector.flush.qSize.estimate").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags()
56 changes: 39 additions & 17 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Deadline

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key
Expand All @@ -19,8 +19,12 @@ final private class ActorChannelConnector(
workers: EventLoopGroup
)(using scheduler: Scheduler, ec: Executor):

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val flushQ = ActorChannelConnector.FlushQueue()
private val monitor = Monitor.connector.flush
private val flushThread = Future:
while !workers.isShuttingDown && !workers.isTerminated do
val delay = flush().timeLeft.max(0.millis)
Thread.sleep(delay.toMillis, (delay.toNanos % 1_000_000).toInt)

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
Expand All @@ -32,8 +36,7 @@ final private class ActorChannelConnector(
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)
monitor.qSizeReal.record(flushQ.realSizeWithLinearPerformance())

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
Expand All @@ -58,30 +61,49 @@ final private class ActorChannelConnector(
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in if withFlush || !config.isFlushQEnabled() =>
case in if withFlush || flushThread.isCompleted || !config.isFlushQEnabled() =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
case in =>
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Unit =
val qSize = flushQ.size
private def flush(): Deadline =
val entered = Deadline.now
val qSize = flushQ.estimateSize()
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
val nextIntervalMillis =
if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue

workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)

while channelsToFlush > 0 do
Option(flushQ.poll()) match
flushQ.poll() match
case Some(channel) =>
if channel.isOpen then channel.flush()
channelsToFlush -= 1
case _ =>
channelsToFlush = 0

monitor.qSizeEstimate.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)
monitor.loopRuntimeMicroseconds.record((Deadline.now - entered).toMicros)

entered + {
if config.isFlushQEnabled() then config.interval.get().millis
else if qSize == 0 then 1000.millis // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
}

object ActorChannelConnector:
  /** Queue of channels waiting to be flushed, paired with a separately maintained element counter.
    *
    * The counter is updated after the underlying queue operation rather than atomically with it,
    * so `estimateSize` is only an approximation while other threads are adding or polling.
    */
  private class FlushQueue:
    private val pending = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
    private val counter = java.util.concurrent.atomic.AtomicInteger()

    /** Enqueues `channel`, then increments the counter. */
    def add(channel: Channel): Unit =
      pending.add(channel)
      counter.getAndIncrement()

    /** Removes and returns the head of the queue, or `None` when the queue is empty.
      *
      * The counter is decremented only when an element was actually removed.
      */
    def poll(): Option[Channel] =
      val head = Option(pending.poll())
      head.foreach(_ => counter.getAndDecrement())
      head

    /** Current value of the counter; reads a single atomic integer. */
    def estimateSize(): Int = counter.get()

    /** Size as reported by the underlying queue itself.
      *
      * As the name warns, `ConcurrentLinkedQueue.size()` is not a constant-time operation.
      */
    def realSizeWithLinearPerformance(): Int = pending.size()
Loading