From d60de53ded7930256a5f2de3cd693015c83863ce Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 24 Oct 2024 18:32:32 +0530 Subject: [PATCH 01/11] Timeboost Bulk Metadata API --- arbnode/node.go | 4 ++ execution/gethexec/api.go | 23 ++++++- execution/gethexec/blockmetadata.go | 67 +++++++++++++++++++++ execution/gethexec/executionengine.go | 7 +++ execution/gethexec/node.go | 7 ++- execution/interface.go | 1 + system_tests/common_test.go | 1 + system_tests/timeboost_test.go | 86 +++++++++++++++++++++++++++ 8 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 execution/gethexec/blockmetadata.go diff --git a/arbnode/node.go b/arbnode/node.go index a8cee03bbc..705a48da08 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -1034,3 +1034,7 @@ func (n *Node) ValidatedMessageCount() (arbutil.MessageIndex, error) { } return n.BlockValidator.GetValidated(), nil } + +func (n *Node) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) { + return n.TxStreamer.BlockMetadataAtCount(count) +} diff --git a/execution/gethexec/api.go b/execution/gethexec/api.go index c32e0c0064..7492cf04b3 100644 --- a/execution/gethexec/api.go +++ b/execution/gethexec/api.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" @@ -25,17 +26,33 @@ import ( ) type ArbAPI struct { - txPublisher TransactionPublisher + txPublisher TransactionPublisher + bulkBlockMetadataFetcher *BulkBlockMetadataFetcher +} + +func NewArbAPI(publisher TransactionPublisher, bulkBlockMetadataFetcher *BulkBlockMetadataFetcher) *ArbAPI { + return &ArbAPI{ + txPublisher: publisher, + bulkBlockMetadataFetcher: bulkBlockMetadataFetcher, + } } -func NewArbAPI(publisher TransactionPublisher) *ArbAPI { - return &ArbAPI{publisher} +type NumberAndBlockMetadata struct { + BlockNumber uint64 `json:"blockNumber"` + RawMetadata hexutil.Bytes `json:"rawMetadata"` } func (a *ArbAPI) CheckPublisherHealth(ctx context.Context) error { return a.txPublisher.CheckHealth(ctx) } +func (a *ArbAPI) GetRawBlockMetadata(ctx context.Context, fromBlock, toBlock hexutil.Uint64) ([]NumberAndBlockMetadata, error) { + if a.bulkBlockMetadataFetcher == nil { + return nil, errors.New("arb_getRawBlockMetadata is not available") + } + return a.bulkBlockMetadataFetcher.Fetch(fromBlock, toBlock) +} + type ArbTimeboostAuctioneerAPI struct { txPublisher TransactionPublisher } diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go new file mode 100644 index 0000000000..d538596946 --- /dev/null +++ b/execution/gethexec/blockmetadata.go @@ -0,0 +1,67 @@ +package gethexec + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/util/containers" +) + +type BlockMetadataFetcher interface { + BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) + BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) + MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 +} + +type BulkBlockMetadataFetcher struct { + fetcher BlockMetadataFetcher + cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] +} + +func NewBulkBlockMetadataFetcher(fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { + var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] + if cacheSize != 0 { + cache = containers.NewLruCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) + } + return &BulkBlockMetadataFetcher{ + fetcher: fetcher, + cache: cache, + } +} + +func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock hexutil.Uint64) ([]NumberAndBlockMetadata, error) { + start, err := b.fetcher.BlockNumberToMessageIndex(uint64(fromBlock)) + if err != nil { + return nil, fmt.Errorf("error converting fromBlock blocknumber to message index: %w", err) + } + end, err := b.fetcher.BlockNumberToMessageIndex(uint64(toBlock)) + if err != nil { + return nil, fmt.Errorf("error converting toBlock blocknumber to message index: %w", err) + } + var result []NumberAndBlockMetadata + for i := start; i <= end; i++ { + var data arbostypes.BlockMetadata + var found bool + if b.cache != nil { + data, found = b.cache.Get(i) + } + if !found { + data, err = b.fetcher.BlockMetadataAtCount(i + 1) + if err != nil { + return nil, err + } + if data != nil && b.cache != nil { + b.cache.Add(i, data) + } + } + if data != nil { + result = append(result, NumberAndBlockMetadata{ + BlockNumber: b.fetcher.MessageIndexToBlockNumber(i), + RawMetadata: (hexutil.Bytes)(data), + }) + } + } + return result, nil +} diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index b5257c4efc..105947414a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -212,6 +212,13 @@ func (s *ExecutionEngine) SetConsensus(consensus execution.FullConsensusClient) s.consensus = consensus } +func (s *ExecutionEngine) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) { + if s.consensus != nil { + return s.consensus.BlockMetadataAtCount(count) + } + return nil, errors.New("FullConsensusClient is not accessible to execution") +} + func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher { return s.consensus } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index b751de4288..2b5c62a7b8 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -60,6 +60,7 @@ type Config struct { EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` StylusTarget StylusTargetConfig `koanf:"stylus-target"` + BlockMetadataApiCacheSize int `koanf:"block-metadata-api-cache-size"` forwardingTarget string } @@ -99,6 +100,9 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)") f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks") StylusTargetConfigAddOptions(prefix+".stylus-target", f) + f.Int(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size of lru cache storing the blockMetadata to service arb_getRawBlockMetadata.\n"+ + "Note: setting a non-zero value would mean the blockMetadata might be outdated (if the block was reorged out).\n"+ + "Default is set to 0 which disables caching") } var ConfigDefault = Config{ @@ -114,6 +118,7 @@ var ConfigDefault = Config{ Forwarder: DefaultNodeForwarderConfig, EnablePrefetchBlock: true, StylusTarget: DefaultStylusTargetConfig, + BlockMetadataApiCacheSize: 0, } type ConfigFetcher func() *Config @@ -221,7 +226,7 @@ func CreateExecutionNode( apis := []rpc.API{{ Namespace: "arb", Version: "1.0", - Service: NewArbAPI(txPublisher), + Service: NewArbAPI(txPublisher, NewBulkBlockMetadataFetcher(execEngine, config.BlockMetadataApiCacheSize)), Public: false, }} apis = append(apis, rpc.API{ diff --git a/execution/interface.go b/execution/interface.go index 94c60a31bb..dbe2927ecf 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -99,4 +99,5 @@ type FullConsensusClient interface { BatchFetcher ConsensusInfo ConsensusSequencer + BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 6e7375a921..8d4d01eebf 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -480,6 +480,7 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) { l2.Client = client l2.ExecNode = execNode l2.cleanup = func() { b.L2.ConsensusNode.StopAndWait() } + l2.Stack = stack b.L2 = l2 b.L2Info = l2info diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index c24885cb60..29ec22a8d3 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -1,8 +1,10 @@ package arbtest import ( + "bytes" "context" "crypto/ecdsa" + "encoding/binary" "fmt" "math/big" "net" @@ -41,6 +43,90 @@ import ( "github.com/stretchr/testify/require" ) +func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + cleanup := builder.Build(t) + defer cleanup() + + arbDb := builder.L2.ConsensusNode.ArbDB + dbKey := func(prefix []byte, pos uint64) []byte { + var key []byte + key = append(key, prefix...) + data := make([]byte, 8) + binary.BigEndian.PutUint64(data, pos) + key = append(key, data...) + return key + } + + start := 1 + end := 20 + var sampleBulkData []gethexec.NumberAndBlockMetadata + for i := start; i <= end; i += 2 { + sampleData := gethexec.NumberAndBlockMetadata{ + BlockNumber: uint64(i), + RawMetadata: []byte{0, uint8(i)}, + } + sampleBulkData = append(sampleBulkData, sampleData) + arbDb.Put(dbKey([]byte("t"), sampleData.BlockNumber), sampleData.RawMetadata) + } + + l2rpc := builder.L2.Stack.Attach() + var result []gethexec.NumberAndBlockMetadata + err := l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(start), hexutil.Uint64(end)) + Require(t, err) + + if len(result) != len(sampleBulkData) { + t.Fatalf("number of entries in arb_getRawBlockMetadata is incorrect. Got: %d, Want: %d", len(result), len(sampleBulkData)) + } + for i, data := range result { + if data.BlockNumber != sampleBulkData[i].BlockNumber { + t.Fatalf("BlockNumber mismatch. Got: %d, Want: %d", data.BlockNumber, sampleBulkData[i].BlockNumber) + } + if !bytes.Equal(data.RawMetadata, sampleBulkData[i].RawMetadata) { + t.Fatalf("RawMetadata. Got: %s, Want: %s", data.RawMetadata, sampleBulkData[i].RawMetadata) + } + } + + // Test that without cache the result returned is always in sync with ArbDB + sampleBulkData[0].RawMetadata = []byte{1, 11} + arbDb.Put(dbKey([]byte("t"), 1), sampleBulkData[0].RawMetadata) + + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(1), hexutil.Uint64(1)) + Require(t, err) + if len(result) != 1 { + t.Fatal("result returned with more than one entry") + } + if !bytes.Equal(sampleBulkData[0].RawMetadata, result[0].RawMetadata) { + t.Fatal("BlockMetadata gotten from API doesn't match the latest entry in ArbDB") + } + + // Test that LRU caching works + builder.execConfig.BlockMetadataApiCacheSize = 10 + builder.RestartL2Node(t) + l2rpc = builder.L2.Stack.Attach() + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(start), hexutil.Uint64(end)) + Require(t, err) + + arbDb = builder.L2.ConsensusNode.ArbDB + updatedBlockMetadata := []byte{2, 12} + arbDb.Put(dbKey([]byte("t"), 1), updatedBlockMetadata) + + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(1), hexutil.Uint64(1)) + Require(t, err) + if len(result) != 1 { + t.Fatal("result returned with more than one entry") + } + if bytes.Equal(updatedBlockMetadata, result[0].RawMetadata) { + t.Fatal("BlockMetadata should've been fetched from cache and not the db") + } + if !bytes.Equal(sampleBulkData[0].RawMetadata, result[0].RawMetadata) { + t.Fatal("incorrect caching of BlockMetadata") + } +} + func TestSequencerFeed_ExpressLaneAuction_ExpressLaneTxsHaveAdvantage_TimeboostedFieldIsCorrect(t *testing.T) { t.Parallel() From 712b3ad1549352427dfeba949e0ca9a55c0dd480 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 24 Oct 2024 18:42:18 +0530 Subject: [PATCH 02/11] validate BlockMetadataApiCacheSize --- execution/gethexec/node.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 2b5c62a7b8..5552999ab9 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -83,6 +83,9 @@ func (c *Config) Validate() error { if c.forwardingTarget != "" && c.Sequencer.Enable { return errors.New("ForwardingTarget set and sequencer enabled") } + if c.BlockMetadataApiCacheSize < 0 { + return errors.New("block-metadata-api-cache-size cannot be negative") + } return nil } From 952cd376759dbd92ac868d3d68a202b6867f2e9f Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 25 Oct 2024 16:10:38 +0530 Subject: [PATCH 03/11] move BlockMetadataAtCount into ConsensusInfo --- execution/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execution/interface.go b/execution/interface.go index dbe2927ecf..01f71d4422 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -83,6 +83,7 @@ type ConsensusInfo interface { Synced() bool FullSyncProgressMap() map[string]interface{} SyncTargetMessageCount() arbutil.MessageIndex + BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) // TODO: switch from pulling to pushing safe/finalized GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) @@ -99,5 +100,4 @@ type FullConsensusClient interface { BatchFetcher ConsensusInfo ConsensusSequencer - BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) } From 857823c655cae1a5059dcef816f913f217b15d58 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 28 Oct 2024 12:39:40 +0530 Subject: [PATCH 04/11] update arb_getRawBlockMetadata to accept rpc.BlockNumber inputs instead of Uint64s --- execution/gethexec/api.go | 2 +- execution/gethexec/blockmetadata.go | 10 +++++++-- execution/gethexec/node.go | 2 +- system_tests/timeboost_test.go | 33 +++++++++++++++++++++-------- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/execution/gethexec/api.go b/execution/gethexec/api.go index 7492cf04b3..1edfcbfdb9 100644 --- a/execution/gethexec/api.go +++ b/execution/gethexec/api.go @@ -46,7 +46,7 @@ func (a *ArbAPI) CheckPublisherHealth(ctx context.Context) error { return a.txPublisher.CheckHealth(ctx) } -func (a *ArbAPI) GetRawBlockMetadata(ctx context.Context, fromBlock, toBlock hexutil.Uint64) ([]NumberAndBlockMetadata, error) { +func (a *ArbAPI) GetRawBlockMetadata(ctx context.Context, fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) { if a.bulkBlockMetadataFetcher == nil { return nil, errors.New("arb_getRawBlockMetadata is not available") } diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index d538596946..76a064aa50 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/containers" @@ -16,22 +18,26 @@ type BlockMetadataFetcher interface { } type BulkBlockMetadataFetcher struct { + bc *core.BlockChain fetcher BlockMetadataFetcher cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] } -func NewBulkBlockMetadataFetcher(fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { +func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] if cacheSize != 0 { cache = containers.NewLruCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) } return &BulkBlockMetadataFetcher{ + bc: bc, fetcher: fetcher, cache: cache, } } -func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock hexutil.Uint64) ([]NumberAndBlockMetadata, error) { +func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([]NumberAndBlockMetadata, error) { + fromBlock, _ = b.bc.ClipToPostNitroGenesis(fromBlock) + toBlock, _ = b.bc.ClipToPostNitroGenesis(toBlock) start, err := b.fetcher.BlockNumberToMessageIndex(uint64(fromBlock)) if err != nil { return nil, fmt.Errorf("error converting fromBlock blocknumber to message index: %w", err) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 5552999ab9..477c3a0e2b 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -229,7 +229,7 @@ func CreateExecutionNode( apis := []rpc.API{{ Namespace: "arb", Version: "1.0", - Service: NewArbAPI(txPublisher, NewBulkBlockMetadataFetcher(execEngine, config.BlockMetadataApiCacheSize)), + Service: NewArbAPI(txPublisher, NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize)), Public: false, }} apis = append(apis, rpc.API{ diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 29ec22a8d3..e1ca3fc52b 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbos/util" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/broadcaster/message" @@ -43,7 +44,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { +func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -52,8 +53,9 @@ func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { defer cleanup() arbDb := builder.L2.ConsensusNode.ArbDB - dbKey := func(prefix []byte, pos uint64) []byte { + blockMetadataInputFeedKey := func(pos uint64) []byte { var key []byte + prefix := []byte("t") key = append(key, prefix...) data := make([]byte, 8) binary.BigEndian.PutUint64(data, pos) @@ -61,8 +63,21 @@ func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { return key } + // Generate blocks until current block is end start := 1 end := 20 + builder.L2Info.GenerateAccount("User") + user := builder.L2Info.GetDefaultTransactOpts("User", ctx) + for i := 0; ; i++ { + builder.L2.TransferBalanceTo(t, "Owner", util.RemapL1Address(user.From), big.NewInt(1e18), builder.L2Info) + latestL2, err := builder.L2.Client.BlockNumber(ctx) + Require(t, err) + // Clean BlockMetadata from arbDB so that we can modify it at will + Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) + if latestL2 > uint64(end) { + break + } + } var sampleBulkData []gethexec.NumberAndBlockMetadata for i := start; i <= end; i += 2 { sampleData := gethexec.NumberAndBlockMetadata{ @@ -70,12 +85,12 @@ func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { RawMetadata: []byte{0, uint8(i)}, } sampleBulkData = append(sampleBulkData, sampleData) - arbDb.Put(dbKey([]byte("t"), sampleData.BlockNumber), sampleData.RawMetadata) + arbDb.Put(blockMetadataInputFeedKey(sampleData.BlockNumber), sampleData.RawMetadata) } l2rpc := builder.L2.Stack.Attach() var result []gethexec.NumberAndBlockMetadata - err := l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(start), hexutil.Uint64(end)) + err := l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), "latest") // Test rpc.BlockNumber feature, send "latest" as an arg instead of blockNumber Require(t, err) if len(result) != len(sampleBulkData) { @@ -92,9 +107,9 @@ func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { // Test that without cache the result returned is always in sync with ArbDB sampleBulkData[0].RawMetadata = []byte{1, 11} - arbDb.Put(dbKey([]byte("t"), 1), sampleBulkData[0].RawMetadata) + arbDb.Put(blockMetadataInputFeedKey(1), sampleBulkData[0].RawMetadata) - err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(1), hexutil.Uint64(1)) + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) if len(result) != 1 { t.Fatal("result returned with more than one entry") @@ -107,14 +122,14 @@ func TestTimeboosBulkBlockMetadataAPI(t *testing.T) { builder.execConfig.BlockMetadataApiCacheSize = 10 builder.RestartL2Node(t) l2rpc = builder.L2.Stack.Attach() - err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(start), hexutil.Uint64(end)) + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(end)) Require(t, err) arbDb = builder.L2.ConsensusNode.ArbDB updatedBlockMetadata := []byte{2, 12} - arbDb.Put(dbKey([]byte("t"), 1), updatedBlockMetadata) + arbDb.Put(blockMetadataInputFeedKey(1), updatedBlockMetadata) - err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", hexutil.Uint64(1), hexutil.Uint64(1)) + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) if len(result) != 1 { t.Fatal("result returned with more than one entry") From 851d2bdea572cb1e9e64e8521e37edd48a43a9b6 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 28 Oct 2024 13:14:55 +0530 Subject: [PATCH 05/11] protect BulkBlockMetadataFetcher's cache with mutex --- execution/gethexec/blockmetadata.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index 76a064aa50..def22a6349 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -2,6 +2,7 @@ package gethexec import ( "fmt" + "sync" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" @@ -18,9 +19,10 @@ type BlockMetadataFetcher interface { } type BulkBlockMetadataFetcher struct { - bc *core.BlockChain - fetcher BlockMetadataFetcher - cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] + bc *core.BlockChain + fetcher BlockMetadataFetcher + cacheMutex sync.RWMutex + cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] } func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { @@ -51,7 +53,9 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] var data arbostypes.BlockMetadata var found bool if b.cache != nil { + b.cacheMutex.RLock() data, found = b.cache.Get(i) + b.cacheMutex.RUnlock() } if !found { data, err = b.fetcher.BlockMetadataAtCount(i + 1) @@ -59,7 +63,9 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] return nil, err } if data != nil && b.cache != nil { + b.cacheMutex.Lock() b.cache.Add(i, data) + b.cacheMutex.Unlock() } } if data != nil { From 4c07677817d2910028dd14232bab125d2a4e765e Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 28 Oct 2024 14:04:32 +0530 Subject: [PATCH 06/11] Enable caching of blockMetadata by default and clear cache when reorgs are detected --- execution/gethexec/blockmetadata.go | 40 ++++++++++++++--- execution/gethexec/executionengine.go | 17 +++++-- execution/gethexec/node.go | 64 ++++++++++++++------------- system_tests/timeboost_test.go | 12 +++++ util/stopwaiter/stopwaiter.go | 20 +++++++++ 5 files changed, 113 insertions(+), 40 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index def22a6349..cdde5751c9 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -1,6 +1,7 @@ package gethexec import ( + "context" "fmt" "sync" @@ -10,30 +11,38 @@ import ( "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/stopwaiter" ) type BlockMetadataFetcher interface { BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 + SetReorgEventsReader(reorgEventsReader chan struct{}) } type BulkBlockMetadataFetcher struct { - bc *core.BlockChain - fetcher BlockMetadataFetcher - cacheMutex sync.RWMutex - cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] + stopwaiter.StopWaiter + bc *core.BlockChain + fetcher BlockMetadataFetcher + reorgDetector chan struct{} + cacheMutex sync.RWMutex + cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] } func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] + var reorgDetector chan struct{} if cacheSize != 0 { cache = containers.NewLruCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) + reorgDetector = make(chan struct{}) + fetcher.SetReorgEventsReader(reorgDetector) } return &BulkBlockMetadataFetcher{ - bc: bc, - fetcher: fetcher, - cache: cache, + bc: bc, + fetcher: fetcher, + cache: cache, + reorgDetector: reorgDetector, } } @@ -77,3 +86,20 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] } return result, nil } + +func (b *BulkBlockMetadataFetcher) ClearCache(ctx context.Context, ignored struct{}) { + b.cacheMutex.Lock() + b.cache.Clear() + b.cacheMutex.Unlock() +} + +func (b *BulkBlockMetadataFetcher) Start(ctx context.Context) { + b.StopWaiter.Start(ctx, b) + if b.reorgDetector != nil { + stopwaiter.CallWhenTriggeredWith[struct{}](&b.StopWaiterSafe, b.ClearCache, b.reorgDetector) + } +} + +func (b *BulkBlockMetadataFetcher) StopAndWait() { + b.StopWaiter.StopAndWait() +} diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 105947414a..b440b1c4b7 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -80,9 +80,10 @@ type ExecutionEngine struct { resequenceChan chan []*arbostypes.MessageWithMetadata createBlocksMutex sync.Mutex - newBlockNotifier chan struct{} - latestBlockMutex sync.Mutex - latestBlock *types.Block + newBlockNotifier chan struct{} + reorgEventsReader chan struct{} + latestBlockMutex sync.Mutex + latestBlock *types.Block nextScheduledVersionCheck time.Time // protected by the createBlocksMutex @@ -134,6 +135,10 @@ func (s *ExecutionEngine) backlogL1GasCharged() uint64 { s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) } +func (s *ExecutionEngine) SetReorgEventsReader(reorgEventsReader chan struct{}) { + s.reorgEventsReader = reorgEventsReader +} + func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) { s.cachedL1PriceData.mutex.Lock() defer s.cachedL1PriceData.mutex.Unlock() @@ -272,6 +277,12 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost s.resequenceChan <- oldMessages resequencing = true } + if s.reorgEventsReader != nil { + select { + case s.reorgEventsReader <- struct{}{}: + default: + } + } return newMessagesResults, nil } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 477c3a0e2b..3aae4677cc 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -103,9 +103,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)") f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks") StylusTargetConfigAddOptions(prefix+".stylus-target", f) - f.Int(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size of lru cache storing the blockMetadata to service arb_getRawBlockMetadata.\n"+ - "Note: setting a non-zero value would mean the blockMetadata might be outdated (if the block was reorged out).\n"+ - "Default is set to 0 which disables caching") + f.Int(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size of lru cache storing the blockMetadata to service arb_getRawBlockMetadata") } var ConfigDefault = Config{ @@ -121,25 +119,26 @@ var ConfigDefault = Config{ Forwarder: DefaultNodeForwarderConfig, EnablePrefetchBlock: true, StylusTarget: DefaultStylusTargetConfig, - BlockMetadataApiCacheSize: 0, + BlockMetadataApiCacheSize: 10000, } type ConfigFetcher func() *Config type ExecutionNode struct { - ChainDB ethdb.Database - Backend *arbitrum.Backend - FilterSystem *filters.FilterSystem - ArbInterface *ArbInterface - ExecEngine *ExecutionEngine - Recorder *BlockRecorder - Sequencer *Sequencer // either nil or same as TxPublisher - TxPublisher TransactionPublisher - ConfigFetcher ConfigFetcher - SyncMonitor *SyncMonitor - ParentChainReader *headerreader.HeaderReader - ClassicOutbox *ClassicOutboxRetriever - started atomic.Bool + ChainDB ethdb.Database + Backend *arbitrum.Backend + FilterSystem *filters.FilterSystem + ArbInterface *ArbInterface + ExecEngine *ExecutionEngine + Recorder *BlockRecorder + Sequencer *Sequencer // either nil or same as TxPublisher + TxPublisher TransactionPublisher + ConfigFetcher ConfigFetcher + SyncMonitor *SyncMonitor + ParentChainReader *headerreader.HeaderReader + ClassicOutbox *ClassicOutboxRetriever + started atomic.Bool + bulkBlockMetadataFetcher *BulkBlockMetadataFetcher } func CreateExecutionNode( @@ -226,10 +225,12 @@ func CreateExecutionNode( } } + bulkBlockMetadataFetcher := NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize) + apis := []rpc.API{{ Namespace: "arb", Version: "1.0", - Service: NewArbAPI(txPublisher, NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize)), + Service: NewArbAPI(txPublisher, bulkBlockMetadataFetcher), Public: false, }} apis = append(apis, rpc.API{ @@ -273,18 +274,19 @@ func CreateExecutionNode( stack.RegisterAPIs(apis) return &ExecutionNode{ - ChainDB: chainDB, - Backend: backend, - FilterSystem: filterSystem, - ArbInterface: arbInterface, - ExecEngine: execEngine, - Recorder: recorder, - Sequencer: sequencer, - TxPublisher: txPublisher, - ConfigFetcher: configFetcher, - SyncMonitor: syncMon, - ParentChainReader: parentChainReader, - ClassicOutbox: classicOutbox, + ChainDB: chainDB, + Backend: backend, + FilterSystem: filterSystem, + ArbInterface: arbInterface, + ExecEngine: execEngine, + Recorder: recorder, + Sequencer: sequencer, + TxPublisher: txPublisher, + ConfigFetcher: configFetcher, + SyncMonitor: syncMon, + ParentChainReader: parentChainReader, + ClassicOutbox: classicOutbox, + bulkBlockMetadataFetcher: bulkBlockMetadataFetcher, }, nil } @@ -333,6 +335,7 @@ func (n *ExecutionNode) Start(ctx context.Context) error { if n.ParentChainReader != nil { n.ParentChainReader.Start(ctx) } + n.bulkBlockMetadataFetcher.Start(ctx) return nil } @@ -340,6 +343,7 @@ func (n *ExecutionNode) StopAndWait() { if !n.started.Load() { return } + n.bulkBlockMetadataFetcher.StopAndWait() // TODO after separation // n.Stack.StopRPC() // does nothing if not running if n.TxPublisher.Started() { diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index e1ca3fc52b..ea20b16a4d 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -49,6 +49,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { defer cancel() builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + builder.execConfig.BlockMetadataApiCacheSize = 0 // Caching is disabled cleanup := builder.Build(t) defer cleanup() @@ -140,6 +141,17 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { if !bytes.Equal(sampleBulkData[0].RawMetadata, result[0].RawMetadata) { t.Fatal("incorrect caching of BlockMetadata") } + + // A Reorg event should clear the cache, hence the data fetched now should be accurate + builder.L2.ConsensusNode.TxStreamer.ReorgTo(10) + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(end)) + Require(t, err) + if len(result) != 5 { + t.Fatalf("Reorg should've cleared out messages starting at number 10. Want: 5, Got: %d", len(result)) + } + if !bytes.Equal(updatedBlockMetadata, result[0].RawMetadata) { + t.Fatal("BlockMetadata should've been fetched from db and not the cache") + } } func TestSequencerFeed_ExpressLaneAuction_ExpressLaneTxsHaveAdvantage_TimeboostedFieldIsCorrect(t *testing.T) { diff --git a/util/stopwaiter/stopwaiter.go b/util/stopwaiter/stopwaiter.go index 1e70e328eb..23a0cc1000 100644 --- a/util/stopwaiter/stopwaiter.go +++ b/util/stopwaiter/stopwaiter.go @@ -251,6 +251,26 @@ func CallIterativelyWith[T any]( }) } +func CallWhenTriggeredWith[T any]( + s ThreadLauncher, + foo func(context.Context, T), + triggerChan <-chan T, +) error { + return s.LaunchThreadSafe(func(ctx context.Context) { + for { + if ctx.Err() != nil { + return + } + select { + case <-ctx.Done(): + return + case val := <-triggerChan: + foo(ctx, val) + } + } + }) +} + func LaunchPromiseThread[T any]( s ThreadLauncher, foo func(context.Context) (T, error), From 18cddc5fa28c5bfa9554928d05112dc063ccbb59 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 28 Oct 2024 20:46:37 +0530 Subject: [PATCH 07/11] address PR comments --- execution/gethexec/blockmetadata.go | 13 +++++- execution/gethexec/node.go | 62 ++++++++++++++++------------- system_tests/timeboost_test.go | 10 ++++- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index cdde5751c9..28ec7a1eb5 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -2,6 +2,7 @@ package gethexec import ( "context" + "errors" "fmt" "sync" @@ -14,6 +15,8 @@ import ( "github.com/offchainlabs/nitro/util/stopwaiter" ) +var ErrBlockMetadataApiBlocksLimitExceeded = errors.New("number of blocks requested for blockMetadata exceeded") + type BlockMetadataFetcher interface { BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) @@ -26,11 +29,12 @@ type BulkBlockMetadataFetcher struct { bc *core.BlockChain fetcher BlockMetadataFetcher reorgDetector chan struct{} + blocksLimit int cacheMutex sync.RWMutex cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] } -func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize int) *BulkBlockMetadataFetcher { +func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize, blocksLimit int) *BulkBlockMetadataFetcher { var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] var reorgDetector chan struct{} if cacheSize != 0 { @@ -43,6 +47,7 @@ func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetch fetcher: fetcher, cache: cache, reorgDetector: reorgDetector, + blocksLimit: blocksLimit, } } @@ -57,6 +62,12 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] if err != nil { return nil, fmt.Errorf("error converting toBlock blocknumber to message index: %w", err) } + if start > end { + return nil, fmt.Errorf("invalid inputs, fromBlock: %d is greater than toBlock: %d", fromBlock, toBlock) + } + if b.blocksLimit > 0 && end-start+1 > arbutil.MessageIndex(b.blocksLimit) { + return nil, fmt.Errorf("%w. Range requested- %d", ErrBlockMetadataApiBlocksLimitExceeded, end-start+1) + } var result []NumberAndBlockMetadata for i := start; i <= end; i++ { var data arbostypes.BlockMetadata diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 3aae4677cc..32e43874f2 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -47,20 +47,21 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) { } type Config struct { - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` - RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"` - TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"` - Forwarder ForwarderConfig `koanf:"forwarder"` - ForwardingTarget string `koanf:"forwarding-target"` - SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"` - Caching CachingConfig `koanf:"caching"` - RPC arbitrum.Config `koanf:"rpc"` - TxLookupLimit uint64 `koanf:"tx-lookup-limit"` - EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - StylusTarget StylusTargetConfig `koanf:"stylus-target"` - BlockMetadataApiCacheSize int `koanf:"block-metadata-api-cache-size"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` + RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"` + TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"` + Forwarder ForwarderConfig `koanf:"forwarder"` + ForwardingTarget string `koanf:"forwarding-target"` + SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"` + Caching CachingConfig `koanf:"caching"` + RPC arbitrum.Config `koanf:"rpc"` + TxLookupLimit uint64 `koanf:"tx-lookup-limit"` + EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` + StylusTarget StylusTargetConfig `koanf:"stylus-target"` + BlockMetadataApiCacheSize int `koanf:"block-metadata-api-cache-size"` + BlockMetadataApiBlocksLimit int `koanf:"block-metadata-api-blocks-limit"` forwardingTarget string } @@ -86,6 +87,9 @@ func (c *Config) Validate() error { if c.BlockMetadataApiCacheSize < 0 { return errors.New("block-metadata-api-cache-size cannot be negative") } + if c.BlockMetadataApiBlocksLimit < 0 { + return errors.New("block-metadata-api-blocks-limit cannot be negative") + } return nil } @@ -104,22 +108,24 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks") StylusTargetConfigAddOptions(prefix+".stylus-target", f) f.Int(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size of lru cache storing the blockMetadata to service arb_getRawBlockMetadata") + f.Int(prefix+".block-metadata-api-blocks-limit", ConfigDefault.BlockMetadataApiBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query. Enabled by default, set 0 to disable") } var ConfigDefault = Config{ - RPC: arbitrum.DefaultConfig, - Sequencer: DefaultSequencerConfig, - ParentChainReader: headerreader.DefaultConfig, - RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig, - ForwardingTarget: "", - SecondaryForwardingTarget: []string{}, - TxPreChecker: DefaultTxPreCheckerConfig, - TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second - Caching: DefaultCachingConfig, - Forwarder: DefaultNodeForwarderConfig, - EnablePrefetchBlock: true, - StylusTarget: DefaultStylusTargetConfig, - BlockMetadataApiCacheSize: 10000, + RPC: arbitrum.DefaultConfig, + Sequencer: DefaultSequencerConfig, + ParentChainReader: headerreader.DefaultConfig, + RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig, + ForwardingTarget: "", + SecondaryForwardingTarget: []string{}, + TxPreChecker: DefaultTxPreCheckerConfig, + TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second + Caching: DefaultCachingConfig, + Forwarder: DefaultNodeForwarderConfig, + EnablePrefetchBlock: true, + StylusTarget: DefaultStylusTargetConfig, + BlockMetadataApiCacheSize: 10000, + BlockMetadataApiBlocksLimit: 100, } type ConfigFetcher func() *Config @@ -225,7 +231,7 @@ func CreateExecutionNode( } } - bulkBlockMetadataFetcher := NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize) + bulkBlockMetadataFetcher := NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize, config.BlockMetadataApiBlocksLimit) apis := []rpc.API{{ Namespace: "arb", diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index ea20b16a4d..75f63e5092 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -10,6 +10,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -75,7 +76,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { Require(t, err) // Clean BlockMetadata from arbDB so that we can modify it at will Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) - if latestL2 > uint64(end) { + if latestL2 > uint64(end)+10 { break } } @@ -121,6 +122,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { // Test that LRU caching works builder.execConfig.BlockMetadataApiCacheSize = 10 + builder.execConfig.BlockMetadataApiBlocksLimit = 25 builder.RestartL2Node(t) l2rpc = builder.L2.Stack.Attach() err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(end)) @@ -142,6 +144,12 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { t.Fatal("incorrect caching of BlockMetadata") } + // Test that ErrBlockMetadataApiBlocksLimitExceeded is thrown when query range exceeds the limit + err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(26)) + if !strings.Contains(err.Error(), gethexec.ErrBlockMetadataApiBlocksLimitExceeded.Error()) { + t.Fatalf("expecting ErrBlockMetadataApiBlocksLimitExceeded error, got: %v", err) + } + // A Reorg event should clear the cache, hence the data fetched now should be accurate builder.L2.ConsensusNode.TxStreamer.ReorgTo(10) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(end)) From 4383d5289fd970dd0adee257e81a46432cbd69ea Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 29 Oct 2024 12:53:16 +0530 Subject: [PATCH 08/11] small fix --- execution/gethexec/blockmetadata.go | 2 +- system_tests/timeboost_test.go | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index 28ec7a1eb5..77b738c2f9 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -66,7 +66,7 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] return nil, fmt.Errorf("invalid inputs, fromBlock: %d is greater than toBlock: %d", fromBlock, toBlock) } if b.blocksLimit > 0 && end-start+1 > arbutil.MessageIndex(b.blocksLimit) { - return nil, fmt.Errorf("%w. Range requested- %d", ErrBlockMetadataApiBlocksLimitExceeded, end-start+1) + return nil, fmt.Errorf("%w. Range requested- %d, Limit- %d", ErrBlockMetadataApiBlocksLimitExceeded, end-start+1, b.blocksLimit) } var result []NumberAndBlockMetadata for i := start; i <= end; i++ { diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 75f63e5092..49d9d5fb16 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -154,9 +154,6 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { builder.L2.ConsensusNode.TxStreamer.ReorgTo(10) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(start), rpc.BlockNumber(end)) Require(t, err) - if len(result) != 5 { - t.Fatalf("Reorg should've cleared out messages starting at number 10. Want: 5, Got: %d", len(result)) - } if !bytes.Equal(updatedBlockMetadata, result[0].RawMetadata) { t.Fatal("BlockMetadata should've been fetched from db and not the cache") } From 87af74894a2e4d4d56e789692a10490f0c078312 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 7 Nov 2024 11:33:59 +0530 Subject: [PATCH 09/11] address PR comments --- execution/gethexec/blockmetadata.go | 20 ++++++-------------- execution/gethexec/executionengine.go | 13 +++++++------ execution/gethexec/node.go | 16 +++++----------- go-ethereum | 2 +- system_tests/timeboost_test.go | 2 +- 5 files changed, 20 insertions(+), 33 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index 77b738c2f9..e2b0f56db6 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -4,14 +4,13 @@ import ( "context" "errors" "fmt" - "sync" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/stopwaiter" ) @@ -29,16 +28,15 @@ type BulkBlockMetadataFetcher struct { bc *core.BlockChain fetcher BlockMetadataFetcher reorgDetector chan struct{} - blocksLimit int - cacheMutex sync.RWMutex - cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] + blocksLimit uint64 + cache *lru.SizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata] } -func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize, blocksLimit int) *BulkBlockMetadataFetcher { - var cache *containers.LruCache[arbutil.MessageIndex, arbostypes.BlockMetadata] +func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetcher, cacheSize, blocksLimit uint64) *BulkBlockMetadataFetcher { + var cache *lru.SizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata] var reorgDetector chan struct{} if cacheSize != 0 { - cache = containers.NewLruCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) + cache = lru.NewSizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) reorgDetector = make(chan struct{}) fetcher.SetReorgEventsReader(reorgDetector) } @@ -73,9 +71,7 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] var data arbostypes.BlockMetadata var found bool if b.cache != nil { - b.cacheMutex.RLock() data, found = b.cache.Get(i) - b.cacheMutex.RUnlock() } if !found { data, err = b.fetcher.BlockMetadataAtCount(i + 1) @@ -83,9 +79,7 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] return nil, err } if data != nil && b.cache != nil { - b.cacheMutex.Lock() b.cache.Add(i, data) - b.cacheMutex.Unlock() } } if data != nil { @@ -99,9 +93,7 @@ func (b *BulkBlockMetadataFetcher) Fetch(fromBlock, toBlock rpc.BlockNumber) ([] } func (b *BulkBlockMetadataFetcher) ClearCache(ctx context.Context, ignored struct{}) { - b.cacheMutex.Lock() b.cache.Clear() - b.cacheMutex.Unlock() } func (b *BulkBlockMetadataFetcher) Start(ctx context.Context) { diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index b440b1c4b7..87a8b510aa 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -258,6 +258,13 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost return nil, err } + if s.reorgEventsReader != nil { + select { + case s.reorgEventsReader <- struct{}{}: + default: + } + } + newMessagesResults := make([]*execution.MessageResult, 0, len(oldMessages)) for i := range newMessages { var msgForPrefetch *arbostypes.MessageWithMetadata @@ -277,12 +284,6 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost s.resequenceChan <- oldMessages resequencing = true } - if s.reorgEventsReader != nil { - select { - case s.reorgEventsReader <- struct{}{}: - default: - } - } return newMessagesResults, nil } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 32e43874f2..2f1623d10f 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -60,8 +60,8 @@ type Config struct { EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` StylusTarget StylusTargetConfig `koanf:"stylus-target"` - BlockMetadataApiCacheSize int `koanf:"block-metadata-api-cache-size"` - BlockMetadataApiBlocksLimit int `koanf:"block-metadata-api-blocks-limit"` + BlockMetadataApiCacheSize uint64 `koanf:"block-metadata-api-cache-size"` + BlockMetadataApiBlocksLimit uint64 `koanf:"block-metadata-api-blocks-limit"` forwardingTarget string } @@ -84,12 +84,6 @@ func (c *Config) Validate() error { if c.forwardingTarget != "" && c.Sequencer.Enable { return errors.New("ForwardingTarget set and sequencer enabled") } - if c.BlockMetadataApiCacheSize < 0 { - return errors.New("block-metadata-api-cache-size cannot be negative") - } - if c.BlockMetadataApiBlocksLimit < 0 { - return errors.New("block-metadata-api-blocks-limit cannot be negative") - } return nil } @@ -107,8 +101,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)") f.Bool(prefix+".enable-prefetch-block", ConfigDefault.EnablePrefetchBlock, "enable prefetching of blocks") StylusTargetConfigAddOptions(prefix+".stylus-target", f) - f.Int(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size of lru cache storing the blockMetadata to service arb_getRawBlockMetadata") - f.Int(prefix+".block-metadata-api-blocks-limit", ConfigDefault.BlockMetadataApiBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query. Enabled by default, set 0 to disable") + f.Uint64(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size (in bytes) of lru cache storing the blockMetadata to service arb_getRawBlockMetadata") + f.Uint64(prefix+".block-metadata-api-blocks-limit", ConfigDefault.BlockMetadataApiBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query. Enabled by default, set 0 to disable the limit") } var ConfigDefault = Config{ @@ -124,7 +118,7 @@ var ConfigDefault = Config{ Forwarder: DefaultNodeForwarderConfig, EnablePrefetchBlock: true, StylusTarget: DefaultStylusTargetConfig, - BlockMetadataApiCacheSize: 10000, + BlockMetadataApiCacheSize: 100 * 1024 * 1024, BlockMetadataApiBlocksLimit: 100, } diff --git a/go-ethereum b/go-ethereum index 575062fad7..ecbb71b896 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 575062fad7ff4db9d7c235f49472f658be29e2fe +Subproject commit ecbb71b89683c1506c374b34d0fefd87ae6144e6 diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 49d9d5fb16..1e7ca47db4 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -121,7 +121,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { } // Test that LRU caching works - builder.execConfig.BlockMetadataApiCacheSize = 10 + builder.execConfig.BlockMetadataApiCacheSize = 1000 builder.execConfig.BlockMetadataApiBlocksLimit = 25 builder.RestartL2Node(t) l2rpc = builder.L2.Stack.Attach() From c0acac1418bbf804dd10fce6c6773396bbedb3df Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 7 Nov 2024 11:42:43 +0530 Subject: [PATCH 10/11] rename reorgEventsReader to reorgEventsNotifier --- execution/gethexec/blockmetadata.go | 4 ++-- execution/gethexec/executionengine.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/execution/gethexec/blockmetadata.go b/execution/gethexec/blockmetadata.go index e2b0f56db6..7605861ffe 100644 --- a/execution/gethexec/blockmetadata.go +++ b/execution/gethexec/blockmetadata.go @@ -20,7 +20,7 @@ type BlockMetadataFetcher interface { BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 - SetReorgEventsReader(reorgEventsReader chan struct{}) + SetReorgEventsNotifier(reorgEventsNotifier chan struct{}) } type BulkBlockMetadataFetcher struct { @@ -38,7 +38,7 @@ func NewBulkBlockMetadataFetcher(bc *core.BlockChain, fetcher BlockMetadataFetch if cacheSize != 0 { cache = lru.NewSizeConstrainedCache[arbutil.MessageIndex, arbostypes.BlockMetadata](cacheSize) reorgDetector = make(chan struct{}) - fetcher.SetReorgEventsReader(reorgDetector) + fetcher.SetReorgEventsNotifier(reorgDetector) } return &BulkBlockMetadataFetcher{ bc: bc, diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 87a8b510aa..126d5ad1b2 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -80,10 +80,10 @@ type ExecutionEngine struct { resequenceChan chan []*arbostypes.MessageWithMetadata createBlocksMutex sync.Mutex - newBlockNotifier chan struct{} - reorgEventsReader chan struct{} - latestBlockMutex sync.Mutex - latestBlock *types.Block + newBlockNotifier chan struct{} + reorgEventsNotifier chan struct{} + latestBlockMutex sync.Mutex + latestBlock *types.Block nextScheduledVersionCheck time.Time // protected by the createBlocksMutex @@ -135,8 +135,8 @@ func (s *ExecutionEngine) backlogL1GasCharged() uint64 { s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) } -func (s *ExecutionEngine) SetReorgEventsReader(reorgEventsReader chan struct{}) { - s.reorgEventsReader = reorgEventsReader +func (s *ExecutionEngine) SetReorgEventsNotifier(reorgEventsNotifier chan struct{}) { + s.reorgEventsNotifier = reorgEventsNotifier } func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) { @@ -258,9 +258,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost return nil, err } - if s.reorgEventsReader != nil { + if s.reorgEventsNotifier != nil { select { - case s.reorgEventsReader <- struct{}{}: + case s.reorgEventsNotifier <- struct{}{}: default: } } From feaf30681db8e7f38a172dda4d753b4515577d13 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 12 Nov 2024 11:52:30 +0530 Subject: [PATCH 11/11] address PR comments --- execution/gethexec/executionengine.go | 14 ++++++++++---- go-ethereum | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 126d5ad1b2..193e8e7414 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -135,10 +135,6 @@ func (s *ExecutionEngine) backlogL1GasCharged() uint64 { s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) } -func (s *ExecutionEngine) SetReorgEventsNotifier(reorgEventsNotifier chan struct{}) { - s.reorgEventsNotifier = reorgEventsNotifier -} - func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) { s.cachedL1PriceData.mutex.Lock() defer s.cachedL1PriceData.mutex.Unlock() @@ -187,6 +183,16 @@ func (s *ExecutionEngine) SetRecorder(recorder *BlockRecorder) { s.recorder = recorder } +func (s *ExecutionEngine) SetReorgEventsNotifier(reorgEventsNotifier chan struct{}) { + if s.Started() { + panic("trying to set reorg events notifier after start") + } + if s.reorgEventsNotifier != nil { + panic("trying to set reorg events notifier when already set") + } + s.reorgEventsNotifier = reorgEventsNotifier +} + func (s *ExecutionEngine) EnableReorgSequencing() { if s.Started() { panic("trying to enable reorg sequencing after start") diff --git a/go-ethereum b/go-ethereum index ecbb71b896..68cb86d1ec 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit ecbb71b89683c1506c374b34d0fefd87ae6144e6 +Subproject commit 68cb86d1eca03353375c24befaa7814f145d425a