Added Generalised Headtracker Components #123

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions .mockery.yaml
@@ -0,0 +1 @@
# empty config to prevent info/debug level log spam
4 changes: 4 additions & 0 deletions Makefile
@@ -8,3 +8,7 @@ godoc:
go install golang.org/x/tools/cmd/godoc@latest
# http://localhost:6060/pkg/github.com/smartcontractkit/chainlink-relay/
godoc -http=:6060

.PHONY: mockery
mockery: $(mockery) ## Install mockery.
go install github.com/vektra/mockery/[email protected]
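The empty .mockery.yaml above exists only to prevent info/debug log spam; mock generation could then be driven per-interface with flags, for example via a go:generate directive. This is a hypothetical invocation of mockery v2's flag-based mode: the interface name, source dir, and output path are illustrative and not taken from this diff.

//go:generate mockery --name Client --dir ./types --output ./mocks --case underscore

After installing the binary with make mockery, go generate ./... would regenerate the mocks.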
163 changes: 163 additions & 0 deletions pkg/headtracker/head_broadcaster.go
@@ -0,0 +1,163 @@
package headtracker

import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

const TrackableCallbackTimeout = 2 * time.Second

type callbackSet[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] map[int]types.HeadTrackable[H, BLOCK_HASH]

func (set callbackSet[H, BLOCK_HASH]) values() []types.HeadTrackable[H, BLOCK_HASH] {
var values []types.HeadTrackable[H, BLOCK_HASH]
for _, callback := range set {
values = append(values, callback)
}
return values
}

type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct {
logger logger.Logger
callbacks callbackSet[H, BLOCK_HASH]
mailbox *utils.Mailbox[H]
mutex *sync.Mutex
chClose utils.StopChan
wgDone sync.WaitGroup
utils.StartStopOnce
latest H
lastCallbackID int
}

// NewHeadBroadcaster creates a new HeadBroadcaster
func NewHeadBroadcaster[
H types.Head[BLOCK_HASH],
BLOCK_HASH types.Hashable,
](
lggr logger.Logger,
) *HeadBroadcaster[H, BLOCK_HASH] {
return &HeadBroadcaster[H, BLOCK_HASH]{
logger: logger.Named(lggr, "HeadBroadcaster"),
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: utils.NewSingleMailbox[H](),
mutex: &sync.Mutex{},
chClose: make(chan struct{}),
wgDone: sync.WaitGroup{},
StartStopOnce: utils.StartStopOnce{},
}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Start(context.Context) error {
return hb.StartOnce("HeadBroadcaster", func() error {
hb.wgDone.Add(1)
go hb.run()
return nil
})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Close() error {
return hb.StopOnce("HeadBroadcaster", func() error {
hb.mutex.Lock()
// clear all callbacks
hb.callbacks = make(callbackSet[H, BLOCK_HASH])
hb.mutex.Unlock()

close(hb.chClose)
hb.wgDone.Wait()
return nil
})
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) Name() string {
return hb.logger.Name()
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error {
return map[string]error{hb.Name(): hb.StartStopOnce.Healthy()}
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
hb.mailbox.Deliver(head)
}

// Subscribe subscribes the given callback to OnNewLongestChain notifications until the
// HeadBroadcaster is closed, or the returned unsubscribe function is called explicitly.
func (hb *HeadBroadcaster[H, BLOCK_HASH]) Subscribe(callback types.HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
hb.mutex.Lock()
defer hb.mutex.Unlock()

currentLongestChain = hb.latest

hb.lastCallbackID++
callbackID := hb.lastCallbackID
hb.callbacks[callbackID] = callback
unsubscribe = func() {
hb.mutex.Lock()
defer hb.mutex.Unlock()
delete(hb.callbacks, callbackID)
}

return
}

func (hb *HeadBroadcaster[H, BLOCK_HASH]) run() {
defer hb.wgDone.Done()

for {
select {
case <-hb.chClose:
return
case <-hb.mailbox.Notify():
hb.executeCallbacks()
}
}
}

// DEV: the head relayer makes no promises about head delivery! Subscribing
// Jobs should expect the relayer to skip heads if there is a large number of listeners
// and all callbacks cannot be completed in the allotted time.
func (hb *HeadBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
head, exists := hb.mailbox.Retrieve()
if !exists {
hb.logger.Info("No head to retrieve. It might have been skipped")
return
}

hb.mutex.Lock()
callbacks := hb.callbacks.values()
hb.latest = head
hb.mutex.Unlock()

hb.logger.Debugw("Initiating callbacks",
"headNum", head.BlockNumber(),
"numCallbacks", len(callbacks),
)

wg := sync.WaitGroup{}
wg.Add(len(callbacks))

ctx, cancel := hb.chClose.NewCtx()
defer cancel()

for _, callback := range callbacks {
go func(trackable types.HeadTrackable[H, BLOCK_HASH]) {
defer wg.Done()
start := time.Now()
cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout)
defer cancel()
trackable.OnNewLongestChain(cctx, head)
elapsed := time.Since(start)
hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
"callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed)
}(callback)
}

wg.Wait()
}
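For reviewers, a minimal usage sketch of the broadcaster above, written against only the API introduced in this PR. It assumes types.HeadTrackable requires just OnNewLongestChain (the only method executeCallbacks invokes); the package name, loggingTrackable type, and subscribeExample function are illustrative and not part of the diff.

package example // illustrative location, not part of this PR

import (
	"context"

	"github.com/smartcontractkit/chainlink-relay/pkg/headtracker"
	"github.com/smartcontractkit/chainlink-relay/pkg/logger"
	"github.com/smartcontractkit/chainlink-relay/pkg/types"
)

// loggingTrackable is a stand-in subscriber: it just logs each new longest chain.
type loggingTrackable[H types.Head[BH], BH types.Hashable] struct {
	lggr logger.Logger
}

func (lt *loggingTrackable[H, BH]) OnNewLongestChain(ctx context.Context, head H) {
	lt.lggr.Debugw("New longest chain", "blockNumber", head.BlockNumber())
}

// subscribeExample shows the Start / Subscribe / Broadcast / Close lifecycle,
// generic over whatever chain-specific H and BH an integrator provides.
func subscribeExample[H types.Head[BH], BH types.Hashable](ctx context.Context, lggr logger.Logger, head H) error {
	hb := headtracker.NewHeadBroadcaster[H, BH](lggr)
	if err := hb.Start(ctx); err != nil {
		return err
	}
	defer hb.Close()

	// Subscribe returns the latest head seen so far (zero value if none yet)
	// and an unsubscribe func that removes this callback again.
	_, unsubscribe := hb.Subscribe(&loggingTrackable[H, BH]{lggr: lggr})
	defer unsubscribe()

	// Normally a head listener delivers heads; callbacks are fanned out
	// concurrently with a 2s per-callback timeout, and heads may be skipped
	// under load (see executeCallbacks above).
	hb.BroadcastNewLongestChain(head)
	return nil
}

Callbacks run on the broadcaster's goroutines, so anything long-running inside OnNewLongestChain should respect the passed context and its 2-second deadline.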
218 changes: 218 additions & 0 deletions pkg/headtracker/head_listener.go
@@ -0,0 +1,218 @@
package headtracker

import (
"context"
"sync/atomic"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

htrktypes "github.com/smartcontractkit/chainlink-relay/pkg/headtracker/types"
"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

var (
promNumHeadsReceived = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "head_tracker_heads_received",
Help: "The total number of heads seen",
}, []string{"ChainID"})
promEthConnectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "head_tracker_connection_errors",
Help: "The total number of node connection errors",
}, []string{"ChainID"})
)

type HeadListener[
HTH htrktypes.Head[BLOCK_HASH, ID],
S types.Subscription,
ID types.ID,
BLOCK_HASH types.Hashable,
] struct {
config htrktypes.Config
client htrktypes.Client[HTH, S, ID, BLOCK_HASH]
logger logger.Logger
chStop utils.StopChan
chHeaders chan HTH
headSubscription types.Subscription
connected atomic.Bool
receivingHeads atomic.Bool
}

func NewHeadListener[
HTH htrktypes.Head[BLOCK_HASH, ID],
S types.Subscription,
ID types.ID,
BLOCK_HASH types.Hashable,
CLIENT htrktypes.Client[HTH, S, ID, BLOCK_HASH],
](
lggr logger.Logger,
client CLIENT,
config htrktypes.Config,
chStop chan struct{},
) *HeadListener[HTH, S, ID, BLOCK_HASH] {
return &HeadListener[HTH, S, ID, BLOCK_HASH]{
config: config,
client: client,
logger: logger.Named(lggr, "HeadListener"),
chStop: chStop,
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Name() string {
return hl.logger.Name()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH], done func()) {
defer done()
defer hl.unsubscribe()

ctx, cancel := hl.chStop.NewCtx()
defer cancel()

for {
if !hl.subscribe(ctx) {
break
}
err := hl.receiveHeaders(ctx, handleNewHead)
if ctx.Err() != nil {
break
} else if err != nil {
hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err)
continue
} else {
break
}
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) ReceivingHeads() bool {
return hl.receivingHeads.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) Connected() bool {
return hl.connected.Load()
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error {
var err error
if !hl.ReceivingHeads() {
err = errors.New("Listener is not receiving heads")
}
if !hl.Connected() {
err = errors.New("Listener is not connected")
}
return map[string]error{hl.Name(): err}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead types.NewHeadHandler[HTH, BLOCK_HASH]) error {
var noHeadsAlarmC <-chan time.Time
var noHeadsAlarmT *time.Ticker
noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold()
if noHeadsAlarmDuration > 0 {
noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration)
noHeadsAlarmC = noHeadsAlarmT.C
}

for {
select {
case <-hl.chStop:
return nil

case blockHeader, open := <-hl.chHeaders:
chainId := hl.client.ConfiguredChainID()
if noHeadsAlarmT != nil {
// We've received a head, reset the no heads alarm
noHeadsAlarmT.Stop()
noHeadsAlarmT = time.NewTicker(noHeadsAlarmDuration)
noHeadsAlarmC = noHeadsAlarmT.C
}
hl.receivingHeads.Store(true)
if !open {
return errors.New("head listener: chHeaders prematurely closed")
}
if !blockHeader.IsValid() {
hl.logger.Error("got nil block header")
continue
}

// Compare the chain ID of the block header to the chain ID of the client
if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() {
hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID())
}
promNumHeadsReceived.WithLabelValues(chainId.String()).Inc()

err := handleNewHead(ctx, blockHeader)
if ctx.Err() != nil {
return nil
} else if err != nil {
return err
}

case err, open := <-hl.headSubscription.Err():
// err can be nil because of the use of a chainIDSubForwarder
if !open || err == nil {
return errors.New("head listener: subscription Err channel prematurely closed")
}
return err

case <-noHeadsAlarmC:
// We haven't received a head on the channel for a long time, log a warning
hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration)
hl.receivingHeads.Store(false)
}
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) bool {
subscribeRetryBackoff := utils.NewRedialBackoff()

chainId := hl.client.ConfiguredChainID()

for {
hl.unsubscribe()

hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String())

select {
case <-hl.chStop:
return false

case <-time.After(subscribeRetryBackoff.Duration()):
err := hl.subscribeToHead(ctx)
if err != nil {
promEthConnectionErrors.WithLabelValues(chainId.String()).Inc()
hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err)
} else {
hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String())
return true
}
}
}
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) subscribeToHead(ctx context.Context) error {
hl.chHeaders = make(chan HTH)

var err error
hl.headSubscription, err = hl.client.SubscribeNewHead(ctx, hl.chHeaders)
if err != nil {
close(hl.chHeaders)
return errors.Wrap(err, "EthClient#SubscribeNewHead")
}

hl.connected.Store(true)

return nil
}

func (hl *HeadListener[HTH, S, ID, BLOCK_HASH]) unsubscribe() {
if hl.headSubscription != nil {
hl.connected.Store(false)
hl.headSubscription.Unsubscribe()
hl.headSubscription = nil
}
}
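A minimal sketch of how the listener and broadcaster could be wired together, again using only the API in this PR. Assumptions: types.NewHeadHandler is the func(ctx, head) error callback that receiveHeaders invokes, and htrktypes.Head embeds types.Head so the same HTH can parameterise both components; the package name and runListener function are illustrative.

package example // illustrative wiring, not part of this PR

import (
	"context"
	"sync"

	"github.com/smartcontractkit/chainlink-relay/pkg/headtracker"
	htrktypes "github.com/smartcontractkit/chainlink-relay/pkg/headtracker/types"
	"github.com/smartcontractkit/chainlink-relay/pkg/logger"
	"github.com/smartcontractkit/chainlink-relay/pkg/types"
)

// runListener forwards every head received from the client subscription to the
// broadcaster, which fans it out to its subscribers. The returned stop func
// shuts the loop down and waits for it to exit.
func runListener[
	HTH htrktypes.Head[BLOCK_HASH, ID],
	S types.Subscription,
	ID types.ID,
	BLOCK_HASH types.Hashable,
](
	lggr logger.Logger,
	client htrktypes.Client[HTH, S, ID, BLOCK_HASH],
	config htrktypes.Config,
	hb *headtracker.HeadBroadcaster[HTH, BLOCK_HASH],
) (stop func()) {
	chStop := make(chan struct{})
	hl := headtracker.NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop)

	// Returning a non-nil error from the handler makes the listener resubscribe;
	// broadcasting cannot fail, so always return nil here.
	handleNewHead := func(ctx context.Context, head HTH) error {
		hb.BroadcastNewLongestChain(head)
		return nil
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go hl.ListenForNewHeads(handleNewHead, wg.Done)

	return func() {
		close(chStop) // stops the subscribe/receive loop
		wg.Wait()
	}
}

The full head tracker presumably performs this hand-off inside its own service; the sketch only shows the listener-to-broadcaster path.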