Skip to content

Commit

Permalink
Propagate error if writing header fails
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Nov 9, 2023
1 parent 1fd5416 commit c606f51
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 23 deletions.
14 changes: 7 additions & 7 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2446,7 +2446,7 @@ pub(crate) mod tests {
builder.begin(&mut log_batch);
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// begin - unrelated - end.
Expand All @@ -2466,7 +2466,7 @@ pub(crate) mod tests {
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// begin - middle - middle - end.
Expand All @@ -2491,7 +2491,7 @@ pub(crate) mod tests {
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// begin - begin - end.
Expand All @@ -2511,7 +2511,7 @@ pub(crate) mod tests {
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// end - middle - end.
Expand All @@ -2533,7 +2533,7 @@ pub(crate) mod tests {
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// end - begin - end
Expand All @@ -2554,7 +2554,7 @@ pub(crate) mod tests {
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
{
// begin - end - begin - end.
Expand All @@ -2574,7 +2574,7 @@ pub(crate) mod tests {
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
engine.pipe_log.sync(LogQueue::Rewrite);

Expand Down
22 changes: 11 additions & 11 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl<F: FileSystem> SinglePipe<F> {
/// Creates a new file for write, and rotates the active log file.
///
/// This operation is atomic in face of errors.
fn rotate_imp(&self, writable_file: &mut MutexGuard<WritableFile<F>>) {
fn rotate_imp(&self, writable_file: &mut MutexGuard<WritableFile<F>>) -> Result<()> {
let _t = StopWatch::new((
&*LOG_ROTATE_DURATION_HISTOGRAM,
perf_context!(log_rotate_duration),
Expand Down Expand Up @@ -266,8 +266,7 @@ impl<F: FileSystem> SinglePipe<F> {
f.handle.clone(),
f.format,
true, /* force_reset */
)
.unwrap(),
)?,
format: f.format,
};
// File header must be persisted. This way we can recover gracefully if power
Expand All @@ -288,6 +287,7 @@ impl<F: FileSystem> SinglePipe<F> {
seq: new_seq,
});
}
Ok(())
}

/// Synchronizes current states to related metrics.
Expand Down Expand Up @@ -320,7 +320,7 @@ impl<F: FileSystem> SinglePipe<F> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
if writable_file.writer.offset() >= self.target_file_size {
self.rotate_imp(&mut writable_file);
self.rotate_imp(&mut writable_file)?;
}

let seq = writable_file.seq;
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<F: FileSystem> SinglePipe<F> {
// - [3] Both main-dir and spill-dir have several recycled logs.
// But as `bytes.len()` is always smaller than `target_file_size` in common
// cases, this issue will be ignored temprorarily.
self.rotate_imp(&mut writable_file);
self.rotate_imp(&mut writable_file)?;
// If there still exists free space for this record, rotate the file
// and return a special TryAgain Err (for retry) to the caller.
return Err(Error::TryAgain(format!(
Expand Down Expand Up @@ -405,8 +405,8 @@ impl<F: FileSystem> SinglePipe<F> {
(last_seq - first_seq + 1) as usize * self.target_file_size
}

fn rotate(&self) {
self.rotate_imp(&mut self.writable_file.lock());
fn rotate(&self) -> Result<()> {
self.rotate_imp(&mut self.writable_file.lock())
}

fn purge_to(&self, file_seq: FileSeq) -> Result<usize> {
Expand Down Expand Up @@ -515,8 +515,8 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
}

#[inline]
fn rotate(&self, queue: LogQueue) {
self.pipes[queue as usize].rotate();
fn rotate(&self, queue: LogQueue) -> Result<()> {
self.pipes[queue as usize].rotate()
}

#[inline]
Expand Down Expand Up @@ -628,7 +628,7 @@ mod tests {
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 2);

pipe_log.rotate(queue);
pipe_log.rotate(queue).unwrap();

// purge file 1
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
Expand Down Expand Up @@ -698,7 +698,7 @@ mod tests {
handles.push(pipe_log.append(&mut &content(i)).unwrap());
pipe_log.sync();
}
pipe_log.rotate();
pipe_log.rotate().unwrap();
let (first, last) = pipe_log.file_span();
// Cannot purge already expired logs or not existsed logs.
assert!(pipe_log.purge_to(first - 1).is_err());
Expand Down
2 changes: 1 addition & 1 deletion src/pipe_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub trait PipeLog: Sized {
///
/// Implementation should be atomic under error conditions but not
/// necessarily panic-safe.
fn rotate(&self, queue: LogQueue);
fn rotate(&self, queue: LogQueue) -> Result<()>;

/// Deletes all log files smaller than the specified file ID. The scope is
/// limited to the log queue of `file_id`.
Expand Down
8 changes: 4 additions & 4 deletions src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
let (_, last) = self.pipe_log.file_span(LogQueue::Append);
let watermark = watermark.map_or(last, |w| std::cmp::min(w, last));
if watermark == last {
self.pipe_log.rotate(LogQueue::Append);
self.pipe_log.rotate(LogQueue::Append).unwrap();
}
self.rewrite_append_queue_tombstones().unwrap();
if exit_after_step == Some(1) {
Expand Down Expand Up @@ -167,13 +167,13 @@ where

pub fn must_purge_all_stale(&self) {
let _lk = self.force_rewrite_candidates.try_lock().unwrap();
self.pipe_log.rotate(LogQueue::Rewrite);
self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
self.rescan_memtables_and_purge_stale_files(
LogQueue::Rewrite,
self.pipe_log.file_span(LogQueue::Rewrite).1,
)
.unwrap();
self.pipe_log.rotate(LogQueue::Append);
self.pipe_log.rotate(LogQueue::Append).unwrap();
self.rescan_memtables_and_purge_stale_files(
LogQueue::Append,
self.pipe_log.file_span(LogQueue::Append).1,
Expand Down Expand Up @@ -273,7 +273,7 @@ where
// Rewrites the entire rewrite queue into new log files.
fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
self.pipe_log.rotate(LogQueue::Rewrite);
self.pipe_log.rotate(LogQueue::Rewrite).unwrap();

let mut force_compact_regions = vec![];
let memtables = self.memtables.collect(|t| {
Expand Down

0 comments on commit c606f51

Please sign in to comment.