Skip to content

Commit

Permalink
polygon/sync: blockdownloader to use common.Sleep (#10467)
Browse files Browse the repository at this point in the history
The original intent of this PR was to replace one usage of `time.Sleep` in
`polygon/sync/block_downloader.go` with `common.Sleep`, which is
context-aware.

However, while doing this we discussed that `common.Sleep` could be improved
to return an `err` to the caller when the parent ctx has been cancelled,
so that the caller is made aware of the cancellation without having to make
subsequent checks.
  • Loading branch information
taratorio authored May 28, 2024
1 parent 53b3d23 commit afcfe62
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 23 deletions.
4 changes: 3 additions & 1 deletion cmd/observer/observer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func (crawler *Crawler) selectCandidates(ctx context.Context, nodes chan<- candi
}

if len(candidates) == 0 {
libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return err
}
}

for _, id := range candidates {
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/observer/diplomacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func (diplomacy *Diplomacy) selectCandidates(ctx context.Context, candidatesChan
}

if len(candidates) == 0 {
libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return err
}
}

for _, id := range candidates {
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/observer/interrogation_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
InterrogationErrorKeygen
InterrogationErrorFindNode
InterrogationErrorFindNodeTimeout
InterrogationErrorCtxCancelled
)

type InterrogationError struct {
Expand Down
8 changes: 6 additions & 2 deletions cmd/observer/observer/interrogator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult
// We need to wait until Server sends a Pong reply to that.
// The remote side is waiting for this Pong no longer than v4_udp.respTimeout.
// If we don't wait, the ENRRequest/FindNode might fail due to errUnknownNode.
libcommon.Sleep(ctx, 500*time.Millisecond)
if err := libcommon.Sleep(ctx, 500*time.Millisecond); err != nil {
return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err)
}

// request client ID
var handshakeResult *DiplomatResult
Expand Down Expand Up @@ -158,7 +160,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult
peersByID[node.ID()] = node
}

libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err)
}
}

peers := valuesOfIDToNodeMap(peersByID)
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/observer/status_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func StatusLoggerLoop(ctx context.Context, db database.DB, networkID uint, perio
var prevDistinctIPCount uint

for ctx.Err() == nil {
libcommon.Sleep(ctx, period)
if err := libcommon.Sleep(ctx, period); err != nil {
break
}

totalCount, err := db.CountNodes(ctx, maxPingTries, networkID)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func Retry(
for i := 0; i <= retryCount; i++ {
if i > 0 {
logger.Trace("retrying", "op", opName, "attempt", i, "err", err)
libcommon.Sleep(ctx, delayForAttempt(i))
if err := libcommon.Sleep(ctx, delayForAttempt(i)); err != nil {
return nil, err
}
}
result, err = op(ctx)
if (err == nil) || !isRecoverableError(err) {
Expand Down
17 changes: 11 additions & 6 deletions erigon-lib/common/sleep.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"time"
)

func Sleep(parentContext context.Context, timeout time.Duration) {
if timeout <= 0 {
return
// Sleep is a context-aware sleep. It blocks for duration d and returns nil, unless the context
// is done first (cancelled, deadline exceeded, etc.), in which case it returns ctx.Err().
func Sleep(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
ctx, cancel := context.WithTimeout(parentContext, timeout)
defer cancel()
<-ctx.Done()
}
27 changes: 27 additions & 0 deletions erigon-lib/common/sleep_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package common

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestSleep(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
start := time.Now()
err := Sleep(ctx, 100*time.Millisecond)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond)

eg := errgroup.Group{}
eg.Go(func() error {
return Sleep(ctx, time.Minute)
})
cancel()
err = eg.Wait()
require.ErrorIs(t, err, context.Canceled)
require.Less(t, time.Since(start), time.Minute)
}
34 changes: 26 additions & 8 deletions polygon/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,19 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) {
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}

if latestSpan.Id <= tip {
h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
continue
}

Expand All @@ -441,7 +447,10 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) {
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}
Expand Down Expand Up @@ -489,13 +498,19 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func(
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}

if count <= int64(tip) {
h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
continue
}

Expand All @@ -506,7 +521,10 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func(
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}
Expand Down Expand Up @@ -576,6 +594,6 @@ func (h *heimdall) batchFetchCheckpoints(
return checkpoints, nil
}

func (h *heimdall) waitPollingDelay(ctx context.Context) {
common.Sleep(ctx, h.pollDelay)
func (h *heimdall) logPollerSleepCancelled(err error) {
h.logger.Info("poller sleep cancelled", "err", err)
}
4 changes: 2 additions & 2 deletions polygon/heimdall/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error {

if idRange.Start > idRange.End {
s.syncEvent.SetAndBroadcast()
libcommon.Sleep(ctx, s.pollDelay)
if ctx.Err() != nil {
if err := libcommon.Sleep(ctx, s.pollDelay); err != nil {
s.syncEvent.Reset()
return err
}
} else {
entities, err := s.fetcher.FetchEntitiesRange(ctx, idRange)
Expand Down
5 changes: 4 additions & 1 deletion polygon/sync/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(
"sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(),
)

time.Sleep(d.notEnoughPeersBackOffDuration)
if err := common.Sleep(ctx, d.notEnoughPeersBackOffDuration); err != nil {
return nil, err
}

continue
}

Expand Down

0 comments on commit afcfe62

Please sign in to comment.