Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Aug 13, 2024
1 parent 52dedff commit 59a0df0
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 367 deletions.
19 changes: 1 addition & 18 deletions pageserver/src/l0_flush.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use std::{num::NonZeroUsize, sync::Arc};

use crate::tenant::ephemeral_file;

/// Deserializable L0 flush configuration.
///
/// Per the serde attributes below, the variant is selected by a `mode` tag
/// whose values are the kebab-case variant names, and unknown fields are
/// rejected during deserialization.
///
/// NOTE(review): this listing appears to be a rendered diff with removed and
/// added lines interleaved, so `Direct` shows up in both its multi-line and
/// its single-line spelling — confirm against the actual file before editing.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
PageCached,
#[serde(rename_all = "snake_case")]
Direct {
max_concurrency: NonZeroUsize,
},
// `max_concurrency` becomes the permit count of the semaphore built in
// `L0FlushGlobalState::new`.
Direct { max_concurrency: NonZeroUsize },
}

impl Default for L0FlushConfig {
Expand All @@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
pub struct L0FlushGlobalState(Arc<Inner>);

/// Runtime state wrapped (behind an `Arc`) by `L0FlushGlobalState`.
///
/// `L0FlushGlobalState::new` builds one variant per `L0FlushConfig` variant.
pub enum Inner {
PageCached,
// The semaphore is created with `max_concurrency` permits taken from
// `L0FlushConfig::Direct`.
Direct { semaphore: tokio::sync::Semaphore },
}

impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
Expand All @@ -44,13 +37,3 @@ impl L0FlushGlobalState {
&self.0
}
}

impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}
23 changes: 0 additions & 23 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! Low-level Block-oriented I/O functions
//!
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
Expand Down Expand Up @@ -81,9 +80,7 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
Expand All @@ -100,9 +97,7 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
Expand All @@ -111,24 +106,6 @@ impl<'a> BlockReaderRef<'a> {
}
}

impl<'a> BlockReaderRef<'a> {
    /// Serves block `blknum` directly out of an in-memory byte slice.
    ///
    /// The block covers bytes `blknum * PAGE_SZ .. blknum * PAGE_SZ + PAGE_SZ`
    /// of `slice`.
    ///
    /// # Errors
    /// Returns `UnexpectedEof` when `slice` ends before the end of the block.
    ///
    /// # Panics
    /// Panics if computing the block's byte range overflows `usize`.
    fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
        let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
        let end = start.checked_add(PAGE_SZ).unwrap();
        // `start <= end` always holds here, so `get` yields `None` exactly
        // when the slice is too short for the requested block.
        match slice.get(start..end) {
            Some(page) => {
                let page_sized = <&[u8; PAGE_SZ]>::try_from(page)
                    .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
                Ok(BlockLease::Slice(page_sized))
            }
            None => Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!("slice too short, len={} end={}", slice.len(), end),
            )),
        }
    }
}

///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///
Expand Down
130 changes: 30 additions & 100 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
Expand All @@ -20,8 +19,9 @@ pub struct EphemeralFile {
rw: page_caching::RW,
}

mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
pub(super) mod page_caching;

use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
mod zero_padded_read_write;

impl EphemeralFile {
Expand Down Expand Up @@ -52,16 +52,14 @@ impl EphemeralFile {
)
.await?;

let prewarm = conf.l0_flush.prewarm_on_write();

Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, prewarm, gate_guard),
rw: page_caching::RW::new(file, gate_guard),
})
}

/// Returns `self.rw.bytes_written()`.
///
/// NOTE(review): two signature lines are visible (`u64` and `u32` return
/// types); this looks like the before/after of a rendered diff — confirm
/// which one the real file carries.
pub(crate) fn len(&self) -> u64 {
pub(crate) fn len(&self) -> u32 {
self.rw.bytes_written()
}

Expand All @@ -74,37 +72,40 @@ impl EphemeralFile {
self.rw.load_to_vec(ctx).await
}

/// Reads block/page number `blknum` by delegating to the underlying
/// `page_caching::RW` (`self.rw`); I/O errors are passed through unchanged.
///
/// NOTE(review): the `read_blk` and `read_page` spellings are interleaved
/// here, apparently the before/after of a rendered diff. Only the
/// `read_page` form takes the `dst` buffer and forwards it to `self.rw`.
pub(crate) async fn read_blk(
pub(crate) async fn read_page(
&self,
blknum: u32,
dst: page_caching::PageBuf,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
self.rw.read_blk(blknum, ctx).await
) -> Result<page_caching::ReadResult, io::Error> {
self.rw.read_page(blknum, dst, ctx).await
}

/// Appends a blob to the file and reports where it was written.
///
/// NOTE(review): two versions of this function appear interleaved below,
/// apparently the before/after of a rendered diff — confirm against the real file.
/// - The `srcbuf` lines write a length header followed by the payload and
///   return the start position: one byte for lengths below `0x80`, otherwise
///   four big-endian bytes with the top bit of the first byte set.
/// - The `buf` lines write the payload only and return the start position
///   together with the length as an `InMemoryLayerIndexValue`.
///
/// # Errors
/// Propagates errors from `self.rw.write_all_borrowed`. The `buf` lines also
/// fail when the length does not fit in `u32` or when `pos + len` overflows.
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
buf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
) -> Result<InMemoryLayerIndexValue, io::Error> {
// Remember the current write position; it is the position returned to the caller.
let pos = self.rw.bytes_written();

// Write the length field
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];

self.rw.write_all_borrowed(&len_buf, ctx).await?;
} else {
// Four-byte big-endian length header.
// NOTE(review): the `as u32` cast truncates lengths above `u32::MAX`
// without reporting an error.
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
// Set the top bit of the first byte; the one-byte form above never has it set.
len_buf[0] |= 0x80;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
}

// Write the payload
self.rw.write_all_borrowed(srcbuf, ctx).await?;

Ok(pos)
// Reject payloads whose length cannot be represented as `u32`.
let len = u32::try_from(buf.len()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!(
"EphemeralFile::write_blob value too large: {}: {e}",
buf.len()
),
)
})?;
// Only the overflow check matters here; the computed sum is discarded.
pos.checked_add(len).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"EphemeralFile::write_blob: overflow",
)
})?;

self.rw.write_all_borrowed(buf, ctx).await?;

Ok(InMemoryLayerIndexValue { pos, len })
}
}

Expand All @@ -117,19 +118,11 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
}
}

impl BlockReader for EphemeralFile {
    /// Builds a block cursor whose reads are served by this ephemeral file.
    fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
        let reader = super::block_io::BlockReaderRef::EphemeralFile(self);
        BlockCursor::new(reader)
    }
}

#[cfg(test)]
mod tests {
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use rand::{thread_rng, RngCore};
use std::fs;
use std::str::FromStr;

Expand Down Expand Up @@ -160,69 +153,6 @@ mod tests {
Ok((conf, tenant_shard_id, timeline_id, ctx))
}

#[tokio::test]
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
    let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

    // `EphemeralFile::create` takes an entered gate guard.
    let gate = utils::sync::gate::Gate::default();
    let guard = gate.enter().unwrap();
    let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, guard, &ctx).await?;

    // A first small blob must read back immediately after being written.
    let pos_foo = file.write_blob(b"foo", &ctx).await?;
    assert_eq!(
        b"foo",
        file.block_cursor()
            .read_blob(pos_foo, &ctx)
            .await?
            .as_slice()
    );

    // Writing a second blob must leave the first one readable and unchanged.
    let pos_bar = file.write_blob(b"bar", &ctx).await?;
    assert_eq!(
        b"foo",
        file.block_cursor()
            .read_blob(pos_foo, &ctx)
            .await?
            .as_slice()
    );
    assert_eq!(
        b"bar",
        file.block_cursor()
            .read_blob(pos_bar, &ctx)
            .await?
            .as_slice()
    );

    // Many small blobs, followed by large blobs, written in that order.
    let small_blobs = (0..10000).map(|i| Vec::from(format!("blob{}", i).as_bytes()));
    let large_blobs = (0..100).map(|i| format!("blob{}", i).as_bytes().repeat(100));
    let mut written = Vec::new();
    for data in small_blobs.chain(large_blobs) {
        let pos = file.write_blob(&data, &ctx).await?;
        written.push((pos, data));
    }

    // Read everything back through a single cursor, in write order.
    {
        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, expected) in written {
            let actual = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(actual, expected);
        }
    }

    // Test a large blob that spans multiple pages
    let mut large_data = vec![0; 20000];
    thread_rng().fill_bytes(&mut large_data);
    let pos_large = file.write_blob(&large_data, &ctx).await?;
    let read_back = file.block_cursor().read_blob(pos_large, &ctx).await?;
    assert_eq!(read_back, large_data);

    Ok(())
}

#[tokio::test]
async fn ephemeral_file_holds_gate_open() {
const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
Expand Down
Loading

0 comments on commit 59a0df0

Please sign in to comment.