From 80b323003d78e1df2c8af8288f518eac6e7c66b0 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Wed, 4 Sep 2024 14:54:34 -0700 Subject: [PATCH 1/4] remove acceptor queue (part 1) --- core/blockchain.go | 197 ++++++------------------------- core/blockchain_log_test.go | 1 - core/blockchain_repair_test.go | 1 - core/blockchain_snapshot_test.go | 1 - core/blockchain_test.go | 55 ++------- core/test_blockchain.go | 20 ---- core/txindexer_test.go | 5 - eth/backend.go | 1 - eth/ethconfig/config.go | 1 - eth/tracers/api_test.go | 1 - ethclient/simulated/backend.go | 1 - internal/ethapi/api_test.go | 1 - plugin/evm/config.go | 3 - plugin/evm/syncervm_test.go | 2 - plugin/evm/vm.go | 1 - plugin/evm/vm_warp_test.go | 3 - 16 files changed, 43 insertions(+), 251 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index a1272336e3..35ffce2221 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -85,7 +85,6 @@ var ( blockValidationTimer = metrics.NewRegisteredCounter("chain/block/validations/state", nil) blockWriteTimer = metrics.NewRegisteredCounter("chain/block/writes", nil) - acceptorQueueGauge = metrics.NewRegisteredGauge("chain/acceptor/queue/size", nil) acceptorWorkTimer = metrics.NewRegisteredCounter("chain/acceptor/work", nil) acceptorWorkCount = metrics.NewRegisteredCounter("chain/acceptor/work/count", nil) lastAcceptedBlockBaseFeeGauge = metrics.NewRegisteredGauge("chain/block/fee/basefee", nil) @@ -175,7 +174,6 @@ type CacheConfig struct { TriePrefetcherParallelism int // Max concurrent disk reads trie prefetcher should perform at once CommitInterval uint64 // Commit the trie every [CommitInterval] blocks. Pruning bool // Whether to disable trie write caching and GC altogether (archive node) - AcceptorQueueLimit int // Blocks to queue before blocking during acceptance PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries. PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries. AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled @@ -221,7 +219,6 @@ var DefaultCacheConfig = &CacheConfig{ TriePrefetcherParallelism: 16, Pruning: true, CommitInterval: 4096, - AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay SnapshotLimit: 256, AcceptedCacheSize: 32, StateScheme: rawdb.HashScheme, @@ -305,25 +302,6 @@ type BlockChain struct { senderCacher *TxSenderCacher - // [acceptorQueue] is a processing queue for the Acceptor. This is - // different than [chainAcceptedFeed], which is sent an event after an accepted - // block is processed (after each loop of the accepted worker). If there is a - // clean shutdown, all items inserted into the [acceptorQueue] will be processed. - acceptorQueue chan *types.Block - - // [acceptorClosingLock], and [acceptorClosed] are used - // to synchronize the closing of the [acceptorQueue] channel. - // - // Because we can't check if a channel is closed without reading from it - // (which we don't want to do as we may remove a processing block), we need - // to use a second variable to ensure we don't close a closed channel. - acceptorClosingLock sync.RWMutex - acceptorClosed bool - - // [acceptorWg] is used to wait for the acceptorQueue to clear. This is used - // during shutdown and in tests. - acceptorWg sync.WaitGroup - // [wg] is used to wait for the async blockchain processes to finish on shutdown. 
wg sync.WaitGroup @@ -332,16 +310,6 @@ type BlockChain struct { // WaitGroups are used to ensure that async processes have finished during shutdown. quit chan struct{} - // [acceptorTip] is the last block processed by the acceptor. This is - // returned as the LastAcceptedBlock() to ensure clients get only fully - // processed blocks. This may be equal to [lastAccepted]. - acceptorTip *types.Block - acceptorTipLock sync.Mutex - - // [flattenLock] prevents the [acceptor] from flattening snapshots while - // a block is being verified. - flattenLock sync.Mutex - // [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs. acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log] @@ -395,7 +363,6 @@ func NewBlockChain( engine: engine, vmConfig: vmConfig, senderCacher: NewTxSenderCacher(runtime.NumCPU()), - acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), quit: make(chan struct{}), acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize), } @@ -422,13 +389,6 @@ func NewBlockChain( return nil, err } - // After loading the last state (and reprocessing if necessary), we are - // guaranteed that [acceptorTip] is equal to [lastAccepted]. - // - // It is critical to update this vaue before performing any state repairs so - // that all accepted blocks can be considered. - bc.acceptorTip = bc.lastAccepted - // Make sure the state associated with the block is available head := bc.CurrentBlock() if !bc.HasState(head.Root) { @@ -461,10 +421,6 @@ func NewBlockChain( latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db) bc.repairTxIndexTail(latestStateSynced) } - - // Start processing accepted blocks effects in the background - go bc.startAcceptor() - // Start tx indexer if it's enabled. if bc.cacheConfig.TransactionHistory != 0 { bc.txIndexer = newTxIndexer(bc.cacheConfig.TransactionHistory, bc) @@ -513,12 +469,6 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha return err } - // Ensure we avoid flattening the snapshot while we are processing a block, or - // block execution will fallback to reading from the trie (which is much - // slower). - bc.flattenLock.Lock() - defer bc.flattenLock.Unlock() - // Flatten the entire snap Trie to disk // // Note: This resumes snapshot generation. @@ -562,110 +512,41 @@ func (bc *BlockChain) warmAcceptedCaches() { log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime)) } -// startAcceptor starts processing items on the [acceptorQueue]. If a [nil] -// object is placed on the [acceptorQueue], the [startAcceptor] will exit. 
-func (bc *BlockChain) startAcceptor() { - log.Info("Starting Acceptor", "queue length", bc.cacheConfig.AcceptorQueueLimit) - - for next := range bc.acceptorQueue { - start := time.Now() - acceptorQueueGauge.Dec(1) - - if err := bc.flattenSnapshot(func() error { - return bc.stateManager.AcceptTrie(next) - }, next.Hash()); err != nil { - log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err) - } - - // Update last processed and transaction lookup index - if err := bc.writeBlockAcceptedIndices(next); err != nil { - log.Crit("failed to write accepted block effects", "err", err) - } - - // Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content - bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header()) - logs := bc.collectUnflattenedLogs(next, false) - bc.acceptedLogsCache.Put(next.Hash(), logs) - - // Update the acceptor tip before sending events to ensure that any client acting based off of - // the events observes the updated acceptorTip on subsequent requests - bc.acceptorTipLock.Lock() - bc.acceptorTip = next - bc.acceptorTipLock.Unlock() - - // Update accepted feeds - flattenedLogs := types.FlattenLogs(logs) - bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs}) - if len(flattenedLogs) > 0 { - bc.logsAcceptedFeed.Send(flattenedLogs) - } - if len(next.Transactions()) != 0 { - bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()}) - } - - bc.acceptorWg.Done() +// accept processes a block that has been verified and updates the snapshot +// and indexes. +func (bc *BlockChain) accept(next *types.Block) error { + start := time.Now() - acceptorWorkTimer.Inc(time.Since(start).Milliseconds()) - acceptorWorkCount.Inc(1) - // Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because - // the logs are already processed in the acceptor queue. - acceptedLogsCounter.Inc(int64(len(logs))) + if err := bc.flattenSnapshot(func() error { + return bc.stateManager.AcceptTrie(next) + }, next.Hash()); err != nil { + log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err) } -} - -// addAcceptorQueue adds a new *types.Block to the [acceptorQueue]. This will -// block if there are [AcceptorQueueLimit] items in [acceptorQueue]. -func (bc *BlockChain) addAcceptorQueue(b *types.Block) { - // We only acquire a read lock here because it is ok to add items to the - // [acceptorQueue] concurrently. - bc.acceptorClosingLock.RLock() - defer bc.acceptorClosingLock.RUnlock() - if bc.acceptorClosed { - return + // Update last processed and transaction lookup index + if err := bc.writeBlockAcceptedIndices(next); err != nil { + log.Crit("failed to write accepted block effects", "err", err) } - acceptorQueueGauge.Inc(1) - bc.acceptorWg.Add(1) - bc.acceptorQueue <- b -} - -// DrainAcceptorQueue blocks until all items in [acceptorQueue] have been -// processed. 
-func (bc *BlockChain) DrainAcceptorQueue() { - bc.acceptorClosingLock.RLock() - defer bc.acceptorClosingLock.RUnlock() + // Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content + bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header()) + logs := bc.collectUnflattenedLogs(next, false) + bc.acceptedLogsCache.Put(next.Hash(), logs) - if bc.acceptorClosed { - return + // Update accepted feeds + flattenedLogs := types.FlattenLogs(logs) + bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs}) + if len(flattenedLogs) > 0 { + bc.logsAcceptedFeed.Send(flattenedLogs) } - - bc.acceptorWg.Wait() -} - -// stopAcceptor sends a signal to the Acceptor to stop processing accepted -// blocks. The Acceptor will exit once all items in [acceptorQueue] have been -// processed. -func (bc *BlockChain) stopAcceptor() { - bc.acceptorClosingLock.Lock() - defer bc.acceptorClosingLock.Unlock() - - // If [acceptorClosed] is already false, we should just return here instead - // of attempting to close [acceptorQueue] more than once (will cause - // a panic). - // - // This typically happens when a test calls [stopAcceptor] directly (prior to - // shutdown) and then [stopAcceptor] is called again in shutdown. - if bc.acceptorClosed { - return + if len(next.Transactions()) != 0 { + bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()}) } - // Although nothing should be added to [acceptorQueue] after - // [acceptorClosed] is updated, we close the channel so the Acceptor - // goroutine exits. - bc.acceptorWg.Wait() - bc.acceptorClosed = true - close(bc.acceptorQueue) + acceptorWorkTimer.Inc(time.Since(start).Milliseconds()) + acceptorWorkCount.Inc(1) + acceptedLogsCounter.Inc(int64(len(logs))) + return nil } func (bc *BlockChain) InitializeSnapshots() { @@ -817,9 +698,6 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) { // ValidateCanonicalChain confirms a canonical chain is well-formed. func (bc *BlockChain) ValidateCanonicalChain() error { - // Ensure all accepted blocks are fully processed - bc.DrainAcceptorQueue() - current := bc.CurrentBlock() i := 0 log.Info("Beginning to validate canonical chain", "startBlock", current.Number) @@ -936,11 +814,6 @@ func (bc *BlockChain) stopWithoutSaving() { log.Info("Closing quit channel") close(bc.quit) - // Wait for accepted feed to process all remaining items - log.Info("Stopping Acceptor") - start := time.Now() - bc.stopAcceptor() - log.Info("Acceptor queue drained", "t", time.Since(start)) // Stop senderCacher's goroutines log.Info("Shutting down sender cacher") @@ -1041,10 +914,10 @@ func (bc *BlockChain) LastConsensusAcceptedBlock() *types.Block { // // Note: During initialization, [acceptorTip] is equal to [lastAccepted]. func (bc *BlockChain) LastAcceptedBlock() *types.Block { - bc.acceptorTipLock.Lock() - defer bc.acceptorTipLock.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() - return bc.acceptorTip + return bc.lastAccepted } // Accept sets a minimum height at which no reorg can pass. 
Additionally, @@ -1077,9 +950,12 @@ func (bc *BlockChain) Accept(block *types.Block) error { } } - // Enqueue block in the acceptor + // Update the last accepted block bc.lastAccepted = block - bc.addAcceptorQueue(block) + if err := bc.accept(block); err != nil { + return err + } + acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed())) acceptedTxsCounter.Inc(int64(len(block.Transactions()))) if baseFee := block.BaseFee(); baseFee != nil { @@ -1365,10 +1241,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { // Instantiate the statedb to use for processing transactions // - // NOTE: Flattening a snapshot during block execution requires fetching state - // entries directly from the trie (much slower). - bc.flattenLock.Lock() - defer bc.flattenLock.Unlock() statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { return err @@ -2129,7 +2001,6 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { // Update all in-memory chain markers bc.lastAccepted = block - bc.acceptorTip = block bc.currentBlock.Store(block.Header()) bc.hc.SetCurrentHeader(block.Header()) diff --git a/core/blockchain_log_test.go b/core/blockchain_log_test.go index fa16293634..3746f50bfd 100644 --- a/core/blockchain_log_test.go +++ b/core/blockchain_log_test.go @@ -91,7 +91,6 @@ func TestAcceptedLogsSubscription(t *testing.T) { err := chain.Accept(block) require.NoError(err) } - chain.DrainAcceptorQueue() logs := <-logsCh require.Len(logs, 1) diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 3772aa8da9..1a158c4381 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -597,7 +597,6 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s } lastAcceptedHash = canonblocks[i].Hash() } - chain.DrainAcceptorQueue() } } if _, err := chain.InsertChain(canonblocks[tt.commitBlock:]); err != nil { diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 9b5cea4aa1..9a2cd125b0 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -116,7 +116,6 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo } basic.lastAcceptedHash = blocks[i].Hash() } - chain.DrainAcceptorQueue() diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root() if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index d1d124f668..da40621345 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -32,7 +32,6 @@ var ( TriePrefetcherParallelism: 4, Pruning: false, // Archive mode SnapshotLimit: 256, - AcceptorQueueLimit: 64, } pruningConfig = &CacheConfig{ @@ -43,7 +42,6 @@ var ( Pruning: true, // Enable pruning CommitInterval: 4096, SnapshotLimit: 256, - AcceptorQueueLimit: 64, } ) @@ -92,7 +90,6 @@ func TestArchiveBlockChainSnapsDisabled(t *testing.T) { TriePrefetcherParallelism: 4, Pruning: false, // Archive mode SnapshotLimit: 0, // Disable snapshots - AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -128,7 +125,6 @@ func TestPruningBlockChainSnapsDisabled(t *testing.T) { Pruning: true, // Enable pruning CommitInterval: 4096, SnapshotLimit: 0, // Disable snapshots - AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -178,7 +174,6 @@ func TestPruningBlockChainUngracefulShutdownSnapsDisabled(t *testing.T) { Pruning: true, // Enable pruning CommitInterval: 4096, SnapshotLimit: 0, // Disable snapshots - 
AcceptorQueueLimit: 64,
 		},
 		gspec,
 		lastAcceptedHash,
@@ -214,7 +209,6 @@ func TestEnableSnapshots(t *testing.T) {
 			Pruning:            true, // Enable pruning
 			CommitInterval:     4096,
 			SnapshotLimit:      snapLimit,
-			AcceptorQueueLimit: 64,
 		},
 		gspec,
 		lastAcceptedHash,
@@ -341,7 +335,6 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
 			t.Fatal(err)
 		}
 	}
-	blockchain.DrainAcceptorQueue()

 	lastAcceptedHash := blockchain.LastConsensusAcceptedBlock().Hash()
 	blockchain.Stop()
@@ -372,7 +365,6 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
 			SnapshotLimit:                   256,
 			PopulateMissingTries:            &startHeight, // Starting point for re-populating.
 			PopulateMissingTriesParallelism: parallelism,
-			AcceptorQueueLimit:              64,
 		},
 		gspec,
 		lastAcceptedHash,
@@ -396,7 +388,7 @@ func TestRepopulateMissingTries(t *testing.T) {
 	}
 }

-func TestUngracefulAsyncShutdown(t *testing.T) {
+func TestUngracefulShutdown(t *testing.T) {
 	var (
 		create = func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
 			blockchain, err := createBlockChain(db, &CacheConfig{
@@ -408,7 +400,6 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
 				CommitInterval:     4096,
 				SnapshotLimit:      256,
 				SnapshotNoBuild:    true, // Ensure the test errors if snapshot initialization fails
-				AcceptorQueueLimit: 1000, // ensure channel doesn't block
 			}, gspec, lastAcceptedHash)
 			if err != nil {
 				return nil, err
 			}
@@ -446,52 +437,28 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
 		t.Fatal(err)
 	}

-	// Insert three blocks into the chain and accept only the first block.
+	// Insert and accept all blocks into the chain.
 	if _, err := blockchain.InsertChain(chain); err != nil {
 		t.Fatal(err)
 	}

-	foundTxs := []common.Hash{}
-	missingTxs := []common.Hash{}
-	for i, block := range chain {
+	allTxs := []common.Hash{}
+	for _, block := range chain {
 		if err := blockchain.Accept(block); err != nil {
 			t.Fatal(err)
 		}
-
-		if i == 3 {
-			// At height 3, kill the async accepted block processor to force an
-			// ungraceful recovery
-			blockchain.stopAcceptor()
-			blockchain.acceptorQueue = nil
-		}
-
-		if i <= 3 {
-			// If <= height 3, all txs should be accessible on lookup
-			for _, tx := range block.Transactions() {
-				foundTxs = append(foundTxs, tx.Hash())
-			}
-		} else {
-			// If > 3, all txs should be accessible on lookup
-			for _, tx := range block.Transactions() {
-				missingTxs = append(missingTxs, tx.Hash())
-			}
+		for _, tx := range block.Transactions() {
+			allTxs = append(allTxs, tx.Hash())
 		}
 	}

-	// After inserting all blocks, we should confirm that txs added after the
-	// async worker shutdown cannot be found.
-	for _, tx := range foundTxs {
+	// After accepting the blocks, all txs should be queryable.
+	for _, tx := range allTxs {
 		txLookup, _, _ := blockchain.GetTransactionLookup(tx)
 		if txLookup == nil {
 			t.Fatalf("missing transaction: %v", tx)
 		}
 	}
-	for _, tx := range missingTxs {
-		txLookup, _, _ := blockchain.GetTransactionLookup(tx)
-		if txLookup != nil {
-			t.Fatalf("transaction should be missing: %v", tx)
-		}
-	}

 	// check the state of the last accepted block
 	checkState := func(sdb *state.StateDB) error {
@@ -521,15 +488,13 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
 	}

 	_, newChain, restartedChain := checkBlockChainState(t, blockchain, gspec, chainDB, create, checkState)
-
-	allTxs := append(foundTxs, missingTxs...)
for _, bc := range []*BlockChain{newChain, restartedChain} { // We should confirm that snapshots were properly initialized if bc.snaps == nil { t.Fatal("snapshot initialization failed") } - // We should confirm all transactions can now be queried + // All transactions should still be queryable after a restart. for _, tx := range allTxs { txLookup, _, _ := bc.GetTransactionLookup(tx) if txLookup == nil { @@ -665,7 +630,6 @@ func TestTxLookupBlockChain(t *testing.T) { CommitInterval: 4096, SnapshotLimit: 256, SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, // ensure channel doesn't block TransactionHistory: 5, } createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) { @@ -688,7 +652,6 @@ func TestTxLookupSkipIndexingBlockChain(t *testing.T) { CommitInterval: 4096, SnapshotLimit: 256, SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, // ensure channel doesn't block TransactionHistory: 5, SkipTxIndexing: true, } diff --git a/core/test_blockchain.go b/core/test_blockchain.go index 02f98b61cf..9bb24ce2f4 100644 --- a/core/test_blockchain.go +++ b/core/test_blockchain.go @@ -148,7 +148,6 @@ func checkBlockChainState( t.Fatalf("Failed to accept block %s:%d due to %s", block.Hash().Hex(), block.NumberU64(), err) } } - newBlockChain.DrainAcceptorQueue() newLastAcceptedBlock := newBlockChain.LastConsensusAcceptedBlock() if newLastAcceptedBlock.Hash() != lastAcceptedBlock.Hash() { @@ -231,7 +230,6 @@ func TestInsertChainAcceptSingleBlock(t *testing.T, create func(db ethdb.Databas if err := blockchain.Accept(chain[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() // check the state of the last accepted block checkState := func(sdb *state.StateDB) error { @@ -349,7 +347,6 @@ func TestInsertLongForkedChain(t *testing.T, create func(db ethdb.Database, gspe if err := blockchain.Accept(chain1[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() if blockchain.snaps != nil { // Snap layer count should be 1 fewer @@ -381,7 +378,6 @@ func TestInsertLongForkedChain(t *testing.T, create func(db ethdb.Database, gspe if err := blockchain.Accept(chain1[i]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() if blockchain.snaps != nil { // Snap layer count should decrease by 1 per Accept @@ -491,7 +487,6 @@ func TestAcceptNonCanonicalBlock(t *testing.T, create func(db ethdb.Database, gs if err := blockchain.Accept(chain2[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() for i := 0; i < len(chain1); i++ { if err := blockchain.Reject(chain1[i]); err != nil { @@ -630,7 +625,6 @@ func TestSetPreferenceRewind(t *testing.T, create func(db ethdb.Database, gspec if err := blockchain.Accept(chain[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() lastAcceptedBlock = blockchain.LastConsensusAcceptedBlock() expectedLastAcceptedBlock = chain[0] @@ -751,7 +745,6 @@ func TestBuildOnVariousStages(t *testing.T, create func(db ethdb.Database, gspec t.Fatal(err) } } - blockchain.DrainAcceptorQueue() // Insert the forked chain [chain2] which starts at the 10th // block in [chain1] ie. a block that is still in processing. 
@@ -768,7 +761,6 @@ func TestBuildOnVariousStages(t *testing.T, create func(db ethdb.Database, gspec if err := blockchain.Accept(chain1[5]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() for _, block := range chain3 { if err := blockchain.Reject(block); err != nil { t.Fatal(err) @@ -780,14 +772,12 @@ func TestBuildOnVariousStages(t *testing.T, create func(db ethdb.Database, gspec t.Fatal(err) } } - blockchain.DrainAcceptorQueue() // Accept the first block in [chain2] and reject the // subsequent blocks in [chain1] which would then be rejected. if err := blockchain.Accept(chain2[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() for _, block := range chain1[10:] { if err := blockchain.Reject(block); err != nil { @@ -863,7 +853,6 @@ func TestEmptyBlocks(t *testing.T, create func(db ethdb.Database, gspec *Genesis t.Fatal(err) } } - blockchain.DrainAcceptorQueue() // Nothing to assert about the state checkState := func(sdb *state.StateDB) error { @@ -937,7 +926,6 @@ func TestReorgReInsert(t *testing.T, create func(db ethdb.Database, gspec *Genes if err := blockchain.Accept(chain[2]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() // Nothing to assert about the state checkState := func(sdb *state.StateDB) error { @@ -1053,7 +1041,6 @@ func TestAcceptBlockIdenticalStateRoot(t *testing.T, create func(db ethdb.Databa if err := blockchain.Accept(chain1[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() for _, block := range chain2 { if err := blockchain.Reject(block); err != nil { @@ -1067,7 +1054,6 @@ func TestAcceptBlockIdenticalStateRoot(t *testing.T, create func(db ethdb.Databa if err := blockchain.Accept(chain1[1]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() expectedLastAcceptedBlock := chain1[1] @@ -1081,7 +1067,6 @@ func TestAcceptBlockIdenticalStateRoot(t *testing.T, create func(db ethdb.Databa if err := blockchain.Accept(chain1[2]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() // check the state of the last accepted block checkState := func(sdb *state.StateDB) error { @@ -1221,7 +1206,6 @@ func TestReprocessAcceptBlockIdenticalStateRoot(t *testing.T, create func(db eth if err := blockchain.Accept(chain1[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() for _, block := range chain2 { if err := blockchain.Reject(block); err != nil { @@ -1235,7 +1219,6 @@ func TestReprocessAcceptBlockIdenticalStateRoot(t *testing.T, create func(db eth if err := blockchain.Accept(chain1[1]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() expectedLastAcceptedBlock := chain1[1] @@ -1249,7 +1232,6 @@ func TestReprocessAcceptBlockIdenticalStateRoot(t *testing.T, create func(db eth if err := blockchain.Accept(chain1[2]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() // check the state of the last accepted block checkState := func(sdb *state.StateDB) error { @@ -1436,7 +1418,6 @@ func TestInsertChainValidBlockFee(t *testing.T, create func(db ethdb.Database, g if err := blockchain.Accept(chain[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() // check the state of the last accepted block checkState := func(sdb *state.StateDB) error { @@ -1634,7 +1615,6 @@ func TestStatefulPrecompiles(t *testing.T, create func(db ethdb.Database, gspec if err := blockchain.Accept(chain[0]); err != nil { t.Fatal(err) } - blockchain.DrainAcceptorQueue() 
genesisState, err := blockchain.StateAt(blockchain.Genesis().Root()) if err != nil { diff --git a/core/txindexer_test.go b/core/txindexer_test.go index 39d71d49ce..b93857c1b8 100644 --- a/core/txindexer_test.go +++ b/core/txindexer_test.go @@ -71,7 +71,6 @@ func TestTransactionIndices(t *testing.T) { CommitInterval: 4096, SnapshotLimit: 256, SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, } // Init block chain and check all needed indices has been indexed. @@ -86,7 +85,6 @@ func TestTransactionIndices(t *testing.T) { err := chain.Accept(block) require.NoError(err) } - chain.DrainAcceptorQueue() lastAcceptedBlock := blocks[len(blocks)-1] require.Equal(lastAcceptedBlock.Hash(), chain.CurrentHeader().Hash()) @@ -121,7 +119,6 @@ func TestTransactionIndices(t *testing.T) { lastAcceptedBlock = newBlks[0] err = chain.Accept(lastAcceptedBlock) // Accept the block to trigger indices updater. require.NoError(err) - chain.DrainAcceptorQueue() tail = getTail(l, lastAcceptedBlock.NumberU64()) // check if indices are updated correctly @@ -182,7 +179,6 @@ func TestTransactionSkipIndexing(t *testing.T) { CommitInterval: 4096, SnapshotLimit: 256, SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, SkipTxIndexing: true, } @@ -248,7 +244,6 @@ func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Ge if err != nil { return nil, err } - chain.DrainAcceptorQueue() if accepted != nil { accepted(block) } diff --git a/eth/backend.go b/eth/backend.go index b92ad99f99..6c9da27108 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -210,7 +210,6 @@ func New( TrieDirtyCommitTarget: config.TrieDirtyCommitTarget, TriePrefetcherParallelism: config.TriePrefetcherParallelism, Pruning: config.Pruning, - AcceptorQueueLimit: config.AcceptorQueueLimit, CommitInterval: config.CommitInterval, PopulateMissingTries: config.PopulateMissingTries, PopulateMissingTriesParallelism: config.PopulateMissingTriesParallelism, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 0c451d6241..23e31f6cb3 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -86,7 +86,6 @@ type Config struct { NetworkId uint64 Pruning bool // Whether to disable pruning and flush everything to disk - AcceptorQueueLimit int // Maximum blocks to queue before blocking during acceptance CommitInterval uint64 // If pruning is enabled, specified the interval at which to commit an entire trie to disk. PopulateMissingTries *uint64 // Height at which to start re-populating missing tries on startup. PopulateMissingTriesParallelism int // Number of concurrent readers to use when re-populating missing tries on startup. 
diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index 1f1b32492e..5b16069b93 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -105,7 +105,6 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i } } backend.chain = chain - chain.DrainAcceptorQueue() return backend } diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 9d53ed79bd..0d5c2feabd 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -185,7 +185,6 @@ func (n *Backend) buildBlock(accept bool, gap uint64) (common.Hash, error) { if err := n.acceptAncestors(block); err != nil { return common.Hash{}, err } - chain.DrainAcceptorQueue() } return block.Hash(), nil } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 72b2ba8568..5acf9e7e7b 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -468,7 +468,6 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E t.Fatalf("block %d: failed to accept into chain: %v", block.NumberU64(), err) } } - chain.DrainAcceptorQueue() backend := &testBackend{db: db, chain: chain, accman: accman, acc: acc} return backend diff --git a/plugin/evm/config.go b/plugin/evm/config.go index 9f59775ba9..9ad8c24b66 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -16,7 +16,6 @@ import ( ) const ( - defaultAcceptorQueueLimit = 64 // Provides 2 minutes of buffer (2s block target) for a commit delay defaultPruningEnabled = true defaultCommitInterval = 4096 defaultTrieCleanCache = 512 @@ -119,7 +118,6 @@ type Config struct { // Pruning Settings Pruning bool `json:"pruning-enabled"` // If enabled, trie roots are only persisted every 4096 blocks - AcceptorQueueLimit int `json:"accepted-queue-limit"` // Maximum blocks to queue before blocking during acceptance CommitInterval uint64 `json:"commit-interval"` // Specifies the commit interval at which to persist EVM and atomic tries. AllowMissingTries bool `json:"allow-missing-tries"` // If enabled, warnings preventing an incomplete trie index are suppressed PopulateMissingTries *uint64 `json:"populate-missing-tries,omitempty"` // Sets the starting point for re-populating missing tries. Disables re-generation if nil. @@ -262,7 +260,6 @@ func (c *Config) SetDefaults() { c.TrieDirtyCommitTarget = defaultTrieDirtyCommitTarget c.TriePrefetcherParallelism = defaultTriePrefetcherParallelism c.SnapshotCache = defaultSnapshotCache - c.AcceptorQueueLimit = defaultAcceptorQueueLimit c.CommitInterval = defaultCommitInterval c.SnapshotWait = defaultSnapshotWait c.PushGossipPercentStake = defaultPushGossipPercentStake diff --git a/plugin/evm/syncervm_test.go b/plugin/evm/syncervm_test.go index 5a5b260a2d..c2d3bb0093 100644 --- a/plugin/evm/syncervm_test.go +++ b/plugin/evm/syncervm_test.go @@ -192,7 +192,6 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) { if err := syncDisabledVM.blockChain.Snapshots().Verify(lastRoot); err != nil { t.Fatal(err) } - syncDisabledVM.blockChain.DrainAcceptorQueue() // Create a new VM from the same database with state sync enabled. 
syncReEnabledVM := &VM{} @@ -589,7 +588,6 @@ func generateAndAcceptBlocks(t *testing.T, vm *VM, numBlocks int, gen func(int, if err != nil { t.Fatal(err) } - vm.blockChain.DrainAcceptorQueue() } // assertSyncPerformedHeights iterates over all heights the VM has synced to and diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 376a8a12c7..958ad0cc2c 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -417,7 +417,6 @@ func (vm *VM) Initialize( vm.ethConfig.TrieDirtyCommitTarget = vm.config.TrieDirtyCommitTarget vm.ethConfig.TriePrefetcherParallelism = vm.config.TriePrefetcherParallelism vm.ethConfig.SnapshotCache = vm.config.SnapshotCache - vm.ethConfig.AcceptorQueueLimit = vm.config.AcceptorQueueLimit vm.ethConfig.PopulateMissingTries = vm.config.PopulateMissingTries vm.ethConfig.PopulateMissingTriesParallelism = vm.config.PopulateMissingTriesParallelism vm.ethConfig.AllowMissingTries = vm.config.AllowMissingTries diff --git a/plugin/evm/vm_warp_test.go b/plugin/evm/vm_warp_test.go index eb235379ee..b4fa1b1122 100644 --- a/plugin/evm/vm_warp_test.go +++ b/plugin/evm/vm_warp_test.go @@ -136,7 +136,6 @@ func TestSendWarpMessage(t *testing.T) { require.NoError(vm.SetPreference(context.Background(), blk.ID())) require.NoError(blk.Accept(context.Background())) - vm.blockChain.DrainAcceptorQueue() // Verify the message signature after accepting the block. rawSignatureBytes, err := vm.warpBackend.GetMessageSignature(unsignedMessageID) @@ -389,7 +388,6 @@ func testWarpVMTransaction(t *testing.T, unsignedMessage *avalancheWarp.Unsigned require.NoError(warpBlockVerifyWithCtx.VerifyWithContext(context.Background(), blockCtx)) require.NoError(vm.SetPreference(context.Background(), warpBlock.ID())) require.NoError(warpBlock.Accept(context.Background())) - vm.blockChain.DrainAcceptorQueue() ethBlock := warpBlock.(*chain.BlockWrapper).Block.(*Block).ethBlock verifiedMessageReceipts := vm.blockChain.GetReceiptsByHash(ethBlock.Hash()) @@ -701,7 +699,6 @@ func testReceiveWarpMessage( // Accept the block after performing multiple VerifyWithContext operations require.NoError(block2.Accept(context.Background())) - vm.blockChain.DrainAcceptorQueue() verifiedMessageReceipts := vm.blockChain.GetReceiptsByHash(ethBlock.Hash()) require.Len(verifiedMessageReceipts, 1) From b5034354cedba5783b5a021f4e4b801c69a238fa Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Wed, 11 Sep 2024 07:46:11 -0700 Subject: [PATCH 2/4] pr comments --- core/blockchain.go | 28 ++++++++-------------------- core/blockchain_test.go | 2 +- core/test_blockchain.go | 18 +++++++++--------- ethclient/simulated/backend.go | 2 +- plugin/evm/service.go | 2 +- 5 files changed, 20 insertions(+), 32 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 35ffce2221..5cb748eac9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -480,7 +480,7 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha func (bc *BlockChain) warmAcceptedCaches() { var ( startTime = time.Now() - lastAccepted = bc.LastAcceptedBlock().NumberU64() + lastAccepted = bc.lastAccepted.NumberU64() startIndex = uint64(1) targetCacheSize = uint64(bc.cacheConfig.AcceptedCacheSize) ) @@ -520,12 +520,12 @@ func (bc *BlockChain) accept(next *types.Block) error { if err := bc.flattenSnapshot(func() error { return bc.stateManager.AcceptTrie(next) }, next.Hash()); err != nil { - log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err) + return fmt.Errorf("unable to flatten snapshot in accept for block 
(%v): %w", next.Hash(), err) } // Update last processed and transaction lookup index if err := bc.writeBlockAcceptedIndices(next); err != nil { - log.Crit("failed to write accepted block effects", "err", err) + return fmt.Errorf("failed to write accepted block indices: %w", err) } // Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content @@ -900,22 +900,10 @@ func (bc *BlockChain) setPreference(block *types.Block) error { return nil } -// LastConsensusAcceptedBlock returns the last block to be marked as accepted. It may or -// may not yet be processed. -func (bc *BlockChain) LastConsensusAcceptedBlock() *types.Block { - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - - return bc.lastAccepted -} - -// LastAcceptedBlock returns the last block to be marked as accepted and is -// processed. -// -// Note: During initialization, [acceptorTip] is equal to [lastAccepted]. +// LastAcceptedBlock returns the last block that was marked as accepted. func (bc *BlockChain) LastAcceptedBlock() *types.Block { - bc.chainmu.Lock() - defer bc.chainmu.Unlock() + bc.chainmu.RLock() + defer bc.chainmu.RUnlock() return bc.lastAccepted } @@ -1825,7 +1813,7 @@ func (bc *BlockChain) populateMissingTries() error { } var ( - lastAccepted = bc.LastAcceptedBlock().NumberU64() + lastAccepted = bc.lastAccepted.NumberU64() startHeight = *bc.cacheConfig.PopulateMissingTries startTime = time.Now() logged time.Time @@ -1901,7 +1889,7 @@ func (bc *BlockChain) populateMissingTries() error { // as inaccessible and mirrors the handling of middle roots in the geth offline pruning implementation. // This is not strictly necessary, but maintains a soft assumption. func (bc *BlockChain) CleanBlockRootsAboveLastAccepted() error { - targetRoot := bc.LastAcceptedBlock().Root() + targetRoot := bc.lastAccepted.Root() // Clean up any block roots above the last accepted block before we start pruning. 
// Note: this takes the place of middleRoots in the geth implementation since we do not diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 76d3dcd65f..2bb7eb5449 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -336,7 +336,7 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) { } } - lastAcceptedHash := blockchain.LastConsensusAcceptedBlock().Hash() + lastAcceptedHash := blockchain.LastAcceptedBlock().Hash() blockchain.Stop() blockchain, err = createBlockChain(chainDB, pruningConfig, gspec, lastAcceptedHash) diff --git a/core/test_blockchain.go b/core/test_blockchain.go index 9bb24ce2f4..cb0d196313 100644 --- a/core/test_blockchain.go +++ b/core/test_blockchain.go @@ -118,7 +118,7 @@ func checkBlockChainState( checkState func(sdb *state.StateDB) error, ) (*BlockChain, *BlockChain, *BlockChain) { var ( - lastAcceptedBlock = bc.LastConsensusAcceptedBlock() + lastAcceptedBlock = bc.LastAcceptedBlock() newDB = rawdb.NewMemoryDatabase() ) @@ -149,7 +149,7 @@ func checkBlockChainState( } } - newLastAcceptedBlock := newBlockChain.LastConsensusAcceptedBlock() + newLastAcceptedBlock := newBlockChain.LastAcceptedBlock() if newLastAcceptedBlock.Hash() != lastAcceptedBlock.Hash() { t.Fatalf("Expected new blockchain to have last accepted block %s:%d, but found %s:%d", lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64(), newLastAcceptedBlock.Hash().Hex(), newLastAcceptedBlock.NumberU64()) } @@ -176,7 +176,7 @@ func checkBlockChainState( if currentBlock := restartedChain.CurrentBlock(); currentBlock.Hash() != lastAcceptedBlock.Hash() { t.Fatalf("Expected restarted chain to have current block %s:%d, but found %s:%d", lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64(), currentBlock.Hash().Hex(), currentBlock.Number.Uint64()) } - if restartedLastAcceptedBlock := restartedChain.LastConsensusAcceptedBlock(); restartedLastAcceptedBlock.Hash() != lastAcceptedBlock.Hash() { + if restartedLastAcceptedBlock := restartedChain.LastAcceptedBlock(); restartedLastAcceptedBlock.Hash() != lastAcceptedBlock.Hash() { t.Fatalf("Expected restarted chain to have current block %s:%d, but found %s:%d", lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64(), restartedLastAcceptedBlock.Hash().Hex(), restartedLastAcceptedBlock.NumberU64()) } @@ -387,7 +387,7 @@ func TestInsertLongForkedChain(t *testing.T, create func(db ethdb.Database, gspe } } - lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock := blockchain.LastAcceptedBlock() expectedLastAcceptedBlock := chain1[len(chain1)-1] if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) @@ -495,7 +495,7 @@ func TestAcceptNonCanonicalBlock(t *testing.T, create func(db ethdb.Database, gs require.False(t, blockchain.HasBlock(chain1[i].Hash(), chain1[i].NumberU64())) } - lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock := blockchain.LastAcceptedBlock() expectedLastAcceptedBlock := chain2[0] if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) @@ -591,7 +591,7 @@ func TestSetPreferenceRewind(t 
*testing.T, create func(db ethdb.Database, gspec t.Fatalf("Expected current block to be %s:%d, but found %s%d", expectedCurrentBlock.Hash().Hex(), expectedCurrentBlock.NumberU64(), currentBlock.Hash().Hex(), currentBlock.Number.Uint64()) } - lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock := blockchain.LastAcceptedBlock() expectedLastAcceptedBlock := blockchain.Genesis() if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) @@ -626,7 +626,7 @@ func TestSetPreferenceRewind(t *testing.T, create func(db ethdb.Database, gspec t.Fatal(err) } - lastAcceptedBlock = blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock = blockchain.LastAcceptedBlock() expectedLastAcceptedBlock = chain[0] if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) @@ -1055,7 +1055,7 @@ func TestAcceptBlockIdenticalStateRoot(t *testing.T, create func(db ethdb.Databa t.Fatal(err) } - lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock := blockchain.LastAcceptedBlock() expectedLastAcceptedBlock := chain1[1] if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) @@ -1220,7 +1220,7 @@ func TestReprocessAcceptBlockIdenticalStateRoot(t *testing.T, create func(db eth t.Fatal(err) } - lastAcceptedBlock := blockchain.LastConsensusAcceptedBlock() + lastAcceptedBlock := blockchain.LastAcceptedBlock() expectedLastAcceptedBlock := chain1[1] if lastAcceptedBlock.Hash() != expectedLastAcceptedBlock.Hash() { t.Fatalf("Expected last accepted block to be %s:%d, but found %s%d", expectedLastAcceptedBlock.Hash().Hex(), expectedLastAcceptedBlock.NumberU64(), lastAcceptedBlock.Hash().Hex(), lastAcceptedBlock.NumberU64()) diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 0d5c2feabd..c8e53a61ba 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -191,7 +191,7 @@ func (n *Backend) buildBlock(accept bool, gap uint64) (common.Hash, error) { func (n *Backend) acceptAncestors(block *types.Block) error { chain := n.eth.BlockChain() - lastAccepted := chain.LastConsensusAcceptedBlock() + lastAccepted := chain.LastAcceptedBlock() // Accept all ancestors of the block toAccept := []*types.Block{block} diff --git a/plugin/evm/service.go b/plugin/evm/service.go index a8fe61cbc0..e3d7fe7042 100644 --- a/plugin/evm/service.go +++ b/plugin/evm/service.go @@ -23,7 +23,7 @@ type GetAcceptedFrontReply struct { // GetAcceptedFront returns the last accepted block's hash and height func (api *SnowmanAPI) GetAcceptedFront(ctx context.Context) (*GetAcceptedFrontReply, error) { - blk := api.vm.blockChain.LastConsensusAcceptedBlock() + blk := api.vm.blockChain.LastAcceptedBlock() return &GetAcceptedFrontReply{ Hash: blk.Hash(), Number: blk.Number(), From 57b79e086a40b83fe73f767421e8ff76ec3e3d40 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Wed, 11 Sep 2024 07:55:09 -0700 
Subject: [PATCH 3/4] fix

---
 core/blockchain.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 5cb748eac9..c6f728aaba 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -520,7 +520,7 @@ func (bc *BlockChain) accept(next *types.Block) error {
 	if err := bc.flattenSnapshot(func() error {
 		return bc.stateManager.AcceptTrie(next)
 	}, next.Hash()); err != nil {
-		return fmt.Errorf("unable to flatten snapshot in accept for block (%v): %w", next.Hash(), err)
+		return fmt.Errorf("unable to flatten snapshot in accept for block (%s): %w", next.Hash(), err)
 	}

 	// Update last processed and transaction lookup index

From 5b4fb67d3fe7f5aef4757202e231208c7e26b65d Mon Sep 17 00:00:00 2001
From: Ceyhun Onur
Date: Wed, 11 Sep 2024 18:13:39 +0300
Subject: [PATCH 4/4] race-safe block root cleaning (#1339)

* race-safe block root cleaning

* Revert formatting changes

---------

Signed-off-by: Ceyhun Onur
---
 core/blockchain.go      | 23 +++++++++++++----------
 core/blockchain_test.go |  4 ++--
 eth/backend.go          |  4 ++--
 3 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index c6f728aaba..79acc2ec28 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1888,30 +1888,33 @@ func (bc *BlockChain) populateMissingTries() error {
 // This is used prior to pruning to ensure that all of the tries that may still be in processing are marked
 // as inaccessible and mirrors the handling of middle roots in the geth offline pruning implementation.
 // This is not strictly necessary, but maintains a soft assumption.
-func (bc *BlockChain) CleanBlockRootsAboveLastAccepted() error {
-	targetRoot := bc.lastAccepted.Root()
+func (bc *BlockChain) CleanBlockRootsAboveLastAccepted() (common.Hash, error) {
+	bc.chainmu.Lock()
+	defer bc.chainmu.Unlock()
+	targetBlock := bc.lastAccepted
+	targetRoot := targetBlock.Root()

 	// Clean up any block roots above the last accepted block before we start pruning.
 	// Note: this takes the place of middleRoots in the geth implementation since we do not
 	// track processing block roots via snapshot journals in the same way.
-	processingRoots := bc.gatherBlockRootsAboveLastAccepted()
+	processingRoots := bc.gatherBlockRootsAbove(targetBlock.NumberU64())

 	// If there is a block above the last accepted block with an identical state root, we
 	// explicitly remove it from the set to ensure we do not corrupt the last accepted trie.
 	delete(processingRoots, targetRoot)
 	for processingRoot := range processingRoots {
 		// Delete the processing root from disk to mark the trie as inaccessible (no need to handle this in a batch).
 		if err := bc.db.Delete(processingRoot[:]); err != nil {
-			return fmt.Errorf("failed to remove processing root (%s) preparing for offline pruning: %w", processingRoot, err)
+			return common.Hash{}, fmt.Errorf("failed to remove processing root (%s) preparing for offline pruning: %w", processingRoot, err)
 		}
 	}

-	return nil
+	return targetRoot, nil
 }

-// gatherBlockRootsAboveLastAccepted iterates forward from the last accepted block and returns a list of all block roots
-// for any blocks that were inserted above the last accepted block.
+// gatherBlockRootsAbove iterates forward from the [targetHeight] and returns a list of all block roots
+// for any blocks that were inserted above the [targetHeight].
// Given that we never insert a block into the chain unless all of its ancestors have been inserted, this should gather -// all of the block roots for blocks inserted above the last accepted block that may have been in processing at some point +// all of the block roots for blocks inserted above the [targetHeight] that may have been in processing at some point // in the past and are therefore potentially still acceptable. // Note: there is an edge case where the node dies while the consensus engine is rejecting a branch of blocks since the // consensus engine will reject the lowest ancestor first. In this case, these blocks will not be considered acceptable in @@ -1931,9 +1934,9 @@ func (bc *BlockChain) CleanBlockRootsAboveLastAccepted() error { // The consensus engine accepts block C and proceeds to reject the other branch in order (B, D, E, F). // If the consensus engine dies after rejecting block D, block D will be deleted, such that the forward iteration // may not find any blocks at this height and will not reach the previously processing blocks E and F. -func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct{} { +func (bc *BlockChain) gatherBlockRootsAbove(targetHeight uint64) map[common.Hash]struct{} { blockRoots := make(map[common.Hash]struct{}) - for height := bc.lastAccepted.NumberU64() + 1; ; height++ { + for height := targetHeight + 1; ; height++ { blockHashes := rawdb.ReadAllHashes(bc.db, height) // If there are no block hashes at [height], then there should be no further acceptable blocks // past this point. diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 2bb7eb5449..85fd120cee 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -259,11 +259,11 @@ func TestBlockChainOfflinePruningUngracefulShutdown(t *testing.T) { return blockchain, nil } - if err := blockchain.CleanBlockRootsAboveLastAccepted(); err != nil { + targetRoot, err := blockchain.CleanBlockRootsAboveLastAccepted() + if err != nil { return nil, err } // get the target root to prune to before stopping the blockchain - targetRoot := blockchain.LastAcceptedBlock().Root() if targetRoot == types.EmptyRootHash { return blockchain, nil } diff --git a/eth/backend.go b/eth/backend.go index 6c9da27108..8cee916293 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -455,10 +455,10 @@ func (s *Ethereum) handleOfflinePruning(cacheConfig *core.CacheConfig, gspec *co } // Clean up middle roots - if err := s.blockchain.CleanBlockRootsAboveLastAccepted(); err != nil { + targetRoot, err := s.blockchain.CleanBlockRootsAboveLastAccepted() + if err != nil { return err } - targetRoot := s.blockchain.LastAcceptedBlock().Root() // Allow the blockchain to be garbage collected immediately, since we will shut down the chain after offline pruning completes. s.blockchain.Stop()
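
Taken as a whole, the series turns block acceptance from an asynchronous pipeline into a synchronous call: PATCH 1 deletes the buffered acceptorQueue channel, its background goroutine, and the DrainAcceptorQueue/stopAcceptor synchronization; PATCHes 2-4 then propagate the simplification (returned errors instead of log.Crit, a single LastAcceptedBlock accessor guarded by chainmu, and a CleanBlockRootsAboveLastAccepted that returns the target root it computed under the lock). The sketch below is a minimal, self-contained Go illustration of the two designs, not the coreth code itself; queuedChain, syncChain, and work are hypothetical stand-ins for BlockChain and the per-block acceptance effects (snapshot flattening, index writes, event feeds).

package main

import (
	"fmt"
	"sync"
)

// work stands in for the real per-block acceptance effects: flattening the
// snapshot, writing accepted-block indices, and firing event feeds.
func work(height int) { fmt.Println("processed block", height) }

// queuedChain mirrors the removed design: Accept enqueues, a background
// goroutine processes, and callers must Drain before reading results.
type queuedChain struct {
	queue  chan int
	wg     sync.WaitGroup // counts enqueued but unprocessed blocks
	mu     sync.RWMutex   // guards closed, like acceptorClosingLock
	closed bool
}

func newQueuedChain(limit int) *queuedChain {
	c := &queuedChain{queue: make(chan int, limit)}
	go func() { // the acceptor goroutine
		for h := range c.queue {
			work(h)
			c.wg.Done()
		}
	}()
	return c
}

func (c *queuedChain) Accept(height int) {
	c.mu.RLock() // read lock: concurrent enqueues are allowed
	defer c.mu.RUnlock()
	if c.closed {
		return
	}
	c.wg.Add(1)
	c.queue <- height // blocks once the buffer (the queue limit) is full
}

// Drain blocks until every queued block has been fully processed.
func (c *queuedChain) Drain() { c.wg.Wait() }

// Stop drains the queue and closes it exactly once.
func (c *queuedChain) Stop() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return // never close the channel twice
	}
	c.wg.Wait()
	c.closed = true
	close(c.queue)
}

// syncChain mirrors the new design: Accept does the work inline and returns
// its error, so there is nothing to drain and nothing to stop.
type syncChain struct{ mu sync.Mutex }

func (c *syncChain) Accept(height int) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	work(height) // side effects complete before Accept returns
	return nil
}

func main() {
	q := newQueuedChain(64)
	q.Accept(1)
	q.Drain() // without this, block 1 may still be in flight
	q.Stop()

	s := &syncChain{}
	_ = s.Accept(2) // fully processed once Accept returns
}

The sketch also shows why the test diffs above drop every DrainAcceptorQueue call and why TestUngracefulAsyncShutdown collapses into TestUngracefulShutdown: with the synchronous shape there is no window in which an accepted block's effects are still in flight, so there is nothing to drain and no partially processed state to recover from.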