Skip to content

Commit

Permalink
refactor(VirtualFile::crashsafe_overwrite): avoid Handle::block_on in…
Browse files Browse the repository at this point in the history
… callers (#6731)

Some callers of `VirtualFile::crashsafe_overwrite` call it on the
executor thread, thereby potentially stalling it.

Others are more diligent and wrap it in `spawn_blocking(...,
Handle::block_on, ... )` to avoid stalling the executor thread.

However, because `crashsafe_overwrite` uses
VirtualFile::open_with_options internally, we spawn a new thread-local
`tokio-epoll-uring::System` in the blocking pool thread that's used for
the `spawn_blocking` call.

This PR refactors the situation such that we do the `spawn_blocking`
inside `VirtualFile::crashsafe_overwrite`. This unifies the situation
for the better:

1. Callers who didn't wrap in `spawn_blocking(..., Handle::block_on,
...)` before no longer stall the executor.
2. Callers who did it before now can avoid the `block_on`, resolving the
problem with the short-lived `tokio-epoll-uring::System`s in the
blocking pool threads.

A future PR will build on top of this and divert to tokio-epoll-uring if
it's configures as the IO engine.

Changes
-------

- Convert implementation to std::fs and move it into `crashsafe.rs`
- Yes, I know, Safekeepers (cc @arssher ) added `durable_rename` and
`fsync_async_opt` recently. However, `crashsafe_overwrite` is different
in the sense that it's higher level, i.e., it's more like
`std::fs::write` and the Safekeeper team's code is more building block
style.
- The consequence is that we don't use the VirtualFile file descriptor
cache anymore.
- I don't think it's a big deal because we have plenty of slack wrt
production file descriptor limit rlimit (see [this
dashboard](https://neonprod.grafana.net/d/e4a40325-9acf-4aa0-8fd9-f6322b3f30bd/pageserver-open-file-descriptors?orgId=1))

- Use `tokio::task::spawn_blocking` in
`VirtualFile::crashsafe_overwrite` to call the new
`crashsafe::overwrite` API.
- Inspect all callers to remove any double-`spawn_blocking`
- spawn_blocking requires the captures data to be 'static + Send. So,
refactor the callers. We'll need this for future tokio-epoll-uring
support anyway, because tokio-epoll-uring requires owned buffers.

Related Issues
--------------

- overall epic to enable write path to tokio-epoll-uring: #6663
- this is also kind of relevant to the tokio-epoll-uring System creation
failures that we encountered in staging, investigation being tracked in
#6667
- why is it relevant? Because this PR removes two uses of
`spawn_blocking+Handle::block_on`
  • Loading branch information
problame authored Feb 14, 2024
1 parent f39b0fc commit df5d588
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 78 deletions.
44 changes: 43 additions & 1 deletion libs/utils/src/crashsafe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io,
io::{self, Write},
};

use camino::{Utf8Path, Utf8PathBuf};
Expand Down Expand Up @@ -161,6 +161,48 @@ pub async fn durable_rename(
Ok(())
}

/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
///
/// The file is first written to the specified `tmp_path`, and in a second
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
/// and atomic rename guarantee that, if we crash at any point, there will never
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
///
/// Callers are responsible for serializing calls of this function for a given `final_path`.
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
/// I.e., the atomticity guarantees still hold.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // don't keep the fd open for longer than we have to

std::fs::rename(tmp_path, final_path)?;

let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;

final_parent_dirfd.sync_all()?;
Ok(())
}

#[cfg(test)]
mod tests {

Expand Down
5 changes: 3 additions & 2 deletions pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;

Expand Down Expand Up @@ -325,7 +325,8 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);

let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)

VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
Expand Down
33 changes: 10 additions & 23 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -2878,17 +2877,10 @@ impl Tenant {

let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;

Ok(())
}
Expand All @@ -2915,17 +2907,12 @@ impl Tenant {

let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
.await??;
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())
Expand Down
11 changes: 3 additions & 8 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
.expect("Blocking task is never aborted")
.maybe_fatal_err(&context_msg)?;
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
.await
.maybe_fatal_err(&context_msg)?;

tracing::debug!("Wrote local heatmap to {}", heatmap_path);

Expand Down
72 changes: 29 additions & 43 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;

pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
Expand Down Expand Up @@ -404,47 +403,34 @@ impl VirtualFile {
Ok(vfile)
}

/// Writes a file to the specified `final_path` in a crash safe fasion
/// Async version of [`::utils::crashsafe::overwrite`].
///
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
/// # NB:
///
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
/// it did at an earlier time.
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
// TODO: use tokio_epoll_uring if configured as `io_engine`.
// See https://github.com/neondatabase/neon/issues/6663

tokio::task::spawn_blocking(move || {
let slice_storage;
let content_len = content.bytes_init();
let content = if content.bytes_init() > 0 {
slice_storage = Some(content.slice(0..content_len));
slice_storage.as_deref().expect("just set it to Some()")
} else {
&[]
};
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
})
.await
.expect("blocking task is never aborted")
}

/// Call File::sync_all() on the underlying File.
Expand Down Expand Up @@ -1315,7 +1301,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
Expand All @@ -1324,7 +1310,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
Expand All @@ -1346,7 +1332,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

Expand Down

1 comment on commit df5d588

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2511 tests run: 2382 passed, 4 failed, 125 skipped (full report)


Failures on Postgres 14

  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]"

Code coverage (full report)

  • functions: 55.9% (12843 of 22966 functions)
  • lines: 82.5% (69325 of 83998 lines)

The comment gets automatically updated with the latest test results
df5d588 at 2024-02-14T15:21:20.848Z :recycle:

Please sign in to comment.