Skip to content

Commit

Permalink
don't think in pages, but DIO chunks; remove read_page & page_caching…
Browse files Browse the repository at this point in the history
… remnants
  • Loading branch information
problame committed Aug 14, 2024
1 parent 21ad9c4 commit b580a44
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 587 deletions.
179 changes: 161 additions & 18 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::virtual_file::{self, VirtualFile};
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use anyhow::Context;
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tracing::error;

use std::io;
use std::sync::atomic::AtomicU64;
Expand All @@ -15,14 +22,19 @@ use utils::id::TimelineId;
/// A file whose on-disk path is unlinked again when the value is dropped
/// (see the `Drop` impl for this type).
///
/// Data is appended through `buffered_writer`; bytes that have not been flushed yet
/// live in that writer's in-memory buffer.
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,

// NOTE(review): this listing is a rendered diff; `rw` looks like the removed side of the
// change that introduced the fields below — confirm against the committed file.
rw: page_caching::RW,
/// Id obtained from `page_cache::next_file_id()` when the file is created.
page_cache_file_id: page_cache::FileId,
/// Running total of bytes accepted by `write_blob`; this is what `len()` reports.
bytes_written: u32,
/// Buffered writer on top of a size-tracking wrapper around the `VirtualFile`.
/// The wrapper's `bytes_written()` is used by the read paths as the flushed offset.
buffered_writer: owned_buffers_io::write::BufferedWriter<
BytesMut,
size_tracking_writer::Writer<VirtualFile>,
>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}

pub(super) mod page_caching;

use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
mod zero_padded_read_write;

/// Capacity in bytes (64 KiB) of the `BytesMut` buffer handed to the buffered writer
/// when an `EphemeralFile` is created.
const TAIL_SZ: usize = 64 * 1024;

impl EphemeralFile {
pub async fn create(
Expand Down Expand Up @@ -52,33 +64,161 @@ impl EphemeralFile {
)
.await?;

let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, gate_guard),
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
size_tracking_writer::Writer::new(file),
BytesMut::with_capacity(TAIL_SZ),
),
_gate_guard: gate_guard,
})
}
}

impl Drop for EphemeralFile {
    /// Best-effort removal of the backing file from disk.
    fn drop(&mut self) {
        // Unlinking here is fine: the gate guard stored in `self` is still held.
        let path = &self.buffered_writer.as_inner().as_inner().path;
        match std::fs::remove_file(path) {
            Ok(()) => {}
            // Not-found is deliberately silent, there is nothing we could do about it:
            // on detach the tenant directory is already gone. Missing files might also be
            // related to https://github.com/neondatabase/neon/issues/2442
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                error!("could not remove ephemeral file '{path}': {e}");
            }
        }
    }
}

impl EphemeralFile {
/// Returns the number of bytes written to this file so far.
// NOTE(review): two body lines are visible because this listing is a rendered diff;
// the first one looks like the removed side — confirm against the committed file.
pub(crate) fn len(&self) -> u32 {
self.rw.bytes_written()
self.bytes_written
}

/// Returns the `page_cache::FileId` associated with this file.
// NOTE(review): two body lines are visible because this listing is a rendered diff;
// the first one looks like the removed side — confirm against the committed file.
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
self.rw.page_cache_file_id()
self.page_cache_file_id
}

/// Returns everything written so far as one contiguous `Vec<u8>` of `self.len()` bytes.
///
/// The already-flushed prefix is read back from the underlying `VirtualFile`; the bytes
/// still pending in the writer's in-memory buffer are appended after it.
///
/// # Errors
/// Propagates the I/O error of the `read_exact_at` call.
///
/// # Panics
/// Panics if the assembled vector's length differs from `self.len()`, or if a size does
/// not fit the integer type it is converted to.
// NOTE(review): the first body line looks like the removed side of a rendered diff —
// confirm against the committed file.
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
let size = usize::try_from(self.len()).unwrap();
let vec = Vec::with_capacity(size);

// read from disk what we've already flushed
let file_size_tracker = self.buffered_writer.as_inner();
let flushed_offset = usize::try_from(file_size_tracker.bytes_written()).unwrap();
let flushed_range = 0..flushed_offset;
let file: &VirtualFile = file_size_tracker.as_inner();
let mut vec = file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();

// append what is still pending in the in-memory buffer, i.e. written but not flushed yet
let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
vec.extend_from_slice(buffered);
// flushed prefix + pending tail must add up to everything written
assert_eq!(vec.len(), size);
Ok(vec)
}

pub(crate) async fn read_page(
/// Fill dst will dst.bytes_total() bytes from the bytes written to the buffered writer from offset `start` and later.
/// If `dst` is larger than the available bytes, the read will be short.
/// The read will never be short for other reasons.
/// The number of bytes read into `dst` is returned as part of the result tuple.
/// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random.
pub(crate) async fn read_at_to_end<B: IoBufMut + Send>(
&self,
blknum: u32,
dst: page_caching::PageBuf,
start: u32,
dst: Slice<B>,
ctx: &RequestContext,
) -> Result<page_caching::ReadResult, io::Error> {
self.rw.read_page(blknum, dst, ctx).await
) -> std::io::Result<(Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = u32::try_from(file_size_tracking_writer.bytes_written())
.expect("we don't allow writing more than u32::MAX bytes");

let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];

let dst_cap = u32::try_from(dst.bytes_total())
.with_context(|| {
format!(
"read_aligned: dst.bytes_total() is too large: {}",
dst.len()
)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let end = {
let mut end = start
.checked_add(dst_cap)
.with_context(|| {
format!("read_aligned: offset + dst.bytes_total() is too large: {start} + {dst_cap}",)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
if end > self.bytes_written {
end = self.bytes_written;
}
end
};

// inclusive, exclusive
#[derive(Debug)]
struct Range(u32, u32);
impl Range {
fn len(&self) -> u32 {
if self.0 > self.1 {
0
} else {
self.1 - self.0
}
}
}
let written_range = Range(start, std::cmp::min(end, flushed_offset));
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);

let dst = if written_range.len() > 0 {
let file: &VirtualFile = file_size_tracking_writer.as_inner();
let bounds = dst.bounds();
let slice = file
.read_exact_at(
dst.slice(0..written_range.len() as usize),
u64::try_from(start).unwrap(),
ctx,
)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};

let dst = if buffered_range.len() > 0 {
let offset_in_buffer =
usize::try_from(buffered_range.0.checked_sub(flushed_offset).unwrap()).unwrap();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len() as usize)];
let bounds = dst.bounds();
let mut view = dst.slice(written_range.len() as usize..buffered_range.len() as usize);
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};

// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

Ok((dst, (end - start) as usize))
}

pub(crate) async fn write_blob(
Expand All @@ -87,7 +227,7 @@ impl EphemeralFile {
will_init: bool,
ctx: &RequestContext,
) -> Result<InMemoryLayerIndexValue, io::Error> {
let pos = self.rw.bytes_written();
let pos = self.bytes_written;
let len = u32::try_from(buf.len()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
Expand All @@ -104,7 +244,10 @@ impl EphemeralFile {
)
})?;

self.rw.write_all_borrowed(buf, ctx).await?;
self.buffered_writer
.write_buffered_borrowed(buf, ctx)
.await?;
self.bytes_written += len;

Ok(InMemoryLayerIndexValue {
pos,
Expand Down
Loading

0 comments on commit b580a44

Please sign in to comment.