Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scope out virtualfile asyncification work #5158

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pageserver/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use std::{

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
Expand Down Expand Up @@ -451,7 +452,7 @@ impl PageCache {
///
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
if let Some(slot_idx) = self.search_mapping(cache_key).await {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
Expand Down Expand Up @@ -626,7 +627,7 @@ impl PageCache {
/// returns. The caller is responsible for re-checking that the slot still
/// contains the page with the same key before using it.
///
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
async fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod manifest;
mod span;

pub mod metadata;
Expand Down
68 changes: 0 additions & 68 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,71 +91,3 @@ pub trait BlobWriter {
/// which can be used to retrieve the data later.
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error>;
}

///
/// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write.
///
pub struct WriteBlobWriter<W>
where
W: std::io::Write,
{
inner: W,
offset: u64,
}

impl<W> WriteBlobWriter<W>
where
W: std::io::Write,
{
pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter {
inner,
offset: start_offset,
}
}

pub fn size(&self) -> u64 {
self.offset
}

/// Access the underlying Write object.
///
/// NOTE: WriteBlobWriter keeps track of the current write offset. If
/// you write something directly to the inner Write object, it makes the
/// internally tracked 'offset' to go out of sync. So don't do that.
pub fn into_inner(self) -> W {
self.inner
}
}

impl<W> BlobWriter for WriteBlobWriter<W>
where
W: std::io::Write,
{
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
let offset = self.offset;

if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.inner.write_all(&[len_buf])?;
self.offset += 1;
} else {
// Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()),
));
}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
}
self.inner.write_all(srcbuf)?;
self.offset += srcbuf.len() as u64;
Ok(offset)
}
}
78 changes: 42 additions & 36 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,52 +150,58 @@ pub struct FileBlockReader<F> {
file_id: page_cache::FileId,
}

impl<F> FileBlockReader<F>
where
F: FileExt,
{
impl<F> FileBlockReader<F> {
pub fn new(file: F) -> Self {
let file_id = page_cache::next_file_id();

FileBlockReader { file_id, file }
}
}

/// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();

// Swap for read lock
continue;
macro_rules! impls {
(FileBlockReader<$ty:ty>) => {
impl FileBlockReader<$ty> {
/// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();

// Swap for read lock
continue;
}
};
}
};
}
}
}
};
}

impls!(FileBlockReader<File>);
impls!(FileBlockReader<VirtualFile>);

impl BlockReader for FileBlockReader<File> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
Expand Down
Loading