From cbbfa32624e07e1bdf0d00112fca37ea8ca5f9fd Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Fri, 22 Nov 2024 20:36:28 +0000 Subject: [PATCH] fix: avoid calling `BoundedWorkers.Execute()` after `Wait()` --- core/state/statedb.go | 4 +- core/state/trie_prefetcher.go | 7 +-- core/state/trie_prefetcher.libevm.go | 72 +++++++++++++++++++--------- 3 files changed, 56 insertions(+), 27 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 02fa50a134..b4c2da9566 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -205,7 +205,9 @@ type workerPool struct { *utils.BoundedWorkers } -func (wp *workerPool) Wait() { +func (wp *workerPool) Done() { + // Done is guaranteed to only be called after all work is already complete, + // so Wait()ing is redundant, but it also releases resources. wp.BoundedWorkers.Wait() } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 01e39f67bb..e71a67edfb 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -79,6 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ... // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { + p.abortFetchersAndReleaseWorkerPools() for _, fetcher := range p.fetchers { fetcher.abort() // safe to do multiple times @@ -305,7 +306,7 @@ func (sf *subfetcher) abort() { func (sf *subfetcher) loop() { // No matter how the loop stops, signal anyone waiting that it's terminated defer func() { - sf.wait() + sf.pool.wait() close(sf.term) }() @@ -350,9 +351,9 @@ func (sf *subfetcher) loop() { sf.dups++ } else { if len(task) == common.AddressLength { - sf.GetAccount(common.BytesToAddress(task)) + sf.pool.GetAccount(common.BytesToAddress(task)) } else { - sf.GetStorage(sf.addr, task) + sf.pool.GetStorage(sf.addr, task) } sf.seen[string(task)] = struct{}{} } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 912374fc6e..6ec762e524 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -31,14 +31,16 @@ type prefetcherConfig struct { newWorkers func() WorkerPool } -// A WorkerPool is responsible for executing functions, possibly asynchronously. +// A WorkerPool executes functions asynchronously. Done() is called to signal +// that the pool is no longer needed and that Execute() is guaranteed to not be +// called again. type WorkerPool interface { Execute(func()) - Wait() + Done() } // WithWorkerPools configures trie prefetching to execute asynchronously. The -// provided constructor is called once for each trie being fetched and it MAY +// provided constructor is called once for each trie being fetched but it MAY // return the same pool. func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { return options.Func[prefetcherConfig](func(c *prefetcherConfig) { @@ -49,6 +51,7 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { type subfetcherPool struct { workers WorkerPool tries sync.Pool + wg sync.WaitGroup } // applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided @@ -68,43 +71,66 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) { } } -func (sf *subfetcher) wait() { - if w := sf.pool.workers; w != nil { - w.Wait() +func (p *triePrefetcher) abortFetchersAndReleaseWorkerPools() { + // Calling abort() sequentially may result in later fetchers accepting new + // work in the interim. + var wg sync.WaitGroup + for _, f := range p.fetchers { + wg.Add(1) + go func(f *subfetcher) { + f.abort() + wg.Done() + }(f) + } + + // A WorkerPool is allowed to be shared between fetchers so we MUST wait for + // them to finish all tasks otherwise they could call Execute() after + // Done(), which we guarantee in the public API to be impossible. + wg.Wait() + for _, f := range p.fetchers { + if w := f.pool.workers; w != nil { + w.Done() + } } } +func (p *subfetcherPool) wait() { + p.wg.Wait() +} + // execute runs the provided function with a copy of the subfetcher's Trie. -// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was +// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. -func (sf *subfetcher) execute(fn func(Trie)) { - if w := sf.pool.workers; w != nil { - w.Execute(func() { - trie := sf.pool.tries.Get().(Trie) - fn(trie) - sf.pool.tries.Put(trie) - }) +func (p *subfetcherPool) execute(fn func(Trie)) { + p.wg.Add(1) + do := func() { + t := p.tries.Get().(Trie) + fn(t) + p.tries.Put(t) + p.wg.Done() + } + + if w := p.workers; w != nil { + w.Execute(do) } else { - trie := sf.pool.tries.Get().(Trie) - fn(trie) - sf.pool.tries.Put(trie) + do() } } // GetAccount optimistically pre-fetches an account, dropping the returned value -// and logging errors. See [subfetcher.execute] re worker pools. -func (sf *subfetcher) GetAccount(addr common.Address) { - sf.execute(func(t Trie) { +// and logging errors. See [subfetcherPool.execute] re worker pools. +func (p *subfetcherPool) GetAccount(addr common.Address) { + p.execute(func(t Trie) { if _, err := t.GetAccount(addr); err != nil { log.Error("account prefetching failed", "address", addr, "err", err) } }) } -// GetStorage is the storage equivalent of [subfetcher.GetAccount]. -func (sf *subfetcher) GetStorage(addr common.Address, key []byte) { - sf.execute(func(t Trie) { +// GetStorage is the storage equivalent of [subfetcherPool.GetAccount]. +func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) { + p.execute(func(t Trie) { if _, err := t.GetStorage(addr, key); err != nil { log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) }