Feat: expose BlockKeyCacheSize and enable WriteThrough when bloom filter disabled
hsanjuan committed Dec 6, 2024
1 parent 433444b commit cd96506
Showing 10 changed files with 84 additions and 23 deletions.
5 changes: 3 additions & 2 deletions config/datastore.go
@@ -21,8 +21,9 @@ type Datastore struct {

Spec map[string]interface{}

HashOnRead bool
BloomFilterSize int
HashOnRead bool
BloomFilterSize int
BlockKeyCacheSize int
}

// DataStorePath returns the default data store path given a configuration root
1 change: 1 addition & 0 deletions config/init.go
@@ -134,6 +134,7 @@ func DefaultDatastoreConfig() Datastore {
StorageGCWatermark: 90, // 90%
GCPeriod: "1h",
BloomFilterSize: 0,
BlockKeyCacheSize: 64 << 10, // 64KiB
Spec: flatfsSpec(),
}
}
9 changes: 8 additions & 1 deletion core/commands/dag/import.go
@@ -55,7 +55,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())
batch := ipld.NewBatch(req.Context, api.Dag(),
// 128 file descriptors needed in flatfs
ipld.MaxNodesBatchOption(128),
// 100MiB. With a block size of 1MiB, we can add at most 100
// nodes. With the default 256KiB block size, we hit the max
// nodes limit at 32MiB.
ipld.MaxSizeBatchOption(100<<20),
)

roots := cid.NewSet()
var blockCount, blockBytesCount uint64
17 changes: 11 additions & 6 deletions core/coreapi/coreapi.go
@@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
return nil
}

if settings.Offline {
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}

if settings.Offline {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = node.DefaultIpnsCacheSize
@@ -244,7 +244,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
var bsopts []bserv.Option
// If bloom filter disabled, do not do Has() when writing.
if cfg.Datastore.BloomFilterSize == 0 {
bsopts = append(bsopts, bserv.WriteThrough())
}
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange, bsopts...)
subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

7 changes: 6 additions & 1 deletion core/coreapi/unixfs.go
@@ -91,7 +91,12 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
pinning = nil // pinner will never be used
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
var bsopts []blockservice.Option
// If bloom filter disabled, do not do Has() when writing.
if cfg.Datastore.BloomFilterSize == 0 {
bsopts = append(bsopts, blockservice.WriteThrough())
}
bserv := blockservice.New(addblockstore, exch, bsopts...) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
27 changes: 19 additions & 8 deletions core/node/core.go
@@ -24,21 +24,32 @@ import (
dagpb "github.com/ipld/go-codec-dagpb"
"go.uber.org/fx"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
var opts []blockservice.Option
// If bloom filter is disabled, do not do Has() when writing.
// We defer to the datastore on how to handle this efficiently,
// but we cannot assume that triggering reads for every write
// is fine.
if cfg.Datastore.BloomFilterSize == 0 {
opts = append(opts, blockservice.WriteThrough())
}
bsvc := blockservice.New(bs, rem, opts...)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
return bsvc
}
}

// Pinning creates new pinner which tells GC which blocks should be kept
6 changes: 3 additions & 3 deletions core/node/groups.go
@@ -189,6 +189,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
cacheOpts.HasTwoQueueCacheSize = cfg.Datastore.BlockKeyCacheSize
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
@@ -201,7 +202,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.BloomFilterSize == 0)),
finalBstore,
)
}
@@ -332,7 +333,6 @@ func Offline(cfg *config.Config) fx.Option {

// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(BlockService),
fx.Provide(Dag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
@@ -387,7 +387,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
Identity(cfg),
IPNS,
Networked(bcfg, cfg, userResourceOverrides),

fx.Provide(BlockService(cfg)),
Core,
)
}
8 changes: 6 additions & 2 deletions core/node/storage.go
@@ -27,10 +27,14 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore

// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
// hash security
bs = blockstore.NewBlockstore(repo.Datastore())
var opts []blockstore.Option
if writeThrough {
opts = append(opts, blockstore.WriteThrough())
}
bs = blockstore.NewBlockstore(repo.Datastore(), opts...)
bs = &verifbs.VerifBS{Blockstore: bs}
bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
if err != nil {
14 changes: 14 additions & 0 deletions docs/changelogs/v0.33.md
@@ -31,6 +31,20 @@ If you depended on removed ones, please file an issue to add them to the upstrea

Onboarding files and directories with `ipfs add --to-files` now requires non-empty names. Due to this, the `--to-files` and `--wrap` options are now mutually exclusive ([#10612](https://github.com/ipfs/kubo/issues/10612)).

#### New `Datastore.BlockKeyCacheSize` option and write-through blockstore/blockservice

This option controls the size of a blockstore caching layer that records whether the blockstore has certain blocks and their sizes (not the contents). This was previously an internal option. It is set by default to 64KiB.
This caching layer can be disabled by setting it to `0`. The option is similar to the existing `BloomFilterSize`, which creates another bloom-filter-based wrapper on the blockstore.

Additionally, setting `BloomFilterSize` to `0` will now trigger the use of a "write-through" blockservice and blockstore. Usually, the blockservice and the blockstore perform a `Has()` call on the underlying datastore before writing any block; if the datastore already has the block, they can skip the write. In "write-through" mode, this check does not happen. The reasoning is:

* If the bloom filter is disabled, at least the first `Has()` call will hit the datastore. Some datastores incur a large penalty for `Has()` (e.g. flatfs must open folders and list them; S3 needs a call upstream...).
* Some datastores, like Pebble, already include their own bloom filter and caching layers; we should not duplicate such layers on top.
* Some datastores are very fast to write to but incur read amplification. Calling `Has()` for every block reduces data-ingest performance when batching multiple blocks.
* Calling `Has()` when writing can cause eviction of blocks from read caches.

Users trying Pebble as a datastore backend will usually be better off disabling all of Kubo's key caching and the bloom filter, thus enabling direct writes to Pebble. In general, we want to give users the freedom to play with these settings and find what works best for their workloads and backends.
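
As an illustration, here is a minimal sketch of a write-through blockstore/blockservice stack assembled with boxo, following the options used in this commit. The in-memory datastore and offline exchange are stand-ins for the example, not what Kubo uses in production:

```go
package main

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	blockstore "github.com/ipfs/boxo/blockstore"
	offline "github.com/ipfs/boxo/exchange/offline"
	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	// In-memory datastore standing in for flatfs, Pebble, S3, etc.
	dstore := dssync.MutexWrap(ds.NewMapDatastore())

	// WriteThrough() makes the blockstore skip its Has() check before writes.
	bstore := blockstore.NewBlockstore(dstore, blockstore.WriteThrough())

	// The same option exists at the blockservice layer.
	bsvc := blockservice.New(bstore, offline.Exchange(bstore), blockservice.WriteThrough())
	defer bsvc.Close()

	// AddBlock now goes straight to the datastore, with no prior Has().
	blk := blocks.NewBlock([]byte("hello write-through"))
	if err := bsvc.AddBlock(context.Background(), blk); err != nil {
		panic(err)
	}
	fmt.Println("stored", blk.Cid())
}
```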

#### 📦️ Dependency updates

- update `boxo` to [v0.24.TODO](https://github.com/ipfs/boxo/releases/tag/v0.24.TODO)
13 changes: 13 additions & 0 deletions docs/config.md
@@ -633,6 +633,19 @@ Default: `0` (disabled)

Type: `integer` (non-negative, bytes)

### `Datastore.BlockKeyCacheSize`

A number representing the maximum size in bytes of the blockstore's Two-Queue
cache, which caches block CIDs and their block sizes. Use `0` to disable.

This cache, once primed, can greatly speed up operations like `ipfs repo stat`,
as there is no need to read full blocks to know their sizes. The size should be
adjusted depending on the number of CIDs on disk.

Default: `65536` (64KiB)

Type: `integer` (non-negative, bytes)
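
For reference, a sketch of how this value is wired internally, mirroring `core/node/groups.go` in this commit (the helper function and its name are illustrative, not Kubo API):

```go
package node

import (
	"context"

	blockstore "github.com/ipfs/boxo/blockstore"

	"github.com/ipfs/kubo/config"
)

// cachedBlockstore is an illustrative helper: it wires
// Datastore.BloomFilterSize and Datastore.BlockKeyCacheSize into
// boxo's cached blockstore, as core/node/groups.go does.
func cachedBlockstore(ctx context.Context, cfg *config.Config, base blockstore.Blockstore) (blockstore.Blockstore, error) {
	cacheOpts := blockstore.DefaultCacheOpts()
	cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize     // 0 disables the bloom filter
	cacheOpts.HasTwoQueueCacheSize = cfg.Datastore.BlockKeyCacheSize // 0 disables this cache
	return blockstore.CachedBlockstore(ctx, base, cacheOpts)
}
```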

### `Datastore.Spec`

Spec defines the structure of the ipfs datastore. It is a composable structure,
