implement coalescing of multiple reads onto same page
problame committed Aug 14, 2024
1 parent e408cba commit 21ad9c4
Showing 4 changed files with 151 additions and 61 deletions.
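
In outline: instead of issuing one full-page read per value, the read path now (1) determines which values are needed per key, stopping at the newest will_init record, (2) plans which byte ranges of which pages feed which value buffer, and (3) executes one read per page, no matter how many values that page serves. A minimal, self-contained sketch of the planning idea (the struct names mirror the diff below, but plan_page_reads, the PAGE_SZ constant, and the main driver are illustrative assumptions, not pageserver API):

    use std::collections::BTreeMap;

    const PAGE_SZ: usize = 8192;

    // One value to read back: `len` bytes starting at byte offset `pos`
    // in the ephemeral file (stand-in for the commit's ValueRead).
    struct ValueRead {
        pos: u32,
        len: u32,
    }

    // Which byte range of a page feeds which value read.
    struct PageReadDestination<'a> {
        value_read: &'a ValueRead,
        offset_in_page: u32,
        len: u32,
    }

    // Bucket all value reads by page number so each page is read once, no
    // matter how many values (or fragments of values) it holds. BTreeMap's
    // sorted iteration also fills each value's buffer front to back.
    fn plan_page_reads(reads: &[ValueRead]) -> BTreeMap<u32, Vec<PageReadDestination<'_>>> {
        let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
        for value_read in reads {
            let mut remaining = value_read.len as usize;
            let mut page_no = value_read.pos / (PAGE_SZ as u32);
            let mut offset_in_page = (value_read.pos as usize) % PAGE_SZ;
            while remaining > 0 {
                let take = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
                page_reads
                    .entry(page_no)
                    .or_default()
                    .push(PageReadDestination {
                        value_read,
                        offset_in_page: offset_in_page as u32,
                        len: take as u32,
                    });
                // Any remainder of this value continues at the start of the next page.
                offset_in_page = 0;
                page_no += 1;
                remaining -= take;
            }
        }
        page_reads
    }

    fn main() {
        // Two small values on page 0, plus one value straddling pages 0 and 1:
        // the plan touches each page exactly once.
        let reads = vec![
            ValueRead { pos: 0, len: 100 },
            ValueRead { pos: 100, len: 200 },
            ValueRead { pos: 8000, len: 500 },
        ];
        let plan = plan_page_reads(&reads);
        assert_eq!(plan.len(), 2);
        assert_eq!(plan[&0].len(), 3); // both small values + first fragment of the third
        assert_eq!(plan[&1].len(), 1); // remainder of the straddling value
    }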
7 changes: 5 additions & 2 deletions pageserver/benches/bench_ingest.rs
@@ -67,7 +67,8 @@ async fn ingest(
let layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;

let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let value = Value::Image(Bytes::from(vec![0u8; put_size]));
let data = value.ser()?;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
@@ -95,7 +96,9 @@ async fn ingest(
}
}

layer.put_value(key.to_compact(), lsn, &data, &ctx).await?;
layer
.put_value(key.to_compact(), lsn, &data, value.will_init(), &ctx)
.await?;
}
layer.freeze(lsn + 1).await;

7 changes: 6 additions & 1 deletion pageserver/src/tenant/ephemeral_file.rs
@@ -84,6 +84,7 @@ impl EphemeralFile {
pub(crate) async fn write_blob(
&mut self,
buf: &[u8],
will_init: bool,
ctx: &RequestContext,
) -> Result<InMemoryLayerIndexValue, io::Error> {
let pos = self.rw.bytes_written();
@@ -105,7 +106,11 @@

self.rw.write_all_borrowed(buf, ctx).await?;

Ok(InMemoryLayerIndexValue { pos, len })
Ok(InMemoryLayerIndexValue {
pos,
len,
will_init,
})
}
}

Expand Down
194 changes: 137 additions & 57 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -21,8 +21,8 @@ use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
@@ -93,6 +93,7 @@ pub struct InMemoryLayerInner {
pub(crate) struct InMemoryLayerIndexValue {
pub(crate) pos: u32,
pub(crate) len: u32,
pub(crate) will_init: bool, // XXX this blows up the size, can we shrink down `len`?
}

impl std::fmt::Debug for InMemoryLayerInner {
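
The XXX comment in the hunk above asks whether the entry can be shrunk back down: a separate bool pads the struct from 8 to 12 bytes. One way to answer it, sketched here as a hypothetical follow-up rather than anything this commit does, is to steal the top bit of len:

    // Hypothetical packed index entry: will_init lives in the top bit of
    // len, keeping the struct at 8 bytes. Values are far smaller than 2 GiB,
    // so 31 bits of length suffice.
    struct PackedIndexValue {
        pos: u32,
        len_and_will_init: u32,
    }

    impl PackedIndexValue {
        const WILL_INIT_BIT: u32 = 1 << 31;

        fn new(pos: u32, len: u32, will_init: bool) -> Self {
            assert!(len < Self::WILL_INIT_BIT);
            Self {
                pos,
                len_and_will_init: len | if will_init { Self::WILL_INIT_BIT } else { 0 },
            }
        }

        fn len(&self) -> u32 {
            self.len_and_will_init & !Self::WILL_INIT_BIT
        }

        fn will_init(&self) -> bool {
            self.len_and_will_init & Self::WILL_INIT_BIT != 0
        }
    }

    fn main() {
        assert_eq!(std::mem::size_of::<PackedIndexValue>(), 8);
        let e = PackedIndexValue::new(4096, 100, true);
        assert_eq!((e.len(), e.will_init()), (100, true));
    }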
@@ -282,6 +283,14 @@ impl InMemoryLayer {

let inner = self.inner.read().await;

// Determine ValueReads
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
struct ValueRead {
entry_lsn: Lsn,
pos: u32,
len: u32,
value_buf: Mutex<Result<Vec<u8>, Arc<std::io::Error>>>,
}
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
.index
@@ -295,62 +304,126 @@

let slice = vec_map.slice_range(lsn_range);

'foreach_value: for (entry_lsn, value) in slice.iter().rev() {
let InMemoryLayerIndexValue { pos, len } = value;

// TODO: coalesce multiple reads that hit the same page into one page read
// Yuchen is working on a VectoredReadPlanner change to support this.
// In the meantime, we prepare the way for direct IO by doing full page reads.
let len = usize::try_from(*len).unwrap();
let mut value_buf = Vec::with_capacity(len);
let mut page_buf_storage = Some(PageBuf::from(Box::new([0u8; PAGE_SZ])));
let mut page_no = *pos / (PAGE_SZ as u32);
let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
while value_buf.len() < len {
let read_result = match inner
.file
.read_page(
page_no,
page_buf_storage
.take()
.expect("we put it back each iteration"),
&ctx,
)
.await
{
Ok(page) => page,
Err(e) => {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break 'foreach_value;
}
};
{
let page_contents = read_result.contents();
let remaining_in_page = std::cmp::min(
len - value_buf.len(),
page_contents.len() - offset_in_page,
);
value_buf.extend_from_slice(
&page_contents[offset_in_page..offset_in_page + remaining_in_page],
);
}
offset_in_page = 0;
page_no += 1;
page_buf_storage = Some(read_result.into_page_buf());
for (entry_lsn, index_value) in slice.iter().rev() {
reads.entry(key).or_default().push(ValueRead {
entry_lsn: *entry_lsn,
pos: index_value.pos,
len: index_value.len,
value_buf: Mutex::new(Ok(Vec::with_capacity(index_value.len as usize))),
});
if index_value.will_init {
break;
}
assert!(value_buf.len() == len);
}
}
}

// Plan which parts of which pages need to be appended to which value_buf
struct PageReadDestination<'a> {
value_read: &'a ValueRead,
offset_in_page: u32,
len: u32,
}
// use of BTreeMap's sorted iterator is critical to ensure value_buf is filled in order
let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
for value_read in reads.iter().flat_map(|(_, v)| v.iter()) {
let ValueRead { pos, len, .. } = value_read;
let mut remaining = usize::try_from(*len).unwrap();
let mut page_no = *pos / (PAGE_SZ as u32);
let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
while remaining > 0 {
let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
page_reads
.entry(page_no)
.or_default()
.push(PageReadDestination {
value_read,
offset_in_page: offset_in_page as u32,
len: remaining_in_page as u32,
});
offset_in_page = 0;
page_no += 1;
remaining -= remaining_in_page;
}
}

let value = Value::des(&value_buf);
if let Err(e) = value {
// Execute reads and fill the destination
// TODO: prefetch
let mut page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ]));
for (page_no, dsts) in page_reads.into_iter() {
let all_done = dsts.iter().all(|PageReadDestination { value_read, .. }| {
let value_buf = value_read.value_buf.lock().unwrap();
let Ok(buf) = &*value_buf else {
return true; // on Err() there's no need to read more
};
buf.len() == value_read.len as usize
});
if all_done {
continue;
}
let read_result = match inner.file.read_page(page_no, page_buf, &ctx).await {
Ok(read_result) => read_result,
Err(e) => {
let e = Arc::new(e);
for PageReadDestination { value_read, .. } in dsts {
*value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e));
// this will make later reads short-circuit, see top of loop body
}
page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); // TODO: change read_page API to return the buffer
continue;
}
};
let page_contents = read_result.contents();
for PageReadDestination {
value_read,
offset_in_page,
len,
} in dsts
{
if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() {
buf.extend_from_slice(
&page_contents[offset_in_page as usize..(offset_in_page + len) as usize],
);
}
}
page_buf = read_result.into_page_buf();
}
drop(page_buf);

// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead {
entry_lsn,
value_buf,
len,
..
} in value_reads
{
let value_buf = value_buf.into_inner().unwrap();
match value_buf {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
break;
continue 'next_key;
}
Ok(value_buf) => {
assert_eq!(
value_buf.len(),
len as usize,
"bug in this function's planning logic"
);
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}

let key_situation =
reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
break;
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
}
}
}
}
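
The first loop above ("Determine ValueReads") is where the index's new will_init flag pays off at read time: each key's LSN history is walked newest to oldest, and collection stops at the first record that fully initializes the page, so older records cost no I/O at all. Previously the equivalent early exit only happened after each value had already been read from disk and deserialized. A toy model of the cut-off, with LSNs as plain integers:

    // Per-key history as (lsn, will_init) pairs in ascending LSN order.
    // Scan newest to oldest; stop after the first initializing record.
    fn records_needed(history: &[(u64, bool)]) -> Vec<u64> {
        let mut needed = Vec::new();
        for &(lsn, will_init) in history.iter().rev() {
            needed.push(lsn);
            if will_init {
                break;
            }
        }
        needed
    }

    fn main() {
        // Deltas at LSNs 30 and 20 over an image at 10: reconstruction needs
        // 30, 20, 10. The older image at LSN 5 is never fetched, and with the
        // flag in the index that is known without touching the file.
        let history = [(5, true), (10, true), (20, false), (30, false)];
        assert_eq!(records_needed(&history), vec![30, 20, 10]);
    }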
@@ -431,11 +504,13 @@
key: CompactKey,
lsn: Lsn,
buf: &[u8],
will_init: bool,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
self.put_value_locked(&mut inner, key, lsn, buf, will_init, ctx)
.await
}

async fn put_value_locked(
Expand All @@ -444,6 +519,7 @@ impl InMemoryLayer {
key: CompactKey,
lsn: Lsn,
buf: &[u8],
will_init: bool,
ctx: &RequestContext,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
@@ -452,6 +528,7 @@
.file
.write_blob(
buf,
will_init,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
@@ -593,15 +670,18 @@ impl InMemoryLayer {
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, entry) in vec_map.as_slice() {
let InMemoryLayerIndexValue { pos, len } = entry;
let InMemoryLayerIndexValue {
pos,
len,
will_init,
} = entry;
let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
let will_init = Value::des(&buf)?.will_init();
let (_buf, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
*will_init,
ctx,
)
.await;
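The hunk above, which writes the frozen layer out through delta_layer_writer, also drops the per-value Value::des(&buf)?.will_init() call: will_init is a static property of the value, recorded once at write time, so flushing the layer no longer deserializes every value. Roughly the semantics, as an illustrative enum rather than pageserver's actual Value definition:

    // Illustrative shape of the property: a full page image always
    // initializes the page; a WAL record only if it says so.
    enum Value {
        Image(Vec<u8>),
        WalRecord { will_init: bool },
    }

    impl Value {
        fn will_init(&self) -> bool {
            match self {
                Value::Image(_) => true,
                Value::WalRecord { will_init } => *will_init,
            }
        }
    }

    fn main() {
        assert!(Value::Image(vec![0u8; 8192]).will_init());
        assert!(!(Value::WalRecord { will_init: false }.will_init()));
    }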
4 changes: 3 additions & 1 deletion pageserver/src/tenant/timeline.rs
@@ -5519,7 +5519,9 @@ impl<'a> TimelineWriter<'a> {

let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await;
let res = layer
.put_value(key.to_compact(), lsn, &buf, value.will_init(), ctx)
.await;

if res.is_ok() {
// Update the current size only when the entire write was ok.
