From cd1a056b7b505018cda82ebc408b27a94b2aa44a Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 4 Oct 2024 19:14:31 +0200 Subject: [PATCH] save --- .../impl/eth2/statechange/process_epoch.go | 1 - .../statechange/process_inactivity_scores.go | 46 +++++++++---------- .../process_rewards_and_penalties.go | 21 ++++++--- .../eth2/statechange/process_slashings.go | 18 +++----- .../impl/eth2/statechange/worker_pool.go | 23 ++++++++++ 5 files changed, 67 insertions(+), 42 deletions(-) diff --git a/cl/transition/impl/eth2/statechange/process_epoch.go b/cl/transition/impl/eth2/statechange/process_epoch.go index 72f722f9641..a9f94073d64 100644 --- a/cl/transition/impl/eth2/statechange/process_epoch.go +++ b/cl/transition/impl/eth2/statechange/process_epoch.go @@ -55,7 +55,6 @@ func ProcessEpoch(s abstract.BeaconState) error { } monitor.ObserveProcessJustificationBitsAndFinalityTime(start) // fmt.Println("ProcessJustificationBitsAndFinality", time.Since(start)) - // start = time.Now() if s.Version() >= clparams.AltairVersion { start = time.Now() diff --git a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go index b8369117207..25400f91c19 100644 --- a/cl/transition/impl/eth2/statechange/process_inactivity_scores.go +++ b/cl/transition/impl/eth2/statechange/process_inactivity_scores.go @@ -29,29 +29,29 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies return nil } - wp := CreateWorkerPool(runtime.NumCPU()) - for _, validatorIndex := range eligibleValidatorsIndicies { - wp.AddWork(func() error { - // retrieve validator inactivity score index. - score, err := s.ValidatorInactivityScore(int(validatorIndex)) - if err != nil { - return err - } - if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { - score -= min(1, score) - } else { - score += s.BeaconConfig().InactivityScoreBias - } - if !state.InactivityLeaking(s) { - score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) - } - if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { - return err - } + return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidatorsIndicies), func(i int) error { + validatorIndex := eligibleValidatorsIndicies[i] + + // retrieve validator inactivity score index. + score, err := s.ValidatorInactivityScore(int(validatorIndex)) + if err != nil { + return err + } + if score == 0 && unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { return nil - }) - } + } - wp.WaitAndClose() - return wp.Error() + if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] { + score -= min(1, score) + } else { + score += s.BeaconConfig().InactivityScoreBias + } + if !state.InactivityLeaking(s) { + score -= min(s.BeaconConfig().InactivityScoreRecoveryRate, score) + } + if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil { + return err + } + return nil + }) } diff --git a/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go b/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go index 5cd1adb2c52..3e54b95d442 100644 --- a/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go +++ b/cl/transition/impl/eth2/statechange/process_rewards_and_penalties.go @@ -17,6 +17,8 @@ package statechange import ( + "runtime" + "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/clparams" "github.com/erigontech/erigon/cl/cltypes/solid" @@ -47,13 +49,14 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida rewardMultipliers[i] = weights[i] * (flagsTotalBalances[i] / beaconConfig.EffectiveBalanceIncrement) } rewardDenominator := (totalActiveBalance / beaconConfig.EffectiveBalanceIncrement) * beaconConfig.WeightDenominator - var baseReward uint64 inactivityLeaking := state.InactivityLeaking(s) - // Now process deltas and whats nots. - for _, index := range eligibleValidators { + + err = ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidators), func(i int) error { + var baseReward uint64 + index := eligibleValidators[i] baseReward, err = s.BaseReward(index) if err != nil { - return + return err } delta := int64(0) for flagIdx := range weights { @@ -65,6 +68,7 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida delta -= int64(baseReward * weights[flagIdx] / beaconConfig.WeightDenominator) } } + if !flagsUnslashedIndiciesSet[beaconConfig.TimelyTargetFlagIndex][index] { inactivityScore, err := s.ValidatorInactivityScore(int(index)) if err != nil { @@ -77,6 +81,7 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida } delta -= int64((effectiveBalance * inactivityScore) / inactivityPenaltyDenominator) } + if delta > 0 { if err := state.IncreaseBalance(s, index, uint64(delta)); err != nil { return err @@ -84,8 +89,12 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida } else if err := state.DecreaseBalance(s, index, uint64(-delta)); err != nil { return err } - } - return + // } + return nil + }) + + return err + // return } // processRewardsAndPenaltiesPhase0 process rewards and penalties for phase0 state. diff --git a/cl/transition/impl/eth2/statechange/process_slashings.go b/cl/transition/impl/eth2/statechange/process_slashings.go index 4fb4e9437a1..94347528892 100644 --- a/cl/transition/impl/eth2/statechange/process_slashings.go +++ b/cl/transition/impl/eth2/statechange/process_slashings.go @@ -17,9 +17,10 @@ package statechange import ( + "runtime" + "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/clparams" - "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/phase1/core/state" ) @@ -37,10 +38,10 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error { } beaconConfig := s.BeaconConfig() // Apply penalties to validators who have been slashed and reached the withdrawable epoch - var err error - s.ForEachValidator(func(validator solid.Validator, i, total int) bool { + return ParallellForLoop(runtime.NumCPU(), 0, s.ValidatorSet().Length(), func(i int) error { + validator := s.ValidatorSet().Get(i) if !validator.Slashed() || epoch+beaconConfig.EpochsPerSlashingsVector/2 != validator.WithdrawableEpoch() { - return true + return nil } // Get the effective balance increment increment := beaconConfig.EffectiveBalanceIncrement @@ -49,15 +50,8 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error { // Calculate the penalty by dividing the penalty numerator by the total balance and multiplying by the increment penalty := penaltyNumerator / totalBalance * increment // Decrease the validator's balance by the calculated penalty - if err = state.DecreaseBalance(s, uint64(i), penalty); err != nil { - return false - } - return true + return state.DecreaseBalance(s, uint64(i), penalty) }) - if err != nil { - return err - } - return nil } func ProcessSlashings(state abstract.BeaconState) error { diff --git a/cl/transition/impl/eth2/statechange/worker_pool.go b/cl/transition/impl/eth2/statechange/worker_pool.go index 25d98300c23..a34d8422549 100644 --- a/cl/transition/impl/eth2/statechange/worker_pool.go +++ b/cl/transition/impl/eth2/statechange/worker_pool.go @@ -70,3 +70,26 @@ func (wp *WorkerPool) AddWork(f func() error) { wp.wg.Add(1) wp.work <- f } + +func ParallellForLoop(numWorkers int, from, to int, f func(int) error) error { + // divide the work into numWorkers parts + size := (to - from) / numWorkers + wp := CreateWorkerPool(numWorkers) + for i := 0; i < numWorkers; i++ { + start := from + i*size + end := start + size + if i == numWorkers-1 { + end = to + } + wp.AddWork(func() error { + for j := start; j < end; j++ { + if err := f(j); err != nil { + return err + } + } + return nil + }) + } + wp.WaitAndClose() + return wp.Error() +}