From 1710b561809f4f6cc004bd273afc3563c3da6ab8 Mon Sep 17 00:00:00 2001
From: Henrique Dias
Date: Mon, 22 Apr 2024 08:09:03 +0200
Subject: [PATCH] feat: shared cache for manual peering (#114)

---
 docs/environment-variables.md |  29 +++++++
 go.mod                        |   4 +-
 handler_test.go               |  71 ----------------
 keys.go                       |   5 ++
 main.go                       |  51 +++++++++++-
 main_test.go                  | 133 +++++++++++++++++++++++++++++
 setup.go                      |  54 +++++++++---
 setup_test.go                 | 152 ++++++++++++++++++++++++++++++
 8 files changed, 412 insertions(+), 87 deletions(-)
 create mode 100644 main_test.go
 create mode 100644 setup_test.go

diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index 0009283..7fcc007 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -9,6 +9,8 @@
   - [`RAINBOW_GC_INTERVAL`](#rainbow_gc_interval)
   - [`RAINBOW_GC_THRESHOLD`](#rainbow_gc_threshold)
   - [`RAINBOW_IPNS_MAX_CACHE_TTL`](#rainbow_ipns_max_cache_ttl)
+  - [`RAINBOW_PEERING`](#rainbow_peering)
+  - [`RAINBOW_PEERING_SHARED_CACHE`](#rainbow_peering_shared_cache)
 - [Logging](#logging)
   - [`GOLOG_LOG_LEVEL`](#golog_log_level)
   - [`GOLOG_LOG_FMT`](#golog_log_fmt)
@@ -90,6 +92,33 @@ with [DNSLink](https://dnslink.dev/).
 Default: No upper bound, [TTL from IPNS Record](https://specs.ipfs.tech/ipns/ipns-record/#ttl-uint64)
 or [TTL from DNSLink](https://datatracker.ietf.org/doc/html/rfc2181#section-8) used as-is.
 
+### `RAINBOW_PEERING`
+
+A comma-separated list of [multiaddresses](https://docs.libp2p.io/concepts/fundamentals/addressing/) of peers to stay connected to.
+
+
+If `RAINBOW_SEED` is set and a `/p2p/rainbow-seed/N` value is found here, Rainbow
+will replace it with a valid `/p2p/` address for the peer ID generated from the
+same seed and index `N`.
+
+Default: not set (no peering)
+
+
+### `RAINBOW_PEERING_SHARED_CACHE`
+
+Enable sharing of the local cache with peers safe-listed via `RAINBOW_PEERING`.
+
+Once enabled, Rainbow will respond to [Bitswap](https://docs.ipfs.tech/concepts/bitswap/)
+queries from these safe-listed peers, serving locally cached blocks if requested.
+
+The main use case for this feature is scaling and load balancing across a
+fleet of Rainbow instances, or other Bitswap-capable IPFS services. Cache sharing
+allows clustered services to check whether any of the other instances has a
+requested CID. This saves resources, as data cached on another instance can be
+fetched internally (e.g. LAN) rather than externally (WAN, p2p).
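+
+For example, two hypothetical instances that share one `RAINBOW_SEED` (the
+hostnames below are placeholders) could safe-list each other and enable cache
+sharing like this:
+
+```sh
+# On instance A (seed index 0): stay connected to instance B and share cache
+export RAINBOW_PEERING=/dns4/rainbow-b.internal/tcp/4001/p2p/rainbow-seed/1
+export RAINBOW_PEERING_SHARED_CACHE=true
+
+# On instance B (seed index 1): safe-list instance A the same way
+export RAINBOW_PEERING=/dns4/rainbow-a.internal/tcp/4001/p2p/rainbow-seed/0
+export RAINBOW_PEERING_SHARED_CACHE=true
+```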
+ +Default: `false` (no cache sharing) + ## Logging ### `GOLOG_LOG_LEVEL` diff --git a/go.mod b/go.mod index 4cefc9a..1190e97 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ipfs-shipyard/nopfs v0.0.12 github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a github.com/ipfs/boxo v0.19.1-0.20240418055150-eeea41458735 + github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger4 v0.1.5 @@ -90,11 +91,11 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect - github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-blockservice v0.5.2 // indirect github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect + github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect github.com/ipfs/go-ipld-cbor v0.1.0 // indirect @@ -102,6 +103,7 @@ require ( github.com/ipfs/go-ipld-legacy v0.2.1 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect + github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/ipld/go-car v0.6.2 // indirect github.com/ipld/go-car/v2 v2.13.1 // indirect diff --git a/handler_test.go b/handler_test.go index f51d2d3..120d775 100644 --- a/handler_test.go +++ b/handler_test.go @@ -1,84 +1,13 @@ package main import ( - "bytes" - "context" "net/http" - "net/http/httptest" "testing" - chunker "github.com/ipfs/boxo/chunker" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" - uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - util "github.com/ipfs/boxo/util" - "github.com/ipfs/go-cid" - ic "github.com/libp2p/go-libp2p/core/crypto" - "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func mustTestNode(t *testing.T, cfg Config) *Node { - cfg.DataDir = t.TempDir() - cfg.BlockstoreType = "flatfs" - cfg.DHTRouting = DHTStandard - cfg.RoutingV1Endpoints = []string{cidContactEndpoint} - - ctx := context.Background() - - sr := util.NewTimeSeededRand() - sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr) - require.NoError(t, err) - - cdns := newCachedDNS(dnsCacheRefreshInterval) - - t.Cleanup(func() { - _ = cdns.Close() - }) - - gnd, err := Setup(ctx, cfg, sk, cdns) - require.NoError(t, err) - return gnd -} - -func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) { - gnd := mustTestNode(t, cfg) - - handler, err := setupGatewayHandler(cfg, gnd) - if err != nil { - require.NoError(t, err) - } - - ts := httptest.NewServer(handler) - - return ts, gnd -} - -func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid { - dsrv := merkledag.NewDAGService(gnd.bsrv) - - // Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/ - ufsImportParams := uih.DagBuilderParams{ - Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block - RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper - CidBuilder: cid.V1Builder{ // Use CIDv1 for all links - Codec: uint64(multicodec.DagPb), - MhType: uint64(multicodec.Sha2_256), // Use 
SHA2-256 as the hash function - MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits) - }, - Dagserv: dsrv, - NoCopy: false, - } - ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks - require.NoError(t, err) - - nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout - require.NoError(t, err) - - return nd.Cid() -} - func TestTrustless(t *testing.T) { t.Parallel() diff --git a/keys.go b/keys.go index a22cf39..31c1d6d 100644 --- a/keys.go +++ b/keys.go @@ -5,6 +5,7 @@ import ( crand "crypto/rand" "crypto/sha256" "errors" + "fmt" "io" libp2p "github.com/libp2p/go-libp2p/core/crypto" @@ -43,3 +44,7 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) { key := ed25519.NewKeyFromSeed(keySeed) return libp2p.UnmarshalEd25519PrivateKey(key) } + +func deriveKeyInfo(index int) []byte { + return []byte(fmt.Sprintf("rainbow-%d", index)) +} diff --git a/main.go b/main.go index 324868d..43318d6 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,9 @@ import ( "os" "os/signal" "path/filepath" + "regexp" "runtime" + "strconv" "strings" "sync" "syscall" @@ -205,6 +207,12 @@ Generate an identity seed and launch a gateway: EnvVars: []string{"RAINBOW_PEERING"}, Usage: "Multiaddresses of peers to stay connected to (comma-separated)", }, + &cli.BoolFlag{ + Name: "peering-shared-cache", + Value: false, + EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"}, + Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.", + }, &cli.StringFlag{ Name: "blockstore", Value: "flatfs", @@ -250,6 +258,8 @@ share the same seed as long as the indexes are different. } app.Action = func(cctx *cli.Context) error { + fmt.Printf("Starting %s %s\n", name, version) + ddir := cctx.String("datadir") cdns := newCachedDNS(dnsCacheRefreshInterval) defer cdns.Close() @@ -280,8 +290,8 @@ share the same seed as long as the indexes are different. index := cctx.Int("seed-index") if len(seed) > 0 && index >= 0 { - fmt.Println("Deriving identity from seed") - priv, err = deriveKey(seed, []byte(fmt.Sprintf("rainbow-%d", index))) + fmt.Printf("Deriving identity from seed[%d]\n", index) + priv, err = deriveKey(seed, deriveKeyInfo(index)) } else { fmt.Println("Setting identity from libp2p.key") keyFile := filepath.Join(secretsDir, "libp2p.key") @@ -293,6 +303,15 @@ share the same seed as long as the indexes are different. var peeringAddrs []peer.AddrInfo for _, maStr := range cctx.StringSlice("peering") { + if len(seed) > 0 && index >= 0 { + maStr, err = replaceRainbowSeedWithPeer(maStr, seed) + if err != nil { + return err + } + } else if rainbowSeedRegex.MatchString(maStr) { + return fmt.Errorf("unable to peer with %q without defining --seed-index of this instance first", maStr) + } + ai, err := peer.AddrInfoFromString(maStr) if err != nil { return err @@ -318,6 +337,7 @@ share the same seed as long as the indexes are different. IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"), DenylistSubs: cctx.StringSlice("denylists"), Peering: peeringAddrs, + PeeringCache: cctx.Bool("peering-shared-cache"), GCInterval: cctx.Duration("gc-interval"), GCThreshold: cctx.Float64("gc-threshold"), } @@ -342,7 +362,6 @@ share the same seed as long as the indexes are different. 
Handler: handler, } - fmt.Printf("Starting %s %s\n", name, version) pid, err := peer.IDFromPublicKey(priv.GetPublic()) if err != nil { return err @@ -484,3 +503,29 @@ func printIfListConfigured(message string, list []string) { fmt.Printf(message+"%v\n", strings.Join(list, ", ")) } } + +var rainbowSeedRegex = regexp.MustCompile(`/p2p/rainbow-seed/(\d+)`) + +func replaceRainbowSeedWithPeer(addr string, seed string) (string, error) { + match := rainbowSeedRegex.FindStringSubmatch(addr) + if len(match) != 2 { + return addr, nil + } + + index, err := strconv.Atoi(match[1]) + if err != nil { + return "", err + } + + priv, err := deriveKey(seed, deriveKeyInfo(index)) + if err != nil { + return "", err + } + + pid, err := peer.IDFromPublicKey(priv.GetPublic()) + if err != nil { + return "", err + } + + return strings.Replace(addr, match[0], "/p2p/"+pid.String(), 1), nil +} diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..5b0d9b3 --- /dev/null +++ b/main_test.go @@ -0,0 +1,133 @@ +package main + +import ( + "bytes" + "context" + "crypto/rand" + "net/http/httptest" + "testing" + + chunker "github.com/ipfs/boxo/chunker" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" + uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" + "github.com/ipfs/go-cid" + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multicodec" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func mustTestPeer(t *testing.T) (ic.PrivKey, peer.ID) { + sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, rand.Reader) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, pid +} + +func mustTestPeerFromSeed(t *testing.T, seed string, index int) (ic.PrivKey, peer.ID) { + sk, err := deriveKey(seed, deriveKeyInfo(index)) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, pid +} + +func mustTestNode(t *testing.T, cfg Config) *Node { + sk, _ := mustTestPeer(t) + return mustTestNodeWithKey(t, cfg, sk) +} + +func mustTestNodeWithKey(t *testing.T, cfg Config, sk ic.PrivKey) *Node { + // Set necessary fields if not defined. 
+ if cfg.DataDir == "" { + cfg.DataDir = t.TempDir() + } + if cfg.BlockstoreType == "" { + cfg.BlockstoreType = "flatfs" + } + if cfg.DHTRouting == "" { + cfg.DHTRouting = DHTOff + } + + ctx := context.Background() + cdns := newCachedDNS(dnsCacheRefreshInterval) + + t.Cleanup(func() { + _ = cdns.Close() + }) + + nd, err := Setup(ctx, cfg, sk, cdns) + require.NoError(t, err) + return nd +} + +func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) { + nd := mustTestNode(t, cfg) + + handler, err := setupGatewayHandler(cfg, nd) + if err != nil { + require.NoError(t, err) + } + + ts := httptest.NewServer(handler) + return ts, nd +} + +func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid { + dsrv := merkledag.NewDAGService(gnd.bsrv) + + // Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/ + ufsImportParams := uih.DagBuilderParams{ + Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block + RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper + CidBuilder: cid.V1Builder{ // Use CIDv1 for all links + Codec: uint64(multicodec.DagPb), + MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function + MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits) + }, + Dagserv: dsrv, + NoCopy: false, + } + ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks + require.NoError(t, err) + + nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout + require.NoError(t, err) + + return nd.Cid() +} + +func TestReplaceRainbowSeedWithPeer(t *testing.T) { + t.Parallel() + + seed, err := newSeed() + require.NoError(t, err) + + _, pid0 := mustTestPeerFromSeed(t, seed, 0) + _, pid1 := mustTestPeerFromSeed(t, seed, 1) + _, pid42 := mustTestPeerFromSeed(t, seed, 42) + + testCases := []struct { + input string + output string + }{ + {"/dns/example/tcp/4001", "/dns/example/tcp/4001"}, + {"/dns/example/tcp/4001/p2p/" + pid0.String(), "/dns/example/tcp/4001/p2p/" + pid0.String()}, + {"/dns/example/tcp/4001/p2p/rainbow-seed/0", "/dns/example/tcp/4001/p2p/" + pid0.String()}, + {"/dns/example/tcp/4001/p2p/rainbow-seed/1", "/dns/example/tcp/4001/p2p/" + pid1.String()}, + {"/dns/example/tcp/4001/p2p/rainbow-seed/42", "/dns/example/tcp/4001/p2p/" + pid42.String()}, + } + + for _, tc := range testCases { + res, err := replaceRainbowSeedWithPeer(tc.input, seed) + assert.NoError(t, err) + assert.Equal(t, tc.output, res) + } +} diff --git a/setup.go b/setup.go index d1e168a..3ac52ea 100644 --- a/setup.go +++ b/setup.go @@ -15,8 +15,9 @@ import ( "github.com/dgraph-io/badger/v4/options" nopfs "github.com/ipfs-shipyard/nopfs" nopfsipfs "github.com/ipfs-shipyard/nopfs/ipfs" - bsclient "github.com/ipfs/boxo/bitswap/client" + "github.com/ipfs/boxo/bitswap" bsnet "github.com/ipfs/boxo/bitswap/network" + bsserver "github.com/ipfs/boxo/bitswap/server" "github.com/ipfs/boxo/blockservice" "github.com/ipfs/boxo/blockstore" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" @@ -75,7 +76,7 @@ type Node struct { dataDir string datastore datastore.Batching blockstore blockstore.Blockstore - bsClient *bsclient.Client + bs *bitswap.Bitswap bsrv blockservice.BlockService resolver resolver.Resolver @@ -112,6 +113,7 @@ type Config struct { DenylistSubs []string Peering []peer.AddrInfo + 
PeeringCache bool GCInterval time.Duration GCThreshold float64 @@ -273,11 +275,10 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } } - if len(routingV1Routers) == 0 && dhtRouter == nil { - return nil, errors.New("no routers configured: enable dht and/or configure /routing/v1 http endpoint") - } + // Default router is no routing at all: can be especially useful during tests. + router = &routinghelpers.Null{} - if len(routingV1Routers) == 0 { + if len(routingV1Routers) == 0 && dhtRouter != nil { router = dhtRouter } else { var routers []*routinghelpers.ParallelRouter @@ -301,7 +302,9 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached }) } - router = routinghelpers.NewComposableParallel(routers) + if len(routers) > 0 { + router = routinghelpers.NewComposableParallel(routers) + } } return router, nil @@ -321,17 +324,44 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } } + var ( + provideEnabled bool + peerBlockRequestFilter bsserver.PeerBlockRequestFilter + ) + if cfg.PeeringCache && len(cfg.Peering) > 0 { + peers := make(map[peer.ID]struct{}, len(cfg.Peering)) + for _, a := range cfg.Peering { + peers[a.ID] = struct{}{} + } + + provideEnabled = true + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { + _, ok := peers[p] + return ok + } + } else { + provideEnabled = false + peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool { + return false + } + } + bsctx := metri.CtxScope(ctx, "ipfs_bitswap") bn := bsnet.NewFromIpfsHost(h, router) - bswap := bsclient.New(bsctx, bn, blkst, + bswap := bitswap.New(bsctx, bn, blkst, + // --- Client Options // default is 1 minute to search for a random live-want (1 // CID). I think we want to search for random live-wants more // often although probably it overlaps with general // rebroadcasts. - bsclient.RebroadcastDelay(delay.Fixed(10*time.Second)), + bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)), // ProviderSearchDelay: default is 1 second. 
-		bsclient.ProviderSearchDelay(time.Second),
-		bsclient.WithoutDuplicatedBlockStats(),
+		bitswap.ProviderSearchDelay(time.Second),
+		bitswap.WithoutDuplicatedBlockStats(),
+
+		// --- Server Options
+		bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
+		bitswap.ProvideEnabled(provideEnabled),
 	)
 	bn.Start(bswap)
 
@@ -394,7 +424,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
 		blockstore: blkst,
 		dataDir:    cfg.DataDir,
 		datastore:  ds,
-		bsClient:   bswap,
+		bs:         bswap,
 		ns:         ns,
 		vs:         router,
 		bsrv:       bsrv,
diff --git a/setup_test.go b/setup_test.go
new file mode 100644
index 0000000..4364ae2
--- /dev/null
+++ b/setup_test.go
@@ -0,0 +1,152 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	blocks "github.com/ipfs/go-block-format"
+	ic "github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multiaddr"
+	"github.com/stretchr/testify/require"
+)
+
+func mustFreePort(t *testing.T) (int, *net.TCPListener) {
+	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
+	require.NoError(t, err)
+
+	l, err := net.ListenTCP("tcp", addr)
+	require.NoError(t, err)
+
+	return l.Addr().(*net.TCPAddr).Port, l
+}
+
+func mustFreePorts(t *testing.T, n int) []int {
+	ports := make([]int, 0)
+	for i := 0; i < n; i++ {
+		port, listener := mustFreePort(t)
+		defer listener.Close()
+		ports = append(ports, port)
+	}
+
+	return ports
+}
+
+func mustListenAddrWithPort(t *testing.T, port int) multiaddr.Multiaddr {
+	ma, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
+	require.NoError(t, err)
+	return ma
+}
+
+// mustPeeredNodes creates a network of [Node]s with the given configuration.
+// The configuration contains as many elements as there are nodes. Each element
+// indicates to which other nodes it is connected.
+//
+// Example configuration: [][]int{
+//	{1, 2},
+//	{0},
+//	{0},
+// }
+//
+// - Node 0 is connected to nodes 1 and 2.
+// - Node 1 is connected to node 0.
+// - Node 2 is connected to node 0.
+func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool) []*Node {
+	n := len(configuration)
+
+	// Generate ports, secret keys, peer IDs and multiaddresses.
+ ports := mustFreePorts(t, n) + keys := make([]ic.PrivKey, n) + pids := make([]peer.ID, n) + mas := make([]multiaddr.Multiaddr, n) + addrInfos := make([]peer.AddrInfo, n) + + for i := 0; i < n; i++ { + keys[i], pids[i] = mustTestPeer(t) + mas[i] = mustListenAddrWithPort(t, ports[i]) + addrInfos[i] = peer.AddrInfo{ + ID: pids[i], + Addrs: []multiaddr.Multiaddr{mas[i]}, + } + } + + cfgs := make([]Config, n) + nodes := make([]*Node, n) + for i := 0; i < n; i++ { + cfgs[i] = Config{ + DHTRouting: DHTOff, + RoutingV1Endpoints: []string{}, + ListenAddrs: []string{mas[i].String()}, + Peering: []peer.AddrInfo{}, + PeeringCache: peeringShareCache, + } + + for _, j := range configuration[i] { + cfgs[i].Peering = append(cfgs[i].Peering, addrInfos[j]) + } + + nodes[i] = mustTestNodeWithKey(t, cfgs[i], keys[i]) + + t.Log("Node", i, "Addresses", nodes[i].host.Addrs(), "Peering", cfgs[i].Peering) + } + + require.Eventually(t, func() bool { + for i, node := range nodes { + for _, peer := range cfgs[i].Peering { + if node.host.Network().Connectedness(peer.ID) != network.Connected { + t.Log(node.host.Network().Connectedness(peer.ID)) + return false + } + } + } + + return true + }, time.Second*30, time.Millisecond*100) + + return nodes +} + +func TestPeering(t *testing.T) { + _ = mustPeeredNodes(t, [][]int{ + {1, 2}, + {0, 2}, + {0, 1}, + }, false) +} + +func TestPeeringCache(t *testing.T) { + nodes := mustPeeredNodes(t, [][]int{ + {1}, // 0 peered to 1 + {0}, // 1 peered to 0 + {}, // 2 not peered to anyone + }, true) + + bl := blocks.NewBlock([]byte(string("peering-cache-test"))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + checkBitswap := func(i int, success bool) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + _, err := nodes[i].bsrv.GetBlock(ctx, bl.Cid()) + if success { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + + err := nodes[0].bsrv.AddBlock(ctx, bl) + require.NoError(t, err) + + // confirm peering enables cache sharing, and bitswap retrieval from safe-listed node works + checkBitswap(1, true) + // confirm bitswap providing is disabled by default (no peering) + checkBitswap(2, false) +}