Skip to content

Commit

Permalink
core: extract go routine to separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jul 22, 2024
1 parent 614fe3a commit d7c8309
Showing 1 changed file with 44 additions and 40 deletions.
84 changes: 44 additions & 40 deletions core/node/rpc/sync/client/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,46 +93,7 @@ func (s *remoteSyncer) Run() {

latestMsgReceived.Store(time.Now())

go func() {
var (
// check every pingTicker if it's time to send a ping req to remote
pingTicker = time.NewTicker(3 * time.Second)
// don't send a ping req if there was activity within recentActivityInterval
recentActivityInterval = 15 * time.Second
// if no message was receiving within recentActivityDeadline assume stream is dead
recentActivityDeadline = 30 * time.Second
)
defer pingTicker.Stop()

for {
select {
case <-pingTicker.C:
now := time.Now()
lastMsgRecv := latestMsgReceived.Load().(time.Time)
if lastMsgRecv.Add(recentActivityDeadline).Before(now) { // no recent activity -> conn dead
s.syncStreamCancel()
return
}

if lastMsgRecv.Add(recentActivityInterval).After(now) { // seen recent activity
continue
}

// send ping to remote to generate activity to check if remote is still alive
if _, err := s.client.PingSync(s.syncStreamCtx, connect.NewRequest(&PingSyncRequest{
SyncId: s.syncID,
Nonce: fmt.Sprintf("%d", now.Unix()),
})); err != nil {
s.syncStreamCancel()
return
}
return

case <-s.syncStreamCtx.Done():
return
}
}
}()
go s.connectionAlive(&latestMsgReceived)

for s.responseStream.Receive() {
if s.syncStreamCtx.Err() != nil {
Expand Down Expand Up @@ -171,6 +132,49 @@ func (s *remoteSyncer) Run() {
}
}

// connectionAlive periodically pings remote to check if the connection is still alive.
// if the remote can't be reach the sync stream is canceled.
func (s *remoteSyncer) connectionAlive(latestMsgReceived *atomic.Value) {
var (
// check every pingTicker if it's time to send a ping req to remote
pingTicker = time.NewTicker(3 * time.Second)
// don't send a ping req if there was activity within recentActivityInterval
recentActivityInterval = 15 * time.Second
// if no message was receiving within recentActivityDeadline assume stream is dead
recentActivityDeadline = 30 * time.Second
)
defer pingTicker.Stop()

for {
select {
case <-pingTicker.C:
now := time.Now()
lastMsgRecv := latestMsgReceived.Load().(time.Time)
if lastMsgRecv.Add(recentActivityDeadline).Before(now) { // no recent activity -> conn dead
s.syncStreamCancel()
return
}

if lastMsgRecv.Add(recentActivityInterval).After(now) { // seen recent activity
continue
}

// send ping to remote to generate activity to check if remote is still alive
if _, err := s.client.PingSync(s.syncStreamCtx, connect.NewRequest(&PingSyncRequest{
SyncId: s.syncID,
Nonce: fmt.Sprintf("%d", now.Unix()),
})); err != nil {
s.syncStreamCancel()
return
}
return

case <-s.syncStreamCtx.Done():
return
}
}
}

func (s *remoteSyncer) Address() common.Address {
return s.remoteAddr
}
Expand Down

0 comments on commit d7c8309

Please sign in to comment.