Skip to content

Commit

Permalink
Merge pull request #616 from lichess-org/sleep-and-flush-robustness
Browse files Browse the repository at this point in the history
gank a thread and sleep instead of scheduling, cope with broken assumptions about queue size and timing
  • Loading branch information
ornicar authored Oct 23, 2024
2 parents 89233ac + f7a2f9c commit 3446313
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 19 deletions.
6 changes: 4 additions & 2 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,7 @@ object Monitor:
val step = Kamon.gauge("connector.flush.config.step").withoutTags()
val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val qSizeReal = Kamon.histogram("connector.flush.qSize").withoutTags()
val qSizeEstimate = Kamon.histogram("connector.flush.qSize.estimate").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags()
56 changes: 39 additions & 17 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Deadline

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key
Expand All @@ -19,8 +19,12 @@ final private class ActorChannelConnector(
workers: EventLoopGroup
)(using scheduler: Scheduler, ec: Executor):

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val flushQ = ActorChannelConnector.FlushQueue()
private val monitor = Monitor.connector.flush
private val flushThread = Future:
while !workers.isShuttingDown && !workers.isTerminated do
val delay = flush().timeLeft.max(0.millis)
Thread.sleep(delay.toMillis, (delay.toNanos % 1_000_000).toInt)

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
Expand All @@ -32,8 +36,7 @@ final private class ActorChannelConnector(
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)
monitor.qSizeReal.record(flushQ.realSizeWithLinearPerformance())

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
Expand All @@ -58,30 +61,49 @@ final private class ActorChannelConnector(
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in if withFlush || !config.isFlushQEnabled() =>
case in if withFlush || flushThread.isCompleted || !config.isFlushQEnabled() =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
case in =>
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Unit =
val qSize = flushQ.size
private def flush(): Deadline =
val entered = Deadline.now
val qSize = flushQ.estimateSize()
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
val nextIntervalMillis =
if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue

workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)

while channelsToFlush > 0 do
Option(flushQ.poll()) match
flushQ.poll() match
case Some(channel) =>
if channel.isOpen then channel.flush()
channelsToFlush -= 1
case _ =>
channelsToFlush = 0

monitor.qSizeEstimate.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)
monitor.loopRuntimeMicroseconds.record((Deadline.now - entered).toMicros)

entered + {
if config.isFlushQEnabled() then config.interval.get().millis
else if qSize == 0 then 1000.millis // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
}

object ActorChannelConnector:
private class FlushQueue:
private val queue = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val size = java.util.concurrent.atomic.AtomicInteger()

def add(channel: Channel): Unit =
queue.add(channel)
size.getAndIncrement()

def poll(): Option[Channel] =
val maybeChannel = Option(queue.poll())
if maybeChannel.nonEmpty then size.getAndDecrement()
maybeChannel

def estimateSize(): Int = size.get()
def realSizeWithLinearPerformance(): Int = queue.size()

0 comments on commit 3446313

Please sign in to comment.