From 9bc2ed7ecd256b143eb3183952fa061e4541087f Mon Sep 17 00:00:00 2001 From: "shota.silagadze" Date: Thu, 3 Oct 2024 15:51:31 +0400 Subject: [PATCH 1/2] parallelise epoch processing subtasks --- cl/sentinel/gossip.go | 2 +- .../statechange/process_inactivity_scores.go | 42 ++++++----- .../impl/eth2/statechange/worker_pool.go | 72 +++++++++++++++++++ 3 files changed, 98 insertions(+), 18 deletions(-) create mode 100644 cl/transition/impl/eth2/statechange/worker_pool.go diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index 99028e91e8b..d35b145fb3d 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -518,7 +518,7 @@ func (g *GossipManager) Start(ctx context.Context) { } return true }) - log.Debug("[Gossip] Subscriptions", "subscriptions", logArgs) + log.Trace("[Gossip] Subscriptions", "subscriptions", logArgs) } } }() diff --git a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go index b51d265dfa5..b8369117207 100644 --- a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go +++ b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go @@ -17,6 +17,8 @@ package statechange import ( + "runtime" + "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/phase1/core/state" ) @@ -27,23 +29,29 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies return nil } + wp := CreateWorkerPool(runtime.NumCPU()) for _, validatorIndex := range eligibleValidatorsIndicies { - // retrieve validator inactivity score index. - score, err := s.ValidatorInactivityScore(int(validatorIndex)) - if err != nil { - return err - } - if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { - score -= min(1, score) - } else { - score += s.BeaconConfig().InactivityScoreBias - } - if !state.InactivityLeaking(s) { - score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) - } - if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { - return err - } + wp.AddWork(func() error { + // retrieve validator inactivity score index. + score, err := s.ValidatorInactivityScore(int(validatorIndex)) + if err != nil { + return err + } + if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { + score -= min(1, score) + } else { + score += s.BeaconConfig().InactivityScoreBias + } + if !state.InactivityLeaking(s) { + score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) + } + if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { + return err + } + return nil + }) } - return nil + + wp.WaitAndClose() + return wp.Error() } diff --git a/cl/transition/impl/eth2/statechange/worker_pool.go b/cl/transition/impl/eth2/statechange/worker_pool.go new file mode 100644 index 00000000000..25d98300c23 --- /dev/null +++ b/cl/transition/impl/eth2/statechange/worker_pool.go @@ -0,0 +1,72 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package statechange + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +type WorkerPool struct { + work chan func() error + wg sync.WaitGroup + atomicErr unsafe.Pointer +} + +// CreateWorkerPool initializes a pool of workers to process tasks. +func CreateWorkerPool(numWorkers int) *WorkerPool { + wp := WorkerPool{ + work: make(chan func() error, 1000), + } + for i := 1; i <= numWorkers; i++ { + go wp.StartWorker() + } + return &wp +} + +// close work channel and finish +func (wp *WorkerPool) WaitAndClose() { + // Wait for all workers to finish. + wp.wg.Wait() + // Close the task channel to indicate no more tasks will be sent. + close(wp.work) +} + +// Worker is the worker that processes tasks. +func (wp *WorkerPool) StartWorker() { + for task := range wp.work { + if err := task(); err != nil { + atomic.StorePointer(&wp.atomicErr, unsafe.Pointer(&err)) + } + wp.wg.Done() + } +} + +func (wp *WorkerPool) Error() error { + errPointer := atomic.LoadPointer(&wp.atomicErr) + if errPointer == nil { + return nil + } + return *(*error)(errPointer) +} + +// enqueue work +func (wp *WorkerPool) AddWork(f func() error) { + wp.wg.Add(1) + wp.work <- f +} From e75874596f1fdfcb7071dc0586189028fb0fd1a3 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 4 Oct 2024 19:14:31 +0200 Subject: [PATCH 2/2] save --- .../impl/eth2/statechange/process_epoch.go | 1 - .../statechange/process_inactivity_scores.go | 46 +++++++++---------- .../process_rewards_and_penalties.go | 16 ++++--- .../eth2/statechange/process_slashings.go | 18 +++----- .../impl/eth2/statechange/worker_pool.go | 23 ++++++++++ 5 files changed, 61 insertions(+), 43 deletions(-) diff --git a/cl/transition/impl/eth2/statechange/process_epoch.go b/cl/transition/impl/eth2/statechange/process_epoch.go index 72f722f9641..a9f94073d64 100644 --- a/cl/transition/impl/eth2/statechange/process_epoch.go +++ b/cl/transition/impl/eth2/statechange/process_epoch.go @@ -55,7 +55,6 @@ func ProcessEpoch(s abstract.BeaconState) error { } monitor.ObserveProcessJustificationBitsAndFinalityTime(start) // fmt.Println("ProcessJustificationBitsAndFinality", time.Since(start)) - // start = time.Now() if s.Version() >= clparams.AltairVersion { start = time.Now() diff --git a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go index b8369117207..25400f91c19 100644 --- a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go +++ b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go @@ -29,29 +29,29 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies return nil } - wp := CreateWorkerPool(runtime.NumCPU()) - for _, validatorIndex := range eligibleValidatorsIndicies { - wp.AddWork(func() error { - // retrieve validator inactivity score index. - score, err := s.ValidatorInactivityScore(int(validatorIndex)) - if err != nil { - return err - } - if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { - score -= min(1, score) - } else { - score += s.BeaconConfig().InactivityScoreBias - } - if !state.InactivityLeaking(s) { - score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) - } - if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { - return err - } + return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidatorsIndicies), func(i int) error { + validatorIndex := eligibleValidatorsIndicies[i] + + // retrieve validator inactivity score index. + score, err := s.ValidatorInactivityScore(int(validatorIndex)) + if err != nil { + return err + } + if score == 0 && unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { return nil - }) - } + } - wp.WaitAndClose() - return wp.Error() + if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { + score -= min(1, score) + } else { + score += s.BeaconConfig().InactivityScoreBias + } + if !state.InactivityLeaking(s) { + score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) + } + if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { + return err + } + return nil + }) } diff --git a/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go b/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go index 5cd1adb2c52..fb4afba5ca0 100644 --- a/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go +++ b/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go @@ -17,6 +17,8 @@ package statechange import ( + "runtime" + "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/clparams" "github.com/erigontech/erigon/cl/cltypes/solid" @@ -47,13 +49,13 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida rewardMultipliers[i] = weights[i] * (flagsTotalBalances[i] / beaconConfig.EffectiveBalanceIncrement) } rewardDenominator := (totalActiveBalance / beaconConfig.EffectiveBalanceIncrement) * beaconConfig.WeightDenominator - var baseReward uint64 inactivityLeaking := state.InactivityLeaking(s) - // Now process deltas and whats nots. - for _, index := range eligibleValidators { - baseReward, err = s.BaseReward(index) + + return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidators), func(i int) error { + index := eligibleValidators[i] + baseReward, err := s.BaseReward(index) if err != nil { - return + return err } delta := int64(0) for flagIdx := range weights { @@ -84,8 +86,8 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida } else if err := state.DecreaseBalance(s, index, uint64(-delta)); err != nil { return err } - } - return + return nil + }) } // processRewardsAndPenaltiesPhase0 process rewards and penalties for phase0 state. diff --git a/cl/transition/impl/eth2/statechange/process_slashings.go b/cl/transition/impl/eth2/statechange/process_slashings.go index 4fb4e9437a1..94347528892 100644 --- a/cl/transition/impl/eth2/statechange/process_slashings.go +++ b/cl/transition/impl/eth2/statechange/process_slashings.go @@ -17,9 +17,10 @@ package statechange import ( + "runtime" + "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/clparams" - "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/phase1/core/state" ) @@ -37,10 +38,10 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error { } beaconConfig := s.BeaconConfig() // Apply penalties to validators who have been slashed and reached the withdrawable epoch - var err error - s.ForEachValidator(func(validator solid.Validator, i, total int) bool { + return ParallellForLoop(runtime.NumCPU(), 0, s.ValidatorSet().Length(), func(i int) error { + validator := s.ValidatorSet().Get(i) if !validator.Slashed() || epoch+beaconConfig.EpochsPerSlashingsVector/2 != validator.WithdrawableEpoch() { - return true + return nil } // Get the effective balance increment increment := beaconConfig.EffectiveBalanceIncrement @@ -49,15 +50,8 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error { // Calculate the penalty by dividing the penalty numerator by the total balance and multiplying by the increment penalty := penaltyNumerator / totalBalance * increment // Decrease the validator's balance by the calculated penalty - if err = state.DecreaseBalance(s, uint64(i), penalty); err != nil { - return false - } - return true + return state.DecreaseBalance(s, uint64(i), penalty) }) - if err != nil { - return err - } - return nil } func ProcessSlashings(state abstract.BeaconState) error { diff --git a/cl/transition/impl/eth2/statechange/worker_pool.go b/cl/transition/impl/eth2/statechange/worker_pool.go index 25d98300c23..a34d8422549 100644 --- a/cl/transition/impl/eth2/statechange/worker_pool.go +++ b/cl/transition/impl/eth2/statechange/worker_pool.go @@ -70,3 +70,26 @@ func (wp *WorkerPool) AddWork(f func() error) { wp.wg.Add(1) wp.work <- f } + +func ParallellForLoop(numWorkers int, from, to int, f func(int) error) error { + // divide the work into numWorkers parts + size := (to - from) / numWorkers + wp := CreateWorkerPool(numWorkers) + for i := 0; i < numWorkers; i++ { + start := from + i*size + end := start + size + if i == numWorkers-1 { + end = to + } + wp.AddWork(func() error { + for j := start; j < end; j++ { + if err := f(j); err != nil { + return err + } + } + return nil + }) + } + wp.WaitAndClose() + return wp.Error() +}