Skip to content

Commit

Permalink
fix: add timeout for bulk blockfetch operation
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney committed Nov 17, 2024
1 parent 807ad7f commit 35b6ea3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
3 changes: 3 additions & 0 deletions blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package node
import (
"encoding/hex"
"fmt"
"time"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/ledger"
Expand All @@ -37,6 +38,8 @@ func (n *Node) blockfetchClientConnOpts() []oblockfetch.BlockFetchOptionFunc {
return []oblockfetch.BlockFetchOptionFunc{
oblockfetch.WithBlockFunc(n.blockfetchClientBlock),
oblockfetch.WithBatchDoneFunc(n.blockfetchClientBatchDone),
oblockfetch.WithBatchStartTimeout(2 * time.Second),
oblockfetch.WithBlockTimeout(2 * time.Second),
}
}

Expand Down
21 changes: 20 additions & 1 deletion state/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package state

import (
"fmt"
"time"

"github.com/blinklabs-io/node/database"
"github.com/blinklabs-io/node/event"
Expand All @@ -25,6 +26,10 @@ import (
const (
blockfetchBatchSize = 500
blockfetchBatchSlotThreshold = 2500 * 20 // TODO: calculate from protocol params

// Timeout for updates on a blockfetch operation. This is based on a 2s BatchStart
// and a 2s Block timeout for blockfetch
blockfetchBusyTimeout = 5 * time.Second
)

func (ls *LedgerState) handleEventChainsync(evt event.Event) {
Expand Down Expand Up @@ -91,6 +96,15 @@ func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error {
}
// Don't start fetch if there's already one in progress
if ls.chainsyncBlockfetchBusy {
// Clear busy flag on timeout
if time.Since(ls.chainsyncBlockfetchBusyTime) > blockfetchBusyTimeout {
ls.chainsyncBlockfetchBusy = false
ls.config.Logger.Warn(
fmt.Sprintf("blockfetch operation timed out after %s", blockfetchBusyTimeout),
"component",
"ledger",
)
}
ls.chainsyncBlockfetchWaiting = true
return nil
}
Expand All @@ -104,6 +118,7 @@ func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error {
return err
}
ls.chainsyncBlockfetchBusy = true
ls.chainsyncBlockfetchBusyTime = time.Now()
// Reset cached header points
ls.chainsyncHeaderPoints = nil
return nil
Expand All @@ -114,6 +129,8 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error {
ls.chainsyncBlockEvents,
e,
)
// Update busy time in order to detect fetch timeout
ls.chainsyncBlockfetchBusyTime = time.Now()
return nil
}

Expand Down Expand Up @@ -310,7 +327,8 @@ func (ls *LedgerState) handleEventBlockfetchBatchDone(e BlockfetchEvent) error {
return err
}
// Check for pending block range request
if !ls.chainsyncBlockfetchWaiting {
if !ls.chainsyncBlockfetchWaiting ||
len(ls.chainsyncHeaderPoints) == 0 {
ls.chainsyncBlockfetchBusy = false
return nil
}
Expand All @@ -323,6 +341,7 @@ func (ls *LedgerState) handleEventBlockfetchBatchDone(e BlockfetchEvent) error {
if err != nil {
return err
}
ls.chainsyncBlockfetchBusyTime = time.Now()
// Reset cached header points
ls.chainsyncHeaderPoints = nil
return nil
Expand Down
25 changes: 13 additions & 12 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ type BlockfetchRequestRangeFunc func(ouroboros.ConnectionId, ocommon.Point, ocom

type LedgerState struct {
sync.RWMutex
config LedgerStateConfig
db database.Database
timerCleanupConsumedUtxos *time.Timer
currentPParams any
currentEpoch models.Epoch
currentEra eras.EraDesc
currentTip ochainsync.Tip
metrics stateMetrics
chainsyncHeaderPoints []ocommon.Point
chainsyncBlockEvents []BlockfetchEvent
chainsyncBlockfetchBusy bool
chainsyncBlockfetchWaiting bool
config LedgerStateConfig
db database.Database
timerCleanupConsumedUtxos *time.Timer
currentPParams any
currentEpoch models.Epoch
currentEra eras.EraDesc
currentTip ochainsync.Tip
metrics stateMetrics
chainsyncHeaderPoints []ocommon.Point
chainsyncBlockEvents []BlockfetchEvent
chainsyncBlockfetchBusy bool
chainsyncBlockfetchBusyTime time.Time
chainsyncBlockfetchWaiting bool
}

func NewLedgerState(cfg LedgerStateConfig) (*LedgerState, error) {
Expand Down

0 comments on commit 35b6ea3

Please sign in to comment.