add async walredo mode (disabled-by-default, opt-in via config) #6548

Merged
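The opt-in knob itself is not visible in the 7-commit diff view below, so as orientation only: a hedged sketch of what enabling the async mode in `pageserver.toml` might look like. The key name and values are assumptions inferred from the ProcessKind work in the commit log ("make the default process kind runtime-configurable", "expose kind in tenant status"), not confirmed by this page.

```toml
# Hypothetical sketch — key name and values assumed, not taken from the PR.
# Per the PR title, async walredo stays disabled by default; flipping the
# process kind would be the opt-in.
walredo_process_kind = 'async'   # presumed default: 'sync'
```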
Changes shown below are from 7 of the PR's 75 commits.

Commits (75)
2c1652a
WIP: async walredo
problame Jan 27, 2024
a93be15
remove wal_redo_timeout
problame Jan 31, 2024
8012b80
some cleanup work
problame Jan 31, 2024
2736f61
error handling
problame Jan 31, 2024
639ed3c
clippy + compile errors
problame Jan 31, 2024
a29ac8b
clippy (again?)
problame Jan 31, 2024
4160d40
cfg(testing) still needs io::Write
problame Jan 31, 2024
70b37cf
WIP poison
problame Jan 31, 2024
b1b8ca3
working impl
problame Jan 31, 2024
9641374
move `poison` to `utils` and document
problame Jan 31, 2024
0cf5619
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 13, 2024
cd6d9ab
WIP: throughput-oriented walredo benchmark
problame Mar 15, 2024
f31f2e9
finish benchmark impl (switch to criterion)
problame Mar 20, 2024
c853c61
replace bench_walredo with my impl
problame Mar 20, 2024
f038304
minimize diff
problame Mar 20, 2024
80de856
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Mar 20, 2024
48b22bd
walredo: better benchmark
problame Mar 20, 2024
929423c
add i3en.3xlarge reference numbers
problame Mar 20, 2024
081af38
Merge branch 'main' into problame/async-walredo/better-benchmark
problame Mar 20, 2024
a37d713
Merge branch 'main' into problame/async-walredo/better-benchmark
problame Mar 20, 2024
8677136
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 20, 2024
15cfa7b
apply review suggestions
problame Mar 21, 2024
db3333e
yield after every redo execution
problame Mar 21, 2024
d6c4562
update numbers (the yield makes a big difference, who would have thun…
problame Mar 21, 2024
c6a74bd
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 21, 2024
a21409b
measure results
problame Mar 21, 2024
b2f5b84
cargo fmt
problame Mar 21, 2024
e669b6d
Merge branch 'problame/async-walredo/better-benchmark' into problame/…
problame Mar 21, 2024
86b0df9
apply review suggestion https://github.com/neondatabase/neon/pull/719…
problame Mar 21, 2024
3a5994b
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Mar 21, 2024
3dfc7de
use chrono::DateTime for Poisoned errors
problame Mar 21, 2024
655d3b6
audit for cancellation-safety
problame Mar 21, 2024
cca66e5
HACK: restore old impl, make runtime configurable (how to: reconfigur…
problame Mar 22, 2024
67a7abc
make the default process kind runtime-configurable, and switch to sync
problame Apr 3, 2024
c77ce7c
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Apr 3, 2024
31d4d1e
env_config from PR #6125
problame Apr 5, 2024
43cf9d1
env_config improvements
problame Apr 5, 2024
dc03f7a
pageserver: ability to use a single runtime
problame Apr 5, 2024
3779854
rename "single runtime" to "one runtime", allow configuring current_t…
problame Apr 5, 2024
5cf45df
remove env_config::Bool
problame Apr 5, 2024
740efb0
cleanup
problame Apr 5, 2024
6b820bb
fixup env var value parsing
problame Apr 5, 2024
70fb7e3
metric, useful for rollout / analyzing grafana metrics
problame Apr 5, 2024
edd7f69
make current_thread mode work
problame Apr 5, 2024
871a3ca
change thread name
problame Apr 5, 2024
dc8e318
fix copy-pasta
problame Apr 5, 2024
aa5439c
Merge remote-tracking branch 'origin/main' into problame/configurable…
problame Apr 8, 2024
5efadde
Merge remote-tracking branch 'origin/problame/configurable-one-runtim…
problame Apr 8, 2024
b72891d
Revert "make the default process kind runtime-configurable, and switc…
problame Apr 8, 2024
c38b3e6
Revert "HACK: restore old impl, make runtime configurable (how to: re…
problame Apr 8, 2024
d8a9266
tokio-test not necessary
problame Apr 8, 2024
ffef90f
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Apr 8, 2024
4ef2fb2
bring back wal_redo_timeout
problame Apr 8, 2024
bea2e12
Revert "Revert "HACK: restore old impl, make runtime configurable (ho…
problame Apr 8, 2024
f489a10
fixup: re-apply bring-back of wal_redo_timeout changes after file mov…
problame Apr 8, 2024
825c0e3
Revert "Revert "make the default process kind runtime-configurable, a…
problame Apr 8, 2024
c28ed6a
HACK: set walredo process kind metric on startup
problame Apr 9, 2024
845f2ea
adjust bench for both sync and async benchmarking
problame Apr 9, 2024
6f236e8
benchmark numbers
problame Apr 9, 2024
99c20c5
cleanups around metric
problame Apr 12, 2024
883a071
expose kind in tenant status
problame Apr 12, 2024
644f7f9
add failing test to ensure walredo config option works
problame Apr 12, 2024
237f27a
also assert metric is set
problame Apr 12, 2024
4a26245
remove runtime reconfiguration capability + assert a bit more (can pa…
problame Apr 12, 2024
f334235
address https://github.com/neondatabase/neon/pull/6548#discussion_r15…
problame Apr 15, 2024
005dcbd
simplify around ProcessKind enum type; addresses https://github.com/n…
problame Apr 15, 2024
18c4b35
indentation
problame Apr 15, 2024
df5feb7
fixup(005dcbd6a89f06db2577edfb51d3aea0f287d491): bench_walredo
problame Apr 15, 2024
b6e168b
fixup(4a26245d993a840ec36942e4ebab476e6d8524aa): sometimes bench runs…
problame Apr 15, 2024
fb11c39
rerun benchmark
problame Apr 15, 2024
b311615
also level = DEBUG the process_std
problame Apr 15, 2024
bd53ab8
rerun benches
problame Apr 15, 2024
989de61
undo level = DEBUG and re-run benchmarks
problame Apr 15, 2024
4fd26c2
fixup: empty line
problame Apr 15, 2024
cecc9bc
Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchma…
problame Apr 15, 2024
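Two commits above (db3333e "yield after every redo execution" and d6c4562 "the yield makes a big difference") point at a scheduling detail worth spelling out. A minimal sketch of the idea, with `Batch` and `apply_one` as stand-ins of my own, not the PR's types:

```rust
struct Batch; // stand-in for a batch of WAL records to apply

async fn apply_one(_b: &Batch) -> anyhow::Result<()> {
    Ok(()) // stand-in for one round-trip to the walredo process
}

// Yield to the tokio scheduler after every redo execution so a hot redo
// loop cannot monopolize its worker thread; per the commit log, this
// yield changed the benchmark numbers substantially.
async fn redo_all(batches: &[Batch]) -> anyhow::Result<()> {
    for batch in batches {
        apply_one(batch).await?;
        tokio::task::yield_now().await;
    }
    Ok(())
}
```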
19 changes: 0 additions & 19 deletions pageserver/src/config.rs
@@ -57,7 +57,6 @@ pub mod defaults {
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;

pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

pub const DEFAULT_SUPERUSER: &str = "cloud_admin";

@@ -94,7 +93,6 @@ pub mod defaults {
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'

#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'

#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
@@ -162,8 +160,6 @@ pub struct PageServerConf {

// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
pub wal_redo_timeout: Duration,

pub superuser: String,

@@ -291,7 +287,6 @@ struct PageServerConfigBuilder {
availability_zone: BuilderValue<Option<String>>,

wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,

superuser: BuilderValue<String>,

@@ -354,8 +349,6 @@ impl Default for PageServerConfigBuilder {
availability_zone: Set(None),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
.expect("cannot parse default wal redo timeout")),
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
@@ -440,10 +433,6 @@ impl PageServerConfigBuilder {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}

pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
}

pub fn superuser(&mut self, superuser: String) {
self.superuser = BuilderValue::Set(superuser)
}
@@ -601,9 +590,6 @@ impl PageServerConfigBuilder {
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
wal_redo_timeout: self
.wal_redo_timeout
.ok_or(anyhow!("missing wal_redo_timeout"))?,
superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
@@ -860,7 +846,6 @@ impl PageServerConf {
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
"max_file_descriptors" => {
@@ -978,7 +963,6 @@ impl PageServerConf {
PageServerConf {
id: NodeId(0),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
@@ -1164,7 +1148,6 @@ listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'

wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'

page_cache_size = 444
max_file_descriptors = 333
@@ -1205,7 +1188,6 @@ background_task_maximum_delay = '334 s'
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
@@ -1279,7 +1261,6 @@ background_task_maximum_delay = '334 s'
listen_http_addr: "127.0.0.1:9898".to_string(),
availability_zone: None,
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,
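Note that this snapshot deletes the `wal_redo_timeout` plumbing wholesale; the commit log shows it returning later (4ef2fb2 "bring back wal_redo_timeout"). In an async implementation the same guard can be applied externally to the redo future. A minimal sketch, assuming tokio and a function name of my own invention:

```rust
use std::{future::Future, time::Duration};

// Sketch: impose a timeout on an async redo call from the outside,
// roughly what the removed `wal_redo_timeout` did for the old
// poll(2)-based loop. Not the PR's API.
async fn redo_with_timeout<F>(apply: F, limit: Duration) -> anyhow::Result<bytes::Bytes>
where
    F: Future<Output = anyhow::Result<bytes::Bytes>>,
{
    match tokio::time::timeout(limit, apply).await {
        Ok(result) => result,
        Err(_elapsed) => anyhow::bail!("WAL redo timed out"),
    }
}
```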
135 changes: 38 additions & 97 deletions pageserver/src/walredo.rs
@@ -21,22 +21,20 @@
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
use std::io;
use std::io::prelude::*;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::process::{Child, Command};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
use utils::{bin_ser::BeSer, lsn::Lsn};

#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -73,12 +71,12 @@ pub(crate) struct BufferTag {
}

struct ProcessInput {
stdin: ChildStdin,
stdin: tokio::process::ChildStdin,
n_requests: usize,
}

struct ProcessOutput {
stdout: ChildStdout,
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
@@ -154,9 +152,9 @@ impl PostgresRedoManager {
img,
base_img_lsn,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);

@@ -174,9 +172,9 @@
img,
base_img_lsn,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -217,14 +215,13 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
base_img: Option<Bytes>,
base_img_lsn: Lsn,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
@@ -269,7 +266,8 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = proc
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.apply_wal_records(buf_tag, &base_img, records)
.await
.context("apply_wal_records");

let duration = started_at.elapsed();
@@ -647,8 +645,8 @@ struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stdout: tokio::sync::Mutex<ProcessOutput>,
stdin: tokio::sync::Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -699,17 +697,10 @@ impl WalRedoProcess {
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
let stdin =
tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
let stdout = tokio::process::ChildStdout::from_std(stdout)
.context("convert to tokio::ChildStdout")?;

// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
@@ -754,11 +745,11 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
Expand All @@ -779,15 +770,12 @@ impl WalRedoProcess {
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
fn apply_wal_records(
async fn apply_wal_records(
&self,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().unwrap();

// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -816,7 +804,7 @@
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
let res = self.apply_wal_records0(&writebuf).await;

if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -827,41 +815,16 @@
res
}

fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;

while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;

if n == 0 {
anyhow::bail!("WAL redo timed out");
}

// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let mut input = self.stdin.lock().await;
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
drop(input);

// To improve walredo performance we separate sending requests and receiving
// responses. They are protected by different mutexes (output and input).
@@ -875,40 +838,17 @@
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.

let mut output = self.stdout.lock().unwrap();
let mut output = self.stdout.lock().await;
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;

if n == 0 {
anyhow::bail!("WAL redo timed out");
}

// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.stdout
.read_exact(&mut resultbuf)
.await
.context("read walredo stdout")?;
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
@@ -973,6 +913,7 @@

let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

use std::io::Write;
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
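The comments in `apply_wal_records0` above describe the request/response pipelining that survives the rewrite: stdin and stdout sit behind separate locks, writers claim a sequence number, and readers drain responses in arrival order into a ring buffer. A distilled, self-contained sketch of that scheme (illustrative names, not the PR's exact types):

```rust
use std::collections::VecDeque;

use bytes::Bytes;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

struct Input {
    stdin: tokio::process::ChildStdin,
    n_requests: usize,
}

struct Output {
    stdout: tokio::process::ChildStdout,
    pending: VecDeque<Option<Bytes>>, // ring buffer of buffered responses
    n_processed: usize,               // responses already handed out
}

async fn request(
    input: &tokio::sync::Mutex<Input>,
    output: &tokio::sync::Mutex<Output>,
    msg: &[u8],
) -> anyhow::Result<Bytes> {
    // Phase 1: send the request and claim a sequence number, then release
    // the stdin lock so other tasks can submit while we wait.
    let request_no = {
        let mut i = input.lock().await;
        i.stdin.write_all(msg).await?;
        let no = i.n_requests;
        i.n_requests += 1;
        no
    };

    // Phase 2: under the stdout lock, read 8 KiB page images in arrival
    // order until our own response number has been buffered.
    let mut o = output.lock().await;
    while o.n_processed + o.pending.len() <= request_no {
        let mut page = vec![0u8; 8192];
        o.stdout.read_exact(&mut page).await?;
        o.pending.push_back(Some(Bytes::from(page)));
    }
    let resp = o.pending[request_no - o.n_processed]
        .take()
        .expect("each response is taken exactly once");
    // Trim consumed entries from the front, advancing the counter.
    while let Some(None) = o.pending.front() {
        o.pending.pop_front();
        o.n_processed += 1;
    }
    Ok(resp)
}
```

As commit 655d3b6 ("audit for cancellation-safety") hints, the subtle part is a future being dropped between the two phases; keeping every response in the shared `pending` queue is what lets later readers stay consistent regardless of which waiter actually performed the read.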