From d3d1d6004e386aeab5245a23662e3fa912146505 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Sun, 17 Nov 2024 18:59:50 +0100 Subject: [PATCH] Caplin: Reuse same state for reorg + alloc reduction in forkchoice (#12735) --- cl/cltypes/solid/hash_vector.go | 1 + cl/cltypes/solid/validator_set.go | 3 +- cl/persistence/state/state_accessors.go | 10 ++ cl/phase1/core/state/cache.go | 2 +- cl/phase1/core/state/raw/ssz.go | 5 +- cl/phase1/core/state/raw/state.go | 35 +++--- cl/phase1/forkchoice/checkpoint_state.go | 44 ++----- cl/phase1/forkchoice/fork_choice_test.go | 5 +- .../forkchoice/fork_graph/fork_graph_disk.go | 61 +++++----- .../fork_graph/fork_graph_disk_fs.go | 58 ++++++--- cl/phase1/forkchoice/fork_graph/interface.go | 4 +- .../participation_indicies_store.go | 41 +++++++ cl/phase1/forkchoice/forkchoice.go | 36 +++--- cl/phase1/forkchoice/get_head.go | 35 +----- cl/phase1/forkchoice/latest_messages_store.go | 79 ++++++++++++ cl/phase1/forkchoice/on_attestation.go | 16 +-- .../db_public_keys_registry.go | 56 +++++++++ .../in_memory_public_keys_registry.go | 95 +++++++++++++++ .../public_keys_registry/interface.go | 18 +++ cl/phase1/forkchoice/utils.go | 5 +- .../services/aggregate_and_proof_service.go | 2 +- .../services/batch_signature_verification.go | 4 +- .../network/services/blob_sidecar_service.go | 112 +++++++++--------- cl/phase1/network/services/block_service.go | 6 - .../sync_committee_messages_service.go | 31 +++-- cl/phase1/stages/forkchoice.go | 73 +++++++----- cl/phase1/stages/stage_history_download.go | 4 +- cl/spectest/Makefile | 2 +- cl/spectest/consensus_tests/fork_choice.go | 3 +- cmd/caplin/caplin1/run.go | 7 +- 30 files changed, 580 insertions(+), 273 deletions(-) create mode 100644 cl/phase1/forkchoice/fork_graph/participation_indicies_store.go create mode 100644 cl/phase1/forkchoice/latest_messages_store.go create mode 100644 cl/phase1/forkchoice/public_keys_registry/db_public_keys_registry.go create mode 100644 cl/phase1/forkchoice/public_keys_registry/in_memory_public_keys_registry.go create mode 100644 cl/phase1/forkchoice/public_keys_registry/interface.go diff --git a/cl/cltypes/solid/hash_vector.go b/cl/cltypes/solid/hash_vector.go index 8999d02ab0b..697c8323d48 100644 --- a/cl/cltypes/solid/hash_vector.go +++ b/cl/cltypes/solid/hash_vector.go @@ -86,6 +86,7 @@ func (h *hashVector) DecodeSSZ(buf []byte, version int) error { if len(buf) < h.Length()*length.Hash { return ssz.ErrBadDynamicLength } + h.u.MerkleTree = nil copy(h.u.u, buf) return nil } diff --git a/cl/cltypes/solid/validator_set.go b/cl/cltypes/solid/validator_set.go index 1d831872609..1af61dfd9cc 100644 --- a/cl/cltypes/solid/validator_set.go +++ b/cl/cltypes/solid/validator_set.go @@ -55,8 +55,6 @@ type ValidatorSet struct { // We have phase0 data below phase0Data []Phase0Data attesterBits []byte - - hashBuf } func NewValidatorSet(c int) *ValidatorSet { @@ -158,6 +156,7 @@ func (v *ValidatorSet) DecodeSSZ(buf []byte, _ int) error { } v.expandBuffer(len(buf) / validatorSize) copy(v.buffer, buf) + v.MerkleTree = nil v.l = len(buf) / validatorSize v.phase0Data = make([]Phase0Data, v.l) v.attesterBits = make([]byte, v.l) diff --git a/cl/persistence/state/state_accessors.go b/cl/persistence/state/state_accessors.go index 05353687069..d25082b6d9b 100644 --- a/cl/persistence/state/state_accessors.go +++ b/cl/persistence/state/state_accessors.go @@ -142,6 +142,16 @@ func IncrementHistoricalSummariesTable(tx kv.RwTx, state *state.CachingBeaconSta return nil } +func ReadPublicKeyByIndexNoCopy(tx 
kv.Tx, index uint64) ([]byte, error) { + var pks []byte + var err error + key := base_encoding.Encode64ToBytes4(index) + if pks, err = tx.GetOne(kv.ValidatorPublicKeys, key); err != nil { + return nil, err + } + return pks, err +} + func ReadPublicKeyByIndex(tx kv.Tx, index uint64) (libcommon.Bytes48, error) { var pks []byte var err error diff --git a/cl/phase1/core/state/cache.go b/cl/phase1/core/state/cache.go index a4da6a8e275..a3986878753 100644 --- a/cl/phase1/core/state/cache.go +++ b/cl/phase1/core/state/cache.go @@ -273,7 +273,6 @@ func (b *CachingBeaconState) InitBeaconState() error { b._refreshActiveBalancesIfNeeded() b.publicKeyIndicies = make(map[[48]byte]uint64) - b.ForEachValidator(func(validator solid.Validator, i, total int) bool { b.publicKeyIndicies[validator.PublicKey()] = uint64(i) @@ -287,6 +286,7 @@ func (b *CachingBeaconState) InitBeaconState() error { if b.Version() >= clparams.Phase0Version { return b._initializeValidatorsPhase0() } + return nil } diff --git a/cl/phase1/core/state/raw/ssz.go b/cl/phase1/core/state/raw/ssz.go index e8e6b0a8db0..c1e1dfff3fd 100644 --- a/cl/phase1/core/state/raw/ssz.go +++ b/cl/phase1/core/state/raw/ssz.go @@ -86,9 +86,12 @@ func (b *BeaconState) getSchema() []interface{} { func (b *BeaconState) DecodeSSZ(buf []byte, version int) error { b.version = clparams.StateVersion(version) - if len(buf) < b.EncodingSizeSSZ() { + if len(buf) < int(b.baseOffsetSSZ()) { return fmt.Errorf("[BeaconState] err: %s", ssz.ErrLowBufferSize) } + if version >= int(clparams.BellatrixVersion) { + b.latestExecutionPayloadHeader = &cltypes.Eth1Header{} + } if err := ssz2.UnmarshalSSZ(buf, version, b.getSchema()...); err != nil { return err } diff --git a/cl/phase1/core/state/raw/state.go b/cl/phase1/core/state/raw/state.go index 1ed31ee7798..3acbd5e2d11 100644 --- a/cl/phase1/core/state/raw/state.go +++ b/cl/phase1/core/state/raw/state.go @@ -95,29 +95,30 @@ func New(cfg *clparams.BeaconChainConfig) *BeaconState { currentSyncCommittee: &solid.SyncCommittee{}, nextSyncCommittee: &solid.SyncCommittee{}, latestExecutionPayloadHeader: &cltypes.Eth1Header{}, - //inactivityScores: solid.NewSimpleUint64Slice(int(cfg.ValidatorRegistryLimit)), - inactivityScores: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)), - balances: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)), - previousEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)), - currentEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)), - slashings: solid.NewUint64VectorSSZ(SlashingsLength), - currentEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.CurrentEpochAttestationsLength())), - previousEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.PreviousEpochAttestationsLength())), - historicalRoots: solid.NewHashList(int(cfg.HistoricalRootsLimit)), - blockRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)), - stateRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)), - randaoMixes: solid.NewHashVector(int(cfg.EpochsPerHistoricalVector)), - validators: solid.NewValidatorSet(int(cfg.ValidatorRegistryLimit)), - leaves: make([]byte, 32*32), + inactivityScores: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)), + balances: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)), + previousEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)), + currentEpochParticipation: solid.NewParticipationBitList(0, 
int(cfg.ValidatorRegistryLimit)), + slashings: solid.NewUint64VectorSSZ(SlashingsLength), + currentEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.CurrentEpochAttestationsLength())), + previousEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.PreviousEpochAttestationsLength())), + historicalRoots: solid.NewHashList(int(cfg.HistoricalRootsLimit)), + blockRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)), + stateRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)), + randaoMixes: solid.NewHashVector(int(cfg.EpochsPerHistoricalVector)), + validators: solid.NewValidatorSet(int(cfg.ValidatorRegistryLimit)), + leaves: make([]byte, 32*32), } state.init() return state } +func (b *BeaconState) SetValidatorSet(validatorSet *solid.ValidatorSet) { + b.validators = validatorSet +} + func (b *BeaconState) init() error { - if b.touchedLeaves == nil { - b.touchedLeaves = make([]atomic.Uint32, StateLeafSize) - } + b.touchedLeaves = make([]atomic.Uint32, StateLeafSize) return nil } diff --git a/cl/phase1/forkchoice/checkpoint_state.go b/cl/phase1/forkchoice/checkpoint_state.go index 073a29847e2..0e96d24e29d 100644 --- a/cl/phase1/forkchoice/checkpoint_state.go +++ b/cl/phase1/forkchoice/checkpoint_state.go @@ -25,10 +25,9 @@ import ( "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/monitor/shuffling_metrics" "github.com/erigontech/erigon/cl/phase1/core/state/shuffling" + "github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry" - "github.com/Giulio2002/bls" libcommon "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/common/length" "github.com/erigontech/erigon/cl/clparams" "github.com/erigontech/erigon/cl/cltypes" @@ -44,17 +43,18 @@ type checkpointState struct { shuffledSet []uint64 // shuffled set of active validators // validator data balances []uint64 - // These are flattened to save memory and anchor public keys are static and shared. 
- anchorPublicKeys []byte // flattened base public keys - publicKeys []byte // flattened public keys - actives []byte - slasheds []byte + // bitlists of active indexes and slashed indexes + actives []byte + slasheds []byte + + publicKeysRegistry public_keys_registry.PublicKeyRegistry validatorSetSize int // fork data genesisValidatorsRoot libcommon.Hash fork *cltypes.Fork activeBalance, epoch uint64 // current active balance and epoch + checkpoint solid.Checkpoint } func writeToBitset(bitset []byte, i int, value bool) { @@ -73,9 +73,8 @@ func readFromBitset(bitset []byte, i int) bool { return (bitset[sliceIndex] & (1 << uint(bitIndex))) > 0 } -func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKeys []byte, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ, - genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64) *checkpointState { - publicKeys := make([]byte, (len(validatorSet)-(len(anchorPublicKeys)/length.Bytes48))*length.Bytes48) +func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, publicKeysRegistry public_keys_registry.PublicKeyRegistry, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ, + genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64, checkpoint solid.Checkpoint) *checkpointState { balances := make([]uint64, len(validatorSet)) bitsetSize := (len(validatorSet) + 7) / 8 @@ -86,11 +85,6 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe writeToBitset(actives, i, validatorSet[i].Active(epoch)) writeToBitset(slasheds, i, validatorSet[i].Slashed()) } - // Add the post-anchor public keys as surplus - for i := len(anchorPublicKeys) / length.Bytes48; i < len(validatorSet); i++ { - pos := i - len(anchorPublicKeys)/length.Bytes48 - copy(publicKeys[pos*length.Bytes48:(pos+1)*length.Bytes48], validatorSet[i].PublicKeyBytes()) - } mixes := solid.NewHashVector(randaoMixesLength) randaoMixes.CopyTo(mixes) @@ -100,16 +94,15 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe beaconConfig: beaconConfig, randaoMixes: mixes, balances: balances, - anchorPublicKeys: anchorPublicKeys, - publicKeys: publicKeys, genesisValidatorsRoot: genesisValidatorsRoot, fork: fork, activeBalance: activeBalance, slasheds: slasheds, actives: actives, validatorSetSize: len(validatorSet), - - epoch: epoch, + checkpoint: checkpoint, + epoch: epoch, + publicKeysRegistry: publicKeysRegistry, } mixPosition := (epoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) % beaconConfig.EpochsPerHistoricalVector @@ -196,17 +189,6 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat return false, errors.New("isValidIndexedAttestation: attesting indices are not sorted or are null") } - pks := [][]byte{} - inds.Range(func(_ int, v uint64, _ int) bool { - if v < uint64(len(c.anchorPublicKeys))/length.Bytes48 { - pks = append(pks, c.anchorPublicKeys[v*length.Bytes48:(v+1)*length.Bytes48]) - } else { - offset := uint64(len(c.anchorPublicKeys) / length.Bytes48) - pks = append(pks, c.publicKeys[(v-offset)*length.Bytes48:(v-offset+1)*length.Bytes48]) - } - return true - }) - domain, err := c.getDomain(c.beaconConfig.DomainBeaconAttester, att.Data.Target.Epoch) if err != nil { return false, fmt.Errorf("unable to get the domain: %v", err) @@ -217,7 +199,7 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat return false, fmt.Errorf("unable to get signing 
root: %v", err) } - valid, err := bls.VerifyAggregate(att.Signature[:], signingRoot[:], pks) + valid, err := c.publicKeysRegistry.VerifyAggregateSignature(c.checkpoint, inds, signingRoot[:], att.Signature) if err != nil { return false, fmt.Errorf("error while validating signature: %v", err) } diff --git a/cl/phase1/forkchoice/fork_choice_test.go b/cl/phase1/forkchoice/fork_choice_test.go index d484cbe03a1..999f809ed14 100644 --- a/cl/phase1/forkchoice/fork_choice_test.go +++ b/cl/phase1/forkchoice/fork_choice_test.go @@ -33,6 +33,7 @@ import ( "github.com/erigontech/erigon/cl/phase1/core/state" "github.com/erigontech/erigon/cl/phase1/forkchoice" "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph" + "github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry" "github.com/erigontech/erigon/cl/pool" "github.com/erigontech/erigon/cl/transition" @@ -84,7 +85,7 @@ func TestForkChoiceBasic(t *testing.T) { pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig) emitters := beaconevents.NewEventEmitter() validatorMonitor := monitor.NewValidatorMonitor(false, nil, nil, nil) - store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, false) + store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, public_keys_registry.NewInMemoryPublicKeysRegistry(), false) require.NoError(t, err) // first steps store.OnTick(0) @@ -150,7 +151,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) { sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0) store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{ Beacon: true, - }, emitters), emitters, sd, nil, nil, false) + }, emitters), emitters, sd, nil, nil, public_keys_registry.NewInMemoryPublicKeysRegistry(), false) store.OnTick(2000) require.NoError(t, err) for _, block := range blocks { diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 177265bfe89..7622c52717b 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -77,10 +77,6 @@ func (r ChainSegmentInsertionResult) String() string { } } -type savedStateRecord struct { - slot uint64 -} - func convertHashSliceToHashList(in [][32]byte) solid.HashVectorSSZ { out := solid.NewHashVector(len(in)) for i, v := range in { @@ -110,8 +106,8 @@ type forkGraphDisk struct { syncCommittees sync.Map lightclientBootstraps sync.Map - previousIndicies sync.Map - currentIndicies sync.Map + previousIndicies participationIndiciesStore + currentIndicies participationIndiciesStore // configurations beaconCfg *clparams.BeaconChainConfig @@ -131,6 +127,8 @@ type forkGraphDisk struct { rcfg beacon_router_configuration.RouterConfiguration emitter *beaconevents.EventEmitter + + stateDumpLock sync.Mutex } // Initialize fork graph with a new state @@ -160,10 +158,10 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r } f.lowestAvailableBlock.Store(anchorState.Slot()) f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader) + f.sszBuffer = 
make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2) f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true) // preallocate buffer - f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2) return f } @@ -188,11 +186,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, f.badBlocks.Store(libcommon.Hash(blockRoot), struct{}{}) return nil, BelowAnchor, nil } - // Check if block being process right now was marked as invalid. - if _, ok := f.badBlocks.Load(libcommon.Hash(blockRoot)); ok { - log.Debug("block has invalid parent", "slot", block.Slot, "hash", libcommon.Hash(blockRoot)) - return nil, InvalidBlock, nil - } newState, err := f.GetState(block.ParentRoot, false) if err != nil { @@ -264,8 +257,9 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, // update diff storages. if f.rcfg.Beacon || f.rcfg.Validator || f.rcfg.Lighthouse { if block.Version() != clparams.Phase0Version { - f.currentIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawCurrentEpochParticipation())) - f.previousIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawPreviousEpochParticipation())) + epoch := state.Epoch(newState) + f.currentIndicies.add(epoch, newState.RawCurrentEpochParticipation()) + f.previousIndicies.add(epoch, newState.RawPreviousEpochParticipation()) } f.blockRewards.Store(libcommon.Hash(blockRoot), blockRewardsCollector) @@ -425,6 +419,10 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) { return } + // prune the indicies for the epoch + f.currentIndicies.prune(pruneSlot / f.beaconCfg.SlotsPerEpoch) + f.previousIndicies.prune(pruneSlot / f.beaconCfg.SlotsPerEpoch) + f.lowestAvailableBlock.Store(pruneSlot + 1) for _, root := range oldRoots { f.badBlocks.Delete(root) @@ -436,9 +434,6 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) { f.blockRewards.Delete(root) f.fs.Remove(getBeaconStateFilename(root)) f.fs.Remove(getBeaconStateCacheFilename(root)) - - f.previousIndicies.Delete(root) - f.currentIndicies.Delete(root) } log.Debug("Pruned old blocks", "pruneSlot", pruneSlot) return @@ -510,28 +505,40 @@ func (f *forkGraphDisk) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.U return st.InactivityScores(), nil } -func (f *forkGraphDisk) GetPreviousParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) { - b, ok := f.previousIndicies.Load(blockRoot) +func (f *forkGraphDisk) GetPreviousParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) { + b, ok := f.previousIndicies.get(epoch) if !ok { - return nil, nil + if epoch == 0 { + return nil, nil + } + b, ok = f.previousIndicies.get(epoch - 1) + if !ok { + return nil, nil + } } - if len(b.([]byte)) == 0 { + if len(b) == 0 { return nil, nil } out := solid.NewParticipationBitList(0, int(f.beaconCfg.ValidatorRegistryLimit)) - return out, out.DecodeSSZ(b.([]byte), 0) + return out, out.DecodeSSZ(b, 0) } -func (f *forkGraphDisk) GetCurrentParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) { - b, ok := f.currentIndicies.Load(blockRoot) +func (f *forkGraphDisk) GetCurrentParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) { + b, ok := f.currentIndicies.get(epoch) if !ok { - return nil, nil + if epoch == 0 { + return nil, nil + } + b, ok = f.currentIndicies.get(epoch - 1) + if !ok { + return nil, nil + } } - if len(b.([]byte)) == 0 { + if len(b) == 0 { return nil, nil } out := solid.NewParticipationBitList(0, 
int(f.beaconCfg.ValidatorRegistryLimit)) - return out, out.DecodeSSZ(b.([]byte), 0) + return out, out.DecodeSSZ(b, 0) } func (f *forkGraphDisk) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) { diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go index c0a163ccf65..811e6fd48e2 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go @@ -17,6 +17,7 @@ package fork_graph import ( + "bytes" "encoding/binary" "fmt" "io" @@ -26,6 +27,7 @@ import ( "github.com/spf13/afero" libcommon "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cl/phase1/core/state" ) @@ -39,6 +41,9 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string { func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) { var file afero.File + f.stateDumpLock.Lock() + defer f.stateDumpLock.Unlock() + file, err = f.fs.Open(getBeaconStateFilename(blockRoot)) if err != nil { return @@ -72,8 +77,8 @@ func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *s return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot) } f.sszBuffer = f.sszBuffer[:n] - bs = state.New(f.beaconCfg) + if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil { return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs) } @@ -101,11 +106,11 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat if err != nil { return } + version := bs.Version() - var dumpedFile afero.File - dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) + dumpedFile, err := f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) if err != nil { - return + return err } defer dumpedFile.Close() @@ -116,39 +121,58 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat } // First write the hard fork version - if _, err := f.sszSnappyWriter.Write([]byte{byte(bs.Version())}); err != nil { + if _, err := f.sszSnappyWriter.Write([]byte{byte(version)}); err != nil { + log.Error("failed to write hard fork version", "err", err) return err } // Second write the length length := make([]byte, 8) binary.BigEndian.PutUint64(length, uint64(len(f.sszBuffer))) if _, err := f.sszSnappyWriter.Write(length); err != nil { + log.Error("failed to write length", "err", err) return err } // Lastly dump the state if _, err := f.sszSnappyWriter.Write(f.sszBuffer); err != nil { + log.Error("failed to write ssz buffer", "err", err) return err } if err = f.sszSnappyWriter.Flush(); err != nil { - return - } - if err = dumpedFile.Sync(); err != nil { - return + log.Error("failed to flush snappy writer", "err", err) + return err } - cacheFile, err := f.fs.OpenFile(getBeaconStateCacheFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) - if err != nil { - return - } - defer cacheFile.Close() + b := bytes.NewBuffer(f.sszBuffer) + b.Reset() - if err := bs.EncodeCaches(cacheFile); err != nil { + if err := bs.EncodeCaches(b); err != nil { + log.Error("failed to encode caches", "err", err) return err } - - if err = cacheFile.Sync(); err != nil { + if err = dumpedFile.Sync(); err != nil { + log.Error("failed to sync dumped file", "err", err) return } + 
f.stateDumpLock.Lock() + go func() { + cacheFile, err := f.fs.OpenFile(getBeaconStateCacheFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) + if err != nil { + log.Error("failed to open cache file", "err", err) + return + } + defer cacheFile.Close() + defer f.stateDumpLock.Unlock() + + if _, err = cacheFile.Write(b.Bytes()); err != nil { + log.Error("failed to write cache file", "err", err) + return + } + if err = cacheFile.Sync(); err != nil { + log.Error("failed to sync cache file", "err", err) + return + } + }() + return } diff --git a/cl/phase1/forkchoice/fork_graph/interface.go b/cl/phase1/forkchoice/fork_graph/interface.go index 3d24b59efda..173cc7a9cd9 100644 --- a/cl/phase1/forkchoice/fork_graph/interface.go +++ b/cl/phase1/forkchoice/fork_graph/interface.go @@ -51,8 +51,8 @@ type ForkGraph interface { GetLightClientUpdate(period uint64) (*cltypes.LightClientUpdate, bool) GetBalances(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) - GetPreviousParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) - GetCurrentParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) + GetCurrentParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) + GetPreviousParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, state *state.CachingBeaconState, forced bool) error } diff --git a/cl/phase1/forkchoice/fork_graph/participation_indicies_store.go b/cl/phase1/forkchoice/fork_graph/participation_indicies_store.go new file mode 100644 index 00000000000..eb276a8c99c --- /dev/null +++ b/cl/phase1/forkchoice/fork_graph/participation_indicies_store.go @@ -0,0 +1,41 @@ +package fork_graph + +import ( + "sync" + + "github.com/erigontech/erigon-lib/common" +) + +type participationIndiciesStore struct { + s sync.Map +} + +func (p *participationIndiciesStore) get(epoch uint64) ([]byte, bool) { + val, ok := p.s.Load(epoch) + if !ok { + return nil, false + } + return val.([]byte), true +} + +func (p *participationIndiciesStore) add(epoch uint64, participations []byte) { + prevBitlistInterface, ok := p.s.Load(epoch) + if !ok { + p.s.Store(epoch, common.Copy(participations)) + return + } + // Reuse the existing slice if possible + prevBitlist := prevBitlistInterface.([]byte) + prevBitlist = prevBitlist[:0] + p.s.Store(epoch, append(prevBitlist, participations...)) +} + +func (p *participationIndiciesStore) prune(epoch uint64) { + // iterate over the map and delete all keys less or equal than epoch + p.s.Range(func(key, value interface{}) bool { + if key.(uint64) <= epoch { + p.s.Delete(key) + } + return true + }) +} diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index 8223a45d7a0..41cc13b9088 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -34,6 +34,7 @@ import ( "github.com/erigontech/erigon/cl/phase1/execution_client" "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph" "github.com/erigontech/erigon/cl/phase1/forkchoice/optimistic" + "github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry" "github.com/erigontech/erigon/cl/pool" "github.com/erigontech/erigon/cl/transition/impl/eth2" "github.com/erigontech/erigon/cl/utils/eth_clock" @@ -41,7 +42,6 @@ import ( lru "github.com/hashicorp/golang-lru/v2" libcommon 
"github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/common/length" ) // ForkNode is a struct that represents a node in the fork choice tree. @@ -103,10 +103,10 @@ type ForkChoiceStore struct { forkGraph fork_graph.ForkGraph blobStorage blob_storage.BlobStorage // I use the cache due to the convenient auto-cleanup feauture. - checkpointStates sync.Map // We keep ssz snappy of it as the full beacon state is full of rendundant data. + checkpointStates sync.Map // We keep ssz snappy of it as the full beacon state is full of rendundant data. + publicKeysRegistry public_keys_registry.PublicKeyRegistry - latestMessages []LatestMessage - anchorPublicKeys []byte + latestMessages *latestMessagesStore syncedDataManager *synced_data.SyncedDataManager // We keep track of them so that we can forkchoice with EL. eth2Roots *lru.Cache[libcommon.Hash, libcommon.Hash] // ETH2 root -> ETH1 hash @@ -160,6 +160,7 @@ func NewForkChoiceStore( syncedDataManager *synced_data.SyncedDataManager, blobStorage blob_storage.BlobStorage, validatorMonitor monitor.ValidatorMonitor, + publicKeysRegistry public_keys_registry.PublicKeyRegistry, probabilisticHeadGetter bool, ) (*ForkChoiceStore, error) { anchorRoot, err := anchorState.BlockRoot() @@ -192,15 +193,6 @@ func NewForkChoiceStore( return nil, err } - anchorPublicKeys := make([]byte, anchorState.ValidatorLength()*length.Bytes48) - for idx := 0; idx < anchorState.ValidatorLength(); idx++ { - pk, err := anchorState.ValidatorPublicKey(idx) - if err != nil { - return nil, err - } - copy(anchorPublicKeys[idx*length.Bytes48:], pk[:]) - } - preverifiedSizes, err := lru.New[libcommon.Hash, preverifiedAppendListsSizes](checkpointsPerCache * 10) if err != nil { return nil, err @@ -225,7 +217,7 @@ func NewForkChoiceStore( if err != nil { return nil, err } - + publicKeysRegistry.ResetAnchor(anchorState) participation.Add(state.Epoch(anchorState.BeaconState), anchorState.CurrentEpochParticipation().Copy()) totalActiveBalances.Add(anchorRoot, anchorState.GetTotalActiveBalance()) @@ -237,11 +229,10 @@ func NewForkChoiceStore( f := &ForkChoiceStore{ forkGraph: forkGraph, equivocatingIndicies: make([]byte, anchorState.ValidatorLength(), anchorState.ValidatorLength()*2), - latestMessages: make([]LatestMessage, anchorState.ValidatorLength(), anchorState.ValidatorLength()*2), + latestMessages: newLatestMessagesStore(anchorState.ValidatorLength()), eth2Roots: eth2Roots, engine: engine, operationsPool: operationsPool, - anchorPublicKeys: anchorPublicKeys, beaconCfg: anchorState.BeaconConfig(), preverifiedSizes: preverifiedSizes, finalityCheckpoints: finalityCheckpoints, @@ -262,6 +253,7 @@ func NewForkChoiceStore( optimisticStore: optimistic.NewOptimisticStore(), validatorMonitor: validatorMonitor, probabilisticHeadGetter: probabilisticHeadGetter, + publicKeysRegistry: publicKeysRegistry, } f.justifiedCheckpoint.Store(anchorCheckpoint) f.finalizedCheckpoint.Store(anchorCheckpoint) @@ -514,7 +506,11 @@ func (f *ForkChoiceStore) GetInactivitiesScores(blockRoot libcommon.Hash) (solid } func (f *ForkChoiceStore) GetPreviousParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) { - return f.forkGraph.GetPreviousParticipationIndicies(blockRoot) + header, ok := f.GetHeader(blockRoot) + if !ok { + return nil, nil + } + return f.forkGraph.GetPreviousParticipationIndicies(header.Slot / f.beaconCfg.SlotsPerEpoch) } func (f *ForkChoiceStore) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) { @@ -522,7 +518,11 @@ func (f 
*ForkChoiceStore) GetValidatorSet(blockRoot libcommon.Hash) (*solid.Vali } func (f *ForkChoiceStore) GetCurrentParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) { - return f.forkGraph.GetCurrentParticipationIndicies(blockRoot) + header, ok := f.GetHeader(blockRoot) + if !ok { + return nil, nil + } + return f.forkGraph.GetCurrentParticipationIndicies(header.Slot / f.beaconCfg.SlotsPerEpoch) } func (f *ForkChoiceStore) IsRootOptimistic(root libcommon.Hash) bool { diff --git a/cl/phase1/forkchoice/get_head.go b/cl/phase1/forkchoice/get_head.go index 5917a69a2fd..ebd4a99fa2e 100644 --- a/cl/phase1/forkchoice/get_head.go +++ b/cl/phase1/forkchoice/get_head.go @@ -62,8 +62,8 @@ func (f *ForkChoiceStore) computeVotes(justifiedCheckpoint solid.Checkpoint, che startIdx = gen.Intn(sampleBasis) step = sampleBasis + gen.Intn(sampleFactor) } - for validatorIndex := startIdx; validatorIndex < len(f.latestMessages); validatorIndex += step { - message := f.latestMessages[validatorIndex] + for validatorIndex := startIdx; validatorIndex < f.latestMessages.latestMessagesCount(); validatorIndex += step { + message, _ := f.latestMessages.get(validatorIndex) v := auxilliaryState.ValidatorSet().Get(validatorIndex) if !v.Active(justifiedCheckpoint.Epoch) || v.Slashed() { continue @@ -79,7 +79,8 @@ func (f *ForkChoiceStore) computeVotes(justifiedCheckpoint solid.Checkpoint, che votes[boostRoot] += (boost * auxilliaryState.BeaconConfig().ProposerScoreBoost) / 100 } } else { - for validatorIndex, message := range f.latestMessages { + for validatorIndex := 0; validatorIndex < f.latestMessages.latestMessagesCount(); validatorIndex++ { + message, _ := f.latestMessages.get(validatorIndex) if message == (LatestMessage{}) { continue } @@ -200,34 +201,6 @@ func (f *ForkChoiceStore) filterValidatorSetForAttestationScores(c *checkpointSt return filtered } -// getWeight computes weight in head decision of canonical chain. -func (f *ForkChoiceStore) getWeight(root libcommon.Hash, indicies []uint64, state *checkpointState) uint64 { - header, has := f.forkGraph.GetHeader(root) - if !has { - return 0 - } - // Compute attestation score - var attestationScore uint64 - for _, validatorIndex := range indicies { - if f.Ancestor(f.latestMessages[validatorIndex].Root, header.Slot) != root { - continue - } - attestationScore += state.balances[validatorIndex] - } - - boostRoot := f.proposerBoostRoot.Load().(libcommon.Hash) - if boostRoot == (libcommon.Hash{}) { - return attestationScore - } - - // Boost is applied if root is an ancestor of proposer_boost_root - if f.Ancestor(boostRoot, header.Slot) == root { - committeeWeight := state.activeBalance / state.beaconConfig.SlotsPerEpoch - attestationScore += (committeeWeight * state.beaconConfig.ProposerScoreBoost) / 100 - } - return attestationScore -} - // getFilteredBlockTree filters out dumb blocks. 
func (f *ForkChoiceStore) getFilteredBlockTree(base libcommon.Hash) map[libcommon.Hash]*cltypes.BeaconBlockHeader { blocks := make(map[libcommon.Hash]*cltypes.BeaconBlockHeader) diff --git a/cl/phase1/forkchoice/latest_messages_store.go b/cl/phase1/forkchoice/latest_messages_store.go new file mode 100644 index 00000000000..f31ba7ea606 --- /dev/null +++ b/cl/phase1/forkchoice/latest_messages_store.go @@ -0,0 +1,79 @@ +package forkchoice + +import "sync" + +// latestMessagesStore keeps in memory an inverted index +type latestMessagesStore struct { + messageByIdx map[uint16]LatestMessage + idxByMessage map[LatestMessage]uint16 + countOfMessages map[LatestMessage]int + latestMessages []uint16 + currID uint16 // can overflow by design + mu sync.RWMutex +} + +func newLatestMessagesStore(size int) *latestMessagesStore { + return &latestMessagesStore{ + messageByIdx: map[uint16]LatestMessage{}, + idxByMessage: map[LatestMessage]uint16{}, + countOfMessages: map[LatestMessage]int{}, + latestMessages: make([]uint16, size, (size*3)/2), + } +} + +func (l *latestMessagesStore) set(idx int, m LatestMessage) { + l.mu.Lock() + defer l.mu.Unlock() + _, isPresent := l.idxByMessage[m] + if !isPresent { + l.currID++ + l.idxByMessage[m] = l.currID + l.messageByIdx[l.currID] = m + } + messageID := l.idxByMessage[m] + if idx >= len(l.latestMessages) { + if idx >= cap(l.latestMessages) { + l.latestMessages = make([]uint16, idx+1, ((idx+1)*3)/2) + } + l.latestMessages = l.latestMessages[:idx+1] + } + + prev := l.messageByIdx[l.latestMessages[idx]] + if prev != (LatestMessage{}) && l.countOfMessages[prev] > 0 { + l.countOfMessages[prev]-- + } + l.latestMessages[idx] = messageID + l.countOfMessages[m]++ + + l.doPrune() +} + +func (l *latestMessagesStore) doPrune() { + pruneMessages := []LatestMessage{} + // try to clean up old stuff + for message, count := range l.countOfMessages { + if count == 0 { + pruneMessages = append(pruneMessages, message) + } + } + for _, msg := range pruneMessages { + idx := l.idxByMessage[msg] + delete(l.messageByIdx, idx) + delete(l.idxByMessage, msg) + delete(l.countOfMessages, msg) + } +} + +func (l *latestMessagesStore) get(idx int) (LatestMessage, bool) { + l.mu.RLock() + defer l.mu.RUnlock() + if idx >= len(l.latestMessages) { + return LatestMessage{}, false + } + msg, ok := l.messageByIdx[l.latestMessages[idx]] + return msg, ok +} + +func (l *latestMessagesStore) latestMessagesCount() int { + return len(l.latestMessages) +} diff --git a/cl/phase1/forkchoice/on_attestation.go b/cl/phase1/forkchoice/on_attestation.go index 2739866f2ec..7544509b311 100644 --- a/cl/phase1/forkchoice/on_attestation.go +++ b/cl/phase1/forkchoice/on_attestation.go @@ -158,23 +158,11 @@ func (f *ForkChoiceStore) verifyAttestationWithState( } func (f *ForkChoiceStore) setLatestMessage(index uint64, message LatestMessage) { - if index >= uint64(len(f.latestMessages)) { - if index >= uint64(cap(f.latestMessages)) { - tmp := make([]LatestMessage, index+1, index*2) - copy(tmp, f.latestMessages) - f.latestMessages = tmp - } - f.latestMessages = f.latestMessages[:index+1] - } - f.latestMessages[index] = message + f.latestMessages.set(int(index), message) } func (f *ForkChoiceStore) getLatestMessage(validatorIndex uint64) (LatestMessage, bool) { - if validatorIndex >= uint64(len(f.latestMessages)) || - f.latestMessages[validatorIndex] == (LatestMessage{}) { - return LatestMessage{}, false - } - return f.latestMessages[validatorIndex], true + return f.latestMessages.get(int(validatorIndex)) } func (f 
*ForkChoiceStore) isUnequivocating(validatorIndex uint64) bool { diff --git a/cl/phase1/forkchoice/public_keys_registry/db_public_keys_registry.go b/cl/phase1/forkchoice/public_keys_registry/db_public_keys_registry.go new file mode 100644 index 00000000000..c6822ddb42b --- /dev/null +++ b/cl/phase1/forkchoice/public_keys_registry/db_public_keys_registry.go @@ -0,0 +1,56 @@ +package public_keys_registry + +import ( + "context" + + "github.com/Giulio2002/bls" + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon/cl/abstract" + "github.com/erigontech/erigon/cl/cltypes/solid" + state_accessors "github.com/erigontech/erigon/cl/persistence/state" +) + +type DBPublicKeysRegistry struct { + db kv.RoDB +} + +func NewDBPublicKeysRegistry(db kv.RoDB) *DBPublicKeysRegistry { + return &DBPublicKeysRegistry{db: db} +} + +// ResetAnchor resets the public keys registry to the anchor state +func (r *DBPublicKeysRegistry) ResetAnchor(s abstract.BeaconState) { + // no-op +} + +// VerifyAggregateSignature verifies the aggregate signature +func (r *DBPublicKeysRegistry) VerifyAggregateSignature(checkpoint solid.Checkpoint, pubkeysIdxs *solid.RawUint64List, message []byte, signature common.Bytes96) (bool, error) { + pks := make([][]byte, 0, pubkeysIdxs.Length()) + + if err := r.db.View(context.TODO(), func(tx kv.Tx) error { + pubkeysIdxs.Range(func(_ int, value uint64, length int) bool { + pk, err := state_accessors.ReadPublicKeyByIndexNoCopy(tx, value) + if err != nil { + return false + } + pks = append(pks, pk) + return true + }) + return nil + }); err != nil { + return false, err + } + + return bls.VerifyAggregate(signature[:], message, pks) +} + +// AddState adds the state to the public keys registry +func (r *DBPublicKeysRegistry) AddState(checkpoint solid.Checkpoint, s abstract.BeaconState) { + // no-op +} + +// Prune removes the public keys registry for the given epoch +func (r *DBPublicKeysRegistry) Prune(epoch uint64) { + // no-op +} diff --git a/cl/phase1/forkchoice/public_keys_registry/in_memory_public_keys_registry.go b/cl/phase1/forkchoice/public_keys_registry/in_memory_public_keys_registry.go new file mode 100644 index 00000000000..b7d0f114285 --- /dev/null +++ b/cl/phase1/forkchoice/public_keys_registry/in_memory_public_keys_registry.go @@ -0,0 +1,95 @@ +package public_keys_registry + +import ( + "fmt" + "sync" + + "github.com/Giulio2002/bls" + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon/cl/abstract" + "github.com/erigontech/erigon/cl/cltypes/solid" +) + +type InMemoryPublicKeysRegistry struct { + basePublicKeyRegistry []common.Bytes48 + statesRegistry map[solid.Checkpoint][]common.Bytes48 + + mu sync.RWMutex +} + +func NewInMemoryPublicKeysRegistry() *InMemoryPublicKeysRegistry { + return &InMemoryPublicKeysRegistry{ + basePublicKeyRegistry: make([]common.Bytes48, 0), + statesRegistry: make(map[solid.Checkpoint][]common.Bytes48), + } +} + +// // PublicKeyRegistry is a registry of public keys +// // It is used to store public keys and their indices +// type PublicKeyRegistry interface { +// ResetAnchor(s abstract.BeaconState) +// VerifyAggregateSignature(blockRoot common.Hash, slot uint64, pubkeysIdxs []uint64, message []byte, signature common.Bytes96) error +// AddState(epoch uint64, blockRoot common.Hash, s abstract.BeaconState) +// Prune(epoch uint64) +// } + +// ResetAnchor resets the public keys registry to the anchor state +func (r *InMemoryPublicKeysRegistry) ResetAnchor(s abstract.BeaconState) { + 
r.mu.Lock() + defer r.mu.Unlock() + r.statesRegistry = make(map[solid.Checkpoint][]common.Bytes48) // reset the registry + r.basePublicKeyRegistry = make([]common.Bytes48, s.ValidatorLength()) // reset the public keys registry + for i := 0; i < s.ValidatorLength(); i++ { + copy(r.basePublicKeyRegistry[i][:], s.ValidatorSet().Get(i).PublicKeyBytes()) + } +} + +// VerifyAggregateSignature verifies the aggregate signature +func (r *InMemoryPublicKeysRegistry) VerifyAggregateSignature(checkpoint solid.Checkpoint, pubkeysIdxs *solid.RawUint64List, message []byte, signature common.Bytes96) (bool, error) { + r.mu.RLock() + defer r.mu.RUnlock() + if pubkeysIdxs.Length() == 0 { + return false, fmt.Errorf("no public keys provided, %d, %s", checkpoint.Epoch, checkpoint.Root) + } + basePublicKeyRegistryLength := uint64(len(r.basePublicKeyRegistry)) + statePublicKeys, ok := r.statesRegistry[checkpoint] + if !ok { + return false, fmt.Errorf("public keys registry not found for epoch %d and block root %s", checkpoint.Epoch, checkpoint.Root) + } + + pks := make([][]byte, 0, pubkeysIdxs.Length()) + pubkeysIdxs.Range(func(_ int, value uint64, length int) bool { + if value >= basePublicKeyRegistryLength { + if value-basePublicKeyRegistryLength >= uint64(len(statePublicKeys)) { + return false + } + pks = append(pks, statePublicKeys[value-basePublicKeyRegistryLength][:]) + return true + } + pks = append(pks, r.basePublicKeyRegistry[value][:]) + return true + }) + return bls.VerifyAggregate(signature[:], message, pks) +} + +// AddState adds the state to the public keys registry +func (r *InMemoryPublicKeysRegistry) AddState(checkpoint solid.Checkpoint, s abstract.BeaconState) { + r.mu.Lock() + defer r.mu.Unlock() + statePublicKeys := make([]common.Bytes48, s.ValidatorLength()-len(r.basePublicKeyRegistry)) + for i := len(r.basePublicKeyRegistry); i < s.ValidatorLength(); i++ { + copy(statePublicKeys[i-len(r.basePublicKeyRegistry)][:], s.ValidatorSet().Get(i).PublicKeyBytes()) + } + r.statesRegistry[checkpoint] = statePublicKeys +} + +// Prune removes the public keys registry for the given epoch +func (r *InMemoryPublicKeysRegistry) Prune(epoch uint64) { + r.mu.Lock() + defer r.mu.Unlock() + for k := range r.statesRegistry { + if k.Epoch < epoch { + delete(r.statesRegistry, k) + } + } +} diff --git a/cl/phase1/forkchoice/public_keys_registry/interface.go b/cl/phase1/forkchoice/public_keys_registry/interface.go new file mode 100644 index 00000000000..b8115f5d865 --- /dev/null +++ b/cl/phase1/forkchoice/public_keys_registry/interface.go @@ -0,0 +1,18 @@ +package public_keys_registry + +import ( + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon/cl/abstract" + "github.com/erigontech/erigon/cl/cltypes/solid" +) + +// This package as a whole gets plenty of test coverage in spectests, so we can skip unit testing here. 
+ +// PublicKeyRegistry is a registry of public keys +// It is used to store public keys and their indices +type PublicKeyRegistry interface { + ResetAnchor(s abstract.BeaconState) + VerifyAggregateSignature(checkpoint solid.Checkpoint, pubkeysIdxs *solid.RawUint64List, message []byte, signature common.Bytes96) (bool, error) + AddState(checkpoint solid.Checkpoint, s abstract.BeaconState) + Prune(epoch uint64) +} diff --git a/cl/phase1/forkchoice/utils.go b/cl/phase1/forkchoice/utils.go index 0b1cf0c62e6..1963e32bc60 100644 --- a/cl/phase1/forkchoice/utils.go +++ b/cl/phase1/forkchoice/utils.go @@ -155,8 +155,9 @@ func (f *ForkChoiceStore) getCheckpointState(checkpoint solid.Checkpoint) (*chec validators[idx] = v return true }) - checkpointState := newCheckpointState(f.beaconCfg, f.anchorPublicKeys, validators, - mixes, baseState.GenesisValidatorsRoot(), baseState.Fork(), baseState.GetTotalActiveBalance(), state.Epoch(baseState.BeaconState)) + f.publicKeysRegistry.AddState(checkpoint, baseState) + checkpointState := newCheckpointState(f.beaconCfg, f.publicKeysRegistry, validators, + mixes, baseState.GenesisValidatorsRoot(), baseState.Fork(), baseState.GetTotalActiveBalance(), state.Epoch(baseState.BeaconState), checkpoint) // Cache in memory what we are left with. f.checkpointStates.Store(checkpoint, checkpointState) return checkpointState, nil diff --git a/cl/phase1/network/services/aggregate_and_proof_service.go b/cl/phase1/network/services/aggregate_and_proof_service.go index 40730e683fd..27dd73698ca 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service.go +++ b/cl/phase1/network/services/aggregate_and_proof_service.go @@ -107,7 +107,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage( committeeIndex := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Data.CommitteeIndex if aggregateData.Slot > a.syncedDataManager.HeadSlot() { - a.scheduleAggregateForLaterProcessing(aggregateAndProof) + //a.scheduleAggregateForLaterProcessing(aggregateAndProof) return ErrIgnore } diff --git a/cl/phase1/network/services/batch_signature_verification.go b/cl/phase1/network/services/batch_signature_verification.go index 7d62ed693eb..c7c7cef9ca8 100644 --- a/cl/phase1/network/services/batch_signature_verification.go +++ b/cl/phase1/network/services/batch_signature_verification.go @@ -162,6 +162,7 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat // we could locate failing signature with binary search but for now let's choose simplicity over optimisation. 
func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) { + alreadyBanned := false for _, v := range aggregateVerificationData { valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks) if err != nil { @@ -173,12 +174,13 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification } if !valid { - if v.GossipData == nil { + if v.GossipData == nil || alreadyBanned { continue } log.Debug("[BatchVerifier] received invalid signature on the gossip", "topic", v.GossipData.Name) if b.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil { b.sentinel.BanPeer(b.ctx, v.GossipData.Peer) + alreadyBanned = true } continue } diff --git a/cl/phase1/network/services/blob_sidecar_service.go b/cl/phase1/network/services/blob_sidecar_service.go index a628042b124..0ee124b5a6b 100644 --- a/cl/phase1/network/services/blob_sidecar_service.go +++ b/cl/phase1/network/services/blob_sidecar_service.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/Giulio2002/bls" @@ -27,6 +28,7 @@ import ( "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/crypto/kzg" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cl/beacon/beaconevents" "github.com/erigontech/erigon/cl/beacon/synced_data" "github.com/erigontech/erigon/cl/clparams" @@ -46,8 +48,8 @@ type blobSidecarService struct { ethClock eth_clock.EthereumClock emitters *beaconevents.EventEmitter - // blobSidecarsScheduledForLaterExecution sync.Map - test bool + blobSidecarsScheduledForLaterExecution sync.Map + test bool } type blobSidecarJob struct { @@ -114,7 +116,7 @@ func (b *blobSidecarService) ProcessMessage(ctx context.Context, subnetId *uint6 parentHeader, has := b.forkchoiceStore.GetHeader(msg.SignedBlockHeader.Header.ParentRoot) if !has { - //b.scheduleBlobSidecarForLaterExecution(msg) + b.scheduleBlobSidecarForLaterExecution(msg) return ErrIgnore } if msg.SignedBlockHeader.Header.Slot <= parentHeader.Slot { @@ -194,55 +196,55 @@ func (b *blobSidecarService) verifySidecarsSignature(header *cltypes.SignedBeaco return nil } -// func (b *blobSidecarService) scheduleBlobSidecarForLaterExecution(blobSidecar *cltypes.BlobSidecar) { -// blobSidecarJob := &blobSidecarJob{ -// blobSidecar: blobSidecar, -// creationTime: time.Now(), -// } -// blobSidecarHash, err := blobSidecar.HashSSZ() -// if err != nil { -// return -// } -// b.blobSidecarsScheduledForLaterExecution.Store(blobSidecarHash, blobSidecarJob) -// } - -// // loop is the main loop of the block service -// func (b *blobSidecarService) loop(ctx context.Context) { -// ticker := time.NewTicker(blobJobsIntervalTick) -// defer ticker.Stop() -// if b.test { -// return -// } -// for { -// select { -// case <-ctx.Done(): -// return -// case <-ticker.C: -// } - -// b.blobSidecarsScheduledForLaterExecution.Range(func(key, value any) bool { -// job := value.(*blobSidecarJob) -// // check if it has expired -// if time.Since(job.creationTime) > blobJobExpiry { -// b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) -// return true -// } -// blockRoot, err := job.blobSidecar.SignedBlockHeader.Header.HashSSZ() -// if err != nil { -// log.Debug("blob sidecar verification failed", "err", err) -// return true -// } -// if _, has := b.forkchoiceStore.GetHeader(blockRoot); has { -// b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) -// return true -// } -// if err := b.verifyAndStoreBlobSidecar(job.blobSidecar); err != 
nil { -// log.Trace("blob sidecar verification failed", "err", err, -// "slot", job.blobSidecar.SignedBlockHeader.Header.Slot) -// return true -// } -// b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) -// return true -// }) -// } -// } +func (b *blobSidecarService) scheduleBlobSidecarForLaterExecution(blobSidecar *cltypes.BlobSidecar) { + blobSidecarJob := &blobSidecarJob{ + blobSidecar: blobSidecar, + creationTime: time.Now(), + } + blobSidecarHash, err := blobSidecar.HashSSZ() + if err != nil { + return + } + b.blobSidecarsScheduledForLaterExecution.Store(blobSidecarHash, blobSidecarJob) +} + +// loop is the main loop of the block service +func (b *blobSidecarService) loop(ctx context.Context) { + ticker := time.NewTicker(blobJobsIntervalTick) + defer ticker.Stop() + if b.test { + return + } + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + b.blobSidecarsScheduledForLaterExecution.Range(func(key, value any) bool { + job := value.(*blobSidecarJob) + // check if it has expired + if time.Since(job.creationTime) > blobJobExpiry { + b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) + return true + } + blockRoot, err := job.blobSidecar.SignedBlockHeader.Header.HashSSZ() + if err != nil { + log.Debug("blob sidecar verification failed", "err", err) + return true + } + if _, has := b.forkchoiceStore.GetHeader(blockRoot); has { + b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) + return true + } + if err := b.verifyAndStoreBlobSidecar(job.blobSidecar); err != nil { + log.Trace("blob sidecar verification failed", "err", err, + "slot", job.blobSidecar.SignedBlockHeader.Header.Slot) + return true + } + b.blobSidecarsScheduledForLaterExecution.Delete(key.([32]byte)) + return true + }) + } +} diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index 060f0794c1e..ea1fe2d6597 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -121,12 +121,6 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrIgnore } - // headState, cn := b.syncedData.HeadState() - // defer cn() - // if headState == nil { - // b.scheduleBlockForLaterProcessing(msg) - // return ErrIgnore - // } if err := b.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error { // [IGNORE] The block is from a slot greater than the latest finalized slot -- i.e. validate that signed_beacon_block.message.slot > compute_start_slot_at_epoch(store.finalized_checkpoint.epoch) // (a client MAY choose to validate and store such blocks for additional purposes -- e.g. slashing detection, archive nodes, etc). 
diff --git a/cl/phase1/network/services/sync_committee_messages_service.go b/cl/phase1/network/services/sync_committee_messages_service.go index 74d724688ac..9af0269cc93 100644 --- a/cl/phase1/network/services/sync_committee_messages_service.go +++ b/cl/phase1/network/services/sync_committee_messages_service.go @@ -39,7 +39,7 @@ type seenSyncCommitteeMessage struct { } type syncCommitteeMessagesService struct { - seenSyncCommitteeMessages map[seenSyncCommitteeMessage]struct{} + seenSyncCommitteeMessages sync.Map syncedDataManager *synced_data.SyncedDataManager beaconChainCfg *clparams.BeaconChainConfig syncContributionPool sync_contribution_pool.SyncContributionPool @@ -60,13 +60,12 @@ func NewSyncCommitteeMessagesService( test bool, ) SyncCommitteeMessagesService { return &syncCommitteeMessagesService{ - seenSyncCommitteeMessages: make(map[seenSyncCommitteeMessage]struct{}), - ethClock: ethClock, - syncedDataManager: syncedDataManager, - beaconChainCfg: beaconChainCfg, - syncContributionPool: syncContributionPool, - batchSignatureVerifier: batchSignatureVerifier, - test: test, + ethClock: ethClock, + syncedDataManager: syncedDataManager, + beaconChainCfg: beaconChainCfg, + syncContributionPool: syncContributionPool, + batchSignatureVerifier: batchSignatureVerifier, + test: test, } } @@ -96,7 +95,8 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne return fmt.Errorf("validator is not into any subnet %d", *subnet) } // [IGNORE] There has been no other valid sync committee message for the declared slot for the validator referenced by sync_committee_message.validator_index. - if _, ok := s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier]; ok { + + if _, ok := s.seenSyncCommitteeMessages.Load(seenSyncCommitteeMessageIdentifier); ok { return ErrIgnore } // [REJECT] The signature is valid for the message beacon_block_root for the validator referenced by validator_index @@ -110,7 +110,7 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne Pks: [][]byte{pubKey}, GossipData: msg.GossipData, F: func() { - s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier] = struct{}{} + s.seenSyncCommitteeMessages.Store(seenSyncCommitteeMessageIdentifier, struct{}{}) s.cleanupOldSyncCommitteeMessages() // cleanup old messages // Aggregate the message s.syncContributionPool.AddSyncCommitteeMessage(headState, *subnet, msg.SyncCommitteeMessage) @@ -135,10 +135,17 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne // cleanupOldSyncCommitteeMessages removes old sync committee messages from the cache func (s *syncCommitteeMessagesService) cleanupOldSyncCommitteeMessages() { headSlot := s.syncedDataManager.HeadSlot() - for k := range s.seenSyncCommitteeMessages { + + entriesToRemove := []seenSyncCommitteeMessage{} + s.seenSyncCommitteeMessages.Range(func(key, value interface{}) bool { + k := key.(seenSyncCommitteeMessage) if headSlot > k.slot+1 { - delete(s.seenSyncCommitteeMessages, k) + entriesToRemove = append(entriesToRemove, k) } + return true + }) + for _, k := range entriesToRemove { + s.seenSyncCommitteeMessages.Delete(k) } } diff --git a/cl/phase1/stages/forkchoice.go b/cl/phase1/stages/forkchoice.go index 2b095c25758..2aaa77c14ba 100644 --- a/cl/phase1/stages/forkchoice.go +++ b/cl/phase1/stages/forkchoice.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "runtime" + "sync/atomic" "time" "github.com/erigontech/erigon-lib/common" @@ -81,7 +82,6 @@ func computeAndNotifyServicesOfNewForkChoice(ctx 
context.Context, logger log.Log headRoot, headSlot); err2 != nil { logger.Warn("Could not set status", "err", err2) } - return } @@ -313,6 +313,7 @@ func saveHeadStateOnDiskIfNeeded(cfg *Cfg, headState *state.CachingBeaconState) // postForkchoiceOperations performs the post fork choice operations such as updating the head state, producing and caching attestation data, // these sets of operations can take as long as they need to run, as by-now we are already synced. func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger, cfg *Cfg, headSlot uint64, headRoot common.Hash) error { + // Retrieve the head state headState, err := cfg.forkChoice.GetStateAtBlockRoot(headRoot, false) if err != nil { @@ -323,6 +324,10 @@ func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger if err := cfg.syncedData.OnHeadState(headState); err != nil { return fmt.Errorf("failed to set head state: %w", err) } + start := time.Now() + defer func() { + logger.Debug("Post forkchoice operations", "duration", time.Since(start)) + }() return cfg.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error { // Produce and cache attestation data for validator node (this is not an expensive operation so we can do it for all nodes) @@ -344,6 +349,7 @@ func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger if err := saveHeadStateOnDiskIfNeeded(cfg, headState); err != nil { return fmt.Errorf("failed to save head state on disk: %w", err) } + // Lastly, emit the head event emitHeadEvent(cfg, headSlot, headRoot, headState) emitNextPaylodAttributesEvent(cfg, headSlot, headRoot, headState) @@ -371,7 +377,6 @@ func doForkchoiceRoutine(ctx context.Context, logger log.Logger, cfg *Cfg, args return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() - if err := updateCanonicalChainInTheDatabase(ctx, tx, headSlot, headRoot, cfg); err != nil { return fmt.Errorf("failed to update canonical chain in the database: %w", err) } @@ -390,36 +395,46 @@ func doForkchoiceRoutine(ctx context.Context, logger log.Logger, cfg *Cfg, args return tx.Commit() } -func preCacheNextShuffledValidatorSet(ctx context.Context, logger log.Logger, cfg *Cfg, b *state.CachingBeaconState) { - nextEpoch := state.Epoch(b) + 1 - beaconConfig := cfg.beaconCfg +// we need to generate only one goroutine for pre-caching shuffled set +var workingPreCacheNextShuffledValidatorSet atomic.Bool - // Check if any action is needed - refSlot := ((nextEpoch - 1) * b.BeaconConfig().SlotsPerEpoch) - 1 - if refSlot >= b.Slot() { - return - } - // Get the block root at the beginning of the previous epoch - blockRootAtBegginingPrevEpoch, err := b.GetBlockRootAtSlot(refSlot) - if err != nil { - logger.Warn("failed to get block root at slot for pre-caching shuffled set", "err", err) - return - } - // Skip if the shuffled set is already pre-cached - if _, ok := caches.ShuffledIndiciesCacheGlobal.Get(nextEpoch, blockRootAtBegginingPrevEpoch); ok { +func preCacheNextShuffledValidatorSet(ctx context.Context, logger log.Logger, cfg *Cfg, b *state.CachingBeaconState) { + if workingPreCacheNextShuffledValidatorSet.Load() { return } + workingPreCacheNextShuffledValidatorSet.Store(true) + go func() { + defer workingPreCacheNextShuffledValidatorSet.Store(false) + nextEpoch := state.Epoch(b) + 1 + beaconConfig := cfg.beaconCfg - indicies := b.GetActiveValidatorsIndices(nextEpoch) - shuffledIndicies := make([]uint64, len(indicies)) - mixPosition := (nextEpoch + 
beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) % - beaconConfig.EpochsPerHistoricalVector - // Input for the seed hash. - mix := b.GetRandaoMix(int(mixPosition)) - start := time.Now() - shuffledIndicies = shuffling.ComputeShuffledIndicies(b.BeaconConfig(), mix, shuffledIndicies, indicies, nextEpoch*beaconConfig.SlotsPerEpoch) - shuffling_metrics.ObserveComputeShuffledIndiciesTime(start) + // Check if any action is needed + refSlot := ((nextEpoch - 1) * b.BeaconConfig().SlotsPerEpoch) - 1 + if refSlot >= b.Slot() { + return + } + // Get the block root at the beginning of the previous epoch + blockRootAtBegginingPrevEpoch, err := b.GetBlockRootAtSlot(refSlot) + if err != nil { + logger.Warn("failed to get block root at slot for pre-caching shuffled set", "err", err) + return + } + // Skip if the shuffled set is already pre-cached + if _, ok := caches.ShuffledIndiciesCacheGlobal.Get(nextEpoch, blockRootAtBegginingPrevEpoch); ok { + return + } - caches.ShuffledIndiciesCacheGlobal.Put(nextEpoch, blockRootAtBegginingPrevEpoch, shuffledIndicies) - log.Info("Pre-cached shuffled set", "epoch", nextEpoch, "len", len(shuffledIndicies), "mix", common.Hash(mix)) + indicies := b.GetActiveValidatorsIndices(nextEpoch) + shuffledIndicies := make([]uint64, len(indicies)) + mixPosition := (nextEpoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) % + beaconConfig.EpochsPerHistoricalVector + // Input for the seed hash. + mix := b.GetRandaoMix(int(mixPosition)) + start := time.Now() + shuffledIndicies = shuffling.ComputeShuffledIndicies(b.BeaconConfig(), mix, shuffledIndicies, indicies, nextEpoch*beaconConfig.SlotsPerEpoch) + shuffling_metrics.ObserveComputeShuffledIndiciesTime(start) + + caches.ShuffledIndiciesCacheGlobal.Put(nextEpoch, blockRootAtBegginingPrevEpoch, shuffledIndicies) + log.Info("Pre-cached shuffled set", "epoch", nextEpoch, "len", len(shuffledIndicies), "mix", common.Hash(mix)) + }() } diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index 18a6fc93095..2b41eb31ed9 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -202,10 +202,12 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co logArgs = append(logArgs, "slot", currProgress, "blockNumber", currEth1Progress.Load(), - "frozenBlocks", cfg.engine.FrozenBlocks(ctx), "blk/sec", fmt.Sprintf("%.1f", speed), "snapshots", cfg.sn.SegmentsMax(), ) + if cfg.engine != nil && cfg.engine.SupportInsertion() { + logArgs = append(logArgs, "frozenBlocks", cfg.engine.FrozenBlocks(ctx)) + } logMsg := "Node is still syncing... downloading past blocks" if isBackfilling.Load() { logMsg = "Node has finished syncing... 
full history is being downloaded for archiving purposes" diff --git a/cl/spectest/Makefile b/cl/spectest/Makefile index bf8e758f355..b8e1fedbb1a 100644 --- a/cl/spectest/Makefile +++ b/cl/spectest/Makefile @@ -11,4 +11,4 @@ clean: rm -rf tests mainnet: - CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet/altair/ -v --timeout 30m + CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet/ -v --timeout 30m diff --git a/cl/spectest/consensus_tests/fork_choice.go b/cl/spectest/consensus_tests/fork_choice.go index e31950d3825..8d9b8015cee 100644 --- a/cl/spectest/consensus_tests/fork_choice.go +++ b/cl/spectest/consensus_tests/fork_choice.go @@ -38,6 +38,7 @@ import ( "github.com/erigontech/erigon/cl/persistence/blob_storage" "github.com/erigontech/erigon/cl/phase1/forkchoice" "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph" + "github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry" "github.com/erigontech/erigon/cl/phase1/network/services" "github.com/erigontech/erigon/cl/pool" "github.com/erigontech/erigon/cl/utils/eth_clock" @@ -209,7 +210,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err forkStore, err := forkchoice.NewForkChoiceStore( ethClock, anchorState, nil, pool.NewOperationsPool(&clparams.MainnetBeaconConfig), fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), - emitters, synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0), blobStorage, validatorMonitor, false) + emitters, synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0), blobStorage, validatorMonitor, public_keys_registry.NewInMemoryPublicKeysRegistry(), false) require.NoError(t, err) forkStore.SetSynced(true) diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 4eb48502d59..b2f6438d6b6 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -67,6 +67,7 @@ import ( "github.com/erigontech/erigon/cl/phase1/execution_client" "github.com/erigontech/erigon/cl/phase1/forkchoice" "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph" + "github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry" "github.com/erigontech/erigon/cl/phase1/network" "github.com/erigontech/erigon/cl/phase1/network/services" "github.com/erigontech/erigon/cl/phase1/stages" @@ -261,9 +262,13 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi aggregationPool := aggregation.NewAggregationPool(ctx, beaconConfig, networkConfig, ethClock) validatorMonitor := monitor.NewValidatorMonitor(config.EnableValidatorMonitor, ethClock, beaconConfig, syncedDataManager) doLMDSampling := len(state.GetActiveValidatorsIndices(state.Slot()/beaconConfig.SlotsPerEpoch)) >= 20_000 + + // create the public keys registry + pksRegistry := public_keys_registry.NewDBPublicKeysRegistry(indexDB) + forkChoice, err := forkchoice.NewForkChoiceStore( ethClock, state, engine, pool, fork_graph.NewForkGraphDisk(state, fcuFs, config.BeaconAPIRouter, emitters), - emitters, syncedDataManager, blobStorage, validatorMonitor, doLMDSampling) + emitters, syncedDataManager, blobStorage, validatorMonitor, pksRegistry, doLMDSampling) if err != nil { logger.Error("Could not create forkchoice", "err", err) return err
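
Note on the seen-message cache in the sync_committee_messages_service hunk above: the patch replaces a plain map with a sync.Map so that concurrent gossip handlers can check for and record duplicates without an external lock. The sketch below is a minimal, self-contained illustration of that pattern under assumed names (seenKey and seenCache are not the actual service types); it also folds the check and the record into a single LoadOrStore, whereas the service defers the Store until the batch signature verification callback runs.

package main

import (
	"fmt"
	"sync"
)

// seenKey identifies one sync committee message per (subnet, slot, validator).
// These names are illustrative, not the actual service types.
type seenKey struct {
	subnet         uint64
	slot           uint64
	validatorIndex uint64
}

type seenCache struct {
	seen sync.Map // seenKey -> struct{}
}

// markSeen records the key and reports whether it was new; a false return
// corresponds to the [IGNORE] duplicate case in the service.
func (c *seenCache) markSeen(k seenKey) bool {
	_, loaded := c.seen.LoadOrStore(k, struct{}{})
	return !loaded
}

// cleanup evicts entries more than one slot behind the head, mirroring the
// slot-based eviction in cleanupOldSyncCommitteeMessages.
func (c *seenCache) cleanup(headSlot uint64) {
	c.seen.Range(func(key, _ any) bool {
		if k := key.(seenKey); headSlot > k.slot+1 {
			c.seen.Delete(key)
		}
		return true
	})
}

func main() {
	c := &seenCache{}
	k := seenKey{subnet: 3, slot: 100, validatorIndex: 42}
	fmt.Println(c.markSeen(k)) // true: first sighting
	fmt.Println(c.markSeen(k)) // false: duplicate, would be ErrIgnore
	c.cleanup(102)             // headSlot 102 > 100+1, so the entry is evicted
	fmt.Println(c.markSeen(k)) // true again after eviction
}

Deleting during Range is safe for sync.Map, so the eviction can happen inline; the patch instead collects the stale keys first and deletes them afterwards, which is equivalent.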
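
Note on the pre-caching guard in the cl/phase1/stages/forkchoice.go hunk: workingPreCacheNextShuffledValidatorSet is an atomic.Bool ensuring that at most one background goroutine computes the next epoch's shuffled validator set at a time. The sketch below, again under assumed names, shows the same single-flight idea; it uses CompareAndSwap instead of the patch's Load followed by Store, a variant that makes the check and the claim a single atomic step.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// precomputing plays the role of workingPreCacheNextShuffledValidatorSet:
// at most one background pre-cache goroutine may be in flight.
var precomputing atomic.Bool

// preCache launches compute in the background unless a run is already in
// progress; callers that lose the race simply return and try again later.
func preCache(compute func()) {
	if !precomputing.CompareAndSwap(false, true) {
		return // another goroutine already owns the slot
	}
	go func() {
		defer precomputing.Store(false)
		compute()
	}()
}

func main() {
	for i := 0; i < 3; i++ {
		preCache(func() {
			time.Sleep(50 * time.Millisecond) // stand-in for ComputeShuffledIndicies
			fmt.Println("shuffled set cached")
		})
	}
	time.Sleep(200 * time.Millisecond) // prints "shuffled set cached" exactly once
}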