From 4228658fc098483b0cf3ff1624051e3a2f2b63f3 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 17 Apr 2024 12:56:23 +0200 Subject: [PATCH] feat: seed-based automatic peering --- go.mod | 1 + keys.go | 29 ++++- main.go | 31 +++++ setup.go | 309 ++++++++++++++++++++++++++++++-------------------- setup_test.go | 76 ++++++++++++- 5 files changed, 323 insertions(+), 123 deletions(-) diff --git a/go.mod b/go.mod index 9701b7e..b44cc94 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.25.2 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.3 + github.com/libp2p/go-libp2p-testing v0.12.0 github.com/mitchellh/go-server-timing v1.0.1 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.2 diff --git a/keys.go b/keys.go index 31c1d6d..6cbf000 100644 --- a/keys.go +++ b/keys.go @@ -9,6 +9,7 @@ import ( "io" libp2p "github.com/libp2p/go-libp2p/core/crypto" + peer "github.com/libp2p/go-libp2p/core/peer" "github.com/mr-tron/base58" "golang.org/x/crypto/hkdf" ) @@ -25,7 +26,7 @@ func newSeed() (string, error) { return base58.Encode(bs), nil } -// derive derives libp2p keys from a b58-encoded seed. +// deriveKey derives libp2p keys from a b58-encoded seed. func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) { secret, err := base58.Decode(b58secret) if err != nil { @@ -45,6 +46,32 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) { return libp2p.UnmarshalEd25519PrivateKey(key) } +// derivePeerIDs derives the peer IDs of all the peers with the same seed up to +// maxIndex. Our peer ID (with index 'ourIndex') is not generated. +func derivePeerIDs(seed string, ourIndex int, maxIndex int) ([]peer.ID, error) { + peerIDs := []peer.ID{} + + for i := 0; i <= maxIndex; i++ { + if i == ourIndex { + continue + } + + peerPriv, err := deriveKey(seed, deriveKeyInfo(i)) + if err != nil { + return nil, err + } + + pid, err := peer.IDFromPrivateKey(peerPriv) + if err != nil { + return nil, err + } + + peerIDs = append(peerIDs, pid) + } + + return peerIDs, nil +} + func deriveKeyInfo(index int) []byte { return []byte(fmt.Sprintf("rainbow-%d", index)) } diff --git a/main.go b/main.go index 68fa607..00dcd2e 100644 --- a/main.go +++ b/main.go @@ -91,6 +91,18 @@ Generate an identity seed and launch a gateway: EnvVars: []string{"RAINBOW_SEED_INDEX"}, Usage: "Index to derivate the peerID (needs --seed)", }, + &cli.BoolFlag{ + Name: "seed-peering", + Value: false, + EnvVars: []string{"RAINBOW_SEED_PEERING"}, + Usage: "Automatic peering with peers with the same seed (requires --seed and --seed-index). Runs a separate light DHT for peer routing with the main host if --dht-routing or --dht-shared-host are disabled", + }, + &cli.UintFlag{ + Name: "seed-peering-max-index", + Value: 100, + EnvVars: []string{"RAINBOW_SEED_PEERING_MAX_INDEX"}, + Usage: "Largest index to derive automatic peering peer IDs for", + }, &cli.StringSliceFlag{ Name: "gateway-domains", Value: cli.NewStringSlice(), @@ -315,6 +327,24 @@ share the same seed as long as the indexes are different. peeringAddrs = append(peeringAddrs, *ai) } + if cctx.Bool("seed-peering") { + if !cctx.IsSet("seed") || !cctx.IsSet("seed-index") { + return errors.New("--seed and --seed-index must be explicitly defined when --seed-peering is enabled") + } + + maxIndex := cctx.Uint("seed-peering-max-index") + peeringIDs, err := derivePeerIDs(seed, index, int(maxIndex)) + if err != nil { + return err + } + + for _, pid := range peeringIDs { + // The peering module will automatically perform lookups to find the + // addresses of the given peers. + peeringAddrs = append(peeringAddrs, peer.AddrInfo{ID: pid}) + } + } + cfg := Config{ DataDir: ddir, BlockstoreType: cctx.String("blockstore"), @@ -334,6 +364,7 @@ share the same seed as long as the indexes are different. DenylistSubs: cctx.StringSlice("denylists"), Peering: peeringAddrs, PeeringCache: cctx.Bool("peering-shared-cache"), + SeedPeering: cctx.Bool("seed-peering"), GCInterval: cctx.Duration("gc-interval"), GCThreshold: cctx.Float64("gc-threshold"), } diff --git a/setup.go b/setup.go index 5553163..8f5eaaf 100644 --- a/setup.go +++ b/setup.go @@ -45,9 +45,11 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/connmgr" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats/view" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" @@ -69,6 +71,11 @@ const ( DHTOff DHTRouting = "off" ) +func init() { + // Lets us discover our own public address with a single observation + identify.ActivationThresh = 1 +} + type Node struct { vs routing.ValueStore host host.Host @@ -81,6 +88,7 @@ type Node struct { resolver resolver.Resolver ns namesys.NameSystem + ps *peering.PeeringService bwc *metrics.BandwidthCounter @@ -114,6 +122,7 @@ type Config struct { DenylistSubs []string Peering []peer.AddrInfo PeeringCache bool + SeedPeering bool GCInterval time.Duration GCThreshold float64 @@ -187,133 +196,24 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached ) blkst = blockstore.NewIdStore(blkst) - var router routing.Routing - - // Increase per-host connection pool since we are making lots of concurrent requests. - httpClient := &http.Client{ - Transport: otelhttp.NewTransport( - &routingv1client.ResponseBodyLimitedTransport{ - RoundTripper: &customTransport{ - // Roundtripper with increased defaults than http.Transport such that retrieving - // multiple lookups concurrently is fast. - RoundTripper: &http.Transport{ - MaxIdleConns: 1000, - MaxConnsPerHost: 100, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, - DialContext: dnsCache.dialWithCachedDNS, - ForceAttemptHTTP2: true, - }, - }, - LimitBytes: 1 << 20, - }), - } + var ( + cr routing.ContentRouting + pr routing.PeerRouting + vs routing.ValueStore + ) opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - var routingV1Routers []routing.Routing - for _, endpoint := range cfg.RoutingV1Endpoints { - rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)} - if endpoint != cidContactEndpoint { - rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired()) - } - httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...) - if err != nil { - return nil, err - } - routingV1Routers = append(routingV1Routers, httpClient) - } - - var dhtRouter routing.Routing - if cfg.DHTRouting != DHTOff { - var dhtHost host.Host - if cfg.DHTSharedHost { - dhtHost = h - } else { - dhtHost, err = libp2p.New( - libp2p.NoListenAddrs, - libp2p.BandwidthReporter(bwc), - libp2p.DefaultTransports, - libp2p.DefaultMuxers, - libp2p.ResourceManager(dhtRcMgr), - ) - if err != nil { - return nil, err - } - } - - standardClient, err := dht.New(ctx, dhtHost, - dht.Datastore(ds), - dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.Mode(dht.ModeClient), - ) - if err != nil { - return nil, err - } - - if cfg.DHTRouting == DHTAccelerated { - fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, - fullrt.DHTOption( - dht.Validator(record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{KeyBook: h.Peerstore()}, - }), - dht.Datastore(ds), - dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.BucketSize(20), - )) - if err != nil { - return nil, err - } - dhtRouter = &bundledDHT{ - standard: standardClient, - fullRT: fullRTClient, - } - } else { - dhtRouter = standardClient - } - } - - // Default router is no routing at all: can be especially useful during tests. - router = &routinghelpers.Null{} - - if len(routingV1Routers) == 0 && dhtRouter != nil { - router = dhtRouter - } else { - var routers []*routinghelpers.ParallelRouter - - if dhtRouter != nil { - routers = append(routers, &routinghelpers.ParallelRouter{ - Router: dhtRouter, - ExecuteAfter: 0, - DoNotWaitForSearchValue: true, - IgnoreError: false, - }) - } - - for _, routingV1Router := range routingV1Routers { - routers = append(routers, &routinghelpers.ParallelRouter{ - Timeout: 15 * time.Second, - Router: routingV1Router, - ExecuteAfter: 0, - DoNotWaitForSearchValue: true, - IgnoreError: true, - }) - } - - if len(routers) > 0 { - router = routinghelpers.NewComposableParallel(routers) - } - } - - return router, nil + cr, pr, vs, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache) + return pr, err })) h, err := libp2p.New(opts...) if err != nil { return nil, err } + var ps *peering.PeeringService if len(cfg.Peering) > 0 { - ps := peering.NewPeeringService(h) + ps = peering.NewPeeringService(h) if err := ps.Start(); err != nil { return nil, err } @@ -345,7 +245,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } bsctx := metri.CtxScope(ctx, "ipfs_bitswap") - bn := bsnet.NewFromIpfsHost(h, router) + bn := bsnet.NewFromIpfsHost(h, cr) bswap := bitswap.New(bsctx, bn, blkst, // --- Client Options // default is 1 minute to search for a random live-want (1 @@ -405,7 +305,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached if cfg.IpnsMaxCacheTTL > 0 { nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL)) } - ns, err := namesys.NewNameSystem(router, nsOptions...) + ns, err := namesys.NewNameSystem(vs, nsOptions...) if err != nil { return nil, err } @@ -424,7 +324,8 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached datastore: ds, bs: bswap, ns: ns, - vs: router, + vs: vs, + ps: ps, bsrv: bsrv, resolver: r, bwc: bwc, @@ -489,6 +390,172 @@ func setupDatastore(cfg Config) (datastore.Batching, error) { } } +func setupDelegatedRouting(cfg Config, dnsCache *cachedDNS) ([]routing.Routing, error) { + // Increase per-host connection pool since we are making lots of concurrent requests. + httpClient := &http.Client{ + Transport: otelhttp.NewTransport( + &routingv1client.ResponseBodyLimitedTransport{ + RoundTripper: &customTransport{ + // Roundtripper with increased defaults than http.Transport such that retrieving + // multiple lookups concurrently is fast. + RoundTripper: &http.Transport{ + MaxIdleConns: 1000, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DialContext: dnsCache.dialWithCachedDNS, + ForceAttemptHTTP2: true, + }, + }, + LimitBytes: 1 << 20, + }), + } + + var ( + routingV1Routers []routing.Routing + ) + + for _, endpoint := range cfg.RoutingV1Endpoints { + rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)} + if endpoint != cidContactEndpoint { + rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired()) + } + delegatedRouter, err := delegatedHTTPContentRouter(endpoint, rv1Opts...) + if err != nil { + return nil, err + } + routingV1Routers = append(routingV1Routers, delegatedRouter) + } + + return routingV1Routers, nil +} + +func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, error) { + if cfg.DHTRouting == DHTOff { + return nil, nil + } + + var err error + + var dhtHost host.Host + if cfg.DHTSharedHost { + dhtHost = h + } else { + dhtHost, err = libp2p.New( + libp2p.NoListenAddrs, + libp2p.BandwidthReporter(bwc), + libp2p.DefaultTransports, + libp2p.DefaultMuxers, + libp2p.ResourceManager(dhtRcMgr), + ) + if err != nil { + return nil, err + } + } + + standardClient, err := dht.New(ctx, dhtHost, + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.Mode(dht.ModeClient), + ) + if err != nil { + return nil, err + } + + if cfg.DHTRouting == DHTStandard { + return standardClient, nil + } + + if cfg.DHTRouting == DHTAccelerated { + fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix, + fullrt.DHTOption( + dht.Validator(record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{KeyBook: h.Peerstore()}, + }), + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.BucketSize(20), + )) + if err != nil { + return nil, err + } + return &bundledDHT{ + standard: standardClient, + fullRT: fullRTClient, + }, nil + } + + return nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting) +} + +func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, error) { + routingV1Routers, err := setupDelegatedRouting(cfg, dnsCache) + if err != nil { + return nil, nil, nil, err + } + + dhtRouter, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc) + if err != nil { + return nil, nil, nil, err + } + + // Default router is no routing at all: can be especially useful during tests. + var router routing.Routing + router = &routinghelpers.Null{} + + if len(routingV1Routers) == 0 && dhtRouter != nil { + router = dhtRouter + } else { + var routers []*routinghelpers.ParallelRouter + + if dhtRouter != nil { + routers = append(routers, &routinghelpers.ParallelRouter{ + Router: dhtRouter, + ExecuteAfter: 0, + DoNotWaitForSearchValue: true, + IgnoreError: false, + }) + } + + for _, routingV1Router := range routingV1Routers { + routers = append(routers, &routinghelpers.ParallelRouter{ + Timeout: 15 * time.Second, + Router: routingV1Router, + ExecuteAfter: 0, + DoNotWaitForSearchValue: true, + IgnoreError: true, + }) + } + + if len(routers) > 0 { + router = routinghelpers.NewComposableParallel(routers) + } + } + + var ( + cr routing.ContentRouting = router + pr routing.PeerRouting = router + vs routing.ValueStore = router + ) + + // If we're using seed peering, we need to run a lighter Amino DHT for the + // peering routing. We need to run a separate DHT with the main host if + // the shared host is disabled, or if we're not running any DHT at all. + if cfg.SeedPeering && (!cfg.DHTSharedHost || dhtRouter == nil) { + pr, err = dht.New(ctx, h, + dht.Datastore(ds), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.Mode(dht.ModeClient), + ) + if err != nil { + return nil, nil, nil, err + } + } + + return cr, pr, vs, nil +} + type bundledDHT struct { standard *dht.IpfsDHT fullRT *fullrt.FullRT diff --git a/setup_test.go b/setup_test.go index 4364ae2..0f77a41 100644 --- a/setup_test.go +++ b/setup_test.go @@ -8,6 +8,7 @@ import ( "time" blocks "github.com/ipfs/go-block-format" + ci "github.com/libp2p/go-libp2p-testing/ci" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -98,7 +99,6 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool for i, node := range nodes { for _, peer := range cfgs[i].Peering { if node.host.Network().Connectedness(peer.ID) != network.Connected { - t.Log(node.host.Network().Connectedness(peer.ID)) return false } } @@ -150,3 +150,77 @@ func TestPeeringCache(t *testing.T) { // confirm bitswap providing is disabled by default (no peering) checkBitswap(2, false) } + +func testSeedPeering(t *testing.T, n int, dhtRouting DHTRouting, dhtSharedHost bool) ([]ic.PrivKey, []peer.ID, []*Node) { + cdns := newCachedDNS(dnsCacheRefreshInterval) + defer cdns.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + seed, err := newSeed() + require.NoError(t, err) + + keys := make([]ic.PrivKey, n) + pids := make([]peer.ID, n) + + for i := 0; i < n; i++ { + keys[i], pids[i] = mustTestPeerFromSeed(t, seed, i) + } + + cfgs := make([]Config, n) + nodes := make([]*Node, n) + + for i := 0; i < n; i++ { + cfgs[i] = Config{ + DataDir: t.TempDir(), + BlockstoreType: "flatfs", + DHTRouting: dhtRouting, + DHTSharedHost: dhtSharedHost, + SeedPeering: true, + } + + // Add all remaining peers to the peering config. + for j, pid := range pids { + if j == i { + continue + } + cfgs[i].Peering = append(cfgs[i].Peering, peer.AddrInfo{ID: pid}) + } + + nodes[i], err = Setup(ctx, cfgs[i], keys[i], cdns) + require.NoError(t, err) + } + + require.Eventually(t, func() bool { + for i, node := range nodes { + for _, peer := range cfgs[i].Peering { + if node.host.Network().Connectedness(peer.ID) != network.Connected { + return false + } + } + } + + return true + }, time.Second*120, time.Millisecond*100) + + return keys, pids, nodes +} + +func TestSeedPeering(t *testing.T) { + if ci.IsRunning() { + t.Skip("don't run seed peering tests in ci") + } + + t.Run("DHT disabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTOff, false) + }) + + t.Run("DHT enabled with shared host disabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTStandard, false) + }) + + t.Run("DHT enabled with shared host enabled", func(t *testing.T) { + testSeedPeering(t, 3, DHTStandard, true) + }) +}