Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: default to bitswap-client-only #136

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Default: 100
### `RAINBOW_PEERING_SHARED_CACHE`

> [!WARNING]
> Experimental feature.
> Experimental feature, will result in increased network I/O due to Bitswap server being run in addition to the lean client.

Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`
or `RAINBOW_SEED_PEERING`.
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
Usage: "(EXPERIMENTAL: increased network I/O) Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
Expand Down Expand Up @@ -360,7 +360,7 @@
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),
PeeringSharedCache: cctx.Bool("peering-shared-cache"),

Check warning on line 363 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L363

Added line #L363 was not covered by tests
Seed: seed,
SeedIndex: index,
SeedPeering: cctx.Bool("seed-peering"),
Expand Down
109 changes: 7 additions & 102 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ import (
"github.com/dgraph-io/badger/v4/options"
nopfs "github.com/ipfs-shipyard/nopfs"
nopfsipfs "github.com/ipfs-shipyard/nopfs/ipfs"
"github.com/ipfs/boxo/bitswap"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
Expand All @@ -27,13 +22,9 @@ import (
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/boxo/peering"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
badger4 "github.com/ipfs/go-ds-badger4"
flatfs "github.com/ipfs/go-ds-flatfs"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
mprome "github.com/ipfs/go-metrics-prometheus"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
Expand Down Expand Up @@ -76,7 +67,7 @@ type Node struct {
dataDir string
datastore datastore.Batching
blockstore blockstore.Blockstore
bs *bitswap.Bitswap
exchange exchange.Interface
bsrv blockservice.BlockService
resolver resolver.Resolver
ns namesys.NameSystem
Expand Down Expand Up @@ -107,8 +98,9 @@ type Config struct {
IpnsMaxCacheTTL time.Duration

DenylistSubs []string
Peering []peer.AddrInfo
PeeringCache bool

Peering []peer.AddrInfo
PeeringSharedCache bool

Seed string
SeedIndex int
Expand Down Expand Up @@ -208,7 +200,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bswap := setupBitswap(ctx, cfg, h, cr, blkst)
bswap := setupBitswapExchange(ctx, cfg, h, cr, blkst)

err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755)
if err != nil && !errors.Is(err, fs.ErrExist) {
Expand All @@ -233,7 +225,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bsrv := blockservice.New(blkst, &noNotifyExchange{bswap},
bsrv := blockservice.New(blkst, bswap,
// if we are doing things right, our bitswap wantlists should
// not have blocks that we already have (see
// https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus
Expand Down Expand Up @@ -269,7 +261,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
blockstore: blkst,
dataDir: cfg.DataDir,
datastore: ds,
bs: bswap,
exchange: bswap,
ns: ns,
vs: vs,
bsrv: bsrv,
Expand Down Expand Up @@ -398,90 +390,3 @@ func setupPeering(cfg Config, h host.Host) error {

return nil
}

func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) *bitswap.Bitswap {
var (
peerBlockRequestFilter bsserver.PeerBlockRequestFilter
)
if cfg.PeeringCache && len(cfg.Peering) > 0 {
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}

peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}
} else {
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
return false
}
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
// often although probably it overlaps with general
// rebroadcasts.
bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)),
// ProviderSearchDelay: default is 1 second.
bitswap.ProviderSearchDelay(time.Second),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)

return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
122 changes: 122 additions & 0 deletions setup_bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"time"

"github.com/ipfs/boxo/bitswap"
bsclient "github.com/ipfs/boxo/bitswap/client"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)

// --- Client Options
// bitswap.RebroadcastDelay: default is 1 minute to search for a random
// live-want (1 CID). I think we want to search for random live-wants more
// often although probably it overlaps with general rebroadcasts.
rebroadcastDelay := delay.Fixed(10 * time.Second)
// bitswap.ProviderSearchDelay: default is 1 second.
providerSearchDelay := 1 * time.Second

// If peering and shared cache are both enabled, we initialize both a
// Client and a Server with custom request filter and custom options.
// client+server is more expensive but necessary when deployment requires
// serving cached blocks to safelisted peerids
if cfg.PeeringSharedCache && len(cfg.Peering) > 0 {
var peerBlockRequestFilter bsserver.PeerBlockRequestFilter

// Set up request filter to only respond to request for safelisted (peered) nodes
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}

// Initialize client+server
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
bitswap.RebroadcastDelay(rebroadcastDelay),
bitswap.ProviderSearchDelay(providerSearchDelay),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)
return &noNotifyExchange{bswap}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 I wonder if doing this (making NotifyNewBlocks no-op) has impact on the client. Just to be sure, we do not do this by default.

}

// By default, rainbow runs with bitswap client alone
bswap := bsclient.New(bsctx, bn, bstore,
// --- Client Options
bsclient.RebroadcastDelay(rebroadcastDelay),
bsclient.ProviderSearchDelay(providerSearchDelay),
bsclient.WithoutDuplicatedBlockStats(),
)
bn.Start(bswap)
return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil

Check warning on line 96 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil

Check warning on line 100 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil

Check warning on line 108 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

Check warning on line 111 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L111

Added line #L111 was not covered by tests

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

Check warning on line 113 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L113

Added line #L113 was not covered by tests

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
4 changes: 2 additions & 2 deletions setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool
RoutingV1Endpoints: []string{},
ListenAddrs: []string{mas[i].String()},
Peering: []peer.AddrInfo{},
PeeringCache: peeringShareCache,
PeeringSharedCache: peeringShareCache,
}

for _, j := range configuration[i] {
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestPeering(t *testing.T) {
}, false)
}

func TestPeeringCache(t *testing.T) {
func TestPeeringSharedCache(t *testing.T) {
nodes := mustPeeredNodes(t, [][]int{
{1}, // 0 peered to 1
{0}, // 1 peered to 0
Expand Down
Loading