Skip to content

Commit

Permalink
fix: avoid calling BoundedWorkers.Execute() after Wait()
Browse files Browse the repository at this point in the history
  • Loading branch information
ARR4N committed Nov 22, 2024
1 parent 8855cf5 commit cbbfa32
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 27 deletions.
4 changes: 3 additions & 1 deletion core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ type workerPool struct {
*utils.BoundedWorkers
}

func (wp *workerPool) Wait() {
func (wp *workerPool) Done() {
// Done is guaranteed to only be called after all work is already complete, so
// the Wait() call below never actually blocks; it is still made because Wait()
// also releases the resources held by the underlying BoundedWorkers.
wp.BoundedWorkers.Wait()
}

Expand Down
7 changes: 4 additions & 3 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...
// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
p.abortFetchersAndReleaseWorkerPools()
for _, fetcher := range p.fetchers {
fetcher.abort() // safe to do multiple times

Expand Down Expand Up @@ -305,7 +306,7 @@ func (sf *subfetcher) abort() {
func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer func() {
sf.wait()
sf.pool.wait()
close(sf.term)
}()

Expand Down Expand Up @@ -350,9 +351,9 @@ func (sf *subfetcher) loop() {
sf.dups++
} else {
if len(task) == common.AddressLength {
sf.GetAccount(common.BytesToAddress(task))
sf.pool.GetAccount(common.BytesToAddress(task))
} else {
sf.GetStorage(sf.addr, task)
sf.pool.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}
Expand Down
72 changes: 49 additions & 23 deletions core/state/trie_prefetcher.libevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type prefetcherConfig struct {
newWorkers func() WorkerPool
}

// A WorkerPool is responsible for executing functions, possibly asynchronously.
// A WorkerPool executes functions asynchronously. Done() is called to signal
// that the pool is no longer needed and that Execute() is guaranteed to not be
// called again.
type WorkerPool interface {
Execute(func())
Wait()
Done()
}

// WithWorkerPools configures trie prefetching to execute asynchronously. The
// provided constructor is called once for each trie being fetched and it MAY
// provided constructor is called once for each trie being fetched but it MAY
// return the same pool.
func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
Expand All @@ -49,6 +51,7 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
// subfetcherPool groups the resources that a subfetcher uses to run its
// prefetch tasks.
type subfetcherPool struct {
// workers, when non-nil, is the WorkerPool that tasks are handed to via
// Execute(); when nil, tasks are called directly by the caller of execute.
workers WorkerPool
// tries holds reusable Trie values: each task takes one with Get() and hands
// it back with Put() once the task's function has returned.
tries sync.Pool
// wg is incremented for every task passed to execute and decremented when
// that task finishes, so wait() can block until no tasks are outstanding.
wg sync.WaitGroup
}

// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
Expand All @@ -68,43 +71,66 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) {
}
}

func (sf *subfetcher) wait() {
if w := sf.pool.workers; w != nil {
w.Wait()
// abortFetchersAndReleaseWorkerPools calls abort() on every subfetcher
// concurrently, blocks until all of those calls have returned, and only then
// calls Done() on the WorkerPool of each fetcher that has one.
//
// NOTE(review): Done() is invoked once per fetcher, so a WorkerPool that is
// shared between several fetchers receives one Done() call for each of them.
func (p *triePrefetcher) abortFetchersAndReleaseWorkerPools() {
	// Aborting the fetchers one after another would leave a window in which
	// those later in the iteration could still accept new work, so every
	// abort() is issued from its own goroutine.
	var aborted sync.WaitGroup
	aborted.Add(len(p.fetchers))
	for _, sf := range p.fetchers {
		go func(sf *subfetcher) {
			sf.abort()
			aborted.Done()
		}(sf)
	}

	// Fetchers are allowed to share a WorkerPool, so we MUST wait for all of
	// them to finish their tasks before releasing any pool; otherwise one of
	// them could call Execute() after Done(), which the public API guarantees
	// to be impossible.
	aborted.Wait()
	for _, sf := range p.fetchers {
		pool := sf.pool.workers
		if pool == nil {
			continue
		}
		pool.Done()
	}
}

// wait blocks until every task that was started through execute has finished,
// i.e. until the pool's WaitGroup counter has dropped back to zero.
func (p *subfetcherPool) wait() {
p.wg.Wait()
}

// execute runs the provided function with a copy of the subfetcher's Trie.
// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was
// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
// configured with a [WorkerPool] then it is used for function execution,
// otherwise `fn` is just called directly.
func (sf *subfetcher) execute(fn func(Trie)) {
if w := sf.pool.workers; w != nil {
w.Execute(func() {
trie := sf.pool.tries.Get().(Trie)
fn(trie)
sf.pool.tries.Put(trie)
})
func (p *subfetcherPool) execute(fn func(Trie)) {
p.wg.Add(1)
do := func() {
t := p.tries.Get().(Trie)
fn(t)
p.tries.Put(t)
p.wg.Done()
}

if w := p.workers; w != nil {
w.Execute(do)
} else {
trie := sf.pool.tries.Get().(Trie)
fn(trie)
sf.pool.tries.Put(trie)
do()
}
}

// GetAccount optimistically pre-fetches an account, dropping the returned value
// and logging errors. See [subfetcher.execute] re worker pools.
func (sf *subfetcher) GetAccount(addr common.Address) {
sf.execute(func(t Trie) {
// and logging errors. See [subfetcherPool.execute] re worker pools.
func (p *subfetcherPool) GetAccount(addr common.Address) {
p.execute(func(t Trie) {
if _, err := t.GetAccount(addr); err != nil {
log.Error("account prefetching failed", "address", addr, "err", err)
}
})
}

// GetStorage is the storage equivalent of [subfetcher.GetAccount].
func (sf *subfetcher) GetStorage(addr common.Address, key []byte) {
sf.execute(func(t Trie) {
// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
p.execute(func(t Trie) {
if _, err := t.GetStorage(addr, key); err != nil {
log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
}
Expand Down

0 comments on commit cbbfa32

Please sign in to comment.