Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeboost Bulk BlockMetadata API #2754

Open
wants to merge 13 commits into
base: add-timeboosted-broadcastfeedmessage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,3 +1034,7 @@ func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) {
}
return n.BlockValidator.GetValidated(), nil
}

func (n *Node) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) {
return n.TxStreamer.BlockMetadataAtCount(count)
}
23 changes: 20 additions & 3 deletions execution/gethexec/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -25,17 +26,33 @@ import (
)

type ArbAPI struct {
txPublisher TransactionPublisher
txPublisher TransactionPublisher
bulkBlockMetadataFetcher *BulkBlockMetadataFetcher
}

func NewArbAPI(publisher TransactionPublisher, bulkBlockMetadataFetcher *BulkBlockMetadataFetcher) *ArbAPI {
return &ArbAPI{
txPublisher: publisher,
bulkBlockMetadataFetcher: bulkBlockMetadataFetcher,
}
}

func NewArbAPI(publisher TransactionPublisher) *ArbAPI {
return &ArbAPI{publisher}
type NumberAndBlockMetadata struct {
BlockNumber uint64 `json:"blockNumber"`
RawMetadata hexutil.Bytes `json:"rawMetadata"`
}

func (a *ArbAPI) CheckPublisherHealth(ctx context.Context) error {
return a.txPublisher.CheckHealth(ctx)
}

func (a *ArbAPI) GetRawBlockMetadata(ctx context.Context, fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) {
if a.bulkBlockMetadataFetcher == nil {
return nil, errors.New("arb_getRawBlockMetadata is not available")
}
return a.bulkBlockMetadataFetcher.Fetch(fromBlock, toBlock)
}

type ArbTimeboostAuctioneerAPI struct {
txPublisher TransactionPublisher
}
Expand Down
116 changes: 116 additions & 0 deletions execution/gethexec/blockmetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package gethexec

import (
"context"
"errors"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

var ErrBlockMetadataApiBlocksLimitExceeded = errors.New("number of blocks requested for blockMetadata exceeded")

type BlockMetadataFetcher interface {
BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error)
BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error)
MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64
SetReorgEventsReader(reorgEventsReader chan struct{})
}

type BulkBlockMetadataFetcher struct {
stopwaiter.StopWaiter
bc *core.BlockChain
fetcher BlockMetadataFetcher
reorgDetector chan struct{}
blocksLimit int
cacheMutex sync.RWMutex
cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata]
}

func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize, blocksLimit int) *BulkBlockMetadataFetcher {
var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata]
var reorgDetector chan struct{}
if cacheSize != 0 {
cache = containers.NewLruCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize)
reorgDetector = make(chan struct{})
fetcher.SetReorgEventsReader(reorgDetector)
}
return &BulkBlockMetadataFetcher{
bc: bc,
fetcher: fetcher,
cache: cache,
reorgDetector: reorgDetector,
blocksLimit: blocksLimit,
}
}

func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) {
fromBlock, _ = b.bc.ClipToPostNitroGenesis(fromBlock)
toBlock, _ = b.bc.ClipToPostNitroGenesis(toBlock)
start, err := b.fetcher.BlockNumberToMessageIndex(uint64(fromBlock))
if err != nil {
return nil, fmt.Errorf("error converting fromBlock blocknumber to message index: %w", err)
}
end, err := b.fetcher.BlockNumberToMessageIndex(uint64(toBlock))
if err != nil {
return nil, fmt.Errorf("error converting toBlock blocknumber to message index: %w", err)
}
if start > end {
return nil, fmt.Errorf("invalid inputs, fromBlock: %d is greater than toBlock: %d", fromBlock, toBlock)
}
if b.blocksLimit > 0 && end-start+1 > arbutil.MessageIndex(b.blocksLimit) {
return nil, fmt.Errorf("%w. Range requested- %d, Limit- %d", ErrBlockMetadataApiBlocksLimitExceeded, end-start+1, b.blocksLimit)
}
var result []NumberAndBlockMetadata
for i := start; i <= end; i++ {
var data arbostypes.BlockMetadata
var found bool
if b.cache != nil {
b.cacheMutex.RLock()
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
data, found = b.cache.Get(i)
b.cacheMutex.RUnlock()
}
if !found {
data, err = b.fetcher.BlockMetadataAtCount(i + 1)
if err != nil {
return nil, err
}
if data != nil && b.cache != nil {
b.cacheMutex.Lock()
b.cache.Add(i, data)
b.cacheMutex.Unlock()
}
}
if data != nil {
result = append(result, NumberAndBlockMetadata{
BlockNumber: b.fetcher.MessageIndexToBlockNumber(i),
RawMetadata: (hexutil.Bytes)(data),
})
}
}
return result, nil
}

func (b *BulkBlockMetadataFetcher) ClearCache(ctx context.Context, ignored struct{}) {
b.cacheMutex.Lock()
b.cache.Clear()
b.cacheMutex.Unlock()
}

func (b *BulkBlockMetadataFetcher) Start(ctx context.Context) {
b.StopWaiter.Start(ctx, b)
if b.reorgDetector != nil {
stopwaiter.CallWhenTriggeredWith[struct{}](&b.StopWaiterSafe, b.ClearCache, b.reorgDetector)
}
}

func (b *BulkBlockMetadataFetcher) StopAndWait() {
b.StopWaiter.StopAndWait()
}
24 changes: 21 additions & 3 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ type ExecutionEngine struct {
resequenceChan chan []*arbostypes.MessageWithMetadata
createBlocksMutex sync.Mutex

newBlockNotifier chan struct{}
latestBlockMutex sync.Mutex
latestBlock *types.Block
newBlockNotifier chan struct{}
reorgEventsReader chan struct{}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
latestBlockMutex sync.Mutex
latestBlock *types.Block

nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

Expand Down Expand Up @@ -134,6 +135,10 @@ func (s *ExecutionEngine) backlogL1GasCharged() uint64 {
s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged)
}

func (s *ExecutionEngine) SetReorgEventsReader(reorgEventsReader chan struct{}) {
s.reorgEventsReader = reorgEventsReader
}

func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) {
s.cachedL1PriceData.mutex.Lock()
defer s.cachedL1PriceData.mutex.Unlock()
Expand Down Expand Up @@ -212,6 +217,13 @@ func (s *ExecutionEngine) SetConsensus(consensus execution.FullConsensusClient)
s.consensus = consensus
}

func (s *ExecutionEngine) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) {
if s.consensus != nil {
return s.consensus.BlockMetadataAtCount(count)
}
return nil, errors.New("FullConsensusClient is not accessible to execution")
}

func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher {
return s.consensus
}
Expand Down Expand Up @@ -265,6 +277,12 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
s.resequenceChan <- oldMessages
resequencing = true
}
if s.reorgEventsReader != nil {
select {
case s.reorgEventsReader <- struct{}{}:
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
default:
}
}
return newMessagesResults, nil
}

Expand Down
Loading
Loading