Skip to content

Commit

Permalink
Add a timeout to service discovery [#3440]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 9, 2023
1 parent 8113e22 commit bc612e1
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 10 deletions.
12 changes: 12 additions & 0 deletions pkg/api/v3/message/types_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3169,6 +3169,7 @@ func (v *FindServiceRequest) MarshalJSON() ([]byte, error) {
Network string `json:"network,omitempty"`
Service *api.ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
u.Type = v.Type()
if !(len(v.FindServiceOptions.Network) == 0) {
Expand All @@ -3183,6 +3184,10 @@ func (v *FindServiceRequest) MarshalJSON() ([]byte, error) {

u.Known = v.FindServiceOptions.Known
}
if !(v.FindServiceOptions.Timeout == 0) {

u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout)
}
return json.Marshal(&u)
}

Expand Down Expand Up @@ -3566,11 +3571,13 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error {
Network string `json:"network,omitempty"`
Service *api.ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
u.Type = v.Type()
u.Network = v.FindServiceOptions.Network
u.Service = v.FindServiceOptions.Service
u.Known = v.FindServiceOptions.Known
u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout)
if err := json.Unmarshal(data, &u); err != nil {
return err
}
Expand All @@ -3580,6 +3587,11 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error {
v.FindServiceOptions.Network = u.Network
v.FindServiceOptions.Service = u.Service
v.FindServiceOptions.Known = u.Known
if x, err := encoding.DurationFromJSON(u.Timeout); err != nil {
return fmt.Errorf("error decoding Timeout: %w", err)
} else {
v.FindServiceOptions.Timeout = x
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/api/v3/options.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ FindServiceOptions:
description: restricts the results to known peers
type: boolean
optional: true
- name: Timeout
description: is the time to wait before stopping, when querying the DHT
type: duration
optional: true

ConsensusStatusOptions:
fields:
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v3/p2p/dial/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type DiscoveryRequest struct {
Network string
Service *api.ServiceAddress
Limit int
Timeout time.Duration
}

type DiscoveryResponse interface {
Expand Down Expand Up @@ -120,6 +121,7 @@ func (d *dialer) newNetworkStream(ctx context.Context, service *api.ServiceAddre
Network: netName,
Service: service,
Limit: 10,
Timeout: 1 * time.Second,
})
if err != nil {
return nil, errors.UnknownError.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v3/p2p/dial_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (d *discoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (
}), nil
}

ch, err := (*Node)(d).peermgr.getPeers(ctx, addr, req.Limit)
ch, err := (*Node)(d).peermgr.getPeers(ctx, addr, req.Limit, req.Timeout)
return dial.DiscoveredPeers(ch), err
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/api/v3/p2p/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,29 @@ func newPeerManager(ctx context.Context, host host.Host, getServices func() []*s
}

// getPeers queries the DHT for peers that provide the given service.
func (m *peerManager) getPeers(ctx context.Context, ma multiaddr.Multiaddr, limit int) (<-chan peer.AddrInfo, error) {
return m.routing.FindPeers(ctx, ma.String(), discovery.Limit(limit))
func (m *peerManager) getPeers(ctx context.Context, ma multiaddr.Multiaddr, limit int, timeout time.Duration) (<-chan peer.AddrInfo, error) {
ch, err := m.routing.FindPeers(ctx, ma.String(), discovery.Limit(limit))
if err != nil || timeout == 0 {
return ch, err
}

ch2 := make(chan peer.AddrInfo)
stop := time.After(timeout)
go func() {
defer close(ch2)
for {
select {
case <-stop:
return
case v, ok := <-ch:
if !ok {
return
}
ch2 <- v
}
}
}()
return ch2, nil
}

// advertizeNewService advertizes new whoami info to everyone.
Expand Down Expand Up @@ -138,7 +159,7 @@ func (m *peerManager) waitFor(ctx context.Context, addr multiaddr.Multiaddr) err
wait := <-m.wait

// Look for a peer
ch, err := m.getPeers(ctx, addr, 1)
ch, err := m.getPeers(ctx, addr, 1, 0)
if err != nil {
return err
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/api/v3/p2p/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package p2p
import (
"context"
"runtime/debug"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -115,11 +116,11 @@ func (n *nodeService) FindService(ctx context.Context, opts api.FindServiceOptio
results = n.getKnownPeers(ctx, addr)

} else {
// Discover peers
// Discover peers using the DHT
var err error
results, err = n.discoverPeers(ctx, addr)
results, err = n.discoverPeers(ctx, addr, opts.Timeout)
if err != nil {
return nil, errors.UnknownError.Wrap(err)
return nil, err
}
}

Expand Down Expand Up @@ -148,8 +149,12 @@ func (n *nodeService) getKnownPeers(ctx context.Context, addr multiaddr.Multiadd
return results
}

func (n *nodeService) discoverPeers(ctx context.Context, addr multiaddr.Multiaddr) ([]*api.FindServiceResult, error) {
ch, err := n.peermgr.getPeers(ctx, addr, 100)
func (n *nodeService) discoverPeers(ctx context.Context, addr multiaddr.Multiaddr, timeout time.Duration) ([]*api.FindServiceResult, error) {
if timeout == 0 {
timeout = 2 * time.Second
}

ch, err := n.peermgr.getPeers(ctx, addr, 100, timeout)
if err != nil {
return nil, err
}
Expand Down
62 changes: 61 additions & 1 deletion pkg/api/v3/types_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ type FindServiceOptions struct {
Network string `json:"network,omitempty" form:"network" query:"network" validate:"required"`
Service *ServiceAddress `json:"service,omitempty" form:"service" query:"service" validate:"required"`
// Known restricts the results to known peers.
Known bool `json:"known,omitempty" form:"known" query:"known"`
Known bool `json:"known,omitempty" form:"known" query:"known"`
// Timeout is the time to wait before stopping, when querying the DHT.
Timeout time.Duration `json:"timeout,omitempty" form:"timeout" query:"timeout"`
extraData []byte
}

Expand Down Expand Up @@ -851,6 +853,7 @@ func (v *FindServiceOptions) Copy() *FindServiceOptions {
u.Service = (v.Service).Copy()
}
u.Known = v.Known
u.Timeout = v.Timeout
if len(v.extraData) > 0 {
u.extraData = make([]byte, len(v.extraData))
copy(u.extraData, v.extraData)
Expand Down Expand Up @@ -1837,6 +1840,9 @@ func (v *FindServiceOptions) Equal(u *FindServiceOptions) bool {
if !(v.Known == u.Known) {
return false
}
if !(v.Timeout == u.Timeout) {
return false
}

return true
}
Expand Down Expand Up @@ -3468,6 +3474,7 @@ var fieldNames_FindServiceOptions = []string{
1: "Network",
2: "Service",
3: "Known",
4: "Timeout",
}

func (v *FindServiceOptions) MarshalBinary() ([]byte, error) {
Expand All @@ -3487,6 +3494,9 @@ func (v *FindServiceOptions) MarshalBinary() ([]byte, error) {
if !(!v.Known) {
writer.WriteBool(3, v.Known)
}
if !(v.Timeout == 0) {
writer.WriteDuration(4, v.Timeout)
}

_, _, err := writer.Reset(fieldNames_FindServiceOptions)
if err != nil {
Expand Down Expand Up @@ -5922,6 +5932,9 @@ func (v *FindServiceOptions) UnmarshalBinaryFrom(rd io.Reader) error {
if x, ok := reader.ReadBool(3); ok {
v.Known = x
}
if x, ok := reader.ReadDuration(4); ok {
v.Timeout = x
}

seen, err := reader.Reset(fieldNames_FindServiceOptions)
if err != nil {
Expand Down Expand Up @@ -7276,6 +7289,28 @@ func (v *ErrorRecord) MarshalJSON() ([]byte, error) {
return json.Marshal(&u)
}

func (v *FindServiceOptions) MarshalJSON() ([]byte, error) {
u := struct {
Network string `json:"network,omitempty"`
Service *ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
if !(len(v.Network) == 0) {
u.Network = v.Network
}
if !(v.Service == nil) {
u.Service = v.Service
}
if !(!v.Known) {
u.Known = v.Known
}
if !(v.Timeout == 0) {
u.Timeout = encoding.DurationToJSON(v.Timeout)
}
return json.Marshal(&u)
}

func (v *FindServiceResult) MarshalJSON() ([]byte, error) {
u := struct {
PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"`
Expand Down Expand Up @@ -8058,6 +8093,31 @@ func (v *ErrorRecord) UnmarshalJSON(data []byte) error {
return nil
}

func (v *FindServiceOptions) UnmarshalJSON(data []byte) error {
u := struct {
Network string `json:"network,omitempty"`
Service *ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
u.Network = v.Network
u.Service = v.Service
u.Known = v.Known
u.Timeout = encoding.DurationToJSON(v.Timeout)
if err := json.Unmarshal(data, &u); err != nil {
return err
}
v.Network = u.Network
v.Service = u.Service
v.Known = u.Known
if x, err := encoding.DurationFromJSON(u.Timeout); err != nil {
return fmt.Errorf("error decoding Timeout: %w", err)
} else {
v.Timeout = x
}
return nil
}

func (v *FindServiceResult) UnmarshalJSON(data []byte) error {
u := struct {
PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"`
Expand Down

0 comments on commit bc612e1

Please sign in to comment.