From 4788c6c6023df71ca8d538906eff1d308384a837 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 24 May 2024 16:32:49 +0100 Subject: [PATCH 1/3] polygon/sync: blockdownloader to use common.Sleep --- polygon/sync/block_downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go index 738aae6cb08..9daacf5b5e4 100644 --- a/polygon/sync/block_downloader.go +++ b/polygon/sync/block_downloader.go @@ -149,7 +149,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp "sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(), ) - time.Sleep(d.notEnoughPeersBackOffDuration) + common.Sleep(ctx, d.notEnoughPeersBackOffDuration) continue } From 5a25eb70066702190626635d2916d66c46245ccd Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Mon, 27 May 2024 14:55:18 +0100 Subject: [PATCH 2/3] common sleep to return err when cancelled by parent ctx --- cmd/observer/observer/crawler.go | 4 ++- cmd/observer/observer/diplomacy.go | 4 ++- cmd/observer/observer/interrogation_error.go | 1 + cmd/observer/observer/interrogator.go | 8 +++-- cmd/observer/observer/status_logger.go | 4 ++- cmd/observer/utils/retry.go | 4 ++- erigon-lib/common/sleep.go | 17 ++++++---- polygon/heimdall/heimdall.go | 34 +++++++++++++++----- polygon/heimdall/scraper.go | 4 +-- polygon/sync/block_downloader.go | 5 ++- 10 files changed, 62 insertions(+), 23 deletions(-) diff --git a/cmd/observer/observer/crawler.go b/cmd/observer/observer/crawler.go index 94c28d088ea..31c48ea4ea6 100644 --- a/cmd/observer/observer/crawler.go +++ b/cmd/observer/observer/crawler.go @@ -185,7 +185,9 @@ func (crawler *Crawler) selectCandidates(ctx context.Context, nodes chan<- candi } if len(candidates) == 0 { - libcommon.Sleep(ctx, 1*time.Second) + if err := libcommon.Sleep(ctx, 1*time.Second); err != nil { + return err + } } for _, id := range candidates { diff --git a/cmd/observer/observer/diplomacy.go b/cmd/observer/observer/diplomacy.go index 818a88b5c74..331ce7113f3 100644 --- a/cmd/observer/observer/diplomacy.go +++ b/cmd/observer/observer/diplomacy.go @@ -82,7 +82,9 @@ func (diplomacy *Diplomacy) selectCandidates(ctx context.Context, candidatesChan } if len(candidates) == 0 { - libcommon.Sleep(ctx, 1*time.Second) + if err := libcommon.Sleep(ctx, 1*time.Second); err != nil { + return err + } } for _, id := range candidates { diff --git a/cmd/observer/observer/interrogation_error.go b/cmd/observer/observer/interrogation_error.go index bc99dd93a9c..455766c80df 100644 --- a/cmd/observer/observer/interrogation_error.go +++ b/cmd/observer/observer/interrogation_error.go @@ -12,6 +12,7 @@ const ( InterrogationErrorKeygen InterrogationErrorFindNode InterrogationErrorFindNodeTimeout + InterrogationErrorCtxCancelled ) type InterrogationError struct { diff --git a/cmd/observer/observer/interrogator.go b/cmd/observer/observer/interrogator.go index 6bf828f28f3..262f703288a 100644 --- a/cmd/observer/observer/interrogator.go +++ b/cmd/observer/observer/interrogator.go @@ -88,7 +88,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult // We need to wait until Server sends a Pong reply to that. // The remote side is waiting for this Pong no longer than v4_udp.respTimeout. // If we don't wait, the ENRRequest/FindNode might fail due to errUnknownNode. - libcommon.Sleep(ctx, 500*time.Millisecond) + if err := libcommon.Sleep(ctx, 500*time.Millisecond); err != nil { + return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err) + } // request client ID var handshakeResult *DiplomatResult @@ -158,7 +160,9 @@ func (interrogator *Interrogator) Run(ctx context.Context) (*InterrogationResult peersByID[node.ID()] = node } - libcommon.Sleep(ctx, 1*time.Second) + if err := libcommon.Sleep(ctx, 1*time.Second); err != nil { + return nil, NewInterrogationError(InterrogationErrorCtxCancelled, err) + } } peers := valuesOfIDToNodeMap(peersByID) diff --git a/cmd/observer/observer/status_logger.go b/cmd/observer/observer/status_logger.go index 974e242d764..545cc12aad8 100644 --- a/cmd/observer/observer/status_logger.go +++ b/cmd/observer/observer/status_logger.go @@ -17,7 +17,9 @@ func StatusLoggerLoop(ctx context.Context, db database.DB, networkID uint, perio var prevDistinctIPCount uint for ctx.Err() == nil { - libcommon.Sleep(ctx, period) + if err := libcommon.Sleep(ctx, period); err != nil { + break + } totalCount, err := db.CountNodes(ctx, maxPingTries, networkID) if err != nil { diff --git a/cmd/observer/utils/retry.go b/cmd/observer/utils/retry.go index 9f0cf0917c3..030b41ac347 100644 --- a/cmd/observer/utils/retry.go +++ b/cmd/observer/utils/retry.go @@ -24,7 +24,9 @@ func Retry( for i := 0; i <= retryCount; i++ { if i > 0 { logger.Trace("retrying", "op", opName, "attempt", i, "err", err) - libcommon.Sleep(ctx, delayForAttempt(i)) + if err := libcommon.Sleep(ctx, delayForAttempt(i)); err != nil { + return nil, err + } } result, err = op(ctx) if (err == nil) || !isRecoverableError(err) { diff --git a/erigon-lib/common/sleep.go b/erigon-lib/common/sleep.go index e326df6f964..d6ee8298e9b 100644 --- a/erigon-lib/common/sleep.go +++ b/erigon-lib/common/sleep.go @@ -5,11 +5,16 @@ import ( "time" ) -func Sleep(parentContext context.Context, timeout time.Duration) { - if timeout <= 0 { - return +// Sleep which is context aware. Blocks for d duration and returns no error, unless the context +// has been cancelled, timed out, deadline reached, etc. in which case it returns ctx.Err(). +func Sleep(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil } - ctx, cancel := context.WithTimeout(parentContext, timeout) - defer cancel() - <-ctx.Done() } diff --git a/polygon/heimdall/heimdall.go b/polygon/heimdall/heimdall.go index 0fa88de79a8..a969197e50e 100644 --- a/polygon/heimdall/heimdall.go +++ b/polygon/heimdall/heimdall.go @@ -424,13 +424,19 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) { "err", err, ) - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } // keep background goroutine alive in case of heimdall errors continue } if latestSpan.Id <= tip { - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } continue } @@ -441,7 +447,10 @@ func (h *heimdall) pollSpans(ctx context.Context, tip SpanId, cb func(*Span)) { "err", err, ) - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } // keep background goroutine alive in case of heimdall errors continue } @@ -489,13 +498,19 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func( "err", err, ) - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } // keep background goroutine alive in case of heimdall errors continue } if count <= int64(tip) { - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } continue } @@ -506,7 +521,10 @@ func (h *heimdall) pollMilestones(ctx context.Context, tip MilestoneId, cb func( "err", err, ) - h.waitPollingDelay(ctx) + if err := common.Sleep(ctx, h.pollDelay); err != nil { + h.logPollerSleepCancelled(err) + return + } // keep background goroutine alive in case of heimdall errors continue } @@ -576,6 +594,6 @@ func (h *heimdall) batchFetchCheckpoints( return checkpoints, nil } -func (h *heimdall) waitPollingDelay(ctx context.Context) { - common.Sleep(ctx, h.pollDelay) +func (h *heimdall) logPollerSleepCancelled(err error) { + h.logger.Info("poller sleep cancelled", "err", err) } diff --git a/polygon/heimdall/scraper.go b/polygon/heimdall/scraper.go index 83363c039ba..75b7471150d 100644 --- a/polygon/heimdall/scraper.go +++ b/polygon/heimdall/scraper.go @@ -64,9 +64,9 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error { if idRange.Start > idRange.End { s.syncEvent.SetAndBroadcast() - libcommon.Sleep(ctx, s.pollDelay) - if ctx.Err() != nil { + if err := libcommon.Sleep(ctx, s.pollDelay); err != nil { s.syncEvent.Reset() + return err } } else { entities, err := s.fetcher.FetchEntitiesRange(ctx, idRange) diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go index ce7beb336ab..bbd9ba2a338 100644 --- a/polygon/sync/block_downloader.go +++ b/polygon/sync/block_downloader.go @@ -158,7 +158,10 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints( "sleepSeconds", d.notEnoughPeersBackOffDuration.Seconds(), ) - common.Sleep(ctx, d.notEnoughPeersBackOffDuration) + if err := common.Sleep(ctx, d.notEnoughPeersBackOffDuration); err != nil { + return nil, err + } + continue } From b46e69b0a92898f9a14a5290546c07d3f833bc0c Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Mon, 27 May 2024 15:14:05 +0100 Subject: [PATCH 3/3] add test for common sleep --- erigon-lib/common/sleep_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 erigon-lib/common/sleep_test.go diff --git a/erigon-lib/common/sleep_test.go b/erigon-lib/common/sleep_test.go new file mode 100644 index 00000000000..699c1e366a9 --- /dev/null +++ b/erigon-lib/common/sleep_test.go @@ -0,0 +1,27 @@ +package common + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestSleep(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + start := time.Now() + err := Sleep(ctx, 100*time.Millisecond) + require.NoError(t, err) + require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond) + + eg := errgroup.Group{} + eg.Go(func() error { + return Sleep(ctx, time.Minute) + }) + cancel() + err = eg.Wait() + require.ErrorIs(t, err, context.Canceled) + require.Less(t, time.Since(start), time.Minute) +}