Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove acceptor queue (part 1) #1334

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 49 additions & 187 deletions core/blockchain.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need to store the latest tip with WriteAcceptorTip? I mean stopping updating the stored value (not to completely remove it).

Copy link
Collaborator

@ceyonur ceyonur Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken acceptorTip can only be =< to lastAccepted. So if we decide not to update this we might end up always reprocessing the state.

I wonder after removing the acceptor queue we will ever need to reprocess the state. Maybe we can keep the check bc.HasState(current.Root()).

I also don't fully understand how that reprocessState works. If acceptorTip =< lastAccepted then from what I see it would go back from acceptorTip since we do this current = bc.GetBlockByHash(acceptorTip). what happens to the state between acceptorTip and lastAccepted then? I might be completely wrong though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is correct that acceptorTip <= lastAccepted or that acceptorTip == common.Hash{}
reprocessState is kind of confusing:

  • If acceptorTip < lastAccepted, execution will start at acceptorTip.
  • Next, we find the block with state present on disk so we can start re-executing (this can be less than acceptorTip/lastAccepted since we only persist roots to disk once per commit interval in pruning mode)
  • When the acceptorTip is reached, from that point we will start processing the snapshot along with the state & also write the tx accepted indexes.
  • We continue up to lastAccepted.

We can stop updating it once it matches lastAccepted, but I would prefer to keep that to the next PR so we get it right. Let me know if you prefer to include stop updating it in this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no I think it's fine if we remove them altogether in a safer way.

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion core/blockchain_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func TestAcceptedLogsSubscription(t *testing.T) {
err := chain.Accept(block)
require.NoError(err)
}
chain.DrainAcceptorQueue()

logs := <-logsCh
require.Len(logs, 1)
Expand Down
1 change: 0 additions & 1 deletion core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
lastAcceptedHash = canonblocks[i].Hash()
}
chain.DrainAcceptorQueue()
}
}
if _, err := chain.InsertChain(canonblocks[tt.commitBlock:]); err != nil {
Expand Down
1 change: 0 additions & 1 deletion core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
basic.lastAcceptedHash = blocks[i].Hash()
}
chain.DrainAcceptorQueue()

diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
Expand Down
61 changes: 12 additions & 49 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ var (
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}

pruningConfig = &CacheConfig{
Expand All @@ -43,7 +42,6 @@ var (
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}
)

Expand Down Expand Up @@ -92,7 +90,6 @@ func TestArchiveBlockChainSnapsDisabled(t *testing.T) {
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -128,7 +125,6 @@ func TestPruningBlockChainSnapsDisabled(t *testing.T) {
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -178,7 +174,6 @@ func TestPruningBlockChainUngracefulShutdownSnapsDisabled(t *testing.T) {
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -214,7 +209,6 @@ func TestEnableSnapshots(t *testing.T) {
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: snapLimit,
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -265,11 +259,11 @@ func TestBlockChainOfflinePruningUngracefulShutdown(t *testing.T) {
return blockchain, nil
}

if err := blockchain.CleanBlockRootsAboveLastAccepted(); err != nil {
targetRoot, err := blockchain.CleanBlockRootsAboveLastAccepted()
if err != nil {
return nil, err
}
// get the target root to prune to before stopping the blockchain
targetRoot := blockchain.LastAcceptedBlock().Root()
if targetRoot == types.EmptyRootHash {
return blockchain, nil
}
Expand Down Expand Up @@ -341,9 +335,8 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
t.Fatal(err)
}
}
blockchain.DrainAcceptorQueue()

lastAcceptedHash := blockchain.LastConsensusAcceptedBlock().Hash()
lastAcceptedHash := blockchain.LastAcceptedBlock().Hash()
blockchain.Stop()

blockchain, err = createBlockChain(chainDB, pruningConfig, gspec, lastAcceptedHash)
Expand Down Expand Up @@ -372,7 +365,6 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
SnapshotLimit: 256,
PopulateMissingTries: &startHeight, // Starting point for re-populating.
PopulateMissingTriesParallelism: parallelism,
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand All @@ -396,7 +388,7 @@ func TestRepopulateMissingTries(t *testing.T) {
}
}

func TestUngracefulAsyncShutdown(t *testing.T) {
func TestUngracefulShutdown(t *testing.T) {
var (
create = func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
blockchain, err := createBlockChain(db, &CacheConfig{
Expand All @@ -408,7 +400,6 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 1000, // ensure channel doesn't block
}, gspec, lastAcceptedHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -446,52 +437,28 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
t.Fatal(err)
}

// Insert three blocks into the chain and accept only the first block.
// Insert and accept three blocks into the chain.
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatal(err)
}

foundTxs := []common.Hash{}
missingTxs := []common.Hash{}
for i, block := range chain {
allTxs := []common.Hash{}
for _, block := range chain {
if err := blockchain.Accept(block); err != nil {
t.Fatal(err)
}

if i == 3 {
// At height 3, kill the async accepted block processor to force an
// ungraceful recovery
blockchain.stopAcceptor()
blockchain.acceptorQueue = nil
}

if i <= 3 {
// If <= height 3, all txs should be accessible on lookup
for _, tx := range block.Transactions() {
foundTxs = append(foundTxs, tx.Hash())
}
} else {
// If > 3, all txs should be accessible on lookup
for _, tx := range block.Transactions() {
missingTxs = append(missingTxs, tx.Hash())
}
for _, tx := range block.Transactions() {
allTxs = append(allTxs, tx.Hash())
}
}

// After inserting all blocks, we should confirm that txs added after the
// async worker shutdown cannot be found.
for _, tx := range foundTxs {
// After accepting the blocks, all txs should be queryable.
for _, tx := range allTxs {
txLookup, _, _ := blockchain.GetTransactionLookup(tx)
if txLookup == nil {
t.Fatalf("missing transaction: %v", tx)
}
}
for _, tx := range missingTxs {
txLookup, _, _ := blockchain.GetTransactionLookup(tx)
if txLookup != nil {
t.Fatalf("transaction should be missing: %v", tx)
}
}

// check the state of the last accepted block
checkState := func(sdb *state.StateDB) error {
Expand Down Expand Up @@ -521,15 +488,13 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
}

_, newChain, restartedChain := checkBlockChainState(t, blockchain, gspec, chainDB, create, checkState)

allTxs := append(foundTxs, missingTxs...)
for _, bc := range []*BlockChain{newChain, restartedChain} {
// We should confirm that snapshots were properly initialized
if bc.snaps == nil {
t.Fatal("snapshot initialization failed")
}

// We should confirm all transactions can now be queried
// All transactions should still be queryable after a restart.
for _, tx := range allTxs {
txLookup, _, _ := bc.GetTransactionLookup(tx)
if txLookup == nil {
Expand Down Expand Up @@ -665,7 +630,6 @@ func TestTxLookupBlockChain(t *testing.T) {
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TransactionHistory: 5,
}
createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
Expand All @@ -688,7 +652,6 @@ func TestTxLookupSkipIndexingBlockChain(t *testing.T) {
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TransactionHistory: 5,
SkipTxIndexing: true,
}
Expand Down
Loading
Loading