Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelise epoch processing subtasks #12191

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (g *GossipManager) Start(ctx context.Context) {
}
return true
})
log.Debug("[Gossip] Subscriptions", "subscriptions", logArgs)
log.Trace("[Gossip] Subscriptions", "subscriptions", logArgs)
}
}
}()
Expand Down
1 change: 0 additions & 1 deletion cl/transition/impl/eth2/statechange/process_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func ProcessEpoch(s abstract.BeaconState) error {
}
monitor.ObserveProcessJustificationBitsAndFinalityTime(start)
// fmt.Println("ProcessJustificationBitsAndFinality", time.Since(start))
// start = time.Now()

if s.Version() >= clparams.AltairVersion {
start = time.Now()
Expand Down
14 changes: 11 additions & 3 deletions cl/transition/impl/eth2/statechange/process_inactivity_scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
)
Expand All @@ -27,12 +29,18 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies
return nil
}

for _, validatorIndex := range eligibleValidatorsIndicies {
return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidatorsIndicies), func(i int) error {
validatorIndex := eligibleValidatorsIndicies[i]

// retrieve validator inactivity score index.
score, err := s.ValidatorInactivityScore(int(validatorIndex))
if err != nil {
return err
}
if score == 0 && unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
return nil
}

if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
score -= min(1, score)
} else {
Expand All @@ -44,6 +52,6 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies
if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil {
return err
}
}
return nil
return nil
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes/solid"
Expand Down Expand Up @@ -47,13 +49,13 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida
rewardMultipliers[i] = weights[i] * (flagsTotalBalances[i] / beaconConfig.EffectiveBalanceIncrement)
}
rewardDenominator := (totalActiveBalance / beaconConfig.EffectiveBalanceIncrement) * beaconConfig.WeightDenominator
var baseReward uint64
inactivityLeaking := state.InactivityLeaking(s)
// Now process deltas and whats nots.
for _, index := range eligibleValidators {
baseReward, err = s.BaseReward(index)

return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidators), func(i int) error {
index := eligibleValidators[i]
baseReward, err := s.BaseReward(index)
if err != nil {
return
return err
}
delta := int64(0)
for flagIdx := range weights {
Expand Down Expand Up @@ -84,8 +86,8 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida
} else if err := state.DecreaseBalance(s, index, uint64(-delta)); err != nil {
return err
}
}
return
return nil
})
}

// processRewardsAndPenaltiesPhase0 process rewards and penalties for phase0 state.
Expand Down
18 changes: 6 additions & 12 deletions cl/transition/impl/eth2/statechange/process_slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/phase1/core/state"
)

Expand All @@ -37,10 +38,10 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error {
}
beaconConfig := s.BeaconConfig()
// Apply penalties to validators who have been slashed and reached the withdrawable epoch
var err error
s.ForEachValidator(func(validator solid.Validator, i, total int) bool {
return ParallellForLoop(runtime.NumCPU(), 0, s.ValidatorSet().Length(), func(i int) error {
validator := s.ValidatorSet().Get(i)
if !validator.Slashed() || epoch+beaconConfig.EpochsPerSlashingsVector/2 != validator.WithdrawableEpoch() {
return true
return nil
}
// Get the effective balance increment
increment := beaconConfig.EffectiveBalanceIncrement
Expand All @@ -49,15 +50,8 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error {
// Calculate the penalty by dividing the penalty numerator by the total balance and multiplying by the increment
penalty := penaltyNumerator / totalBalance * increment
// Decrease the validator's balance by the calculated penalty
if err = state.DecreaseBalance(s, uint64(i), penalty); err != nil {
return false
}
return true
return state.DecreaseBalance(s, uint64(i), penalty)
})
if err != nil {
return err
}
return nil
}

func ProcessSlashings(state abstract.BeaconState) error {
Expand Down
95 changes: 95 additions & 0 deletions cl/transition/impl/eth2/statechange/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package statechange

import (
"sync"
"sync/atomic"
"unsafe"
)

type WorkerPool struct {
work chan func() error
wg sync.WaitGroup
atomicErr unsafe.Pointer
}

// CreateWorkerPool initializes a pool of workers to process tasks.
func CreateWorkerPool(numWorkers int) *WorkerPool {
wp := WorkerPool{
work: make(chan func() error, 1000),
}
for i := 1; i <= numWorkers; i++ {
go wp.StartWorker()
}
return &wp
}

// close work channel and finish
func (wp *WorkerPool) WaitAndClose() {
// Wait for all workers to finish.
wp.wg.Wait()
// Close the task channel to indicate no more tasks will be sent.
close(wp.work)
}

// Worker is the worker that processes tasks.
func (wp *WorkerPool) StartWorker() {
for task := range wp.work {
if err := task(); err != nil {
atomic.StorePointer(&wp.atomicErr, unsafe.Pointer(&err))
}
wp.wg.Done()
}
}

func (wp *WorkerPool) Error() error {
errPointer := atomic.LoadPointer(&wp.atomicErr)
if errPointer == nil {
return nil
}
return *(*error)(errPointer)
}

// enqueue work
func (wp *WorkerPool) AddWork(f func() error) {
wp.wg.Add(1)
wp.work <- f
}

func ParallellForLoop(numWorkers int, from, to int, f func(int) error) error {
// divide the work into numWorkers parts
size := (to - from) / numWorkers
wp := CreateWorkerPool(numWorkers)
for i := 0; i < numWorkers; i++ {
start := from + i*size
end := start + size
if i == numWorkers-1 {
end = to
}
wp.AddWork(func() error {
for j := start; j < end; j++ {
if err := f(j); err != nil {
return err
}
}
return nil
})
}
wp.WaitAndClose()
return wp.Error()
}
Loading