Turn BlockCursor::{read_blob,read_blob_into_buf} async fn #4905

Merged (11 commits) on Aug 14, 2023
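This PR turns BlockCursor::read_blob and BlockCursor::read_blob_into_buf into async fns and propagates .await through their callers in the delta, image, and in-memory layers. Along the way, the in-memory layer's std::sync::RwLock is replaced with parking_lot's (with the send_guard feature), and the remaining synchronous call sites bridge into the async functions with Handle::current().block_on for now. The core signature change, taken from the blob_io.rs hunk below:

```rust
// before
pub fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error>
// after
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error>
```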
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions pageserver/Cargo.toml
@@ -39,6 +39,7 @@ nix.workspace = true
num_cpus = { version = "1.15" }
num-traits.workspace = true
once_cell.workspace = true
parking_lot = { workspace = true, features = ["send_guard"] }
pin-project-lite.workspace = true
postgres.workspace = true
postgres_backend.workspace = true
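A note on the new dependency feature, with a standalone sketch (not code from this PR, and assuming tokio with the rt-multi-thread feature): parking_lot's send_guard feature makes its lock guards Send, which matters once a guard is held across an .await point inside a future that has to be Send, as some of the converted functions below do.

```rust
use std::sync::Arc;
use parking_lot::RwLock;

// With `send_guard`, the read guard is Send, so a future that holds it across
// an .await can still satisfy tokio::spawn's Send bound. With std::sync::RwLock
// (or parking_lot without the feature) this would not compile.
async fn read_then_await(data: Arc<RwLock<Vec<u8>>>) -> usize {
    let guard = data.read(); // no .unwrap(): parking_lot locks do not poison
    tokio::task::yield_now().await; // guard is held across an await point
    guard.len()
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let n = rt.block_on(async { tokio::spawn(read_then_await(data)).await.unwrap() });
    assert_eq!(n, 3);
}
```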
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layers.rs
@@ -72,7 +72,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
.await?;
let cursor = BlockCursor::new(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos())?;
let value = cursor.read_blob(v.pos()).await?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
6 changes: 3 additions & 3 deletions pageserver/src/tenant/blob_io.rs
@@ -21,14 +21,14 @@ where
R: BlockReader,
{
/// Read a blob into a new buffer.
pub fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf)?;
self.read_blob_into_buf(offset, &mut buf).await?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub fn read_blob_into_buf(
pub async fn read_blob_into_buf(
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
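A self-contained sketch of the shape this hunk gives the API (a hypothetical Cursor type backed by tokio::fs::File and a made-up u32 length prefix, not the pageserver's actual on-disk format): the allocating read_blob delegates to the buffer-reusing read_blob_into_buf, and both are async, so every caller gains an .await.

```rust
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};

struct Cursor {
    file: tokio::fs::File,
}

impl Cursor {
    /// Read a blob into a new buffer.
    async fn read_blob(&mut self, offset: u64) -> std::io::Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf).await?;
        Ok(buf)
    }

    /// Read a blob into the given buffer, overwriting any previous contents.
    async fn read_blob_into_buf(&mut self, offset: u64, dstbuf: &mut Vec<u8>) -> std::io::Result<()> {
        self.file.seek(SeekFrom::Start(offset)).await?;
        let mut len_buf = [0u8; 4];
        self.file.read_exact(&mut len_buf).await?; // hypothetical length prefix
        let len = u32::from_be_bytes(len_buf) as usize;
        dstbuf.clear();
        dstbuf.resize(len, 0);
        self.file.read_exact(dstbuf).await?;
        Ok(())
    }
}
```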
23 changes: 14 additions & 9 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -51,6 +51,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::OnceCell;
use tracing::*;

@@ -262,7 +263,8 @@ impl Layer for DeltaLayer {

// A subroutine to dump a single blob
let dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
let buf = cursor.read_blob(blob_ref.pos())?;
// TODO this is not ideal, but on the other hand we are in dumping code...
let buf = Handle::current().block_on(cursor.read_blob(blob_ref.pos()))?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
@@ -352,12 +354,15 @@ impl Layer for DeltaLayer {
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
cursor
.read_blob_into_buf(pos, &mut buf)
.await
.with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
@@ -986,8 +991,8 @@ pub struct ValueRef {

impl ValueRef {
/// Loads the value from disk
pub fn load(&self) -> Result<Value> {
let buf = self.reader.read_blob(self.blob_ref.pos())?;
pub async fn load(&self) -> Result<Value> {
let buf = self.reader.read_blob(self.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
Ok(val)
}
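The dump_blob closure above stays synchronous and bridges into the now-async read_blob with Handle::current().block_on, which the TODO flags as a stopgap. For reference, a standalone sketch of one conventional alternative on a multi-threaded runtime, tokio::task::block_in_place (hypothetical helper, not code from this PR):

```rust
use tokio::runtime::Handle;

// Hypothetical synchronous helper that needs async I/O while a multi-threaded
// tokio runtime is running on this thread.
fn read_blob_sync(offset: u64) -> std::io::Result<Vec<u8>> {
    // block_in_place tells the runtime this worker is about to block; inside
    // it, Handle::block_on may drive a future to completion.
    tokio::task::block_in_place(|| {
        Handle::current().block_on(async move {
            // stand-in for cursor.read_blob(offset).await
            Ok::<_, std::io::Error>(vec![0u8; (offset % 16) as usize])
        })
    })
}
```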
18 changes: 11 additions & 7 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -207,13 +207,17 @@ impl Layer for ImageLayer {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf).await? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let blob = file
.block_cursor()
.read_blob(offset)
.await
.with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);

reconstruct_state.img = Some((self.lsn, value));
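The only functional change in this hunk is the added .await; the re-indentation just keeps with_context attached to the awaited Result. A minimal standalone sketch of that ordering (resolve the future first, then decorate the error), assuming anyhow and tokio:

```rust
use anyhow::{Context, Result};
use std::path::Path;

async fn read_to_string_with_context(path: &Path) -> Result<String> {
    tokio::fs::read_to_string(path)
        .await // resolve the future to a Result first...
        .with_context(|| format!("failed to read file {}", path.display())) // ...then add lazy context
}
```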
28 changes: 14 additions & 14 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -25,9 +25,9 @@ use utils::{
};
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use parking_lot::RwLock;
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::RwLock;

use super::{DeltaLayer, DeltaLayerWriter, Layer};
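The import swap above is why every `self.inner.read().unwrap()` in the hunks below becomes `self.inner.read()`: parking_lot locks do not poison, so read()/write() hand back guards directly rather than a Result. A standalone sketch of the difference, with a simplified stand-in for the real Inner:

```rust
use parking_lot::RwLock;

struct Inner {
    end_lsn: Option<u64>, // simplified stand-in for the real fields
}

struct InMemLayer {
    inner: RwLock<Inner>,
}

impl InMemLayer {
    fn end_lsn(&self) -> Option<u64> {
        // std::sync::RwLock: self.inner.read().unwrap().end_lsn
        // parking_lot:       no poisoning, hence no Result and no unwrap
        self.inner.read().end_lsn
    }
}

fn main() {
    let layer = InMemLayer {
        inner: RwLock::new(Inner { end_lsn: Some(7) }),
    };
    assert_eq!(layer.end_lsn(), Some(7));
}
```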

@@ -101,7 +101,7 @@ impl InMemoryLayer {

pub fn info(&self) -> InMemoryLayerInfo {
let lsn_start = self.start_lsn;
let lsn_end = self.inner.read().unwrap().end_lsn;
let lsn_end = self.inner.read().end_lsn;

match lsn_end {
Some(lsn_end) => InMemoryLayerInfo::Frozen { lsn_start, lsn_end },
@@ -117,7 +117,7 @@ impl Layer for InMemoryLayer {
}

fn get_lsn_range(&self) -> Range<Lsn> {
let inner = self.inner.read().unwrap();
let inner = self.inner.read();

let end_lsn = if let Some(end_lsn) = inner.end_lsn {
end_lsn
@@ -134,7 +134,7 @@ impl Layer for InMemoryLayer {

/// debugging function to print out the contents of the layer
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().unwrap();
let inner = self.inner.read();

let end_str = inner
.end_lsn
@@ -156,7 +156,7 @@ impl Layer for InMemoryLayer {
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf)?;
cursor.read_blob_into_buf(*pos, &mut buf).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
@@ -194,15 +194,15 @@ impl Layer for InMemoryLayer {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;

let inner = self.inner.read().unwrap();
let inner = self.inner.read();

let reader = inner.file.block_cursor();

// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos)?;
let buf = reader.read_blob(*pos).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
@@ -236,7 +236,7 @@

impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.inner.read().unwrap();
let inner = self.inner.read();

let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
@@ -248,7 +248,7 @@ impl InMemoryLayer {
/// Get layer size on the disk
///
pub fn size(&self) -> Result<u64> {
let inner = self.inner.read().unwrap();
let inner = self.inner.read();
Ok(inner.file.size)
}

@@ -284,7 +284,7 @@ impl InMemoryLayer {
/// Adds the page version to the in-memory tree
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write();
inner.assert_writeable();

let off = {
@@ -317,7 +317,7 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub fn freeze(&self, end_lsn: Lsn) {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write();

assert!(self.start_lsn < end_lsn);
inner.end_lsn = Some(end_lsn);
@@ -332,7 +332,7 @@ impl InMemoryLayer {
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub fn write_to_disk(&self) -> Result<DeltaLayer> {
pub async fn write_to_disk(&self) -> Result<DeltaLayer> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
Expand All @@ -342,7 +342,7 @@ impl InMemoryLayer {
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
let inner = self.inner.read();

let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
@@ -363,7 +363,7 @@
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
cursor.read_blob_into_buf(*pos, &mut buf).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
}
5 changes: 3 additions & 2 deletions pageserver/src/tenant/timeline.rs
@@ -2946,7 +2946,7 @@ impl Timeline {
let frozen_layer = Arc::clone(frozen_layer);
move || {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?;
let new_delta_path = new_delta.path();

// Sync it to disk.
@@ -3644,7 +3644,8 @@ impl Timeline {
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
for (key, lsn, value_ref) in all_values_iter {
let value = value_ref.load()?;
// TODO replace this with an await once we fully go async
let value = Handle::current().block_on(value_ref.load())?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
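Both timeline.rs hunks bridge synchronous code into the newly async calls with Handle::current().block_on, and the TODO points at a later full async conversion. A standalone sketch of that bridge, assuming (as the truncated context suggests) that the surrounding closure runs on a blocking thread such as one started by tokio::task::spawn_blocking, where blocking is permitted:

```rust
use tokio::runtime::Handle;

// Stand-in for the real async work (e.g. frozen_layer.write_to_disk()).
async fn write_to_disk() -> std::io::Result<u64> {
    Ok(42)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let written = tokio::task::spawn_blocking(|| {
        // On a dedicated blocking thread the runtime context is still
        // available, so Handle::current() resolves and block_on may block.
        Handle::current().block_on(write_to_disk())
    })
    .await
    .expect("blocking task panicked")?;
    println!("wrote {written} bytes");
    Ok(())
}
```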