Skip to content

Commit

Permalink
refactor: introduce entry struct
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Jul 29, 2024
1 parent 859f019 commit f45613b
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<Key, VecMap<Lsn, u64>>,
index: BTreeMap<Key, VecMap<Lsn, InMemoryLayerIndexValue>>,

/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
Expand All @@ -91,6 +91,11 @@ pub struct InMemoryLayerInner {

resource_units: GlobalResourceUnits,
}
struct InMemoryLayerIndexValue {
/// The offset returned by [`EphemeralFile::write_blob`]
/// when we wrote the [`Value`] in [`InMemoryLayerInner::put_value`].
pos: u64,
}

impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -274,7 +279,8 @@ impl InMemoryLayer {
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
for (lsn, value) in vec_map.as_slice() {
let InMemoryLayerIndexValue { pos } = value;
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let val = Value::des(&buf);
Expand Down Expand Up @@ -325,7 +331,8 @@ impl InMemoryLayer {
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
for (entry_lsn, value) in slice.iter().rev() {
let InMemoryLayerIndexValue { pos } = value;
let buf = reader.read_blob(*pos, &ctx).await?;
let value = Value::des(&buf)?;
match value {
Expand Down Expand Up @@ -384,7 +391,8 @@ impl InMemoryLayer {

let slice = vec_map.slice_range(lsn_range);

for (entry_lsn, pos) in slice.iter().rev() {
for (entry_lsn, value) in slice.iter().rev() {
let InMemoryLayerIndexValue { pos } = value;
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(*pos, &ctx).await;
if let Err(e) = buf {
Expand Down Expand Up @@ -516,9 +524,10 @@ impl InMemoryLayer {
)
.await?
};
let entry = InMemoryLayerIndexValue { pos: off };

let vec_map = locked_inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
let old = vec_map.append_or_update_last(lsn, entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
Expand Down Expand Up @@ -640,7 +649,8 @@ impl InMemoryLayer {

for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
for (lsn, entry) in vec_map.as_slice() {
let InMemoryLayerIndexValue { pos } = entry;
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
Expand Down Expand Up @@ -673,7 +683,8 @@ impl InMemoryLayer {

for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
for (lsn, entry) in vec_map.as_slice() {
let InMemoryLayerIndexValue { pos } = entry;
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
Expand Down

0 comments on commit f45613b

Please sign in to comment.