Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/sync: blockdownloader to use common.Sleep #10467

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/observer/observer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func (crawler *Crawler) selectCandidates(ctx context.Context, nodes chan<- candi
}

if len(candidates) == 0 {
libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return err
}
}

for _, id := range candidates {
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/observer/diplomacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func (diplomacy *Diplomacy) selectCandidates(ctx context.Context, candidatesChan
}

if len(candidates) == 0 {
libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return err
}
}

for _, id := range candidates {
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/observer/interrogation_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
InterrogationErrorKeygen
InterrogationErrorFindNode
InterrogationErrorFindNodeTimeout
InterrogationErrorCtxCancelled
)

type InterrogationError struct {
Expand Down
8 changes: 6 additions & 2 deletions cmd/observer/observer/interrogator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult
// We need to wait until Server sends a Pong reply to that.
// The remote side is waiting for this Pong no longer than v4_udp.respTimeout.
// If we don't wait, the ENRRequest/FindNode might fail due to errUnknownNode.
libcommon.Sleep(ctx, 500*time.Millisecond)
if err := libcommon.Sleep(ctx, 500*time.Millisecond); err != nil {
return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err)
}

// request client ID
var handshakeResult *DiplomatResult
Expand Down Expand Up @@ -158,7 +160,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult
peersByID[node.ID()] = node
}

libcommon.Sleep(ctx, 1*time.Second)
if err := libcommon.Sleep(ctx, 1*time.Second); err != nil {
return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err)
}
}

peers := valuesOfIDToNodeMap(peersByID)
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/observer/status_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func StatusLoggerLoop(ctx context.Context, db database.DB, networkID uint, perio
var prevDistinctIPCount uint

for ctx.Err() == nil {
libcommon.Sleep(ctx, period)
if err := libcommon.Sleep(ctx, period); err != nil {
break
}

totalCount, err := db.CountNodes(ctx, maxPingTries, networkID)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func Retry(
for i := 0; i <= retryCount; i++ {
if i > 0 {
logger.Trace("retrying", "op", opName, "attempt", i, "err", err)
libcommon.Sleep(ctx, delayForAttempt(i))
if err := libcommon.Sleep(ctx, delayForAttempt(i)); err != nil {
return nil, err
}
}
result, err = op(ctx)
if (err == nil) || !isRecoverableError(err) {
Expand Down
17 changes: 11 additions & 6 deletions erigon-lib/common/sleep.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"time"
)

func Sleep(parentContext context.Context, timeout time.Duration) {
if timeout <= 0 {
return
// Sleep which is context aware. Blocks for d duration and returns no error, unless the context
// has been cancelled, timed out, deadline reached, etc. in which case it returns ctx.Err().
func Sleep(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
ctx, cancel := context.WithTimeout(parentContext, timeout)
defer cancel()
<-ctx.Done()
}
27 changes: 27 additions & 0 deletions erigon-lib/common/sleep_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package common

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestSleep(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
start := time.Now()
err := Sleep(ctx, 100*time.Millisecond)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond)

eg := errgroup.Group{}
eg.Go(func() error {
return Sleep(ctx, time.Minute)
})
cancel()
err = eg.Wait()
require.ErrorIs(t, err, context.Canceled)
require.Less(t, time.Since(start), time.Minute)
}
34 changes: 26 additions & 8 deletions polygon/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,19 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) {
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}

if latestSpan.Id <= tip {
h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
continue
}

Expand All @@ -441,7 +447,10 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) {
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}
Expand Down Expand Up @@ -489,13 +498,19 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func(
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}

if count <= int64(tip) {
h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
continue
}

Expand All @@ -506,7 +521,10 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func(
"err", err,
)

h.waitPollingDelay(ctx)
if err := common.Sleep(ctx, h.pollDelay); err != nil {
h.logPollerSleepCancelled(err)
return
}
// keep background goroutine alive in case of heimdall errors
continue
}
Expand Down Expand Up @@ -576,6 +594,6 @@ func (h *heimdall) batchFetchCheckpoints(
return checkpoints, nil
}

func (h *heimdall) waitPollingDelay(ctx context.Context) {
common.Sleep(ctx, h.pollDelay)
func (h *heimdall) logPollerSleepCancelled(err error) {
h.logger.Info("poller sleep cancelled", "err", err)
}
4 changes: 2 additions & 2 deletions polygon/heimdall/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error {

if idRange.Start > idRange.End {
s.syncEvent.SetAndBroadcast()
libcommon.Sleep(ctx, s.pollDelay)
if ctx.Err() != nil {
if err := libcommon.Sleep(ctx, s.pollDelay); err != nil {
s.syncEvent.Reset()
return err
}
} else {
entities, err := s.fetcher.FetchEntitiesRange(ctx, idRange)
Expand Down
5 changes: 4 additions & 1 deletion polygon/sync/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(
"sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(),
)

time.Sleep(d.notEnoughPeersBackOffDuration)
if err := common.Sleep(ctx, d.notEnoughPeersBackOffDuration); err != nil {
return nil, err
}

continue
}

Expand Down
Loading