Skip to content

Commit

Permalink
fix(kademlia): always connect to bootnodes on startup to identify them
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Nov 21, 2024
1 parent 440ff5c commit 2f2ef4a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
12 changes: 9 additions & 3 deletions pkg/topology/kademlia/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func IsBootnode(b bool) RecordOp {
return func(cs *Counters) {
cs.Lock()
defer cs.Unlock()
cs.isBootnode = b
cs.IsBootnode = b
}
}

Expand Down Expand Up @@ -147,13 +147,15 @@ type Snapshot struct {
LatencyEWMA time.Duration
Reachability p2p.ReachabilityStatus
Healthy bool
IsBootnode bool
}

// persistentCounters is a helper struct used for persisting selected counters.
// Its exported fields carry JSON tags; presumably it is the value that
// Counters.MarshalJSON encodes and Counters.UnmarshalJSON decodes, since both
// copy exactly these four fields to and from Counters.
type persistentCounters struct {
// PeerAddress is serialized under the "peerAddress" key.
PeerAddress swarm.Address `json:"peerAddress"`
// LastSeenTimestamp is serialized under the "lastSeenTimestamp" key.
LastSeenTimestamp int64 `json:"lastSeenTimestamp"`
// ConnTotalDuration is serialized under the "connTotalDuration" key.
ConnTotalDuration time.Duration `json:"connTotalDuration"`
// IsBootnode is serialized under the "isBootnode" key so that the bootnode
// flag is kept alongside the other persisted counters.
IsBootnode bool `json:"isBootnode"`
}

// Counters represents a collection of peer metrics
Expand All @@ -164,7 +166,7 @@ type Counters struct {
// Bookkeeping.
isLoggedIn bool
peerAddress swarm.Address
isBootnode bool
IsBootnode bool

// Counters.
lastSeenTimestamp int64
Expand All @@ -187,6 +189,7 @@ func (cs *Counters) UnmarshalJSON(b []byte) (err error) {
cs.peerAddress = val.PeerAddress
cs.lastSeenTimestamp = val.LastSeenTimestamp
cs.connTotalDuration = val.ConnTotalDuration
cs.IsBootnode = val.IsBootnode
cs.Unlock()
return nil
}
Expand All @@ -198,6 +201,7 @@ func (cs *Counters) MarshalJSON() ([]byte, error) {
PeerAddress: cs.peerAddress,
LastSeenTimestamp: cs.lastSeenTimestamp,
ConnTotalDuration: cs.connTotalDuration,
IsBootnode: cs.IsBootnode,
}
cs.Unlock()
return json.Marshal(val)
Expand All @@ -224,6 +228,7 @@ func (cs *Counters) snapshot(t time.Time) *Snapshot {
LatencyEWMA: cs.latencyEWMA,
Reachability: cs.ReachabilityStatus,
Healthy: cs.Healthy,
IsBootnode: cs.IsBootnode,
}
}

Expand All @@ -249,6 +254,7 @@ func NewCollector(db *shed.DB) (*Collector, error) {
peerAddress: val.PeerAddress,
lastSeenTimestamp: val.LastSeenTimestamp,
connTotalDuration: val.ConnTotalDuration,
IsBootnode: val.IsBootnode,
})
}

Expand Down Expand Up @@ -321,7 +327,7 @@ type ExcludeOp func(*Counters) bool
// Bootnode is used to filter bootnode peers.
func Bootnode() ExcludeOp {
return func(cs *Counters) bool {
return cs.isBootnode
return cs.IsBootnode
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/topology/kademlia/internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ func TestPeerMetricsCollector(t *testing.T) {
t.Fatalf("Snapshot(%q, ...): session connection duration counter mismatch: have %q; want %q", addr, have, want)
}

// Bootnode: the flag recorded through metrics.IsBootnode must show up in the
// next snapshot, first when it is cleared and then when it is set.
mc.Record(addr, metrics.IsBootnode(false))
ss = snapshot(t, mc, t2, addr)
if have, want := ss.IsBootnode, false; have != want {
	// The message names the bootnode flag (not latency) so a failure here is
	// attributed to the right check.
	t.Fatalf("Snapshot(%q, ...): is bootnode mismatch: have %v; want %v", addr, have, want)
}
mc.Record(addr, metrics.IsBootnode(true))
ss = snapshot(t, mc, t2, addr)
if have, want := ss.IsBootnode, true; have != want {
	t.Fatalf("Snapshot(%q, ...): is bootnode mismatch: have %v; want %v", addr, have, want)
}

// Latency.
mc.Record(addr, metrics.PeerLatency(t4))
ss = snapshot(t, mc, t2, addr)
Expand Down Expand Up @@ -188,6 +200,7 @@ func TestPeerMetricsCollector(t *testing.T) {
want = &metrics.Snapshot{
LastSeenTimestamp: ss.LastSeenTimestamp,
ConnectionTotalDuration: 2 * ss.ConnectionTotalDuration, // 2x because we've already logout with t3 and login with t1 again.
IsBootnode: true,
}
if diff := cmp.Diff(have, want); diff != "" {
t.Fatalf("unexpected snapshot difference:\n%s", diff)
Expand Down
6 changes: 5 additions & 1 deletion pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,11 @@ func (k *Kad) balancedSlotPeers(pseudoAddr swarm.Address, peers []swarm.Address,
return ret
}

func (k *Kad) Start(_ context.Context) error {
func (k *Kad) Start(ctx context.Context) error {

// always discover bootnodes on startup to exclude them from protocol requests
k.connectBootNodes(ctx)

k.wg.Add(1)
go k.manage()

Expand Down

0 comments on commit 2f2ef4a

Please sign in to comment.