From f478f512698d041cf2c53fba8be2483a6f910d01 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:39:07 +0300 Subject: [PATCH 1/4] fix(kademlia): exclude bootnodes from protocol requests iterators --- pkg/topology/kademlia/kademlia.go | 13 ++++++++++--- pkg/topology/kademlia/kademlia_test.go | 21 +++++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 9462372093f..923e382acc6 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -14,6 +14,7 @@ import ( "math/rand" "path/filepath" "sync" + "sync/atomic" "time" "github.com/ethersphere/bee/v2/pkg/addressbook" @@ -201,6 +202,7 @@ type Kad struct { bgBroadcastCtx context.Context bgBroadcastCancel context.CancelFunc reachability p2p.ReachabilityStatus + bootnodeOverlay atomic.Value } // New returns a new Kademlia. @@ -252,6 +254,8 @@ func New( storageRadius: swarm.MaxPO, } + k.bootnodeOverlay.Store(swarm.ZeroAddress) + if k.opt.PruneFunc == nil { k.opt.PruneFunc = k.pruneOversaturatedBins } @@ -265,7 +269,8 @@ func New( if k.opt.ExcludeFunc == nil { k.opt.ExcludeFunc = func(f ...im.ExcludeOp) peerExcludeFunc { return func(peer swarm.Address) bool { - return k.collector.Exclude(peer, f...) + // exclude bootnode addresses as well + return k.collector.Exclude(peer, f...) || k.bootnodeOverlay.Load().(swarm.Address).Equal(peer) } } } @@ -838,6 +843,8 @@ func (k *Kad) connectBootNodes(ctx context.Context) { return false, nil } + k.bootnodeOverlay.Store(bzzAddress.Overlay) + if err := k.onConnected(ctx, bzzAddress.Overlay); err != nil { return false, err } @@ -1316,7 +1323,7 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology. 
func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) error { filters := excludeOps(filter) return k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { - if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) { + if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil } return f(addr, po) @@ -1327,7 +1334,7 @@ func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) func (k *Kad) EachConnectedPeerRev(f topology.EachPeerFunc, filter topology.Select) error { filters := excludeOps(filter) return k.connectedPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { - if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) { + if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil } return f(addr, po) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 9a0079220a2..b64f96e0dff 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1200,18 +1200,21 @@ func TestStart(t *testing.T) { t.Parallel() var bootnodes []ma.Multiaddr + var bootnodesOverlays []swarm.Address for i := 0; i < 10; i++ { - multiaddr, err := ma.NewMultiaddr(underlayBase + swarm.RandAddress(t).String()) + overlay := swarm.RandAddress(t) + + multiaddr, err := ma.NewMultiaddr(underlayBase + overlay.String()) if err != nil { t.Fatal(err) } bootnodes = append(bootnodes, multiaddr) + bootnodesOverlays = append(bootnodesOverlays, overlay) } t.Run("non-empty addressbook", func(t *testing.T) { t.Parallel() - t.Skip("test flakes") var conns, failedConns int32 // how many connect calls were made to the p2p mock _, kad, ab, _, signer := newTestKademlia(t, &conns, &failedConns, kademlia.Options{Bootnodes: bootnodes}) @@ -1238,6 +1241,20 @@ func TestStart(t *testing.T) { waitCounter(t, &conns, 3) waitCounter(t, &failedConns, 0) + + err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, 
jumpToNext bool, err error) { + for _, b := range bootnodesOverlays { + fmt.Println(b, addr) + if b.Equal(addr) { + return false, false, errors.New("did not expect bootnode address from the iterator") + } + } + return false, false, nil + + }, topology.Select{}) + if err != nil { + t.Fatal(err) + } }) t.Run("empty addressbook", func(t *testing.T) { From d0edbe43ebe351816491206d7bfe22188a3ac994 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:59:18 +0300 Subject: [PATCH 2/4] fix(kademlia): track bootnodes in the metrics collector --- .../kademlia/internal/metrics/metrics.go | 17 +++++++++ .../kademlia/internal/metrics/metrics_test.go | 36 +++++++++++++++++++ pkg/topology/kademlia/kademlia.go | 24 +++++-------- pkg/topology/kademlia/kademlia_test.go | 1 - 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/pkg/topology/kademlia/internal/metrics/metrics.go b/pkg/topology/kademlia/internal/metrics/metrics.go index 23f8e902671..89fa1b1f516 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics.go +++ b/pkg/topology/kademlia/internal/metrics/metrics.go @@ -33,6 +33,15 @@ const ( // operation whose execution modifies a specific metrics. type RecordOp func(*Counters) +// IsBootnode marks the peer's metrics as belonging to a bootnode (or not) based on b. +func IsBootnode(b bool) RecordOp { + return func(cs *Counters) { + cs.Lock() + defer cs.Unlock() + cs.isBootnode = b + } +} + // PeerLogIn will first update the current last seen to the give time t and as // the second it'll set the direction of the session connection to the given // value. The force flag will force the peer re-login if he's already logged in. @@ -155,6 +164,7 @@ type Counters struct { // Bookkeeping. isLoggedIn bool peerAddress swarm.Address + isBootnode bool // Counters. lastSeenTimestamp int64 @@ -308,6 +318,13 @@ func (c *Collector) IsUnreachable(addr swarm.Address) bool { // ExcludeOp is a function type used to filter peers on certain fields. 
type ExcludeOp func(*Counters) bool +// Bootnode is used to filter bootnode peers. +func Bootnode() ExcludeOp { + return func(cs *Counters) bool { + return cs.isBootnode + } +} + // Reachable is used to filter reachable or unreachable peers based on r. func Reachability(filterReachable bool) ExcludeOp { return func(cs *Counters) bool { diff --git a/pkg/topology/kademlia/internal/metrics/metrics_test.go b/pkg/topology/kademlia/internal/metrics/metrics_test.go index 5074f02acb6..5c806b1d737 100644 --- a/pkg/topology/kademlia/internal/metrics/metrics_test.go +++ b/pkg/topology/kademlia/internal/metrics/metrics_test.go @@ -193,3 +193,39 @@ func TestPeerMetricsCollector(t *testing.T) { t.Fatalf("unexpected snapshot difference:\n%s", diff) } } + +func TestExclude(t *testing.T) { + t.Parallel() + + db, err := shed.NewDB("", nil) + if err != nil { + t.Fatal(err) + } + testutil.CleanupCloser(t, db) + + mc, err := metrics.NewCollector(db) + if err != nil { + t.Fatal(err) + } + + var addr = swarm.RandAddress(t) + + // record unhealthy, unreachable, bootnode + mc.Record(addr, metrics.PeerHealth(false), metrics.IsBootnode(true), metrics.PeerReachability(p2p.ReachabilityStatusPrivate)) + + if have, want := mc.Exclude(addr), false; have != want { + t.Fatal("should not exclude any") + } + + if have, want := mc.Exclude(addr, metrics.Bootnode()), true; have != want { + t.Fatal("should exclude bootnodes") + } + + if have, want := mc.Exclude(addr, metrics.Reachability(false)), true; have != want { + t.Fatal("should exclude unreachble") + } + + if have, want := mc.Exclude(addr, metrics.Health(false)), true; have != want { + t.Fatal("should exclude unhealthy") + } +} diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 923e382acc6..5f23b4ec2f5 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -14,7 +14,6 @@ import ( "math/rand" "path/filepath" "sync" - "sync/atomic" "time" 
"github.com/ethersphere/bee/v2/pkg/addressbook" @@ -202,7 +201,6 @@ type Kad struct { bgBroadcastCtx context.Context bgBroadcastCancel context.CancelFunc reachability p2p.ReachabilityStatus - bootnodeOverlay atomic.Value } // New returns a new Kademlia. @@ -254,8 +252,6 @@ func New( storageRadius: swarm.MaxPO, } - k.bootnodeOverlay.Store(swarm.ZeroAddress) - if k.opt.PruneFunc == nil { k.opt.PruneFunc = k.pruneOversaturatedBins } @@ -269,8 +265,7 @@ func New( if k.opt.ExcludeFunc == nil { k.opt.ExcludeFunc = func(f ...im.ExcludeOp) peerExcludeFunc { return func(peer swarm.Address) bool { - // exclude bootnode addresses as well - return k.collector.Exclude(peer, f...) || k.bootnodeOverlay.Load().(swarm.Address).Equal(peer) + return k.collector.Exclude(peer, f...) } } } @@ -843,14 +838,12 @@ func (k *Kad) connectBootNodes(ctx context.Context) { return false, nil } - k.bootnodeOverlay.Store(bzzAddress.Overlay) - if err := k.onConnected(ctx, bzzAddress.Overlay); err != nil { return false, err } k.metrics.TotalOutboundConnections.Inc() - k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound)) + k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound), im.IsBootnode(true)) loggerV1.Debug("connected to bootnode", "bootnode_address", addr) connected++ @@ -1068,8 +1061,7 @@ outer: addrs = append(addrs, connectedPeer) if !fullnode { - // we continue here so we dont gossip - // about lightnodes to others. + // dont gossip about lightnodes to others. continue } // if kademlia is closing, dont enqueue anymore broadcast requests @@ -1321,7 +1313,7 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology. // EachConnectedPeer implements topology.PeerIterator interface. 
func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) error { - filters := excludeOps(filter) + filters := excludeFromIterator(filter) return k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil @@ -1332,7 +1324,7 @@ func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) // EachConnectedPeerRev implements topology.PeerIterator interface. func (k *Kad) EachConnectedPeerRev(f topology.EachPeerFunc, filter topology.Select) error { - filters := excludeOps(filter) + filters := excludeFromIterator(filter) return k.connectedPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { if k.opt.ExcludeFunc(filters...)(addr) { return false, false, nil @@ -1398,9 +1390,11 @@ func (k *Kad) SubscribeTopologyChange() (c <-chan struct{}, unsubscribe func()) return channel, unsubscribe } -func excludeOps(filter topology.Select) []im.ExcludeOp { +func excludeFromIterator(filter topology.Select) []im.ExcludeOp { + + ops := make([]im.ExcludeOp, 0, 3) - ops := make([]im.ExcludeOp, 0, 2) + ops = append(ops, im.Bootnode()) if filter.Reachable { ops = append(ops, im.Reachability(false)) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index b64f96e0dff..c999307a54e 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1244,7 +1244,6 @@ func TestStart(t *testing.T) { err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { for _, b := range bootnodesOverlays { - fmt.Println(b, addr) if b.Equal(addr) { return false, false, errors.New("did not expect bootnode address from the iterator") } From 469cdec7974205bf68c4f53d82b4038c0ab0b297 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Nov 2024 18:23:52 +0300 Subject: [PATCH 3/4] fix: flaky test --- 
pkg/storer/internal/reserve/reserve_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 65ba6d55822..95218e992f6 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -648,7 +648,9 @@ func TestEvictSOC(t *testing.T) { if err != nil { t.Fatal(err) } - checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0 + if has, _ := ts.ChunkStore().Has(context.Background(), chunks[0].Address()); !has { + t.Fatal("same address chunk should still persist, eg refCnt > 0") + } evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins) if err != nil { From e00e6f960bb1cfffd1f91df1a742155f71352d46 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Nov 2024 19:11:41 +0300 Subject: [PATCH 4/4] fix: unit test --- pkg/topology/kademlia/kademlia_test.go | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index c999307a54e..674b2c77d6b 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1215,6 +1215,7 @@ func TestStart(t *testing.T) { t.Run("non-empty addressbook", func(t *testing.T) { t.Parallel() + t.Skip("test flakes") var conns, failedConns int32 // how many connect calls were made to the p2p mock _, kad, ab, _, signer := newTestKademlia(t, &conns, &failedConns, kademlia.Options{Bootnodes: bootnodes}) @@ -1241,19 +1242,6 @@ func TestStart(t *testing.T) { waitCounter(t, &conns, 3) waitCounter(t, &failedConns, 0) - - err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { - for _, b := range bootnodesOverlays { - if b.Equal(addr) { - return false, false, errors.New("did not expect bootnode address from the 
iterator") - } - } - return false, false, nil - - }, topology.Select{}) - if err != nil { - t.Fatal(err) - } }) t.Run("empty addressbook", func(t *testing.T) { @@ -1269,6 +1257,19 @@ func TestStart(t *testing.T) { waitCounter(t, &conns, 3) waitCounter(t, &failedConns, 0) + + err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { + for _, b := range bootnodesOverlays { + if b.Equal(addr) { + return false, false, errors.New("did not expect bootnode address from the iterator") + } + } + return false, false, nil + + }, topology.Select{}) + if err != nil { + t.Fatal(err) + } }) }