Skip to content

Commit

Permalink
routing/http: implement provide and provide peer
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Jan 26, 2024
1 parent 0552c9c commit 3afb7a9
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 258 deletions.
212 changes: 139 additions & 73 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multibase"
)

var (
Expand All @@ -51,22 +52,20 @@ type Client struct {
clock clock.Clock
accepts string

peerID peer.ID
addrs []types.Multiaddr
identity crypto.PrivKey
identity crypto.PrivKey
peerID peer.ID
addrs []types.Multiaddr
protocols []string

// Called immediately after signing a provide request. It is used
// Called immediately after signing a provide (peer) request. It is used
// for testing, e.g., testing the server with a mangled signature.
//lint:ignore SA1019 // ignore staticcheck
afterSignCallback func(req *types.WriteBitswapRecord)
afterSignCallback func(req *types.AnnouncementRecord)
}

// defaultUserAgent is used as a fallback to inform HTTP server which library
// version sent a request
var defaultUserAgent = moduleVersion()

var _ contentrouter.Client = &Client{}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
Expand Down Expand Up @@ -102,9 +101,10 @@ func WithUserAgent(ua string) Option {
}
}

func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) Option {
func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr, protocols []string) Option {
return func(c *Client) {
c.peerID = peerID
c.protocols = protocols
for _, a := range addrs {
c.addrs = append(c.addrs, types.Multiaddr{Multiaddr: a})
}
Expand Down Expand Up @@ -236,102 +236,121 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
//
// [IPIP-378]: https://github.com/ipfs/specs/pull/378
func (c *Client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
if c.identity == nil {
return 0, errors.New("cannot provide Bitswap records without an identity")
}
if c.peerID.Size() == 0 {
return 0, errors.New("cannot provide Bitswap records without a peer ID")
}

ks := make([]types.CID, len(keys))
for i, c := range keys {
ks[i] = types.CID{Cid: c}
func (c *Client) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}

now := c.clock.Now()
records := make([]types.Record, len(announcements))

for i, announcement := range announcements {
record := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
Payload: types.AnnouncementPayload{
CID: announcement.CID,
Scope: announcement.Scope,
Timestamp: now,
TTL: announcement.TTL,
ID: &c.peerID,
Addrs: c.addrs,
Protocols: c.protocols,
},
}

req := types.WriteBitswapRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
Payload: types.BitswapPayload{
Keys: ks,
AdvisoryTTL: &types.Duration{Duration: ttl},
Timestamp: &types.Time{Time: now},
ID: &c.peerID,
Addrs: c.addrs,
},
}
err := req.Sign(c.peerID, c.identity)
if err != nil {
return 0, err
}
if len(announcement.Metadata) != 0 {
var err error
record.Payload.Metadata, err = multibase.Encode(multibase.Base64, announcement.Metadata)
if err != nil {
return nil, fmt.Errorf("multibase-encoding metadata: %w", err)
}

Check warning on line 266 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L262-L266

Added lines #L262 - L266 were not covered by tests
}

if c.afterSignCallback != nil {
c.afterSignCallback(&req)
err := record.Sign(c.peerID, c.identity)
if err != nil {
return nil, err
}

Check warning on line 272 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L271-L272

Added lines #L271 - L272 were not covered by tests

if c.afterSignCallback != nil {
c.afterSignCallback(record)
}

records[i] = record
}

advisoryTTL, err := c.provideSignedBitswapRecord(ctx, &req)
if err != nil {
return 0, err
// TODO: trailing slash?
url := c.baseURL + "/routing/v1/providers"
req := jsontypes.AnnounceProvidersRequest{
Providers: records,
}

return advisoryTTL, err
return c.provide(ctx, url, req)
}

// ProvideAsync makes a provide request to a delegated router
//
//lint:ignore SA1019 // ignore staticcheck
func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapRecord) (time.Duration, error) {
//lint:ignore SA1019 // ignore staticcheck
req := jsontypes.WriteProvidersRequest{Providers: []types.Record{bswp}}

url := c.baseURL + "/routing/v1/providers/"

func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter.ResultIter[*types.AnnouncementRecord], error) {
b, err := drjson.MarshalJSONBytes(req)
if err != nil {
return 0, err
return nil, err

Check warning on line 293 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L293

Added line #L293 was not covered by tests
}

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(b))
if err != nil {
return 0, err
return nil, err

Check warning on line 298 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L298

Added line #L298 was not covered by tests
}

resp, err := c.httpClient.Do(httpReq)
if err != nil {
return 0, fmt.Errorf("making HTTP req to provide a signed record: %w", err)
return nil, fmt.Errorf("making HTTP req to provide a signed peer record: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return 0, httpError(resp.StatusCode, resp.Body)
resp.Body.Close()
return nil, httpError(resp.StatusCode, resp.Body)
}

//lint:ignore SA1019 // ignore staticcheck
var provideResult jsontypes.WriteProvidersResponse
err = json.NewDecoder(resp.Body).Decode(&provideResult)
respContentType := resp.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(respContentType)
if err != nil {
return 0, err
}
if len(provideResult.ProvideResults) != 1 {
return 0, fmt.Errorf("expected 1 result but got %d", len(provideResult.ProvideResults))
resp.Body.Close()
return nil, fmt.Errorf("parsing Content-Type: %w", err)

Check warning on line 315 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L314-L315

Added lines #L314 - L315 were not covered by tests
}

//lint:ignore SA1019 // ignore staticcheck
v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapRecordResponse)
if !ok {
return 0, errors.New("expected AdvisoryTTL field")
}
var skipBodyClose bool
defer func() {
if !skipBodyClose {
resp.Body.Close()
}
}()

if v.AdvisoryTTL != nil {
return v.AdvisoryTTL.Duration, nil
var it iter.ResultIter[*types.AnnouncementRecord]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.AnnouncePeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
if err != nil {
return nil, err
}

Check warning on line 332 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L331-L332

Added lines #L331 - L332 were not covered by tests
var sliceIt iter.Iter[*types.AnnouncementRecord] = iter.FromSlice(parsedResp.ProvideResults)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewAnnouncementRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")

Check warning on line 340 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L335-L340

Added lines #L335 - L340 were not covered by tests
}

return 0, nil
return it, nil
}

func (c *Client) canProvide() error {
if c.identity == nil {
return errors.New("cannot provide without identity")
}
if c.peerID.Size() == 0 {
return errors.New("cannot provide without peer ID")
}
return nil
}

// FindPeers searches for information for the given [peer.ID].
Expand Down Expand Up @@ -395,6 +414,9 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
case mediaTypeJSON:
parsedResp := &jsontypes.PeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
if err != nil {
return nil, err
}

Check warning on line 419 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L418-L419

Added lines #L418 - L419 were not covered by tests
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
Expand All @@ -408,6 +430,50 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}

// ProvidePeer provides information regarding your own peer, setup with [WithProviderInfo].
func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []byte) (iter.ResultIter[*types.AnnouncementRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}

Check warning on line 437 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L434-L437

Added lines #L434 - L437 were not covered by tests

record := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
Payload: types.AnnouncementPayload{
// TODO: CID, Scope not present for /routing/v1/peers, right?
Timestamp: time.Now(),
TTL: ttl,
ID: &c.peerID,
Addrs: c.addrs,
Protocols: c.protocols,
},
}

if len(metadata) != 0 {
var err error
record.Payload.Metadata, err = multibase.Encode(multibase.Base64, metadata)
if err != nil {
return nil, fmt.Errorf("multibase-encoding metadata: %w", err)
}

Check warning on line 456 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L439-L456

Added lines #L439 - L456 were not covered by tests
}

err := record.Sign(c.peerID, c.identity)
if err != nil {
return nil, err
}

Check warning on line 462 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L459-L462

Added lines #L459 - L462 were not covered by tests

if c.afterSignCallback != nil {
c.afterSignCallback(record)
}

Check warning on line 466 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L464-L466

Added lines #L464 - L466 were not covered by tests

// TODO: trailing slash?
url := c.baseURL + "/routing/v1/peers"
req := jsontypes.AnnouncePeersRequest{
Providers: []types.Record{record},
}

return c.provide(ctx, url, req)

Check warning on line 474 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L469-L474

Added lines #L469 - L474 were not covered by tests
}

// GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is
// validated against the given name. If validation fails, an error is returned, but no
// record.
Expand Down
Loading

0 comments on commit 3afb7a9

Please sign in to comment.