Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send websocket messages without flushing, with periodic flush #600

Closed
wants to merge 4 commits into from

Conversation

ornicar
Copy link
Collaborator

@ornicar ornicar commented Sep 20, 2024

not used yet, just implementing the behaviour

private def unsafeFlush(): Unit =
if counter > 0 then
run()
counter == 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be counter = 0 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dammit discarded values!!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it really matters, the clientEmit closure seems to recreate itself with some regularity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably because i'm reconnecting..

@schlawg
Copy link
Contributor

schlawg commented Sep 20, 2024

i was thinking something like:

final class FlushPeriodically()(using scheduler: Scheduler, ec: Executor):
  val flushQ = scala.collection.mutable.LinkedHashSet.empty[Channel]

  scheduler.scheduleAtFixedRate(0 seconds, 1 millisecond)(() => flushEm())

  def apply(channel: Channel) =
    synchronized: // maybe ConcurrentHashMap could avoid synchronized
      flushQ += channel

  private def flushEm(): Unit =
    val toFlush = synchronized:
      val f = flushQ.take(max(100, flushQ.size / 500) // try to stick to 100/1ms
      flushQ --= f
      f
    toFlush.foreach(_.flush())

mine doesn't do any batching but it does smooth traffic, so maybe we'll need a combo of the two approaches

@schlawg
Copy link
Contributor

schlawg commented Sep 21, 2024

i cannot test the code in this PR because clients just keep reconnecting with the emitToChannelWithPeriodicFlush lambda path. could scala hate me more?

  private def emitToChannel(channel: Channel): ClientEmit =
    // partial function with direct pattern matching
  private def emitToChannelWithPeriodicFlush(channel: Channel): ClientEmit =
    // closure & lambda wrapper
    msg =>
      msg match
        // pattern matching

@ornicar
Copy link
Collaborator Author

ornicar commented Sep 22, 2024

the client reconnects because lila-ws doesn't respond to its null ping with the expected 0 pong.

@schlawg
Copy link
Contributor

schlawg commented Sep 22, 2024

except for adding:

        case ipc.ClientIn.Pong =>
          channel.writeAndFlush("0")

doesn't help.

the client reconnects because lila-ws doesn't respond to its null ping with the expected 0 pong.

yeah, i had commented out all of the periodically stuff, changed to use writeAndFlush in both, and was left with the only difference between the two methods being one was wrapped in another layer of lambda. and swapping between them the wrapped one was still reconnecting every few seconds. but this morning things are behaving, so maybe i only thought i was rebuilding? at least it's me that's crazy and not scalac or netty.

@ornicar ornicar closed this Sep 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants