diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index fb1c5fc485a4..6169e8798a80 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -80,6 +80,7 @@ use std::{ use anyhow::Context; use once_cell::sync::OnceCell; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, @@ -451,7 +452,7 @@ impl PageCache { /// async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option { let cache_key_orig = cache_key.clone(); - if let Some(slot_idx) = self.search_mapping(cache_key) { + if let Some(slot_idx) = self.search_mapping(cache_key).await { // The page was found in the mapping. Lock the slot, and re-check // that it's still what we expected (because we released the mapping // lock already, another thread could have evicted the page) @@ -626,7 +627,7 @@ impl PageCache { /// returns. The caller is responsible for re-checking that the slot still /// contains the page with the same key before using it. /// - fn search_mapping(&self, cache_key: &mut CacheKey) -> Option { + async fn search_mapping(&self, cache_key: &mut CacheKey) -> Option { match cache_key { CacheKey::MaterializedPage { hash_key, lsn } => { let map = self.materialized_page_map.read().unwrap(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a204f8a22b5d..8bb6a3b1d7d7 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -114,7 +114,6 @@ pub mod block_io; pub mod disk_btree; pub(crate) mod ephemeral_file; pub mod layer_map; -pub mod manifest; mod span; pub mod metadata; diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f5ff15b50c22..96cdd6c5f554 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -91,71 +91,3 @@ pub trait BlobWriter { /// which can be used to retrieve the data later. fn write_blob(&mut self, srcbuf: &[u8]) -> Result; } - -/// -/// An implementation of BlobWriter to write blobs to anything that -/// implements std::io::Write. -/// -pub struct WriteBlobWriter -where - W: std::io::Write, -{ - inner: W, - offset: u64, -} - -impl WriteBlobWriter -where - W: std::io::Write, -{ - pub fn new(inner: W, start_offset: u64) -> Self { - WriteBlobWriter { - inner, - offset: start_offset, - } - } - - pub fn size(&self) -> u64 { - self.offset - } - - /// Access the underlying Write object. - /// - /// NOTE: WriteBlobWriter keeps track of the current write offset. If - /// you write something directly to the inner Write object, it makes the - /// internally tracked 'offset' to go out of sync. So don't do that. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl BlobWriter for WriteBlobWriter -where - W: std::io::Write, -{ - fn write_blob(&mut self, srcbuf: &[u8]) -> Result { - let offset = self.offset; - - if srcbuf.len() < 128 { - // Short blob. Write a 1-byte length header - let len_buf = srcbuf.len() as u8; - self.inner.write_all(&[len_buf])?; - self.offset += 1; - } else { - // Write a 4-byte length header - if srcbuf.len() > 0x7fff_ffff { - return Err(Error::new( - ErrorKind::Other, - format!("blob too large ({} bytes)", srcbuf.len()), - )); - } - let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes(); - len_buf[0] |= 0x80; - self.inner.write_all(&len_buf)?; - self.offset += 4; - } - self.inner.write_all(srcbuf)?; - self.offset += srcbuf.len() as u64; - Ok(offset) - } -} diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 69d5b49c6d41..a9934f8af31c 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -150,52 +150,58 @@ pub struct FileBlockReader { file_id: page_cache::FileId, } -impl FileBlockReader -where - F: FileExt, -{ +impl FileBlockReader { pub fn new(file: F) -> Self { let file_id = page_cache::next_file_id(); FileBlockReader { file_id, file } } +} - /// Read a page from the underlying file into given buffer. - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { - assert!(buf.len() == PAGE_SZ); - self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) - } - /// Read a block. - /// - /// Returns a "lease" object that can be used to - /// access to the contents of the page. (For the page cache, the - /// lease object represents a lock on the buffer.) - pub async fn read_blk(&self, blknum: u32) -> Result { - let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.file_id, blknum) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { - ReadBufResult::Found(guard) => break Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum)?; - write_guard.mark_valid(); - - // Swap for read lock - continue; +macro_rules! impls { + (FileBlockReader<$ty:ty>) => { + impl FileBlockReader<$ty> { + /// Read a page from the underlying file into given buffer. + fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + assert!(buf.len() == PAGE_SZ); + self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + } + /// Read a block. + /// + /// Returns a "lease" object that can be used to + /// access to the contents of the page. (For the page cache, the + /// lease object represents a lock on the buffer.) + pub async fn read_blk(&self, blknum: u32) -> Result { + let cache = page_cache::get(); + loop { + match cache + .read_immutable_buf(self.file_id, blknum) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { + ReadBufResult::Found(guard) => break Ok(guard.into()), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum)?; + write_guard.mark_valid(); + + // Swap for read lock + continue; + } + }; } - }; + } } - } + }; } +impls!(FileBlockReader); +impls!(FileBlockReader); + impl BlockReader for FileBlockReader { fn block_cursor(&self) -> BlockCursor<'_> { BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self)) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 31db3869d978..02ef7166e578 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,6 @@ use std::cmp::min; use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; -use std::os::unix::prelude::FileExt; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use tracing::*; diff --git a/pageserver/src/tenant/manifest.rs b/pageserver/src/tenant/manifest.rs deleted file mode 100644 index 1d2835114f48..000000000000 --- a/pageserver/src/tenant/manifest.rs +++ /dev/null @@ -1,325 +0,0 @@ -//! This module contains the encoding and decoding of the local manifest file. -//! -//! MANIFEST is a write-ahead log which is stored locally to each timeline. It -//! records the state of the storage engine. It contains a snapshot of the -//! state and all operations proceeding that snapshot. The file begins with a -//! header recording MANIFEST version number. After that, it contains a snapshot. -//! The snapshot is followed by a list of operations. Each operation is a list -//! of records. Each record is either an addition or a removal of a layer. -//! -//! With MANIFEST, we can: -//! -//! 1. recover state quickly by reading the file, potentially boosting the -//! startup speed. -//! 2. ensure all operations are atomic and avoid corruption, solving issues -//! like redundant image layer and preparing us for future compaction -//! strategies. -//! -//! There is also a format for storing all layer files on S3, called -//! `index_part.json`. Compared with index_part, MANIFEST is an WAL which -//! records all operations as logs, and therefore we can easily replay the -//! operations when recovering from crash, while ensuring those operations -//! are atomic upon restart. -//! -//! Currently, this is not used in the system. Future refactors will ensure -//! the storage state will be recorded in this file, and the system can be -//! recovered from this file. This is tracked in -//! - -use std::io::{self, Read, Write}; - -use crate::virtual_file::VirtualFile; -use anyhow::Result; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crc32c::crc32c; -use serde::{Deserialize, Serialize}; -use tracing::log::warn; -use utils::lsn::Lsn; - -use super::storage_layer::PersistentLayerDesc; - -pub struct Manifest { - file: VirtualFile, -} - -#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] -pub struct Snapshot { - pub layers: Vec, -} - -/// serde by default encode this in tagged enum, and therefore it will be something -/// like `{ "AddLayer": { ... } }`. -#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] -pub enum Record { - AddLayer(PersistentLayerDesc), - RemoveLayer(PersistentLayerDesc), -} - -/// `echo neon.manifest | sha1sum` and take the leading 8 bytes. -const MANIFEST_MAGIC_NUMBER: u64 = 0xf5c44592b806109c; -const MANIFEST_VERSION: u64 = 1; - -#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] -pub struct ManifestHeader { - magic_number: u64, - version: u64, -} - -const MANIFEST_HEADER_LEN: usize = 16; - -impl ManifestHeader { - fn encode(&self) -> BytesMut { - let mut buf = BytesMut::with_capacity(MANIFEST_HEADER_LEN); - buf.put_u64(self.magic_number); - buf.put_u64(self.version); - buf - } - - fn decode(mut buf: &[u8]) -> Self { - assert!(buf.len() == MANIFEST_HEADER_LEN, "invalid header"); - Self { - magic_number: buf.get_u64(), - version: buf.get_u64(), - } - } -} - -#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] -pub enum Operation { - /// A snapshot of the current state. - /// - /// Lsn field represents the LSN that is persisted to disk for this snapshot. - Snapshot(Snapshot, Lsn), - /// An atomic operation that changes the state. - /// - /// Lsn field represents the LSN that is persisted to disk after the operation is done. - /// This will only change when new L0 is flushed to the disk. - Operation(Vec, Lsn), -} - -struct RecordHeader { - size: u32, - checksum: u32, -} - -const RECORD_HEADER_LEN: usize = 8; - -impl RecordHeader { - fn encode(&self) -> BytesMut { - let mut buf = BytesMut::with_capacity(RECORD_HEADER_LEN); - buf.put_u32(self.size); - buf.put_u32(self.checksum); - buf - } - - fn decode(mut buf: &[u8]) -> Self { - assert!(buf.len() == RECORD_HEADER_LEN, "invalid header"); - Self { - size: buf.get_u32(), - checksum: buf.get_u32(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum ManifestLoadError { - #[error("manifest header is corrupted")] - CorruptedManifestHeader, - #[error("unsupported manifest version: got {0}, expected {1}")] - UnsupportedVersion(u64, u64), - #[error("error when decoding record: {0}")] - DecodeRecord(serde_json::Error), - #[error("I/O error: {0}")] - Io(io::Error), -} - -#[must_use = "Should check if the manifest is partially corrupted"] -pub struct ManifestPartiallyCorrupted(bool); - -impl Manifest { - /// Create a new manifest by writing the manifest header and a snapshot record to the given file. - pub fn init(file: VirtualFile, snapshot: Snapshot, lsn: Lsn) -> Result { - let mut manifest = Self { file }; - manifest.append_manifest_header(ManifestHeader { - magic_number: MANIFEST_MAGIC_NUMBER, - version: MANIFEST_VERSION, - })?; - manifest.append_operation(Operation::Snapshot(snapshot, lsn))?; - Ok(manifest) - } - - /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted, - /// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and - /// backup the current one. - pub fn load( - mut file: VirtualFile, - ) -> Result<(Self, Vec, ManifestPartiallyCorrupted), ManifestLoadError> { - let mut buf = vec![]; - file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?; - - // Read manifest header - let mut buf = Bytes::from(buf); - if buf.remaining() < MANIFEST_HEADER_LEN { - return Err(ManifestLoadError::CorruptedManifestHeader); - } - let header = ManifestHeader::decode(&buf[..MANIFEST_HEADER_LEN]); - buf.advance(MANIFEST_HEADER_LEN); - if header.version != MANIFEST_VERSION { - return Err(ManifestLoadError::UnsupportedVersion( - header.version, - MANIFEST_VERSION, - )); - } - - // Read operations - let mut operations = Vec::new(); - let corrupted = loop { - if buf.remaining() == 0 { - break false; - } - if buf.remaining() < RECORD_HEADER_LEN { - warn!("incomplete header when decoding manifest, could be corrupted"); - break true; - } - let RecordHeader { size, checksum } = RecordHeader::decode(&buf[..RECORD_HEADER_LEN]); - let size = size as usize; - buf.advance(RECORD_HEADER_LEN); - if buf.remaining() < size { - warn!("incomplete data when decoding manifest, could be corrupted"); - break true; - } - let data = &buf[..size]; - if crc32c(data) != checksum { - warn!("checksum mismatch when decoding manifest, could be corrupted"); - break true; - } - // if the following decode fails, we cannot use the manifest or safely ignore any record. - operations.push(serde_json::from_slice(data).map_err(ManifestLoadError::DecodeRecord)?); - buf.advance(size); - }; - Ok(( - Self { file }, - operations, - ManifestPartiallyCorrupted(corrupted), - )) - } - - fn append_data(&mut self, data: &[u8]) -> Result<()> { - if data.len() >= u32::MAX as usize { - panic!("data too large"); - } - let header = RecordHeader { - size: data.len() as u32, - checksum: crc32c(data), - }; - let header = header.encode(); - self.file.write_all(&header)?; - self.file.write_all(data)?; - self.file.sync_all()?; - Ok(()) - } - - fn append_manifest_header(&mut self, header: ManifestHeader) -> Result<()> { - let encoded = header.encode(); - self.file.write_all(&encoded)?; - Ok(()) - } - - /// Add an operation to the manifest. The operation will be appended to the end of the file, - /// and the file will fsync. - pub fn append_operation(&mut self, operation: Operation) -> Result<()> { - let encoded = Vec::from(serde_json::to_string(&operation)?); - self.append_data(&encoded) - } -} - -#[cfg(test)] -mod tests { - use std::fs::OpenOptions; - - use crate::repository::Key; - - use super::*; - - #[test] - fn test_read_manifest() { - let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest"); - std::fs::create_dir_all(&testdir).unwrap(); - let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap(); - let layer1 = PersistentLayerDesc::new_test(Key::from_i128(0)..Key::from_i128(233)); - let layer2 = PersistentLayerDesc::new_test(Key::from_i128(233)..Key::from_i128(2333)); - let layer3 = PersistentLayerDesc::new_test(Key::from_i128(2333)..Key::from_i128(23333)); - let layer4 = PersistentLayerDesc::new_test(Key::from_i128(23333)..Key::from_i128(233333)); - - // Write a manifest with a snapshot and some operations - let snapshot = Snapshot { - layers: vec![layer1, layer2], - }; - let mut manifest = Manifest::init(file, snapshot.clone(), Lsn::from(0)).unwrap(); - manifest - .append_operation(Operation::Operation( - vec![Record::AddLayer(layer3.clone())], - Lsn::from(1), - )) - .unwrap(); - drop(manifest); - - // Open the second time and write - let file = VirtualFile::open_with_options( - &testdir.join("MANIFEST"), - OpenOptions::new() - .read(true) - .write(true) - .create_new(false) - .truncate(false), - ) - .unwrap(); - let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap(); - assert!(!corrupted.0); - assert_eq!(operations.len(), 2); - assert_eq!( - &operations[0], - &Operation::Snapshot(snapshot.clone(), Lsn::from(0)) - ); - assert_eq!( - &operations[1], - &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1)) - ); - manifest - .append_operation(Operation::Operation( - vec![ - Record::RemoveLayer(layer3.clone()), - Record::AddLayer(layer4.clone()), - ], - Lsn::from(2), - )) - .unwrap(); - drop(manifest); - - // Open the third time and verify - let file = VirtualFile::open_with_options( - &testdir.join("MANIFEST"), - OpenOptions::new() - .read(true) - .write(true) - .create_new(false) - .truncate(false), - ) - .unwrap(); - let (_manifest, operations, corrupted) = Manifest::load(file).unwrap(); - assert!(!corrupted.0); - assert_eq!(operations.len(), 3); - assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0))); - assert_eq!( - &operations[1], - &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1)) - ); - assert_eq!( - &operations[2], - &Operation::Operation( - vec![Record::RemoveLayer(layer3), Record::AddLayer(layer4)], - Lsn::from(2) - ) - ); - } -} diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index d9df346a1440..fdcd89eac524 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; -use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::{BufWriter, Write}; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; @@ -582,8 +582,6 @@ struct DeltaLayerWriterInner { lsn_range: Range, tree: DiskBtreeBuilder, - - blob_writer: WriteBlobWriter>, } impl DeltaLayerWriterInner { @@ -605,12 +603,6 @@ impl DeltaLayerWriterInner { // FIXME: throw an error instead? let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path)?; - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64))?; - let buf_writer = BufWriter::new(file); - let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64); - // Initialize the b-tree index builder let block_buf = BlockBuf::new(); let tree_builder = DiskBtreeBuilder::new(block_buf); @@ -623,7 +615,6 @@ impl DeltaLayerWriterInner { key_start, lsn_range, tree: tree_builder, - blob_writer, }) } @@ -632,11 +623,12 @@ impl DeltaLayerWriterInner { /// /// The values must be appended in key, lsn order. /// - fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) + .await } - fn put_value_bytes( + async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, @@ -644,30 +636,20 @@ impl DeltaLayerWriterInner { will_init: bool, ) -> anyhow::Result<()> { assert!(self.lsn_range.start <= lsn); - - let off = self.blob_writer.write_blob(val)?; - - let blob_ref = BlobRef::new(off, will_init); - - let delta_key = DeltaKey::from_key_lsn(&key, lsn); - self.tree.append(&delta_key.0, blob_ref.0)?; - - Ok(()) + todo!("use TBD EphemeralFile superclass"); } fn size(&self) -> u64 { - self.blob_writer.size() + self.tree.borrow_writer().size() + todo!() } /// /// Finish writing the delta layer. /// fn finish(self, key_end: Key) -> anyhow::Result { - let index_start_blk = - ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; + let index_start_blk: u32 = todo!(); - let buf_writer = self.blob_writer.into_inner(); - let mut file = buf_writer.into_inner()?; + let mut file: VirtualFile = todo!("EphemeralFile superclass needs into_inner() api"); // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; @@ -797,11 +779,11 @@ impl DeltaLayerWriter { /// /// The values must be appended in key, lsn order. /// - pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { - self.inner.as_mut().unwrap().put_value(key, lsn, val) + pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + self.inner.as_mut().unwrap().put_value(key, lsn, val).await } - pub fn put_value_bytes( + pub async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, @@ -812,6 +794,7 @@ impl DeltaLayerWriter { .as_mut() .unwrap() .put_value_bytes(key, lsn, val, will_init) + .await } pub fn size(&self) -> u64 { @@ -828,15 +811,7 @@ impl DeltaLayerWriter { impl Drop for DeltaLayerWriter { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - match inner.blob_writer.into_inner().into_inner() { - Ok(vfile) => vfile.remove(), - Err(err) => warn!( - "error while flushing buffer of image layer temporary file: {}", - err - ), - } - } + todo!("TBD EpheemralFile superclass into_inner(); => VirtualFile => remove()"); } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b1fc25709277..e0a6d576b7d9 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; -use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::Write; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; @@ -511,7 +511,6 @@ struct ImageLayerWriterInner { key_range: Range, lsn: Lsn, - blob_writer: WriteBlobWriter, tree: DiskBtreeBuilder, } @@ -538,13 +537,6 @@ impl ImageLayerWriterInner { }, ); info!("new image layer {}", path.display()); - let mut file = VirtualFile::open_with_options( - &path, - std::fs::OpenOptions::new().write(true).create_new(true), - )?; - // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64))?; - let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -558,7 +550,6 @@ impl ImageLayerWriterInner { key_range: key_range.clone(), lsn, tree: tree_builder, - blob_writer, }; Ok(writer) @@ -570,13 +561,7 @@ impl ImageLayerWriterInner { /// The page versions must be appended in blknum order. /// fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { - ensure!(self.key_range.contains(&key)); - let off = self.blob_writer.write_blob(img)?; - - let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; - key.write_to_byte_slice(&mut keybuf); - self.tree.append(&keybuf, off)?; - + todo!("use TBD EphemeralFile superclass that skips copying into mutable_tail"); Ok(()) } @@ -584,10 +569,9 @@ impl ImageLayerWriterInner { /// Finish writing the image layer. /// fn finish(self) -> anyhow::Result { - let index_start_blk = - ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; + let index_start_blk: u32 = todo!(); - let mut file = self.blob_writer.into_inner(); + let mut file: VirtualFile = todo!("EphemeralFile superclass needs into_inner() api"); // Write out the index file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?; @@ -725,7 +709,7 @@ impl ImageLayerWriter { impl Drop for ImageLayerWriter { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - inner.blob_writer.into_inner().remove(); + todo!("TBD EpheemralFile superclass into_inner(); => VirtualFile => remove()"); } } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 35a77a733161..e954f896943d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -348,7 +348,7 @@ impl InMemoryLayer { for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf).await?; let will_init = Value::des(&buf)?.will_init(); - delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?; + delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init).await?; } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 04da85a24136..952ea23f51a8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3599,7 +3599,7 @@ impl Timeline { ))) }); - writer.as_mut().unwrap().put_value(key, lsn, value)?; + writer.as_mut().unwrap().put_value(key, lsn, value).await?; prev_key = Some(key); } if let Some(writer) = writer { diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a86b8fa2a6fa..59ad73071bda 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -13,7 +13,7 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; -use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; +use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -321,54 +321,8 @@ impl VirtualFile { drop(self); std::fs::remove_file(path).expect("failed to remove the virtual file"); } -} - -impl Drop for VirtualFile { - /// If a VirtualFile is dropped, close the underlying file if it was open. - fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); - - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. - let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - STORAGE_IO_TIME - .with_label_values(&["close"]) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } -} - -impl Read for VirtualFile { - fn read(&mut self, buf: &mut [u8]) -> Result { - let pos = self.pos; - let n = self.read_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } -} - -impl Write for VirtualFile { - fn write(&mut self, buf: &[u8]) -> Result { - let pos = self.pos; - let n = self.write_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } - fn flush(&mut self) -> Result<(), std::io::Error> { - // flush is no-op for File (at least on unix), so we don't need to do - // anything here either. - Ok(()) - } -} - -impl Seek for VirtualFile { - fn seek(&mut self, pos: SeekFrom) -> Result { + pub fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(offset) => { self.pos = offset; @@ -392,10 +346,50 @@ impl Seek for VirtualFile { } Ok(self.pos) } -} -impl FileExt for VirtualFile { - fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 + pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.read_at(buf, offset) { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 + pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.write_at(buf, offset) { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self.with_file("read", |file| file.read_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -405,7 +399,7 @@ impl FileExt for VirtualFile { result } - fn write_at(&self, buf: &[u8], offset: u64) -> Result { + pub fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = self.with_file("write", |file| file.write_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -416,6 +410,26 @@ impl FileExt for VirtualFile { } } +impl Drop for VirtualFile { + /// If a VirtualFile is dropped, close the underlying file if it was open. + fn drop(&mut self) { + let handle = self.handle.get_mut().unwrap(); + + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. + let slot = &get_open_files().slots[handle.index]; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME + .with_label_values(&["close"]) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } + } +} + impl OpenFiles { fn new(num_slots: usize) -> OpenFiles { let mut slots = Box::new(Vec::with_capacity(num_slots));