From c606f51aaa0977d6f808592a81eab2b87e03f06c Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Wed, 8 Nov 2023 16:05:22 -0800 Subject: [PATCH] Propagate error if writing header fails Signed-off-by: Yang Zhang --- src/engine.rs | 14 +++++++------- src/file_pipe_log/pipe.rs | 22 +++++++++++----------- src/pipe_log.rs | 2 +- src/purge.rs | 8 ++++---- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index ef2f9bef..84258c8f 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2446,7 +2446,7 @@ pub(crate) mod tests { builder.begin(&mut log_batch); log_batch.put(rid, key.clone(), value.clone()).unwrap(); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // begin - unrelated - end. @@ -2466,7 +2466,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // begin - middle - middle - end. @@ -2491,7 +2491,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // begin - begin - end. @@ -2511,7 +2511,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // end - middle - end. @@ -2533,7 +2533,7 @@ pub(crate) mod tests { rid += 1; log_batch.put(rid, key.clone(), value.clone()).unwrap(); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // end - begin - end @@ -2554,7 +2554,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { // begin - end - begin - end. @@ -2574,7 +2574,7 @@ pub(crate) mod tests { log_batch.put(rid, key.clone(), value.clone()).unwrap(); data.insert(rid); flush(&mut log_batch); - engine.pipe_log.rotate(LogQueue::Rewrite); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } engine.pipe_log.sync(LogQueue::Rewrite); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 0e9e2966..2ed6d6a6 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -238,7 +238,7 @@ impl SinglePipe { /// Creates a new file for write, and rotates the active log file. /// /// This operation is atomic in face of errors. - fn rotate_imp(&self, writable_file: &mut MutexGuard>) { + fn rotate_imp(&self, writable_file: &mut MutexGuard>) -> Result<()> { let _t = StopWatch::new(( &*LOG_ROTATE_DURATION_HISTOGRAM, perf_context!(log_rotate_duration), @@ -266,8 +266,7 @@ impl SinglePipe { f.handle.clone(), f.format, true, /* force_reset */ - ) - .unwrap(), + )?, format: f.format, }; // File header must be persisted. This way we can recover gracefully if power @@ -288,6 +287,7 @@ impl SinglePipe { seq: new_seq, }); } + Ok(()) } /// Synchronizes current states to related metrics. @@ -320,7 +320,7 @@ impl SinglePipe { fail_point!("file_pipe_log::append"); let mut writable_file = self.writable_file.lock(); if writable_file.writer.offset() >= self.target_file_size { - self.rotate_imp(&mut writable_file); + self.rotate_imp(&mut writable_file)?; } let seq = writable_file.seq; @@ -364,7 +364,7 @@ impl SinglePipe { // - [3] Both main-dir and spill-dir have several recycled logs. // But as `bytes.len()` is always smaller than `target_file_size` in common // cases, this issue will be ignored temprorarily. - self.rotate_imp(&mut writable_file); + self.rotate_imp(&mut writable_file)?; // If there still exists free space for this record, rotate the file // and return a special TryAgain Err (for retry) to the caller. return Err(Error::TryAgain(format!( @@ -405,8 +405,8 @@ impl SinglePipe { (last_seq - first_seq + 1) as usize * self.target_file_size } - fn rotate(&self) { - self.rotate_imp(&mut self.writable_file.lock()); + fn rotate(&self) -> Result<()> { + self.rotate_imp(&mut self.writable_file.lock()) } fn purge_to(&self, file_seq: FileSeq) -> Result { @@ -515,8 +515,8 @@ impl PipeLog for DualPipes { } #[inline] - fn rotate(&self, queue: LogQueue) { - self.pipes[queue as usize].rotate(); + fn rotate(&self, queue: LogQueue) -> Result<()> { + self.pipes[queue as usize].rotate() } #[inline] @@ -628,7 +628,7 @@ mod tests { assert_eq!(file_handle.offset, header_size); assert_eq!(pipe_log.file_span(queue).1, 2); - pipe_log.rotate(queue); + pipe_log.rotate(queue).unwrap(); // purge file 1 assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1); @@ -698,7 +698,7 @@ mod tests { handles.push(pipe_log.append(&mut &content(i)).unwrap()); pipe_log.sync(); } - pipe_log.rotate(); + pipe_log.rotate().unwrap(); let (first, last) = pipe_log.file_span(); // Cannot purge already expired logs or not existsed logs. assert!(pipe_log.purge_to(first - 1).is_err()); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 4cf94393..725d607b 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -200,7 +200,7 @@ pub trait PipeLog: Sized { /// /// Implementation should be atomic under error conditions but not /// necessarily panic-safe. - fn rotate(&self, queue: LogQueue); + fn rotate(&self, queue: LogQueue) -> Result<()>; /// Deletes all log files smaller than the specified file ID. The scope is /// limited to the log queue of `file_id`. diff --git a/src/purge.rs b/src/purge.rs index 70c6430e..35373bb6 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -137,7 +137,7 @@ where let (_, last) = self.pipe_log.file_span(LogQueue::Append); let watermark = watermark.map_or(last, |w| std::cmp::min(w, last)); if watermark == last { - self.pipe_log.rotate(LogQueue::Append); + self.pipe_log.rotate(LogQueue::Append).unwrap(); } self.rewrite_append_queue_tombstones().unwrap(); if exit_after_step == Some(1) { @@ -167,13 +167,13 @@ where pub fn must_purge_all_stale(&self) { let _lk = self.force_rewrite_candidates.try_lock().unwrap(); - self.pipe_log.rotate(LogQueue::Rewrite); + self.pipe_log.rotate(LogQueue::Rewrite).unwrap(); self.rescan_memtables_and_purge_stale_files( LogQueue::Rewrite, self.pipe_log.file_span(LogQueue::Rewrite).1, ) .unwrap(); - self.pipe_log.rotate(LogQueue::Append); + self.pipe_log.rotate(LogQueue::Append).unwrap(); self.rescan_memtables_and_purge_stale_files( LogQueue::Append, self.pipe_log.file_span(LogQueue::Append).1, @@ -273,7 +273,7 @@ where // Rewrites the entire rewrite queue into new log files. fn rewrite_rewrite_queue(&self) -> Result> { let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM); - self.pipe_log.rotate(LogQueue::Rewrite); + self.pipe_log.rotate(LogQueue::Rewrite).unwrap(); let mut force_compact_regions = vec![]; let memtables = self.memtables.collect(|t| {