diff --git a/CHANGELOG.md b/CHANGELOG.md index 150d73d10..c1c02b94d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The following emojis are used to highlight certain changes: - Eliminate `..` elements that begin a rooted path: that is, replace "`/..`" by "`/`" at the beginning of a path. * 🛠 The signature of `CoreAPI.ResolvePath` in `coreiface` has changed to now return the remainder segments as a second return value, matching the signature of `resolver.ResolveToLastNode`. +* 🛠 `routing/http/client.FindPeers` now returns `iter.ResultIter[types.PeerRecord]` instead of `iter.ResultIter[types.Record]`. The specification indicates that records for this method will always be Peer Records. ### Removed diff --git a/examples/routing/delegated-routing-client/main.go b/examples/routing/delegated-routing-client/main.go index 0b586d4bf..8fac342ac 100644 --- a/examples/routing/delegated-routing-client/main.go +++ b/examples/routing/delegated-routing-client/main.go @@ -69,22 +69,6 @@ func findProviders(w io.Writer, ctx context.Context, client *client.Client, cidS return printIter(w, recordsIter) } -func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error { - // Parses the given Peer ID to lookup the information for. - pid, err := peer.Decode(pidStr) - if err != nil { - return err - } - - // Ask for information about the peer with the given peer ID. - recordsIter, err := client.FindPeers(ctx, pid) - if err != nil { - return err - } - defer recordsIter.Close() - return printIter(w, recordsIter) -} - func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error { // The response is streamed. Alternatively, you could use [iter.ReadAll] // to fetch all the results all at once, instead of iterating as they are @@ -118,6 +102,44 @@ func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error { return nil } +func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error { + // Parses the given Peer ID to lookup the information for. + pid, err := peer.Decode(pidStr) + if err != nil { + return err + } + + // Ask for information about the peer with the given peer ID. + recordsIter, err := client.FindPeers(ctx, pid) + if err != nil { + return err + } + defer recordsIter.Close() + + // The response is streamed. Alternatively, you could use [iter.ReadAll] + // to fetch all the results all at once, instead of iterating as they are + // streamed. + for recordsIter.Next() { + res := recordsIter.Val() + + // Check for error, but do not complain if we exceeded the timeout. We are + // expecting that to happen: we explicitly defined a timeout. + if res.Err != nil { + if !errors.Is(res.Err, context.DeadlineExceeded) { + return res.Err + } + + return nil + } + + fmt.Fprintln(w, res.Val.ID) + fmt.Fprintln(w, "\tProtocols:", res.Val.Protocols) + fmt.Fprintln(w, "\tAddresses:", res.Val.Addrs) + } + + return nil +} + func findIPNS(w io.Writer, ctx context.Context, client *client.Client, nameStr string) error { // Parses the given name string to get a record for. name, err := ipns.NameFromString(nameStr) diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 39dd698cb..cda9097fd 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -160,6 +160,8 @@ func (c *measuringIter[T]) Close() error { return c.Iter.Close() } +// FindProviders searches for providers that are able to provide the given [cid.Cid]. +// In a more generic way, it is also used as a mapping between CIDs and relevant metadata. func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) { // TODO test measurements m := newMeasurement("FindProviders") @@ -332,7 +334,8 @@ func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri return 0, nil } -func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) { +// FindPeers searches for information for the given [peer.ID]. +func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) { m := newMeasurement("FindPeers") url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String() @@ -359,7 +362,7 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI if resp.StatusCode == http.StatusNotFound { resp.Body.Close() m.record(ctx) - return iter.FromSlice[iter.Result[types.Record]](nil), nil + return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil } if resp.StatusCode != http.StatusOK { @@ -387,24 +390,27 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI } }() - var it iter.ResultIter[types.Record] + var it iter.ResultIter[*types.PeerRecord] switch mediaType { case mediaTypeJSON: parsedResp := &jsontypes.PeersResponse{} err = json.NewDecoder(resp.Body).Decode(parsedResp) - var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers) + var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers) it = iter.ToResultIter(sliceIt) case mediaTypeNDJSON: skipBodyClose = true - it = ndjson.NewRecordsIter(resp.Body) + it = ndjson.NewPeerRecordsIter(resp.Body) default: logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) return nil, errors.New("unknown content type") } - return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil + return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil } +// GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is +// validated against the given name. If validation fails, an error is returned, but no +// record. func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { url := c.baseURL + "/routing/v1/ipns/" + name.String() @@ -443,6 +449,7 @@ func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, err return record, nil } +// PutIPNS attempts at putting the given [ipns.Record] for the given [ipns.Name]. func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { url := c.baseURL + "/routing/v1/ipns/" + name.String() diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index b6aa8456b..7edd77c10 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -42,9 +42,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *server.Bits return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) { +func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid, limit) - return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { @@ -486,7 +486,7 @@ func TestClient_Provide(t *testing.T) { func TestClient_FindPeers(t *testing.T) { peerRecord := makePeerRecord() - peerRecords := []iter.Result[types.Record]{ + peerRecords := []iter.Result[*types.PeerRecord]{ {Val: &peerRecord}, } pid := *peerRecord.ID @@ -495,13 +495,13 @@ func TestClient_FindPeers(t *testing.T) { name string httpStatusCode int stopServer bool - routerResult []iter.Result[types.Record] + routerResult []iter.Result[*types.PeerRecord] routerErr error clientRequiresStreaming bool serverStreamingDisabled bool expErrContains osErrContains - expResult []iter.Result[types.Record] + expResult []iter.Result[*types.PeerRecord] expStreamingResponse bool expJSONResponse bool }{ @@ -606,7 +606,7 @@ func TestClient_FindPeers(t *testing.T) { resultIter, err := client.FindPeers(ctx, pid) c.expErrContains.errContains(t, err) - results := iter.ReadAll[iter.Result[types.Record]](resultIter) + results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter) assert.Equal(t, c.expResult, results) }) } diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index 2438d4fea..9115ef154 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -26,7 +26,7 @@ const ttl = 24 * time.Hour type Client interface { FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) - FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) + FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error } @@ -196,28 +196,15 @@ func (c *contentRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInf logger.Warnw("error iterating provider responses: %s", res.Err) continue } - v := res.Val - if v.GetSchema() == types.SchemaPeer { - result, ok := v.(*types.PeerRecord) - if !ok { - logger.Errorw( - "problem casting find providers result", - "Schema", v.GetSchema(), - "Type", reflect.TypeOf(v).String(), - ) - continue - } - - var addrs []multiaddr.Multiaddr - for _, a := range result.Addrs { - addrs = append(addrs, a.Multiaddr) - } - - return peer.AddrInfo{ - ID: *result.ID, - Addrs: addrs, - }, nil + var addrs []multiaddr.Multiaddr + for _, a := range res.Val.Addrs { + addrs = append(addrs, a.Multiaddr) } + + return peer.AddrInfo{ + ID: *res.Val.ID, + Addrs: addrs, + }, nil } return peer.AddrInfo{}, err diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 2147dc975..1c47850b9 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -32,9 +32,9 @@ func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.Resul return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } -func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[types.Record], error) { +func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid) - return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } func (m *mockClient) Ready(ctx context.Context) (bool, error) { @@ -183,17 +183,14 @@ func TestFindPeer(t *testing.T) { crc := NewContentRoutingClient(client) p1 := peer.ID("peer1") - ais := []types.Record{ - &types.UnknownRecord{ - Schema: "unknown", - }, - &types.PeerRecord{ + ais := []*types.PeerRecord{ + { Schema: types.SchemaPeer, ID: &p1, Protocols: []string{"transport-bitswap"}, }, } - aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais)) + aisIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice(ais)) client.On("FindPeers", ctx, p1).Return(aisIter, nil) diff --git a/routing/http/server/server.go b/routing/http/server/server.go index 9e7d81a04..d9be47eb2 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -63,7 +63,7 @@ type ContentRouter interface { // FindPeers searches for peers who have the provided [peer.ID]. // Limit indicates the maximum amount of results to return; 0 means unbounded. - FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) + FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) // GetIPNS searches for an [ipns.Record] for the given [ipns.Name]. GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) @@ -267,7 +267,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { } var ( - handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) + handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord]) recordsLimit int ) @@ -347,7 +347,7 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { writeJSONResult(w, "Provide", resp) } -func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) { +func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { defer peersIter.Close() peers, err := iter.ReadAllResults(peersIter) @@ -361,7 +361,7 @@ func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[ }) } -func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) { +func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { writeResultsIterNDJSON(w, peersIter) } @@ -491,7 +491,7 @@ func logErr(method, msg string, err error) { logger.Infow(msg, "Method", method, "Error", err) } -func writeResultsIterNDJSON(w http.ResponseWriter, resultIter iter.ResultIter[types.Record]) { +func writeResultsIterNDJSON[T any](w http.ResponseWriter, resultIter iter.ResultIter[T]) { defer resultIter.Close() w.Header().Set("Content-Type", mediaTypeNDJSON) diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index c2c752057..f823ac25a 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -168,7 +168,7 @@ func TestPeers(t *testing.T) { t.Parallel() _, pid := makePeerID(t) - results := iter.FromSlice([]iter.Result[types.Record]{ + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ {Val: &types.PeerRecord{ Schema: types.SchemaPeer, ID: &pid, @@ -203,7 +203,7 @@ func TestPeers(t *testing.T) { t.Parallel() _, pid := makePeerID(t) - results := iter.FromSlice([]iter.Result[types.Record]{ + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ {Val: &types.PeerRecord{ Schema: types.SchemaPeer, ID: &pid, @@ -374,9 +374,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWrit return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) { +func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid, limit) - return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { diff --git a/routing/http/types/json/responses.go b/routing/http/types/json/responses.go index dfcfad830..cc687df48 100644 --- a/routing/http/types/json/responses.go +++ b/routing/http/types/json/responses.go @@ -13,7 +13,7 @@ type ProvidersResponse struct { // PeersResponse is the result of a GET Peers request. type PeersResponse struct { - Peers RecordsArray + Peers []*types.PeerRecord } // RecordsArray is an array of [types.Record] diff --git a/routing/http/types/ndjson/records.go b/routing/http/types/ndjson/records.go index d1a36b411..819cd521d 100644 --- a/routing/http/types/ndjson/records.go +++ b/routing/http/types/ndjson/records.go @@ -44,3 +44,30 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) } + +// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given [io.Reader]. +// Records with a different schema are safely ignored. If you want to read all records, use +// [NewRecordsIter] instead. +func NewPeerRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.PeerRecord]] { + jsonIter := iter.FromReaderJSON[types.UnknownRecord](r) + mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[*types.PeerRecord] { + var result iter.Result[*types.PeerRecord] + if upr.Err != nil { + result.Err = upr.Err + return result + } + switch upr.Val.Schema { + case types.SchemaPeer: + var prov types.PeerRecord + err := json.Unmarshal(upr.Val.Bytes, &prov) + if err != nil { + result.Err = err + return result + } + result.Val = &prov + } + return result + } + + return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) +}