Feat: expose BlockKeyCacheSize and enable WriteThrough datastore options #10614

Open · wants to merge 7 commits into `master`
23 changes: 19 additions & 4 deletions config/datastore.go
@@ -4,8 +4,21 @@ import (
"encoding/json"
)

// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
const DefaultDataStoreDirectory = "datastore"
const (
// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
DefaultDataStoreDirectory = "datastore"

// DefaultBlockKeyCacheSize is the size for the blockstore two-queue
// cache which caches block keys and sizes.
DefaultBlockKeyCacheSize = 64 << 10

// DefaultWriteThrough specifies whether to use a "write-through"
// Blockstore and Blockservice. This means that they will write
// without performing any reads to check if the incoming blocks are
// already present in the datastore. Enable for datastores with fast
// writes and slower reads.
DefaultWriteThrough = true
Member:

(if we switch to Flag)

Suggested change:
-	DefaultWriteThrough = true
+	DefaultWriteThrough = True

)

// Datastore tracks the configuration of the datastore.
type Datastore struct {
@@ -21,8 +34,10 @@ type Datastore struct {

Spec map[string]interface{}

HashOnRead bool
BloomFilterSize int
HashOnRead bool
BloomFilterSize int
BlockKeyCacheSize OptionalInteger `json:",omitempty"`
WriteThrough bool
Member:

@hsanjuan are you planning to write a migration for existing users (who don't have this in config), or was it intentional to keep existing users stuck with `false`?

Both feel like a potential headache :)

Perhaps switching this from `bool` to `Flag` would allow us to control the implicit default better, without having to write a migration for existing users?

Suggested change:
-	WriteThrough          bool
+	WriteThrough          Flag `json:",omitempty"`

Contributor Author:

If the option is not present, it will take the default, which is `true`? Or do you mean the config-parsing code does not know whether it is `false` or unset?
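
For context on the `Flag` suggestion — a minimal standalone sketch (using kubo's existing `config.Flag` tri-state type and its `WithDefault` helper; illustrative, not part of this diff) of why `Flag` can distinguish "unset" from an explicit `false` while a plain `bool` cannot:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/ipfs/kubo/config"
)

func main() {
	var ds struct {
		WriteThrough config.Flag `json:",omitempty"`
	}

	// Key absent: the Flag stays at its zero value (Default),
	// so WithDefault applies the implicit default.
	_ = json.Unmarshal([]byte(`{}`), &ds)
	fmt.Println(ds.WriteThrough.WithDefault(true)) // true

	// Explicit false: preserved as a distinct state, unlike a
	// plain bool whose zero value is indistinguishable from it.
	_ = json.Unmarshal([]byte(`{"WriteThrough":false}`), &ds)
	fmt.Println(ds.WriteThrough.WithDefault(true)) // false
}
```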

}

// DataStorePath returns the default data store path given a configuration root
11 changes: 11 additions & 0 deletions config/import.go
@@ -5,6 +5,15 @@ const (
DefaultUnixFSRawLeaves = false
DefaultUnixFSChunker = "size-262144"
DefaultHashFunction = "sha2-256"

// DefaultBatchMaxNodes controls the maximum number of nodes in a
// write-batch. The total size of the batch is limited by
// BatchMaxNodes and BatchMaxSize.
DefaultBatchMaxNodes = 128
// DefaultBatchMaxSize controls the maximum size of a single
// write-batch. The total size of the batch is limited by
// BatchMaxNodes and BatchMaxSize.
DefaultBatchMaxSize = 100 << 20 // 100MiB
)

// Import configures the default options for ingesting data. This affects commands
@@ -14,4 +23,6 @@ type Import struct {
UnixFSRawLeaves Flag
UnixFSChunker OptionalString
HashFunction OptionalString
BatchMaxNodes OptionalInteger
BatchMaxSize OptionalInteger
}
1 change: 1 addition & 0 deletions config/init.go
@@ -135,6 +135,7 @@ func DefaultDatastoreConfig() Datastore {
GCPeriod: "1h",
BloomFilterSize: 0,
Spec: flatfsSpec(),
WriteThrough: DefaultWriteThrough,
}
}

15 changes: 14 additions & 1 deletion core/commands/dag/import.go
@@ -11,6 +11,7 @@ import (
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/coreiface/options"
gocarv2 "github.com/ipld/go-car/v2"

@@ -24,6 +25,11 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

cfg, err := node.Repo.Config()
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
@@ -55,7 +61,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())
batch := ipld.NewBatch(req.Context, api.Dag(),
// Default: 128. Means 128 file descriptors needed in flatfs
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
// Default 100MiB. When setting block size to 1MiB, we can add
// ~100 nodes maximum. With the default 256KiB block-size, we will
// hit the max nodes limit at 32MiB.
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
)

roots := cid.NewSet()
var blockCount, blockBytesCount uint64
16 changes: 10 additions & 6 deletions core/coreapi/coreapi.go
@@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
return nil
}

if settings.Offline {
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}

if settings.Offline {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = node.DefaultIpnsCacheSize
@@ -244,7 +244,11 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
var bsopts []bserv.Option
if cfg.Datastore.WriteThrough {
bsopts = append(bsopts, bserv.WriteThrough())
}
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange, bsopts...)
Comment on lines +247 to +251

Contributor (@gammazero, Dec 11, 2024):

This is ok, but it might be nicer to have a blockservice option WithWriteThrough that takes a bool. Then the code becomes:

subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange,
    bserv.WithWriteThrough(cfg.Datastore.WriteThrough))

Contributor Author:

Right? We need to change that in boxo. I can take care of it, but we can merge this and fix it later for the next boxo release.


subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

6 changes: 5 additions & 1 deletion core/coreapi/unixfs.go
@@ -91,7 +91,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
pinning = nil // pinner will never be used
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
var bsopts []blockservice.Option
if cfg.Datastore.WriteThrough {
bsopts = append(bsopts, blockservice.WriteThrough())
}
bserv := blockservice.New(addblockstore, exch, bsopts...) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
23 changes: 15 additions & 8 deletions core/node/core.go
@@ -24,21 +24,28 @@ import (
dagpb "github.com/ipld/go-codec-dagpb"
"go.uber.org/fx"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
var opts []blockservice.Option
if cfg.Datastore.WriteThrough {
opts = append(opts, blockservice.WriteThrough())
}
bsvc := blockservice.New(bs, rem, opts...)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
return bsvc
}
}

// Pinning creates new pinner which tells GC which blocks should be kept
6 changes: 3 additions & 3 deletions core/node/groups.go
@@ -189,6 +189,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
@@ -201,7 +202,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.WriteThrough)),
finalBstore,
)
}
@@ -332,7 +333,6 @@ func Offline(cfg *config.Config) fx.Option {

// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(BlockService),
fx.Provide(Dag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
@@ -387,7 +387,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
Identity(cfg),
IPNS,
Networked(bcfg, cfg, userResourceOverrides),

fx.Provide(BlockService(cfg)),
Core,
)
}
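
Aside: `fx.Provide(BlockService(cfg))` works because the constructor was turned into a factory that closes over the config (see the `core/node/core.go` hunk earlier in this diff). A minimal standalone sketch of that fx pattern, using a hypothetical `Service` type (not kubo code):

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

type Service struct{ writeThrough bool }

// NewService is a factory: it captures configuration and returns the
// constructor that fx actually invokes, keeping raw config plumbing
// out of the dependency graph.
func NewService(writeThrough bool) func(lc fx.Lifecycle) *Service {
	return func(lc fx.Lifecycle) *Service {
		s := &Service{writeThrough: writeThrough}
		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error { return nil },
		})
		return s
	}
}

func main() {
	app := fx.New(
		fx.Provide(NewService(true)),
		fx.Invoke(func(s *Service) { fmt.Println(s.writeThrough) }), // prints: true
	)
	if err := app.Start(context.Background()); err != nil {
		panic(err)
	}
	_ = app.Stop(context.Background())
}
```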
8 changes: 6 additions & 2 deletions core/node/storage.go
@@ -27,10 +27,14 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore

// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
// hash security
bs = blockstore.NewBlockstore(repo.Datastore())
var opts []blockstore.Option
if writeThrough {
opts = append(opts, blockstore.WriteThrough())
}
bs = blockstore.NewBlockstore(repo.Datastore(), opts...)
bs = &verifbs.VerifBS{Blockstore: bs}
bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
if err != nil {
20 changes: 20 additions & 0 deletions docs/changelogs/v0.33.md
@@ -31,6 +31,26 @@ If you depended on removed ones, please fill an issue to add them to the upstrea

Onboarding files and directories with `ipfs add --to-files` now requires non-empty names. Due to this, the `--to-files` and `--wrap` options are now mutually exclusive ([#10612](https://github.com/ipfs/kubo/issues/10612)).

#### New datastore options for faster writes: `WriteThrough`, `BlockKeyCacheSize`

Now that Kubo supports [`pebble`](../datastores.md#pebbleds) as a datastore backend, it becomes useful to expose additional configuration options for how the blockservice/blockstore/datastore combo behaves.

LSM-tree-based datastores like Pebble or Badger usually have very fast write performance (blocks are streamed to disk) while incurring read-amplification penalties (blocks need to be looked up in the index to know where they are on disk). Prior to this version, `BlockService` and `Blockstore` implementations performed a `Has(cid)` for every block that was going to be written, skipping the write altogether if the block was already present in the datastore.

The performance impact of this `Has()` call can vary. The `Datastore` implementation might include block-caching and things like bloom-filters to speed up lookups and mitigate read-penalties. Our `Blockstore` implementation also includes a bloom-filter (controlled by `BloomFilterSize`, and disabled by default) and a two-queue cache for keys and block sizes. If we assume that most of the blocks added to Kubo are new blocks not already present in the datastore, or that the datastore itself includes mechanisms to optimize writes and avoid writing the same data twice, the calls to `Has()` at both BlockService and Blockstore layers seem superfluous, and we have seen them harm performance when importing large amounts of data.

For these reasons, the default from now on is to use a "write-through" implementation of the Blockservice/Blockstore. We have added a new option, `Datastore.WriteThrough`, which defaults to `true`. The previous behaviour can be restored by setting it to `false`.

We have additionally exposed the size of the two-queue blockstore cache via another option: `Datastore.BlockKeyCacheSize`, which defaults to `65536` (64KiB). This option controls the size of a blockstore caching layer that records whether the blockstore has a certain block and its size (but not its contents). It was previously an internal option; it does not appear in the configuration by default, but it can be set manually, and the caching layer can be disabled by setting it to `0`. It is similar to the existing `BloomFilterSize`, which adds another, bloom-filter-based wrapper on the blockstore.

As a reminder, details from all the options are explained in the [configuration documentation](../config.md).

We recommend that users trying Pebble as a datastore backend disable both the blockstore bloom-filter and the key-caching layer, and leave write-through enabled, as a way to evaluate the raw performance of the underlying datastore, which includes its own bloom-filter and caching layers (the default cache size is `8MiB` and can be configured in the [options](../datastores.md#pebbleds)).


#### 📦️ Dependency updates

- update `boxo` to [v0.25.0](https://github.com/ipfs/boxo/releases/tag/v0.25.0)
Expand Down
58 changes: 58 additions & 0 deletions docs/config.md
@@ -40,6 +40,8 @@ config file at runtime.
- [`Datastore.GCPeriod`](#datastoregcperiod)
- [`Datastore.HashOnRead`](#datastorehashonread)
- [`Datastore.BloomFilterSize`](#datastorebloomfiltersize)
- [`Datastore.WriteThrough`](#datastorewritethrough)
- [`Datastore.BlockKeyCacheSize`](#datastoreblockkeycachesize)
- [`Datastore.Spec`](#datastorespec)
- [`Discovery`](#discovery)
- [`Discovery.MDNS`](#discoverymdns)
Expand Down Expand Up @@ -176,6 +178,8 @@ config file at runtime.
- [`Import.UnixFSRawLeaves`](#importunixfsrawleaves)
- [`Import.UnixFSChunker`](#importunixfschunker)
- [`Import.HashFunction`](#importhashfunction)
- [`Import.BatchMaxNodes`](#importbatchmaxnodes)
- [`Import.BatchMaxSize`](#importbatchmaxsize)
- [`Version`](#version)
- [`Version.AgentSuffix`](#versionagentsuffix)
- [`Version.SwarmCheckEnabled`](#versionswarmcheckenabled)
Expand Down Expand Up @@ -629,10 +633,48 @@ we'd want to use 1199120 bytes. As of writing, [7 hash
functions](https://github.com/ipfs/go-ipfs-blockstore/blob/547442836ade055cc114b562a3cc193d4e57c884/caching.go#L22)
are used, so the constant `k` is 7 in the formula.

Enabling the BloomFilter can provide performance improvements, especially when
responding to many requests for nonexistent blocks. However, it requires a full
sweep of all the datastore keys on daemon start. On very large datastores this
can be a very taxing operation, particularly if the datastore does not support
querying existing keys without also reading their values (the blocks).

Default: `0` (disabled)

Type: `integer` (non-negative, bytes)

### `Datastore.WriteThrough`

This option controls whether a block that already exists in the datastore
should be written to it again. When set to `false`, a `Has()` call is performed
against the datastore before writing every block; if the block is already
stored, the write is skipped. This check happens at both the Blockservice and
Blockstore layers, and this setting affects both.

When set to `true`, no checks are performed and blocks are written directly to
the datastore, which may perform its own checks depending on the implementation.
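
A rough sketch of these semantics, written against go-datastore with a hypothetical `putBlock` helper (the real checks live inside boxo's Blockservice and Blockstore, not at this layer):

```go
package main

import (
	"context"

	"github.com/ipfs/go-datastore"
)

// putBlock illustrates the WriteThrough semantics described above.
// It is a hypothetical helper, not kubo's actual implementation.
func putBlock(ctx context.Context, ds datastore.Datastore, k datastore.Key, v []byte, writeThrough bool) error {
	if !writeThrough {
		// WriteThrough=false: probe first and skip duplicate writes.
		if has, err := ds.Has(ctx, k); err == nil && has {
			return nil
		}
	}
	// WriteThrough=true: write unconditionally; cheap for LSM-tree
	// backends where writes are fast and reads are amplified.
	return ds.Put(ctx, k, v)
}
```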

This option can affect performance, and the strategy should be considered in
conjunction with [`BlockKeyCacheSize`](#datastoreblockkeycachesize) and
[`BloomFilterSize`](#datastorebloomfiltersize).

Default: `true`

Type: `bool`

### `Datastore.BlockKeyCacheSize`

A number representing the maximum size in bytes of the blockstore's Two-Queue
cache, which caches block-cids and their block-sizes. Use `0` to disable.

This cache, once primed, can greatly speed up operations like `ipfs repo stat`,
as there is no need to read full blocks to know their sizes. The size should be
adjusted depending on the number of CIDs on disk (`NumObjects` in `ipfs repo stat`).

Default: `65536` (64KiB)

Type: `optionalInteger` (non-negative, bytes)
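
A minimal sketch of how this option is wired, against boxo's `blockstore` package (mirroring the `core/node/groups.go` hunk above; the in-memory backing datastore is just for illustration):

```go
package main

import (
	"context"

	"github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	ctx := context.Background()
	base := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))

	// The two-queue cache that Datastore.BlockKeyCacheSize controls:
	// it remembers which CIDs exist and their sizes, not the blocks.
	opts := blockstore.DefaultCacheOpts()
	opts.HasTwoQueueCacheSize = 64 << 10 // kubo's default of 65536

	cached, err := blockstore.CachedBlockstore(ctx, base, opts)
	if err != nil {
		panic(err)
	}
	_ = cached
}
```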

### `Datastore.Spec`

Spec defines the structure of the ipfs datastore. It is a composable structure,
Expand Down Expand Up @@ -2421,6 +2463,22 @@ Default: `sha2-256`

Type: `optionalString`

### `Import.BatchMaxNodes`

The maximum number of nodes in a write-batch. The total size of the batch is limited by `BatchMaxNodes` and `BatchMaxSize`.

Default: `128`

Type: `optionalInteger`

### `Import.BatchMaxSize`

The maximum size of a single write-batch (computed as the sum of the sizes of its blocks). The total size of the batch is limited by `BatchMaxNodes` and `BatchMaxSize`.

Default: `104857600` (100MiB)

Type: `optionalInteger`
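
A sketch of how these two limits feed into the importer's write-batch (mirroring the `core/commands/dag/import.go` hunk in this PR; `dagService` stands in for any `ipld.DAGService`):

```go
package main

import (
	"context"

	ipld "github.com/ipfs/go-ipld-format"
)

// newImportBatch builds a write-batch with kubo's defaults.
func newImportBatch(ctx context.Context, dagService ipld.DAGService) *ipld.Batch {
	return ipld.NewBatch(ctx, dagService,
		// Flush once 128 nodes are queued (Import.BatchMaxNodes)...
		ipld.MaxNodesBatchOption(128),
		// ...or once queued blocks total 100MiB (Import.BatchMaxSize),
		// whichever limit is hit first.
		ipld.MaxSizeBatchOption(100<<20),
	)
}
```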

## `Version`

Options to configure agent version announced to the swarm, and leveraging