diff --git a/CHANGELOG.md b/CHANGELOG.md index 88d8413..c62833b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes: ### Added +- ✨ Now supports automatic peering with peers that have the same seed via `--seed-peering` (`RAINBOW_SEED_PEERING`). To enable this, you must configure `--seed` (`RAINBOW_SEED`) and `--seed-index` (`RAINBOW_SEED_INDEX`). + ### Changed ### Removed diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 3b88421..72b911e 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -10,6 +10,10 @@ - [`RAINBOW_GC_THRESHOLD`](#rainbow_gc_threshold) - [`RAINBOW_IPNS_MAX_CACHE_TTL`](#rainbow_ipns_max_cache_ttl) - [`RAINBOW_PEERING`](#rainbow_peering) + - [`RAINBOW_SEED`](#rainbow_seed) + - [`RAINBOW_SEED_INDEX`](#rainbow_seed_index) + - [`RAINBOW_SEED_PEERING`](#rainbow_seed_peering) + - [`RAINBOW_SEED_PEERING_MAX_INDEX`](#rainbow_seed_peering_max_index) - [`RAINBOW_PEERING_SHARED_CACHE`](#rainbow_peering_shared_cache) - [Logging](#logging) - [`GOLOG_LOG_LEVEL`](#golog_log_level) @@ -96,26 +100,65 @@ Default: No upper bound, [TTL from IPNS Record](https://specs.ipfs.tech/ipns/ipn A comma-separated list of [multiaddresses](https://docs.libp2p.io/concepts/fundamentals/addressing/) of peers to stay connected to. - -If `RAINBOW_SEED` is set and `/p2p/rainbow-seed/N` value is found here, Rainbow -will replace it with a valid `/p2p/` for a peer ID generated from same seed -and index `N`. +> [!TIP] +> If `RAINBOW_SEED` is set and `/p2p/rainbow-seed/N` value is found here, Rainbow +> will replace it with a valid `/p2p/` for a peer ID generated from same seed +> and index `N`. This is useful when `RAINBOW_SEED_PEERING` can't be used, +> or when peer routing should be skipped and specific address should be used. Default: not set (no peering) +### `RAINBOW_SEED` + +Base58 seed to derive PeerID from. Can be generated with `rainbow gen-seed`. +If set, requires `RAINBOW_SEED_INDEX` to be set as well. + +Default: not set + +### `RAINBOW_SEED_INDEX` + +Index to derivate the PeerID identity from `RAINBOW_SEED`. + +Default: not set + +### `RAINBOW_SEED_PEERING` + +> [!WARNING] +> Experimental feature. + +Automated version of `RAINBOW_PEERING` which does not require providing multiaddrs. + +Instead, it will set up peering with peers that share the same seed (requires `RAINBOW_SEED_INDEX` to be set up). + +> [!NOTE] +> Runs a separate light DHT for peer routing with the main host if DHT routing is disabled. + +Default: `false` (disabled) + +### `RAINBOW_SEED_PEERING_MAX_INDEX` + +Informs the largest index to derive for `RAINBOW_SEED_PEERING`. +If you have more instances than the default, increase it here. + +Default: 100 ### `RAINBOW_PEERING_SHARED_CACHE` -Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`. +> [!WARNING] +> Experimental feature. + +Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING` +or `RAINBOW_SEED_PEERING`. Once enabled, Rainbow will respond to [Bitswap](https://docs.ipfs.tech/concepts/bitswap/) queries from these safelisted peers, serving locally cached blocks if requested. -The main use case for this feature is scaling and load balancing across a -fleet of rainbow, or other bitswap-capable IPFS services. Cache sharing allows -clustered services to check if any of the other instances has a requested CID. -This saves resources as data cached on other instance can be fetched internally -(e.g. LAN) rather than externally (WAN, p2p). +> [!TIP] +> The main use case for this feature is scaling and load balancing across a +> fleet of rainbow, or other bitswap-capable IPFS services. Cache sharing allows +> clustered services to check if any of the other instances has a requested CID. +> This saves resources as data cached on other instance can be fetched internally +> (e.g. LAN) rather than externally (WAN, p2p). Default: `false` (no cache sharing) diff --git a/go.mod b/go.mod index 1190e97..7f04635 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.25.2 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.3 + github.com/libp2p/go-libp2p-testing v0.12.0 github.com/mitchellh/go-server-timing v1.0.1 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.3 diff --git a/keys.go b/keys.go index 31c1d6d..6cbf000 100644 --- a/keys.go +++ b/keys.go @@ -9,6 +9,7 @@ import ( "io" libp2p "github.com/libp2p/go-libp2p/core/crypto" + peer "github.com/libp2p/go-libp2p/core/peer" "github.com/mr-tron/base58" "golang.org/x/crypto/hkdf" ) @@ -25,7 +26,7 @@ func newSeed() (string, error) { return base58.Encode(bs), nil } -// derive derives libp2p keys from a b58-encoded seed. +// deriveKey derives libp2p keys from a b58-encoded seed. func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) { secret, err := base58.Decode(b58secret) if err != nil { @@ -45,6 +46,32 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) { return libp2p.UnmarshalEd25519PrivateKey(key) } +// derivePeerIDs derives the peer IDs of all the peers with the same seed up to +// maxIndex. Our peer ID (with index 'ourIndex') is not generated. +func derivePeerIDs(seed string, ourIndex int, maxIndex int) ([]peer.ID, error) { + peerIDs := []peer.ID{} + + for i := 0; i <= maxIndex; i++ { + if i == ourIndex { + continue + } + + peerPriv, err := deriveKey(seed, deriveKeyInfo(i)) + if err != nil { + return nil, err + } + + pid, err := peer.IDFromPrivateKey(peerPriv) + if err != nil { + return nil, err + } + + peerIDs = append(peerIDs, pid) + } + + return peerIDs, nil +} + func deriveKeyInfo(index int) []byte { return []byte(fmt.Sprintf("rainbow-%d", index)) } diff --git a/main.go b/main.go index 43318d6..0995c71 100644 --- a/main.go +++ b/main.go @@ -91,6 +91,29 @@ Generate an identity seed and launch a gateway: EnvVars: []string{"RAINBOW_SEED_INDEX"}, Usage: "Index to derivate the peerID (needs --seed)", }, + &cli.BoolFlag{ + Name: "seed-peering", + Value: false, + EnvVars: []string{"RAINBOW_SEED_PEERING"}, + Usage: "Automatic peering with peers with the same seed (requires --seed and --seed-index). Runs a separate light DHT for peer routing with the main host if --dht-routing or --dht-shared-host are disabled", + Action: func(ctx *cli.Context, b bool) error { + if !b { + return nil + } + + if !ctx.IsSet("seed") || !ctx.IsSet("seed-index") { + return errors.New("--seed and --seed-index must be explicitly defined when --seed-peering is enabled") + } + + return nil + }, + }, + &cli.IntFlag{ + Name: "seed-peering-max-index", + Value: 100, + EnvVars: []string{"RAINBOW_SEED_PEERING_MAX_INDEX"}, + Usage: "Largest index to derive automatic peering peer IDs for", + }, &cli.StringSliceFlag{ Name: "gateway-domains", Value: cli.NewStringSlice(), @@ -338,6 +361,10 @@ share the same seed as long as the indexes are different. DenylistSubs: cctx.StringSlice("denylists"), Peering: peeringAddrs, PeeringCache: cctx.Bool("peering-shared-cache"), + Seed: seed, + SeedIndex: index, + SeedPeering: cctx.Bool("seed-peering"), + SeedPeeringMaxIndex: cctx.Int("seed-peering-max-index"), GCInterval: cctx.Duration("gc-interval"), GCThreshold: cctx.Float64("gc-threshold"), } diff --git a/setup.go b/setup.go index 3ac52ea..d54fbea 100644 --- a/setup.go +++ b/setup.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io/fs" - "net/http" "os" "path/filepath" "time" @@ -22,12 +21,9 @@ import ( "github.com/ipfs/boxo/blockstore" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/gateway" - "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/namesys" "github.com/ipfs/boxo/path/resolver" "github.com/ipfs/boxo/peering" - routingv1client "github.com/ipfs/boxo/routing/http/client" - httpcontentrouter "github.com/ipfs/boxo/routing/http/contentrouter" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" badger4 "github.com/ipfs/go-ds-badger4" @@ -38,19 +34,14 @@ import ( "github.com/ipfs/go-unixfsnode" dagpb "github.com/ipld/go-codec-dagpb" "github.com/libp2p/go-libp2p" - dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p-kad-dht/fullrt" - record "github.com/libp2p/go-libp2p-record" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/connmgr" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/multiformats/go-multiaddr" - "go.opencensus.io/stats/view" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) func init() { @@ -69,23 +60,23 @@ const ( DHTOff DHTRouting = "off" ) +func init() { + // Lets us discover our own public address with a single observation + identify.ActivationThresh = 1 +} + type Node struct { vs routing.ValueStore host host.Host - dataDir string - datastore datastore.Batching - blockstore blockstore.Blockstore - bs *bitswap.Bitswap - bsrv blockservice.BlockService - resolver resolver.Resolver - - ns namesys.NameSystem - - bwc *metrics.BandwidthCounter - + dataDir string + datastore datastore.Batching + blockstore blockstore.Blockstore + bs *bitswap.Bitswap + bsrv blockservice.BlockService + resolver resolver.Resolver + ns namesys.NameSystem denylistSubs []*nopfs.HTTPSubscriber - blocker *nopfs.Blocker } type Config struct { @@ -115,6 +106,11 @@ type Config struct { Peering []peer.AddrInfo PeeringCache bool + Seed string + SeedIndex int + SeedPeering bool + SeedPeeringMaxIndex int + GCInterval time.Duration GCThreshold float64 } @@ -188,182 +184,27 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached ) blkst = blockstore.NewIdStore(blkst) - var router routing.Routing - - // Increase per-host connection pool since we are making lots of concurrent requests. - httpClient := &http.Client{ - Transport: otelhttp.NewTransport( - &routingv1client.ResponseBodyLimitedTransport{ - RoundTripper: &customTransport{ - // Roundtripper with increased defaults than http.Transport such that retrieving - // multiple lookups concurrently is fast. - RoundTripper: &http.Transport{ - MaxIdleConns: 1000, - MaxConnsPerHost: 100, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, - DialContext: dnsCache.dialWithCachedDNS, - ForceAttemptHTTP2: true, - }, - }, - LimitBytes: 1 << 20, - }), - } + var ( + cr routing.ContentRouting + pr routing.PeerRouting + vs routing.ValueStore + ) opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - var routingV1Routers []routing.Routing - for _, endpoint := range cfg.RoutingV1Endpoints { - rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)} - if endpoint != cidContactEndpoint { - rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired()) - } - httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...) - if err != nil { - return nil, err - } - routingV1Routers = append(routingV1Routers, httpClient) - } - - var dhtRouter routing.Routing - if cfg.DHTRouting != DHTOff { - var dhtHost host.Host - if cfg.DHTSharedHost { - dhtHost = h - } else { - dhtHost, err = libp2p.New( - libp2p.NoListenAddrs, - libp2p.BandwidthReporter(bwc), - libp2p.DefaultTransports, - libp2p.DefaultMuxers, - libp2p.UserAgent("rainbow/"+buildVersion()), - libp2p.ResourceManager(dhtRcMgr), - ) - if err != nil { - return nil, err - } - } - - standardClient, err := dht.New(ctx, dhtHost, - dht.Datastore(ds), - dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.Mode(dht.ModeClient), - ) - if err != nil { - return nil, err - } - - if cfg.DHTRouting == DHTAccelerated { - fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, - fullrt.DHTOption( - dht.Validator(record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{KeyBook: h.Peerstore()}, - }), - dht.Datastore(ds), - dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.BucketSize(20), - )) - if err != nil { - return nil, err - } - dhtRouter = &bundledDHT{ - standard: standardClient, - fullRT: fullRTClient, - } - } else { - dhtRouter = standardClient - } - } - - // Default router is no routing at all: can be especially useful during tests. - router = &routinghelpers.Null{} - - if len(routingV1Routers) == 0 && dhtRouter != nil { - router = dhtRouter - } else { - var routers []*routinghelpers.ParallelRouter - - if dhtRouter != nil { - routers = append(routers, &routinghelpers.ParallelRouter{ - Router: dhtRouter, - ExecuteAfter: 0, - DoNotWaitForSearchValue: true, - IgnoreError: false, - }) - } - - for _, routingV1Router := range routingV1Routers { - routers = append(routers, &routinghelpers.ParallelRouter{ - Timeout: 15 * time.Second, - Router: routingV1Router, - ExecuteAfter: 0, - DoNotWaitForSearchValue: true, - IgnoreError: true, - }) - } - - if len(routers) > 0 { - router = routinghelpers.NewComposableParallel(routers) - } - } - - return router, nil + cr, pr, vs, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache) + return pr, err })) h, err := libp2p.New(opts...) if err != nil { return nil, err } - if len(cfg.Peering) > 0 { - ps := peering.NewPeeringService(h) - if err := ps.Start(); err != nil { - return nil, err - } - for _, a := range cfg.Peering { - ps.AddPeer(a) - } - } - - var ( - provideEnabled bool - peerBlockRequestFilter bsserver.PeerBlockRequestFilter - ) - if cfg.PeeringCache && len(cfg.Peering) > 0 { - peers := make(map[peer.ID]struct{}, len(cfg.Peering)) - for _, a := range cfg.Peering { - peers[a.ID] = struct{}{} - } - - provideEnabled = true - peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { - _, ok := peers[p] - return ok - } - } else { - provideEnabled = false - peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { - return false - } + err = setupPeering(cfg, h) + if err != nil { + return nil, err } - bsctx := metri.CtxScope(ctx, "ipfs_bitswap") - bn := bsnet.NewFromIpfsHost(h, router) - bswap := bitswap.New(bsctx, bn, blkst, - // --- Client Options - // default is 1 minute to search for a random live-want (1 - // CID). I think we want to search for random live-wants more - // often although probably it overlaps with general - // rebroadcasts. - bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)), - // ProviderSearchDelay: default is 1 second. - bitswap.ProviderSearchDelay(time.Second), - bitswap.WithoutDuplicatedBlockStats(), - - // ---- Server Options - bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter), - bitswap.ProvideEnabled(provideEnabled), - ) - bn.Start(bswap) + bswap := setupBitswap(ctx, cfg, h, cr, blkst) err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755) if err != nil && !errors.Is(err, fs.ErrExist) { @@ -407,7 +248,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached if cfg.IpnsMaxCacheTTL > 0 { nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL)) } - ns, err := namesys.NewNameSystem(router, nsOptions...) + ns, err := namesys.NewNameSystem(vs, nsOptions...) if err != nil { return nil, err } @@ -426,11 +267,9 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached datastore: ds, bs: bswap, ns: ns, - vs: router, + vs: vs, bsrv: bsrv, resolver: r, - bwc: bwc, - blocker: blocker, denylistSubs: denylists, }, nil } @@ -491,75 +330,6 @@ func setupDatastore(cfg Config) (datastore.Batching, error) { } } -type bundledDHT struct { - standard *dht.IpfsDHT - fullRT *fullrt.FullRT -} - -func (b *bundledDHT) getDHT() routing.Routing { - if b.fullRT.Ready() { - return b.fullRT - } - return b.standard -} - -func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { - return b.getDHT().Provide(ctx, c, brdcst) -} - -func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo { - return b.getDHT().FindProvidersAsync(ctx, c, i) -} - -func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { - return b.getDHT().FindPeer(ctx, id) -} - -func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error { - return b.getDHT().PutValue(ctx, k, v, option...) -} - -func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) { - return b.getDHT().GetValue(ctx, s, option...) -} - -func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) { - return b.getDHT().SearchValue(ctx, s, option...) -} - -func (b *bundledDHT) Bootstrap(ctx context.Context) error { - return b.standard.Bootstrap(ctx) -} - -var _ routing.Routing = (*bundledDHT)(nil) - -func delegatedHTTPContentRouter(endpoint string, rv1Opts ...routingv1client.Option) (routing.Routing, error) { - cli, err := routingv1client.New( - endpoint, - append([]routingv1client.Option{ - routingv1client.WithUserAgent(buildVersion()), - }, rv1Opts...)..., - ) - if err != nil { - return nil, err - } - - cr := httpcontentrouter.NewContentRoutingClient( - cli, - ) - - err = view.Register(routingv1client.OpenCensusViews...) - if err != nil { - return nil, fmt.Errorf("registering HTTP delegated routing views: %w", err) - } - - return &routinghelpers.Compose{ - ValueStore: cr, - PeerRouting: cr, - ContentRouting: cr, - }, nil -} - func loadOrInitPeerKey(kf string) (crypto.PrivKey, error) { data, err := os.ReadFile(kf) if err != nil { @@ -585,3 +355,87 @@ func loadOrInitPeerKey(kf string) (crypto.PrivKey, error) { } return crypto.UnmarshalPrivateKey(data) } + +func setupPeering(cfg Config, h host.Host) error { + if len(cfg.Peering) == 0 && !cfg.SeedPeering { + return nil + } + + ps := peering.NewPeeringService(h) + if err := ps.Start(); err != nil { + return err + } + for _, a := range cfg.Peering { + ps.AddPeer(a) + } + + if !cfg.SeedPeering { + return nil + } + + if cfg.SeedIndex < 0 { + return fmt.Errorf("seed index must be equal or greater than 0, it is %d", cfg.SeedIndex) + } + + if cfg.SeedPeeringMaxIndex < 0 { + return fmt.Errorf("seed peering max index must be a positive number, it is %d", cfg.SeedPeeringMaxIndex) + } + + pids, err := derivePeerIDs(cfg.Seed, cfg.SeedIndex, cfg.SeedPeeringMaxIndex) + if err != nil { + return err + } + + for _, pid := range pids { + // The peering module will automatically perform lookups to find the + // addresses of the given peers. + ps.AddPeer(peer.AddrInfo{ID: pid}) + } + + return nil +} + +func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) *bitswap.Bitswap { + var ( + provideEnabled bool + peerBlockRequestFilter bsserver.PeerBlockRequestFilter + ) + if cfg.PeeringCache && len(cfg.Peering) > 0 { + peers := make(map[peer.ID]struct{}, len(cfg.Peering)) + for _, a := range cfg.Peering { + peers[a.ID] = struct{}{} + } + + provideEnabled = true + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { + _, ok := peers[p] + return ok + } + } else { + provideEnabled = false + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { + return false + } + } + + bsctx := metri.CtxScope(ctx, "ipfs_bitswap") + bn := bsnet.NewFromIpfsHost(h, cr) + bswap := bitswap.New(bsctx, bn, bstore, + // --- Client Options + // default is 1 minute to search for a random live-want (1 + // CID). I think we want to search for random live-wants more + // often although probably it overlaps with general + // rebroadcasts. + bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)), + // ProviderSearchDelay: default is 1 second. + bitswap.ProviderSearchDelay(time.Second), + bitswap.WithoutDuplicatedBlockStats(), + + // ---- Server Options + bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter), + bitswap.ProvideEnabled(provideEnabled), + ) + bn.Start(bswap) + + return bswap +} diff --git a/setup_routing.go b/setup_routing.go new file mode 100644 index 0000000..dd8d24e --- /dev/null +++ b/setup_routing.go @@ -0,0 +1,267 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/ipfs/boxo/ipns" + routingv1client "github.com/ipfs/boxo/routing/http/client" + httpcontentrouter "github.com/ipfs/boxo/routing/http/contentrouter" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" + record "github.com/libp2p/go-libp2p-record" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "go.opencensus.io/stats/view" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +func setupDelegatedRouting(cfg Config, dnsCache *cachedDNS) ([]routing.Routing, error) { + // Increase per-host connection pool since we are making lots of concurrent requests. + httpClient := &http.Client{ + Transport: otelhttp.NewTransport( + &routingv1client.ResponseBodyLimitedTransport{ + RoundTripper: &customTransport{ + // Roundtripper with increased defaults than http.Transport such that retrieving + // multiple lookups concurrently is fast. + RoundTripper: &http.Transport{ + MaxIdleConns: 1000, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DialContext: dnsCache.dialWithCachedDNS, + ForceAttemptHTTP2: true, + }, + }, + LimitBytes: 1 << 20, + }), + } + + var ( + delegatedRouters []routing.Routing + ) + + for _, endpoint := range cfg.RoutingV1Endpoints { + rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)} + if endpoint != cidContactEndpoint { + rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired()) + } + delegatedRouter, err := delegatedHTTPContentRouter(endpoint, rv1Opts...) + if err != nil { + return nil, err + } + delegatedRouters = append(delegatedRouters, delegatedRouter) + } + + return delegatedRouters, nil +} + +func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, error) { + if cfg.DHTRouting == DHTOff { + return nil, nil + } + + var err error + + var dhtHost host.Host + if cfg.DHTSharedHost { + dhtHost = h + } else { + dhtHost, err = libp2p.New( + libp2p.NoListenAddrs, + libp2p.BandwidthReporter(bwc), + libp2p.DefaultTransports, + libp2p.DefaultMuxers, + libp2p.ResourceManager(dhtRcMgr), + ) + if err != nil { + return nil, err + } + } + + standardClient, err := dht.New(ctx, dhtHost, + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.Mode(dht.ModeClient), + ) + if err != nil { + return nil, err + } + + if cfg.DHTRouting == DHTStandard { + return standardClient, nil + } + + if cfg.DHTRouting == DHTAccelerated { + fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, + fullrt.DHTOption( + dht.Validator(record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{KeyBook: h.Peerstore()}, + }), + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.BucketSize(20), + )) + if err != nil { + return nil, err + } + return &bundledDHT{ + standard: standardClient, + fullRT: fullRTClient, + }, nil + } + + return nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting) +} + +func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routing) routing.Routing { + // Default router is no routing at all: can be especially useful during tests. + var router routing.Routing + router = &routinghelpers.Null{} + + if len(delegatedRouters) == 0 && dht != nil { + router = dht + } else { + var routers []*routinghelpers.ParallelRouter + + if dht != nil { + routers = append(routers, &routinghelpers.ParallelRouter{ + Router: dht, + ExecuteAfter: 0, + DoNotWaitForSearchValue: true, + IgnoreError: false, + }) + } + + for _, routingV1Router := range delegatedRouters { + routers = append(routers, &routinghelpers.ParallelRouter{ + Timeout: 15 * time.Second, + Router: routingV1Router, + ExecuteAfter: 0, + DoNotWaitForSearchValue: true, + IgnoreError: true, + }) + } + + if len(routers) > 0 { + router = routinghelpers.NewComposableParallel(routers) + } + } + + return router +} + +func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, error) { + delegatedRouters, err := setupDelegatedRouting(cfg, dnsCache) + if err != nil { + return nil, nil, nil, err + } + + dhtRouter, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc) + if err != nil { + return nil, nil, nil, err + } + + router := setupCompositeRouting(delegatedRouters, dhtRouter) + + var ( + cr routing.ContentRouting = router + pr routing.PeerRouting = router + vs routing.ValueStore = router + ) + + // If we're using seed peering, we need to run a lighter Amino DHT for the + // peering routing. We need to run a separate DHT with the main host if + // the shared host is disabled, or if we're not running any DHT at all. + if cfg.SeedPeering && (!cfg.DHTSharedHost || dhtRouter == nil) { + pr, err = dht.New(ctx, h, + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.Mode(dht.ModeClient), + ) + if err != nil { + return nil, nil, nil, err + } + } + + return cr, pr, vs, nil +} + +type bundledDHT struct { + standard *dht.IpfsDHT + fullRT *fullrt.FullRT +} + +func (b *bundledDHT) getDHT() routing.Routing { + if b.fullRT.Ready() { + return b.fullRT + } + return b.standard +} + +func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { + return b.getDHT().Provide(ctx, c, brdcst) +} + +func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo { + return b.getDHT().FindProvidersAsync(ctx, c, i) +} + +func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return b.getDHT().FindPeer(ctx, id) +} + +func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error { + return b.getDHT().PutValue(ctx, k, v, option...) +} + +func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) { + return b.getDHT().GetValue(ctx, s, option...) +} + +func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) { + return b.getDHT().SearchValue(ctx, s, option...) +} + +func (b *bundledDHT) Bootstrap(ctx context.Context) error { + return b.standard.Bootstrap(ctx) +} + +var _ routing.Routing = (*bundledDHT)(nil) + +func delegatedHTTPContentRouter(endpoint string, rv1Opts ...routingv1client.Option) (routing.Routing, error) { + cli, err := routingv1client.New( + endpoint, + append([]routingv1client.Option{ + routingv1client.WithUserAgent(buildVersion()), + }, rv1Opts...)..., + ) + if err != nil { + return nil, err + } + + cr := httpcontentrouter.NewContentRoutingClient( + cli, + ) + + err = view.Register(routingv1client.OpenCensusViews...) + if err != nil { + return nil, fmt.Errorf("registering HTTP delegated routing views: %w", err) + } + + return &routinghelpers.Compose{ + ValueStore: cr, + PeerRouting: cr, + ContentRouting: cr, + }, nil +} diff --git a/setup_test.go b/setup_test.go index 4364ae2..ac852ee 100644 --- a/setup_test.go +++ b/setup_test.go @@ -8,6 +8,7 @@ import ( "time" blocks "github.com/ipfs/go-block-format" + ci "github.com/libp2p/go-libp2p-testing/ci" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -98,7 +99,6 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool for i, node := range nodes { for _, peer := range cfgs[i].Peering { if node.host.Network().Connectedness(peer.ID) != network.Connected { - t.Log(node.host.Network().Connectedness(peer.ID)) return false } } @@ -150,3 +150,76 @@ func TestPeeringCache(t *testing.T) { // confirm bitswap providing is disabled by default (no peering) checkBitswap(2, false) } + +func testSeedPeering(t *testing.T, n int, dhtRouting DHTRouting, dhtSharedHost bool) ([]ic.PrivKey, []peer.ID, []*Node) { + cdns := newCachedDNS(dnsCacheRefreshInterval) + defer cdns.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + seed, err := newSeed() + require.NoError(t, err) + + keys := make([]ic.PrivKey, n) + pids := make([]peer.ID, n) + + for i := 0; i < n; i++ { + keys[i], pids[i] = mustTestPeerFromSeed(t, seed, i) + } + + cfgs := make([]Config, n) + nodes := make([]*Node, n) + + for i := 0; i < n; i++ { + cfgs[i] = Config{ + DataDir: t.TempDir(), + BlockstoreType: "flatfs", + DHTRouting: dhtRouting, + DHTSharedHost: dhtSharedHost, + Seed: seed, + SeedIndex: i, + SeedPeering: true, + SeedPeeringMaxIndex: n, + } + + nodes[i], err = Setup(ctx, cfgs[i], keys[i], cdns) + require.NoError(t, err) + } + + require.Eventually(t, func() bool { + for i, node := range nodes { + for j, pid := range pids { + if i == j { + continue + } + + if node.host.Network().Connectedness(pid) != network.Connected { + return false + } + } + } + + return true + }, time.Second*120, time.Millisecond*100) + + return keys, pids, nodes +} + +func TestSeedPeering(t *testing.T) { + if ci.IsRunning() { + t.Skip("don't run seed peering tests in ci") + } + + t.Run("DHT disabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTOff, false) + }) + + t.Run("DHT enabled with shared host disabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTStandard, false) + }) + + t.Run("DHT enabled with shared host enabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTStandard, true) + }) +}