Timeboost Bulk BlockMetadata API #2754

Open: wants to merge 13 commits into base branch add-timeboosted-broadcastfeedmessage.
4 changes: 4 additions & 0 deletions arbnode/node.go
@@ -1034,3 +1034,7 @@ func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) {
	}
	return n.BlockValidator.GetValidated(), nil
}

func (n *Node) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) {
	return n.TxStreamer.BlockMetadataAtCount(count)
}
23 changes: 20 additions & 3 deletions execution/gethexec/api.go
@@ -15,6 +15,7 @@ import (

	"github.com/ethereum/go-ethereum/arbitrum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
@@ -25,17 +26,33 @@ import (
)

type ArbAPI struct {
	txPublisher TransactionPublisher
	txPublisher              TransactionPublisher
	bulkBlockMetadataFetcher *BulkBlockMetadataFetcher
}

func NewArbAPI(publisher TransactionPublisher, bulkBlockMetadataFetcher *BulkBlockMetadataFetcher) *ArbAPI {
	return &ArbAPI{
		txPublisher:              publisher,
		bulkBlockMetadataFetcher: bulkBlockMetadataFetcher,
	}
}

func NewArbAPI(publisher TransactionPublisher) *ArbAPI {
	return &ArbAPI{publisher}
type NumberAndBlockMetadata struct {
	BlockNumber uint64        `json:"blockNumber"`
	RawMetadata hexutil.Bytes `json:"rawMetadata"`
}

func (a *ArbAPI) CheckPublisherHealth(ctx context.Context) error {
	return a.txPublisher.CheckHealth(ctx)
}

func (a *ArbAPI) GetRawBlockMetadata(ctx context.Context, fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) {
	if a.bulkBlockMetadataFetcher == nil {
		return nil, errors.New("arb_getRawBlockMetadata is not available")
	}
	return a.bulkBlockMetadataFetcher.Fetch(fromBlock, toBlock)
}

type ArbTimeboostAuctioneerAPI struct {
	txPublisher TransactionPublisher
}
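
For orientation, below is a minimal client-side sketch of calling the new endpoint over JSON-RPC with go-ethereum's rpc client. The endpoint URL and the block range are placeholders, and the arb namespace has to be exposed on the queried node; the method name arb_getRawBlockMetadata, its block-number parameters, and the response shape mirror the ArbAPI additions above.

// Hypothetical client sketch: query arb_getRawBlockMetadata for a small block range.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

// Mirrors gethexec.NumberAndBlockMetadata as serialized by the API.
type numberAndBlockMetadata struct {
	BlockNumber uint64        `json:"blockNumber"`
	RawMetadata hexutil.Bytes `json:"rawMetadata"`
}

func main() {
	// Placeholder endpoint URL for a node that exposes the "arb" namespace.
	client, err := rpc.Dial("http://localhost:8547")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	var result []numberAndBlockMetadata
	// Block numbers are sent as 0x-prefixed quantities, which rpc.BlockNumber parses server-side.
	err = client.CallContext(context.Background(), &result, "arb_getRawBlockMetadata",
		hexutil.Uint64(100), hexutil.Uint64(150))
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range result {
		// RawMetadata is opaque here; its encoding is defined by arbostypes.BlockMetadata.
		fmt.Printf("block %d: %d metadata bytes (%s)\n", m.BlockNumber, len(m.RawMetadata), m.RawMetadata)
	}
}
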
108 changes: 108 additions & 0 deletions execution/gethexec/blockmetadata.go
@@ -0,0 +1,108 @@
package gethexec

import (
	"context"
	"errors"
	"fmt"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/common/lru"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/offchainlabs/nitro/arbos/arbostypes"
	"github.com/offchainlabs/nitro/arbutil"
	"github.com/offchainlabs/nitro/util/stopwaiter"
)

var ErrBlockMetadataApiBlocksLimitExceeded = errors.New("number of blocks requested for blockMetadata exceeded")

type BlockMetadataFetcher interface {
	BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error)
	BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error)
	MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64
	SetReorgEventsNotifier(reorgEventsNotifier chan struct{})
}

type BulkBlockMetadataFetcher struct {
	stopwaiter.StopWaiter
	bc            *core.BlockChain
	fetcher       BlockMetadataFetcher
	reorgDetector chan struct{}
	blocksLimit   uint64
	cache         *lru.SizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata]
}

func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize, blocksLimit uint64) *BulkBlockMetadataFetcher {
	var cache *lru.SizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata]
	var reorgDetector chan struct{}
	if cacheSize != 0 {
		cache = lru.NewSizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize)
		reorgDetector = make(chan struct{})
		fetcher.SetReorgEventsNotifier(reorgDetector)
	}
	return &BulkBlockMetadataFetcher{
		bc:            bc,
		fetcher:       fetcher,
		cache:         cache,
		reorgDetector: reorgDetector,
		blocksLimit:   blocksLimit,
	}
}

func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) {
	fromBlock, _ = b.bc.ClipToPostNitroGenesis(fromBlock)
	toBlock, _ = b.bc.ClipToPostNitroGenesis(toBlock)
	start, err := b.fetcher.BlockNumberToMessageIndex(uint64(fromBlock))
	if err != nil {
		return nil, fmt.Errorf("error converting fromBlock blocknumber to message index: %w", err)
	}
	end, err := b.fetcher.BlockNumberToMessageIndex(uint64(toBlock))
	if err != nil {
		return nil, fmt.Errorf("error converting toBlock blocknumber to message index: %w", err)
	}
	if start > end {
		return nil, fmt.Errorf("invalid inputs, fromBlock: %d is greater than toBlock: %d", fromBlock, toBlock)
	}
	if b.blocksLimit > 0 && end-start+1 > arbutil.MessageIndex(b.blocksLimit) {
		return nil, fmt.Errorf("%w. Range requested- %d, Limit- %d", ErrBlockMetadataApiBlocksLimitExceeded, end-start+1, b.blocksLimit)
	}
	var result []NumberAndBlockMetadata
	for i := start; i <= end; i++ {
		var data arbostypes.BlockMetadata
		var found bool
		if b.cache != nil {
			data, found = b.cache.Get(i)
		}
		if !found {
			data, err = b.fetcher.BlockMetadataAtCount(i + 1)
			if err != nil {
				return nil, err
			}
			if data != nil && b.cache != nil {
				b.cache.Add(i, data)
			}
		}
		if data != nil {
			result = append(result, NumberAndBlockMetadata{
				BlockNumber: b.fetcher.MessageIndexToBlockNumber(i),
				RawMetadata: (hexutil.Bytes)(data),
			})
		}
	}
	return result, nil
}

func (b *BulkBlockMetadataFetcher) ClearCache(ctx context.Context, ignored struct{}) {
	b.cache.Clear()
}

func (b *BulkBlockMetadataFetcher) Start(ctx context.Context) {
	b.StopWaiter.Start(ctx, b)
	if b.reorgDetector != nil {
		stopwaiter.CallWhenTriggeredWith[struct{}](&b.StopWaiterSafe, b.ClearCache, b.reorgDetector)
	}
}

func (b *BulkBlockMetadataFetcher) StopAndWait() {
	b.StopWaiter.StopAndWait()
}
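
Because Fetch rejects ranges larger than the configured blocks limit with ErrBlockMetadataApiBlocksLimitExceeded, a client that needs metadata for a long span has to split it into windows of at most that many blocks. Below is a hedged sketch of such client-side chunking; it reuses the rpc client, imports, and the numberAndBlockMetadata type from the earlier sketch, and the limit passed in is an assumption that must not exceed the server's block-metadata-api-blocks-limit.

// Hypothetical helper: fetch raw block metadata for the inclusive range [from, to]
// in windows small enough for the server's configured per-query blocks limit.
func fetchMetadataChunked(ctx context.Context, client *rpc.Client, from, to, limit uint64) ([]numberAndBlockMetadata, error) {
	var all []numberAndBlockMetadata
	for start := from; start <= to; start += limit {
		end := start + limit - 1
		if end > to {
			end = to
		}
		var chunk []numberAndBlockMetadata
		err := client.CallContext(ctx, &chunk, "arb_getRawBlockMetadata",
			hexutil.Uint64(start), hexutil.Uint64(end))
		if err != nil {
			// If the window is still too large for the server, the returned RPC error
			// carries the ErrBlockMetadataApiBlocksLimitExceeded message.
			return nil, fmt.Errorf("range [%d, %d]: %w", start, end, err)
		}
		all = append(all, chunk...)
	}
	return all, nil
}
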
31 changes: 28 additions & 3 deletions execution/gethexec/executionengine.go
@@ -80,9 +80,10 @@ type ExecutionEngine struct {
	resequenceChan    chan []*arbostypes.MessageWithMetadata
	createBlocksMutex sync.Mutex

	newBlockNotifier chan struct{}
	latestBlockMutex sync.Mutex
	latestBlock      *types.Block
	newBlockNotifier    chan struct{}
	reorgEventsNotifier chan struct{}
	latestBlockMutex    sync.Mutex
	latestBlock         *types.Block

	nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

@@ -182,6 +183,16 @@ func (s *ExecutionEngine) SetRecorder(recorder *BlockRecorder) {
	s.recorder = recorder
}

func (s *ExecutionEngine) SetReorgEventsNotifier(reorgEventsNotifier chan struct{}) {
	if s.Started() {
		panic("trying to set reorg events notifier after start")
	}
	if s.reorgEventsNotifier != nil {
		panic("trying to set reorg events notifier when already set")
	}
	s.reorgEventsNotifier = reorgEventsNotifier
}

func (s *ExecutionEngine) EnableReorgSequencing() {
	if s.Started() {
		panic("trying to enable reorg sequencing after start")
@@ -212,6 +223,13 @@ func (s *ExecutionEngine) SetConsensus(consensus execution.FullConsensusClient)
	s.consensus = consensus
}

func (s *ExecutionEngine) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) {
	if s.consensus != nil {
		return s.consensus.BlockMetadataAtCount(count)
	}
	return nil, errors.New("FullConsensusClient is not accessible to execution")
}

func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher {
	return s.consensus
}
@@ -246,6 +264,13 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
		return nil, err
	}

	if s.reorgEventsNotifier != nil {
		select {
		case s.reorgEventsNotifier <- struct{}{}:
		default:
		}
	}

	newMessagesResults := make([]*execution.MessageResult, 0, len(oldMessages))
	for i := range newMessages {
		var msgForPrefetch *arbostypes.MessageWithMetadata
114 changes: 63 additions & 51 deletions execution/gethexec/node.go
@@ -47,19 +47,21 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type Config struct {
	ParentChainReader         headerreader.Config              `koanf:"parent-chain-reader" reload:"hot"`
	Sequencer                 SequencerConfig                  `koanf:"sequencer" reload:"hot"`
	RecordingDatabase         arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
	TxPreChecker              TxPreCheckerConfig               `koanf:"tx-pre-checker" reload:"hot"`
	Forwarder                 ForwarderConfig                  `koanf:"forwarder"`
	ForwardingTarget          string                           `koanf:"forwarding-target"`
	SecondaryForwardingTarget []string                         `koanf:"secondary-forwarding-target"`
	Caching                   CachingConfig                    `koanf:"caching"`
	RPC                       arbitrum.Config                  `koanf:"rpc"`
	TxLookupLimit             uint64                           `koanf:"tx-lookup-limit"`
	EnablePrefetchBlock       bool                             `koanf:"enable-prefetch-block"`
	SyncMonitor               SyncMonitorConfig                `koanf:"sync-monitor"`
	StylusTarget              StylusTargetConfig               `koanf:"stylus-target"`
	ParentChainReader           headerreader.Config              `koanf:"parent-chain-reader" reload:"hot"`
	Sequencer                   SequencerConfig                  `koanf:"sequencer" reload:"hot"`
	RecordingDatabase           arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
	TxPreChecker                TxPreCheckerConfig               `koanf:"tx-pre-checker" reload:"hot"`
	Forwarder                   ForwarderConfig                  `koanf:"forwarder"`
	ForwardingTarget            string                           `koanf:"forwarding-target"`
	SecondaryForwardingTarget   []string                         `koanf:"secondary-forwarding-target"`
	Caching                     CachingConfig                    `koanf:"caching"`
	RPC                         arbitrum.Config                  `koanf:"rpc"`
	TxLookupLimit               uint64                           `koanf:"tx-lookup-limit"`
	EnablePrefetchBlock         bool                             `koanf:"enable-prefetch-block"`
	SyncMonitor                 SyncMonitorConfig                `koanf:"sync-monitor"`
	StylusTarget                StylusTargetConfig               `koanf:"stylus-target"`
	BlockMetadataApiCacheSize   uint64                           `koanf:"block-metadata-api-cache-size"`
	BlockMetadataApiBlocksLimit uint64                           `koanf:"block-metadata-api-blocks-limit"`

	forwardingTarget string
}
@@ -99,39 +101,44 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
	f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)")
	f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks")
	StylusTargetConfigAddOptions(prefix+".stylus-target", f)
	f.Uint64(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size (in bytes) of lru cache storing the blockMetadata to service arb_getRawBlockMetadata")
	f.Uint64(prefix+".block-metadata-api-blocks-limit", ConfigDefault.BlockMetadataApiBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query. Enabled by default, set 0 to disable the limit")
}

var ConfigDefault = Config{
	RPC:                       arbitrum.DefaultConfig,
	Sequencer:                 DefaultSequencerConfig,
	ParentChainReader:         headerreader.DefaultConfig,
	RecordingDatabase:         arbitrum.DefaultRecordingDatabaseConfig,
	ForwardingTarget:          "",
	SecondaryForwardingTarget: []string{},
	TxPreChecker:              DefaultTxPreCheckerConfig,
	TxLookupLimit:             126_230_400, // 1 year at 4 blocks per second
	Caching:                   DefaultCachingConfig,
	Forwarder:                 DefaultNodeForwarderConfig,
	EnablePrefetchBlock:       true,
	StylusTarget:              DefaultStylusTargetConfig,
	RPC:                         arbitrum.DefaultConfig,
	Sequencer:                   DefaultSequencerConfig,
	ParentChainReader:           headerreader.DefaultConfig,
	RecordingDatabase:           arbitrum.DefaultRecordingDatabaseConfig,
	ForwardingTarget:            "",
	SecondaryForwardingTarget:   []string{},
	TxPreChecker:                DefaultTxPreCheckerConfig,
	TxLookupLimit:               126_230_400, // 1 year at 4 blocks per second
	Caching:                     DefaultCachingConfig,
	Forwarder:                   DefaultNodeForwarderConfig,
	EnablePrefetchBlock:         true,
	StylusTarget:                DefaultStylusTargetConfig,
	BlockMetadataApiCacheSize:   100 * 1024 * 1024,
	BlockMetadataApiBlocksLimit: 100,
}
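
For reference, a small sketch of how a node operator might override the two new defaults programmatically, assuming the package is imported as gethexec; the same values are reachable through the block-metadata-api-cache-size and block-metadata-api-blocks-limit flags registered in ConfigAddOptions above, whose exact prefix depends on where this execution Config is mounted.

// Hypothetical configuration sketch (field names from the Config additions above).
// A cache size of 0 disables the LRU cache (and with it reorg-driven cache clearing);
// a blocks limit of 0 disables the per-query range check in Fetch.
func blockMetadataConfig() gethexec.Config {
	cfg := gethexec.ConfigDefault
	cfg.BlockMetadataApiCacheSize = 10 * 1024 * 1024 // bytes of raw blockMetadata kept in the LRU cache
	cfg.BlockMetadataApiBlocksLimit = 250            // max blocks per arb_getRawBlockMetadata call
	return cfg
}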

type ConfigFetcher func() *Config

type ExecutionNode struct {
	ChainDB           ethdb.Database
	Backend           *arbitrum.Backend
	FilterSystem      *filters.FilterSystem
	ArbInterface      *ArbInterface
	ExecEngine        *ExecutionEngine
	Recorder          *BlockRecorder
	Sequencer         *Sequencer // either nil or same as TxPublisher
	TxPublisher       TransactionPublisher
	ConfigFetcher     ConfigFetcher
	SyncMonitor       *SyncMonitor
	ParentChainReader *headerreader.HeaderReader
	ClassicOutbox     *ClassicOutboxRetriever
	started           atomic.Bool
	ChainDB                  ethdb.Database
	Backend                  *arbitrum.Backend
	FilterSystem             *filters.FilterSystem
	ArbInterface             *ArbInterface
	ExecEngine               *ExecutionEngine
	Recorder                 *BlockRecorder
	Sequencer                *Sequencer // either nil or same as TxPublisher
	TxPublisher              TransactionPublisher
	ConfigFetcher            ConfigFetcher
	SyncMonitor              *SyncMonitor
	ParentChainReader        *headerreader.HeaderReader
	ClassicOutbox            *ClassicOutboxRetriever
	started                  atomic.Bool
	bulkBlockMetadataFetcher *BulkBlockMetadataFetcher
}

func CreateExecutionNode(
@@ -218,10 +225,12 @@ func CreateExecutionNode(
		}
	}

	bulkBlockMetadataFetcher := NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize, config.BlockMetadataApiBlocksLimit)

	apis := []rpc.API{{
		Namespace: "arb",
		Version:   "1.0",
		Service:   NewArbAPI(txPublisher),
		Service:   NewArbAPI(txPublisher, bulkBlockMetadataFetcher),
		Public:    false,
	}}
	apis = append(apis, rpc.API{
@@ -265,18 +274,19 @@ func CreateExecutionNode(
	stack.RegisterAPIs(apis)

	return &ExecutionNode{
		ChainDB:           chainDB,
		Backend:           backend,
		FilterSystem:      filterSystem,
		ArbInterface:      arbInterface,
		ExecEngine:        execEngine,
		Recorder:          recorder,
		Sequencer:         sequencer,
		TxPublisher:       txPublisher,
		ConfigFetcher:     configFetcher,
		SyncMonitor:       syncMon,
		ParentChainReader: parentChainReader,
		ClassicOutbox:     classicOutbox,
		ChainDB:                  chainDB,
		Backend:                  backend,
		FilterSystem:             filterSystem,
		ArbInterface:             arbInterface,
		ExecEngine:               execEngine,
		Recorder:                 recorder,
		Sequencer:                sequencer,
		TxPublisher:              txPublisher,
		ConfigFetcher:            configFetcher,
		SyncMonitor:              syncMon,
		ParentChainReader:        parentChainReader,
		ClassicOutbox:            classicOutbox,
		bulkBlockMetadataFetcher: bulkBlockMetadataFetcher,
	}, nil

}
@@ -325,13 +335,15 @@ func (n *ExecutionNode) Start(ctx context.Context) error {
	if n.ParentChainReader != nil {
		n.ParentChainReader.Start(ctx)
	}
	n.bulkBlockMetadataFetcher.Start(ctx)
	return nil
}

func (n *ExecutionNode) StopAndWait() {
	if !n.started.Load() {
		return
	}
	n.bulkBlockMetadataFetcher.StopAndWait()
	// TODO after separation
	// n.Stack.StopRPC() // does nothing if not running
	if n.TxPublisher.Started() {
1 change: 1 addition & 0 deletions execution/interface.go
@@ -83,6 +83,7 @@ type ConsensusInfo interface {
	Synced() bool
	FullSyncProgressMap() map[string]interface{}
	SyncTargetMessageCount() arbutil.MessageIndex
	BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error)

	// TODO: switch from pulling to pushing safe/finalized
	GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error)
2 changes: 1 addition & 1 deletion go-ethereum