diff --git a/src/engine.rs b/src/engine.rs index 0d055296..ef2f9bef 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -172,9 +172,9 @@ where } perf_context!(log_write_duration).observe_since(now); if sync { - // As per trait protocol, this error should be retriable. But we panic anyway to + // As per trait protocol, sync error should be retriable. But we panic anyway to // save the trouble of propagating it to other group members. - self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()"); + self.pipe_log.sync(LogQueue::Append); } // Pass the perf context diff to all the writers. let diff = get_perf_context(); @@ -2446,7 +2446,7 @@ pub(crate) mod tests { builder.begin(&mut log_batch); log_batch.put(rid, key.clone(), value.clone()).unwrap(); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // begin - unrelated - end. @@ -2466,7 +2466,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // begin - middle - middle - end. @@ -2491,7 +2491,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // begin - begin - end. @@ -2511,7 +2511,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // end - middle - end. 
@@ -2533,7 +2533,7 @@ pub(crate) mod tests { rid += 1; log_batch.put(rid, key.clone(), value.clone()).unwrap(); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // end - begin - end @@ -2554,7 +2554,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } { // begin - end - begin - end. @@ -2574,9 +2574,9 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + engine.pipe_log.rotate(LogQueue::Rewrite); } - engine.pipe_log.sync(LogQueue::Rewrite).unwrap(); + engine.pipe_log.sync(LogQueue::Rewrite); let engine = engine.reopen(); for rid in engine.raft_groups() { diff --git a/src/errors.rs b/src/errors.rs index 15a3ed45..fe35e6d8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -39,10 +39,3 @@ pub(crate) fn is_no_space_err(e: &IoError) -> bool { // `ErrorKind::StorageFull` is stable. format!("{e}").contains("nospace") } - -/// Check whether the given error is a nospace error. -pub(crate) fn is_io_no_space_err(e: &Error) -> bool { - // TODO: make the following judgement more elegant when the error type - // `ErrorKind::StorageFull` is stable. - format!("{e}").contains("nospace") -} diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index f3b70620..135eced9 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -74,21 +74,21 @@ impl LogFileWriter { self.write(&buf, 0) } - pub fn close(&mut self) -> IoResult<()> { + pub fn close(&mut self) { // Necessary to truncate extra zeros from fallocate(). 
- self.truncate()?; - self.sync() + self.truncate(); + self.sync(); } - pub fn truncate(&mut self) -> IoResult<()> { + pub fn truncate(&mut self) { if self.written < self.capacity { - fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| { Ok(()) }); + fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| {}); - self.writer.truncate(self.written)?; + // Panic if truncate fails, to avoid silent data loss. + self.writer.truncate(self.written).unwrap(); self.capacity = self.written; } - Ok(()) } pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> IoResult<()> { @@ -117,10 +117,10 @@ impl LogFileWriter { Ok(()) } - pub fn sync(&mut self) -> IoResult<()> { + pub fn sync(&mut self) { let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM); - self.handle.sync()?; - Ok(()) + // Panic if sync fails, to avoid silent data loss. + self.handle.sync().unwrap(); } #[inline] diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 64042e01..b322e8a9 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -231,7 +231,7 @@ pub mod debug { len, }); } - writer.close().unwrap(); + writer.close(); // Read and verify. 
let mut reader = LogItemReader::new_file_reader(file_system.clone(), &file_path).unwrap(); @@ -280,7 +280,7 @@ pub mod debug { true, /* create */ ) .unwrap(); - writer.close().unwrap(); + writer.close(); assert!(LogItemReader::new_file_reader(file_system.clone(), dir.path()).is_err()); assert!( @@ -330,7 +330,7 @@ pub mod debug { .unwrap(); let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap(); let len = writer.offset(); - writer.close().unwrap(); + writer.close(); if shorter { f.set_len(len as u64 - 1).unwrap(); } @@ -341,7 +341,7 @@ pub mod debug { false, /* create */ ) .unwrap(); - writer.close().unwrap(); + writer.close(); let mut reader = build_file_reader(file_system.as_ref(), &path).unwrap(); assert_eq!(reader.parse_format().unwrap(), to); std::fs::remove_file(&path).unwrap(); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index e54f1b83..0e9e2966 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -12,7 +12,7 @@ use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::config::Config; use crate::env::{FileSystem, Permission}; -use crate::errors::{is_io_no_space_err, is_no_space_err}; +use crate::errors::is_no_space_err; use crate::event_listener::EventListener; use crate::metrics::*; use crate::pipe_log::{ @@ -72,9 +72,7 @@ pub(super) struct SinglePipe { impl Drop for SinglePipe { fn drop(&mut self) { let mut writable_file = self.writable_file.lock(); - if let Err(e) = writable_file.writer.close() { - error!("error while closing the active writer: {e}"); - } + writable_file.writer.close(); let mut recycled_files = self.recycled_files.write(); let mut next_reserved_seq = recycled_files .iter() @@ -240,7 +238,7 @@ impl SinglePipe { /// Creates a new file for write, and rotates the active log file. /// /// This operation is atomic in face of errors. 
- fn rotate_imp(&self, writable_file: &mut MutexGuard>) -> Result<()> { + fn rotate_imp(&self, writable_file: &mut MutexGuard>) { let _t = StopWatch::new(( &*LOG_ROTATE_DURATION_HISTOGRAM, perf_context!(log_rotate_duration), @@ -248,11 +246,12 @@ impl SinglePipe { let new_seq = writable_file.seq + 1; debug_assert!(new_seq > DEFAULT_FIRST_FILE_SEQ); - writable_file.writer.close()?; + writable_file.writer.close(); let (path_id, handle) = self .recycle_file(new_seq) - .unwrap_or_else(|| self.new_file(new_seq))?; + .unwrap_or_else(|| self.new_file(new_seq)) + .unwrap(); let f = File:: { seq: new_seq, handle: handle.into(), @@ -267,13 +266,14 @@ impl SinglePipe { f.handle.clone(), f.format, true, /* force_reset */ - )?, + ) + .unwrap(), format: f.format, }; // File header must be persisted. This way we can recover gracefully if power // loss before a new entry is written. - new_file.writer.sync()?; - self.sync_dir(path_id)?; + new_file.writer.sync(); + self.sync_dir(path_id).unwrap(); **writable_file = new_file; let len = { @@ -288,7 +288,6 @@ impl SinglePipe { seq: new_seq, }); } - Ok(()) } /// Synchronizes current states to related metrics. @@ -321,17 +320,7 @@ impl SinglePipe { fail_point!("file_pipe_log::append"); let mut writable_file = self.writable_file.lock(); if writable_file.writer.offset() >= self.target_file_size { - if let Err(e) = self.rotate_imp(&mut writable_file) { - // As the disk is already full and is no valid and extra directory to flush - // data, it can safely return the `no space err` to users. 
- if is_io_no_space_err(&e) { - return Err(e); - } - panic!( - "error when rotate [{:?}:{}]: {e}", - self.queue, writable_file.seq, - ); - } + self.rotate_imp(&mut writable_file); } let seq = writable_file.seq; @@ -364,9 +353,7 @@ impl SinglePipe { } let start_offset = writer.offset(); if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) { - if let Err(te) = writer.truncate() { - panic!("error when truncate {seq} after error: {e}, get: {}", te); - } + writer.truncate(); if is_no_space_err(&e) { // TODO: There exists several corner cases should be tackled if // `bytes.len()` > `target_file_size`. For example, @@ -377,11 +364,7 @@ impl SinglePipe { // - [3] Both main-dir and spill-dir have several recycled logs. // But as `bytes.len()` is always smaller than `target_file_size` in common // cases, this issue will be ignored temprorarily. - if let Err(e) = self.rotate_imp(&mut writable_file) { - if is_io_no_space_err(&e) { - return Err(e); - } - } + self.rotate_imp(&mut writable_file); // If there still exists free space for this record, rotate the file // and return a special TryAgain Err (for retry) to the caller. 
return Err(Error::TryAgain(format!( @@ -405,18 +388,11 @@ impl SinglePipe { Ok(handle) } - fn sync(&self) -> Result<()> { + fn sync(&self) { let mut writable_file = self.writable_file.lock(); - let seq = writable_file.seq; let writer = &mut writable_file.writer; - { - let _t = StopWatch::new(perf_context!(log_sync_duration)); - if let Err(e) = writer.sync() { - panic!("error when sync [{:?}:{seq}]: {e}", self.queue); - } - } - - Ok(()) + let _t = StopWatch::new(perf_context!(log_sync_duration)); + writer.sync(); } fn file_span(&self) -> (FileSeq, FileSeq) { @@ -429,8 +405,8 @@ impl SinglePipe { (last_seq - first_seq + 1) as usize * self.target_file_size } - fn rotate(&self) -> Result<()> { - self.rotate_imp(&mut self.writable_file.lock()) + fn rotate(&self) { + self.rotate_imp(&mut self.writable_file.lock()); } fn purge_to(&self, file_seq: FileSeq) -> Result { @@ -524,8 +500,8 @@ impl PipeLog for DualPipes { } #[inline] - fn sync(&self, queue: LogQueue) -> Result<()> { - self.pipes[queue as usize].sync() + fn sync(&self, queue: LogQueue) { + self.pipes[queue as usize].sync(); } #[inline] @@ -539,8 +515,8 @@ impl PipeLog for DualPipes { } #[inline] - fn rotate(&self, queue: LogQueue) -> Result<()> { - self.pipes[queue as usize].rotate() + fn rotate(&self, queue: LogQueue) { + self.pipes[queue as usize].rotate(); } #[inline] @@ -652,7 +628,7 @@ mod tests { assert_eq!(file_handle.offset, header_size); assert_eq!(pipe_log.file_span(queue).1, 2); - pipe_log.rotate(queue).unwrap(); + pipe_log.rotate(queue); // purge file 1 assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1); @@ -720,9 +696,9 @@ mod tests { let mut handles = Vec::new(); for i in 0..10 { handles.push(pipe_log.append(&mut &content(i)).unwrap()); - pipe_log.sync().unwrap(); + pipe_log.sync(); } - pipe_log.rotate().unwrap(); + pipe_log.rotate(); let (first, last) = pipe_log.file_span(); // Cannot purge already expired logs or not existsed logs. 
assert!(pipe_log.purge_to(first - 1).is_err()); @@ -737,7 +713,7 @@ mod tests { let mut handles = Vec::new(); for i in 0..10 { handles.push(pipe_log.append(&mut &content(i + 1)).unwrap()); - pipe_log.sync().unwrap(); + pipe_log.sync(); } // Verify the data. for (i, handle) in handles.into_iter().enumerate() { diff --git a/src/filter.rs b/src/filter.rs index f992d788..e59162d8 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -333,7 +333,7 @@ impl RhaiFilterMachine { )?; log_batch.drain(); } - writer.close()?; + writer.close(); } } // Delete backup file and defuse the guard. diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 33ca4071..4cf94393 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -179,7 +179,7 @@ pub trait PipeLog: Sized { /// /// This operation might incurs a great latency overhead. It's advised to /// call it once every batch of writes. - fn sync(&self, queue: LogQueue) -> Result<()>; + fn sync(&self, queue: LogQueue); /// Returns the smallest and largest file sequence number, still in use, /// of the specified log queue. @@ -200,7 +200,7 @@ pub trait PipeLog: Sized { /// /// Implementation should be atomic under error conditions but not /// necessarily panic-safe. - fn rotate(&self, queue: LogQueue) -> Result<()>; + fn rotate(&self, queue: LogQueue); /// Deletes all log files smaller than the specified file ID. The scope is /// limited to the log queue of `file_id`. 
diff --git a/src/purge.rs b/src/purge.rs index b1183438..70c6430e 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -137,7 +137,7 @@ where let (_, last) = self.pipe_log.file_span(LogQueue::Append); let watermark = watermark.map_or(last, |w| std::cmp::min(w, last)); if watermark == last { - self.pipe_log.rotate(LogQueue::Append).unwrap(); + self.pipe_log.rotate(LogQueue::Append); } self.rewrite_append_queue_tombstones().unwrap(); if exit_after_step == Some(1) { @@ -167,13 +167,13 @@ where pub fn must_purge_all_stale(&self) { let _lk = self.force_rewrite_candidates.try_lock().unwrap(); - self.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + self.pipe_log.rotate(LogQueue::Rewrite); self.rescan_memtables_and_purge_stale_files( LogQueue::Rewrite, self.pipe_log.file_span(LogQueue::Rewrite).1, ) .unwrap(); - self.pipe_log.rotate(LogQueue::Append).unwrap(); + self.pipe_log.rotate(LogQueue::Append); self.rescan_memtables_and_purge_stale_files( LogQueue::Append, self.pipe_log.file_span(LogQueue::Append).1, @@ -273,7 +273,7 @@ where // Rewrites the entire rewrite queue into new log files. fn rewrite_rewrite_queue(&self) -> Result> { let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM); - self.pipe_log.rotate(LogQueue::Rewrite)?; + self.pipe_log.rotate(LogQueue::Rewrite); let mut force_compact_regions = vec![]; let memtables = self.memtables.collect(|t| { @@ -430,7 +430,7 @@ where ) -> Result> { if log_batch.is_empty() { debug_assert!(sync); - self.pipe_log.sync(LogQueue::Rewrite)?; + self.pipe_log.sync(LogQueue::Rewrite); return Ok(None); } log_batch.finish_populate( @@ -439,7 +439,7 @@ where )?; let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?; if sync { - self.pipe_log.sync(LogQueue::Rewrite)? + self.pipe_log.sync(LogQueue::Rewrite); } log_batch.finish_write(file_handle); self.memtables.apply_rewrite_writes(