Move panic inside
Signed-off-by: Yang Zhang <[email protected]>
v01dstar committed Nov 8, 2023
1 parent 6fcb077 commit 8cc474d
Showing 8 changed files with 58 additions and 89 deletions.
20 changes: 10 additions & 10 deletions src/engine.rs
@@ -172,9 +172,9 @@ where
}
perf_context!(log_write_duration).observe_since(now);
if sync {
- // As per trait protocol, this error should be retriable. But we panic anyway to
+ // As per trait protocol, sync error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
- self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
+ self.pipe_log.sync(LogQueue::Append);
}
// Pass the perf context diff to all the writers.
let diff = get_perf_context();
@@ -2446,7 +2446,7 @@ pub(crate) mod tests {
builder.begin(&mut log_batch);
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// begin - unrelated - end.
@@ -2466,7 +2466,7 @@
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// begin - middle - middle - end.
@@ -2491,7 +2491,7 @@
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// begin - begin - end.
@@ -2511,7 +2511,7 @@
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// end - middle - end.
@@ -2533,7 +2533,7 @@
rid += 1;
log_batch.put(rid, key.clone(), value.clone()).unwrap();
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// end - begin - end
@@ -2554,7 +2554,7 @@
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
{
// begin - end - begin - end.
@@ -2574,9 +2574,9 @@
log_batch.put(rid, key.clone(), value.clone()).unwrap();
data.insert(rid);
flush(&mut log_batch);
- engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.rotate(LogQueue::Rewrite);
}
- engine.pipe_log.sync(LogQueue::Rewrite).unwrap();
+ engine.pipe_log.sync(LogQueue::Rewrite);

let engine = engine.reopen();
for rid in engine.raft_groups() {
7 changes: 0 additions & 7 deletions src/errors.rs
@@ -39,10 +39,3 @@ pub(crate) fn is_no_space_err(e: &IoError) -> bool {
// `ErrorKind::StorageFull` is stable.
format!("{e}").contains("nospace")
}
-
- /// Check whether the given error is a nospace error.
- pub(crate) fn is_io_no_space_err(e: &Error) -> bool {
- // TODO: make the following judgement more elegant when the error type
- // `ErrorKind::StorageFull` is stable.
- format!("{e}").contains("nospace")
- }
18 changes: 9 additions & 9 deletions src/file_pipe_log/log_file.rs
@@ -74,21 +74,21 @@ impl<F: FileSystem> LogFileWriter<F> {
self.write(&buf, 0)
}

- pub fn close(&mut self) -> IoResult<()> {
+ pub fn close(&mut self) {
// Necessary to truncate extra zeros from fallocate().
- self.truncate()?;
- self.sync()
+ self.truncate();
+ self.sync();
}

- pub fn truncate(&mut self) -> IoResult<()> {
+ pub fn truncate(&mut self) {
if self.written < self.capacity {
fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| {
Ok(())
});
- self.writer.truncate(self.written)?;
+ // Panic if truncate fails, in case of data loss.
+ self.writer.truncate(self.written).unwrap();
self.capacity = self.written;
}
- Ok(())
}

pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> IoResult<()> {
@@ -117,10 +117,10 @@ impl<F: FileSystem> LogFileWriter<F> {
Ok(())
}

- pub fn sync(&mut self) -> IoResult<()> {
+ pub fn sync(&mut self) {
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
- self.handle.sync()?;
- Ok(())
+ // Panic if sync fails, in case of data loss.
+ self.handle.sync().unwrap();
}

#[inline]
8 changes: 4 additions & 4 deletions src/file_pipe_log/mod.rs
@@ -231,7 +231,7 @@ pub mod debug {
len,
});
}
- writer.close().unwrap();
+ writer.close();
// Read and verify.
let mut reader =
LogItemReader::new_file_reader(file_system.clone(), &file_path).unwrap();
@@ -280,7 +280,7 @@ pub mod debug {
true, /* create */
)
.unwrap();
- writer.close().unwrap();
+ writer.close();

assert!(LogItemReader::new_file_reader(file_system.clone(), dir.path()).is_err());
assert!(
@@ -330,7 +330,7 @@ pub mod debug {
.unwrap();
let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
let len = writer.offset();
- writer.close().unwrap();
+ writer.close();
if shorter {
f.set_len(len as u64 - 1).unwrap();
}
@@ -341,7 +341,7 @@
false, /* create */
)
.unwrap();
- writer.close().unwrap();
+ writer.close();
let mut reader = build_file_reader(file_system.as_ref(), &path).unwrap();
assert_eq!(reader.parse_format().unwrap(), to);
std::fs::remove_file(&path).unwrap();
76 changes: 26 additions & 50 deletions src/file_pipe_log/pipe.rs
@@ -12,7 +12,7 @@ use parking_lot::{Mutex, MutexGuard, RwLock};

use crate::config::Config;
use crate::env::{FileSystem, Permission};
- use crate::errors::{is_io_no_space_err, is_no_space_err};
+ use crate::errors::is_no_space_err;
use crate::event_listener::EventListener;
use crate::metrics::*;
use crate::pipe_log::{
@@ -72,9 +72,7 @@ pub(super) struct SinglePipe<F: FileSystem> {
impl<F: FileSystem> Drop for SinglePipe<F> {
fn drop(&mut self) {
let mut writable_file = self.writable_file.lock();
- if let Err(e) = writable_file.writer.close() {
- error!("error while closing the active writer: {e}");
- }
+ writable_file.writer.close();
let mut recycled_files = self.recycled_files.write();
let mut next_reserved_seq = recycled_files
.iter()
@@ -240,19 +238,20 @@ impl<F: FileSystem> SinglePipe<F> {
/// Creates a new file for write, and rotates the active log file.
///
/// This operation is atomic in face of errors.
- fn rotate_imp(&self, writable_file: &mut MutexGuard<WritableFile<F>>) -> Result<()> {
+ fn rotate_imp(&self, writable_file: &mut MutexGuard<WritableFile<F>>) {
let _t = StopWatch::new((
&*LOG_ROTATE_DURATION_HISTOGRAM,
perf_context!(log_rotate_duration),
));
let new_seq = writable_file.seq + 1;
debug_assert!(new_seq > DEFAULT_FIRST_FILE_SEQ);

- writable_file.writer.close()?;
+ writable_file.writer.close();

let (path_id, handle) = self
.recycle_file(new_seq)
- .unwrap_or_else(|| self.new_file(new_seq))?;
+ .unwrap_or_else(|| self.new_file(new_seq))
+ .unwrap();
let f = File::<F> {
seq: new_seq,
handle: handle.into(),
@@ -267,13 +266,14 @@
f.handle.clone(),
f.format,
true, /* force_reset */
- )?,
+ )
+ .unwrap(),
format: f.format,
};
// File header must be persisted. This way we can recover gracefully if power
// loss before a new entry is written.
- new_file.writer.sync()?;
- self.sync_dir(path_id)?;
+ new_file.writer.sync();
+ self.sync_dir(path_id).unwrap();

**writable_file = new_file;
let len = {
@@ -288,7 +288,6 @@
seq: new_seq,
});
}
- Ok(())
}

/// Synchronizes current states to related metrics.
@@ -321,17 +320,7 @@ impl<F: FileSystem> SinglePipe<F> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
if writable_file.writer.offset() >= self.target_file_size {
- if let Err(e) = self.rotate_imp(&mut writable_file) {
- // As the disk is already full and is no valid and extra directory to flush
- // data, it can safely return the `no space err` to users.
- if is_io_no_space_err(&e) {
- return Err(e);
- }
- panic!(
- "error when rotate [{:?}:{}]: {e}",
- self.queue, writable_file.seq,
- );
- }
+ self.rotate_imp(&mut writable_file);
}

let seq = writable_file.seq;
@@ -364,9 +353,7 @@ impl<F: FileSystem> SinglePipe<F> {
}
let start_offset = writer.offset();
if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
- if let Err(te) = writer.truncate() {
- panic!("error when truncate {seq} after error: {e}, get: {}", te);
- }
+ writer.truncate();
if is_no_space_err(&e) {
// TODO: There exists several corner cases should be tackled if
// `bytes.len()` > `target_file_size`. For example,
@@ -377,11 +364,7 @@
// - [3] Both main-dir and spill-dir have several recycled logs.
// But as `bytes.len()` is always smaller than `target_file_size` in common
// cases, this issue will be ignored temprorarily.
- if let Err(e) = self.rotate_imp(&mut writable_file) {
- if is_io_no_space_err(&e) {
- return Err(e);
- }
- }
+ self.rotate_imp(&mut writable_file);
// If there still exists free space for this record, rotate the file
// and return a special TryAgain Err (for retry) to the caller.
return Err(Error::TryAgain(format!(
@@ -405,18 +388,11 @@
Ok(handle)
}

- fn sync(&self) -> Result<()> {
+ fn sync(&self) {
let mut writable_file = self.writable_file.lock();
- let seq = writable_file.seq;
let writer = &mut writable_file.writer;
- {
- let _t = StopWatch::new(perf_context!(log_sync_duration));
- if let Err(e) = writer.sync() {
- panic!("error when sync [{:?}:{seq}]: {e}", self.queue);
- }
- }
-
- Ok(())
+ let _t = StopWatch::new(perf_context!(log_sync_duration));
+ writer.sync();
}

fn file_span(&self) -> (FileSeq, FileSeq) {
@@ -429,8 +405,8 @@
(last_seq - first_seq + 1) as usize * self.target_file_size
}

- fn rotate(&self) -> Result<()> {
- self.rotate_imp(&mut self.writable_file.lock())
+ fn rotate(&self) {
+ self.rotate_imp(&mut self.writable_file.lock());
}

fn purge_to(&self, file_seq: FileSeq) -> Result<usize> {
@@ -524,8 +500,8 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
}

#[inline]
- fn sync(&self, queue: LogQueue) -> Result<()> {
- self.pipes[queue as usize].sync()
+ fn sync(&self, queue: LogQueue) {
+ self.pipes[queue as usize].sync();
}

#[inline]
@@ -539,8 +515,8 @@
}

#[inline]
- fn rotate(&self, queue: LogQueue) -> Result<()> {
- self.pipes[queue as usize].rotate()
+ fn rotate(&self, queue: LogQueue) {
+ self.pipes[queue as usize].rotate();
}

#[inline]
@@ -652,7 +628,7 @@ mod tests {
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 2);

- pipe_log.rotate(queue).unwrap();
+ pipe_log.rotate(queue);

// purge file 1
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
@@ -720,9 +696,9 @@
let mut handles = Vec::new();
for i in 0..10 {
handles.push(pipe_log.append(&mut &content(i)).unwrap());
- pipe_log.sync().unwrap();
+ pipe_log.sync();
}
- pipe_log.rotate().unwrap();
+ pipe_log.rotate();
let (first, last) = pipe_log.file_span();
// Cannot purge already expired logs or not existsed logs.
assert!(pipe_log.purge_to(first - 1).is_err());
@@ -737,7 +713,7 @@
let mut handles = Vec::new();
for i in 0..10 {
handles.push(pipe_log.append(&mut &content(i + 1)).unwrap());
- pipe_log.sync().unwrap();
+ pipe_log.sync();
}
// Verify the data.
for (i, handle) in handles.into_iter().enumerate() {
2 changes: 1 addition & 1 deletion src/filter.rs
@@ -333,7 +333,7 @@ impl RhaiFilterMachine {
)?;
log_batch.drain();
}
- writer.close()?;
+ writer.close();
}
}
// Delete backup file and defuse the guard.
4 changes: 2 additions & 2 deletions src/pipe_log.rs
@@ -179,7 +179,7 @@ pub trait PipeLog: Sized {
///
/// This operation might incurs a great latency overhead. It's advised to
/// call it once every batch of writes.
- fn sync(&self, queue: LogQueue) -> Result<()>;
+ fn sync(&self, queue: LogQueue);

/// Returns the smallest and largest file sequence number, still in use,
/// of the specified log queue.
@@ -200,7 +200,7 @@
///
/// Implementation should be atomic under error conditions but not
/// necessarily panic-safe.
- fn rotate(&self, queue: LogQueue) -> Result<()>;
+ fn rotate(&self, queue: LogQueue);

/// Deletes all log files smaller than the specified file ID. The scope is
/// limited to the log queue of `file_id`.
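For reference, a minimal standalone sketch of the calling pattern the updated `PipeLog` trait above describes: `sync` and `rotate` return no `Result`, and any I/O failure panics inside the implementation. Every type and name below is invented for illustration and is not code from this repository.

use std::sync::Mutex;

// Invented names for illustration only; not the crate's actual definitions.
#[derive(Clone, Copy)]
enum LogQueue {
    Append,
    Rewrite,
}

trait PipeLog {
    /// Persists buffered writes of `queue`; panics on I/O failure.
    fn sync(&self, queue: LogQueue);
    /// Rotates the active file of `queue`; panics on failure.
    fn rotate(&self, queue: LogQueue);
}

struct InMemoryPipe {
    // Active file sequence number per queue: [append, rewrite].
    active_seq: Mutex<[u64; 2]>,
}

impl PipeLog for InMemoryPipe {
    fn sync(&self, _queue: LogQueue) {
        // A real implementation would fsync the active file here and panic on
        // error, e.g. `file.sync_data().expect("sync failed")`.
    }

    fn rotate(&self, queue: LogQueue) {
        // Simulate opening a new active file by bumping the sequence number.
        let mut seqs = self.active_seq.lock().unwrap();
        seqs[queue as usize] += 1;
    }
}

fn main() {
    let pipe = InMemoryPipe {
        active_seq: Mutex::new([1, 1]),
    };
    // Callers no longer match on a Result; any failure is a panic inside the pipe.
    pipe.rotate(LogQueue::Append);
    pipe.sync(LogQueue::Append);
    assert_eq!(pipe.active_seq.lock().unwrap()[LogQueue::Append as usize], 2);
}

Callers then treat a successful return as the only outcome; handling of retriable errors moves entirely into the pipe implementation.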