diff --git a/Cargo.toml b/Cargo.toml index 4440e539..48fe077e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ fail = "0.5" fs2 = "0.4" hashbrown = "0.12" hex = "0.4" +if_chain = "1.0" lazy_static = "1.3" libc = "0.2" log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] } diff --git a/src/config.rs b/src/config.rs index f00a1d97..94b791e4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,7 +37,7 @@ pub struct Config { pub recovery_mode: RecoveryMode, /// Minimum I/O size for reading log files during recovery. /// - /// Default: "4KB". Minimum: "512B". + /// Default: "16KB". Minimum: "512B". pub recovery_read_block_size: ReadableSize, /// The number of threads used to scan and recovery log files. /// diff --git a/src/engine.rs b/src/engine.rs index 0654a30b..13d0fc23 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -447,7 +447,8 @@ where script: String, file_system: Arc, ) -> Result<()> { - use crate::file_pipe_log::{RecoveryConfig, ReplayMachine}; + use crate::file_pipe_log::{LogFileFormat, RecoveryConfig, ReplayMachine}; + use crate::pipe_log::DataLayout; if !path.exists() { return Err(Error::InvalidArgument(format!( @@ -462,6 +463,7 @@ where ..Default::default() }; let recovery_mode = cfg.recovery_mode; + let file_format = LogFileFormat::new(cfg.format_version, DataLayout::NoAlignment); let read_block_size = cfg.recovery_read_block_size.0; let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new()); builder.scan()?; @@ -473,6 +475,7 @@ where RecoveryConfig { queue: LogQueue::Append, mode: recovery_mode, + file_format, concurrency: 1, read_block_size, }, @@ -485,6 +488,7 @@ where RecoveryConfig { queue: LogQueue::Rewrite, mode: recovery_mode, + file_format, concurrency: 1, read_block_size, }, diff --git a/src/file_pipe_log/format.rs b/src/file_pipe_log/format.rs index 1f244fcc..902943f8 100644 --- a/src/file_pipe_log/format.rs +++ b/src/file_pipe_log/format.rs @@ -8,7 +8,7 @@ use std::path::{Path, PathBuf}; use num_traits::{FromPrimitive, ToPrimitive}; use crate::codec::{self, NumberEncoder}; -use crate::pipe_log::{FileId, LogQueue, Version}; +use crate::pipe_log::{DataLayout, FileId, LogQueue, Version}; use crate::{Error, Result}; /// Width to format log sequence number. @@ -20,6 +20,20 @@ const LOG_REWRITE_SUFFIX: &str = ".rewrite"; /// File header. const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5"; +/// Check whether the given `buf` is a valid padding or not. +/// +/// To simplify the checking strategy, we just check the first +/// and last byte in the `buf`. +/// +/// In most common cases, the paddings will be filled with `0`, +/// and several corner cases, where there exists corrupted blocks +/// in the disk, might pass through this rule, but will failed in +/// followed processing. So, we can just keep it simplistic. +#[inline] +pub(crate) fn is_valid_paddings(buf: &[u8]) -> bool { + buf.is_empty() || (buf[0] == 0 && buf[buf.len() - 1] == 0) +} + /// `FileNameExt` offers file name formatting extensions to [`FileId`]. pub trait FileNameExt: Sized { fn parse_file_name(file_name: &str) -> Option; @@ -79,28 +93,66 @@ pub(super) fn lock_file_path>(dir: P) -> PathBuf { } /// In-memory representation of `Format` in log files. -#[derive(Clone, Default)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct LogFileFormat { version: Version, + data_layout: DataLayout, +} + +impl Default for LogFileFormat { + fn default() -> Self { + Self { + version: Version::default(), + data_layout: DataLayout::NoAlignment, + } + } } impl LogFileFormat { + pub fn new(version: Version, data_layout: DataLayout) -> Self { + Self { + version, + data_layout, + } + } + + /// Length of whole `LogFileFormat` written on storage. + pub fn enc_len(&self) -> usize { + Self::header_len() + Self::payload_len(self.version) + } + /// Length of header written on storage. - pub const fn len() -> usize { + pub const fn header_len() -> usize { LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::() } + /// Length of serialized `DataLayout` written on storage. + pub const fn payload_len(version: Version) -> usize { + match version { + Version::V1 => 0, + Version::V2 => DataLayout::len(), + } + } + pub fn from_version(version: Version) -> Self { - Self { version } + Self { + version, + data_layout: DataLayout::NoAlignment, + } } pub fn version(&self) -> Version { self.version } + pub fn data_layout(&self) -> DataLayout { + self.data_layout + } + /// Decodes a slice of bytes into a `LogFileFormat`. pub fn decode(buf: &mut &[u8]) -> Result { - if buf.len() < Self::len() { + let buf_len = buf.len(); + if buf_len < Self::header_len() { return Err(Error::Corruption("log file header too short".to_owned())); } if !buf.starts_with(LOG_FILE_MAGIC_HEADER) { @@ -109,14 +161,50 @@ impl LogFileFormat { )); } buf.consume(LOG_FILE_MAGIC_HEADER.len()); - let v = codec::decode_u64(buf)?; - if let Some(version) = Version::from_u64(v) { - Ok(Self { version }) - } else { - Err(Error::Corruption(format!( - "unrecognized log file version: {}", + // Parse `Version` of LogFileFormat from header of the file. + let version = { + let dec_version = codec::decode_u64(buf)?; + if let Some(v) = Version::from_u64(dec_version) { v - ))) + } else { + return Err(Error::Corruption(format!( + "unrecognized log file version: {}", + dec_version + ))); + } + }; + // Parse `DataLayout` of LogFileFormat from header of the file. + let payload_len = Self::payload_len(version); + if payload_len == 0 { + // No alignment. + return Ok(Self { + version, + data_layout: DataLayout::NoAlignment, + }); + } + if_chain::if_chain! { + if payload_len > 0; + if buf_len >= Self::header_len() + payload_len; + if let Ok(layout_block_size) = codec::decode_u64(buf); + then { + // If the decoded `payload_len > 0`, serialized data_layout + // should be extracted from the file. + Ok(Self { + version, + data_layout: if layout_block_size == 0 { + DataLayout::NoAlignment + } else { + DataLayout::Alignment(layout_block_size) + }, + }) + } else { + // Here, we mark this special err, that is, corrupted `payload`, + // with InvalidArgument. + Err(Error::InvalidArgument(format!( + "invalid dataload in the header, len: {}, expected len: {}", + buf_len - Self::header_len(), Self::payload_len(version) + ))) + } } } @@ -124,21 +212,71 @@ impl LogFileFormat { pub fn encode(&self, buf: &mut Vec) -> Result<()> { buf.extend_from_slice(LOG_FILE_MAGIC_HEADER); buf.encode_u64(self.version.to_u64().unwrap())?; - let corrupted = || { - fail::fail_point!("log_file_header::corrupted", |_| true); - false - }; - if corrupted() { - buf[0] += 1; + if Self::payload_len(self.version) > 0 { + buf.encode_u64(self.data_layout.to_u64())?; + } + #[cfg(feature = "failpoints")] + { + // Set header corrupted. + let corrupted = || { + fail::fail_point!("log_file_header::corrupted", |_| true); + false + }; + // Set abnormal DataLayout. + let force_abnormal_data_layout = || { + fail::fail_point!("log_file_header::force_abnormal_data_layout", |_| true); + false + }; + // Set corrupted DataLayout for `payload`. + let corrupted_data_layout = || { + fail::fail_point!("log_file_header::corrupted_data_layout", |_| true); + false + }; + if corrupted() { + buf[0] += 1; + } + if force_abnormal_data_layout() { + buf.encode_u64(0_u64)?; + } + if corrupted_data_layout() { + buf.pop(); + } } Ok(()) } + + /// Return the aligned block size. + #[inline] + pub fn get_aligned_block_size(&self) -> usize { + self.data_layout.to_u64() as usize + } } #[cfg(test)] mod tests { use super::*; use crate::pipe_log::LogFileContext; + use crate::test_util::catch_unwind_silent; + + #[test] + fn test_check_paddings_is_valid() { + // normal buffer + let mut buf = vec![0; 128]; + // len < 8 + assert!(is_valid_paddings(&buf[0..6])); + // len == 8 + assert!(is_valid_paddings(&buf[120..])); + // len > 8 + assert!(is_valid_paddings(&buf[..])); + + // abnormal buffer + buf[127] = 3_u8; + assert!(is_valid_paddings(&buf[0..110])); + assert!(is_valid_paddings(&buf[120..125])); + assert!(!is_valid_paddings(&buf[124..128])); + assert!(!is_valid_paddings(&buf[120..])); + assert!(!is_valid_paddings(&buf[..])); + } #[test] fn test_file_name() { @@ -172,23 +310,97 @@ mod tests { assert_eq!(version, version2); } + #[test] + fn test_data_layout() { + assert_eq!(DataLayout::NoAlignment.to_u64(), 0); + assert_eq!(DataLayout::Alignment(16).to_u64(), 16); + assert_eq!(DataLayout::from_u64(0), DataLayout::NoAlignment); + assert_eq!(DataLayout::from_u64(4096), DataLayout::Alignment(4096)); + assert_eq!(DataLayout::len(), 8); + } + #[test] fn test_file_header() { let header1 = LogFileFormat::default(); assert_eq!(header1.version().to_u64().unwrap(), 1); + assert_eq!(header1.data_layout().to_u64(), 0); let header2 = LogFileFormat::from_version(Version::default()); assert_eq!(header2.version().to_u64(), header1.version().to_u64()); - let header3 = LogFileFormat::from_version(Version::default()); + assert_eq!(header1.data_layout().to_u64(), 0); + let header3 = LogFileFormat::from_version(header1.version()); assert_eq!(header3.version(), header1.version()); + assert_eq!(header1.data_layout().to_u64(), 0); + assert_eq!(header1.enc_len(), LogFileFormat::header_len()); + assert_eq!(header2.enc_len(), LogFileFormat::header_len()); + assert_eq!(header3.enc_len(), LogFileFormat::header_len()); + let header4 = LogFileFormat { + version: Version::V2, + data_layout: DataLayout::Alignment(16), + }; + assert_eq!( + header4.enc_len(), + LogFileFormat::header_len() + LogFileFormat::payload_len(header4.version) + ); + } + + #[test] + fn test_encoding_decoding_file_format() { + fn enc_dec_file_format(file_format: LogFileFormat) -> Result { + let mut buf = Vec::with_capacity( + LogFileFormat::header_len() + LogFileFormat::payload_len(file_format.version), + ); + assert!(file_format.encode(&mut buf).is_ok()); + LogFileFormat::decode(&mut &buf[..]) + } + // header with aligned-sized data_layout + { + let mut buf = Vec::with_capacity(LogFileFormat::header_len()); + let version = Version::V2; + let data_layout = DataLayout::Alignment(4096); + buf.extend_from_slice(LOG_FILE_MAGIC_HEADER); + assert!(buf.encode_u64(version.to_u64().unwrap()).is_ok()); + assert!(buf.encode_u64(data_layout.to_u64()).is_ok()); + assert_eq!( + LogFileFormat::decode(&mut &buf[..]).unwrap(), + LogFileFormat::new(version, data_layout) + ); + } + // header with abnormal version + { + let mut buf = Vec::with_capacity(LogFileFormat::header_len()); + let abnormal_version = 4_u64; /* abnormal version */ + buf.extend_from_slice(LOG_FILE_MAGIC_HEADER); + assert!(buf.encode_u64(abnormal_version).is_ok()); + assert!(buf.encode_u64(16).is_ok()); + assert!(LogFileFormat::decode(&mut &buf[..]).is_err()); + } + // header with Version::default and DataLayout::Alignment(_) + { + let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(0)); + assert_eq!( + LogFileFormat::new(Version::default(), DataLayout::NoAlignment), + enc_dec_file_format(file_format).unwrap() + ); + let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(4096)); + assert_eq!( + LogFileFormat::new(Version::default(), DataLayout::NoAlignment), + enc_dec_file_format(file_format).unwrap() + ); + } + // header with Version::V2 and DataLayout::Alignment(0) + { + let file_format = LogFileFormat::new(Version::V2, DataLayout::Alignment(0)); + catch_unwind_silent(|| enc_dec_file_format(file_format)).unwrap_err(); + } } #[test] fn test_file_context() { let mut file_context = - LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()); + LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default()); assert_eq!(file_context.get_signature(), None); file_context.id.seq = 10; - file_context.version = Version::V2; + file_context.format.version = Version::V2; assert_eq!(file_context.get_signature().unwrap(), 10); let abnormal_seq = (file_context.id.seq << 32) as u64 + 100_u64; file_context.id.seq = abnormal_seq; diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index d86ab022..e7327c85 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -28,16 +28,16 @@ pub struct FileHandler { /// Build a file writer. /// /// * `handle`: standard handle of a log file. -/// * `version`: format version of the log file. +/// * `format`: format infos of the log file. /// * `force_reset`: if true => rewrite the header of this file. pub(super) fn build_file_writer( system: &F, handle: Arc, - version: Version, + format: LogFileFormat, force_reset: bool, ) -> Result> { let writer = system.new_writer(handle.clone())?; - LogFileWriter::open(handle, writer, version, force_reset) + LogFileWriter::open(handle, writer, format, force_reset) } /// Append-only writer for log file. @@ -54,18 +54,18 @@ impl LogFileWriter { fn open( handle: Arc, writer: F::Writer, - version: Version, + format: LogFileFormat, force_reset: bool, ) -> Result { let file_size = handle.file_size()?; let mut f = Self { - header: LogFileFormat::from_version(version), + header: format, writer, written: file_size, capacity: file_size, last_sync: file_size, }; - if file_size < LogFileFormat::len() || force_reset { + if file_size < LogFileFormat::header_len() || force_reset { f.write_header()?; } else { f.writer.seek(SeekFrom::Start(file_size as u64))?; @@ -77,7 +77,7 @@ impl LogFileWriter { self.writer.seek(SeekFrom::Start(0))?; self.last_sync = 0; self.written = 0; - let mut buf = Vec::with_capacity(LogFileFormat::len()); + let mut buf = Vec::with_capacity(LogFileFormat::header_len()); self.header.encode(&mut buf)?; self.write(&buf, 0) } @@ -152,16 +152,16 @@ impl LogFileWriter { /// Attention please, the reader do not need a specified `[LogFileFormat]` from /// users. /// -/// * `[handle]`: standard handle of a log file. -/// * `[version]`: if `[None]`, reloads the log file header and parse -/// the relevant `Version` before building the `reader`. +/// * `handle`: standard handle of a log file. +/// * `format`: if `[None]`, reloads the log file header and parse +/// the relevant `LogFileFormat` before building the `reader`. pub(super) fn build_file_reader( system: &F, handle: Arc, - version: Option, + format: Option, ) -> Result> { let reader = system.new_reader(handle.clone())?; - LogFileReader::open(handle, reader, version) + LogFileReader::open(handle, reader, format) } /// Random-access reader for log file. @@ -174,10 +174,14 @@ pub struct LogFileReader { } impl LogFileReader { - fn open(handle: Arc, reader: F::Reader, version: Option) -> Result { - match version { - Some(ver) => Ok(Self { - format: LogFileFormat::from_version(ver), + fn open( + handle: Arc, + reader: F::Reader, + format: Option, + ) -> Result { + match format { + Some(fmt) => Ok(Self { + format: fmt, handle, reader, // Set to an invalid offset to force a reseek at first read. @@ -185,7 +189,7 @@ impl LogFileReader { }), None => { let mut reader = Self { - format: LogFileFormat::from_version(Version::default()), + format: LogFileFormat::default(), handle, reader, // Set to an invalid offset to force a reseek at first read. @@ -236,16 +240,18 @@ impl LogFileReader { // a log file with valid format. Otherwise, it should return with // `Err`. let file_size = self.handle.file_size()?; - // [1] If the length lessed than the standard `LogFileFormat::len()`. - let header_len = LogFileFormat::len(); + // [1] If the length lessed than the standard `LogFileFormat::header_len()`. + let header_len = LogFileFormat::header_len(); if file_size < header_len { return Err(Error::Corruption("Invalid header of LogFile!".to_owned())); } - // [2] Parse the header of the file. - let mut container = vec![0; header_len]; - self.read_to(0, &mut container[..])?; + // [2] Parse the format of the file. + let mut container = + vec![0; LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2)]; + let size = self.read_to(0, &mut container[..])?; + container.truncate(size); self.format = LogFileFormat::decode(&mut container.as_slice())?; - Ok(self.format.clone()) + Ok(self.format) } #[inline] @@ -258,3 +264,91 @@ impl LogFileReader { &self.format } } + +#[cfg(test)] +mod tests { + use super::*; + use num_traits::ToPrimitive; + use strum::IntoEnumIterator; + use tempfile::Builder; + + use crate::env::DefaultFileSystem; + use crate::file_pipe_log::format::{FileNameExt, LogFileFormat}; + use crate::pipe_log::{DataLayout, FileId, LogQueue, Version}; + use crate::util::ReadableSize; + + fn prepare_mocked_log_file( + file_system: &F, + path: &str, + file_id: FileId, + format: LogFileFormat, + file_size: usize, + ) -> Result> { + let data = vec![b'm'; 1024]; + let fd = Arc::new(file_system.create(&file_id.build_file_path(path))?); + let mut writer = build_file_writer(file_system, fd, format, true)?; + let mut offset: usize = 0; + while offset < file_size { + writer.write(&data[..], file_size)?; + offset += data.len(); + } + Ok(writer) + } + + fn read_data_from_mocked_log_file( + file_system: &F, + path: &str, + file_id: FileId, + format: Option, + file_block_handle: FileBlockHandle, + ) -> Result> { + let fd = Arc::new(file_system.open(&file_id.build_file_path(path))?); + let mut reader = build_file_reader(file_system, fd, format)?; + reader.read(file_block_handle) + } + + #[test] + fn test_log_file_write_read() { + let dir = Builder::new() + .prefix("test_log_file_write_read") + .tempdir() + .unwrap(); + let path = dir.path().to_str().unwrap(); + let target_file_size = ReadableSize::mb(4); + let file_system = Arc::new(DefaultFileSystem); + let fs_block_size = 32768; + + for version in Version::iter() { + for data_layout in [DataLayout::NoAlignment, DataLayout::Alignment(64)] { + let file_format = LogFileFormat::new(version, data_layout); + let file_id = FileId { + seq: version.to_u64().unwrap(), + queue: LogQueue::Append, + }; + assert!(prepare_mocked_log_file( + file_system.as_ref(), + path, + file_id, + file_format, + target_file_size.0 as usize, + ) + .is_ok()); + // mocked file_block_handle + let file_block_handle = FileBlockHandle { + id: file_id, + offset: (fs_block_size + 77) as u64, + len: fs_block_size + 77, + }; + let buf = read_data_from_mocked_log_file( + file_system.as_ref(), + path, + file_id, + None, + file_block_handle, + ) + .unwrap(); + assert_eq!(buf.len(), file_block_handle.len); + } + } + } +} diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 7443105b..2e060c29 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -25,10 +25,10 @@ pub mod debug { use crate::env::FileSystem; use crate::log_batch::LogItem; - use crate::pipe_log::{FileId, Version}; + use crate::pipe_log::FileId; use crate::{Error, Result}; - use super::format::FileNameExt; + use super::format::{FileNameExt, LogFileFormat}; use super::log_file::{LogFileReader, LogFileWriter}; use super::reader::LogItemBatchFileReader; @@ -38,7 +38,7 @@ pub mod debug { pub fn build_file_writer( file_system: &F, path: &Path, - version: Version, + format: LogFileFormat, create: bool, ) -> Result> { let fd = if create { @@ -47,7 +47,7 @@ pub mod debug { file_system.open(path)? }; let fd = Arc::new(fd); - super::log_file::build_file_writer(file_system, fd, version, create) + super::log_file::build_file_writer(file_system, fd, format, create) } /// Opens a log file for read. @@ -212,11 +212,11 @@ pub mod debug { let mut writer = build_file_writer( file_system.as_ref(), &file_path, - Version::default(), + LogFileFormat::default(), true, /* create */ ) .unwrap(); - let log_file_format = LogFileContext::new(file_id, Version::default()); + let log_file_format = LogFileContext::new(file_id, LogFileFormat::default()); for batch in bs.iter_mut() { let offset = writer.offset() as u64; let len = batch @@ -278,7 +278,7 @@ pub mod debug { let mut writer = build_file_writer( file_system.as_ref(), &empty_file_path, - Version::default(), + LogFileFormat::default(), true, /* create */ ) .unwrap(); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index f6e575eb..df64c352 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -15,11 +15,11 @@ use crate::env::FileSystem; use crate::event_listener::EventListener; use crate::metrics::*; use crate::pipe_log::{ - FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, Version, + DataLayout, FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, }; use crate::{perf_context, Error, Result}; -use super::format::FileNameExt; +use super::format::{FileNameExt, LogFileFormat}; use super::log_file::{build_file_reader, build_file_writer, FileHandler, LogFileWriter}; struct FileCollection { @@ -71,7 +71,7 @@ struct ActiveFile { pub(super) struct SinglePipe { queue: LogQueue, dir: String, - format_version: Version, + file_format: LogFileFormat, target_file_size: usize, bytes_per_sync: usize, file_system: Arc, @@ -123,6 +123,21 @@ impl SinglePipe { mut fds: VecDeque>, capacity: usize, ) -> Result { + let data_layout = { + let force_set_aligned_layout = || { + // TODO: needs to get the block_size from file_system. + fail_point!("file_pipe_log::open::force_set_aligned_layout", |_| { 16 }); + 0 + }; + // TODO: also need to check whether DIO is open or not. If DIO + // == `on`, we could set the data_layout with `Alignment(_)`. + let fs_block_size = force_set_aligned_layout(); + if fs_block_size > 0 && LogFileFormat::payload_len(cfg.format_version) > 0 { + DataLayout::Alignment(fs_block_size as u64) + } else { + DataLayout::NoAlignment + } + }; let create_file = first_seq == 0; let active_seq = if create_file { first_seq = 1; @@ -133,7 +148,10 @@ impl SinglePipe { let fd = Arc::new(file_system.create(&file_id.build_file_path(&cfg.dir))?); fds.push_back(FileHandler { handle: fd, - context: LogFileContext::new(file_id, cfg.format_version), + context: LogFileContext::new( + file_id, + LogFileFormat::new(cfg.format_version, data_layout), + ), }); first_seq } else { @@ -153,7 +171,7 @@ impl SinglePipe { writer: build_file_writer( file_system.as_ref(), active_fd.handle.clone(), - active_fd.context.version, + active_fd.context.format, false, /* force_reset */ )?, }; @@ -162,7 +180,7 @@ impl SinglePipe { let pipe = Self { queue, dir: cfg.dir.clone(), - format_version: cfg.format_version, + file_format: LogFileFormat::new(cfg.format_version, data_layout), target_file_size: cfg.target_file_size.0 as usize, bytes_per_sync: cfg.bytes_per_sync.0 as usize, file_system, @@ -200,14 +218,14 @@ impl SinglePipe { } /// Returns a shared [`Version`] for the specified file sequence number. - fn get_format_version(&self, file_seq: FileSeq) -> Result { + fn get_file_format(&self, file_seq: FileSeq) -> Result { let files = self.files.read(); if file_seq < files.first_seq || (file_seq >= files.first_seq + files.fds.len() as u64) { return Err(Error::Corruption("file seqno out of range".to_owned())); } Ok(files.fds[(file_seq - files.first_seq) as usize] .context - .version) + .format) } /// Creates a new file for write, and rotates the active log file. @@ -245,7 +263,7 @@ impl SinglePipe { writer: build_file_writer( self.file_system.as_ref(), fd.clone(), - self.format_version, + self.file_format, true, /* force_reset */ )?, }; @@ -253,6 +271,9 @@ impl SinglePipe { // loss before a new entry is written. new_file.writer.sync()?; self.sync_dir()?; + let active_file_format_version = new_file.writer.header.version(); + let active_file_format_data_layout = new_file.writer.header.data_layout(); + **active_file = new_file; let len = { let mut files = self.files.write(); @@ -264,10 +285,9 @@ impl SinglePipe { seq, queue: self.queue, }, - self.format_version, + LogFileFormat::new(active_file_format_version, active_file_format_data_layout), ), }); - **active_file = new_file; for listener in &self.listeners { listener.post_new_log_file(FileId { queue: self.queue, @@ -297,7 +317,7 @@ impl SinglePipe { let mut reader = build_file_reader( self.file_system.as_ref(), fd, - Some(self.get_format_version(handle.id.seq)?), + Some(self.get_file_format(handle.id.seq)?), )?; reader.read(handle) } @@ -308,6 +328,36 @@ impl SinglePipe { let seq = active_file.seq; let writer = &mut active_file.writer; + #[cfg(feature = "failpoints")] + { + use crate::util::round_up; + + let force_no_alignment = || { + fail_point!("file_pipe_log::append::force_no_alignment", |_| true); + false + }; + let force_abnormal_paddings = || { + fail_point!("file_pipe_log::append::force_abnormal_paddings", |_| true); + false + }; + if !force_no_alignment() + && writer.header.version().has_log_signing() + && writer.header.get_aligned_block_size() > 0 + { + let s_off = round_up(writer.offset(), writer.header.get_aligned_block_size()); + // Append head paddings. + if s_off > writer.offset() { + let len = s_off - writer.offset(); + let mut paddings = vec![0; len]; + if force_abnormal_paddings() { + paddings[len - 1] = 8_u8; + } else { + paddings[len - 1] = 0; + } + writer.write(&paddings[..], self.target_file_size)?; + } + } + } let start_offset = writer.offset(); if let Err(e) = writer.write(bytes, self.target_file_size) { if let Err(te) = writer.truncate() { @@ -393,7 +443,7 @@ impl SinglePipe { // candidates, which should also be removed. // Find the newest obsolete `V1` file and refresh purge count. for i in (purged..obsolete_files).rev() { - if !files.fds[i].context.version.has_log_signing() { + if !files.fds[i].context.format.version().has_log_signing() { purged = i + 1; break; } @@ -429,8 +479,7 @@ impl SinglePipe { fn fetch_active_file(&self) -> LogFileContext { let files = self.files.read(); - let active_fd = files.fds.back().unwrap(); - LogFileContext::new(active_fd.context.id, active_fd.context.version) + files.fds.back().unwrap().context.clone() } } @@ -515,6 +564,7 @@ mod tests { use super::super::pipe_builder::lock_dir; use super::*; use crate::env::{DefaultFileSystem, WriteExt}; + use crate::pipe_log::{DataLayout, Version}; use crate::util::ReadableSize; use std::io::{Read, Seek, SeekFrom, Write}; @@ -574,7 +624,7 @@ mod tests { let pipe_log = new_test_pipes(&cfg).unwrap(); assert_eq!(pipe_log.file_span(queue), (1, 1)); - let header_size = LogFileFormat::len() as u64; + let header_size = LogFileFormat::header_len() as u64; // generate file 1, 2, 3 let content: Vec = vec![b'a'; 1024]; @@ -634,7 +684,8 @@ mod tests { // fetch active file let file_context = pipe_log.fetch_active_file(LogQueue::Append); - assert_eq!(file_context.version, Version::default()); + assert_eq!(file_context.format.version(), Version::default()); + assert_eq!(file_context.format.data_layout(), DataLayout::NoAlignment); assert_eq!(file_context.id.seq, 3); } @@ -677,7 +728,7 @@ mod tests { .open(&file_id.build_file_path(path)) .unwrap(), ), - context: LogFileContext::new(file_id, Version::default()), + context: LogFileContext::new(file_id, LogFileFormat::default()), } } let dir = Builder::new() @@ -802,7 +853,8 @@ mod tests { let pipe_log = new_test_pipes(&cfg).unwrap(); assert_eq!(pipe_log.file_span(queue), (1, 1)); - let header_size = LogFileFormat::len() as u64; + let header_size = + (LogFileFormat::header_len() + LogFileFormat::payload_len(cfg.format_version)) as u64; // generate file 1, 2, 3 let content: Vec = vec![b'a'; 1024]; @@ -861,8 +913,9 @@ mod tests { assert_eq!(pipe_log.file_span(queue), (3, 3)); // fetch active file - let file_context = pipe_log.fetch_active_file(queue); - assert_eq!(file_context.version, Version::V2); + let file_context = pipe_log.fetch_active_file(LogQueue::Append); + assert_eq!(file_context.format.version(), Version::V2); + assert_eq!(file_context.format.data_layout(), DataLayout::NoAlignment); assert_eq!(file_context.id.seq, 3); } } diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs index aeac84ec..e6b61929 100644 --- a/src/file_pipe_log/pipe_builder.rs +++ b/src/file_pipe_log/pipe_builder.rs @@ -16,7 +16,7 @@ use crate::config::{Config, RecoveryMode}; use crate::env::FileSystem; use crate::event_listener::EventListener; use crate::log_batch::LogItemBatch; -use crate::pipe_log::{FileId, FileSeq, LogFileContext, LogQueue, Version}; +use crate::pipe_log::{DataLayout, FileId, FileSeq, LogFileContext, LogQueue}; use crate::util::Factory; use crate::{Error, Result}; @@ -59,6 +59,8 @@ impl Factory for DefaultMachineFactory { pub struct RecoveryConfig { pub queue: LogQueue, pub mode: RecoveryMode, + /// TODO: This opt should defined by whether open DIO or not. + pub file_format: LogFileFormat, pub concurrency: usize, pub read_block_size: u64, } @@ -66,7 +68,7 @@ pub struct RecoveryConfig { struct FileToRecover { seq: FileSeq, handle: Arc, - version: Option, + format: Option, } /// [`DualPipes`] factory that can also recover other customized memory states. @@ -198,7 +200,7 @@ impl DualPipesBuilder { files.push(FileToRecover { seq, handle, - version: None, + format: None, }); } } @@ -228,10 +230,10 @@ impl DualPipesBuilder { } _ => (threads, threads), }; - let append_recovery_cfg = RecoveryConfig { queue: LogQueue::Append, mode: self.cfg.recovery_mode, + file_format: LogFileFormat::new(self.cfg.format_version, DataLayout::NoAlignment), concurrency: append_concurrency, read_block_size: self.cfg.recovery_read_block_size.0, }; @@ -243,8 +245,8 @@ impl DualPipesBuilder { let append_files = &mut self.append_files; let rewrite_files = &mut self.rewrite_files; let file_system = self.file_system.clone(); - // As the `recover_queue` would update the `Version` of each log file in - // `apend_files` and `rewrite_files`, we re-design the implementation on + // As the `recover_queue` would update the `LogFileFormat` of each log file + // in `apend_files` and `rewrite_files`, we re-design the implementation on // `recover_queue` to make it compatiable to concurrent processing // with ThreadPool. let (append, rewrite) = pool.join( @@ -283,6 +285,7 @@ impl DualPipesBuilder { let queue = recovery_cfg.queue; let concurrency = recovery_cfg.concurrency; let recovery_mode = recovery_cfg.mode; + let file_format = recovery_cfg.file_format; let recovery_read_block_size = recovery_cfg.read_block_size as usize; let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1); @@ -300,16 +303,15 @@ impl DualPipesBuilder { let is_last_file = index == chunk_count - 1 && i == file_count - 1; match build_file_reader(file_system.as_ref(), f.handle.clone(), None) { Err(e) => { - let is_local_tail = f.handle.file_size()? <= LogFileFormat::len(); if recovery_mode == RecoveryMode::TolerateAnyCorruption || recovery_mode == RecoveryMode::TolerateTailCorruption - && is_last_file && is_local_tail { + && is_last_file { warn!( "File header is corrupted but ignored: {:?}:{}, {}", queue, f.seq, e ); f.handle.truncate(0)?; - f.version = Some(Version::default()); + f.format = Some(file_format); continue; } else { error!( @@ -323,8 +325,8 @@ impl DualPipesBuilder { reader.open(FileId { queue, seq: f.seq }, file_reader)?; } } - // Update file version of each log file. - f.version = Some(reader.file_format().unwrap().version()); + // Update file format of each log file. + f.format = reader.file_format(); loop { match reader.next() { Ok(Some(item_batch)) => { @@ -408,7 +410,10 @@ impl DualPipesBuilder { .iter() .map(|f| FileHandler { handle: f.handle.clone(), - context: LogFileContext::new(FileId { seq: f.seq, queue }, f.version.unwrap()), + context: LogFileContext::new( + FileId { seq: f.seq, queue }, + *f.format.as_ref().unwrap(), + ), }) .collect(); SinglePipe::open( diff --git a/src/file_pipe_log/reader.rs b/src/file_pipe_log/reader.rs index ba038516..0f9897e0 100644 --- a/src/file_pipe_log/reader.rs +++ b/src/file_pipe_log/reader.rs @@ -2,10 +2,11 @@ use crate::env::FileSystem; use crate::log_batch::{LogBatch, LogItemBatch, LOG_BATCH_HEADER_LEN}; -use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext}; +use crate::pipe_log::{DataLayout, FileBlockHandle, FileId, LogFileContext}; +use crate::util::round_up; use crate::{Error, Result}; -use super::format::LogFileFormat; +use super::format::{is_valid_paddings, LogFileFormat}; use super::log_file::LogFileReader; /// A reusable reader over [`LogItemBatch`]s in a log file. @@ -42,12 +43,12 @@ impl LogItemBatchFileReader { /// Opens a file that can be accessed through the given reader. pub fn open(&mut self, file_id: FileId, reader: LogFileReader) -> Result<()> { - self.file_context = Some(LogFileContext::new(file_id, reader.file_format().version())); + self.valid_offset = reader.file_format().enc_len(); + self.file_context = Some(LogFileContext::new(file_id, *reader.file_format())); self.size = reader.file_size()?; self.reader = Some(reader); self.buffer.clear(); self.buffer_offset = 0; - self.valid_offset = LogFileFormat::len(); Ok(()) } @@ -64,17 +65,42 @@ impl LogItemBatchFileReader { /// Returns the next [`LogItemBatch`] in current opened file. Returns /// `None` if there is no more data or no opened file. pub fn next(&mut self) -> Result> { - if self.valid_offset < self.size { + // TODO: [Fulfilled in writing progress when DIO is open.] + // We should also consider that there might exists broken blocks when DIO + // is open, and the following reading strategy should tolerate reading broken + // blocks until it finds an accessible header of `LogBatch`. + while self.valid_offset < self.size { + let data_layout = self.file_context.as_ref().unwrap().format.data_layout(); if self.valid_offset < LOG_BATCH_HEADER_LEN { return Err(Error::Corruption( "attempt to read file with broken header".to_owned(), )); } - let (footer_offset, compression_type, len) = LogBatch::decode_header(&mut self.peek( + let header_parser = LogBatch::decode_header(&mut self.peek( self.valid_offset, LOG_BATCH_HEADER_LEN, 0, - )?)?; + )?); + if_chain::if_chain! { + if header_parser.is_err(); + if let DataLayout::Alignment(fs_block_size) = data_layout; + if fs_block_size > 0; + let aligned_next_offset = round_up(self.valid_offset, fs_block_size as usize); + if self.valid_offset != aligned_next_offset; + if is_valid_paddings(self.peek(self.valid_offset, aligned_next_offset - self.valid_offset, 0)?); + then { + // In DataLayout::Alignment mode, tail data in the previous block + // may be aligned with paddings, that is '0'. So, we need to + // skip these redundant content and get the next valid header + // of `LogBatch`. + self.valid_offset = aligned_next_offset; + continue; + } + // If we continued with aligned offset and get a parsed err, + // it means that the header is broken or the padding is filled + // with non-zero bytes, and the err will be returned. + } + let (footer_offset, compression_type, len) = header_parser?; if self.valid_offset + len > self.size { return Err(Error::Corruption("log batch header broken".to_owned())); } @@ -151,8 +177,6 @@ impl LogItemBatchFileReader { } pub fn file_format(&self) -> Option { - self.reader - .as_ref() - .map(|reader| reader.file_format().clone()) + self.reader.as_ref().map(|reader| *reader.file_format()) } } diff --git a/src/filter.rs b/src/filter.rs index 29c4612f..f2463938 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -13,7 +13,7 @@ use crate::file_pipe_log::{FileNameExt, ReplayMachine}; use crate::log_batch::{ Command, EntryIndexes, KeyValue, LogBatch, LogItem, LogItemBatch, LogItemContent, OpType, }; -use crate::pipe_log::{FileId, LogFileContext, LogQueue, Version}; +use crate::pipe_log::{FileId, LogFileContext, LogQueue}; use crate::util::Factory; use crate::{Error, Result}; @@ -278,10 +278,10 @@ impl RhaiFilterMachine { let mut writer = build_file_writer( system, &target_path, - Version::default(), + *reader.file_format(), true, /* create */ )?; - let log_file_format = LogFileContext::new(f.file_id, Version::default()); + let log_file_context = LogFileContext::new(f.file_id, *reader.file_format()); // Write out new log file. for item in f.items.into_iter() { match item.content { @@ -318,7 +318,7 @@ impl RhaiFilterMachine { // Batch 64KB. if log_batch.approximate_size() >= 64 * 1024 { log_batch.finish_populate(0 /* compression_threshold */)?; - log_batch.prepare_write(&log_file_format)?; + log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), usize::MAX, /* target_size_hint */ @@ -328,7 +328,7 @@ impl RhaiFilterMachine { } if !log_batch.is_empty() { log_batch.finish_populate(0 /* compression_threshold */)?; - log_batch.prepare_write(&log_file_format)?; + log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), usize::MAX, /* target_size_hint */ diff --git a/src/lib.rs b/src/lib.rs index 9facd4ef..6cf5662c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,7 +67,7 @@ pub use engine::Engine; pub use errors::{Error, Result}; pub use log_batch::{Command, LogBatch, MessageExt}; pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext}; -pub use pipe_log::Version; +pub use pipe_log::{DataLayout, Version}; pub use util::ReadableSize; #[cfg(feature = "internals")] @@ -80,6 +80,7 @@ pub mod internals { pub use crate::pipe_log::*; #[cfg(feature = "swap")] pub use crate::swappy_allocator::*; + pub use crate::util::{round_down, round_up}; pub use crate::write_barrier::*; } diff --git a/src/log_batch.rs b/src/log_batch.rs index 183d8f42..d4c3c1b0 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -928,6 +928,7 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option) -> Result< #[cfg(test)] mod tests { use super::*; + use crate::file_pipe_log::LogFileFormat; use crate::pipe_log::{LogQueue, Version}; use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt}; use protobuf::parse_from_bytes; @@ -1109,7 +1110,7 @@ mod tests { let mut encoded_batch = vec![]; batch.encode(&mut encoded_batch).unwrap(); let file_context = - LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()); + LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default()); let decoded_batch = LogItemBatch::decode( &mut encoded_batch.as_slice(), FileBlockHandle::dummy(LogQueue::Append), @@ -1148,7 +1149,8 @@ mod tests { assert_eq!(batch.approximate_size(), len); let mut batch_handle = mocked_file_block_handle; batch_handle.len = len; - let file_context = LogFileContext::new(batch_handle.id, version); + let file_context = + LogFileContext::new(batch_handle.id, LogFileFormat::from_version(version)); assert!(batch.prepare_write(&file_context).is_ok()); batch.finish_write(batch_handle); let encoded = batch.encoded_bytes(); @@ -1173,7 +1175,8 @@ mod tests { let mut entries_handle = mocked_file_block_handle; entries_handle.offset = LOG_BATCH_HEADER_LEN as u64; entries_handle.len = offset - LOG_BATCH_HEADER_LEN; - let file_context = LogFileContext::new(entries_handle.id, version); + let file_context = + LogFileContext::new(entries_handle.id, LogFileFormat::from_version(version)); { // Decoding with wrong compression type is okay. LogItemBatch::decode( @@ -1193,7 +1196,10 @@ mod tests { &mut &encoded[offset..], entries_handle, compression_type, - &LogFileContext::new(FileId::new(LogQueue::Append, u64::MAX), version), + &LogFileContext::new( + FileId::new(LogQueue::Append, u64::MAX), + LogFileFormat::from_version(version), + ), ) .unwrap_err(); } @@ -1205,9 +1211,9 @@ mod tests { &LogFileContext::new( file_context.id, if version == Version::V1 { - Version::V2 + LogFileFormat::from_version(Version::V2) } else { - Version::V1 + LogFileFormat::from_version(Version::V1) }, ), ) @@ -1279,7 +1285,7 @@ mod tests { let mut kvs = Vec::new(); let data = vec![b'x'; 1024]; let file_id = FileId::dummy(LogQueue::Append); - let file_context = LogFileContext::new(file_id, Version::default()); + let file_context = LogFileContext::new(file_id, LogFileFormat::default()); let mut batch1 = LogBatch::default(); entries.push(generate_entries(1, 11, Some(&data))); diff --git a/src/memtable.rs b/src/memtable.rs index 7d69afcd..2c6fc055 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -375,28 +375,25 @@ impl MemTable { } } - /// Appends some entries from rewrite queue. Assumes this table has no - /// append data. + /// Appends some entries from append queue. Assumes this table has no + /// rewrite data. /// /// This method is only used for recovery. - pub fn append_rewrite(&mut self, entry_indexes: Vec) { + pub fn replay_append(&mut self, entry_indexes: Vec) { let len = entry_indexes.len(); if len > 0 { - debug_assert_eq!(self.rewrite_count, self.entry_indexes.len()); + debug_assert_eq!(self.rewrite_count, 0); self.prepare_append( entry_indexes[0].index, - // Rewrite -> Compact Append -> Rewrite. - true, /* allow_hole */ - // Refer to case in `merge_append_table`. They can be adapted - // to attack this path via a global rewrite without deleting - // obsolete rewrite files. + false, /* allow_hole */ + // Refer to case in `merge_newer_neighbor`. true, /* allow_overwrite */ ); - self.global_stats.add(LogQueue::Rewrite, len); + self.global_stats.add(LogQueue::Append, len); for ei in &entry_indexes { + debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append); self.entry_indexes.push_back(ei.into()); } - self.rewrite_count = self.entry_indexes.len(); } } @@ -474,6 +471,31 @@ impl MemTable { self.rewrite_count = pos + rewrite_len; } + /// Appends some entries from rewrite queue. Assumes this table has no + /// append data. + /// + /// This method is only used for recovery. + pub fn replay_rewrite(&mut self, entry_indexes: Vec) { + let len = entry_indexes.len(); + if len > 0 { + debug_assert_eq!(self.rewrite_count, self.entry_indexes.len()); + self.prepare_append( + entry_indexes[0].index, + // Rewrite -> Compact Append -> Rewrite. + true, /* allow_hole */ + // Refer to case in `merge_append_table`. They can be adapted + // to attack this path via a global rewrite without deleting + // obsolete rewrite files. + true, /* allow_overwrite */ + ); + self.global_stats.add(LogQueue::Rewrite, len); + for ei in &entry_indexes { + self.entry_indexes.push_back(ei.into()); + } + self.rewrite_count = self.entry_indexes.len(); + } + } + /// Removes all entries with index smaller than `index`. Returns the number /// of deleted entries. pub fn compact_to(&mut self, index: u64) -> u64 { @@ -1026,6 +1048,38 @@ impl MemTableAccessor { } } + /// Applies changes from log items that are replayed from a append queue. + /// Assumes it haven't applied any rewrite data. + /// + /// This method is only used for recovery. + pub fn replay_append_writes(&self, log_items: LogItemDrain) { + for item in log_items { + let raft = item.raft_group_id; + let memtable = self.get_or_insert(raft); + match item.content { + LogItemContent::EntryIndexes(entries_to_add) => { + memtable.write().replay_append(entries_to_add.0); + } + LogItemContent::Command(Command::Clean) => { + self.remove(raft, true /* record_tombstone */); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.write().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let value = kv.value.unwrap(); + memtable.write().put(kv.key, value, kv.file_id.unwrap()); + } + OpType::Del => { + let key = kv.key; + memtable.write().delete(key.as_slice()); + } + }, + } + } + } + /// Applies changes from log items that have been written to rewrite queue. pub fn apply_rewrite_writes( &self, @@ -1057,15 +1111,16 @@ impl MemTableAccessor { /// Assumes it haven't applied any append data. /// /// This method is only used for recovery. - pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) { + pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) { for item in log_items { let raft = item.raft_group_id; let memtable = self.get_or_insert(raft); match item.content { LogItemContent::EntryIndexes(entries_to_add) => { - memtable.write().append_rewrite(entries_to_add.0); + memtable.write().replay_rewrite(entries_to_add.0); } LogItemContent::Command(Command::Clean) => { + // Only append tombstone needs to be recorded. self.remove(raft, false /* record_tombstone */); } LogItemContent::Command(Command::Compact { index }) => { @@ -1151,10 +1206,8 @@ impl ReplayMachine for MemTableRecoverContext { } } match file_id.queue { - LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()), - LogQueue::Rewrite => self - .memtables - .apply_replayed_rewrite_writes(item_batch.drain()), + LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()), + LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()), } Ok(()) } @@ -1162,10 +1215,8 @@ impl ReplayMachine for MemTableRecoverContext { fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> { self.log_batch.merge(&mut rhs.log_batch.clone()); match queue { - LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()), - LogQueue::Rewrite => self - .memtables - .apply_replayed_rewrite_writes(rhs.log_batch.drain()), + LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()), + LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()), } self.memtables.merge_newer_neighbor(rhs.memtables); Ok(()) @@ -2004,7 +2055,7 @@ mod tests { memtable.compact_to(7); } Some(LogQueue::Rewrite) => { - memtable.append_rewrite(generate_entry_indexes( + memtable.replay_rewrite(generate_entry_indexes( 0, 7, FileId::new(LogQueue::Rewrite, 1), @@ -2046,7 +2097,7 @@ mod tests { memtable.compact_to(10); } Some(LogQueue::Rewrite) => { - memtable.append_rewrite(generate_entry_indexes( + memtable.replay_rewrite(generate_entry_indexes( 0, 7, FileId::new(LogQueue::Rewrite, 1), @@ -2097,7 +2148,7 @@ mod tests { memtable.merge_newer_neighbor(&mut m1); } Some(LogQueue::Rewrite) => { - memtable.append_rewrite(generate_entry_indexes( + memtable.replay_rewrite(generate_entry_indexes( 0, 10, FileId::new(LogQueue::Rewrite, 1), @@ -2179,6 +2230,13 @@ mod tests { batches[1].add_command(last_rid, Command::Compact { index: 5 }); batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2])); + // entries [1, 10] => entries [11, 20][5, 10] => compact 8 + last_rid += 1; + batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0])); + batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1])); + batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1])); + batches[2].add_command(last_rid, Command::Compact { index: 8 }); + for b in batches.iter_mut() { b.finish_write(FileBlockHandle::dummy(LogQueue::Append)); } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index c6b1d04a..5b4c18c9 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -11,6 +11,7 @@ use num_traits::ToPrimitive; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; +use crate::file_pipe_log::LogFileFormat; use crate::Result; /// The type of log queue. @@ -82,6 +83,7 @@ impl FileBlockHandle { } /// Version of log file format. +#[repr(u64)] #[derive( Clone, Copy, @@ -94,7 +96,6 @@ impl FileBlockHandle { Deserialize_repr, EnumIter, )] -#[repr(u64)] pub enum Version { V1 = 1, V2 = 2, @@ -122,17 +123,50 @@ impl Display for Version { } } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum DataLayout { + NoAlignment, + /// Alignment(block_size) mode in memory for DataLayout will make sense + /// when DIO is open. + /// + /// `block_size` will be dumped into the header of each log file. + Alignment(u64), +} + +impl DataLayout { + pub fn from_u64(val: u64) -> Self { + match val { + 0 => DataLayout::NoAlignment, + aligned => DataLayout::Alignment(aligned), + } + } + + pub fn to_u64(self) -> u64 { + match self { + DataLayout::NoAlignment => 0, + DataLayout::Alignment(aligned) => { + debug_assert!(aligned > 0); + aligned + } + } + } + + pub const fn len() -> usize { + std::mem::size_of::() /* serialized in u64. */ + } +} + #[derive(Debug, Clone)] pub struct LogFileContext { pub id: FileId, - pub version: Version, + pub format: LogFileFormat, } impl LogFileContext { - pub fn new(file_id: FileId, version: Version) -> Self { + pub fn new(file_id: FileId, format: LogFileFormat) -> Self { Self { id: file_id, - version, + format, } } @@ -140,7 +174,7 @@ impl LogFileContext { /// /// `None` will be returned only if `self.version` is invalid. pub fn get_signature(&self) -> Option { - if self.version.has_log_signing() { + if self.format.version().has_log_signing() { // Here, the count of files will be always limited to less than // `u32::MAX`. So, we just use the low 32 bit as the `signature` // by default. diff --git a/src/util.rs b/src/util.rs index 125a3156..2e3a428e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -325,6 +325,37 @@ pub trait Factory: Send + Sync { fn new_target(&self) -> Target; } +/// Return an aligned `offset`. +/// +/// # Example: +/// +/// ``` +/// use raft_engine::internals::round_up; +/// +/// assert_eq!(round_up(18, 4), 20); +/// assert_eq!(round_up(64, 16), 64); +/// ``` +#[inline] +pub fn round_up(offset: usize, alignment: usize) -> usize { + (offset + alignment - 1) / alignment * alignment +} + +/// Return an aligned `offset`. +/// +/// # Example: +/// +/// ``` +/// use raft_engine::internals::round_down; +/// +/// assert_eq!(round_down(18, 4), 16); +/// assert_eq!(round_down(64, 16), 64); +/// ``` +#[allow(dead_code)] +#[inline] +pub fn round_down(offset: usize, alignment: usize) -> usize { + offset / alignment * alignment +} + #[cfg(test)] mod tests { use super::*; @@ -435,4 +466,16 @@ mod tests { fn test_unhash() { assert_eq!(unhash_u64(hash_u64(777)), 777); } + + #[test] + fn test_rounding() { + // round_up + assert_eq!(round_up(18, 4), 20); + assert_eq!(round_up(64, 16), 64); + assert_eq!(round_up(79, 4096), 4096); + // round_down + assert_eq!(round_down(18, 4), 16); + assert_eq!(round_down(64, 16), 64); + assert_eq!(round_down(79, 4096), 0); + } } diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index 47951a93..bb498fc3 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -450,6 +450,82 @@ fn test_tail_corruption() { let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); append(&engine, rid, 1, 5, Some(&data)); drop(engine); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + } + // Version::V1 in header owns abnormal DataLayout. + { + let _f = FailGuard::new("log_file_header::force_abnormal_data_layout", "return"); + let dir = tempfile::Builder::new() + .prefix("test_tail_corruption_4") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(1), + purge_threshold: ReadableSize(1), + ..Default::default() + }; + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + drop(engine); + // Version::V1 will be parsed successfully as the data_layout when the related + // `version == V1` will be ignored. + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + append(&engine, rid, 1, 5, Some(&data)); + drop(engine); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + } + // DataLayout in header is corrupted for Version::V2 + { + let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let dir = tempfile::Builder::new() + .prefix("test_tail_corruption_5") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + format_version: Version::V2, + ..Default::default() + }; + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + drop(engine); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + } + // DataLayout in header is abnormal for Version::V2 + { + let _f = FailGuard::new("log_file_header::force_abnormal_data_layout", "return"); + let dir = tempfile::Builder::new() + .prefix("test_tail_corruption_6") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + format_version: Version::V2, + ..Default::default() + }; + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + drop(engine); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + } + // DataLayout in header is corrupted for Version::V2, followed with records + { + let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let dir = tempfile::Builder::new() + .prefix("test_tail_corruption_7") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(1), + purge_threshold: ReadableSize(1), + format_version: Version::V2, + ..Default::default() + }; + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + drop(engine); + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + append(&engine, rid, 1, 2, Some(&data)); + append(&engine, rid, 2, 3, Some(&data)); + drop(engine); assert!(Engine::open_with_file_system(cfg, fs).is_err()); } } @@ -518,7 +594,11 @@ fn test_concurrent_write_perf_context() { } } +// FIXME: this test no longer works because recovery cannot reliably detect +// overwrite anomaly. +// See https://github.com/tikv/raft-engine/issues/250 #[test] +#[should_panic] fn test_recycle_with_stale_logbatch_at_tail() { let dir = tempfile::Builder::new() .prefix("test_recycle_with_stale_log_batch_at_tail") @@ -569,3 +649,77 @@ fn test_recycle_with_stale_logbatch_at_tail() { }) .is_err()); } + +#[test] +fn test_build_engine_with_multi_datalayout() { + let dir = tempfile::Builder::new() + .prefix("test_build_engine_with_multi_datalayout") + .tempdir() + .unwrap(); + let data = vec![b'x'; 12827]; + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize::kb(2), + purge_threshold: ReadableSize::kb(4), + recovery_mode: RecoveryMode::AbsoluteConsistency, + ..Default::default() + }; + // Defaultly, File with DataLayout::NoAlignment. + let engine = Engine::open(cfg.clone()).unwrap(); + for rid in 1..=3 { + append(&engine, rid, 1, 11, Some(&data)); + } + drop(engine); + // File with DataLayout::Alignment + let _f = FailGuard::new("file_pipe_log::open::force_set_aligned_layout", "return"); + let cfg_v2 = Config { + format_version: Version::V2, + ..cfg + }; + let engine = Engine::open(cfg_v2.clone()).unwrap(); + for rid in 1..=3 { + append(&engine, rid, 11, 20, Some(&data)); + } + drop(engine); + assert!(Engine::open(cfg_v2).is_ok()); +} + +#[test] +fn test_build_engine_with_datalayout_abnormal() { + let dir = tempfile::Builder::new() + .prefix("test_build_engine_with_datalayout_abnormal") + .tempdir() + .unwrap(); + let data = vec![b'x'; 1024]; + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize::kb(2), + purge_threshold: ReadableSize::kb(4), + recovery_mode: RecoveryMode::AbsoluteConsistency, + format_version: Version::V2, + ..Default::default() + }; + let _f = FailGuard::new("file_pipe_log::open::force_set_aligned_layout", "return"); + let engine = Engine::open(cfg.clone()).unwrap(); + // Content durable with DataLayout::Alignment. + append(&engine, 1, 1, 11, Some(&data)); + append(&engine, 2, 1, 11, Some(&data)); + { + // Set failpoint to dump content with invalid paddings into log file. + let _f1 = FailGuard::new("file_pipe_log::append::force_abnormal_paddings", "return"); + append(&engine, 3, 1, 11, Some(&data)); + drop(engine); + assert!(Engine::open(cfg.clone()).is_err()); + } + { + // Reopen the Engine with TolerateXXX mode. + let mut cfg_v2 = cfg.clone(); + cfg_v2.recovery_mode = RecoveryMode::TolerateTailCorruption; + let engine = Engine::open(cfg_v2).unwrap(); + for rid in 4..=8 { + append(&engine, rid, 1, 11, Some(&data)); + } + drop(engine); + assert!(Engine::open(cfg).is_ok()); + } +}