Skip to content

Commit

Permalink
feat: peering shared cache
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Apr 18, 2024
1 parent ce6190d commit e259336
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 85 deletions.
11 changes: 11 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [`RAINBOW_GC_INTERVAL`](#rainbow_gc_interval)
- [`RAINBOW_GC_THRESHOLD`](#rainbow_gc_threshold)
- [`RAINBOW_IPNS_MAX_CACHE_TTL`](#rainbow_ipns_max_cache_ttl)
- [`RAINBOW_PEERING`](#rainbow_peering)
- [Logging](#logging)
- [`GOLOG_LOG_LEVEL`](#golog_log_level)
- [`GOLOG_LOG_FMT`](#golog_log_fmt)
Expand Down Expand Up @@ -90,6 +91,16 @@ with [DNSLink](https://dnslink.dev/).

Default: No upper bound, [TTL from IPNS Record](https://specs.ipfs.tech/ipns/ipns-record/#ttl-uint64) or [TTL from DNSLink](https://datatracker.ietf.org/doc/html/rfc2181#section-8) used as-is.

### `RAINBOW_PEERING`

A comma-separated list of multiaddresses of peers to stay connected to.

If `RAINBOW_SEED` is also present, Rainbow automatically replaces in multi-addresses
that contain `/p2p/rainbow-seed/N` with the peer ID generated with same seed
and index `N`.

Default: no peering.

## Logging

### `GOLOG_LOG_LEVEL`
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/ipfs-shipyard/nopfs v0.0.12
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a
github.com/ipfs/boxo v0.19.1-0.20240418055150-eeea41458735
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger4 v0.1.5
Expand Down Expand Up @@ -90,18 +91,19 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-verifcid v0.0.3 // indirect
github.com/ipld/go-car v0.6.2 // indirect
github.com/ipld/go-car/v2 v2.13.1 // indirect
Expand Down
71 changes: 0 additions & 71 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
package main

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
util "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

sr := util.NewTimeSeededRand()
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr)
require.NoError(t, err)

cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

gnd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return gnd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
gnd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, gnd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)

return ts, gnd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}

func TestTrustless(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 5 additions & 0 deletions keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"

libp2p "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -43,3 +44,7 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) {
key := ed25519.NewKeyFromSeed(keySeed)
return libp2p.UnmarshalEd25519PrivateKey(key)
}

func deriveKeyInfo(index int) []byte {
return []byte(fmt.Sprintf("rainbow-%d", index))
}
44 changes: 43 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -205,6 +207,12 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_PEERING"},
Usage: "Multiaddresses of peers to stay connected to (comma-separated)",
},
&cli.BoolFlag{
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
Value: "flatfs",
Expand Down Expand Up @@ -281,7 +289,7 @@ share the same seed as long as the indexes are different.
index := cctx.Int("seed-index")
if len(seed) > 0 && index >= 0 {
fmt.Println("Deriving identity from seed")
priv, err = deriveKey(seed, []byte(fmt.Sprintf("rainbow-%d", index)))
priv, err = deriveKey(seed, deriveKeyInfo(index))

Check warning on line 292 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L292

Added line #L292 was not covered by tests
} else {
fmt.Println("Setting identity from libp2p.key")
keyFile := filepath.Join(secretsDir, "libp2p.key")
Expand All @@ -293,6 +301,13 @@ share the same seed as long as the indexes are different.

var peeringAddrs []peer.AddrInfo
for _, maStr := range cctx.StringSlice("peering") {
if len(seed) > 0 && index >= 0 {
maStr, err = replaceRainbowSeedWithPeer(maStr, seed)
if err != nil {
return err

Check warning on line 307 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L304-L307

Added lines #L304 - L307 were not covered by tests
}
}

ai, err := peer.AddrInfoFromString(maStr)
if err != nil {
return err
Expand All @@ -318,6 +333,7 @@ share the same seed as long as the indexes are different.
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),

Check warning on line 336 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L336

Added line #L336 was not covered by tests
GCInterval: cctx.Duration("gc-interval"),
GCThreshold: cctx.Float64("gc-threshold"),
}
Expand Down Expand Up @@ -484,3 +500,29 @@ func printIfListConfigured(message string, list []string) {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
}
}

var rainbowSeedRegex = regexp.MustCompile(`/p2p/rainbow-seed/(\d+)`)

func replaceRainbowSeedWithPeer(addr string, seed string) (string, error) {
match := rainbowSeedRegex.FindStringSubmatch(addr)
if len(match) != 2 {
return addr, nil
}

index, err := strconv.Atoi(match[1])
if err != nil {
return "", err

Check warning on line 514 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L514

Added line #L514 was not covered by tests
}

priv, err := deriveKey(seed, deriveKeyInfo(index))
if err != nil {
return "", err

Check warning on line 519 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L519

Added line #L519 was not covered by tests
}

pid, err := peer.IDFromPublicKey(priv.GetPublic())
if err != nil {
return "", err

Check warning on line 524 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L524

Added line #L524 was not covered by tests
}

return strings.Replace(addr, match[0], "/p2p/"+pid.String(), 1), nil
}
133 changes: 133 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package main

import (
"bytes"
"context"
"crypto/rand"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mustTestPeer(t *testing.T) (ic.PrivKey, peer.ID) {
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, rand.Reader)
require.NoError(t, err)

pid, err := peer.IDFromPrivateKey(sk)
require.NoError(t, err)

return sk, pid
}

func mustTestPeerFromSeed(t *testing.T, seed string, index int) (ic.PrivKey, peer.ID) {
sk, err := deriveKey(seed, deriveKeyInfo(index))
require.NoError(t, err)

pid, err := peer.IDFromPrivateKey(sk)
require.NoError(t, err)

return sk, pid
}

func mustTestNode(t *testing.T, cfg Config) *Node {
sk, _ := mustTestPeer(t)
return mustTestNodeWithKey(t, cfg, sk)
}

func mustTestNodeWithKey(t *testing.T, cfg Config, sk ic.PrivKey) *Node {
// Set necessary fields if not defined.
if cfg.DataDir == "" {
cfg.DataDir = t.TempDir()
}
if cfg.BlockstoreType == "" {
cfg.BlockstoreType = "flatfs"
}
if cfg.DHTRouting == "" {
cfg.DHTRouting = DHTOff
}

ctx := context.Background()
cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

nd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return nd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
nd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, nd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)
return ts, nd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}

func TestReplaceRainbowSeedWithPeer(t *testing.T) {
t.Parallel()

seed, err := newSeed()
require.NoError(t, err)

_, pid0 := mustTestPeerFromSeed(t, seed, 0)
_, pid1 := mustTestPeerFromSeed(t, seed, 1)
_, pid42 := mustTestPeerFromSeed(t, seed, 42)

testCases := []struct {
input string
output string
}{
{"/dns/example/tcp/4001", "/dns/example/tcp/4001"},
{"/dns/example/tcp/4001/p2p/" + pid0.String(), "/dns/example/tcp/4001/p2p/" + pid0.String()},
{"/dns/example/tcp/4001/p2p/rainbow-seed/0", "/dns/example/tcp/4001/p2p/" + pid0.String()},
{"/dns/example/tcp/4001/p2p/rainbow-seed/1", "/dns/example/tcp/4001/p2p/" + pid1.String()},
{"/dns/example/tcp/4001/p2p/rainbow-seed/42", "/dns/example/tcp/4001/p2p/" + pid42.String()},
}

for _, tc := range testCases {
res, err := replaceRainbowSeedWithPeer(tc.input, seed)
assert.NoError(t, err)
assert.Equal(t, tc.output, res)
}
}
Loading

0 comments on commit e259336

Please sign in to comment.