Skip to content

Commit

Permalink
pageserver: add bench_ingest (#7409)
Browse files Browse the repository at this point in the history
## Problem

We lack a rust bench for the inmemory layer and delta layer write paths:
it is useful to benchmark these components independent of postgres & WAL
decoding.

Related: #8452

## Summary of changes

- Refactor DeltaLayerWriter to avoid carrying a Timeline, so that it can
be cleanly tested + benched without a Tenant/Timeline test harness. It
only needed the Timeline for building `Layer`, so this can be done in a
separate step.
- Add `bench_ingest`, which exercises a variety of workload "shapes"
(big values, small values, sequential keys, random keys)
- Include a small uncontroversial optimization: in `freeze`, only
exhaustively walk values to assert ordering relative to end_lsn in debug
mode.

These benches are limited by drive performance on a lot of machines, but
still useful as a local tool for iterating on CPU/memory improvements
around this code path.

Anecdotal measurements on Hetzner AX102 (Ryzen 7950xd):

```

ingest-small-values/ingest 128MB/100b seq
                        time:   [1.1160 s 1.1230 s 1.1289 s]
                        thrpt:  [113.38 MiB/s 113.98 MiB/s 114.70 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild
Benchmarking ingest-small-values/ingest 128MB/100b rand: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 18.9s.
ingest-small-values/ingest 128MB/100b rand
                        time:   [1.9001 s 1.9056 s 1.9110 s]
                        thrpt:  [66.982 MiB/s 67.171 MiB/s 67.365 MiB/s]
Benchmarking ingest-small-values/ingest 128MB/100b rand-1024keys: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 11.0s.
ingest-small-values/ingest 128MB/100b rand-1024keys
                        time:   [1.0715 s 1.0828 s 1.0937 s]
                        thrpt:  [117.04 MiB/s 118.21 MiB/s 119.46 MiB/s]
ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [425.49 ms 429.07 ms 432.04 ms]
                        thrpt:  [296.27 MiB/s 298.32 MiB/s 300.83 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild

ingest-big-values/ingest 128MB/8k seq
                        time:   [373.03 ms 375.84 ms 379.17 ms]
                        thrpt:  [337.58 MiB/s 340.57 MiB/s 343.13 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
ingest-big-values/ingest 128MB/8k seq, no delta
                        time:   [81.534 ms 82.811 ms 83.364 ms]
                        thrpt:  [1.4994 GiB/s 1.5095 GiB/s 1.5331 GiB/s]
Found 1 outliers among 10 measurements (10.00%)


```
  • Loading branch information
jcsp authored Aug 6, 2024
1 parent 3727c6f commit ca5390a
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 75 deletions.
4 changes: 4 additions & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false

[[bench]]
name = "bench_ingest"
harness = false
235 changes: 235 additions & 0 deletions pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use std::{env, num::NonZeroUsize};

use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::InMemoryLayer,
virtual_file::{self, api::IoEngineKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
};

// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}

enum KeyLayout {
/// Sequential unique keys
Sequential,
/// Random unique keys
Random,
/// Random keys, but only use the bits from the mask of them
RandomReuse(u32),
}

enum WriteDelta {
Yes,
No,
}

async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) -> anyhow::Result<()> {
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);

let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);

tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;

let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);

for i in 0..put_count {
lsn += put_size as u64;

// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
match key_layout {
KeyLayout::Sequential => {
// Use sequential order to illustrate the experience a user is likely to have
// when ingesting bulk data.
key.field6 = i as u32;
}
KeyLayout::Random => {
// Use random-order keys to avoid giving a false advantage to data structures that are
// faster when inserting on the end.
key.field6 = murmurhash32(i as u32);
}
KeyLayout::RandomReuse(mask) => {
// Use low bits only, to limit cardinality
key.field6 = murmurhash32(i as u32) & mask;
}
}

layer.put_value(key, lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;

if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;
}

Ok(())
}

/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
});
}

/// Declare a series of benchmarks for the Pageserver's ingest write path.
///
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
///
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
/// a fast disk, CPU is the bottleneck at time of writing.
fn criterion_benchmark(c: &mut Criterion) {
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
eprintln!("Data directory: {}", temp_dir.path());

let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
page_cache::init(conf.page_cache_size);

{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}

{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
4 changes: 2 additions & 2 deletions pageserver/src/l0_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);

pub(crate) enum Inner {
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
Expand All @@ -40,7 +40,7 @@ impl L0FlushGlobalState {
}
}

pub(crate) fn inner(&self) -> &Arc<Inner> {
pub fn inner(&self) -> &Arc<Inner> {
&self.0
}
}
Expand Down
Loading

1 comment on commit ca5390a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2200 tests run: 2119 passed, 0 failed, 81 skipped (full report)


Code coverage* (full report)

  • functions: 32.8% (7154 of 21803 functions)
  • lines: 50.5% (57753 of 114293 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
ca5390a at 2024-08-06T18:38:30.194Z :recycle:

Please sign in to comment.