diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index a228bb0b..5d069576 100644 --- a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -188,5 +188,7 @@ object Monitor: val step = Kamon.gauge("connector.flush.config.step").withoutTags() val interval = Kamon.gauge("connector.flush.config.interval").withoutTags() val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags() - val qSize = Kamon.histogram("connector.flush.qSize").withoutTags() - val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags() + val qSizeReal = Kamon.histogram("connector.flush.qSize").withoutTags() + val qSizeEstimate = Kamon.histogram("connector.flush.qSize.estimate").withoutTags() + val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags() + val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index dc87ede0..e347a72b 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -7,7 +7,7 @@ import io.netty.handler.codec.http.websocketx.* import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener } import org.apache.pekko.actor.typed.{ ActorRef, Scheduler } -import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Deadline import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key @@ -19,8 +19,12 @@ final private class ActorChannelConnector( workers: EventLoopGroup )(using scheduler: Scheduler, ec: Executor): - private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val flushQ = ActorChannelConnector.FlushQueue() private val monitor = Monitor.connector.flush + private val flushThread = Future: + while !workers.isShuttingDown && !workers.isTerminated do + val delay = flush().timeLeft.max(0.millis) + Thread.sleep(delay.toMillis, (delay.toNanos % 1_000_000).toInt) private object config: private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key)) @@ -32,8 +36,7 @@ final private class ActorChannelConnector( monitor.config.step.update(step.get()) monitor.config.interval.update(interval.get()) monitor.config.maxDelay.update(maxDelay.get()) - - workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS) + monitor.qSizeReal.record(flushQ.realSizeWithLinearPerformance()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -58,30 +61,49 @@ final private class ActorChannelConnector( .addListener(ChannelFutureListener.CLOSE) case ipc.ClientIn.RoundPingFrameNoFlush => channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in if withFlush || !config.isFlushQEnabled() => + case in if withFlush || flushThread.isCompleted || !config.isFlushQEnabled() => channel.writeAndFlush(TextWebSocketFrame(in.write)) case in => channel.write(TextWebSocketFrame(in.write)) flushQ.add(channel) - private def flush(): Unit = - val qSize = flushQ.size + private def flush(): Deadline = + val entered = Deadline.now + val qSize = flushQ.estimateSize() val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1) var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt) - val nextIntervalMillis = - if config.isFlushQEnabled() then config.interval.get() - else if qSize == 0 then 1000 // hibernate - else 1 // interval is 0 but we still need to empty the queue - - workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS) - - monitor.qSize.record(qSize) - monitor.channelsToFlush.record(channelsToFlush) while channelsToFlush > 0 do - Option(flushQ.poll()) match + flushQ.poll() match case Some(channel) => if channel.isOpen then channel.flush() channelsToFlush -= 1 case _ => channelsToFlush = 0 + + monitor.qSizeEstimate.record(qSize) + monitor.channelsToFlush.record(channelsToFlush) + monitor.loopRuntimeMicroseconds.record((Deadline.now - entered).toMicros) + + entered + { + if config.isFlushQEnabled() then config.interval.get().millis + else if qSize == 0 then 1000.millis // hibernate + else 1.millis // interval is 0 but we still need to empty the queue + } + +object ActorChannelConnector: + private class FlushQueue: + private val queue = java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val size = java.util.concurrent.atomic.AtomicInteger() + + def add(channel: Channel): Unit = + queue.add(channel) + size.getAndIncrement() + + def poll(): Option[Channel] = + val maybeChannel = Option(queue.poll()) + if maybeChannel.nonEmpty then size.getAndDecrement() + maybeChannel + + def estimateSize(): Int = size.get() + def realSizeWithLinearPerformance(): Int = queue.size()