Skip to content

Commit

Permalink
backport changes
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 5, 2023
1 parent 8097daf commit 4ac9597
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
24 changes: 21 additions & 3 deletions pkg/api/v3/p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,27 @@ func (n *Node) DialNetwork() message.Dialer {
type discoverer Node

func (d *discoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (<-chan peer.AddrInfo, error) {
addr, err := req.Service.MultiaddrFor(req.Network)
if err != nil {
return nil, err
var addr multiaddr.Multiaddr
if req.Network != "" {
c, err := multiaddr.NewComponent(api.N_ACC, req.Network)
if err != nil {
return nil, errors.BadRequest.WithFormat("create network multiaddr: %w", err)
}
addr = c
}
if req.Service != nil {
if req.Service.Type == api.ServiceTypeUnknown {
return nil, errors.BadRequest.With("missing service type")
}
c := req.Service.Multiaddr()
if addr == nil {
addr = c
} else {
addr = addr.Encapsulate(c)
}
}
if addr == nil {
return nil, errors.BadRequest.With("no network or service specified")
}

_, ok := (*Node)(d).getOwnService(req.Network, req.Service)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v3/p2p/dial/.mockery.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: ^(Connector|Discoverer)$
name: ^(Connector|Discoverer|MessageStreamHandler)$
inpackage: true
testonly: true
with-expecter: true
Expand Down
27 changes: 21 additions & 6 deletions pkg/api/v3/p2p/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,28 @@ func newPeerManager(ctx context.Context, host host.Host, getServices func() []*s

// getPeers queries the DHT for peers that provide the given service.
func (m *peerManager) getPeers(ctx context.Context, ma multiaddr.Multiaddr, limit int, timeout time.Duration) (<-chan peer.AddrInfo, error) {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
ch, err := m.routing.FindPeers(ctx, ma.String(), discovery.Limit(limit))
if err != nil || timeout == 0 {
return ch, err
}

return m.routing.FindPeers(ctx, ma.String(), discovery.Limit(limit))
ch2 := make(chan peer.AddrInfo)
stop := time.After(timeout)
go func() {
defer close(ch2)
for {
select {
case <-stop:
return
case v, ok := <-ch:
if !ok {
return
}
ch2 <- v
}
}
}()
return ch2, nil
}

// advertizeNewService advertizes new whoami info to everyone.
Expand Down Expand Up @@ -144,7 +159,7 @@ func (m *peerManager) waitFor(ctx context.Context, addr multiaddr.Multiaddr) err
wait := <-m.wait

// Look for a peer
ch, err := m.routing.FindPeers(ctx, addr.String(), discovery.Limit(1))
ch, err := m.getPeers(ctx, addr, 1, 0)
if err != nil {
return err
}
Expand Down

0 comments on commit 4ac9597

Please sign in to comment.