Skip to content

Commit

Permalink
Add a timeout to service discovery [#3440]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 4, 2023
1 parent 93e74f4 commit 034dd38
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 21 deletions.
4 changes: 4 additions & 0 deletions pkg/api/v3/options.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ FindServiceOptions:
description: restricts the results to known peers
type: boolean
optional: true
- name: Timeout
description: is the time to wait before stopping, when querying the DHT
type: duration
optional: true

ConsensusStatusOptions:
fields:
Expand Down
77 changes: 57 additions & 20 deletions pkg/api/v3/p2p/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package p2p
import (
"context"
"runtime/debug"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -109,35 +110,71 @@ func (n *nodeService) FindService(ctx context.Context, opts api.FindServiceOptio
return nil, errors.BadRequest.With("no network or service specified")
}

var results []*api.FindServiceResult
if opts.Known {
// Find known peers
var results []*api.FindServiceResult
for _, peer := range n.tracker.allGood(ctx, addr) {
results = append(results, &api.FindServiceResult{
PeerID: peer,
Status: api.PeerStatusIsKnownGood,
})
}
for _, peer := range n.tracker.allBad(ctx, addr) {
results = append(results, &api.FindServiceResult{
PeerID: peer,
Status: api.PeerStatusIsKnownBad,
})
results = n.getKnownPeers(ctx, addr)

} else {
// Discover peers using the DHT
var err error
results, err = n.discoverPeers(ctx, addr, opts.Timeout)
if err != nil {
return nil, err
}
return results, nil
}

ch, err := n.peermgr.getPeers(ctx, addr, 100)
if err != nil {
return nil, err
// Return an empty array, not nil, because JSON-RPC handles that better
if results == nil {
return []*api.FindServiceResult{}, nil
}
return results, nil
}

func (n *nodeService) getKnownPeers(ctx context.Context, addr multiaddr.Multiaddr) []*api.FindServiceResult {
// Find known peers
var results []*api.FindServiceResult
for peer := range ch {
for _, peer := range n.tracker.allGood(ctx, addr) {
results = append(results, &api.FindServiceResult{
PeerID: peer.ID,
Status: n.tracker.status(ctx, peer.ID, addr),
PeerID: peer,
Status: api.PeerStatusIsKnownGood,
})
}
return results, nil
for _, peer := range n.tracker.allBad(ctx, addr) {
results = append(results, &api.FindServiceResult{
PeerID: peer,
Status: api.PeerStatusIsKnownBad,
})
}
return results
}

func (n *nodeService) discoverPeers(ctx context.Context, addr multiaddr.Multiaddr, timeout time.Duration) ([]*api.FindServiceResult, error) {
if timeout == 0 {
timeout = 2 * time.Second
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

ch, err := n.peermgr.getPeers(ctx, addr, 100)
if err != nil {
return nil, err
}

results := []*api.FindServiceResult{}
for {
select {
case <-ctx.Done():
return results, nil
case peer, ok := <-ch:
if !ok {
return results, nil
}
results = append(results, &api.FindServiceResult{
PeerID: peer.ID,
Status: n.tracker.status(ctx, peer.ID, addr),
})
}
}
}
62 changes: 61 additions & 1 deletion pkg/api/v3/types_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ type FindServiceOptions struct {
Network string `json:"network,omitempty" form:"network" query:"network" validate:"required"`
Service *ServiceAddress `json:"service,omitempty" form:"service" query:"service" validate:"required"`
// Known restricts the results to known peers.
Known bool `json:"known,omitempty" form:"known" query:"known"`
Known bool `json:"known,omitempty" form:"known" query:"known"`
// Timeout is the time to wait before stopping, when querying the DHT.
Timeout time.Duration `json:"timeout,omitempty" form:"timeout" query:"timeout"`
extraData []byte
}

Expand Down Expand Up @@ -851,6 +853,7 @@ func (v *FindServiceOptions) Copy() *FindServiceOptions {
u.Service = (v.Service).Copy()
}
u.Known = v.Known
u.Timeout = v.Timeout
if len(v.extraData) > 0 {
u.extraData = make([]byte, len(v.extraData))
copy(u.extraData, v.extraData)
Expand Down Expand Up @@ -1837,6 +1840,9 @@ func (v *FindServiceOptions) Equal(u *FindServiceOptions) bool {
if !(v.Known == u.Known) {
return false
}
if !(v.Timeout == u.Timeout) {
return false
}

return true
}
Expand Down Expand Up @@ -3468,6 +3474,7 @@ var fieldNames_FindServiceOptions = []string{
1: "Network",
2: "Service",
3: "Known",
4: "Timeout",
}

func (v *FindServiceOptions) MarshalBinary() ([]byte, error) {
Expand All @@ -3487,6 +3494,9 @@ func (v *FindServiceOptions) MarshalBinary() ([]byte, error) {
if !(!v.Known) {
writer.WriteBool(3, v.Known)
}
if !(v.Timeout == 0) {
writer.WriteDuration(4, v.Timeout)
}

_, _, err := writer.Reset(fieldNames_FindServiceOptions)
if err != nil {
Expand Down Expand Up @@ -5922,6 +5932,9 @@ func (v *FindServiceOptions) UnmarshalBinaryFrom(rd io.Reader) error {
if x, ok := reader.ReadBool(3); ok {
v.Known = x
}
if x, ok := reader.ReadDuration(4); ok {
v.Timeout = x
}

seen, err := reader.Reset(fieldNames_FindServiceOptions)
if err != nil {
Expand Down Expand Up @@ -7276,6 +7289,28 @@ func (v *ErrorRecord) MarshalJSON() ([]byte, error) {
return json.Marshal(&u)
}

func (v *FindServiceOptions) MarshalJSON() ([]byte, error) {
u := struct {
Network string `json:"network,omitempty"`
Service *ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
if !(len(v.Network) == 0) {
u.Network = v.Network
}
if !(v.Service == nil) {
u.Service = v.Service
}
if !(!v.Known) {
u.Known = v.Known
}
if !(v.Timeout == 0) {
u.Timeout = encoding.DurationToJSON(v.Timeout)
}
return json.Marshal(&u)
}

func (v *FindServiceResult) MarshalJSON() ([]byte, error) {
u := struct {
PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"`
Expand Down Expand Up @@ -8058,6 +8093,31 @@ func (v *ErrorRecord) UnmarshalJSON(data []byte) error {
return nil
}

func (v *FindServiceOptions) UnmarshalJSON(data []byte) error {
u := struct {
Network string `json:"network,omitempty"`
Service *ServiceAddress `json:"service,omitempty"`
Known bool `json:"known,omitempty"`
Timeout interface{} `json:"timeout,omitempty"`
}{}
u.Network = v.Network
u.Service = v.Service
u.Known = v.Known
u.Timeout = encoding.DurationToJSON(v.Timeout)
if err := json.Unmarshal(data, &u); err != nil {
return err
}
v.Network = u.Network
v.Service = u.Service
v.Known = u.Known
if x, err := encoding.DurationFromJSON(u.Timeout); err != nil {
return fmt.Errorf("error decoding Timeout: %w", err)
} else {
v.Timeout = x
}
return nil
}

func (v *FindServiceResult) UnmarshalJSON(data []byte) error {
u := struct {
PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"`
Expand Down

0 comments on commit 034dd38

Please sign in to comment.