diff --git a/src/main/scala/LilaWs.scala b/src/main/scala/LilaWs.scala index f006e551..4c56580f 100644 --- a/src/main/scala/LilaWs.scala +++ b/src/main/scala/LilaWs.scala @@ -11,29 +11,30 @@ object LilaWs extends App: given Scheduler = clientSystem.scheduler given Executor = clientSystem.executionContext - lazy val mongo = wire[Mongo] - lazy val groupedWithin = wire[util.GroupedWithin] - lazy val lightUserApi = wire[LightUserApi] - lazy val lilaRedis = wire[Lila] - lazy val inquirers = wire[Inquirers] - lazy val roundCrowd = wire[RoundCrowd] - lazy val roomCrowd = wire[RoomCrowd] - lazy val crowdJson = wire[ipc.CrowdJson] - lazy val users = wire[Users] - lazy val keepAlive = wire[KeepAlive] - lazy val lobby = wire[Lobby] - lazy val socialGraph = wire[SocialGraph] - lazy val friendList = wire[FriendList] - lazy val stormSign = wire[StormSign] - lazy val lag = wire[Lag] - lazy val evalCache = wire[lila.ws.evalCache.EvalCacheApi] - lazy val services = wire[Services] - lazy val controller = wire[Controller] - lazy val router = wire[Router] - lazy val seenAt = wire[SeenAtUpdate] - lazy val auth = wire[Auth] - lazy val nettyServer = wire[netty.NettyServer] - lazy val monitor = wire[Monitor] + lazy val mongo = wire[Mongo] + lazy val groupedWithin = wire[util.GroupedWithin] + lazy val runPeriodically = wire[util.RunPeriodically] + lazy val lightUserApi = wire[LightUserApi] + lazy val lilaRedis = wire[Lila] + lazy val inquirers = wire[Inquirers] + lazy val roundCrowd = wire[RoundCrowd] + lazy val roomCrowd = wire[RoomCrowd] + lazy val crowdJson = wire[ipc.CrowdJson] + lazy val users = wire[Users] + lazy val keepAlive = wire[KeepAlive] + lazy val lobby = wire[Lobby] + lazy val socialGraph = wire[SocialGraph] + lazy val friendList = wire[FriendList] + lazy val stormSign = wire[StormSign] + lazy val lag = wire[Lag] + lazy val evalCache = wire[lila.ws.evalCache.EvalCacheApi] + lazy val services = wire[Services] + lazy val controller = wire[Controller] + lazy val router = wire[Router] + lazy val seenAt = wire[SeenAtUpdate] + lazy val auth = wire[Auth] + lazy val nettyServer = wire[netty.NettyServer] + lazy val monitor = wire[Monitor] wire[LilaHandler] // must eagerly instanciate! wire[RelayCrowd] // must eagerly instanciate! diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 3108e19c..c8ceb65e 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -9,13 +9,19 @@ import org.apache.pekko.actor.typed.ActorRef import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key +import lila.ws.util.RunPeriodically -final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(using Executor): +final private class ActorChannelConnector( + clients: ActorRef[Clients.Control], + runPeriodically: RunPeriodically +)(using Executor): - def apply(endpoint: Endpoint, channel: Channel): Unit = + def apply(endpoint: Endpoint, channel: Channel, alwaysFlush: Boolean): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) - val channelEmit = emitToChannel(channel) + val channelEmit: ClientEmit = + if alwaysFlush then emitToChannel(channel) + else emitToChannelWithPeriodicFlush(channel) val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() channelEmit(msg) @@ -27,12 +33,25 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(us clients ! Clients.Control.Stop(client) } + private inline def emitDisconnect(inline channel: Channel, inline reason: String): Unit = + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + + private inline def emitPingFrame(inline channel: Channel): Unit = + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } + private def emitToChannel(channel: Channel): ClientEmit = - case ipc.ClientIn.Disconnect(reason) => - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - case ipc.ClientIn.RoundPingFrameNoFlush => - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in => - channel.writeAndFlush(TextWebSocketFrame(in.write)) + case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason) + case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel) + case in => channel.writeAndFlush(TextWebSocketFrame(in.write)) + + private def emitToChannelWithPeriodicFlush(channel: Channel): ClientEmit = + val periodicFlush = runPeriodically(5, 3.seconds)(() => channel.flush()) + msg => + msg.match + case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason) + case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel) + case in => + channel.write(TextWebSocketFrame(in.write)) + periodicFlush.increment() diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index cbfaa8ac..f6ba4870 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -14,10 +14,11 @@ import io.netty.handler.codec.http.* final class NettyServer( clients: ClientSystem, router: Router, - config: Config + config: Config, + runPeriodically: util.RunPeriodically )(using Executor): - private val connector = ActorChannelConnector(clients) + private val connector = ActorChannelConnector(clients, runPeriodically) private val logger = Logger(getClass) def start(): Unit = diff --git a/src/main/scala/netty/ProtocolHandler.scala b/src/main/scala/netty/ProtocolHandler.scala index c152067f..d76a0061 100644 --- a/src/main/scala/netty/ProtocolHandler.scala +++ b/src/main/scala/netty/ProtocolHandler.scala @@ -26,7 +26,7 @@ final private class ProtocolHandler(connector: ActorChannelConnector) evt match case _: WebSocketServerProtocolHandler.HandshakeComplete => // Monitor.count.handshake.inc - connector(ctx.channel.attr(key.endpoint).get, ctx.channel) + connector(ctx.channel.attr(key.endpoint).get, ctx.channel, alwaysFlush = true) case _ => super.userEventTriggered(ctx, evt) diff --git a/src/main/scala/util/RunPeriodically.scala b/src/main/scala/util/RunPeriodically.scala new file mode 100644 index 00000000..b72730a4 --- /dev/null +++ b/src/main/scala/util/RunPeriodically.scala @@ -0,0 +1,34 @@ +package lila.ws +package util + +import org.apache.pekko.actor.Cancellable +import org.apache.pekko.actor.typed.Scheduler + +/** Run a function after a counter hits a ceiling, or periodically after a fixed time interval, if counter > 0 + */ +final class RunPeriodically()(using Scheduler, Executor): + def apply(counterMax: Int, interval: FiniteDuration)(flush: () => Unit) = + RunPeriodicallyStage(counterMax, interval, flush) + +final class RunPeriodicallyStage(counterMax: Int, interval: FiniteDuration, run: () => Unit)(using + scheduler: Scheduler, + ec: Executor +): + + private var counter: Int = 0 + + private var scheduledFlush: Cancellable = scheduler.scheduleOnce(interval, () => flush()) + + def increment(): Unit = + synchronized: + counter += 1 + if counter >= counterMax then unsafeFlush() + + private def flush(): Unit = synchronized { unsafeFlush() } + + private def unsafeFlush(): Unit = + if counter > 0 then + run() + counter = 0 + scheduledFlush.cancel() + scheduledFlush = scheduler.scheduleOnce(interval, () => flush())