[Access] Implement keepalive routine with ping-ponging to ws connection in ws controller #6757
base: master
Conversation
…ub.com:The-K-R-O-K/flow-go into UlyanaAndrukhiv/6639-ws-ping-pong
…drukhiv/6639-ws-ping-pong
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##           master    #6757      +/-   ##
==========================================
- Coverage   41.26%   41.24%   -0.02%
==========================================
  Files        2061     2064       +3
  Lines      182702   182900     +198
==========================================
+ Hits        75384    75438      +54
- Misses     101010   101132     +122
- Partials     6308     6330      +22
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
…ub.com:The-K-R-O-K/flow-go into UlyanaAndrukhiv/6638-ws-connection-configuring
After the first round of review - looks cool!
…dded some refactoring, added godoc
…R-O-K/flow-go into UlyanaAndrukhiv/6639-ws-ping-pong
```go
// sendPing sends a periodic ping message to the WebSocket client to keep the connection alive.
//
// No errors are expected during normal operation.
func (c *Controller) sendPing() error {
```
What does this abstraction do? Can't we get rid of it and use this code directly in the keep-alive routine?
```go
package websockets
```
Why has this file name been changed to `connections`?
```go
if err := c.sendPing(); err != nil {
	// Log error and exit the loop on failure
	c.logger.Error().Err(err).Msg("failed to send ping")
	return err
```
We should stop keep-alive only if a CloseErr was sent to the connection. However, I guess I will handle it in #6642, as it will be clearer by then.
```go
// This value must be less than pongWait.
PingPeriod = (PongWait * 9) / 10

// PongWait specifies the maximum time to wait for a pong message from the peer.
```
is this accurate?
```diff
-// PongWait specifies the maximum time to wait for a pong message from the peer.
+// PongWait specifies the maximum time to wait for a pong response message from the peer
+// after sending a ping
```
```go
	"github.com/onflow/flow-go/utils/concurrentmap"
)

const (
	// PingPeriod defines the interval at which ping messages are sent to the client.
	// This value must be less than pongWait.
```
Why less? Intuitively I would have thought it would need to be larger. Can you elaborate more in this comment?
I guess Ulyana took it from here https://github.com/gorilla/websocket/blob/v1.5.3/examples/chat/client.go#L23.
I believe it's because ping and pong share the same timer.
Let’s consider a case where `pongWait` is smaller than `pingPeriod`, and we’ll see why this configuration is problematic.
Parameters:
- `pongWait` = 30s
- `pingPeriod` = 40s
At t=0:
The server sends a ping message to the client.
At t=30s:
The `pongWait` expires because the server hasn't received a pong (or any message) from the client.
The server assumes the connection is dead and closes it.
At t=40s:
The server sends its second ping, but the connection is already closed due to the timeout at t=30s.
yea, but in that case, the server should have cleaned up the ping service when the connection was closed, so the second ping would never happen
```go
// PongWait specifies the maximum time to wait for a pong message from the peer.
PongWait = 10 * time.Second

// WriteWait specifies the maximum duration allowed to write a message to the peer.
```
there is a good explanation of what this means in the code. Can you elaborate some more here? Mostly, readers will look to the definitions to understand how to set/modify the values.
```go
select {
case err, ok := <-c.errorChannel:
	if !ok {
		c.logger.Error().Msg("error channel closed")
```
is this possible? It looks like the channel is only closed in a defer
```go
c.writeMessagesToClient(ctx)

// for track all goroutines and error handling
var wg sync.WaitGroup
```
what do you think about using `errgroup` here instead? it handles both the goroutine lifecycle and passing errors. it has the added benefit that if an error is returned from any other goroutine, the shared context is canceled.
it would look something like

```go
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
	return c.readMessagesFromClient(ctx)
})
...
err := g.Wait()
if err != nil {
	c.shutdownConnection()
}
```
I'd leave this decision to be made in #6642, as the error handling and routine startup might change.
The goal of this PR is to introduce keep-alive.
why wait? Is this code being introduced in another PR?
```go
defer func(conn *websocket.Conn) {
	if err := c.conn.Close(); err != nil {
		c.logger.Error().Err(err).Msg("error closing connection")

c.shutdownOnce.Do(func() {
```
`sync.Once` has the added functionality that it will block all other callers until the first completes. Is that desired here? If not, you can just use an atomic bool with compare-and-swap.
```go
if err := c.conn.Close(); err != nil {
	c.logger.Error().Err(err).Msg("error closing connection")
}
close(c.communicationChannel)
```
it's not safe to close this here, because the data providers could continue to write out new messages, causing a panic.
Going back to the original design, we had said that data providers would get a callback function they could call to put new data on this queue. That would allow the controller to decide when to stop accepting new messages and to close this channel safely.
There's code below that stops every provider. So, as this function call is deferred, all data providers will be stopped and "unregistered" when this code executes.
the code calls `Close()`, which simply calls the `cancel()` function. There's no guarantee the providers have actually stopped.
https://github.com/onflow/flow-go/pull/6636/files#diff-5cbca3503bb00261318db4a8b8b1714447f7348a4d87e11aaf8b37b43b36e2bbR49-R52
```go
for {
	select {
	case <-ctx.Done():
		return
	case msg := <-c.communicationChannel:

return ctx.Err()
```
when the controller is updated to allow `c.communicationChannel` to be closed when shutdown is triggered, you can remove this check and instead rely on the channel closing to signal shutdown.
as it is now, if this returns here, the data providers may hang waiting to push onto the queue
```go
case <-pingTicker.C:
	if err := c.sendPing(); err != nil {
		// Log error and exit the loop on failure
		c.logger.Error().Err(err).Msg("failed to send ping")
```
This will get noisy in the logs at error level
```diff
-c.logger.Error().Err(err).Msg("failed to send ping")
+c.logger.Debug().Err(err).Msg("failed to send ping")
```
```go
PingPeriod = (PongWait * 9) / 10

// PongWait specifies the maximum time to wait for a pong message from the peer.
PongWait = 10 * time.Second
```
Wouldn't it be better to place it in the `websockets.Config` type?
```go
err := process(ctx)
if err != nil {
	// Check if shutdown has already been called, to avoid multiple shutdowns
	if c.shutdown {
```
This is a data race, isn't it? I'm thinking of the following situation:
- One of the processes crashes with some error without calling shutdown (e.g. the keep-alive routine).
- So, we get to this code when we read `c.shutdown` on this line.
- Simultaneously, another process (e.g. the reader routine) calls shutdown and touches `c.shutdown` concurrently.
If we need this, we have to use an atomic variable here. However, I don't understand why we need it, can you elaborate on it?
Closes: #6639
Note: #6750 should be merged first

Context
In this pull request, a `keepalive` routine is implemented to monitor `WebSocket` network connectivity using ping-pong messages.

Key Changes
- Implemented `keepalive` logic for the `WebSocket` controller to detect and handle connectivity issues, and ensured the `keepalive` operates independently, managing ping-pong messages and monitoring connection health.
- Monitored the `WebSocket` connection and gracefully shut down affected components when issues are detected.