Caplin: Fixed single validator performance (#12830)
Issues:
- Fixed a hack I had added to work around a race.
- We need to process attestations, otherwise the network kicks us out; however, we can rate-limit the stream.
- Fixed the ENR switching logic.
Giulio2002 authored Nov 22, 2024
1 parent 7f8a584 commit eb89d4d
Showing 23 changed files with 202 additions and 123 deletions.
26 changes: 26 additions & 0 deletions cl/beacon/handler/block_production.go
@@ -112,6 +112,26 @@ func (a *ApiHandler) waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(ctx context
time.Sleep(30 * time.Millisecond)
}
}

func (a *ApiHandler) waitForHeadSlot(slot uint64) {
stopCh := time.After(time.Second)
for {
if a.syncedData.HeadSlot() >= slot {
return
}
time.Sleep(1 * time.Millisecond)
select {
case <-stopCh:
a.slotWaitedForAttestationProduction.Add(slot, struct{}{})
return
default:
}
if a.slotWaitedForAttestationProduction.Contains(slot) {
return
}
}
}

func (a *ApiHandler) GetEthV1ValidatorAttestationData(
w http.ResponseWriter,
r *http.Request,
@@ -141,6 +161,11 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
errors.New("slot is required"),
)
}
if *slot > a.ethClock.GetCurrentSlot() {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, errors.New("slot is in the future"))
}

a.waitForHeadSlot(*slot)
clversion := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch)
if clversion.BeforeOrEqual(clparams.DenebVersion) && committeeIndex == nil {
return nil, beaconhttp.NewEndpointError(
@@ -162,6 +187,7 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
*slot,
*committeeIndex,
)

if err == attestation_producer.ErrHeadStateBehind {
return beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
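The new waitForHeadSlot above gives the head state up to one second to catch up with the requested slot and records slots that timed out, so later attestation-data requests for the same slot return immediately instead of paying the wait again. A minimal self-contained sketch of that bounded-wait pattern follows; the helper name and the plain map standing in for erigon's LRU cache are illustrative assumptions, not code from this commit.

package sketch

import "time"

// waitForSlotBounded polls headSlot until it reaches target, giving up after
// one second. Slots that already timed out are skipped via alreadyWaited, so
// only the first request for a lagging slot pays the full wait. (Sketch only:
// the real handler uses a size-bounded LRU cache instead of a plain map.)
func waitForSlotBounded(headSlot func() uint64, target uint64, alreadyWaited map[uint64]struct{}) {
	deadline := time.After(time.Second)
	for headSlot() < target {
		if _, ok := alreadyWaited[target]; ok {
			return // a previous request already waited for this slot
		}
		select {
		case <-deadline:
			alreadyWaited[target] = struct{}{} // remember the timeout for later callers
			return
		default:
			time.Sleep(time.Millisecond)
		}
	}
}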
46 changes: 26 additions & 20 deletions cl/beacon/handler/handler.go
@@ -92,13 +92,14 @@ type ApiHandler struct {
logger log.Logger

// Validator data structures
validatorParams *validator_params.ValidatorParams
blobBundles *lru.Cache[common.Bytes48, BlobBundle] // Keep recent bundled blobs from the execution layer.
engine execution_client.ExecutionEngine
syncMessagePool sync_contribution_pool.SyncContributionPool
committeeSub *committee_subscription.CommitteeSubscribeMgmt
attestationProducer attestation_producer.AttestationDataProducer
aggregatePool aggregation.AggregationPool
validatorParams *validator_params.ValidatorParams
blobBundles *lru.Cache[common.Bytes48, BlobBundle] // Keep recent bundled blobs from the execution layer.
engine execution_client.ExecutionEngine
syncMessagePool sync_contribution_pool.SyncContributionPool
committeeSub *committee_subscription.CommitteeSubscribeMgmt
attestationProducer attestation_producer.AttestationDataProducer
slotWaitedForAttestationProduction *lru.Cache[uint64, struct{}]
aggregatePool aggregation.AggregationPool

// services
syncCommitteeMessagesService services.SyncCommitteeMessagesService
@@ -152,20 +153,25 @@ func NewApiHandler(
if err != nil {
panic(err)
}
slotWaitedForAttestationProduction, err := lru.New[uint64, struct{}]("slotWaitedForAttestationProduction", 1024)
if err != nil {
panic(err)
}
return &ApiHandler{
logger: logger,
validatorParams: validatorParams,
o: sync.Once{},
netConfig: netConfig,
ethClock: ethClock,
beaconChainCfg: beaconChainConfig,
indiciesDB: indiciesDB,
forkchoiceStore: forkchoiceStore,
operationsPool: operationsPool,
blockReader: rcsn,
syncedData: syncedData,
stateReader: stateReader,
caplinStateSnapshots: caplinStateSnapshots,
logger: logger,
validatorParams: validatorParams,
o: sync.Once{},
netConfig: netConfig,
ethClock: ethClock,
beaconChainCfg: beaconChainConfig,
indiciesDB: indiciesDB,
forkchoiceStore: forkchoiceStore,
operationsPool: operationsPool,
blockReader: rcsn,
syncedData: syncedData,
stateReader: stateReader,
caplinStateSnapshots: caplinStateSnapshots,
slotWaitedForAttestationProduction: slotWaitedForAttestationProduction,
randaoMixesPool: sync.Pool{New: func() interface{} {
return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector))
}},
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
@@ -86,7 +86,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
fcu.Pool = opPool

if useRealSyncDataMgr {
syncedData = synced_data.NewSyncedDataManager(&bcfg, true, 0)
syncedData = synced_data.NewSyncedDataManager(&bcfg, true)
} else {
syncedData = sync_mock_services.NewMockSyncedData(ctrl)
}
19 changes: 5 additions & 14 deletions cl/beacon/synced_data/synced_data.go
@@ -35,26 +35,22 @@ var _ SyncedData = (*SyncedDataManager)(nil)

func EmptyCancel() {}

const MinHeadStateDelay = 600 * time.Millisecond

type SyncedDataManager struct {
enabled bool
cfg *clparams.BeaconChainConfig

headRoot atomic.Value
headSlot atomic.Uint64

headState *state.CachingBeaconState
minHeadStateDelay time.Duration
headState *state.CachingBeaconState

mu sync.RWMutex
}

func NewSyncedDataManager(cfg *clparams.BeaconChainConfig, enabled bool, minHeadStateDelay time.Duration) *SyncedDataManager {
func NewSyncedDataManager(cfg *clparams.BeaconChainConfig, enabled bool) *SyncedDataManager {
return &SyncedDataManager{
enabled: enabled,
cfg: cfg,
minHeadStateDelay: minHeadStateDelay,
enabled: enabled,
cfg: cfg,
}
}

@@ -67,7 +63,7 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err
defer s.mu.Unlock()

var blkRoot common.Hash
start := time.Now()

if s.headState == nil {
s.headState, err = newState.Copy()
} else {
@@ -82,11 +78,6 @@ }
}
s.headSlot.Store(newState.Slot())
s.headRoot.Store(blkRoot)
took := time.Since(start)
// Delay head update to avoid being out of sync with slower nodes.
if took < s.minHeadStateDelay {
time.Sleep(s.minHeadStateDelay - took)
}
return err
}

4 changes: 2 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
@@ -68,7 +68,7 @@ func TestForkChoiceBasic(t *testing.T) {
Root: libcommon.HexToHash("0x564d76d91f66c1fb2977484a6184efda2e1c26dd01992e048353230e10f83201"),
Epoch: 0,
}
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
// Decode test blocks
block0x3a, block0xc2, block0xd4 := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig, clparams.DenebVersion),
cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig, clparams.DenebVersion),
@@ -148,7 +148,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
// Initialize forkchoice store
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEventEmitter()
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}, emitters), emitters, sd, nil, nil, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
8 changes: 4 additions & 4 deletions cl/phase1/network/gossip_manager.go
@@ -66,6 +66,7 @@ type GossipManager struct {
voluntaryExitService services.VoluntaryExitService
blsToExecutionChangeService services.BLSToExecutionChangeService
proposerSlashingService services.ProposerSlashingService
attestationsLimiter *timeBasedRateLimiter
}

func NewGossipReceiver(
@@ -103,6 +104,7 @@ func NewGossipReceiver(
voluntaryExitService: voluntaryExitService,
blsToExecutionChangeService: blsToExecutionChangeService,
proposerSlashingService: proposerSlashingService,
attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 250),
}
}

@@ -273,12 +275,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
if err := obj.Attestation.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}

if g.committeeSub.NeedToAggregate(obj.Attestation) {
if g.committeeSub.NeedToAggregate(obj.Attestation) || g.attestationsLimiter.tryAcquire() {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj)
}

return nil
return services.ErrIgnore
default:
return fmt.Errorf("unknown topic %s", data.Name)
}
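With the change above, routeAndProcess forwards a single attestation either when it still needs to be aggregated locally or when attestationsLimiter grants a slot (250 attestations per 6-second window, per the constructor earlier in this file); everything else returns services.ErrIgnore, which keeps the node participating in the topic without processing the full stream. The limiter's implementation is not among the files shown here; a plausible fixed-window sketch with the same constructor and method shape (an assumption, not the actual erigon code) is:

package sketch

import (
	"sync"
	"time"
)

// timeBasedRateLimiter allows at most maxCount acquisitions per window.
// Assumed implementation for illustration; the real type lives elsewhere
// in the erigon tree and may differ.
type timeBasedRateLimiter struct {
	mu          sync.Mutex
	window      time.Duration
	maxCount    int
	count       int
	windowStart time.Time
}

func newTimeBasedRateLimiter(window time.Duration, maxCount int) *timeBasedRateLimiter {
	return &timeBasedRateLimiter{window: window, maxCount: maxCount, windowStart: time.Now()}
}

// tryAcquire reports whether the caller may proceed, resetting the counter
// whenever the current window has elapsed.
func (l *timeBasedRateLimiter) tryAcquire() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if time.Since(l.windowStart) >= l.window {
		l.windowStart = time.Now()
		l.count = 0
	}
	if l.count < l.maxCount {
		l.count++
		return true
	}
	return false
}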
1 change: 0 additions & 1 deletion cl/phase1/network/services/aggregate_and_proof_service.go
@@ -98,7 +98,6 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
subnet *uint64,
aggregateAndProof *cltypes.SignedAggregateAndProofData,
) error {

selectionProof := aggregateAndProof.SignedAggregateAndProof.Message.SelectionProof
aggregateData := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Data
aggregate := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate
cl/phase1/network/services/aggregate_and_proof_service_test.go
@@ -74,7 +74,7 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced
ctx, cn := context.WithCancel(context.Background())
cn()
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
p := pool.OperationsPool{}
p.AttestationsPool = pool.NewOperationPool[libcommon.Bytes96, *solid.Attestation](100, "test")
97 changes: 48 additions & 49 deletions cl/phase1/network/services/attestation_service.go
@@ -98,7 +98,7 @@ func NewAttestationService(
attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration),
}

go a.loop(ctx)
//go a.loop(ctx)
return a
}

@@ -228,7 +228,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
// [IGNORE] The block being voted for (attestation.data.beacon_block_root) has been seen (via both gossip and non-gossip sources)
// (a client MAY queue attestations for processing once block is retrieved).
if _, ok := s.forkchoiceStore.GetHeader(root); !ok {
s.scheduleAttestationForLaterProcessing(att)
//s.scheduleAttestationForLaterProcessing(att)
return ErrIgnore
}

@@ -245,10 +245,6 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return fmt.Errorf("invalid finalized checkpoint %w", ErrIgnore)
}

if !s.committeeSubscribe.NeedToAggregate(att.Attestation) {
return ErrIgnore
}

aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{signature[:]},
SignRoots: [][]byte{signingRoot[:]},
@@ -257,10 +253,13 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
F: func() {
start := time.Now()
defer monitor.ObserveAggregateAttestation(start)
err = s.committeeSubscribe.AggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
if s.committeeSubscribe.NeedToAggregate(att.Attestation) {
err = s.committeeSubscribe.AggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
}
}

if err != nil {
log.Warn("could not check aggregate attestation", "err", err)
return
@@ -284,47 +283,47 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return ErrIgnore
}

type attestationJob struct {
att *AttestationWithGossipData
creationTime time.Time
subnet uint64
}
// type attestationJob struct {
// att *AttestationWithGossipData
// creationTime time.Time
// subnet uint64
// }

func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
key, err := att.Attestation.HashSSZ()
if err != nil {
return
}
a.attestationsToBeLaterProcessed.Store(key, &attestationJob{
att: att,
creationTime: time.Now(),
})
}
// func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
// key, err := att.Attestation.HashSSZ()
// if err != nil {
// return
// }
// a.attestationsToBeLaterProcessed.Store(key, &attestationJob{
// att: att,
// creationTime: time.Now(),
// })
// }

func (a *attestationService) loop(ctx context.Context) {
ticker := time.NewTicker(singleAttestationIntervalTick)
defer ticker.Stop()
// func (a *attestationService) loop(ctx context.Context) {
// ticker := time.NewTicker(singleAttestationIntervalTick)
// defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
a.attestationsToBeLaterProcessed.Range(func(key, value any) bool {
k := key.([32]byte)
v := value.(*attestationJob)
if time.Now().After(v.creationTime.Add(singleAttestationJobExpiry)) {
a.attestationsToBeLaterProcessed.Delete(k)
return true
}
// for {
// select {
// case <-ctx.Done():
// return
// case <-ticker.C:
// }
// a.attestationsToBeLaterProcessed.Range(func(key, value any) bool {
// k := key.([32]byte)
// v := value.(*attestationJob)
// if time.Now().After(v.creationTime.Add(singleAttestationJobExpiry)) {
// a.attestationsToBeLaterProcessed.Delete(k)
// return true
// }

root := v.att.Attestation.Data.BeaconBlockRoot
if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
return true
}
a.ProcessMessage(ctx, &v.subnet, v.att)
return true
})
}
}
// root := v.att.Attestation.Data.BeaconBlockRoot
// if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
// return true
// }
// a.ProcessMessage(ctx, &v.subnet, v.att)
// return true
// })
// }
// }
2 changes: 1 addition & 1 deletion cl/phase1/network/services/attestation_service_test.go
@@ -73,7 +73,7 @@ func (t *attestationTestSuite) SetupTest() {
t.gomockCtrl = gomock.NewController(t.T())
t.mockForkChoice = &mock_services.ForkChoiceStorageMock{}
_, st, _ := tests.GetBellatrixRandom()
t.syncedData = synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
t.syncedData = synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
t.syncedData.OnHeadState(st)
t.committeeSubscibe = mockCommittee.NewMockCommitteeSubscribe(t.gomockCtrl)
t.ethClock = eth_clock.NewMockEthereumClock(t.gomockCtrl)
2 changes: 1 addition & 1 deletion cl/phase1/network/services/blob_sidecar_service_test.go
@@ -73,7 +73,7 @@ func setupBlobSidecarService(t *testing.T, ctrl *gomock.Controller, test bool) (
ctx2, cn := context.WithTimeout(ctx, 1)
cn()
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
emitters := beaconevents.NewEventEmitter()
2 changes: 1 addition & 1 deletion cl/phase1/network/services/block_service_test.go
@@ -36,7 +36,7 @@ import (
func setupBlockService(t *testing.T, ctrl *gomock.Controller) (BlockService, *synced_data.SyncedDataManager, *eth_clock.MockEthereumClock, *mock_services.ForkChoiceStorageMock) {
db := memdb.NewTestDB(t)
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
blockService := NewBlockService(context.Background(), db, forkchoiceMock, syncedDataManager, ethClock, cfg, nil)
(remaining changed files not shown)
