From e00c3a7d37e9d24370d28783be8262820be91cc2 Mon Sep 17 00:00:00 2001 From: amirylm Date: Mon, 13 Nov 2023 20:44:15 -0300 Subject: [PATCH] peer score params --- commons/config.go | 96 -------------- commons/config_pubsub.go | 262 +++++++++++++++++++++++++++++++++++++++ core/gossip/scoring.go | 18 +-- 3 files changed, 272 insertions(+), 104 deletions(-) create mode 100644 commons/config_pubsub.go diff --git a/commons/config.go b/commons/config.go index ac45ec0..07f89e3 100644 --- a/commons/config.go +++ b/commons/config.go @@ -82,102 +82,6 @@ func (dc *DiscoveryConfig) Defaults() { } } -// PubsubConfig contains config for the pubsub router -type PubsubConfig struct { - Topics []TopicConfig `json:"topics" yaml:"topics"` - Overlay *OverlayParams `json:"overlay,omitempty" yaml:"overlay,omitempty"` - SubFilter *SubscriptionFilter `json:"subFilter,omitempty" yaml:"subFilter,omitempty"` - MaxMessageSize int `json:"maxMessageSize,omitempty" yaml:"maxMessageSize,omitempty"` - MsgValidationTimeout time.Duration `json:"msgValidationTimeout,omitempty" yaml:"msgValidationTimeout,omitempty"` - Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"` - MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"` - MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"` - Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"` -} - -func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) { - for _, t := range psc.Topics { - if t.Name == name { - return t, true - } - } - return TopicConfig{}, false -} - -type MsgIDFnConfig struct { - Type string `json:"type,omitempty" yaml:"type,omitempty"` - Size int `json:"size,omitempty" yaml:"size,omitempty"` -} - -type MsgValidationConfig struct { - Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` - Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"` -} - -func (mvc MsgValidationConfig) Defaults(other *MsgValidationConfig) MsgValidationConfig { - if mvc.Timeout.Milliseconds() == 0 { - mvc.Timeout = time.Second * 5 - } - if mvc.Concurrency == 0 { - mvc.Concurrency = 10 - } - if other != nil { - if other.Timeout.Milliseconds() > 0 { - mvc.Timeout = other.Timeout - } - if other.Concurrency > 0 { - mvc.Concurrency = other.Concurrency - } - } - return mvc -} - -// TopicConfig contains configuration of a pubsub topic -type TopicConfig struct { - Name string `json:"name" yaml:"name"` - BufferSize int `json:"bufferSize,omitempty" yaml:"bufferSize,omitempty"` - RateLimit float64 `json:"rateLimit,omitempty" yaml:"rateLimit,omitempty"` - Overlay *OverlayParams `json:"overlay,omitempty" yaml:"overlay,omitempty"` - Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"` - MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"` - MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"` -} - -// SubscriptionFilter configurations -type SubscriptionFilter struct { - // Pattern of topics to accept - Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` - // Limit is the max number of topics to accept - Limit int `json:"limit,omitempty" yaml:"limit,omitempty"` -} - -// OverlayParams are the overlay params as defined in -// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#parameters -type OverlayParams struct { - // D is the desired outbound degree of the network (6) - D int32 `json:"d,omitempty" yaml:"d,omitempty"` - // Dlow is the lower bound for outbound degree (4) - Dlow int32 `json:"dlo,omitempty" yaml:"dlo,omitempty"` - // Dhi is the upper bound for outbound degree (12) - Dhi int32 `json:"dhi,omitempty" yaml:"dhi,omitempty"` - // Dlazy is the outbound degree for gossip emission (D) - Dlazy int32 `json:"dlazy,omitempty" yaml:"dlazy,omitempty"` - // HeartbeatInterval is the time between heartbeats (1sec) - HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty" yaml:"heartbeatInterval,omitempty"` - // FanoutTtl time-to-live for each topic's fanout state (60sec) - FanoutTtl time.Duration `json:"fanoutTTL,omitempty" yaml:"fanoutTTL,omitempty"` - // McacheLen is the number of history windows in message cache (5) - McacheLen int32 `json:"mCacheLen,omitempty" yaml:"mCacheLen,omitempty"` - // McacheGossip is the number of history windows to use when emitting gossip (3) - McacheGossip int32 `json:"mCacheGossip,omitempty" yaml:"mCacheGossip,omitempty"` - // SeenTtl is the expiry time for cache of seen message ids (2min) - SeenTtl time.Duration `json:"seenTTL,omitempty" yaml:"seenTTL,omitempty"` -} - -type ScoringParams struct { - // TODO -} - type PProf struct { Enabled bool Port uint diff --git a/commons/config_pubsub.go b/commons/config_pubsub.go new file mode 100644 index 0000000..3e757ce --- /dev/null +++ b/commons/config_pubsub.go @@ -0,0 +1,262 @@ +package commons + +import ( + "net" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" +) + +// PubsubConfig contains config for the pubsub router +type PubsubConfig struct { + Topics []TopicConfig `json:"topics" yaml:"topics"` + Overlay *OverlayParams `json:"overlay,omitempty" yaml:"overlay,omitempty"` + SubFilter *SubscriptionFilter `json:"subFilter,omitempty" yaml:"subFilter,omitempty"` + MaxMessageSize int `json:"maxMessageSize,omitempty" yaml:"maxMessageSize,omitempty"` + MsgValidationTimeout time.Duration `json:"msgValidationTimeout,omitempty" yaml:"msgValidationTimeout,omitempty"` + Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"` + MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"` + MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"` + Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"` +} + +func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) { + for _, t := range psc.Topics { + if t.Name == name { + return t, true + } + } + return TopicConfig{}, false +} + +type MsgIDFnConfig struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + Size int `json:"size,omitempty" yaml:"size,omitempty"` +} + +type MsgValidationConfig struct { + Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"` +} + +func (mvc MsgValidationConfig) Defaults(other *MsgValidationConfig) MsgValidationConfig { + if mvc.Timeout.Milliseconds() == 0 { + mvc.Timeout = time.Second * 5 + } + if mvc.Concurrency == 0 { + mvc.Concurrency = 10 + } + if other != nil { + if other.Timeout.Milliseconds() > 0 { + mvc.Timeout = other.Timeout + } + if other.Concurrency > 0 { + mvc.Concurrency = other.Concurrency + } + } + return mvc +} + +// TopicConfig contains configuration of a pubsub topic +type TopicConfig struct { + Name string `json:"name" yaml:"name"` + BufferSize int `json:"bufferSize,omitempty" yaml:"bufferSize,omitempty"` + RateLimit float64 `json:"rateLimit,omitempty" yaml:"rateLimit,omitempty"` + Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"` + MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"` + MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"` +} + +// SubscriptionFilter configurations +type SubscriptionFilter struct { + // Pattern of topics to accept + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + // Limit is the max number of topics to accept + Limit int `json:"limit,omitempty" yaml:"limit,omitempty"` +} + +// OverlayParams are the overlay params as defined in +// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#parameters +type OverlayParams struct { + // D is the desired outbound degree of the network (6) + D int32 `json:"d,omitempty" yaml:"d,omitempty"` + // Dlow is the lower bound for outbound degree (4) + Dlow int32 `json:"dlo,omitempty" yaml:"dlo,omitempty"` + // Dhi is the upper bound for outbound degree (12) + Dhi int32 `json:"dhi,omitempty" yaml:"dhi,omitempty"` + // Dlazy is the outbound degree for gossip emission (D) + Dlazy int32 `json:"dlazy,omitempty" yaml:"dlazy,omitempty"` + // HeartbeatInterval is the time between heartbeats (1sec) + HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty" yaml:"heartbeatInterval,omitempty"` + // FanoutTtl time-to-live for each topic's fanout state (60sec) + FanoutTtl time.Duration `json:"fanoutTTL,omitempty" yaml:"fanoutTTL,omitempty"` + // McacheLen is the number of history windows in message cache (5) + McacheLen int32 `json:"mCacheLen,omitempty" yaml:"mCacheLen,omitempty"` + // McacheGossip is the number of history windows to use when emitting gossip (3) + McacheGossip int32 `json:"mCacheGossip,omitempty" yaml:"mCacheGossip,omitempty"` + // SeenTtl is the expiry time for cache of seen message ids (2min) + SeenTtl time.Duration `json:"seenTTL,omitempty" yaml:"seenTTL,omitempty"` +} + +type ScoringParams struct { + // whether it is allowed to just set some params and not all of them + SkipAtomic bool `json:"skipAtomic,omitempty" yaml:"skipAtomic,omitempty"` + // params (P1-P4) per topic + Topics map[string]TopicScoreParams `json:"topics,omitempty" yaml:"topics,omitempty"` + // Aggregate topic score cap; this limits the total contribution of topics towards a positive + // score. It must be positive (or 0 for no cap). + TopicScoreCap float64 `json:"topicScoreCap,omitempty" yaml:"topicScoreCap,omitempty"` + // P5: Application-specific peer scoring + AppSpecific *AppSpecificScoring `json:"appSpecificScore,omitempty" yaml:"appSpecificScore,omitempty"` + // P6: IP-colocation factor. + // The parameter has an associated counter which counts the number of peers with the same IP. + IPColocationFactor *IpColocationScoring `json:"ipColocationFactor,omitempty" yaml:"ipColocationFactor,omitempty"` + // P7: behavioural pattern penalties. + // This parameter has an associated counter which tracks misbehaviour as detected by the + // router. The router currently applies penalties for the following behaviors: + // - attempting to re-graft before the prune backoff time has elapsed. + // - not following up in IWANT requests for messages advertised with IHAVE. + BehaviourPenalty *ScoringParamExtended `json:"behaviourPenalty,omitempty" yaml:"behaviourPenalty,omitempty"` +} + +func (sc *ScoringParams) ToStd() *pubsub.PeerScoreParams { + stdParams := pubsub.PeerScoreParams{ + SkipAtomicValidation: sc.SkipAtomic, + Topics: make(map[string]*pubsub.TopicScoreParams), + TopicScoreCap: sc.TopicScoreCap, + } + + for topic, tsp := range sc.Topics { + stdParams.Topics[topic] = tsp.ToStd() + } + + if p := sc.AppSpecific; p != nil { + stdParams.AppSpecificScore = p.ScoreFn + stdParams.AppSpecificWeight = p.Weight + } + + if p := sc.IPColocationFactor; p != nil { + stdParams.IPColocationFactorThreshold = p.Threshold + stdParams.IPColocationFactorWeight = p.Weight + stdParams.IPColocationFactorWhitelist = p.Whitelist + } + + if p := sc.BehaviourPenalty; p != nil { + stdParams.BehaviourPenaltyWeight = p.Weight + stdParams.BehaviourPenaltyDecay = p.Decay + stdParams.BehaviourPenaltyThreshold = p.Threshold + } + + return &stdParams +} + +// TopicScoreParams is used to configure topic scoring. +// The struct provides a simpler abstraction of pubsub.TopicScoreParams. +// for more information please refer to pubsub.TopicScoreParams. +type TopicScoreParams struct { + // whether it is allowed to just set some params and not all of them + SkipAtomic bool `json:"skipAtomic,omitempty" yaml:"skipAtomic,omitempty"` + // The weight of the topic + Weight float64 `json:"weight,omitempty" yaml:"weight,omitempty"` + // P1: time in the mesh + // This is the time the peer has been grafted in the mesh. + TimeInMesh *ScoringParamBase `json:"timeInMesh,omitempty" yaml:"timeInMesh,omitempty"` + // P2: first message deliveries + // This is the number of message deliveries in the topic. + FirstMessageDeliveries *ScoringParamBase `json:"firstMessageDeliveries,omitempty" yaml:"firstMessageDeliveries,omitempty"` + // P3: mesh message deliveries + // This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of + // message validation; deliveries during validation also count and are retroactively applied + // when validation succeeds. + // This window accounts for the minimum time before a hostile mesh peer trying to game the score + // could replay back a valid message we just sent them. + // It effectively tracks first and near-first deliveries, i.e., a message seen from a mesh peer + // before we have forwarded it to them. + MeshMessageDeliveries *ScoringParamExtended `json:"meshMessageDeliveries,omitempty" yaml:"meshMessageDeliveries,omitempty"` + // P3b: sticky mesh propagation failures + // This is a sticky penalty that applies when a peer gets pruned from the mesh with an active + // mesh message delivery penalty. + MeshFailurePenalty *ScoringParamBase `json:"meshFailurePenalty,omitempty" yaml:"meshFailurePenalty,omitempty"` + // P4: invalid messages + // This is the number of invalid messages in the topic. + InvalidMessageDeliveries *ScoringParamBase `json:"invalidMessageDeliveries,omitempty" yaml:"invalidMessageDeliveries,omitempty"` +} + +// ToStd translates the TopicScoreParams to pubsub.TopicScoreParams +func (tsp *TopicScoreParams) ToStd() *pubsub.TopicScoreParams { + stdParams := &pubsub.TopicScoreParams{ + SkipAtomicValidation: tsp.SkipAtomic, + TopicWeight: tsp.Weight, + } + + if p := tsp.TimeInMesh; p != nil { + stdParams.TimeInMeshCap = p.Cap + stdParams.TimeInMeshQuantum = p.Quantum + stdParams.TimeInMeshWeight = p.Weight + } + + if p := tsp.FirstMessageDeliveries; p != nil { + stdParams.FirstMessageDeliveriesCap = p.Cap + stdParams.FirstMessageDeliveriesDecay = p.Decay + stdParams.FirstMessageDeliveriesWeight = p.Weight + } + + if p := tsp.MeshMessageDeliveries; p != nil { + stdParams.MeshMessageDeliveriesCap = p.Cap + stdParams.MeshMessageDeliveriesDecay = p.Decay + stdParams.MeshMessageDeliveriesWeight = p.Weight + stdParams.MeshMessageDeliveriesThreshold = p.Threshold + stdParams.MeshMessageDeliveriesActivation = p.Activation + stdParams.MeshMessageDeliveriesWindow = p.Window + } + + if p := tsp.MeshFailurePenalty; p != nil { + stdParams.MeshFailurePenaltyDecay = p.Decay + stdParams.MeshFailurePenaltyWeight = p.Weight + } + + if p := tsp.InvalidMessageDeliveries; p != nil { + stdParams.InvalidMessageDeliveriesDecay = p.Decay + stdParams.InvalidMessageDeliveriesWeight = p.Weight + } + + return stdParams +} + +type AppSpecificScoring struct { + ScoreFn func(p peer.ID) float64 + Weight float64 +} + +type ScoringParamBase struct { + Weight float64 `json:"weight,omitempty" yaml:"weight,omitempty"` + Decay float64 `json:"decay,omitempty" yaml:"decay,omitempty"` + Cap float64 `json:"cap,omitempty" yaml:"cap,omitempty"` + // used only for TimeInMesh (p1) + Quantum time.Duration `json:"quantum,omitempty" yaml:"quantum,omitempty"` +} + +type ScoringParamExtended struct { + Weight float64 `json:"weight,omitempty" yaml:"weight,omitempty"` + Decay float64 `json:"decay,omitempty" yaml:"decay,omitempty"` + Cap float64 `json:"cap,omitempty" yaml:"cap,omitempty"` + Threshold float64 `json:"threshold,omitempty" yaml:"threshold,omitempty"` + Window time.Duration `json:"window,omitempty" yaml:"window,omitempty"` + Activation time.Duration `json:"activation,omitempty" yaml:"activation,omitempty"` +} + +// P6: IP-colocation factor. +// The parameter has an associated counter which counts the number of peers with the same IP. +// If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value +// is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2. +// If the number of peers in the same IP is less than the threshold, then the value is 0. +// The weight of the parameter MUST be negative, unless you want to disable for testing. +// +// Note: In order to simulate many IPs in a managable manner when testing, +// you can set the weight to 0 thus disabling the IP colocation penalty. +type IpColocationScoring struct { + Weight float64 + Threshold int + Whitelist []*net.IPNet +} diff --git a/core/gossip/scoring.go b/core/gossip/scoring.go index 6c7e47e..33eb3b2 100644 --- a/core/gossip/scoring.go +++ b/core/gossip/scoring.go @@ -6,12 +6,14 @@ import ( ) func PeerScores(cfg commons.PubsubConfig) (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) { - return &pubsub.PeerScoreParams{ - // TODO - }, &pubsub.PeerScoreThresholds{ - // TODO: using reasonable defaults, requires tuning - GossipThreshold: -10000, - PublishThreshold: -2000, - GraylistThreshold: -400, - } + peerScores := &pubsub.PeerScoreParams{} + if cfg.Scoring != nil { + peerScores = cfg.Scoring.ToStd() + } + return peerScores, &pubsub.PeerScoreThresholds{ + // TODO: using reasonable defaults, requires tuning + GossipThreshold: -10000, + PublishThreshold: -2000, + GraylistThreshold: -400, + } }