Skip to content

Commit

Permalink
pageserver: add direct io config to virtual file (#9214)
Browse files Browse the repository at this point in the history
## Problem
We need a way to incrementally switch to direct IO. During the rollout,
we might want to switch to O_DIRECT on the image and delta layer read paths
first, before the others.

## Summary of changes
- Revisited and simplified direct io config in `PageserverConf`. 
- We could add a fallback mode for open, but for read there isn't a
reasonable alternative (without creating another buffered virtual file).
- Added a wrapper around `VirtualFile`; the current implementation becomes
`VirtualFileInner`.
- Use `open_v2`, `create_v2`, `open_with_options_v2` when we want to use
the IO mode specified in PS config.
- Once we onboard all IO through VirtualFile using this new API, we will
delete the old code path.
- Make io mode live configurable for benchmarking.
- Only guaranteed for files opened after the config change, so do it
before the experiment.

As an example, we are using `open_v2` with
`virtual_file::IoMode::Direct` in
#9169

We also remove the `io_buffer_alignment` config in
a04cfd7 and use the alignment as a compile-time
constant. This way we don't have to carry the alignment around or make
frequent calls to retrieve this information from the static variable.

Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 authored Oct 9, 2024
1 parent 63e7fab commit bee04b8
Show file tree
Hide file tree
Showing 18 changed files with 333 additions and 248 deletions.
8 changes: 2 additions & 6 deletions libs/pageserver_api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -388,10 +387,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),

io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
}
}
Expand Down
75 changes: 34 additions & 41 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
}

pub mod virtual_file {
use std::path::PathBuf;

#[derive(
Copy,
Clone,
Expand All @@ -994,50 +992,45 @@ pub mod virtual_file {
}

/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO, error out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
}

#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
impl IoMode {
pub const fn preferred() -> Self {
Self::Buffered
}
}

#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
impl TryFrom<u8> for IoMode {
type Error = u8;

#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
}
}

Expand Down
6 changes: 1 addition & 5 deletions pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(16384, virtual_file::io_engine_for_bench());
page_cache::init(conf.page_cache_size);

{
Expand Down
11 changes: 7 additions & 4 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,13 @@ impl Client {
.map_err(Error::ReceiveBody)
}

/// Configs io buffer alignment at runtime.
pub async fn put_io_alignment(&self, align: usize) -> Result<()> {
let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, align)
/// Configs io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await
Expand Down
6 changes: 1 addition & 5 deletions pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);

let mut total_delta_layers = 0usize;
Expand Down
8 changes: 2 additions & 6 deletions pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {

async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
Expand Down Expand Up @@ -190,11 +190,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
Expand Down
8 changes: 2 additions & 6 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use pageserver::{
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
use remote_storage::{RemotePath, RemoteStorageConfig};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -205,11 +205,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {

async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
Expand Down
8 changes: 4 additions & 4 deletions pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ pub(crate) struct Args {
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,

/// Before starting the benchmark, live-reconfigure the pageserver to use specified alignment for io buffers.
/// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
#[clap(long)]
set_io_alignment: Option<usize>,
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,

targets: Option<Vec<TenantTimelineId>>,
}
Expand Down Expand Up @@ -129,8 +129,8 @@ async fn main_impl(
mgmt_api_client.put_io_engine(engine_str).await?;
}

if let Some(align) = args.set_io_alignment {
mgmt_api_client.put_io_alignment(align).await?;
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}

// discover targets
Expand Down
9 changes: 2 additions & 7 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ fn main() -> anyhow::Result<()> {

// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");

// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.
Expand Down Expand Up @@ -168,11 +167,7 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();

// Basic initialization of things that don't change after startup
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);

start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
Expand Down
10 changes: 3 additions & 7 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,

/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,

pub io_buffer_alignment: usize,
pub virtual_file_io_mode: virtual_file::IoMode,
}

/// Token for authentication to safekeepers
Expand Down Expand Up @@ -325,11 +323,10 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_direct_io,
virtual_file_io_mode,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
io_buffer_alignment,
tenant_config,
} = config_toml;

Expand Down Expand Up @@ -368,8 +365,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,

// ------------------------------------------------------------
// fields that require additional validation or custom handling
Expand Down Expand Up @@ -408,6 +403,7 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};

// ------------------------------------------------------------
Expand Down
15 changes: 5 additions & 10 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
Expand Down Expand Up @@ -2381,17 +2382,13 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}

async fn put_io_alignment_handler(
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let align: usize = json_request(&mut r).await?;
crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| {
ApiError::PreconditionFailed(
format!("Requested io alignment ({align}) is not a power of two").into(),
)
})?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}

Expand Down Expand Up @@ -3082,9 +3079,7 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
Expand Down
6 changes: 3 additions & 3 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let path = self.buffered_writer.as_inner().as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
Expand Down Expand Up @@ -356,7 +356,7 @@ mod tests {
}

let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);

let buffer_contents = file.buffered_writer.inspect_buffer();
Expand Down Expand Up @@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path
.path()
.metadata()
.unwrap();
assert_eq!(
Expand Down
Loading

1 comment on commit bee04b8

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5094 tests run: 4886 passed, 1 failed, 207 skipped (full report)


Failures on Postgres 15

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_subscriber_restart[release-pg15]"
Flaky tests (3)

Postgres 17

Postgres 15

Postgres 14

Test coverage report is not available

The comment gets automatically updated with the latest test results
bee04b8 at 2024-10-09T13:28:01.752Z :recycle:

Please sign in to comment.