diff --git a/src/env/default.rs b/src/env/default.rs index 31211355..6d505287 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -232,6 +232,9 @@ impl Read for LogFile { impl Seek for LogFile { fn seek(&mut self, pos: SeekFrom) -> IoResult { + fail_point!("log_file::seek::err", |_| { + Err(std::io::Error::new(std::io::ErrorKind::Other, "fp")) + }); match pos { SeekFrom::Start(offset) => self.offset = offset as usize, SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize, diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index d67daea3..7298d359 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -104,7 +104,14 @@ impl LogFileWriter { } self.capacity += alloc; } - self.writer.write_all(buf)?; + self.writer.write_all(buf).map_err(|e| { + self.writer + .seek(SeekFrom::Start(self.written as u64)) + .unwrap_or_else(|e| { + panic!("failed to reseek after write failure: {}", e); + }); + e + })?; self.written = new_written; Ok(()) } diff --git a/src/log_batch.rs b/src/log_batch.rs index dc4938a7..62b0e805 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -737,12 +737,12 @@ impl LogBatch { #[cfg(feature = "failpoints")] { - let corrupted_entries = || { - fail::fail_point!("log_batch::corrupted_entries", |_| true); + let corrupted_items = || { + fail::fail_point!("log_batch::corrupted_items", |_| true); false }; - if corrupted_entries() && footer_roffset > LOG_BATCH_HEADER_LEN { - self.buf[footer_roffset - 1] += 1; + if corrupted_items() { + self.buf[footer_roffset] += 1; } } diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index d5a69c5c..219cc630 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -287,6 +287,68 @@ fn test_concurrent_write_error() { ); } +#[test] +fn test_non_atomic_write_error() { + let dir = tempfile::Builder::new() + .prefix("test_non_atomic_write_error") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + bytes_per_sync: ReadableSize::kb(1024), + target_file_size: ReadableSize::kb(1024), + ..Default::default() + }; + let fs = Arc::new(ObfuscatedFileSystem::default()); + let entry = vec![b'x'; 1024]; + let rid = 1; + + { + // Write partially succeeds. We can reopen. + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let _f1 = FailGuard::new("log_fd::write::err", "return"); + engine + .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) + .unwrap_err(); + } + { + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + assert_eq!(engine.first_index(rid), None); + } + { + // Write partially succeeds. We can overwrite. + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off"); + engine + .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) + .unwrap_err(); + engine + .write(&mut generate_batch(rid, 5, 6, Some(&entry)), true) + .unwrap(); + assert_eq!(engine.first_index(rid).unwrap(), 5); + } + { + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + assert_eq!(engine.first_index(rid).unwrap(), 5); + } + { + // Write partially succeeds and can't be reverted. We panic. + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let _f1 = FailGuard::new("log_fd::write::err", "return"); + let _f2 = FailGuard::new("log_file::seek::err", "return"); + assert!(catch_unwind_silent(|| { + engine + .write(&mut generate_batch(rid, 6, 7, Some(&entry)), true) + .unwrap_err(); + }) + .is_err()); + } + { + let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + assert_eq!(engine.last_index(rid), Some(5)); + } +} + #[cfg(feature = "scripting")] #[test] fn test_error_during_repair() {