Skip to content

Commit

Permalink
Caplin: fix occasional missed attestations (#12856)
Browse files Browse the repository at this point in the history
Cache attestation data before calling `OnHeadState`. The reason this was a
problem is probably that it takes up to a second to update the head state, but
if we wait too long other nodes might stop accepting attestations and we
may lose the block vote.
  • Loading branch information
Giulio2002 authored Nov 26, 2024
1 parent 1ecaaef commit 1770d35
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 29 deletions.
26 changes: 21 additions & 5 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,27 @@ func (a *ApiHandler) waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(ctx context
func (a *ApiHandler) waitForHeadSlot(slot uint64) {
stopCh := time.After(time.Second)
for {
if a.syncedData.HeadSlot() >= slot {
headSlot := a.syncedData.HeadSlot()
if headSlot >= slot || a.slotWaitedForAttestationProduction.Contains(slot) {
return
}
_, ok, err := a.attestationProducer.CachedAttestationData(slot, 0)
if err != nil {
log.Warn("Failed to get attestation data", "err", err)
}
if ok {
a.slotWaitedForAttestationProduction.Add(slot, struct{}{})
return
}

time.Sleep(1 * time.Millisecond)
select {
case <-stopCh:
a.slotWaitedForAttestationProduction.Add(slot, struct{}{})
return
default:
}
if a.slotWaitedForAttestationProduction.Contains(slot) {
return
}

}
}

Expand Down Expand Up @@ -166,6 +174,15 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
}

a.waitForHeadSlot(*slot)

attestationData, ok, err := a.attestationProducer.CachedAttestationData(*slot, *committeeIndex)
if err != nil {
log.Warn("Failed to get attestation data", "err", err)
}
if ok {
return newBeaconResponse(attestationData), nil
}

clversion := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch)
if clversion.BeforeOrEqual(clparams.DenebVersion) && committeeIndex == nil {
return nil, beaconhttp.NewEndpointError(
Expand All @@ -178,7 +195,6 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
committeeIndex = &zero
}

var attestationData solid.AttestationData
if err := a.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error {
attestationData, err = a.attestationProducer.ProduceAndCacheAttestationData(
tx,
Expand Down
9 changes: 4 additions & 5 deletions cl/phase1/stages/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,27 +313,26 @@ func saveHeadStateOnDiskIfNeeded(cfg *Cfg, headState *state.CachingBeaconState)
// postForkchoiceOperations performs the post fork choice operations such as updating the head state, producing and caching attestation data,
// these sets of operations can take as long as they need to run, as by-now we are already synced.
func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger, cfg *Cfg, headSlot uint64, headRoot common.Hash) error {

// Retrieve the head state
headState, err := cfg.forkChoice.GetStateAtBlockRoot(headRoot, false)
if err != nil {
return fmt.Errorf("failed to get state at block root: %w", err)
}
if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(tx, headState, headRoot, headState.Slot(), 0); err != nil {
logger.Warn("failed to produce and cache attestation data", "err", err)
}
start := time.Now()
cfg.forkChoice.SetSynced(true) // Now we are synced
// Update the head state with the new head state
if err := cfg.syncedData.OnHeadState(headState); err != nil {
return fmt.Errorf("failed to set head state: %w", err)
}
start := time.Now()
defer func() {
logger.Debug("Post forkchoice operations", "duration", time.Since(start))
}()

return cfg.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error {
// Produce and cache attestation data for validator node (this is not an expensive operation so we can do it for all nodes)
// if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(tx, headState, headRoot, headState.Slot(), 0); err != nil {
// logger.Warn("failed to produce and cache attestation data", "err", err)
// }

// Run indexing routines for the database
if err := runIndexingRoutines(ctx, tx, cfg, headState); err != nil {
Expand Down
41 changes: 22 additions & 19 deletions cl/validator/attestation_producer/attestation_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type attestationProducer struct {
}

func New(ctx context.Context, beaconCfg *clparams.BeaconChainConfig) AttestationDataProducer {
ttl := time.Duration(beaconCfg.SecondsPerSlot) * time.Second
ttl := time.Duration(beaconCfg.SecondsPerSlot) * time.Second / 2
attestationsCache := lru.NewWithTTL[uint64, solid.AttestationData]("attestations", attestationsCacheSize, ttl)
blockRootsUsedForSlotCache, err := lru.New[uint64, libcommon.Hash]("blockRootsUsedForSlot", attestationsCacheSize)
if err != nil {
Expand Down Expand Up @@ -119,36 +119,32 @@ func (ap *attestationProducer) computeTargetCheckpoint(tx kv.Tx, baseState *stat
}, nil
}

func (ap *attestationProducer) ProduceAndCacheAttestationData(tx kv.Tx, baseState *state.CachingBeaconState, baseStateBlockRoot libcommon.Hash, slot uint64, committeeIndex uint64) (solid.AttestationData, error) {
func (ap *attestationProducer) CachedAttestationData(slot uint64, committeeIndex uint64) (solid.AttestationData, bool, error) {
epoch := slot / ap.beaconCfg.SlotsPerEpoch
var err error
ap.attCacheMutex.RLock()
defer ap.attCacheMutex.RUnlock()
if baseAttestationData, ok := ap.attestationsCache.Get(epoch); ok {
ap.attCacheMutex.RUnlock()
beaconBlockRoot, err := ap.beaconBlockRootForSlot(baseState, baseStateBlockRoot, slot)
if err != nil {
return solid.AttestationData{}, err
}
targetCheckpoint, err := ap.computeTargetCheckpoint(tx, baseState, baseStateBlockRoot, slot)
if err != nil {
return solid.AttestationData{}, err
beaconBlockRoot, ok := ap.blockRootsUsedForSlotCache.Get(slot)
if !ok {
return solid.AttestationData{}, false, nil
}
return solid.AttestationData{
Slot: slot,
CommitteeIndex: committeeIndex,
BeaconBlockRoot: beaconBlockRoot,
Source: baseAttestationData.Source,
Target: targetCheckpoint,
}, nil
Target: baseAttestationData.Target,
}, true, nil
}
ap.attCacheMutex.RUnlock()
return solid.AttestationData{}, false, nil
}

// in case the target epoch is not found, let's generate it with lock to avoid everyone trying to generate it
// at the same time, which would be a waste of memory resources
ap.attCacheMutex.Lock()
defer ap.attCacheMutex.Unlock()
// check again if the target epoch is already generated
func (ap *attestationProducer) ProduceAndCacheAttestationData(tx kv.Tx, baseState *state.CachingBeaconState, baseStateBlockRoot libcommon.Hash, slot uint64, committeeIndex uint64) (solid.AttestationData, error) {
epoch := slot / ap.beaconCfg.SlotsPerEpoch
var err error
ap.attCacheMutex.RLock()
if baseAttestationData, ok := ap.attestationsCache.Get(epoch); ok {
ap.attCacheMutex.RUnlock()
beaconBlockRoot, err := ap.beaconBlockRootForSlot(baseState, baseStateBlockRoot, slot)
if err != nil {
return solid.AttestationData{}, err
Expand All @@ -166,6 +162,12 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(tx kv.Tx, baseStat
Target: targetCheckpoint,
}, nil
}
ap.attCacheMutex.RUnlock()

// in case the target epoch is not found, let's generate it with lock to avoid everyone trying to generate it
// at the same time, which would be a waste of memory resources
ap.attCacheMutex.Lock()
defer ap.attCacheMutex.Unlock()

stateEpoch := state.Epoch(baseState)
if baseState.Slot() > slot {
Expand Down Expand Up @@ -198,6 +200,7 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(tx kv.Tx, baseStat
Source: baseState.CurrentJustifiedCheckpoint(),
Target: targetCheckpoint,
}
fmt.Println("baseAttestationData", baseAttestationData, "epoch", epoch, "baseStateBlockRoot", baseStateBlockRoot)
ap.attestationsCache.Add(epoch, baseAttestationData)
ap.blockRootsUsedForSlotCache.Add(slot, baseStateBlockRoot)

Expand Down
1 change: 1 addition & 0 deletions cl/validator/attestation_producer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ import (

// AttestationDataProducer produces attestation data for a slot/committee pair and
// exposes a lookup for data that has already been produced and cached.
type AttestationDataProducer interface {
// ProduceAndCacheAttestationData returns the attestation data for slot and committeeIndex,
// derived from baseState (the state whose block root is baseStateBlockRoot), and stores the
// result in the producer's caches so later calls can reuse it.
ProduceAndCacheAttestationData(tx kv.Tx, baseState *state.CachingBeaconState, baseStateBlockRoot libcommon.Hash, slot uint64, committeeIndex uint64) (solid.AttestationData, error)
// CachedAttestationData returns the attestation data for slot and committeeIndex without
// taking a state or transaction; the boolean result reports whether cached data was found.
// NOTE(review): presumably a miss is reported as (zero value, false, nil) rather than an error — confirm against the implementation.
CachedAttestationData(slot uint64, committeeIndex uint64) (solid.AttestationData, bool, error)
}

0 comments on commit 1770d35

Please sign in to comment.