Skip to content

Commit

Permalink
parallelise epoch processing subtasks
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Oct 4, 2024
1 parent b013526 commit 2a560ab
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (g *GossipManager) Start(ctx context.Context) {
}
return true
})
log.Debug("[Gossip] Subscriptions", "subscriptions", logArgs)
log.Trace("[Gossip] Subscriptions", "subscriptions", logArgs)
}
}
}()
Expand Down
42 changes: 25 additions & 17 deletions cl/transition/impl/eth2/statechange/process_inactivity_scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
)
Expand All @@ -27,23 +29,29 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies
return nil
}

wp := CreateWorkerPool(runtime.NumCPU())
for _, validatorIndex := range eligibleValidatorsIndicies {
// retrieve validator inactivity score index.
score, err := s.ValidatorInactivityScore(int(validatorIndex))
if err != nil {
return err
}
if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
score -= min(1, score)
} else {
score += s.BeaconConfig().InactivityScoreBias
}
if !state.InactivityLeaking(s) {
score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score)
}
if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil {
return err
}
wp.AddWork(func() error {
// retrieve validator inactivity score index.
score, err := s.ValidatorInactivityScore(int(validatorIndex))
if err != nil {
return err
}
if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
score -= min(1, score)
} else {
score += s.BeaconConfig().InactivityScoreBias
}
if !state.InactivityLeaking(s) {
score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score)
}
if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil {
return err
}
return nil
})
}
return nil

wp.WaitAndClose()
return wp.Error()
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida
for _, index := range eligibleValidators {
baseReward, err = s.BaseReward(index)
if err != nil {
return
return err
}
delta := int64(0)
for flagIdx := range weights {
Expand Down
72 changes: 72 additions & 0 deletions cl/transition/impl/eth2/statechange/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package statechange

import (
"sync"
"sync/atomic"
"unsafe"
)

type WorkerPool struct {
work chan func() error
wg sync.WaitGroup
atomicErr unsafe.Pointer
}

// CreateWorkerPool initializes a pool of workers to process tasks.
func CreateWorkerPool(numWorkers int) *WorkerPool {
wp := WorkerPool{
work: make(chan func() error, 1000),
}
for i := 1; i <= numWorkers; i++ {
go wp.StartWorker()
}
return &wp
}

// close work channel and finish
func (wp *WorkerPool) WaitAndClose() {
// Wait for all workers to finish.
wp.wg.Wait()
// Close the task channel to indicate no more tasks will be sent.
close(wp.work)
}

// Worker is the worker that processes tasks.
func (wp *WorkerPool) StartWorker() {
for task := range wp.work {
if err := task(); err != nil {
atomic.StorePointer(&wp.atomicErr, unsafe.Pointer(&err))
}
wp.wg.Done()
}
}

func (wp *WorkerPool) Error() error {
errPointer := atomic.LoadPointer(&wp.atomicErr)
if errPointer == nil {
return nil
}
return *(*error)(errPointer)
}

// enqueue work
func (wp *WorkerPool) AddWork(f func() error) {
wp.wg.Add(1)
wp.work <- f
}

0 comments on commit 2a560ab

Please sign in to comment.