Skip to content

Commit

Permalink
Fix for slow stage3 (#1377)
Browse files Browse the repository at this point in the history
* Fix for slow stage3

* Cache sequence

* Small optimisation

* mutation sequence lock and sequence(0) use read transaction

* tx

* Fixes

Co-authored-by: Alexey Sharp <[email protected]>
Co-authored-by: alex.sharov <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2020
1 parent ed2b36f commit 8d2db43
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 18 deletions.
7 changes: 3 additions & 4 deletions consensus/clique/clique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package clique

import (
"math/big"
"runtime"
"testing"

"github.com/holiman/uint256"
Expand Down Expand Up @@ -58,7 +57,7 @@ func TestReimportMirroredState(t *testing.T) {
genesis := genspec.MustCommit(db)

// Generate a batch of blocks, each properly signed
txCacher := core.NewTxSenderCacher(runtime.NumCPU())
txCacher := core.NewTxSenderCacher(1)
chain, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, txCacher)
defer chain.Stop()

Expand Down Expand Up @@ -96,7 +95,7 @@ func TestReimportMirroredState(t *testing.T) {
db = ethdb.NewMemDatabase()
genspec.MustCommit(db)

txCacher1 := core.NewTxSenderCacher(runtime.NumCPU())
txCacher1 := core.NewTxSenderCacher(1)
chain1, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, txCacher1)
defer chain1.Stop()

Expand All @@ -110,7 +109,7 @@ func TestReimportMirroredState(t *testing.T) {
// Simulate a crash by creating a new chain on top of the database, without
// flushing the dirty states out. Insert the last block, triggering a sidechain
// reimport.
txCacher2 := core.NewTxSenderCacher(runtime.NumCPU())
txCacher2 := core.NewTxSenderCacher(1)
chain2, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, txCacher2)
defer chain2.Stop()

Expand Down
22 changes: 15 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ func (bc *BlockChain) addFutureBlock(block *types.Block) error {

// InsertBodyChain attempts to insert the given batch of block into the
// canonical chain, without executing those blocks
func (bc *BlockChain) InsertBodyChain(logPrefix string, ctx context.Context, chain types.Blocks) (bool, error) {
func (bc *BlockChain) InsertBodyChain(logPrefix string, ctx context.Context, db ethdb.Database, chain types.Blocks) (bool, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return true, nil
Expand Down Expand Up @@ -1301,7 +1301,7 @@ func (bc *BlockChain) InsertBodyChain(logPrefix string, ctx context.Context, cha
ctx,
&bc.procInterrupt,
chain,
bc.db,
db,
bc.Config(),
bc.NoHistory(),
bc.IsNoHistory,
Expand Down Expand Up @@ -2254,14 +2254,19 @@ func InsertBodies(
return true, nil
}

batch := db.NewBatch()
tx, err := db.Begin(ctx, ethdb.RW)
if err != nil {
return false, err
}
defer tx.Rollback()
batch := tx.NewBatch()
defer batch.Rollback()
stats := InsertStats{StartTime: mclock.Now()}

var parent *types.Block
var parentNumber = chain[0].NumberU64() - 1
parentHash := chain[0].ParentHash()
parent = rawdb.ReadBlock(batch, parentHash, parentNumber)
if parent == nil {

if parent := rawdb.ReadStorageBodyRLP(batch, parentHash, parentNumber); parent == nil {
log.Error("chain segment could not be inserted, missing parent", "hash", parentHash)
return true, fmt.Errorf("chain segment could not be inserted, missing parent %x", parentHash)
}
Expand Down Expand Up @@ -2305,9 +2310,12 @@ func InsertBodies(
}
stats.Processed = len(chain)
stats.Report(logPrefix, chain, len(chain)-1, true)
rawdb.WriteHeadBlockHash(db, chain[len(chain)-1].Hash())
rawdb.WriteHeadBlockHash(batch, chain[len(chain)-1].Hash())
if _, err := batch.Commit(); err != nil {
return true, fmt.Errorf("commit inserting bodies: %w", err)
}
if _, err := tx.Commit(); err != nil {
return true, fmt.Errorf("commit inserting bodies: %w", err)
}
return false, nil
}
4 changes: 2 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type BlockChain interface {
InsertChain(context.Context, types.Blocks) (int, error)

// InsertBodyChain inserts a batch of blocks into the local chain, without executing them.
InsertBodyChain(string, context.Context, types.Blocks) (bool, error)
InsertBodyChain(string, context.Context, ethdb.Database, types.Blocks) (bool, error)

// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
Expand Down Expand Up @@ -1807,7 +1807,7 @@ func (d *Downloader) importBlockResults(logPrefix string, results []*fetchResult
if execute {
index, err = d.blockchain.InsertChain(context.Background(), blocks)
} else {
stopped, err = d.blockchain.InsertBodyChain(logPrefix, context.Background(), blocks)
stopped, err = d.blockchain.InsertBodyChain(logPrefix, context.Background(), d.stateDB, blocks)
if stopped {
index = 0
} else {
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader_stagedsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ func (st *stagedSyncTester) HasHeader(hash common.Hash, number uint64) bool {
}

// InsertBodyChain is part of the implementation of BlockChain interface defined in downloader.go
func (st *stagedSyncTester) InsertBodyChain(_ string, _ context.Context, blocks types.Blocks) (bool, error) {
func (st *stagedSyncTester) InsertBodyChain(_ string, _ context.Context, db ethdb.Database, blocks types.Blocks) (bool, error) {
st.lock.Lock()
defer st.lock.Unlock()
for _, block := range blocks {
if err := rawdb.WriteBlock(context.Background(), st.db, block); err != nil {
if err := rawdb.WriteBlock(context.Background(), db, block); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq i
return len(headers), nil
}

func (dl *downloadTester) InsertBodyChain(_ string, _ context.Context, blocks types.Blocks) (bool, error) {
func (dl *downloadTester) InsertBodyChain(_ string, _ context.Context, _ ethdb.Database, blocks types.Blocks) (bool, error) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/all_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, engine c
}

// Stage 3
if _, err := bc.InsertBodyChain("logPrefix", context.TODO(), []*types.Block{block}); err != nil {
if _, err := bc.InsertBodyChain("logPrefix", context.TODO(), db, []*types.Block{block}); err != nil {
return err
}

Expand Down
23 changes: 22 additions & 1 deletion ethdb/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ethdb
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/c2h5oh/datasize"
"github.com/google/btree"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/metrics"
)
Expand Down Expand Up @@ -65,7 +68,25 @@ func (m *mutation) getMem(table string, key []byte) ([]byte, bool) {
}

func (m *mutation) Sequence(bucket string, amount uint64) (res uint64, err error) {
return m.db.Sequence(bucket, amount)
v, ok := m.getMem(dbutils.Sequence, []byte(bucket))
if !ok && m.db != nil {
v, err = m.db.Get(dbutils.Sequence, []byte(bucket))
if err != nil && !errors.Is(err, ErrKeyNotFound) {
return 0, err
}
}
var currentV uint64 = 0
if len(v) > 0 {
currentV = binary.BigEndian.Uint64(v)
}

newVBytes := make([]byte, 8)
binary.BigEndian.PutUint64(newVBytes, currentV+amount)
if err = m.Put(dbutils.Sequence, []byte(bucket), newVBytes); err != nil {
return 0, err
}

return currentV, nil
}

// Can only be called from the worker thread
Expand Down
7 changes: 7 additions & 0 deletions ethdb/object_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func (db *ObjectDatabase) DiskSize(ctx context.Context) (uint64, error) {
}

func (db *ObjectDatabase) Sequence(bucket string, amount uint64) (res uint64, err error) {
if amount == 0 {
err = db.kv.View(context.Background(), func(tx Tx) error {
res, err = tx.Sequence(bucket, amount)
return err
})
return res, err
}
err = db.kv.Update(context.Background(), func(tx Tx) error {
res, err = tx.Sequence(bucket, amount)
return err
Expand Down

0 comments on commit 8d2db43

Please sign in to comment.