Skip to content

Commit

Permalink
reseek after write failure
Browse files Browse the repository at this point in the history
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie committed Jul 15, 2022
1 parent 9747bcf commit 99af65f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ impl Read for LogFile {

impl Seek for LogFile {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
fail_point!("log_file::seek::err", |_| {
Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
});
match pos {
SeekFrom::Start(offset) => self.offset = offset as usize,
SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize,
Expand Down
9 changes: 8 additions & 1 deletion src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,14 @@ impl<F: FileSystem> LogFileWriter<F> {
}
self.capacity += alloc;
}
self.writer.write_all(buf)?;
self.writer.write_all(buf).map_err(|e| {
self.writer
.seek(SeekFrom::Start(self.written as u64))
.unwrap_or_else(|e| {
panic!("failed to reseek after write failure: {}", e);
});
e
})?;
self.written = new_written;
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,12 +737,12 @@ impl LogBatch {

#[cfg(feature = "failpoints")]
{
let corrupted_entries = || {
fail::fail_point!("log_batch::corrupted_entries", |_| true);
let corrupted_items = || {
fail::fail_point!("log_batch::corrupted_items", |_| true);
false
};
if corrupted_entries() && footer_roffset > LOG_BATCH_HEADER_LEN {
self.buf[footer_roffset - 1] += 1;
if corrupted_items() {
self.buf[footer_roffset] += 1;
}
}

Expand Down
62 changes: 62 additions & 0 deletions tests/failpoints/test_io_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,68 @@ fn test_concurrent_write_error() {
);
}

#[test]
fn test_non_atomic_write_error() {
let dir = tempfile::Builder::new()
.prefix("test_non_atomic_write_error")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
bytes_per_sync: ReadableSize::kb(1024),
target_file_size: ReadableSize::kb(1024),
..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let entry = vec![b'x'; 1024];
let rid = 1;

{
// Write partially succeeds. We can reopen.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "return");
engine
.write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
.unwrap_err();
}
{
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
assert_eq!(engine.first_index(rid), None);
}
{
// Write partially succeeds. We can overwrite.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off");
engine
.write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
.unwrap_err();
engine
.write(&mut generate_batch(rid, 5, 6, Some(&entry)), true)
.unwrap();
assert_eq!(engine.first_index(rid).unwrap(), 5);
}
{
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
assert_eq!(engine.first_index(rid).unwrap(), 5);
}
{
// Write partially succeeds and can't be reverted. We panic.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "return");
let _f2 = FailGuard::new("log_file::seek::err", "return");
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(rid, 6, 7, Some(&entry)), true)
.unwrap_err();
})
.is_err());
}
{
let engine = Engine::open_with_file_system(cfg, fs).unwrap();
assert_eq!(engine.last_index(rid), Some(5));
}
}

#[cfg(feature = "scripting")]
#[test]
fn test_error_during_repair() {
Expand Down

0 comments on commit 99af65f

Please sign in to comment.