Skip to content

Commit

Permalink
refactor: decouple chainsync from blockfetch
Browse files Browse the repository at this point in the history
* generate events for blockfetch
* create interface for ledger to request range of blocks from node
* migrate bulk mode to ledger and operate in smaller batches
* enable pipelining for chainsync RequestNext messages
* include tip information with chainsync events

Fixes #175
  • Loading branch information
agaffney committed Nov 15, 2024
1 parent a109c4e commit b283702
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 338 deletions.
64 changes: 56 additions & 8 deletions blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package node

import (
"encoding/hex"
"fmt"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/ledger"
"github.com/blinklabs-io/gouroboros/protocol/blockfetch"
oblockfetch "github.com/blinklabs-io/gouroboros/protocol/blockfetch"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
"github.com/blinklabs-io/node/event"
"github.com/blinklabs-io/node/state"
)

func (n *Node) blockfetchServerConnOpts() []oblockfetch.BlockFetchOptionFunc {
Expand All @@ -30,6 +36,7 @@ func (n *Node) blockfetchServerConnOpts() []oblockfetch.BlockFetchOptionFunc {
func (n *Node) blockfetchClientConnOpts() []oblockfetch.BlockFetchOptionFunc {
return []oblockfetch.BlockFetchOptionFunc{
oblockfetch.WithBlockFunc(n.blockfetchClientBlock),
oblockfetch.WithBatchDoneFunc(n.blockfetchClientBatchDone),
}
}

Expand Down Expand Up @@ -77,19 +84,60 @@ func (n *Node) blockfetchServerRequestRange(
return nil
}

// blockfetchClientRequestRange is called by the ledger when it needs to request a range of block bodies
func (n *Node) blockfetchClientRequestRange(
connId ouroboros.ConnectionId,
start ocommon.Point,
end ocommon.Point,
) error {
conn := n.connManager.GetConnectionById(connId)
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", connId.String())
}
oConn := conn.Conn
if err := oConn.BlockFetch().Client.GetBlockRange(start, end); err != nil {
return err
}
return nil
}

func (n *Node) blockfetchClientBlock(
ctx blockfetch.CallbackContext,
blockType uint,
block ledger.Block,
) error {
if err := n.chainsyncState.AddBlock(block, blockType); err != nil {
return err
}
// Start normal chain-sync if we've reached the last block of our bulk range
if block.SlotNumber() == n.chainsyncBulkRangeEnd.Slot {
if err := n.chainsyncClientStart(ctx.ConnectionId); err != nil {
return err
}
// Generate event
blkHash, err := hex.DecodeString(block.Hash())
if err != nil {
return fmt.Errorf("decode block hash: %w", err)
}
n.eventBus.Publish(
state.BlockfetchEventType,
event.NewEvent(
state.BlockfetchEventType,
state.BlockfetchEvent{
Point: ocommon.NewPoint(block.SlotNumber(), blkHash),
Type: blockType,
Block: block,
},
),
)
return nil
}

func (n *Node) blockfetchClientBatchDone(
ctx blockfetch.CallbackContext,
) error {
// Generate event
n.eventBus.Publish(
state.BlockfetchEventType,
event.NewEvent(
state.BlockfetchEventType,
state.BlockfetchEvent{
ConnectionId: ctx.ConnectionId,
BatchDone: true,
},
),
)
return nil
}
57 changes: 16 additions & 41 deletions chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (n *Node) chainsyncClientConnOpts() []ochainsync.ChainSyncOptionFunc {
return []ochainsync.ChainSyncOptionFunc{
ochainsync.WithRollForwardFunc(n.chainsyncClientRollForward),
ochainsync.WithRollBackwardFunc(n.chainsyncClientRollBackward),
// Enable pipelining of RequestNext messages to speed up chainsync
ochainsync.WithPipelineLimit(10),
}
}

Expand Down Expand Up @@ -78,23 +80,7 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
)
}
}
// Determine available block range between intersect point(s) and current tip
bulkRangeStart, bulkRangeEnd, err := oConn.ChainSync().Client.GetAvailableBlockRange(
intersectPoints,
)
if err != nil {
return err
}
if bulkRangeStart.Slot == 0 && bulkRangeEnd.Slot == 0 {
// We're already at chain tip, so start a normal sync
return oConn.ChainSync().Client.Sync(intersectPoints)
}
// Use BlockFetch to request the entire available block range at once
n.chainsyncBulkRangeEnd = bulkRangeEnd
if err := oConn.BlockFetch().Client.GetBlockRange(bulkRangeStart, bulkRangeEnd); err != nil {
return err
}
return nil
return oConn.ChainSync().Client.Sync(intersectPoints)
}

func (n *Node) chainsyncServerFindIntersect(
Expand Down Expand Up @@ -195,12 +181,18 @@ func (n *Node) chainsyncClientRollBackward(
point ocommon.Point,
tip ochainsync.Tip,
) error {
if err := n.chainsyncState.Rollback(
point.Slot,
hex.EncodeToString(point.Hash),
); err != nil {
return err
}
// Generate event
n.eventBus.Publish(
state.ChainsyncEventType,
event.NewEvent(
state.ChainsyncEventType,
state.ChainsyncEvent{
Rollback: true,
Point: point,
Tip: tip,
},
),
)
return nil
}

Expand All @@ -210,14 +202,10 @@ func (n *Node) chainsyncClientRollForward(
blockData interface{},
tip ochainsync.Tip,
) error {
var blk ledger.Block
switch v := blockData.(type) {
case ledger.Block:
blk = v
case ledger.BlockHeader:
blockSlot := v.SlotNumber()
blockHash, _ := hex.DecodeString(v.Hash())
// Publish event
n.eventBus.Publish(
state.ChainsyncEventType,
event.NewEvent(
Expand All @@ -227,25 +215,12 @@ func (n *Node) chainsyncClientRollForward(
Point: ocommon.NewPoint(blockSlot, blockHash),
Type: blockType,
BlockHeader: v,
Tip: tip,
},
),
)
// Fetch block content via block-fetch
conn := n.connManager.GetConnectionById(ctx.ConnectionId)
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", ctx.ConnectionId.String())
}
oConn := conn.Conn
tmpBlock, err := oConn.BlockFetch().Client.GetBlock(ocommon.Point{Slot: blockSlot, Hash: blockHash})
if err != nil {
return err
}
blk = tmpBlock
default:
return fmt.Errorf("unexpected block data type: %T", v)
}
if err := n.chainsyncState.AddBlock(blk, blockType); err != nil {
return err
}
return nil
}
46 changes: 0 additions & 46 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
package chainsync

import (
"encoding/hex"
"fmt"
"sync"

"github.com/blinklabs-io/node/event"
"github.com/blinklabs-io/node/state"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/connection"
"github.com/blinklabs-io/gouroboros/ledger"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

Expand Down Expand Up @@ -98,46 +95,3 @@ func (s *State) RemoveClientConnId(connId ouroboros.ConnectionId) {
s.clientConnId = nil
}
}

func (s *State) AddBlock(block ledger.Block, blockType uint) error {
s.Lock()
defer s.Unlock()
// Generate event
blkHash, err := hex.DecodeString(block.Hash())
if err != nil {
return fmt.Errorf("decode block hash: %w", err)
}
s.eventBus.Publish(
state.ChainsyncEventType,
event.NewEvent(
state.ChainsyncEventType,
state.ChainsyncEvent{
Point: ocommon.NewPoint(block.SlotNumber(), blkHash),
Type: blockType,
Block: block,
},
),
)
return nil
}

func (s *State) Rollback(slot uint64, hash string) error {
s.Lock()
defer s.Unlock()
// Generate event
blkHash, err := hex.DecodeString(hash)
if err != nil {
return fmt.Errorf("decode block hash: %w", err)
}
s.eventBus.Publish(
state.ChainsyncEventType,
event.NewEvent(
state.ChainsyncEventType,
state.ChainsyncEvent{
Rollback: true,
Point: ocommon.NewPoint(slot, blkHash),
},
),
)
return nil
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22
toolchain go1.22.6

require (
github.com/blinklabs-io/gouroboros v0.103.5
github.com/blinklabs-io/gouroboros v0.104.0
github.com/blinklabs-io/ouroboros-mock v0.3.5
github.com/dgraph-io/badger/v4 v4.3.1
github.com/glebarez/sqlite v1.11.0
Expand Down Expand Up @@ -57,16 +57,16 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/utxorpc/go-codegen v0.11.0 // indirect
github.com/utxorpc/go-codegen v0.12.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/text v0.20.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/grpc v1.67.1 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blinklabs-io/gouroboros v0.103.5 h1:PchF9z2bGYZC/QzeCJAtZLqMv34geE6oCRzLD+4pzfY=
github.com/blinklabs-io/gouroboros v0.103.5/go.mod h1:wjiNCbZ2uQy9DGfLCgEgqagHxNBAv5UYsOdRBgoi3SU=
github.com/blinklabs-io/gouroboros v0.104.0 h1:spfE9d0GTQAjs7LuK8pigs4Mnse17XSh3OI3aE69Cb4=
github.com/blinklabs-io/gouroboros v0.104.0/go.mod h1:nW0/J6Zv5Oupr4MHehfJ3noSXu7VSxKWusFRNKo0nhI=
github.com/blinklabs-io/ouroboros-mock v0.3.5 h1:/KWbSoH8Pjrd9uxOH7mVbI7XFsDCNW/O9FtLlvJDUpQ=
github.com/blinklabs-io/ouroboros-mock v0.3.5/go.mod h1:JtUQ3Luo22hCnGBxuxNp6JaUx63VxidxWwmcaVMremw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand Down Expand Up @@ -138,8 +138,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/utxorpc/go-codegen v0.11.0 h1:MxiC//jV/2xj8kadAuiW7wgtbE+4YhYrf4uxCh8NheA=
github.com/utxorpc/go-codegen v0.11.0/go.mod h1:NHXsykQWNetMMm2Kak+PfqmEY9Htgs6unJENPC4Kobs=
github.com/utxorpc/go-codegen v0.12.0 h1:uSmJ4jTQpFbPaq8zCdYJi1/pnJmoCLFCcBitybjH+gQ=
github.com/utxorpc/go-codegen v0.12.0/go.mod h1:NHXsykQWNetMMm2Kak+PfqmEY9Htgs6unJENPC4Kobs=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -169,8 +169,8 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand Down Expand Up @@ -203,8 +203,8 @@ golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
11 changes: 6 additions & 5 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ func (n *Node) Run() error {
// Load state
state, err := state.NewLedgerState(
state.LedgerStateConfig{
DataDir: n.config.dataDir,
EventBus: n.eventBus,
Logger: n.config.logger,
CardanoNodeConfig: n.config.cardanoNodeConfig,
PromRegistry: n.config.promRegistry,
DataDir: n.config.dataDir,
EventBus: n.eventBus,
Logger: n.config.logger,
CardanoNodeConfig: n.config.cardanoNodeConfig,
PromRegistry: n.config.promRegistry,
BlockfetchRequestRangeFunc: n.blockfetchClientRequestRange,
},
)
if err != nil {
Expand Down
Loading

0 comments on commit b283702

Please sign in to comment.