refactor: block on StopPrefetchers() instead of cleanup #1396

Merged · 3 commits · Nov 25, 2024
8 changes: 2 additions & 6 deletions core/blockchain.go
@@ -1367,9 +1367,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
 	blockStateInitTimer.Inc(time.Since(substart).Milliseconds())
 
 	// Enable prefetching to pull in trie node paths while processing transactions
-	opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)
-	defer cleanup()
-
+	opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)
 	statedb.StartPrefetcher("chain", opt)
 	defer statedb.StopPrefetcher()
@@ -1729,9 +1727,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
 	}
 
 	// Enable prefetching to pull in trie node paths while processing transactions
-	opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)
-	defer cleanup()
-
+	opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)
 	statedb.StartPrefetcher("chain", opt)
 	defer statedb.StopPrefetcher()
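
For orientation before the remaining diffs: WithConcurrentWorkers now returns only the option, and the blocking behaviour lives in StopPrefetcher itself rather than in a separate cleanup func. A minimal sketch of the new call pattern (the wrapper function and its parameters are illustrative, not part of the PR):

package demo

import "github.com/ava-labs/subnet-evm/core/state"

// insertWithPrefetch sketches the post-refactor lifecycle: the option is the
// only return value, and the deferred StopPrefetcher blocks until in-flight
// prefetch work has drained, replacing the old deferred cleanup().
func insertWithPrefetch(statedb *state.StateDB, parallelism int, process func()) {
	opt := state.WithConcurrentWorkers(parallelism)
	statedb.StartPrefetcher("chain", opt)
	defer statedb.StopPrefetcher() // now blocks on outstanding prefetches
	process()
}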
29 changes: 9 additions & 20 deletions core/state/statedb.go
@@ -30,7 +30,6 @@ package state
 import (
 	"fmt"
 	"sort"
-	"sync"
 	"time"
 
 	"github.com/ava-labs/subnet-evm/core/rawdb"
@@ -204,29 +203,19 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
 
 type workerPool struct {
 	*utils.BoundedWorkers
-	wg sync.WaitGroup
 }
 
-func (wp *workerPool) Wait() {
-	wp.wg.Wait()
+func (wp *workerPool) Done() {
+	// Done is guaranteed to only be called after all work is already complete,
+	// so Wait()ing is redundant, but it also releases resources.
+	wp.BoundedWorkers.Wait()
 }
 
-func (wp *workerPool) Execute(f func()) {
-	wp.wg.Add(1)
-	wp.BoundedWorkers.Execute(func() {
-		f()
-		wp.wg.Done()
-	})
-}
-
-func WithConcurrentWorkers(prefetchers int) (PrefetcherOption, func()) {
-	pool := utils.NewBoundedWorkers(prefetchers)
-	cleanup := func() { _ = pool.Wait() }
-	return WithWorkerPools(func() WorkerPool {
-		return &workerPool{
-			BoundedWorkers: pool,
-		}
-	}), cleanup
+func WithConcurrentWorkers(prefetchers int) PrefetcherOption {
+	pool := &workerPool{
+		BoundedWorkers: utils.NewBoundedWorkers(prefetchers),
+	}
+	return WithWorkerPools(func() WorkerPool { return pool })
 }
 
 // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
11 changes: 7 additions & 4 deletions core/state/trie_prefetcher.go
@@ -104,6 +104,7 @@ func (p *triePrefetcher) close() {
 			}
 		}
 	}
+	p.releaseWorkerPools()
 	// Clear out all fetchers (will crash on a second call, deliberate)
 	p.fetchers = nil
 }
@@ -303,9 +304,11 @@ func (sf *subfetcher) abort() {
 // loop waits for new tasks to be scheduled and keeps loading them until it runs
 // out of tasks or its underlying trie is retrieved for committing.
 func (sf *subfetcher) loop() {
-	defer sf.wait()
 	// No matter how the loop stops, signal anyone waiting that it's terminated
-	defer close(sf.term)
+	defer func() {
+		sf.pool.wait()
+		close(sf.term)
+	}()
 
 	// Start by opening the trie and stop processing if it fails
 	if sf.owner == (common.Hash{}) {
@@ -348,9 +351,9 @@ func (sf *subfetcher) loop() {
 				sf.dups++
 			} else {
 				if len(task) == common.AddressLength {
-					sf.GetAccount(common.BytesToAddress(task))
+					sf.pool.GetAccount(common.BytesToAddress(task))
 				} else {
-					sf.GetStorage(sf.addr, task)
+					sf.pool.GetStorage(sf.addr, task)
 				}
 				sf.seen[string(task)] = struct{}{}
 			}
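
The deferred function above fixes the shutdown ordering this PR depends on: drain the pool's pending work first, then close the termination channel, so a goroutine blocking on sf.term observes a fully quiesced fetcher. A standalone sketch of that pattern, with all names illustrative rather than taken from the codebase:

package demo

import "sync"

type fetcher struct {
	wg   sync.WaitGroup // tracks scheduled work, as subfetcherPool.wg does
	term chan struct{}  // closed only after wg drains, as sf.term is
}

func (f *fetcher) schedule(task func()) {
	f.wg.Add(1)
	go func() { defer f.wg.Done(); task() }()
}

func (f *fetcher) loop(tasks <-chan func()) {
	defer func() {
		f.wg.Wait()   // the equivalent of sf.pool.wait()
		close(f.term) // signal termination only once quiesced
	}()
	for task := range tasks {
		f.schedule(task)
	}
}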
70 changes: 42 additions & 28 deletions core/state/trie_prefetcher.libevm.go
@@ -17,9 +17,8 @@
 package state
 
 import (
-	"sync"
-
 	"github.com/ava-labs/subnet-evm/libevm/options"
+	"github.com/ava-labs/subnet-evm/libevm/sync"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 )
@@ -31,14 +30,16 @@ type prefetcherConfig struct {
 	newWorkers func() WorkerPool
 }
 
-// A WorkerPool is responsible for executing functions, possibly asynchronously.
+// A WorkerPool executes functions asynchronously. Done() is called to signal
+// that the pool is no longer needed and that Execute() is guaranteed to not be
+// called again.
 type WorkerPool interface {
 	Execute(func())
-	Wait()
+	Done()
 }
 
 // WithWorkerPools configures trie prefetching to execute asynchronously. The
-// provided constructor is called once for each trie being fetched and it MAY
+// provided constructor is called once for each trie being fetched but it MAY
 // return the same pool.
 func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
 	return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
@@ -48,17 +49,18 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
 
 type subfetcherPool struct {
 	workers WorkerPool
-	tries   sync.Pool
+	tries   sync.Pool[Trie]
+	wg      sync.WaitGroup
 }
 
 // applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
 // with a [PrefetcherOption].
 func (c *prefetcherConfig) applyTo(sf *subfetcher) {
 	sf.pool = &subfetcherPool{
-		tries: sync.Pool{
+		tries: sync.Pool[Trie]{
 			// Although the workers may be shared between all subfetchers, each
 			// MUST have its own Trie pool.
-			New: func() any {
+			New: func() Trie {
 				return sf.db.CopyTrie(sf.trie)
 			},
 		},
@@ -68,43 +70,55 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) {
 	}
 }
 
-func (sf *subfetcher) wait() {
-	if w := sf.pool.workers; w != nil {
-		w.Wait()
+// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be
+// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed
+// to be shared between them. This is because we guarantee in the public API
+// that no further calls will be made to Execute() after a call to Done().
+func (p *triePrefetcher) releaseWorkerPools() {
+	for _, f := range p.fetchers {
+		if w := f.pool.workers; w != nil {
+			w.Done()
+		}
 	}
 }
 
+func (p *subfetcherPool) wait() {
+	p.wg.Wait()
+}
+
 // execute runs the provided function with a copy of the subfetcher's Trie.
-// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was
+// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
 // configured with a [WorkerPool] then it is used for function execution,
 // otherwise `fn` is just called directly.
-func (sf *subfetcher) execute(fn func(Trie)) {
-	if w := sf.pool.workers; w != nil {
-		w.Execute(func() {
-			trie := sf.pool.tries.Get().(Trie)
-			fn(trie)
-			sf.pool.tries.Put(trie)
-		})
+func (p *subfetcherPool) execute(fn func(Trie)) {
+	p.wg.Add(1)
+	do := func() {
+		t := p.tries.Get()
+		fn(t)
+		p.tries.Put(t)
+		p.wg.Done()
+	}
+
+	if w := p.workers; w != nil {
+		w.Execute(do)
 	} else {
-		trie := sf.pool.tries.Get().(Trie)
-		fn(trie)
-		sf.pool.tries.Put(trie)
+		do()
 	}
 }
 
 // GetAccount optimistically pre-fetches an account, dropping the returned value
-// and logging errors. See [subfetcher.execute] re worker pools.
-func (sf *subfetcher) GetAccount(addr common.Address) {
-	sf.execute(func(t Trie) {
+// and logging errors. See [subfetcherPool.execute] re worker pools.
+func (p *subfetcherPool) GetAccount(addr common.Address) {
+	p.execute(func(t Trie) {
 		if _, err := t.GetAccount(addr); err != nil {
 			log.Error("account prefetching failed", "address", addr, "err", err)
 		}
 	})
 }
 
-// GetStorage is the storage equivalent of [subfetcher.GetAccount].
-func (sf *subfetcher) GetStorage(addr common.Address, key []byte) {
-	sf.execute(func(t Trie) {
+// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
+func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
+	p.execute(func(t Trie) {
 		if _, err := t.GetStorage(addr, key); err != nil {
 			log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
 		}
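
Since WorkerPool is the extension point exported here, a minimal sketch of a caller-supplied implementation under the revised contract may help (goPool and its unbounded goroutines are illustrative only; the in-tree implementation is the BoundedWorkers adapter in statedb.go above):

package demo

import (
	"sync"

	"github.com/ava-labs/subnet-evm/core/state"
)

// goPool satisfies the revised interface: Execute runs functions
// asynchronously, and Done signals that Execute will never be called again.
// Completion tracking is the prefetcher's job (subfetcherPool.wg), so the
// pool only needs to release its own resources on Done.
type goPool struct{ wg sync.WaitGroup }

func (p *goPool) Execute(f func()) {
	p.wg.Add(1)
	go func() { defer p.wg.Done(); f() }()
}

func (p *goPool) Done() {
	p.wg.Wait() // nothing more can arrive: Execute is never called after Done
}

// Option wires the pool in, sharing one pool across all tries, which the
// WithWorkerPools doc comment explicitly permits.
func Option() state.PrefetcherOption {
	pool := new(goPool)
	return state.WithWorkerPools(func() state.WorkerPool { return pool })
}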
4 changes: 1 addition & 3 deletions core/state/trie_prefetcher_extra_test.go
@@ -167,9 +167,7 @@ func addKVs(
 		return nil, common.Hash{}, err
 	}
 	if prefetchers > 0 {
-		opt, cleanup := WithConcurrentWorkers(prefetchers)
-		defer cleanup()
-
+		opt := WithConcurrentWorkers(prefetchers)
 		statedb.StartPrefetcher(namespace, opt)
 		defer statedb.StopPrefetcher()
 	}
9 changes: 3 additions & 6 deletions core/state/trie_prefetcher_test.go
@@ -59,8 +59,7 @@ func filledStateDB() *StateDB {
 
 func TestCopyAndClose(t *testing.T) {
 	db := filledStateDB()
-	opt, cleanup := WithConcurrentWorkers(maxConcurrency)
-	defer cleanup()
+	opt := WithConcurrentWorkers(maxConcurrency)
 	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt)
 	skey := common.HexToHash("aaa")
 	prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
@@ -86,8 +85,7 @@ func TestCopyAndClose(t *testing.T) {
 
 func TestUseAfterClose(t *testing.T) {
 	db := filledStateDB()
-	opt, cleanup := WithConcurrentWorkers(maxConcurrency)
-	defer cleanup()
+	opt := WithConcurrentWorkers(maxConcurrency)
 	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt)
 	skey := common.HexToHash("aaa")
 	prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
@@ -104,8 +102,7 @@ func TestUseAfterClose(t *testing.T) {
 
 func TestCopyClose(t *testing.T) {
 	db := filledStateDB()
-	opt, cleanup := WithConcurrentWorkers(maxConcurrency)
-	defer cleanup()
+	opt := WithConcurrentWorkers(maxConcurrency)
 	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt)
 	skey := common.HexToHash("aaa")
 	prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
52 changes: 52 additions & 0 deletions libevm/sync/sync.go
@@ -0,0 +1,52 @@
// Copyright 2024 the libevm authors.
//
// The libevm additions to go-ethereum are free software: you can redistribute
// them and/or modify them under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The libevm additions are distributed in the hope that they will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see
// <http://www.gnu.org/licenses/>.

// Package sync extends the standard library's sync package.
package sync

import "sync"

// Aliases of stdlib sync's types to avoid having to import it alongside this
// package.
type (
Cond = sync.Cond
Locker = sync.Locker
Map = sync.Map
Mutex = sync.Mutex
Once = sync.Once
RWMutex = sync.RWMutex
WaitGroup = sync.WaitGroup
)

// A Pool is a type-safe wrapper around [sync.Pool].
type Pool[T any] struct {
New func() T
pool sync.Pool
once Once
}

// Get is equivalent to [sync.Pool.Get].
func (p *Pool[T]) Get() T {
p.once.Do(func() { // Do() guarantees at least once, not just only once
p.pool.New = func() any { return p.New() }
})
return p.pool.Get().(T) //nolint:forcetypeassert
}

// Put is equivalent to [sync.Pool.Put].
func (p *Pool[T]) Put(t T) {
p.pool.Put(t)
}
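
A short usage sketch of the generic Pool (the *bytes.Buffer element type is illustrative): New is a plain func() T rather than func() any, and Get returns T with no type assertion at the call site.

package main

import (
	"bytes"
	"fmt"

	"github.com/ava-labs/subnet-evm/libevm/sync"
)

func main() {
	pool := sync.Pool[*bytes.Buffer]{
		// New is installed on the wrapped sync.Pool at first Get, via the
		// Once field above.
		New: func() *bytes.Buffer { return new(bytes.Buffer) },
	}
	buf := pool.Get() // statically a *bytes.Buffer; no .(T) needed
	buf.WriteString("hello")
	fmt.Println(buf.String())
	buf.Reset()
	pool.Put(buf)
}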
9 changes: 2 additions & 7 deletions miner/worker.go
@@ -82,8 +82,7 @@ type environment struct {
 	// way that the gas pool and state is reset.
 	predicateResults *predicate.Results
 
-	start   time.Time // Time that block building began
-	cleanup func()    // Cleanup function to be called when the environment is no longer needed
+	start time.Time // Time that block building began
 }
 
 // worker is the main object which takes care of submitting new work to consensus engine
@@ -226,9 +225,6 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte
 			return
 		}
 		env.state.StopPrefetcher()
-		if env.cleanup != nil {
-			env.cleanup()
-		}
 	}()
 	// Configure any upgrades that should go into effect during this block.
 	err = core.ApplyUpgrades(w.chainConfig, &parent.Time, types.NewBlockWithHeader(header), env.state)
@@ -288,7 +284,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre
 	if err != nil {
 		return nil, err
 	}
-	opt, cleanup := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism)
+	opt := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism)
 	currentState.StartPrefetcher("miner", opt)
 	return &environment{
 		signer: types.MakeSigner(w.chainConfig, header.Number, header.Time),
@@ -301,7 +297,6 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre
 		predicateContext: predicateContext,
 		predicateResults: predicate.NewResults(),
 		start:            tstart,
-		cleanup:          cleanup,
 	}, nil
 }