diff --git a/src/config.rs b/src/config.rs index 3835f1ee..2f7767eb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,7 @@ use log::warn; use serde::{Deserialize, Serialize}; use crate::pipe_log::Version; +use crate::util::dev_ext::on_same_dev; use crate::{util::ReadableSize, Result}; const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512; @@ -31,6 +32,15 @@ pub struct Config { /// Default: "" pub dir: String, + /// Secondary directory to store log files. Will create on startup if + /// set and not exists. + /// + /// Newly logs will be put into this dir when the main `dir` is full + /// or not accessible. + /// + /// Default: "" + pub secondary_dir: Option, + /// How to deal with file corruption during recovery. /// /// Default: "tolerate-tail-corruption". @@ -98,6 +108,7 @@ impl Default for Config { #[allow(unused_mut)] let mut cfg = Config { dir: "".to_owned(), + secondary_dir: None, recovery_mode: RecoveryMode::TolerateTailCorruption, recovery_read_block_size: ReadableSize::kb(16), recovery_threads: 4, @@ -160,6 +171,26 @@ impl Config { if self.memory_limit.is_some() { warn!("memory-limit will be ignored because swap feature is disabled"); } + // Validate `dir`. + if let Err(e) = validate_dir(&self.dir) { + return Err(box_err!( + "dir ({}) is invalid, err: {}, please check it again", + self.dir, + e + )); + } + // Validate `secondary-dir`. + if_chain::if_chain! { + if let Some(secondary_dir) = self.secondary_dir.as_ref(); + if validate_dir(secondary_dir).is_ok(); + if let Ok(true) = on_same_dev(&self.dir, secondary_dir); + then { + warn!( + "secondary-dir ({}) and dir ({}) are on same device, recommend setting it to another device", + secondary_dir, self.dir + ); + } + } Ok(()) } @@ -183,10 +214,31 @@ impl Config { } } +/// Check the given `dir` is valid or not. If `dir` not exists, +/// it would be created automatically. +fn validate_dir(dir: &str) -> Result<()> { + use std::path::Path; + + let path = Path::new(dir); + if !path.exists() { + std::fs::create_dir(dir)?; + return Ok(()); + } + if !path.is_dir() { + return Err(box_err!("Not directory: {}", dir)); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; + fn remove_dir(dir: &str) -> Result<()> { + std::fs::remove_dir_all(dir)?; + Ok(()) + } + #[test] fn test_serde() { let value = Config::default(); @@ -219,6 +271,7 @@ mod tests { #[test] fn test_invalid() { let hard_error = r#" + dir = "./" target-file-size = "5MB" purge-threshold = "3MB" "#; @@ -226,6 +279,7 @@ mod tests { assert!(hard_load.sanitize().is_err()); let soft_error = r#" + dir = "./" recovery-read-block-size = "1KB" recovery-threads = 0 target-file-size = "5000MB" @@ -245,6 +299,7 @@ mod tests { assert!(soft_sanitized.enable_log_recycle); let recycle_error = r#" + dir = "./" enable-log-recycle = true format-version = 1 "#; @@ -256,6 +311,7 @@ mod tests { fn test_backward_compactibility() { // Upgrade from older version. 
let old = r#" + dir = "./" recovery-mode = "tolerate-corrupted-tail-records" "#; let mut load: Config = toml::from_str(old).unwrap(); @@ -265,4 +321,36 @@ mod tests { .unwrap() .contains("tolerate-corrupted-tail-records")); } + + #[test] + fn test_validate_dir_setting() { + { + let dir_list = r#""#; + let mut load: Config = toml::from_str(dir_list).unwrap(); + assert!(load.sanitize().is_err()); + } + { + // Set the sub-dir same with main dir + let dir_list = r#" + dir = "./test_validate_dir_setting/" + secondary-dir = "./test_validate_dir_setting/" + "#; + let mut load: Config = toml::from_str(dir_list).unwrap(); + load.sanitize().unwrap(); + assert!(load.secondary_dir.is_some()); + assert!(remove_dir(&load.dir).is_ok()); + } + { + // Set the sub-dir with `"..."` + let dir_list = r#" + dir = "./test_validate_dir_setting" + secondary-dir = "./test_validate_dir_setting_secondary" + "#; + let mut load: Config = toml::from_str(dir_list).unwrap(); + load.sanitize().unwrap(); + assert!(load.secondary_dir.is_some()); + assert!(remove_dir(&load.dir).is_ok()); + assert!(remove_dir(&load.secondary_dir.unwrap()).is_ok()); + } + } } diff --git a/src/engine.rs b/src/engine.rs index df14cfa2..3a0174da 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -25,6 +25,8 @@ use crate::write_barrier::{WriteBarrier, Writer}; use crate::{perf_context, Error, GlobalStats, Result}; const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30); +/// Max retry count for `write`. +const MAX_WRITE_RETRY_COUNT: u64 = 2; pub struct Engine> where @@ -142,7 +144,14 @@ where let start = Instant::now(); let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; debug_assert!(len > 0); - let block_handle = { + + let mut attempt_count = 0_u64; + let block_handle = loop { + // Max retry count is limited to `2`. If the first `append` retry because of + // `NOSPC` error, the next `append` should success, unless there exists + // several abnormal cases in the IO device. In that case, `Engine::write` + // must return `Err`. + attempt_count += 1; let mut writer = Writer::new(log_batch, sync); // Snapshot and clear the current perf context temporarily, so the write group // leader will collect the perf context diff later. @@ -154,14 +163,16 @@ where for writer in group.iter_mut() { writer.entered_time = Some(now); sync |= writer.sync; + let log_batch = writer.mut_payload(); let res = self.pipe_log.append(LogQueue::Append, log_batch); writer.set_output(res); } perf_context!(log_write_duration).observe_since(now); if sync { - // As per trait protocol, this error should be retriable. But we panic anyway to - // save the trouble of propagating it to other group members. + // As per trait protocol, this error should be retriable. But we panic + // anyway to save the trouble of propagating it to + // other group members. self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()"); } // Pass the perf context diff to all the writers. @@ -178,7 +189,27 @@ where debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO); perf_context += &writer.perf_context_diff; set_perf_context(perf_context); - writer.finish()? + // Retry if `writer.finish()` returns a special 'Error::Other', remarking that + // there still exists free space for this `LogBatch`. 
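The loop above gives `write` a bounded retry: when an append fails only because the current log file ran out of space, the same `LogBatch` is resubmitted to the next write group, at most `MAX_WRITE_RETRY_COUNT` times in total. A minimal standalone sketch of that pattern, with a hypothetical `try_append` closure standing in for one pass through the write-group machinery:

// Illustrative only; `try_append` and the String error are stand-ins for the
// write-group pass and the retriable `Error::Other` used by the engine.
fn append_with_retry<T>(
    max_attempts: u64,
    mut try_append: impl FnMut() -> Result<T, String>,
) -> Result<T, String> {
    let mut attempts = 0_u64;
    loop {
        attempts += 1;
        match try_append() {
            Ok(v) => return Ok(v),
            // Retriable failure: try again unless the attempt budget is spent.
            Err(e) if e == "retriable" && attempts < max_attempts => continue,
            Err(e) => return Err(e),
        }
    }
}

With a budget of two attempts, a first out-of-space failure triggers exactly one more pass before the error is surfaced to the caller.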
+ match writer.finish() { + Ok(handle) => { + break handle; + } + Err(Error::Other(_)) => { + // A special error: retry this `LogBatch` by handing the writer to the + // next write group, so the current write leader does not hang on this + // write and can return in a timely manner. + if attempt_count >= MAX_WRITE_RETRY_COUNT { + return Err(Error::Other(box_err!( + "Failed to write logbatch, exceeded max retry count ({})", + MAX_WRITE_RETRY_COUNT + ))); + } + } + Err(e) => { + return Err(e); + } + } }; let mut now = Instant::now(); log_batch.finish_write(block_handle); diff --git a/src/env/default.rs b/src/env/default.rs index 451ee9fe..7f8432b6 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -106,7 +106,16 @@ impl LogFd { let bytes = match pwrite(self.0, &content[written..], offset as i64) { Ok(bytes) => bytes, Err(e) if e == Errno::EINTR => continue, - Err(e) => return Err(from_nix_error(e, "pwrite")), + Err(e) => { + return { + if e == Errno::ENOSPC { + // no space left + Err(from_nix_error(e, "nospace")) + } else { + Err(from_nix_error(e, "pwrite")) + } + }; + } }; if bytes == 0 { break; } written += bytes; offset += bytes; } + fail_point!("log_fd::write::no_space_err", |_| { + Err(from_nix_error(nix::Error::ENOSPC, "nospace")) + }); fail_point!("log_fd::write::err", |_| { Err(from_nix_error(nix::Error::EINVAL, "fp")) }); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index ad99904f..4f75438d 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -9,6 +9,7 @@ use crossbeam::utils::CachePadded; use fail::fail_point; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; +use strum::{EnumIter, IntoEnumIterator}; use crate::config::Config; use crate::env::FileSystem; @@ -22,10 +23,105 @@ use crate::{perf_context, Error, Result}; use super::format::{FileNameExt, LogFileFormat}; use super::log_file::{build_file_reader, build_file_writer, LogFileWriter}; +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, EnumIter)] +pub enum DirPathId { + Main = 0, + Secondary = 1, +} + +/// Manages multiple directories for storing log files, namely the main +/// `dir` and the optional `secondary-dir`.
+#[derive(Default)] +pub struct DirectoryManager { + dirs: Vec, + locks: Vec, +} + +impl DirectoryManager { + #[cfg(test)] + fn new(dir: String, secondary_dir: Option) -> Self { + let mut dirs = vec![dir; 1]; + if let Some(sec_dir) = secondary_dir { + dirs.push(sec_dir); + } + Self { + dirs, + locks: Vec::default(), + } + } + + #[inline] + pub fn add_dir(&mut self, dir: String, dir_lock: File) { + self.dirs.push(dir); + self.locks.push(dir_lock); + } + + #[inline] + pub fn get_all_dir(&self) -> &Vec { + &self.dirs + } + + #[inline] + fn get_free_dir(&self, target_size: usize) -> Option<(&str, DirPathId)> { + #[cfg(feature = "failpoints")] + { + fail::fail_point!("file_pipe_log::force_use_secondary_dir", |_| { + Some(( + self.dirs[DirPathId::Secondary as usize].as_str(), + DirPathId::Secondary, + )) + }); + fail::fail_point!("file_pipe_log::force_no_free_space", |_| { None }); + } + for t in DirPathId::iter() { + let idx = t as usize; + if idx >= self.dirs.len() { + break; + } + let disk_stats = match fs2::statvfs(&self.dirs[idx]) { + Err(e) => { + error!( + "get disk stat for raft engine failed, dir_path: {}, err: {}", + &self.dirs[idx], e + ); + return None; + } + Ok(stats) => stats, + }; + if target_size <= disk_stats.available_space() as usize { + return Some((&self.dirs[idx], t)); + } + } + None + } + + #[inline] + fn get_dir(&self, path_id: DirPathId) -> Option<&str> { + let idx = path_id as usize; + if idx >= self.dirs.len() { + None + } else { + Some(&self.dirs[idx]) + } + } + + #[inline] + fn sync_dir(&self, path_id: DirPathId) -> Result<()> { + let idx = path_id as usize; + if idx < self.dirs.len() { + let path = PathBuf::from(&self.dirs[idx]); + std::fs::File::open(path).and_then(|d| d.sync_all())?; + } + Ok(()) + } +} + #[derive(Debug)] pub struct FileWithFormat { pub handle: Arc, pub format: LogFileFormat, + pub path_id: DirPathId, } struct FileCollection { @@ -51,14 +147,15 @@ struct FileState { impl FileCollection { /// Takes a stale file if there is one. #[inline] - fn recycle_one_file(&mut self) -> Option { + fn recycle_one_file(&mut self) -> Option<(FileSeq, DirPathId)> { debug_assert!(self.first_seq <= self.first_seq_in_use); debug_assert!(!self.fds.is_empty()); if self.first_seq < self.first_seq_in_use { let seq = self.first_seq; + let path_id = self.fds[0].path_id; self.fds.pop_front().unwrap(); self.first_seq += 1; - Some(seq) + Some((seq, path_id)) } else { None } @@ -75,7 +172,11 @@ impl FileCollection { } #[inline] - fn logical_purge(&mut self, file_seq: FileSeq) -> (FileState, FileState) { + fn logical_purge( + &mut self, + file_seq: FileSeq, + ) -> (FileState, FileState, Vec<(FileSeq, DirPathId)>) { + let mut purged_files = Vec::<(FileSeq, DirPathId)>::default(); let prev = FileState { first_seq: self.first_seq, first_seq_in_use: self.first_seq_in_use, @@ -96,6 +197,10 @@ impl FileCollection { break; } } + purged_files.reserve(purged); + for i in 0..purged { + purged_files.push((i as u64 + self.first_seq, self.fds[i].path_id)); + } self.first_seq += purged as u64; self.first_seq_in_use = file_seq; self.fds.drain(..purged); @@ -105,7 +210,7 @@ impl FileCollection { first_seq_in_use: self.first_seq_in_use, total_len: self.fds.len(), }; - (prev, current) + (prev, current, purged_files) } } @@ -118,7 +223,6 @@ struct ActiveFile { /// A file-based log storage that arranges files as one single queue. 
pub(super) struct SinglePipe { queue: LogQueue, - dir: String, file_format: LogFileFormat, target_file_size: usize, file_system: Arc, @@ -130,6 +234,8 @@ pub(super) struct SinglePipe { /// `active_file` must be locked first to acquire both `files` and /// `active_file` active_file: CachePadded>>, + /// Manager of directory. + dir_mgr: Arc, } impl Drop for SinglePipe { @@ -146,7 +252,11 @@ impl Drop for SinglePipe { queue: self.queue, seq, }; - let path = file_id.build_file_path(&self.dir); + let dir = self + .dir_mgr + .get_dir(files.fds[(seq - files.first_seq) as usize].path_id); + debug_assert!(dir.is_some()); + let path = file_id.build_file_path(dir.unwrap()); if let Err(e) = self.file_system.delete(&path) { error!( "error while deleting stale file: {}, err_msg: {}", @@ -160,9 +270,11 @@ impl Drop for SinglePipe { impl SinglePipe { /// Opens a new [`SinglePipe`]. + #[allow(clippy::too_many_arguments)] pub fn open( cfg: &Config, file_system: Arc, + dir_mgr: Arc, listeners: Vec>, queue: LogQueue, mut first_seq: FileSeq, @@ -187,14 +299,24 @@ impl SinglePipe { let create_file = first_seq == 0; let active_seq = if create_file { first_seq = 1; + let (dir, path_id) = match dir_mgr.get_free_dir(cfg.target_file_size.0 as usize) { + Some((d, t)) => (d, t), + None => { + // No space for writing. + return Err(Error::Other(box_err!( + "no free space for recording new logs." + ))); + } + }; let file_id = FileId { queue, seq: first_seq, }; - let fd = Arc::new(file_system.create(&file_id.build_file_path(&cfg.dir))?); + let fd = Arc::new(file_system.create(&file_id.build_file_path(&dir))?); fds.push_back(FileWithFormat { handle: fd, format: LogFileFormat::new(cfg.format_version, alignment), + path_id, }); first_seq } else { @@ -221,7 +343,6 @@ impl SinglePipe { let total_files = fds.len(); let pipe = Self { queue, - dir: cfg.dir.clone(), file_format: LogFileFormat::new(cfg.format_version, alignment), target_file_size: cfg.target_file_size.0 as usize, file_system, @@ -234,6 +355,7 @@ impl SinglePipe { capacity, })), active_file: CachePadded::new(Mutex::new(active_file)), + dir_mgr, }; pipe.flush_metrics(total_files); Ok(pipe) @@ -241,10 +363,8 @@ impl SinglePipe { /// Synchronizes all metadatas associated with the working directory to the /// filesystem. - fn sync_dir(&self) -> Result<()> { - let path = PathBuf::from(&self.dir); - std::fs::File::open(path).and_then(|d| d.sync_all())?; - Ok(()) + fn sync_dir(&self, path_id: DirPathId) -> Result<()> { + self.dir_mgr.sync_dir(path_id) } /// Returns a shared [`LogFd`] for the specified file sequence number. @@ -277,26 +397,39 @@ impl SinglePipe { queue: self.queue, seq, }; - let path = file_id.build_file_path(&self.dir); - let fd = Arc::new(if let Some(seq) = self.files.write().recycle_one_file() { - let src_file_id = FileId { - queue: self.queue, - seq, - }; - let src_path = src_file_id.build_file_path(&self.dir); - let dst_path = file_id.build_file_path(&self.dir); - if let Err(e) = self.file_system.reuse(&src_path, &dst_path) { - error!("error while trying to reuse one expired file: {}", e); - if let Err(e) = self.file_system.delete(&src_path) { - error!("error while trying to delete one expired file: {}", e); + // Generate a new fd from a newly chosen file, might be reused from a stale + // file or generated from a newly created file. + let (fd, path_id) = { + let mut files = self.files.write(); + if let Some((seq, path_id)) = files.recycle_one_file() { + // Has stale files for recycling, the old file will be reused. 
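Rotation now has three possible outcomes: reuse a recycled stale file in the directory it already lives in, create a fresh file in whichever directory still has room for one `target_file_size` write, or fail with the retriable "no free space" error. The "has room" probe is the same `fs2::statvfs` check used by `get_free_dir`; a condensed sketch of that selection rule (the `pick_dir` helper below is illustrative, not part of the patch, and its error handling is simplified):

use std::path::Path;

// Walk candidate directories in priority order (main first, secondary next)
// and return the first one whose available space can still hold `target_size`
// bytes.
fn pick_dir(candidates: &[String], target_size: u64) -> Option<&str> {
    for dir in candidates {
        match fs2::statvfs(Path::new(dir)) {
            Ok(stats) if stats.available_space() >= target_size => {
                return Some(dir.as_str());
            }
            Ok(_) => continue, // Not enough room here; try the next directory.
            Err(_) => return None, // Treat a failed probe as "no usable space".
        }
    }
    None
}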
+ let dir = self.dir_mgr.get_dir(path_id).unwrap(); + let src_file_id = FileId { + queue: self.queue, + seq, + }; + let src_path = src_file_id.build_file_path(&dir); + let dst_path = file_id.build_file_path(&dir); + if let Err(e) = self.file_system.reuse(&src_path, &dst_path) { + error!("error while trying to reuse one expired file: {}", e); + if let Err(e) = self.file_system.delete(&src_path) { + error!("error while trying to delete one expired file: {}", e); + } + (Arc::new(self.file_system.create(&dst_path)?), path_id) + } else { + (Arc::new(self.file_system.open(&dst_path)?), path_id) } - self.file_system.create(&path)? + } else if let Some((d, t)) = self.dir_mgr.get_free_dir(self.target_file_size) { + // Has free space for newly writing, a new file is introduced. + let path = file_id.build_file_path(&d); + (Arc::new(self.file_system.create(&path)?), t) } else { - self.file_system.open(&path)? + // Neither has stale files nor has space for writing. + return Err(Error::Other(box_err!( + "no free space for recording new logs." + ))); } - } else { - self.file_system.create(&path)? - }); + }; let mut new_file = ActiveFile { seq, // The file might generated from a recycled stale-file, always reset the file @@ -312,7 +445,7 @@ impl SinglePipe { // File header must be persisted. This way we can recover gracefully if power // loss before a new entry is written. new_file.writer.sync()?; - self.sync_dir()?; + self.sync_dir(path_id)?; let version = new_file.format.version; let alignment = new_file.format.alignment; **active_file = new_file; @@ -320,6 +453,7 @@ impl SinglePipe { let state = self.files.write().push(FileWithFormat { handle: fd, format: LogFileFormat::new(version, alignment), + path_id, }); for listener in &self.listeners { listener.post_new_log_file(FileId { @@ -397,6 +531,40 @@ impl SinglePipe { seq, e, te ); } + // TODO: Refine the following judgement if the error type + // `ErrorKind::StorageFull` is stable. + let no_space_err = { + if_chain::if_chain! { + if let Error::Io(ref e) = e; + let err_msg = format!("{}", e.get_ref().unwrap()); + if err_msg.contains("nospace"); + then { + true + } else { + false + } + } + }; + let has_free_space = { + let files = self.files.read(); + files.first_seq < files.first_seq_in_use /* has stale files */ + || self.dir_mgr.get_free_dir(self.target_file_size).is_some() + }; + // If there still exists free space for this record, rotate the file + // and return a special Err (for retry) to the caller. + if no_space_err && has_free_space { + if let Err(e) = self.rotate_imp(&mut active_file) { + panic!( + "error when rotate [{:?}:{}]: {}", + self.queue, active_file.seq, e + ); + } + return Err(Error::Other(box_err!( + "failed to write {} file, get {} try to flush it to other dir", + seq, + e + ))); + } return Err(e); } let handle = FileBlockHandle { @@ -448,19 +616,26 @@ impl SinglePipe { /// /// Return the actual removed count of purged files. 
fn purge_to(&self, file_seq: FileSeq) -> Result { - let (prev, current) = self.files.write().logical_purge(file_seq); + let (prev, current, purged_files) = self.files.write().logical_purge(file_seq); if file_seq > prev.first_seq + prev.total_len as u64 - 1 { debug_assert_eq!(prev, current); return Err(box_err!("Purge active or newer files")); } else if prev == current { + debug_assert!(purged_files.is_empty()); return Ok(0); } - for seq in prev.first_seq..current.first_seq { + debug_assert_eq!( + purged_files.len() as u64, + current.first_seq - prev.first_seq + ); + for (seq, dir_type) in purged_files.iter() { let file_id = FileId { queue: self.queue, - seq, + seq: *seq, }; - let path = file_id.build_file_path(&self.dir); + let dir = self.dir_mgr.get_dir(*dir_type); + debug_assert!(dir.is_some()); + let path = file_id.build_file_path(dir.unwrap()); #[cfg(feature = "failpoints")] { let remove_skipped = || { @@ -481,25 +656,18 @@ impl SinglePipe { /// A [`PipeLog`] implementation that stores data in filesystem. pub struct DualPipes { pipes: [SinglePipe; 2], - - _dir_lock: File, } impl DualPipes { /// Open a new [`DualPipes`]. Assumes the two [`SinglePipe`]s share the /// same directory, and that directory is locked by `dir_lock`. - pub(super) fn open( - dir_lock: File, - appender: SinglePipe, - rewriter: SinglePipe, - ) -> Result { + pub(super) fn open(appender: SinglePipe, rewriter: SinglePipe) -> Result { // TODO: remove this dependency. debug_assert_eq!(LogQueue::Append as usize, 0); debug_assert_eq!(LogQueue::Rewrite as usize, 1); Ok(Self { pipes: [appender, rewriter], - _dir_lock: dir_lock, }) } @@ -565,10 +733,12 @@ mod tests { cfg: &Config, queue: LogQueue, fs: Arc, + dir_mgr: Arc, ) -> Result> { SinglePipe::open( cfg, fs, + dir_mgr, Vec::new(), queue, 0, @@ -581,10 +751,17 @@ mod tests { } fn new_test_pipes(cfg: &Config) -> Result> { + let mut dirs = DirectoryManager::default(); + dirs.add_dir(cfg.dir.clone(), lock_dir(&cfg.dir)?); + let dir_mgr = Arc::new(dirs); DualPipes::open( - lock_dir(&cfg.dir)?, - new_test_pipe(cfg, LogQueue::Append, Arc::new(DefaultFileSystem))?, - new_test_pipe(cfg, LogQueue::Rewrite, Arc::new(DefaultFileSystem))?, + new_test_pipe( + cfg, + LogQueue::Append, + Arc::new(DefaultFileSystem), + dir_mgr.clone(), + )?, + new_test_pipe(cfg, LogQueue::Rewrite, Arc::new(DefaultFileSystem), dir_mgr)?, ) } @@ -691,6 +868,7 @@ mod tests { .unwrap(), ), format: LogFileFormat::new(version, 0 /* alignment */), + path_id: DirPathId::Main, } } let dir = Builder::new() @@ -725,7 +903,7 @@ mod tests { // 12 13 | 14 files.logical_purge(14); // 13 | 14 - assert_eq!(files.recycle_one_file().unwrap(), 12); + assert_eq!(files.recycle_one_file().unwrap().0, 12); // 13 | 14 15 files.push(new_file_handler( path, @@ -748,7 +926,7 @@ mod tests { // 16 17 18 | 19 20 files.logical_purge(19); // 17 18 | 19 20 - assert_eq!(files.recycle_one_file().unwrap(), 16); + assert_eq!(files.recycle_one_file().unwrap().0, 16); } #[test] @@ -769,7 +947,9 @@ mod tests { }; let queue = LogQueue::Append; let fs = Arc::new(ObfuscatedFileSystem::default()); - let pipe_log = new_test_pipe(&cfg, queue, fs.clone()).unwrap(); + let mut dir_mgr = DirectoryManager::default(); + dir_mgr.add_dir(cfg.dir.clone(), lock_dir(&cfg.dir).unwrap()); + let pipe_log = new_test_pipe(&cfg, queue, fs.clone(), Arc::new(dir_mgr)).unwrap(); assert_eq!(pipe_log.file_span(), (1, 1)); fn content(i: usize) -> Vec { @@ -807,4 +987,30 @@ mod tests { assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1)); } } + + #[test] + fn 
test_directory_manager() { + let dir = Builder::new() + .prefix("test_directory_manager_main_dir") + .tempdir() + .unwrap(); + let secondary_dir = Builder::new() + .prefix("test_directory_manager_sec_dir") + .tempdir() + .unwrap(); + let path = dir.path().to_str().unwrap(); + let sec_path = secondary_dir.path().to_str().unwrap(); + { + // Test DirectoryManager with main dir only. + let dir_mgr = DirectoryManager::new(path.to_owned(), None); + assert_eq!(dir_mgr.get_dir(DirPathId::Main).unwrap(), path); + assert!(dir_mgr.get_dir(DirPathId::Secondary).is_none()); + } + { + // Test DirectoryManager both with main dir and secondary dir. + let dir_mgr = DirectoryManager::new(path.to_owned(), Some(sec_path.to_owned())); + assert_eq!(dir_mgr.get_dir(DirPathId::Main).unwrap(), path); + assert_eq!(dir_mgr.get_dir(DirPathId::Secondary).unwrap(), sec_path); + } + } } diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs index 8272dbe5..bbc03a23 100644 --- a/src/file_pipe_log/pipe_builder.rs +++ b/src/file_pipe_log/pipe_builder.rs @@ -9,7 +9,7 @@ use std::path::Path; use std::sync::Arc; use fs2::FileExt; -use log::{error, info, warn}; +use log::{error, warn}; use rayon::prelude::*; use crate::config::{Config, RecoveryMode}; @@ -22,7 +22,7 @@ use crate::{Error, Result}; use super::format::{lock_file_path, FileNameExt, LogFileFormat}; use super::log_file::build_file_reader; -use super::pipe::{DualPipes, FileWithFormat, SinglePipe}; +use super::pipe::{DirPathId, DirectoryManager, DualPipes, FileWithFormat, SinglePipe}; use super::reader::LogItemBatchFileReader; use crate::env::Handle; @@ -67,6 +67,7 @@ struct FileToRecover { seq: FileSeq, handle: Arc, format: Option, + path_id: DirPathId, } /// [`DualPipes`] factory that can also recover other customized memory states. @@ -76,7 +77,7 @@ pub struct DualPipesBuilder { listeners: Vec>, /// Only filled after a successful call of `DualPipesBuilder::scan`. - dir_lock: Option, + dir_mgr: Option>, /// Only filled after a successful call of `DualPipesBuilder::scan`. append_files: Vec>, /// Only filled after a successful call of `DualPipesBuilder::scan`. @@ -90,31 +91,108 @@ impl DualPipesBuilder { cfg, file_system, listeners, - dir_lock: None, + dir_mgr: None, append_files: Vec::new(), rewrite_files: Vec::new(), } } - /// Scans for all log files under the working directory. The directory will - /// be created if not exists. + /// Scans for all log files under the working directory. pub fn scan(&mut self) -> Result<()> { - let dir = &self.cfg.dir; - let path = Path::new(dir); - if !path.exists() { - info!("Create raft log directory: {}", dir); - fs::create_dir(dir)?; - self.dir_lock = Some(lock_dir(dir)?); - return Ok(()); + // Scan main `dir` and `secondary-dir`, if `secondary-dir` is valid. + let mut dir_mgr = DirectoryManager::default(); + dir_mgr.add_dir(self.cfg.dir.clone(), lock_dir(&self.cfg.dir)?); + DualPipesBuilder::::scan_dir_imp( + &self.file_system, + DirPathId::Main, + &self.cfg.dir, + &mut self.append_files, + &mut self.rewrite_files, + )?; + if let Some(secondary_dir) = self.cfg.secondary_dir.as_ref() { + dir_mgr.add_dir(secondary_dir.clone(), lock_dir(secondary_dir)?); + DualPipesBuilder::::scan_dir_imp( + &self.file_system, + DirPathId::Secondary, + secondary_dir, + &mut self.append_files, + &mut self.rewrite_files, + )?; } - if !path.is_dir() { - return Err(box_err!("Not directory: {}", dir)); + // Sorts the expected `file_list` according to `file_seq`. 
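Because files can now come from two directories, both scans feed one combined list which is then sorted by sequence number; the validation loop that follows keeps only the contiguous suffix, discarding everything before the last hole (a duplicate sequence number is reported as an error). A condensed restatement of that gap rule, using a hypothetical `drop_before_last_hole` helper over already-sorted sequence numbers:

// Keep only the contiguous run that ends at the newest file; any file older
// than the last hole is considered stale and dropped.
fn drop_before_last_hole(sorted_seqs: &[u64]) -> &[u64] {
    let mut start = 0;
    for i in 1..sorted_seqs.len() {
        if sorted_seqs[i] != sorted_seqs[i - 1] + 1 {
            start = i; // A hole: the contiguous run restarts here.
        }
    }
    &sorted_seqs[start..]
}

// drop_before_last_hole(&[4, 5, 8, 9, 10]) returns &[8, 9, 10].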
+ self.append_files.sort_by(|a, b| a.seq.cmp(&b.seq)); + self.rewrite_files.sort_by(|a, b| a.seq.cmp(&b.seq)); + // Validate rewrite & append `file_list` individually, and clear stale metadata. + for (queue, files) in [ + (LogQueue::Append, &mut self.append_files), + (LogQueue::Rewrite, &mut self.rewrite_files), + ] { + if files.is_empty() { + continue; + } + // Check the file_list and remove the hole of files. + let mut current_seq = files[0].seq; + let mut invalid_files = 0_usize; + debug_assert!(current_seq > 0); + for (i, f) in files.iter().enumerate() { + if f.seq > current_seq + (i - invalid_files) as u64 { + warn!( + "Detected a hole when scanning directory, discarding files before file_seq {}.", + f.seq, + ); + current_seq = f.seq + 1; + invalid_files = i; + } else if f.seq < current_seq { + return Err(Error::InvalidArgument("Duplicate file".to_string())); + } + } + files.drain(..invalid_files); + // Try to cleanup stale metadata left by the previous version. + if files.is_empty() { + continue; + } + let mut cleared = 0_u64; + let clear_start: u64 = { + // TODO: Need a more efficient way to remove sparse stale metadata, + // without iterating one by one. + 1 + }; + for seq in (clear_start..files[0].seq).rev() { + let file_id = FileId { queue, seq }; + for dir in dir_mgr.get_all_dir().iter() { + let path = file_id.build_file_path(dir); + if self.file_system.exists_metadata(&path) { + if let Err(e) = self.file_system.delete_metadata(&path) { + error!("failed to delete metadata of {}: {}.", path.display(), e); + break; + } + cleared += 1; + } + } + } + if cleared > 0 { + warn!( + "clear {} stale files of {:?} in range [{}, {}).", + cleared, queue, 0, files[0].seq, + ); + } } - self.dir_lock = Some(lock_dir(dir)?); + self.dir_mgr = Some(Arc::new(dir_mgr)); + Ok(()) + } - let (mut min_append_id, mut max_append_id) = (u64::MAX, 0); - let (mut min_rewrite_id, mut max_rewrite_id) = (u64::MAX, 0); - fs::read_dir(path)?.for_each(|e| { + /// Scans and parses all log files under the given directory. 
+ /// + /// Returns the valid file count + fn scan_dir_imp( + file_system: &F, + path_id: DirPathId, + dir: &str, + append_files: &mut Vec>, + rewrite_files: &mut Vec>, + ) -> Result { + let mut valid_file_count: usize = 0; + fs::read_dir(Path::new(dir))?.try_for_each(|e| -> Result<()> { if let Ok(e) = e { let p = e.path(); if p.is_file() { @@ -123,88 +201,43 @@ impl DualPipesBuilder { queue: LogQueue::Append, seq, }) => { - min_append_id = std::cmp::min(min_append_id, seq); - max_append_id = std::cmp::max(max_append_id, seq); + let file_id = FileId { + queue: LogQueue::Append, + seq, + }; + let path = file_id.build_file_path(dir); + append_files.push(FileToRecover { + seq, + handle: Arc::new(file_system.open(&path)?), + format: None, + path_id, + }); + valid_file_count += 1; } Some(FileId { queue: LogQueue::Rewrite, seq, }) => { - min_rewrite_id = std::cmp::min(min_rewrite_id, seq); - max_rewrite_id = std::cmp::max(max_rewrite_id, seq); + let file_id = FileId { + queue: LogQueue::Rewrite, + seq, + }; + let path = file_id.build_file_path(dir); + rewrite_files.push(FileToRecover { + seq, + handle: Arc::new(file_system.open(&path)?), + format: None, + path_id, + }); + valid_file_count += 1; } _ => {} } } } - }); - - for (queue, min_id, max_id, files) in [ - ( - LogQueue::Append, - min_append_id, - max_append_id, - &mut self.append_files, - ), - ( - LogQueue::Rewrite, - min_rewrite_id, - max_rewrite_id, - &mut self.rewrite_files, - ), - ] { - if max_id > 0 { - // Try to cleanup stale metadata left by the previous version. - let max_sample = 100; - // Find the first obsolete metadata. - let mut delete_start = None; - for i in 0..max_sample { - let seq = i * min_id / max_sample; - let file_id = FileId { queue, seq }; - let path = file_id.build_file_path(dir); - if self.file_system.exists_metadata(&path) { - delete_start = Some(i.saturating_sub(1) * min_id / max_sample + 1); - break; - } - } - // Delete metadata starting from the oldest. Abort on error. 
- if let Some(start) = delete_start { - let mut success = 0; - for seq in start..min_id { - let file_id = FileId { queue, seq }; - let path = file_id.build_file_path(dir); - if let Err(e) = self.file_system.delete_metadata(&path) { - error!("failed to delete metadata of {}: {}.", path.display(), e); - break; - } - success += 1; - } - warn!( - "deleted {} stale files of {:?} in range [{}, {}).", - success, queue, start, min_id, - ); - } - for seq in min_id..=max_id { - let file_id = FileId { queue, seq }; - let path = file_id.build_file_path(dir); - if !path.exists() { - warn!( - "Detected a hole when scanning directory, discarding files before {:?}.", - file_id, - ); - files.clear(); - } else { - let handle = Arc::new(self.file_system.open(&path)?); - files.push(FileToRecover { - seq, - handle, - format: None, - }); - } - } - } - } - Ok(()) + Ok(()) + })?; + Ok(valid_file_count) } /// Reads through log items in all available log files, and replays them to @@ -408,11 +441,13 @@ impl DualPipesBuilder { .map(|f| FileWithFormat { handle: f.handle.clone(), format: f.format.unwrap(), + path_id: f.path_id, }) .collect(); SinglePipe::open( &self.cfg, self.file_system.clone(), + self.dir_mgr.clone().unwrap(), self.listeners.clone(), queue, first_seq, @@ -428,7 +463,7 @@ impl DualPipesBuilder { pub fn finish(self) -> Result> { let appender = self.build_pipe(LogQueue::Append)?; let rewriter = self.build_pipe(LogQueue::Rewrite)?; - DualPipes::open(self.dir_lock.unwrap(), appender, rewriter) + DualPipes::open(appender, rewriter) } } diff --git a/src/log_batch.rs b/src/log_batch.rs index 714120cd..8010eec0 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -409,28 +409,6 @@ impl LogItemBatch { } } - /// Prepare the `write` by signing a checksum, so-called `signature`, - /// into the `LogBatch`. - /// - /// The `signature` is both generated by the given `LogFileContext`. - /// That is, the final checksum of each `LogBatch` consists of this - /// `signature` and the original `checksum` of the contents. - pub(crate) fn prepare_write(buf: &mut Vec, file_context: &LogFileContext) -> Result<()> { - if !buf.is_empty() { - if let Some(signature) = file_context.get_signature() { - // Insert the signature into the encoded bytes. Rewrite checksum of - // `LogItemBatch` in `LogBatch`. - let footer_checksum_offset = buf.len() - LOG_BATCH_CHECKSUM_LEN; - let original_checksum = codec::decode_u32_le(&mut &buf[footer_checksum_offset..])?; - // The final checksum is generated by `signature` ***XOR*** - // `original checksum of buf`. - (&mut buf[footer_checksum_offset..]) - .write_u32::(original_checksum ^ signature)?; - } - } - Ok(()) - } - pub(crate) fn finish_write(&mut self, handle: FileBlockHandle) { for item in self.items.iter_mut() { match &mut item.content { @@ -551,10 +529,10 @@ enum BufState { /// state only briefly exists between encoding and writing, user operation /// will panic under this state. /// # Content - /// (header_offset, entries_len) + /// (header_offset, entries_len, signature) /// # Invariants /// LOG_BATCH_HEADER_LEN <= buf.len() - Sealed(usize, usize), + Sealed(usize, usize, Option), /// Buffer is undergoing writes. User operation will panic under this state. Incomplete, } @@ -727,6 +705,9 @@ impl LogBatch { /// compression type to each entry index. 
pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result { let _t = StopWatch::new(perf_context!(log_populating_duration)); + if let BufState::Encoded(header_offset, _) = self.buf_state { + return Ok(self.buf.len() - header_offset); + } debug_assert!(self.buf_state == BufState::Open); if self.is_empty() { self.buf_state = BufState::Encoded(self.buf.len(), 0); @@ -779,13 +760,35 @@ impl LogBatch { Ok(self.buf.len() - header_offset) } - /// Make preparations for the write of `LogBatch`. + /// Makes preparations for the write of `LogBatch`. + /// + /// Internally rewrites the checksum of each `LogBatch` by a signature, + /// generated by the given `LogFileContext`. That is, the final checksum + /// of each `LogBatch` consists of this `signature` and the original + /// `checksum` of the contents. #[inline] pub(crate) fn prepare_write(&mut self, file_context: &LogFileContext) -> Result<()> { + let new_signature = file_context.get_signature(); match self.buf_state { BufState::Encoded(header_offset, entries_len) => { - LogItemBatch::prepare_write(&mut self.buf, file_context)?; - self.buf_state = BufState::Sealed(header_offset, entries_len); + sign_checksum(&mut self.buf, new_signature)?; + self.buf_state = BufState::Sealed(header_offset, entries_len, new_signature); + } + BufState::Sealed(header_offset, entries_len, old_signature) => { + // Re-seal the LogBatch. + match (old_signature, new_signature) { + (Some(old), Some(new)) => { + sign_checksum(&mut self.buf, Some(old ^ new))?; + } + (Some(old), None) => { + sign_checksum(&mut self.buf, Some(old))?; + } + (None, Some(new)) => { + sign_checksum(&mut self.buf, Some(new))?; + } + _ => {} + } + self.buf_state = BufState::Sealed(header_offset, entries_len, new_signature); } _ => unreachable!(), } @@ -796,7 +799,7 @@ impl LogBatch { /// Assumes called after a successful call of [`prepare_write`]. pub(crate) fn encoded_bytes(&self) -> &[u8] { match self.buf_state { - BufState::Sealed(header_offset, _) => &self.buf[header_offset..], + BufState::Sealed(header_offset, _, _) => &self.buf[header_offset..], _ => unreachable!(), } } @@ -805,12 +808,12 @@ impl LogBatch { /// /// Internally sets the file locations of each log entry indexes. pub(crate) fn finish_write(&mut self, mut handle: FileBlockHandle) { - debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _))); + debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _, _))); if !self.is_empty() { // adjust log batch handle to log entries handle. handle.offset += LOG_BATCH_HEADER_LEN as u64; match self.buf_state { - BufState::Sealed(_, entries_len) => { + BufState::Sealed(_, entries_len, _) => { debug_assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len as usize); handle.len = entries_len; } @@ -841,7 +844,7 @@ impl LogBatch { self.buf.len() + LOG_BATCH_CHECKSUM_LEN + self.item_batch.approximate_size() } BufState::Encoded(header_offset, _) => self.buf.len() - header_offset, - BufState::Sealed(header_offset, _) => self.buf.len() - header_offset, + BufState::Sealed(header_offset, _, _) => self.buf.len() - header_offset, s => { error!("querying incomplete log batch with state {:?}", s); 0 @@ -909,6 +912,23 @@ impl ReactiveBytes for LogBatch { } } +/// Signs a checksum, so-called `signature`, into the `LogBatch`. +fn sign_checksum(buf: &mut Vec, signature: Option) -> Result<()> { + if !buf.is_empty() { + if let Some(signature) = signature { + // Insert the signature into the encoded bytes. Rewrite checksum of + // `LogItemBatch` in `LogBatch`. 
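The re-seal branch in `prepare_write` above relies on XOR being its own inverse: a sealed buffer stores `checksum ^ old_signature`, so signing it once more with `old ^ new` leaves exactly `checksum ^ new`, as if the batch had been sealed for the new file in one step. A small self-contained check of that identity (names are illustrative):

fn reseal(stored: u32, old_signature: u32, new_signature: u32) -> u32 {
    // `stored` == checksum ^ old_signature, so XOR-ing with (old ^ new)
    // cancels the old signature and applies the new one.
    stored ^ (old_signature ^ new_signature)
}

#[test]
fn reseal_matches_fresh_seal() {
    let (checksum, old, new) = (0xDEAD_BEEF_u32, 0x1234_5678, 0x9ABC_DEF0);
    let stored = checksum ^ old; // Sealed for the old file.
    assert_eq!(reseal(stored, old, new), checksum ^ new);
}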
+ let footer_checksum_offset = buf.len() - LOG_BATCH_CHECKSUM_LEN; + let original_checksum = codec::decode_u32_le(&mut &buf[footer_checksum_offset..])?; + // The final checksum is generated by `signature` ***XOR*** + // `original checksum of buf`. + (&mut buf[footer_checksum_offset..]) + .write_u32::(original_checksum ^ signature)?; + } + } + Ok(()) +} + /// Verifies the checksum of a slice of bytes that sequentially holds data and /// checksum. The checksum field may be signed by XOR-ing with an u32. fn verify_checksum_with_signature(buf: &[u8], signature: Option) -> Result<()> { diff --git a/src/util.rs b/src/util.rs index 05bb15e6..83a6a0cc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -325,6 +325,36 @@ pub trait Factory: Send + Sync { fn new_target(&self) -> Target; } +pub mod dev_ext { + use std::io::Result as IoResult; + use std::path::Path; + + use fail::fail_point; + + /// Judges whether `dir1` and `dir2` reside on same device. + pub(crate) fn on_same_dev>(dir1: P, dir2: P) -> IoResult { + fail_point!("env::force_on_different_dev", |_| { Ok(false) }); + #[cfg(any(target_os = "unix", target_os = "macos"))] + { + use std::os::unix::fs::MetadataExt; + let meta1 = std::fs::metadata(dir1.as_ref())?; + let meta2 = std::fs::metadata(dir2.as_ref())?; + Ok(meta1.dev() == meta2.dev()) + } + #[cfg(target_os = "linux")] + { + use std::os::linux::fs::MetadataExt; + let meta1 = std::fs::metadata(dir1.as_ref())?; + let meta2 = std::fs::metadata(dir2.as_ref())?; + Ok(meta1.st_dev() == meta2.st_dev()) + } + #[cfg(not(any(target_os = "linux", target_os = "unix", target_os = "macos")))] + { + Err(Error::Other(box_err!("Unrecognized file system"))) + } + } +} + /// Returns an aligned `offset`. /// /// # Example: diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index 7a51d717..e0a94d01 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -1,5 +1,6 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
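The `dev_ext::on_same_dev` helper added to `src/util.rs` above boils down to comparing the device ids reported by `fs::metadata` for the two paths. On any Unix-like target the same check can be written with the portable `cfg(unix)` gate (a sketch, not the exact per-OS gating used in the patch):

#[cfg(unix)]
fn same_device(a: &std::path::Path, b: &std::path::Path) -> std::io::Result<bool> {
    use std::os::unix::fs::MetadataExt;
    // Two paths live on the same filesystem iff their metadata reports the
    // same underlying device id.
    Ok(std::fs::metadata(a)?.dev() == std::fs::metadata(b)?.dev())
}

The config sanitizer only warns when both directories share a device, since a secondary directory on the same disk cannot help once that disk is full.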
+use std::fs; use std::sync::{Arc, Barrier}; use std::time::Duration; @@ -724,3 +725,201 @@ fn test_build_engine_with_datalayout_abnormal() { Engine::open(cfg).unwrap(); } } + +#[test] +fn test_build_engine_with_multi_dir() { + let dir = tempfile::Builder::new() + .prefix("test_build_engine_with_multi_dir_1") + .tempdir() + .unwrap(); + let secondary_dir = tempfile::Builder::new() + .prefix("test_build_engine_with_multi_dir_2") + .tempdir() + .unwrap(); + let data = vec![b'x'; 1024]; + let rid = 1; + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + secondary_dir: Some(secondary_dir.path().to_str().unwrap().to_owned()), + target_file_size: ReadableSize::kb(2), + purge_threshold: ReadableSize::kb(4), + ..Default::default() + }; + let mut start_seq: u64; + { + // Set config with abnormal settings => same prefix + let abnormal_dir = format!("{}/abnormal", dir.path().to_str().unwrap().to_owned()); + let abnormal_sec_dir = format!("{}/testing", abnormal_dir); + let cfg_err = Config { + dir: abnormal_dir, + secondary_dir: Some(abnormal_sec_dir), + ..cfg + }; + let engine = Engine::open(cfg_err).unwrap(); + append(&engine, rid, 1, 2, Some(&data)); // file_seq: 1 + append(&engine, rid, 2, 3, Some(&data)); + append(&engine, rid, 3, 4, Some(&data)); // file_seq: 2 + append(&engine, rid, 4, 5, Some(&data)); + append(&engine, rid, 5, 6, Some(&data)); // file_seq: 3 + start_seq = engine.file_span(LogQueue::Append).0; + assert_eq!( + 5, + engine + .fetch_entries_to::( + rid, /* region */ + 1, /* begin */ + 6, /* end */ + None, /* max_size */ + &mut vec![], + ) + .unwrap() + ); + engine.compact_to(rid, 3); + engine.purge_expired_files().unwrap(); + assert!(engine.file_span(LogQueue::Append).0 > start_seq); + assert_eq!( + 2, + engine + .fetch_entries_to::(rid, 4, 6, None, &mut vec![],) + .unwrap() + ); + } + { + // Set config with abnormal settings => same device + let cfg_err = Config { + secondary_dir: Some("./abnormal_testing".to_owned()), + ..cfg.clone() + }; + let engine = Engine::open(cfg_err.clone()).unwrap(); + append(&engine, rid, 1, 2, Some(&data)); // file_seq: 1 + append(&engine, rid, 2, 3, Some(&data)); + append(&engine, rid, 3, 4, Some(&data)); // file_seq: 2 + append(&engine, rid, 4, 5, Some(&data)); + append(&engine, rid, 5, 6, Some(&data)); // file_seq: 3 + start_seq = engine.file_span(LogQueue::Append).0; + assert_eq!( + 5, + engine + .fetch_entries_to::(rid, 1, 6, None, &mut vec![],) + .unwrap() + ); + engine.compact_to(rid, 3); + engine.purge_expired_files().unwrap(); + assert!(engine.file_span(LogQueue::Append).0 > start_seq); + start_seq = engine.file_span(LogQueue::Append).0; + assert!(fs::remove_dir_all(&cfg_err.secondary_dir.unwrap()).is_ok()); + } + // Open engine with multi directories, main dir and secondary dir. 
+ { + // (1) Write to main dir + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + append(&engine, rid, 6, 7, Some(&data)); + append(&engine, rid, 7, 8, Some(&data)); // file_seq: 4 + append(&engine, rid, 8, 9, Some(&data)); + append(&engine, rid, 9, 10, Some(&data)); // file_seq: 5 + assert_eq!( + 6, + engine + .fetch_entries_to::(rid, 4, 10, None, &mut vec![],) + .unwrap() + ); + engine.compact_to(rid, 8); + engine.purge_expired_files().unwrap(); + assert!(engine.file_span(LogQueue::Append).0 > start_seq); + start_seq = engine.file_span(LogQueue::Append).0; + } + { + // (2) Write to secondary dir + let _f1 = FailGuard::new("env::force_on_different_dev", "return"); + let _f2 = FailGuard::new("file_pipe_log::force_use_secondary_dir", "return"); + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + append(&engine, rid, 10, 11, Some(&data)); + append(&engine, rid, 11, 12, Some(&data)); + append(&engine, rid, 12, 13, Some(&data)); + assert_eq!( + 4, + engine + .fetch_entries_to::(rid, 9, 13, None, &mut vec![],) + .unwrap() + ); + engine.compact_to(rid, 11); + engine.purge_expired_files().unwrap(); + assert!(engine.file_span(LogQueue::Append).0 > start_seq); + start_seq = engine.file_span(LogQueue::Append).0; + drop(engine); + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + append(&engine, rid, 13, 14, Some(&data)); + append(&engine, rid, 14, 15, Some(&data)); + assert_eq!( + 3, + engine + .fetch_entries_to::(rid, 12, 15, None, &mut vec![],) + .unwrap() + ); + } + { + // (3) Back to main dir + let _f1 = FailGuard::new("env::force_on_different_dev", "return"); + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + append(&engine, rid, 15, 16, Some(&data)); + append(&engine, rid, 16, 17, Some(&data)); + append(&engine, rid, 17, 18, Some(&data)); + append(&engine, rid, 18, 19, Some(&data)); + assert_eq!( + 5, + engine + .fetch_entries_to::(rid, 12, 17, None, &mut vec![],) + .unwrap() + ); + let before = engine.file_span(LogQueue::Append); + drop(engine); + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append), before); + } + { + // (4) Open recycling logs feature. 
+ let _f1 = FailGuard::new("env::force_on_different_dev", "return"); + let cfg_rec = Config { + format_version: Version::V2, + enable_log_recycle: true, + target_file_size: ReadableSize(1), + purge_threshold: ReadableSize(3), + ..cfg + }; + let engine = Engine::open(cfg_rec.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + engine.compact_to(rid, 18); + engine.purge_expired_files().unwrap(); + assert!(engine.file_span(LogQueue::Append).0 > start_seq); + append(&engine, rid, 19, 20, Some(&data)); + append(&engine, rid, 20, 21, Some(&data)); + append(&engine, rid, 21, 22, Some(&data)); + let before = engine.file_span(LogQueue::Append); + drop(engine); + // recycling stale files by compaction + let engine = Engine::open(cfg_rec.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append), before); + engine.compact_to(rid, 20); + engine.purge_expired_files().unwrap(); + append(&engine, rid, 22, 23, Some(&data)); // reuse + append(&engine, rid, 23, 24, Some(&data)); // reuse + start_seq = engine.file_span(LogQueue::Append).0; + drop(engine); + let engine = Engine::open(cfg_rec.clone()).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + // append records to secondary dir -> recycle -> reopen() + let _f = FailGuard::new("file_pipe_log::force_use_secondary_dir", "return"); + append(&engine, rid, 24, 25, Some(&data)); + append(&engine, rid, 25, 26, Some(&data)); + engine.compact_to(rid, 21); + engine.purge_expired_files().unwrap(); + start_seq = engine.file_span(LogQueue::Append).0; + drop(engine); + let engine = Engine::open(cfg_rec).unwrap(); + assert_eq!(engine.file_span(LogQueue::Append).0, start_seq); + } +} diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index 52302327..5e8c5701 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -287,6 +287,163 @@ fn test_concurrent_write_error() { ); } +#[test] +fn test_no_space_write_error() { + let mut cfg_list = [ + Config { + target_file_size: ReadableSize::kb(2), + recovery_mode: RecoveryMode::AbsoluteConsistency, + format_version: Version::V1, + enable_log_recycle: false, + ..Default::default() + }, + Config { + target_file_size: ReadableSize::kb(2), + recovery_mode: RecoveryMode::AbsoluteConsistency, + format_version: Version::V2, + enable_log_recycle: true, + ..Default::default() + }, + ]; + let entry = vec![b'x'; 1024]; + for cfg in cfg_list.iter_mut() { + let dir = tempfile::Builder::new() + .prefix("test_no_space_write_error") + .tempdir() + .unwrap(); + cfg.dir = dir.path().to_str().unwrap().to_owned(); + { + // If disk is full, a new Engine cannot be opened. + let _f = FailGuard::new("file_pipe_log::force_no_free_space", "return"); + assert!(Engine::open(cfg.clone()).is_err()); + } + { + // If disk is full after writing, the old engine should be available + // for `read`. + let engine = Engine::open(cfg.clone()).unwrap(); + engine + .write(&mut generate_batch(1, 11, 21, Some(&entry)), true) + .unwrap(); + drop(engine); + let _f = FailGuard::new("file_pipe_log::force_no_free_space", "return"); + let engine = Engine::open(cfg.clone()).unwrap(); + assert_eq!( + 10, + engine + .fetch_entries_to::(1, 11, 21, None, &mut vec![]) + .unwrap() + ); + } + { + // `Write` is abnormal for no space left, Engine should panic at `rotate`. 
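These cases steer the ENOSPC simulation through `fail` crate action strings: "return" always triggers the failpoint, "1*return->off" fails exactly one call and then behaves normally, and "1*off->1*return->off" lets the first call through, fails the second, then passes. Outside the `FailGuard` helper used in this file, the same failpoint can be driven directly (a sketch; the surrounding writes are elided):

// Fail the next pwrite with a simulated ENOSPC, then restore normal behavior.
fn simulate_one_enospc() {
    fail::cfg("log_fd::write::no_space_err", "1*return->off").unwrap();
    // ... issue one write that is expected to fail, and one retry that succeeds ...
    fail::remove("log_fd::write::no_space_err");
}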
+ let _f = FailGuard::new("log_fd::write::no_space_err", "return"); + let cfg_err = Config { + target_file_size: ReadableSize(1), + ..cfg.clone() + }; + let engine = Engine::open(cfg_err).unwrap(); + assert!(catch_unwind_silent(|| { + engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .unwrap_err(); + }) + .is_err()); + } + { + // Disk goes from `spare(nospace err)` -> `full` -> `spare`. + let _f1 = FailGuard::new("file_pipe_log::force_no_free_space", "1*off->1*return->off"); + let _f2 = FailGuard::new("log_fd::write::no_space_err", "1*return->off"); + let engine = Engine::open(cfg.clone()).unwrap(); + assert!(catch_unwind_silent(|| { + engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .unwrap_err(); + }) + .is_err()); + engine + .write(&mut generate_batch(3, 11, 21, Some(&entry)), true) + .unwrap(); + assert_eq!( + 0, + engine + .fetch_entries_to::(2, 11, 21, None, &mut vec![]) + .unwrap() + ); + assert_eq!( + 10, + engine + .fetch_entries_to::(3, 11, 21, None, &mut vec![]) + .unwrap() + ); + } + { + // Disk is `full` -> `spare`, the first `write` operation should failed. + let _f1 = FailGuard::new("file_pipe_log::force_no_free_space", "1*return->off"); + let _f2 = FailGuard::new("log_fd::write::no_space_err", "1*return->off"); + let engine = Engine::open(cfg.clone()).unwrap(); + engine + .write(&mut generate_batch(4, 11, 21, Some(&entry)), true) + .unwrap_err(); + engine + .write(&mut generate_batch(4, 11, 21, Some(&entry)), true) + .unwrap(); + assert_eq!( + 0, + engine + .fetch_entries_to::(2, 11, 21, None, &mut vec![]) + .unwrap() + ); + assert_eq!( + 10, + engine + .fetch_entries_to::(4, 11, 21, None, &mut vec![]) + .unwrap() + ); + } + { + // Disk goes from `spare(nospace err)` -> `spare(another dir has enough space)`. + let _f = FailGuard::new("log_fd::write::no_space_err", "1*return->off"); + let engine = Engine::open(cfg.clone()).unwrap(); + engine + .write(&mut generate_batch(5, 11, 21, Some(&entry)), true) + .unwrap(); + engine + .write(&mut generate_batch(6, 11, 21, Some(&entry)), true) + .unwrap(); + assert_eq!( + 10, + engine + .fetch_entries_to::(5, 11, 21, None, &mut vec![]) + .unwrap() + ); + assert_eq!( + 10, + engine + .fetch_entries_to::(6, 11, 21, None, &mut vec![]) + .unwrap() + ); + } + { + // Disk goes into endless `spare(nospace err)`, engine do panic for multi- + // retrying. + let _f = FailGuard::new( + "log_fd::write::no_space_err", + "1*return->1*off->1*return->1*off", + ); + let engine = Engine::open(cfg.clone()).unwrap(); + assert!(engine + .write(&mut generate_batch(7, 11, 21, Some(&entry)), true) + .is_err()); + assert_eq!( + 0, + engine + .fetch_entries_to::(7, 11, 21, None, &mut vec![]) + .unwrap() + ); + } + } +} + #[test] fn test_non_atomic_write_error() { let dir = tempfile::Builder::new()