diff --git a/core/node/rpc/sync/client/remote.go b/core/node/rpc/sync/client/remote.go index cce7771f8a..92bd6fcfb0 100644 --- a/core/node/rpc/sync/client/remote.go +++ b/core/node/rpc/sync/client/remote.go @@ -93,46 +93,7 @@ func (s *remoteSyncer) Run() { latestMsgReceived.Store(time.Now()) - go func() { - var ( - // check every pingTicker if it's time to send a ping req to remote - pingTicker = time.NewTicker(3 * time.Second) - // don't send a ping req if there was activity within recentActivityInterval - recentActivityInterval = 15 * time.Second - // if no message was receiving within recentActivityDeadline assume stream is dead - recentActivityDeadline = 30 * time.Second - ) - defer pingTicker.Stop() - - for { - select { - case <-pingTicker.C: - now := time.Now() - lastMsgRecv := latestMsgReceived.Load().(time.Time) - if lastMsgRecv.Add(recentActivityDeadline).Before(now) { // no recent activity -> conn dead - s.syncStreamCancel() - return - } - - if lastMsgRecv.Add(recentActivityInterval).After(now) { // seen recent activity - continue - } - - // send ping to remote to generate activity to check if remote is still alive - if _, err := s.client.PingSync(s.syncStreamCtx, connect.NewRequest(&PingSyncRequest{ - SyncId: s.syncID, - Nonce: fmt.Sprintf("%d", now.Unix()), - })); err != nil { - s.syncStreamCancel() - return - } - return - - case <-s.syncStreamCtx.Done(): - return - } - } - }() + go s.connectionAlive(&latestMsgReceived) for s.responseStream.Receive() { if s.syncStreamCtx.Err() != nil { @@ -171,6 +132,49 @@ func (s *remoteSyncer) Run() { } } +// connectionAlive periodically pings remote to check if the connection is still alive. +// if the remote can't be reach the sync stream is canceled. +func (s *remoteSyncer) connectionAlive(latestMsgReceived *atomic.Value) { + var ( + // check every pingTicker if it's time to send a ping req to remote + pingTicker = time.NewTicker(3 * time.Second) + // don't send a ping req if there was activity within recentActivityInterval + recentActivityInterval = 15 * time.Second + // if no message was receiving within recentActivityDeadline assume stream is dead + recentActivityDeadline = 30 * time.Second + ) + defer pingTicker.Stop() + + for { + select { + case <-pingTicker.C: + now := time.Now() + lastMsgRecv := latestMsgReceived.Load().(time.Time) + if lastMsgRecv.Add(recentActivityDeadline).Before(now) { // no recent activity -> conn dead + s.syncStreamCancel() + return + } + + if lastMsgRecv.Add(recentActivityInterval).After(now) { // seen recent activity + continue + } + + // send ping to remote to generate activity to check if remote is still alive + if _, err := s.client.PingSync(s.syncStreamCtx, connect.NewRequest(&PingSyncRequest{ + SyncId: s.syncID, + Nonce: fmt.Sprintf("%d", now.Unix()), + })); err != nil { + s.syncStreamCancel() + return + } + return + + case <-s.syncStreamCtx.Done(): + return + } + } +} + func (s *remoteSyncer) Address() common.Address { return s.remoteAddr }