Skip to content

Commit

Permalink
feat(session): do not record erroneous session want sends (#452)
Browse files Browse the repository at this point in the history
* feat(session): do not record erroneous session want sends

Co-authored-by: gammazero <[email protected]>
  • Loading branch information
hannahhoward and gammazero authored Dec 4, 2024
1 parent 5965637 commit 86120e2
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- Do not erroneously update the state of sent wants when a send a peer disconnected and the send did not happen. [#452](https://github.com/ipfs/boxo/pull/452)

### Security

## [v0.24.3]
Expand Down
8 changes: 5 additions & 3 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C

// SendWants sends the given want-blocks and want-haves to the given peer.
// It filters out wants that have previously been sent to the peer.
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

if _, ok := pm.peerQueues[p]; ok {
pm.pwm.sendWants(p, wantBlocks, wantHaves)
if _, ok := pm.peerQueues[p]; !ok {
return false
}
pm.pwm.sendWants(p, wantBlocks, wantHaves)
return true
}

// SendCancels sends cancels for the given keys to all peers who had previously
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type PeerManager interface {
// interested in a peer's connection state
UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
Expand Down
8 changes: 5 additions & 3 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ func newFakePeerManager() *fakePeerManager {
}
}

func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool {
return true
}
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
select {
case pm.wantReqs <- wantReq{cids}:
Expand Down
21 changes: 16 additions & 5 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) {
type wantSets struct {
wantBlocks *cid.Set
wantHaves *cid.Set
sent bool
}

type allWants map[peer.ID]*wantSets
Expand Down Expand Up @@ -551,9 +552,6 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {
continue
}

// Record that we are sending a want-block for this want to the peer
sws.setWantSentTo(c, wi.bestPeer)

// Send a want-block to the chosen peer
toSend.forPeer(wi.bestPeer).wantBlocks.Add(c)

Expand All @@ -567,6 +565,16 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {

// Send any wants we've collected
sws.sendWants(toSend)

for c, wi := range sws.wants {
if wi.bestPeer != "" && wi.sentTo == "" {
// check if a want block was successfully sent to the best peer
if toSend.forPeer(wi.bestPeer).sent {
// Record that we are sending a want-block for this want to the peer
sws.setWantSentTo(c, wi.bestPeer)
}
}
}
}

// sendWants sends want-have and want-blocks to the appropriate peers
Expand All @@ -584,8 +592,11 @@ func (sws *sessionWantSender) sendWants(sends allWants) {
// precedence over want-haves.
wblks := snd.wantBlocks.Keys()
whaves := snd.wantHaves.Keys()
sws.pm.SendWants(sws.ctx, p, wblks, whaves)

snd.sent = sws.pm.SendWants(sws.ctx, p, wblks, whaves)
if !snd.sent {
// Do not update state if the wants not sent.
continue
}
// Inform the session that we've sent the wants
sws.onSend(p, wblks, whaves)

Expand Down
3 changes: 2 additions & 1 deletion bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
defer pm.lk.Unlock()

Expand All @@ -92,6 +92,7 @@ func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks
pm.peerSends[p] = sw
}
sw.add(wantBlocks, wantHaves)
return true
}

func (pm *mockPeerManager) waitNextWants() map[peer.ID]*sentWants {
Expand Down
8 changes: 4 additions & 4 deletions bitswap/client/internal/sessionmanager/sessionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type fakePeerManager struct {
cancels []cid.Cid
}

func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
Expand Down

0 comments on commit 86120e2

Please sign in to comment.