Skip to content

Commit

Permalink
Caplin: added better rate limiting of bandwidth (#12999)
Browse files Browse the repository at this point in the history
Remove all of our rate-limiting by just replacing it with a bandwidth
focused one
  • Loading branch information
Giulio2002 authored Dec 5, 2024
1 parent 234947a commit 644c9a9
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 164 deletions.
19 changes: 11 additions & 8 deletions cl/clparams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"gopkg.in/yaml.v2"

"github.com/c2h5oh/datasize"
"github.com/erigontech/erigon-lib/chain/networkname"
libcommon "github.com/erigontech/erigon-lib/common"

Expand Down Expand Up @@ -59,14 +60,16 @@ type CaplinConfig struct {
CustomGenesisStatePath string

// Network stuff
CaplinDiscoveryAddr string
CaplinDiscoveryPort uint64
CaplinDiscoveryTCPPort uint64
SentinelAddr string
SentinelPort uint64
SubscribeAllTopics bool
MaxPeerCount uint64
EnableUPnP bool
CaplinDiscoveryAddr string
CaplinDiscoveryPort uint64
CaplinDiscoveryTCPPort uint64
SentinelAddr string
SentinelPort uint64
SubscribeAllTopics bool
MaxPeerCount uint64
EnableUPnP bool
MaxInboundTrafficPerPeer datasize.ByteSize
MaxOutboundTrafficPerPeer datasize.ByteSize
// Erigon Sync
LoopBlockLimit uint64
// Beacon API router configuration
Expand Down
4 changes: 4 additions & 0 deletions cl/phase1/stages/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ func postForkchoiceOperations(ctx context.Context, tx kv.RwTx, logger log.Logger
if err != nil {
return fmt.Errorf("failed to get state at block root: %w", err)
}
// fail-safe check§
if headState == nil {
return nil
}
if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(tx, headState, headRoot, headState.Slot(), 0); err != nil {
logger.Warn("failed to produce and cache attestation data", "err", err)
}
Expand Down
4 changes: 4 additions & 0 deletions cl/sentinel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"

"github.com/c2h5oh/datasize"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"

"github.com/libp2p/go-libp2p"
Expand All @@ -40,6 +41,9 @@ type SentinelConfig struct {
IpAddr string
Port int
TCPPort uint

MaxInboundTrafficPerPeer datasize.ByteSize
MaxOutboundTrafficPerPeer datasize.ByteSize
// Optional
LocalIP string
EnableUPnP bool
Expand Down
11 changes: 0 additions & 11 deletions cl/sentinel/handlers/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,11 @@ import (
const maxBlobsThroughoutputPerRequest = 72

func (c *ConsensusHandlers) blobsSidecarsByRangeHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()

req := &cltypes.BlobsByRangeRequest{}
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.DenebVersion); err != nil {
return err
}
if err := c.checkRateLimit(peerId, "blobSidecar", rateLimits.blobSidecarsLimit, int(req.Count)); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

tx, err := c.indiciesDB.BeginRo(c.ctx)
if err != nil {
Expand Down Expand Up @@ -85,18 +80,12 @@ func (c *ConsensusHandlers) blobsSidecarsByRangeHandler(s network.Stream) error
}

func (c *ConsensusHandlers) blobsSidecarsByIdsHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()

req := solid.NewStaticListSSZ[*cltypes.BlobIdentifier](40269, 40)
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.DenebVersion); err != nil {
return err
}

if err := c.checkRateLimit(peerId, "blobSidecar", rateLimits.blobSidecarsLimit, req.Len()); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

tx, err := c.indiciesDB.BeginRo(c.ctx)
if err != nil {
return err
Expand Down
10 changes: 0 additions & 10 deletions cl/sentinel/handlers/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@ const (
)

func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()

req := &cltypes.BeaconBlocksByRangeRequest{}
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil {
return err
}
if err := c.checkRateLimit(peerId, "beaconBlocksByRange", rateLimits.beaconBlocksByRangeLimit, int(req.Count)); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

tx, err := c.indiciesDB.BeginRo(c.ctx)
if err != nil {
Expand Down Expand Up @@ -85,16 +80,11 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
}

func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()

var req solid.HashListSSZ = solid.NewHashList(100)
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil {
return err
}
if err := c.checkRateLimit(peerId, "beaconBlocksByRoot", rateLimits.beaconBlocksByRootLimit, req.Length()); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

blockRoots := []libcommon.Hash{}
for i := 0; i < req.Length(); i++ {
Expand Down
39 changes: 0 additions & 39 deletions cl/sentinel/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package handlers
import (
"context"
"errors"
"math"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon/cl/persistence/blob_storage"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
Expand Down Expand Up @@ -57,26 +54,6 @@ type RateLimits struct {
blobSidecarsLimit int
}

const (
punishmentPeriod = time.Minute
heartBeatRateLimit = math.MaxInt
blockHandlerRateLimit = 200
lightClientRateLimit = 500
blobHandlerRateLimit = 50 // very generous here.
)

var rateLimits = RateLimits{
pingLimit: heartBeatRateLimit,
goodbyeLimit: heartBeatRateLimit,
metadataV1Limit: heartBeatRateLimit,
metadataV2Limit: heartBeatRateLimit,
statusLimit: heartBeatRateLimit,
beaconBlocksByRangeLimit: blockHandlerRateLimit,
beaconBlocksByRootLimit: blockHandlerRateLimit,
lightClientLimit: lightClientRateLimit,
blobSidecarsLimit: blobHandlerRateLimit,
}

type ConsensusHandlers struct {
handlers map[protocol.ID]network.StreamHandler
hs *handshake.HandShaker
Expand All @@ -86,7 +63,6 @@ type ConsensusHandlers struct {
beaconDB freezeblocks.BeaconSnapshotReader

indiciesDB kv.RoDB
peerRateLimits sync.Map
punishmentEndTimes sync.Map
forkChoiceReader forkchoice.ForkChoiceStorageReader
host host.Host
Expand All @@ -113,7 +89,6 @@ func NewConsensusHandlers(ctx context.Context, db freezeblocks.BeaconSnapshotRea
ethClock: ethClock,
beaconConfig: beaconConfig,
ctx: ctx,
peerRateLimits: sync.Map{},
punishmentEndTimes: sync.Map{},
enableBlocks: enabledBlocks,
forkChoiceReader: forkChoiceReader,
Expand Down Expand Up @@ -158,20 +133,6 @@ func (c *ConsensusHandlers) checkRateLimit(peerId string, method string, limit,
c.punishmentEndTimes.Delete(keyHash)
}

value, ok := c.peerRateLimits.Load(keyHash)
if !ok {
value = rate.NewLimiter(rate.Every(time.Minute), limit)
c.peerRateLimits.Store(keyHash, value)
}

limiter := value.(*rate.Limiter)

if !limiter.AllowN(time.Now(), n) {
c.punishmentEndTimes.Store(keyHash, time.Now().Add(punishmentPeriod))
c.peerRateLimits.Delete(keyHash)
return errors.New("rate limit exceeded")
}

return nil
}

Expand Down
26 changes: 0 additions & 26 deletions cl/sentinel/handlers/heartbeats.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,13 @@ import (
// Since packets are just structs, they can be resent with no issue

func (c *ConsensusHandlers) pingHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "ping", rateLimits.pingLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
return ssz_snappy.EncodeAndWrite(s, &cltypes.Ping{
Id: c.me.Seq(),
}, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) goodbyeHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "goodbye", rateLimits.goodbyeLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
gid := &cltypes.Ping{}
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, gid, clparams.Phase0Version); err != nil {
return err
Expand All @@ -64,11 +55,6 @@ func (c *ConsensusHandlers) goodbyeHandler(s network.Stream) error {
}

func (c *ConsensusHandlers) metadataV1Handler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "metadataV1", rateLimits.metadataV1Limit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
subnetField := [8]byte{}
attSubEnr := enr.WithEntry(c.netCfg.AttSubnetKey, &subnetField)
if err := c.me.Node().Load(attSubEnr); err != nil {
Expand All @@ -82,12 +68,6 @@ func (c *ConsensusHandlers) metadataV1Handler(s network.Stream) error {
}

func (c *ConsensusHandlers) metadataV2Handler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()

if err := c.checkRateLimit(peerId, "metadataV2", rateLimits.metadataV2Limit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
subnetField := [8]byte{}
syncnetField := [1]byte{}
attSubEnr := enr.WithEntry(c.netCfg.AttSubnetKey, &subnetField)
Expand All @@ -108,11 +88,5 @@ func (c *ConsensusHandlers) metadataV2Handler(s network.Stream) error {

// TODO: Actually respond with proper status
func (c *ConsensusHandlers) statusHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "status", rateLimits.statusLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

return ssz_snappy.EncodeAndWrite(s, c.hs.Status(), SuccessfulResponsePrefix)
}
22 changes: 0 additions & 22 deletions cl/sentinel/handlers/light_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ import (
const maxLightClientsPerRequest = 100

func (c *ConsensusHandlers) optimisticLightClientUpdateHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "light_client", rateLimits.lightClientLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
lc := c.forkChoiceReader.NewestLightClientUpdate()
if lc == nil {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavailablePrefix)
Expand All @@ -52,11 +47,6 @@ func (c *ConsensusHandlers) optimisticLightClientUpdateHandler(s network.Stream)
}

func (c *ConsensusHandlers) finalityLightClientUpdateHandler(s network.Stream) error {
peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "light_client", rateLimits.lightClientLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}
lc := c.forkChoiceReader.NewestLightClientUpdate()
if lc == nil {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavailablePrefix)
Expand All @@ -82,12 +72,6 @@ func (c *ConsensusHandlers) lightClientBootstrapHandler(s network.Stream) error
return err
}

peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "light_client", rateLimits.lightClientLimit, 1); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

lc, has := c.forkChoiceReader.GetLightClientBootstrap(root.Root)
if !has {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavailablePrefix)
Expand All @@ -108,12 +92,6 @@ func (c *ConsensusHandlers) lightClientUpdatesByRangeHandler(s network.Stream) e
return err
}

peerId := s.Conn().RemotePeer().String()
if err := c.checkRateLimit(peerId, "light_client", rateLimits.lightClientLimit, int(req.Count)); err != nil {
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
return err
}

lightClientUpdates := make([]*cltypes.LightClientUpdate, 0, maxLightClientsPerRequest)

endPeriod := req.StartPeriod + req.Count
Expand Down
2 changes: 1 addition & 1 deletion cl/sentinel/peers/peers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewPool() *Pool {
return &Pool{
peerData: make(map[peer.ID]*Item),
queue: ring.NewBuffer[*Item](0, 1024),
bannedPeers: lru.NewWithTTL[peer.ID, struct{}]("bannedPeers", 100_000, 5*time.Minute),
bannedPeers: lru.NewWithTTL[peer.ID, struct{}]("bannedPeers", 100_000, 30*time.Minute),
}
}

Expand Down
Loading

0 comments on commit 644c9a9

Please sign in to comment.