Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add async walredo mode (disabled-by-default, opt-in via config) #6548

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
2c1652a
WIP: async walredo
problame Jan 27, 2024
a93be15
remove wal_redo_timeout
problame Jan 31, 2024
8012b80
some cleanup work
problame Jan 31, 2024
2736f61
error handling
problame Jan 31, 2024
639ed3c
clippy + compile errors
problame Jan 31, 2024
a29ac8b
clippy (again?)
problame Jan 31, 2024
4160d40
cfg(testing) still needs io::Write
problame Jan 31, 2024
70b37cf
WIP poison
problame Jan 31, 2024
b1b8ca3
working impl
problame Jan 31, 2024
9641374
move `poison` to `utils` and document
problame Jan 31, 2024
0cf5619
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 13, 2024
cd6d9ab
WIP: throughput-oriented walredo benchmark
problame Mar 15, 2024
f31f2e9
finish benchmark impl (switch to criterion)
problame Mar 20, 2024
c853c61
replace bench_walredo with my impl
problame Mar 20, 2024
f038304
minimize diff
problame Mar 20, 2024
80de856
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Mar 20, 2024
48b22bd
walredo: better benchmark
problame Mar 20, 2024
929423c
add i3en.3xlarge reference numbers
problame Mar 20, 2024
081af38
Merge branch 'main' into problame/async-walredo/better-benchmark
problame Mar 20, 2024
a37d713
Merge branch 'main' into problame/async-walredo/better-benchmark
problame Mar 20, 2024
8677136
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 20, 2024
15cfa7b
apply review suggestions
problame Mar 21, 2024
db3333e
yield after ever redo execution
problame Mar 21, 2024
d6c4562
update numbers (the yield makes a big difference, who would have thun…
problame Mar 21, 2024
c6a74bd
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 21, 2024
a21409b
measure results
problame Mar 21, 2024
b2f5b84
cargo fmt
problame Mar 21, 2024
e669b6d
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 21, 2024
86b0df9
apply review suggestion https://github.com/neondatabase/neon/pull/719…
problame Mar 21, 2024
3a5994b
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Mar 21, 2024
3dfc7de
use chrono::DateTime for Poisoned errors
problame Mar 21, 2024
655d3b6
audit for cancellation-safety
problame Mar 21, 2024
cca66e5
HACK: restore old impl, make runtime configurable (how to: reconfigur…
problame Mar 22, 2024
67a7abc
make the default process kind runtime-configurable, and switch to sync
problame Apr 3, 2024
c77ce7c
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Apr 3, 2024
31d4d1e
env_config from PR #6125
problame Apr 5, 2024
43cf9d1
env_config improvements
problame Apr 5, 2024
dc03f7a
pageserver: ability to use a single runtime
problame Apr 5, 2024
3779854
rename "single runtime" to "one runtime", allow configuring current_t…
problame Apr 5, 2024
5cf45df
remove env_config::Bool
problame Apr 5, 2024
740efb0
cleanup
problame Apr 5, 2024
6b820bb
fixup env var value parsing
problame Apr 5, 2024
70fb7e3
metric, useful for rollout / analyzing grafana metrics
problame Apr 5, 2024
edd7f69
make current_thread mode work
problame Apr 5, 2024
871a3ca
change thread name
problame Apr 5, 2024
dc8e318
fix copy-pasta
problame Apr 5, 2024
aa5439c
Merge remote-tracking branch 'origin/main' into problame/configurable…
problame Apr 8, 2024
5efadde
Merge remote-tracking branch 'origin/problame/configurable-one-runtim…
problame Apr 8, 2024
b72891d
Revert "make the default process kind runtime-configurable, and switc…
problame Apr 8, 2024
c38b3e6
Revert "HACK: restore old impl, make runtime configurable (how to: re…
problame Apr 8, 2024
d8a9266
tokio-test not necessary
problame Apr 8, 2024
ffef90f
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Apr 8, 2024
4ef2fb2
bring back wal_redo_timeout
problame Apr 8, 2024
bea2e12
Revert "Revert "HACK: restore old impl, make runtime configurable (ho…
problame Apr 8, 2024
f489a10
fixup: re-apply bring-back of wal_redo_timeout changes after file mov…
problame Apr 8, 2024
825c0e3
Revert "Revert "make the default process kind runtime-configurable, a…
problame Apr 8, 2024
c28ed6a
HACK: set walredo process kind metric on startup
problame Apr 9, 2024
845f2ea
adjust bench for both sync and async benchmarking
problame Apr 9, 2024
6f236e8
benchmark numbers
problame Apr 9, 2024
99c20c5
cleanups around metric
problame Apr 12, 2024
883a071
expose kind in tenant status
problame Apr 12, 2024
644f7f9
add failing test to ensure walredo config option works
problame Apr 12, 2024
237f27a
also assert metric is set
problame Apr 12, 2024
4a26245
remove runtime reconfiguration capability + assert a bit more (can pa…
problame Apr 12, 2024
f334235
address https://github.com/neondatabase/neon/pull/6548#discussion_r15…
problame Apr 15, 2024
005dcbd
simplify around ProcessKind enum type; addresses https://github.com/n…
problame Apr 15, 2024
18c4b35
indentation
problame Apr 15, 2024
df5feb7
fixup(005dcbd6a89f06db2577edfb51d3aea0f287d491): bench_walredo
problame Apr 15, 2024
b6e168b
fixup(4a26245d993a840ec36942e4ebab476e6d8524aa): sometimes bench runs…
problame Apr 15, 2024
fb11c39
rerun benchmark
problame Apr 15, 2024
b311615
also level = DEBUG the process_std
problame Apr 15, 2024
bd53ab8
rerun benches
problame Apr 15, 2024
989de61
undo level = DEBUG and re-run benchmarks
problame Apr 15, 2024
4fd26c2
fixup: empty line
problame Apr 15, 2024
cecc9bc
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,18 @@ pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRedoManagerProcessStatus {
pub pid: u32,
/// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
/// `ProcessKind` are a transitory thing, so, they have no enum representation in `pageserver_api`.
pub kind: Cow<'static, str>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRedoManagerStatus {
pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
pub pid: Option<u32>,
pub process: Option<WalRedoManagerProcessStatus>,
}

/// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
Expand Down
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub mod zstd;

pub mod env;

pub mod poison;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
Expand Down
121 changes: 121 additions & 0 deletions libs/utils/src/poison.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//! Protect a piece of state from reuse after it is left in an inconsistent state.
//!
//! # Example
//!
//! ```
//! # tokio_test::block_on(async {
//! use utils::poison::Poison;
//! use std::time::Duration;
//!
//! struct State {
//! clean: bool,
//! }
//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true }));
//!
//! let mut mutex_guard = state.lock().await;
//! let mut poison_guard = mutex_guard.check_and_arm()?;
//! let state = poison_guard.data_mut();
//! state.clean = false;
//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail.
//! tokio::time::sleep(Duration::from_secs(10)).await;
//! state.clean = true;
//! poison_guard.disarm();
//! # Ok::<(), utils::poison::Error>(())
//! # });
//! ```
use tracing::warn;

pub struct Poison<T> {
what: &'static str,
state: State,
data: T,
}

#[derive(Clone, Copy)]
enum State {
Clean,
Armed,
Poisoned { at: chrono::DateTime<chrono::Utc> },
}

impl<T> Poison<T> {
/// We log `what` `warning!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
pub fn new(what: &'static str, data: T) -> Self {
Self {
what,
state: State::Clean,
data,
}
}

/// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
match self.state {
State::Clean => {
self.state = State::Armed;
Ok(Guard(self))
}
State::Armed => unreachable!("transient state"),
State::Poisoned { at } => Err(Error::Poisoned {
what: self.what,
at,
}),
}
}
}

/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error.
pub struct Guard<'a, T>(&'a mut Poison<T>);

impl<'a, T> Guard<'a, T> {
pub fn data(&self) -> &T {
&self.0.data
}
pub fn data_mut(&mut self) -> &mut T {
&mut self.0.data
}

pub fn disarm(self) {
match self.0.state {
State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
State::Armed => {
self.0.state = State::Clean;
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state: {at}")
}
}
}
}

impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
match self.0.state {
State::Clean => {
// set by disarm()
}
State::Armed => {
// still armed => poison it
let at = chrono::Utc::now();
self.0.state = State::Poisoned { at };
warn!(at=?at, "poisoning {}", self.0.what);
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state: {at}")
}
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("poisoned at {at}: {what}")]
Poisoned {
what: &'static str,
at: chrono::DateTime<chrono::Utc>,
},
}
147 changes: 97 additions & 50 deletions pageserver/benches/bench_walredo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,50 @@
//!
//! # Reference Numbers
//!
//! 2024-04-04 on i3en.3xlarge
//! 2024-04-15 on i3en.3xlarge
//!
//! ```text
//! short/1 time: [25.925 µs 26.060 µs 26.209 µs]
//! short/2 time: [31.277 µs 31.483 µs 31.722 µs]
//! short/4 time: [45.496 µs 45.831 µs 46.182 µs]
//! short/8 time: [84.298 µs 84.920 µs 85.566 µs]
//! short/16 time: [185.04 µs 186.41 µs 187.88 µs]
//! short/32 time: [385.01 µs 386.77 µs 388.70 µs]
//! short/64 time: [770.24 µs 773.04 µs 776.04 µs]
//! short/128 time: [1.5017 ms 1.5064 ms 1.5113 ms]
//! medium/1 time: [106.65 µs 107.20 µs 107.85 µs]
//! medium/2 time: [153.28 µs 154.24 µs 155.56 µs]
//! medium/4 time: [325.67 µs 327.01 µs 328.71 µs]
//! medium/8 time: [646.82 µs 650.17 µs 653.91 µs]
//! medium/16 time: [1.2645 ms 1.2701 ms 1.2762 ms]
//! medium/32 time: [2.4409 ms 2.4550 ms 2.4692 ms]
//! medium/64 time: [4.6814 ms 4.7114 ms 4.7408 ms]
//! medium/128 time: [8.7790 ms 8.9037 ms 9.0282 ms]
//! async-short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! async-short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! async-short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! async-short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! async-short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! async-short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! async-short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! async-short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! async-medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! async-medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! async-medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! async-medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! async-medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! async-medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! async-medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! async-medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! sync-short/1 time: [25.503 µs 25.626 µs 25.771 µs]
//! sync-short/2 time: [30.850 µs 31.013 µs 31.208 µs]
//! sync-short/4 time: [45.543 µs 45.856 µs 46.193 µs]
//! sync-short/8 time: [84.114 µs 84.639 µs 85.220 µs]
//! sync-short/16 time: [185.22 µs 186.15 µs 187.13 µs]
//! sync-short/32 time: [377.43 µs 378.87 µs 380.46 µs]
//! sync-short/64 time: [756.49 µs 759.04 µs 761.70 µs]
//! sync-short/128 time: [1.4825 ms 1.4874 ms 1.4923 ms]
//! sync-medium/1 time: [105.66 µs 106.01 µs 106.43 µs]
//! sync-medium/2 time: [153.10 µs 153.84 µs 154.72 µs]
//! sync-medium/4 time: [327.13 µs 329.44 µs 332.27 µs]
//! sync-medium/8 time: [654.26 µs 658.73 µs 663.63 µs]
//! sync-medium/16 time: [1.2682 ms 1.2748 ms 1.2816 ms]
//! sync-medium/32 time: [2.4456 ms 2.4595 ms 2.4731 ms]
//! sync-medium/64 time: [4.6523 ms 4.6890 ms 4.7256 ms]
//! sync-medium/128 time: [8.7215 ms 8.8323 ms 8.9344 ms]
//! ```

use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
use pageserver::{
config::PageServerConf,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, ProcessKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use std::{
sync::Arc,
Expand All @@ -60,44 +80,56 @@ use tokio::{sync::Barrier, task::JoinSet};
use utils::{id::TenantId, lsn::Lsn};

fn bench(c: &mut Criterion) {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("short");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
for process_kind in &[ProcessKind::Async, ProcessKind::Sync] {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-short"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
}
}

{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("medium");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-medium"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
}
}
}
criterion::criterion_group!(benches, bench);
criterion::criterion_main!(benches);

// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
fn bench_impl(
process_kind: ProcessKind,
redo_work: Arc<Request>,
n_redos: u64,
nclients: u64,
) -> Duration {
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();

let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
conf.walredo_process_kind = process_kind;
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());

Expand All @@ -113,25 +145,40 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let manager = Arc::new(manager);

// divide the amount of work equally among the clients.
let nredos_per_client = n_redos / nclients;
for _ in 0..nclients {
rt.block_on(async {
tasks.spawn(client(
Arc::clone(&manager),
Arc::clone(&start),
Arc::clone(&redo_work),
// divide the amount of work equally among the clients
n_redos / nclients,
nredos_per_client,
))
});
}

rt.block_on(async move {
let mut total_wallclock_time = std::time::Duration::from_millis(0);
let elapsed = rt.block_on(async move {
let mut total_wallclock_time = Duration::ZERO;
while let Some(res) = tasks.join_next().await {
total_wallclock_time += res.unwrap();
}
total_wallclock_time
})
});

// consistency check to ensure process kind setting worked
if nredos_per_client > 0 {
assert_eq!(
manager
.status()
.process
.map(|p| p.kind)
.expect("the benchmark work causes a walredo process to be spawned"),
std::borrow::Cow::Borrowed(process_kind.into())
);
}

elapsed
}

async fn client(
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ fn start_pageserver(
))
.unwrap();
pageserver::preinitialize_metrics();
pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind);

// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
Expand Down
Loading
Loading