diff --git a/Makefile b/Makefile index a7836ba6d..4420b17ae 100644 --- a/Makefile +++ b/Makefile @@ -8,3 +8,7 @@ godoc: go install golang.org/x/tools/cmd/godoc@latest # http://localhost:6060/pkg/github.com/smartcontractkit/chainlink-relay/ godoc -http=:6060 + +.PHONY: mockery +mockery: $(mockery) ## Install mockery. + go install github.com/vektra/mockery/v2@v2.28.2 \ No newline at end of file diff --git a/pkg/headtracker/head_broadcaster.go b/pkg/headtracker/head_broadcaster.go new file mode 100644 index 000000000..21372435d --- /dev/null +++ b/pkg/headtracker/head_broadcaster.go @@ -0,0 +1,163 @@ +package headtracker + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink-relay/pkg/types" + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +const TrackableCallbackTimeout = 2 * time.Second + +type callbackSet[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] map[int]types.HeadTrackable[H, BLOCK_HASH] + +func (set callbackSet[H, BLOCK_HASH]) values() []types.HeadTrackable[H, BLOCK_HASH] { + var values []types.HeadTrackable[H, BLOCK_HASH] + for _, callback := range set { + values = append(values, callback) + } + return values +} + +type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { + logger logger.Logger + callbacks callbackSet[H, BLOCK_HASH] + mailbox *utils.Mailbox[H] + mutex *sync.Mutex + chClose utils.StopChan + wgDone sync.WaitGroup + utils.StartStopOnce + latest H + lastCallbackID int +} + +// NewHeadBroadcaster creates a new HeadBroadcaster +func NewHeadBroadcaster[ + H types.Head[BLOCK_HASH], + BLOCK_HASH types.Hashable, +]( + lggr logger.Logger, +) *HeadBroadcaster[H, BLOCK_HASH] { + return &HeadBroadcaster[H, BLOCK_HASH]{ + logger: logger.Named(lggr, "HeadBroadcaster"), + callbacks: make(callbackSet[H, BLOCK_HASH]), + mailbox: utils.NewSingleMailbox[H](), + mutex: &sync.Mutex{}, + chClose: make(chan struct{}), + wgDone: sync.WaitGroup{}, + StartStopOnce: utils.StartStopOnce{}, + } +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Start(context.Context) error { + return hb.StartOnce("HeadBroadcaster", func() error { + hb.wgDone.Add(1) + go hb.run() + return nil + }) +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error { + return hb.StopOnce("HeadBroadcaster", func() error { + hb.mutex.Lock() + // clear all callbacks + hb.callbacks = make(callbackSet[H, BLOCK_HASH]) + hb.mutex.Unlock() + + close(hb.chClose) + hb.wgDone.Wait() + return nil + }) +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Name() string { + return hb.logger.Name() +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error { + return map[string]error{hb.Name(): hb.StartStopOnce.Healthy()} +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) { + hb.mailbox.Deliver(head) +} + +// Subscribe subscribes to OnNewLongestChain and Connect until HeadBroadcaster is closed, +// or unsubscribe callback is called explicitly +func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback types.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) { + hb.mutex.Lock() + defer hb.mutex.Unlock() + + currentLongestChain = hb.latest + + hb.lastCallbackID++ + callbackID := hb.lastCallbackID + hb.callbacks[callbackID] = callback + unsubscribe = func() { + hb.mutex.Lock() + defer hb.mutex.Unlock() + delete(hb.callbacks, callbackID) + } + + return +} + +func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() { + defer hb.wgDone.Done() + + for { + select { + case <-hb.chClose: + return + case <-hb.mailbox.Notify(): + hb.executeCallbacks() + } + } +} + +// DEV: the head relayer makes no promises about head delivery! Subscribing +// Jobs should expect to the relayer to skip heads if there is a large number of listeners +// and all callbacks cannot be completed in the allotted time. +func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() { + head, exists := hb.mailbox.Retrieve() + if !exists { + hb.logger.Info("No head to retrieve. It might have been skipped") + return + } + + hb.mutex.Lock() + callbacks := hb.callbacks.values() + hb.latest = head + hb.mutex.Unlock() + + hb.logger.Debugw("Initiating callbacks", + "headNum", head.BlockNumber(), + "numCallbacks", len(callbacks), + ) + + wg := sync.WaitGroup{} + wg.Add(len(callbacks)) + + ctx, cancel := hb.chClose.NewCtx() + defer cancel() + + for _, callback := range callbacks { + go func(trackable types.HeadTrackable[H, BLOCK_HASH]) { + defer wg.Done() + start := time.Now() + cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout) + defer cancel() + trackable.OnNewLongestChain(cctx, head) + elapsed := time.Since(start) + hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed), + "callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed) + }(callback) + } + + wg.Wait() +} diff --git a/pkg/headtracker/head_listener.go b/pkg/headtracker/head_listener.go new file mode 100644 index 000000000..804d81f5b --- /dev/null +++ b/pkg/headtracker/head_listener.go @@ -0,0 +1,218 @@ +package headtracker + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + htrktypes "github.com/smartcontractkit/chainlink-relay/pkg/headtracker/types" + "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink-relay/pkg/types" + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +var ( + promNumHeadsReceived = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_heads_received", + Help: "The total number of heads seen", + }, []string{"ChainID"}) + promEthConnectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_connection_errors", + Help: "The total number of node connection errors", + }, []string{"ChainID"}) +) + +type HeadListener[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID types.ID, + BLOCK_HASH types.Hashable, +] struct { + config htrktypes.Config + client htrktypes.Client[HTH, S, ID, BLOCK_HASH] + logger logger.Logger + chStop utils.StopChan + chHeaders chan HTH + headSubscription types.Subscription + connected atomic.Bool + receivingHeads atomic.Bool +} + +func NewHeadListener[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID types.ID, + BLOCK_HASH types.Hashable, + CLIENT htrktypes.Client[HTH, S, ID, BLOCK_HASH], +]( + lggr logger.Logger, + client CLIENT, + config htrktypes.Config, + chStop chan struct{}, +) *HeadListener[HTH, S, ID, BLOCK_HASH] { + return &HeadListener[HTH, S, ID, BLOCK_HASH]{ + config: config, + client: client, + logger: logger.Named(lggr, "HeadListener"), + chStop: chStop, + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Name() string { + return hl.logger.Name() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH], done func()) { + defer done() + defer hl.unsubscribe() + + ctx, cancel := hl.chStop.NewCtx() + defer cancel() + + for { + if !hl.subscribe(ctx) { + break + } + err := hl.receiveHeaders(ctx, handleNewHead) + if ctx.Err() != nil { + break + } else if err != nil { + hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err) + continue + } else { + break + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool { + return hl.receivingHeads.Load() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Connected() bool { + return hl.connected.Load() +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { + var err error + if !hl.ReceivingHeads() { + err = errors.New("Listener is not receiving heads") + } + if !hl.Connected() { + err = errors.New("Listener is not connected") + } + return map[string]error{hl.Name(): err} +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH]) error { + var noHeadsAlarmC <-chan time.Time + var noHeadsAlarmT *time.Ticker + noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold() + if noHeadsAlarmDuration > 0 { + noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) + noHeadsAlarmC = noHeadsAlarmT.C + } + + for { + select { + case <-hl.chStop: + return nil + + case blockHeader, open := <-hl.chHeaders: + chainId := hl.client.ConfiguredChainID() + if noHeadsAlarmT != nil { + // We've received a head, reset the no heads alarm + noHeadsAlarmT.Stop() + noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration) + noHeadsAlarmC = noHeadsAlarmT.C + } + hl.receivingHeads.Store(true) + if !open { + return errors.New("head listener: chHeaders prematurely closed") + } + if !blockHeader.IsValid() { + hl.logger.Error("got nil block header") + continue + } + + // Compare the chain ID of the block header to the chain ID of the client + if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() { + hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID()) + } + promNumHeadsReceived.WithLabelValues(chainId.String()).Inc() + + err := handleNewHead(ctx, blockHeader) + if ctx.Err() != nil { + return nil + } else if err != nil { + return err + } + + case err, open := <-hl.headSubscription.Err(): + // err can be nil, because of using chainIDSubForwarder + if !open || err == nil { + return errors.New("head listener: subscription Err channel prematurely closed") + } + return err + + case <-noHeadsAlarmC: + // We haven't received a head on the channel for a long time, log a warning + hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration) + hl.receivingHeads.Store(false) + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool { + subscribeRetryBackoff := utils.NewRedialBackoff() + + chainId := hl.client.ConfiguredChainID() + + for { + hl.unsubscribe() + + hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String()) + + select { + case <-hl.chStop: + return false + + case <-time.After(subscribeRetryBackoff.Duration()): + err := hl.subscribeToHead(ctx) + if err != nil { + promEthConnectionErrors.WithLabelValues(chainId.String()).Inc() + hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err) + } else { + hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String()) + return true + } + } + } +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error { + hl.chHeaders = make(chan HTH) + + var err error + hl.headSubscription, err = hl.client.SubscribeNewHead(ctx, hl.chHeaders) + if err != nil { + close(hl.chHeaders) + return errors.Wrap(err, "EthClient#SubscribeNewHead") + } + + hl.connected.Store(true) + + return nil +} + +func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() { + if hl.headSubscription != nil { + hl.connected.Store(false) + hl.headSubscription.Unsubscribe() + hl.headSubscription = nil + } +} diff --git a/pkg/headtracker/head_tracker.go b/pkg/headtracker/head_tracker.go new file mode 100644 index 000000000..24118fef4 --- /dev/null +++ b/pkg/headtracker/head_tracker.go @@ -0,0 +1,362 @@ +package headtracker + +import ( + "context" + "fmt" + "math/big" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/exp/maps" + + htrktypes "github.com/smartcontractkit/chainlink-relay/pkg/headtracker/types" + "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink-relay/pkg/types" + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +var ( + promCurrentHead = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "head_tracker_current_head", + Help: "The highest seen head number", + }, []string{"evmChainID"}) + + promOldHead = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "head_tracker_very_old_head", + Help: "Counter is incremented every time we get a head that is much lower than the highest seen head ('much lower' is defined as a block that is EVM.FinalityDepth or greater below the highest seen head)", + }, []string{"evmChainID"}) +) + +// HeadsBufferSize - The buffer is used when heads sampling is disabled, to ensure the callback is run for every head +const HeadsBufferSize = 10 + +type HeadTracker[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID types.ID, + BLOCK_HASH types.Hashable, +] struct { + log logger.Logger + headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH] + headSaver types.HeadSaver[HTH, BLOCK_HASH] + mailMon *utils.MailboxMonitor + client htrktypes.Client[HTH, S, ID, BLOCK_HASH] + chainID ID + config htrktypes.Config + htConfig htrktypes.HeadTrackerConfig + + backfillMB *utils.Mailbox[HTH] + broadcastMB *utils.Mailbox[HTH] + headListener types.HeadListener[HTH, BLOCK_HASH] + chStop utils.StopChan + wgDone sync.WaitGroup + utils.StartStopOnce + getNilHead func() HTH +} + +// NewHeadTracker instantiates a new HeadTracker using HeadSaver to persist new block numbers. +func NewHeadTracker[ + HTH htrktypes.Head[BLOCK_HASH, ID], + S types.Subscription, + ID types.ID, + BLOCK_HASH types.Hashable, +]( + lggr logger.Logger, + client htrktypes.Client[HTH, S, ID, BLOCK_HASH], + config htrktypes.Config, + htConfig htrktypes.HeadTrackerConfig, + headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH], + headSaver types.HeadSaver[HTH, BLOCK_HASH], + mailMon *utils.MailboxMonitor, + getNilHead func() HTH, +) types.HeadTracker[HTH, BLOCK_HASH] { + chStop := make(chan struct{}) + lggr = logger.Named(lggr, "HeadTracker") + return &HeadTracker[HTH, S, ID, BLOCK_HASH]{ + headBroadcaster: headBroadcaster, + client: client, + chainID: client.ConfiguredChainID(), + config: config, + htConfig: htConfig, + log: lggr, + backfillMB: utils.NewSingleMailbox[HTH](), + broadcastMB: utils.NewMailbox[HTH](HeadsBufferSize), + chStop: chStop, + headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop), + headSaver: headSaver, + mailMon: mailMon, + getNilHead: getNilHead, + } +} + +// Start starts HeadTracker service. +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error { + return ht.StartOnce("HeadTracker", func() error { + ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID) + latestChain, err := ht.headSaver.Load(ctx) + if err != nil { + return err + } + if latestChain.IsValid() { + ht.log.Debugw( + fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", htrktypes.FriendlyNumber(latestChain.BlockNumber()), latestChain.BlockHash()), + "blockNumber", latestChain.BlockNumber(), + "blockHash", latestChain.BlockHash(), + ) + } + + // NOTE: Always try to start the head tracker off with whatever the + // latest head is, without waiting for the subscription to send us one. + // + // In some cases the subscription will send us the most recent head + // anyway when we connect (but we should not rely on this because it is + // not specced). If it happens this is fine, and the head will be + // ignored as a duplicate. + initialHead, err := ht.getInitialHead(ctx) + if err != nil { + if errors.Is(err, ctx.Err()) { + return nil + } + ht.log.Errorw("Error getting initial head", "err", err) + } else if initialHead.IsValid() { + if err := ht.handleNewHead(ctx, initialHead); err != nil { + return errors.Wrap(err, "error handling initial head") + } + } else { + ht.log.Debug("Got nil initial head") + } + + ht.wgDone.Add(3) + go ht.headListener.ListenForNewHeads(ht.handleNewHead, ht.wgDone.Done) + go ht.backfillLoop() + go ht.broadcastLoop() + + ht.mailMon.Monitor(ht.broadcastMB, "HeadTracker", "Broadcast", ht.chainID.String()) + + return nil + }) +} + +// Close stops HeadTracker service. +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Close() error { + return ht.StopOnce("HeadTracker", func() error { + close(ht.chStop) + ht.wgDone.Wait() + return ht.broadcastMB.Close() + }) +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Name() string { + return ht.log.Name() +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { + report := map[string]error{ + ht.Name(): ht.StartStopOnce.Healthy(), + } + maps.Copy(report, ht.headListener.HealthReport()) + return report +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) { + if uint(headWithChain.ChainLength()) >= depth { + return nil + } + + baseHeight := headWithChain.BlockNumber() - int64(depth-1) + if baseHeight < 0 { + baseHeight = 0 + } + + return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight) +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH { + return ht.headSaver.LatestChain() +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) { + head, err := ht.client.HeadByNumber(ctx, nil) + if err != nil { + return ht.getNilHead(), errors.Wrap(err, "failed to fetch initial head") + } + loggerFields := []interface{}{"head", head} + if head.IsValid() { + loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash()) + } + ht.log.Debugw("Got initial head", loggerFields...) + return head, nil +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error { + prevHead := ht.headSaver.LatestChain() + + ht.log.Debugw(fmt.Sprintf("Received new head %v", htrktypes.FriendlyNumber(head.BlockNumber())), + "blockHeight", head.BlockNumber(), + "blockHash", head.BlockHash(), + "parentHeadHash", head.GetParentHash(), + ) + + err := ht.headSaver.Save(ctx, head) + if ctx.Err() != nil { + return nil + } else if err != nil { + return errors.Wrapf(err, "failed to save head: %#v", head) + } + + if !prevHead.IsValid() || head.BlockNumber() > prevHead.BlockNumber() { + promCurrentHead.WithLabelValues(ht.chainID.String()).Set(float64(head.BlockNumber())) + + headWithChain := ht.headSaver.Chain(head.BlockHash()) + if !headWithChain.IsValid() { + return errors.Errorf("HeadTracker#handleNewHighestHead headWithChain was unexpectedly nil") + } + ht.backfillMB.Deliver(headWithChain) + ht.broadcastMB.Deliver(headWithChain) + } else if head.BlockNumber() == prevHead.BlockNumber() { + if head.BlockHash() != prevHead.BlockHash() { + ht.log.Debugw("Got duplicate head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockHash()) + } else { + ht.log.Debugw("Head already in the database", "head", head.BlockHash()) + } + } else { + ht.log.Debugw("Got out of order head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockNumber()) + prevUnFinalizedHead := prevHead.BlockNumber() - int64(ht.config.FinalityDepth()) + if head.BlockNumber() < prevUnFinalizedHead { + promOldHead.WithLabelValues(ht.chainID.String()).Inc() + // ht.log.Criticalf("Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber()) + logger.Criticalf(ht.log, "Got very old block with number %d (highest seen was %d). This is a problem and either means a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", head.BlockNumber(), prevHead.BlockNumber()) + ht.SvcErrBuffer.Append(errors.New("got very old block")) + } + } + return nil +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) broadcastLoop() { + defer ht.wgDone.Done() + + samplingInterval := ht.htConfig.SamplingInterval() + if samplingInterval > 0 { + ht.log.Debugf("Head sampling is enabled - sampling interval is set to: %v", samplingInterval) + debounceHead := time.NewTicker(samplingInterval) + defer debounceHead.Stop() + for { + select { + case <-ht.chStop: + return + case <-debounceHead.C: + item := ht.broadcastMB.RetrieveLatestAndClear() + if !item.IsValid() { + continue + } + ht.headBroadcaster.BroadcastNewLongestChain(item) + } + } + } else { + ht.log.Info("Head sampling is disabled - callback will be called on every head") + for { + select { + case <-ht.chStop: + return + case <-ht.broadcastMB.Notify(): + for { + item, exists := ht.broadcastMB.Retrieve() + if !exists { + break + } + ht.headBroadcaster.BroadcastNewLongestChain(item) + } + } + } + } +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { + defer ht.wgDone.Done() + + ctx, cancel := ht.chStop.NewCtx() + defer cancel() + + for { + select { + case <-ht.chStop: + return + case <-ht.backfillMB.Notify(): + for { + head, exists := ht.backfillMB.Retrieve() + if !exists { + break + } + { + err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth())) + if err != nil { + ht.log.Warnw("Unexpected error while backfilling heads", "err", err) + } else if ctx.Err() != nil { + break + } + } + } + } + } +} + +// backfill fetches all missing heads up until the base height +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head types.Head[BLOCK_HASH], baseHeight int64) (err error) { + headNumberInt64 := head.BlockNumber() + if headNumberInt64 <= baseHeight { + return nil + } + mark := time.Now() + fetched := 0 + l := logger.With(ht.log, "blockNumber", headNumberInt64, + "n", headNumberInt64-baseHeight, + "fromBlockHeight", baseHeight, + "toBlockHeight", headNumberInt64-1) + l.Debug("Starting backfill") + defer func() { + if ctx.Err() != nil { + l.Warnw("Backfill context error", "err", ctx.Err()) + return + } + l.Debugw("Finished backfill", + "fetched", fetched, + "time", time.Since(mark), + "err", err) + }() + + for i := head.BlockNumber() - 1; i >= baseHeight; i-- { + // NOTE: Sequential requests here mean it's a potential performance bottleneck, be aware! + existingHead := ht.headSaver.Chain(head.GetParentHash()) + if existingHead.IsValid() { + head = existingHead + continue + } + head, err = ht.fetchAndSaveHead(ctx, i) + fetched++ + if ctx.Err() != nil { + ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err()) + break + } else if err != nil { + return errors.Wrap(err, "fetchAndSaveHead failed") + } + } + return +} + +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) fetchAndSaveHead(ctx context.Context, n int64) (HTH, error) { + ht.log.Debugw("Fetching head", "blockHeight", n) + head, err := ht.client.HeadByNumber(ctx, big.NewInt(n)) + if err != nil { + return ht.getNilHead(), err + } else if !head.IsValid() { + return ht.getNilHead(), errors.New("got nil head") + } + err = ht.headSaver.Save(ctx, head) + if err != nil { + return ht.getNilHead(), err + } + return head, nil +} diff --git a/pkg/headtracker/types/client.go b/pkg/headtracker/types/client.go new file mode 100644 index 000000000..7bcb9096b --- /dev/null +++ b/pkg/headtracker/types/client.go @@ -0,0 +1,19 @@ +package types + +import ( + "context" + "math/big" + + "github.com/smartcontractkit/chainlink-relay/pkg/types" +) + +//go:generate mockery --quiet --name Client --output ./mocks/ --case=underscore + +type Client[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_HASH types.Hashable] interface { + HeadByNumber(ctx context.Context, number *big.Int) (head H, err error) + // ConfiguredChainID returns the chain ID that the node is configured to connect to + ConfiguredChainID() (id ID) + // SubscribeNewHead is the method in which the client receives new Head. + // It can be implemented differently for each chain i.e websocket, polling, etc + SubscribeNewHead(ctx context.Context, ch chan<- H) (S, error) +} diff --git a/pkg/headtracker/types/config.go b/pkg/headtracker/types/config.go new file mode 100644 index 000000000..6dcdcb093 --- /dev/null +++ b/pkg/headtracker/types/config.go @@ -0,0 +1,26 @@ +package types + +import ( + "fmt" + "math/big" + "time" + + "golang.org/x/exp/constraints" +) + +type Config interface { + BlockEmissionIdleWarningThreshold() time.Duration + FinalityDepth() uint32 +} + +type HeadTrackerConfig interface { + HistoryDepth() uint32 + MaxBufferSize() uint32 + SamplingInterval() time.Duration +} + +// FriendlyNumber returns a string printing the integer or big.Int in both +// decimal and hexadecimal formats. +func FriendlyNumber[N constraints.Integer | *big.Int](n N) string { + return fmt.Sprintf("#%[1]v (0x%[1]x)", n) +} diff --git a/pkg/headtracker/types/head.go b/pkg/headtracker/types/head.go new file mode 100644 index 000000000..bdd77f517 --- /dev/null +++ b/pkg/headtracker/types/head.go @@ -0,0 +1,14 @@ +package types + +import "github.com/smartcontractkit/chainlink-relay/pkg/types" + +//go:generate mockery --quiet --name Head --output ./mocks/ --case=underscore +type Head[BLOCK_HASH types.Hashable, CHAIN_ID types.ID] interface { + types.Head[BLOCK_HASH] + // ChainID returns the chain ID that the head is for + ChainID() CHAIN_ID + // Returns true if the head has a chain Id + HasChainID() bool + // IsValid returns true if the head is valid. + IsValid() bool +} diff --git a/pkg/headtracker/types/mocks/client.go b/pkg/headtracker/types/mocks/client.go new file mode 100644 index 000000000..af882bf9e --- /dev/null +++ b/pkg/headtracker/types/mocks/client.go @@ -0,0 +1,94 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + big "math/big" + + mock "github.com/stretchr/testify/mock" + + types "github.com/smartcontractkit/chainlink-relay/pkg/types" +) + +// Client is an autogenerated mock type for the Client type +type Client[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_HASH types.Hashable] struct { + mock.Mock +} + +// ConfiguredChainID provides a mock function with given fields: +func (_m *Client[H, S, ID, BLOCK_HASH]) ConfiguredChainID() ID { + ret := _m.Called() + + var r0 ID + if rf, ok := ret.Get(0).(func() ID); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(ID) + } + + return r0 +} + +// HeadByNumber provides a mock function with given fields: ctx, number +func (_m *Client[H, S, ID, BLOCK_HASH]) HeadByNumber(ctx context.Context, number *big.Int) (H, error) { + ret := _m.Called(ctx, number) + + var r0 H + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (H, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) H); ok { + r0 = rf(ctx, number) + } else { + r0 = ret.Get(0).(H) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SubscribeNewHead provides a mock function with given fields: ctx, ch +func (_m *Client[H, S, ID, BLOCK_HASH]) SubscribeNewHead(ctx context.Context, ch chan<- H) (S, error) { + ret := _m.Called(ctx, ch) + + var r0 S + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, chan<- H) (S, error)); ok { + return rf(ctx, ch) + } + if rf, ok := ret.Get(0).(func(context.Context, chan<- H) S); ok { + r0 = rf(ctx, ch) + } else { + r0 = ret.Get(0).(S) + } + + if rf, ok := ret.Get(1).(func(context.Context, chan<- H) error); ok { + r1 = rf(ctx, ch) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewClient[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_HASH types.Hashable](t mockConstructorTestingTNewClient) *Client[H, S, ID, BLOCK_HASH] { + mock := &Client[H, S, ID, BLOCK_HASH]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/headtracker/types/mocks/head.go b/pkg/headtracker/types/mocks/head.go new file mode 100644 index 000000000..54f0fad28 --- /dev/null +++ b/pkg/headtracker/types/mocks/head.go @@ -0,0 +1,172 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink-relay/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// Head is an autogenerated mock type for the Head type +type Head[BLOCK_HASH types.Hashable, CHAIN_ID types.ID] struct { + mock.Mock +} + +// BlockHash provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) BlockHash() BLOCK_HASH { + ret := _m.Called() + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func() BLOCK_HASH); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +// BlockNumber provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) BlockNumber() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// ChainID provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) ChainID() CHAIN_ID { + ret := _m.Called() + + var r0 CHAIN_ID + if rf, ok := ret.Get(0).(func() CHAIN_ID); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(CHAIN_ID) + } + + return r0 +} + +// ChainLength provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) ChainLength() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + +// EarliestHeadInChain provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) EarliestHeadInChain() types.Head[BLOCK_HASH] { + ret := _m.Called() + + var r0 types.Head[BLOCK_HASH] + if rf, ok := ret.Get(0).(func() types.Head[BLOCK_HASH]); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.Head[BLOCK_HASH]) + } + } + + return r0 +} + +// GetParent provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) GetParent() types.Head[BLOCK_HASH] { + ret := _m.Called() + + var r0 types.Head[BLOCK_HASH] + if rf, ok := ret.Get(0).(func() types.Head[BLOCK_HASH]); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.Head[BLOCK_HASH]) + } + } + + return r0 +} + +// GetParentHash provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) GetParentHash() BLOCK_HASH { + ret := _m.Called() + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func() BLOCK_HASH); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +// HasChainID provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) HasChainID() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// HashAtHeight provides a mock function with given fields: blockNum +func (_m *Head[BLOCK_HASH, CHAIN_ID]) HashAtHeight(blockNum int64) BLOCK_HASH { + ret := _m.Called(blockNum) + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func(int64) BLOCK_HASH); ok { + r0 = rf(blockNum) + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +// IsValid provides a mock function with given fields: +func (_m *Head[BLOCK_HASH, CHAIN_ID]) IsValid() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +type mockConstructorTestingTNewHead interface { + mock.TestingT + Cleanup(func()) +} + +// NewHead creates a new instance of Head. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHead[BLOCK_HASH types.Hashable, CHAIN_ID types.ID](t mockConstructorTestingTNewHead) *Head[BLOCK_HASH, CHAIN_ID] { + mock := &Head[BLOCK_HASH, CHAIN_ID]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/mocks/head_broadcaster.go b/pkg/mocks/head_broadcaster.go new file mode 100644 index 000000000..278cf5294 --- /dev/null +++ b/pkg/mocks/head_broadcaster.go @@ -0,0 +1,133 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink-relay/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// HeadBroadcaster is an autogenerated mock type for the HeadBroadcaster type +type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { + mock.Mock +} + +// BroadcastNewLongestChain provides a mock function with given fields: _a0 +func (_m *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(_a0 H) { + _m.Called(_a0) +} + +// Close provides a mock function with given fields: +func (_m *HeadBroadcaster[H, BLOCK_HASH]) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// HealthReport provides a mock function with given fields: +func (_m *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error { + ret := _m.Called() + + var r0 map[string]error + if rf, ok := ret.Get(0).(func() map[string]error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]error) + } + } + + return r0 +} + +// Name provides a mock function with given fields: +func (_m *HeadBroadcaster[H, BLOCK_HASH]) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Ready provides a mock function with given fields: +func (_m *HeadBroadcaster[H, BLOCK_HASH]) Ready() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: _a0 +func (_m *HeadBroadcaster[H, BLOCK_HASH]) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Subscribe provides a mock function with given fields: callback +func (_m *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback types.HeadTrackable[H, BLOCK_HASH]) (H, func()) { + ret := _m.Called(callback) + + var r0 H + var r1 func() + if rf, ok := ret.Get(0).(func(types.HeadTrackable[H, BLOCK_HASH]) (H, func())); ok { + return rf(callback) + } + if rf, ok := ret.Get(0).(func(types.HeadTrackable[H, BLOCK_HASH]) H); ok { + r0 = rf(callback) + } else { + r0 = ret.Get(0).(H) + } + + if rf, ok := ret.Get(1).(func(types.HeadTrackable[H, BLOCK_HASH]) func()); ok { + r1 = rf(callback) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(func()) + } + } + + return r0, r1 +} + +type mockConstructorTestingTNewHeadBroadcaster interface { + mock.TestingT + Cleanup(func()) +} + +// NewHeadBroadcaster creates a new instance of HeadBroadcaster. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable](t mockConstructorTestingTNewHeadBroadcaster) *HeadBroadcaster[H, BLOCK_HASH] { + mock := &HeadBroadcaster[H, BLOCK_HASH]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/mocks/head_tracker.go b/pkg/mocks/head_tracker.go new file mode 100644 index 000000000..9d48deac4 --- /dev/null +++ b/pkg/mocks/head_tracker.go @@ -0,0 +1,130 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink-relay/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// HeadTracker is an autogenerated mock type for the HeadTracker type +type HeadTracker[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { + mock.Mock +} + +// Backfill provides a mock function with given fields: ctx, headWithChain, depth +func (_m *HeadTracker[H, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain H, depth uint) error { + ret := _m.Called(ctx, headWithChain, depth) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, H, uint) error); ok { + r0 = rf(ctx, headWithChain, depth) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Close provides a mock function with given fields: +func (_m *HeadTracker[H, BLOCK_HASH]) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// HealthReport provides a mock function with given fields: +func (_m *HeadTracker[H, BLOCK_HASH]) HealthReport() map[string]error { + ret := _m.Called() + + var r0 map[string]error + if rf, ok := ret.Get(0).(func() map[string]error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]error) + } + } + + return r0 +} + +// LatestChain provides a mock function with given fields: +func (_m *HeadTracker[H, BLOCK_HASH]) LatestChain() H { + ret := _m.Called() + + var r0 H + if rf, ok := ret.Get(0).(func() H); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(H) + } + + return r0 +} + +// Name provides a mock function with given fields: +func (_m *HeadTracker[H, BLOCK_HASH]) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Ready provides a mock function with given fields: +func (_m *HeadTracker[H, BLOCK_HASH]) Ready() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: _a0 +func (_m *HeadTracker[H, BLOCK_HASH]) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewHeadTracker interface { + mock.TestingT + Cleanup(func()) +} + +// NewHeadTracker creates a new instance of HeadTracker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHeadTracker[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable](t mockConstructorTestingTNewHeadTracker) *HeadTracker[H, BLOCK_HASH] { + mock := &HeadTracker[H, BLOCK_HASH]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/types/chain.go b/pkg/types/chain.go index d00e16921..2d705af40 100644 --- a/pkg/types/chain.go +++ b/pkg/types/chain.go @@ -2,6 +2,7 @@ package types import ( "context" + "fmt" "math/big" ) @@ -36,3 +37,7 @@ type NodeStatus struct { Config string // TOML State string } + +// ID represents the base type, for any chain's ID. +// It should be convertible to a string, that can uniquely identify this chain +type ID fmt.Stringer diff --git a/pkg/types/hashable.go b/pkg/types/hashable.go new file mode 100644 index 000000000..2d166505b --- /dev/null +++ b/pkg/types/hashable.go @@ -0,0 +1,12 @@ +package types + +import "fmt" + +// A chain-agnostic generic interface to represent the following native types on various chains: +// PublicKey, Address, Account, BlockHash, TxHash +type Hashable interface { + fmt.Stringer + comparable + + Bytes() []byte +} diff --git a/pkg/types/head.go b/pkg/types/head.go new file mode 100644 index 000000000..4d339b1cd --- /dev/null +++ b/pkg/types/head.go @@ -0,0 +1,27 @@ +package types + +// Head provides access to a chain's head, as needed by the TxManager. +// This is a generic interface which ALL chains will implement. +// +//go:generate mockery --quiet --name Head --output ./mocks/ --case=underscore +type Head[BLOCK_HASH Hashable] interface { + // BlockNumber is the head's block number + BlockNumber() int64 + + // ChainLength returns the length of the chain followed by recursively looking up parents + ChainLength() uint32 + + // EarliestHeadInChain traverses through parents until it finds the earliest one + EarliestHeadInChain() Head[BLOCK_HASH] + + // Parent is the head's parent block + GetParent() Head[BLOCK_HASH] + + // Hash is the head's block hash + BlockHash() BLOCK_HASH + GetParentHash() BLOCK_HASH + + // HashAtHeight returns the hash of the block at the given height, if it is in the chain. + // If not in chain, returns the zero hash + HashAtHeight(blockNum int64) BLOCK_HASH +} diff --git a/pkg/types/head_tracker.go b/pkg/types/head_tracker.go new file mode 100644 index 000000000..75354ceeb --- /dev/null +++ b/pkg/types/head_tracker.go @@ -0,0 +1,73 @@ +package types + +import ( + "context" +) + +// HeadTracker holds and stores the block experienced by a particular node in a thread safe manner. +// Reconstitutes the last block number on reboot. +// +//go:generate mockery --quiet --name HeadTracker --output ../mocks/ --case=underscore +type HeadTracker[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + Service + // Backfill given a head will fill in any missing heads up to the given depth + // (used for testing) + Backfill(ctx context.Context, headWithChain H, depth uint) (err error) + LatestChain() H +} + +// HeadTrackable is implemented by the core txm, +// to be able to receive head events from any chain. +// Chain implementations should notify head events to the core txm via this interface. +// +//go:generate mockery --quiet --name HeadTrackable --output ./mocks/ --case=underscore +type HeadTrackable[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + OnNewLongestChain(ctx context.Context, head H) +} + +// HeadSaver is an chain agnostic interface for saving and loading heads +// Different chains will instantiate generic HeadSaver type with their native Head and BlockHash types. +type HeadSaver[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + // Save updates the latest block number, if indeed the latest, and persists + // this number in case of reboot. + Save(ctx context.Context, head H) error + // Load loads latest EvmHeadTrackerHistoryDepth heads, returns the latest chain. + Load(ctx context.Context) (H, error) + // LatestChain returns the block header with the highest number that has been seen, or nil. + LatestChain() H + // Chain returns a head for the specified hash, or nil. + Chain(hash BLOCK_HASH) H +} + +// HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node +type HeadListener[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + // ListenForNewHeads kicks off the listen loop (not thread safe) + // done() must be executed upon leaving ListenForNewHeads() + ListenForNewHeads(handleNewHead NewHeadHandler[H, BLOCK_HASH], done func()) + + // ReceivingHeads returns true if the listener is receiving heads (thread safe) + ReceivingHeads() bool + + // Connected returns true if the listener is connected (thread safe) + Connected() bool + + // HealthReport returns report of errors within HeadListener + HealthReport() map[string]error +} + +// NewHeadHandler is a callback that handles incoming heads +type NewHeadHandler[H Head[BLOCK_HASH], BLOCK_HASH Hashable] func(ctx context.Context, header H) error + +// HeadBroadcaster relays new Heads to all subscribers. +// +//go:generate mockery --quiet --name HeadBroadcaster --output ../mocks/ --case=underscore +type HeadBroadcaster[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + Service + BroadcastNewLongestChain(H) + HeadBroadcasterRegistry[H, BLOCK_HASH] +} + +//go:generate mockery --quiet --name HeadBroadcaster --output ../mocks/ --case=underscore +type HeadBroadcasterRegistry[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { + Subscribe(callback HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) +} diff --git a/pkg/types/mocks/head.go b/pkg/types/mocks/head.go new file mode 100644 index 000000000..5cdc2ba09 --- /dev/null +++ b/pkg/types/mocks/head.go @@ -0,0 +1,130 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink-relay/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// Head is an autogenerated mock type for the Head type +type Head[BLOCK_HASH types.Hashable] struct { + mock.Mock +} + +// BlockHash provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) BlockHash() BLOCK_HASH { + ret := _m.Called() + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func() BLOCK_HASH); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +// BlockNumber provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) BlockNumber() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// ChainLength provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) ChainLength() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + +// EarliestHeadInChain provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) EarliestHeadInChain() types.Head[BLOCK_HASH] { + ret := _m.Called() + + var r0 types.Head[BLOCK_HASH] + if rf, ok := ret.Get(0).(func() types.Head[BLOCK_HASH]); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.Head[BLOCK_HASH]) + } + } + + return r0 +} + +// GetParent provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) GetParent() types.Head[BLOCK_HASH] { + ret := _m.Called() + + var r0 types.Head[BLOCK_HASH] + if rf, ok := ret.Get(0).(func() types.Head[BLOCK_HASH]); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.Head[BLOCK_HASH]) + } + } + + return r0 +} + +// GetParentHash provides a mock function with given fields: +func (_m *Head[BLOCK_HASH]) GetParentHash() BLOCK_HASH { + ret := _m.Called() + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func() BLOCK_HASH); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +// HashAtHeight provides a mock function with given fields: blockNum +func (_m *Head[BLOCK_HASH]) HashAtHeight(blockNum int64) BLOCK_HASH { + ret := _m.Called(blockNum) + + var r0 BLOCK_HASH + if rf, ok := ret.Get(0).(func(int64) BLOCK_HASH); ok { + r0 = rf(blockNum) + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + return r0 +} + +type mockConstructorTestingTNewHead interface { + mock.TestingT + Cleanup(func()) +} + +// NewHead creates a new instance of Head. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHead[BLOCK_HASH types.Hashable](t mockConstructorTestingTNewHead) *Head[BLOCK_HASH] { + mock := &Head[BLOCK_HASH]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/types/mocks/head_trackable.go b/pkg/types/mocks/head_trackable.go new file mode 100644 index 000000000..571611968 --- /dev/null +++ b/pkg/types/mocks/head_trackable.go @@ -0,0 +1,35 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink-relay/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// HeadTrackable is an autogenerated mock type for the HeadTrackable type +type HeadTrackable[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { + mock.Mock +} + +// OnNewLongestChain provides a mock function with given fields: ctx, head +func (_m *HeadTrackable[H, BLOCK_HASH]) OnNewLongestChain(ctx context.Context, head H) { + _m.Called(ctx, head) +} + +type mockConstructorTestingTNewHeadTrackable interface { + mock.TestingT + Cleanup(func()) +} + +// NewHeadTrackable creates a new instance of HeadTrackable. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewHeadTrackable[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable](t mockConstructorTestingTNewHeadTrackable) *HeadTrackable[H, BLOCK_HASH] { + mock := &HeadTrackable[H, BLOCK_HASH]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/types/mocks/subscription.go b/pkg/types/mocks/subscription.go new file mode 100644 index 000000000..b9cb7886d --- /dev/null +++ b/pkg/types/mocks/subscription.go @@ -0,0 +1,46 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Subscription is an autogenerated mock type for the Subscription type +type Subscription struct { + mock.Mock +} + +// Err provides a mock function with given fields: +func (_m *Subscription) Err() <-chan error { + ret := _m.Called() + + var r0 <-chan error + if rf, ok := ret.Get(0).(func() <-chan error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan error) + } + } + + return r0 +} + +// Unsubscribe provides a mock function with given fields: +func (_m *Subscription) Unsubscribe() { + _m.Called() +} + +type mockConstructorTestingTNewSubscription interface { + mock.TestingT + Cleanup(func()) +} + +// NewSubscription creates a new instance of Subscription. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewSubscription(t mockConstructorTestingTNewSubscription) *Subscription { + mock := &Subscription{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/types/subscription.go b/pkg/types/subscription.go new file mode 100644 index 000000000..99247107b --- /dev/null +++ b/pkg/types/subscription.go @@ -0,0 +1,17 @@ +package types + +// Subscription represents an event subscription where events are +// delivered on a data channel. +// This is a generic interface for Subscription to represent used by clients. + +//go:generate mockery --quiet --name Subscription --output ./mocks/ --case=underscore +type Subscription interface { + // Unsubscribe cancels the sending of events to the data channel + // and closes the error channel. + Unsubscribe() + // Err returns the subscription error channel. The error channel receives + // a value if there is an issue with the subscription (e.g. the network connection + // delivering the events has been closed). Only one value will ever be sent. + // The error channel is closed by Unsubscribe. + Err() <-chan error +} diff --git a/pkg/utils/error_buffer.go b/pkg/utils/error_buffer.go new file mode 100644 index 000000000..9f4372188 --- /dev/null +++ b/pkg/utils/error_buffer.go @@ -0,0 +1,47 @@ +package utils + +import ( + "errors" + "sync" +) + +// ErrorBuffer uses joinedErrors interface to join multiple errors into a single error. +// This is useful to track the most recent N errors in a service and flush them as a single error. +type ErrorBuffer struct { + // buffer is a slice of errors + buffer []error + + // cap is the maximum number of errors that the buffer can hold. + // Exceeding the cap results in discarding the oldest error + cap int + + mu sync.RWMutex +} + +func (eb *ErrorBuffer) Flush() (err error) { + eb.mu.RLock() + defer eb.mu.RUnlock() + err = errors.Join(eb.buffer...) + eb.buffer = nil + return +} + +func (eb *ErrorBuffer) Append(incoming error) { + eb.mu.Lock() + defer eb.mu.Unlock() + + if len(eb.buffer) == eb.cap && eb.cap != 0 { + eb.buffer = append(eb.buffer[1:], incoming) + return + } + eb.buffer = append(eb.buffer, incoming) +} + +func (eb *ErrorBuffer) SetCap(cap int) { + eb.mu.Lock() + defer eb.mu.Unlock() + if len(eb.buffer) > cap { + eb.buffer = eb.buffer[len(eb.buffer)-cap:] + } + eb.cap = cap +} diff --git a/pkg/utils/error_buffer_test.go b/pkg/utils/error_buffer_test.go new file mode 100644 index 000000000..bc676f077 --- /dev/null +++ b/pkg/utils/error_buffer_test.go @@ -0,0 +1,74 @@ +package utils_test + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +func TestErrorBuffer(t *testing.T) { + t.Parallel() + + err1 := errors.New("err1") + err2 := errors.New("err2") + err3 := errors.New("err3") + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + buff := utils.ErrorBuffer{} + buff.Append(err1) + buff.Append(err2) + combined := buff.Flush() + errs := utils.UnwrapError(combined) + assert.Equal(t, 2, len(errs)) + assert.Equal(t, err1.Error(), errs[0].Error()) + assert.Equal(t, err2.Error(), errs[1].Error()) + }) + + t.Run("ovewrite oldest error when cap exceeded", func(t *testing.T) { + t.Parallel() + buff := utils.ErrorBuffer{} + buff.SetCap(2) + buff.Append(err1) + buff.Append(err2) + buff.Append(err3) + combined := buff.Flush() + errs := utils.UnwrapError(combined) + assert.Equal(t, 2, len(errs)) + assert.Equal(t, err2.Error(), errs[0].Error()) + assert.Equal(t, err3.Error(), errs[1].Error()) + }) + + t.Run("does not overwrite the buffer if cap == 0", func(t *testing.T) { + t.Parallel() + buff := utils.ErrorBuffer{} + for i := 1; i <= 20; i++ { + buff.Append(errors.Errorf("err#%d", i)) + } + + combined := buff.Flush() + errs := utils.UnwrapError(combined) + assert.Equal(t, 20, len(errs)) + assert.Equal(t, "err#20", errs[19].Error()) + }) + + t.Run("UnwrapError returns the a single element err array if passed err is not a joinedError", func(t *testing.T) { + t.Parallel() + errs := utils.UnwrapError(err1) + assert.Equal(t, 1, len(errs)) + assert.Equal(t, err1.Error(), errs[0].Error()) + }) + + t.Run("flushing an empty err buffer is a nil error", func(t *testing.T) { + t.Parallel() + buff := utils.ErrorBuffer{} + + combined := buff.Flush() + require.Nil(t, combined) + }) + +} diff --git a/pkg/utils/mailbox.go b/pkg/utils/mailbox.go new file mode 100644 index 000000000..87fe1627f --- /dev/null +++ b/pkg/utils/mailbox.go @@ -0,0 +1,126 @@ +package utils + +import ( + "sync" + "sync/atomic" +) + +// Mailbox contains a notify channel, +// a mutual exclusive lock, +// a queue of interfaces, +// and a queue capacity. +type Mailbox[T any] struct { + mu sync.Mutex + chNotify chan struct{} + queue []T + queueLen atomic.Int64 // atomic so monitor can read w/o blocking the queue + + // capacity - number of items the mailbox can buffer + // NOTE: if the capacity is 1, it's possible that an empty Retrieve may occur after a notification. + capacity uint64 + // onCloseFn is a hook used to stop monitoring, if non-nil + onCloseFn func() +} + +// NewHighCapacityMailbox create a new mailbox with a capacity +// that is better able to handle e.g. large log replays. +func NewHighCapacityMailbox[T any]() *Mailbox[T] { + return NewMailbox[T](100_000) +} + +// NewSingleMailbox returns a new Mailbox with capacity one. +func NewSingleMailbox[T any]() *Mailbox[T] { return NewMailbox[T](1) } + +// NewMailbox creates a new mailbox instance. If name is non-empty, it must be unique and calling Start will launch +// prometheus metric monitor that periodically reports mailbox load until Close() is called. +func NewMailbox[T any](capacity uint64) *Mailbox[T] { + queueCap := capacity + if queueCap == 0 { + queueCap = 100 + } + return &Mailbox[T]{ + chNotify: make(chan struct{}, 1), + queue: make([]T, 0, queueCap), + capacity: capacity, + } +} + +// Notify returns the contents of the notify channel +func (m *Mailbox[T]) Notify() <-chan struct{} { + return m.chNotify +} + +func (m *Mailbox[T]) Close() error { + if m.onCloseFn != nil { + m.onCloseFn() + } + return nil +} + +func (m *Mailbox[T]) onClose(fn func()) { m.onCloseFn = fn } + +func (m *Mailbox[T]) load() (capacity uint64, loadPercent float64) { + capacity = m.capacity + loadPercent = 100 * float64(m.queueLen.Load()) / float64(capacity) + return +} + +// Deliver appends to the queue and returns true if the queue was full, causing a message to be dropped. +func (m *Mailbox[T]) Deliver(x T) (wasOverCapacity bool) { + m.mu.Lock() + defer m.mu.Unlock() + + m.queue = append([]T{x}, m.queue...) + if uint64(len(m.queue)) > m.capacity && m.capacity > 0 { + m.queue = m.queue[:len(m.queue)-1] + wasOverCapacity = true + } else { + m.queueLen.Add(1) + } + + select { + case m.chNotify <- struct{}{}: + default: + } + return +} + +// Retrieve fetches one element from the queue. +func (m *Mailbox[T]) Retrieve() (t T, ok bool) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.queue) == 0 { + return + } + t = m.queue[len(m.queue)-1] + m.queue = m.queue[:len(m.queue)-1] + m.queueLen.Add(-1) + ok = true + return +} + +// RetrieveAll fetches all elements from the queue. +func (m *Mailbox[T]) RetrieveAll() []T { + m.mu.Lock() + defer m.mu.Unlock() + queue := m.queue + m.queue = nil + m.queueLen.Store(0) + for i, j := 0, len(queue)-1; i < j; i, j = i+1, j-1 { + queue[i], queue[j] = queue[j], queue[i] + } + return queue +} + +// RetrieveLatestAndClear fetch the latest value (or nil), and clears the rest of the queue (if any). +func (m *Mailbox[T]) RetrieveLatestAndClear() (t T) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.queue) == 0 { + return + } + t = m.queue[0] + m.queue = nil + m.queueLen.Store(0) + return +} diff --git a/pkg/utils/mailbox_prom.go b/pkg/utils/mailbox_prom.go new file mode 100644 index 000000000..30bb707a2 --- /dev/null +++ b/pkg/utils/mailbox_prom.go @@ -0,0 +1,91 @@ +package utils + +import ( + "context" + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "mailbox_load_percent", + Help: "Percent of mailbox capacity used", +}, + []string{"appID", "name", "capacity"}, +) + +const mailboxPromInterval = 5 * time.Second + +type MailboxMonitor struct { + StartStopOnce + appID string + + mailboxes sync.Map + stop func() + done chan struct{} +} + +func NewMailboxMonitor(appID string) *MailboxMonitor { + return &MailboxMonitor{appID: appID} +} + +func (m *MailboxMonitor) Name() string { return "MailboxMonitor" } + +func (m *MailboxMonitor) Start(context.Context) error { + return m.StartOnce("MailboxMonitor", func() error { + t := time.NewTicker(WithJitter(mailboxPromInterval)) + ctx, cancel := context.WithCancel(context.Background()) + m.stop = func() { + t.Stop() + cancel() + } + m.done = make(chan struct{}) + go m.monitorLoop(ctx, t.C) + return nil + }) +} + +func (m *MailboxMonitor) Close() error { + return m.StopOnce("MailboxMonitor", func() error { + m.stop() + <-m.done + return nil + }) +} + +func (m *MailboxMonitor) HealthReport() map[string]error { + return map[string]error{m.Name(): m.StartStopOnce.Healthy()} +} + +func (m *MailboxMonitor) monitorLoop(ctx context.Context, c <-chan time.Time) { + defer close(m.done) + for { + select { + case <-ctx.Done(): + return + case <-c: + m.mailboxes.Range(func(k, v any) bool { + name, mb := k.(string), v.(mailbox) + c, p := mb.load() + capacity := strconv.FormatUint(c, 10) + mailboxLoad.WithLabelValues(m.appID, name, capacity).Set(p) + return true + }) + } + } +} + +type mailbox interface { + load() (capacity uint64, percent float64) + onClose(func()) +} + +func (m *MailboxMonitor) Monitor(mb mailbox, name ...string) { + n := strings.Join(name, ".") + m.mailboxes.Store(n, mb) + mb.onClose(func() { m.mailboxes.Delete(n) }) +} diff --git a/pkg/utils/mailbox_test.go b/pkg/utils/mailbox_test.go new file mode 100644 index 000000000..c83d0035b --- /dev/null +++ b/pkg/utils/mailbox_test.go @@ -0,0 +1,181 @@ +package utils + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMailbox(t *testing.T) { + var ( + expected = []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + ) + + const capacity = 10 + m := NewMailbox[int](capacity) + + // Queue deliveries + for i, d := range toDeliver { + atCapacity := m.Deliver(d) + if atCapacity && i < capacity { + t.Errorf("mailbox at capacity %d", i) + } else if !atCapacity && i >= capacity { + t.Errorf("mailbox below capacity %d", i) + } + } + + // Retrieve them + var recvd []int + chDone := make(chan struct{}) + go func() { + defer close(chDone) + for range m.Notify() { + for { + x, exists := m.Retrieve() + if !exists { + break + } + recvd = append(recvd, x) + } + } + }() + + close(m.chNotify) + <-chDone + + require.Equal(t, expected, recvd) +} + +func TestMailbox_RetrieveAll(t *testing.T) { + var ( + expected = []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + ) + + const capacity = 10 + m := NewMailbox[int](capacity) + + // Queue deliveries + for i, d := range toDeliver { + atCapacity := m.Deliver(d) + if atCapacity && i < capacity { + t.Errorf("mailbox at capacity %d", i) + } else if !atCapacity && i >= capacity { + t.Errorf("mailbox below capacity %d", i) + } + } + + require.Equal(t, expected, m.RetrieveAll()) +} + +func TestMailbox_RetrieveLatestAndClear(t *testing.T) { + var ( + expected = 11 + toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + ) + + const capacity = 10 + m := NewMailbox[int](capacity) + + // Queue deliveries + for i, d := range toDeliver { + atCapacity := m.Deliver(d) + if atCapacity && i < capacity { + t.Errorf("mailbox at capacity %d", i) + } else if !atCapacity && i >= capacity { + t.Errorf("mailbox below capacity %d", i) + } + } + + require.Equal(t, expected, m.RetrieveLatestAndClear()) + require.Len(t, m.RetrieveAll(), 0) +} + +func TestMailbox_NoEmptyReceivesWhenCapacityIsTwo(t *testing.T) { + m := NewMailbox[int](2) + + var ( + recvd []int + emptyReceives []int + ) + + chDone := make(chan struct{}) + go func() { + defer close(chDone) + for range m.Notify() { + x, exists := m.Retrieve() + if !exists { + emptyReceives = append(emptyReceives, recvd[len(recvd)-1]) + } else { + recvd = append(recvd, x) + } + } + }() + + for i := 0; i < 100000; i++ { + m.Deliver(i) + } + close(m.chNotify) + + <-chDone + require.Len(t, emptyReceives, 0) +} + +func TestMailbox_load(t *testing.T) { + for _, tt := range []struct { + name string + capacity uint64 + deliver []int + exp float64 + + retrieve int + exp2 float64 + + all bool + }{ + {"single-all", 1, []int{1}, 100, 0, 100, true}, + {"single-latest", 1, []int{1}, 100, 0, 100, false}, + {"ten-low", 10, []int{1}, 10, 1, 0.0, false}, + {"ten-full-all", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 100, 5, 50, true}, + {"ten-full-latest", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 100, 5, 50, false}, + {"ten-overflow", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 100, 5, 50, false}, + {"nine", 9, []int{1, 2, 3}, 100.0 / 3.0, 2, 100.0 / 9.0, true}, + } { + t.Run(tt.name, func(t *testing.T) { + m := NewMailbox[int](tt.capacity) + + // Queue deliveries + for i, d := range tt.deliver { + atCapacity := m.Deliver(d) + if atCapacity && i < int(tt.capacity) { + t.Errorf("mailbox at capacity %d", i) + } else if !atCapacity && i >= int(tt.capacity) { + t.Errorf("mailbox below capacity %d", i) + } + } + gotCap, gotLoad := m.load() + require.Equal(t, gotCap, tt.capacity) + require.Equal(t, gotLoad, tt.exp) + + // Retrieve some + for i := 0; i < tt.retrieve; i++ { + _, ok := m.Retrieve() + require.True(t, ok) + } + gotCap, gotLoad = m.load() + require.Equal(t, gotCap, tt.capacity) + require.Equal(t, gotLoad, tt.exp2) + + // Drain it + if tt.all { + m.RetrieveAll() + } else { + m.RetrieveLatestAndClear() + } + gotCap, gotLoad = m.load() + require.Equal(t, gotCap, tt.capacity) + require.Equal(t, gotLoad, 0.0) + }) + } +} diff --git a/pkg/utils/start_stop_once.go b/pkg/utils/start_stop_once.go index 68a4f75c8..ca0d89d54 100644 --- a/pkg/utils/start_stop_once.go +++ b/pkg/utils/start_stop_once.go @@ -48,6 +48,9 @@ func (s startStopOnceState) String() string { type StartStopOnce struct { state atomic.Int32 sync.RWMutex // lock is held during startup/shutdown, RLock is held while executing functions dependent on a particular state + + // SvcErrBuffer is an ErrorBuffer that let service owners track critical errors happening in the service. + SvcErrBuffer ErrorBuffer } // StartOnce sets the state to Started diff --git a/pkg/utils/stop_chan.go b/pkg/utils/stop_chan.go new file mode 100644 index 000000000..0a3b6a513 --- /dev/null +++ b/pkg/utils/stop_chan.go @@ -0,0 +1,49 @@ +package utils + +import "context" + +// A StopChan signals when some work should stop. +type StopChan chan struct{} + +// NewCtx returns a background [context.Context] that is cancelled when StopChan is closed. +func (s StopChan) NewCtx() (context.Context, context.CancelFunc) { + return StopRChan((<-chan struct{})(s)).NewCtx() +} + +// Ctx cancels a [context.Context] when StopChan is closed. +func (s StopChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc) { + return StopRChan((<-chan struct{})(s)).Ctx(ctx) +} + +// CtxCancel cancels a [context.Context] when StopChan is closed. +// Returns ctx and cancel unmodified, for convenience. +func (s StopChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc) { + return StopRChan((<-chan struct{})(s)).CtxCancel(ctx, cancel) +} + +// A StopRChan signals when some work should stop. +// This version is receive-only. +type StopRChan <-chan struct{} + +// NewCtx returns a background [context.Context] that is cancelled when StopChan is closed. +func (s StopRChan) NewCtx() (context.Context, context.CancelFunc) { + return s.Ctx(context.Background()) +} + +// Ctx cancels a [context.Context] when StopChan is closed. +func (s StopRChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc) { + return s.CtxCancel(context.WithCancel(ctx)) +} + +// CtxCancel cancels a [context.Context] when StopChan is closed. +// Returns ctx and cancel unmodified, for convenience. +func (s StopRChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc) { + go func() { + select { + case <-s: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} diff --git a/pkg/utils/stop_chan_test.go b/pkg/utils/stop_chan_test.go new file mode 100644 index 000000000..3bb0ef01c --- /dev/null +++ b/pkg/utils/stop_chan_test.go @@ -0,0 +1,47 @@ +package utils_test + +import ( + "context" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +func TestStopChan_NewCtx(t *testing.T) { + sc := make(utils.StopChan) + + ctx, cancel := sc.NewCtx() + defer cancel() + + go func() { + close(sc) + }() + + select { + case <-time.After(1 * time.Second): + t.Fatal("context should be cancelled when StopChan is closed") + case <-ctx.Done(): + } +} + +func TestStopChan_CtxCancel(t *testing.T) { + stopChan := make(utils.StopChan) + originalCtx, originalCancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := stopChan.CtxCancel(originalCtx, originalCancel) + defer cancel() + + if ctx != originalCtx { + t.Fatal("expected ctx to be the same as originalCtx but it wasn't") + } + + go func() { + close(stopChan) + }() + + select { + case <-ctx.Done(): + case <-time.After(1 * time.Second): + t.Fatal("expected ctx to be cancelled but it wasn't") + } +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index aed521c62..361437376 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -5,6 +5,8 @@ import ( "math" mrand "math/rand" "time" + + "github.com/jpillora/backoff" ) // WithJitter adds +/- 10% to a duration @@ -50,3 +52,25 @@ func ContextWithDeadlineFn(ctx context.Context, deadlineFn func(orig time.Time) } return ctx, cancel } + +// NewRedialBackoff is a standard backoff to use for redialling or reconnecting to +// unreachable network endpoints +func NewRedialBackoff() backoff.Backoff { + return backoff.Backoff{ + Min: 1 * time.Second, + Max: 15 * time.Second, + Jitter: true, + } + +} + +// UnwrapError returns a list of underlying errors if passed error implements joinedError or return the err in a single-element list otherwise. +// +//nolint:errorlint // error type checks will fail on wrapped errors. Disabled since we are not doing checks on error types. +func UnwrapError(err error) []error { + joined, ok := err.(interface{ Unwrap() []error }) + if !ok { + return []error{err} + } + return joined.Unwrap() +}