diff --git a/src/engine.rs b/src/engine.rs
index 10416a29..6d92e850 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -334,9 +334,12 @@ where
         let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
         if let Some(memtable) = self.memtables.get(region_id) {
             let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize);
-            memtable
-                .read()
-                .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
+            // Hold the memtable read lock until every entry has been read from the
+            // log files, not only while the entry indexes are collected. Dropping
+            // the guard earlier would let a concurrent `rewrite` update the indexes,
+            // leaving the ones fetched here stale.
+            let immutable = memtable.read();
+            immutable.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
             for i in ents_idx.iter() {
                 vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?);
             }
@@ -635,9 +638,11 @@ pub(crate) mod tests {
     use crate::util::ReadableSize;
     use kvproto::raft_serverpb::RaftLocalState;
     use raft::eraftpb::Entry;
+    use rand::{thread_rng, Rng};
     use std::collections::{BTreeSet, HashSet};
     use std::fs::OpenOptions;
     use std::path::PathBuf;
+    use std::sync::atomic::{AtomicBool, Ordering};
 
     pub(crate) type RaftLogEngine = Engine;
     impl RaftLogEngine {
@@ -1929,8 +1934,6 @@ pub(crate) mod tests {
     #[cfg(feature = "nightly")]
     #[bench]
     fn bench_engine_fetch_entries(b: &mut test::Bencher) {
-        use rand::{thread_rng, Rng};
-
         let dir = tempfile::Builder::new()
             .prefix("bench_engine_fetch_entries")
             .tempdir()
@@ -2587,6 +2590,53 @@ pub(crate) mod tests {
         assert!(data.is_empty(), "data loss {:?}", data);
     }
 
+    #[test]
+    fn test_fetch_with_concurrently_rewrite() {
+        let dir = tempfile::Builder::new()
+            .prefix("test_fetch_with_concurrently_rewrite")
+            .tempdir()
+            .unwrap();
+        let cfg = Config {
+            dir: dir.path().to_str().unwrap().to_owned(),
+            target_file_size: ReadableSize(2048),
+            ..Default::default()
+        };
+        let fs = Arc::new(DeleteMonitoredFileSystem::new());
+        let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap());
+        let entry_data = vec![b'x'; 128];
+        // Fetch on a background thread while this thread appends, cleans and purges.
+        let mut vec: Vec = Vec::new();
+        let fetch_engine = engine.clone();
+        let flag = Arc::new(AtomicBool::new(false));
+        let start_flag = flag.clone();
+        let th = std::thread::spawn(move || {
+            while !start_flag.load(Ordering::Acquire) {
+                std::thread::sleep(Duration::from_millis(10));
+            }
+            for _ in 0..10 {
+                let region_id = thread_rng().gen_range(1..=10);
+                // Any error is tolerated except a "file seqno out of range" one.
+                let _ = fetch_engine
+                    .fetch_entries_to::(region_id, 1, 101, None, &mut vec)
+                    .map_err(|e| {
+                        assert!(!format!("{e}").contains("file seqno out of"));
+                    });
+                vec.clear();
+            }
+        });
+        for i in 0..10 {
+            for rid in 1..=10 {
+                engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
+            }
+            flag.store(true, Ordering::Release); // lets the fetch thread start
+            for rid in 1..=10 {
+                engine.clean(rid);
+            }
+            engine.purge_expired_files().unwrap();
+        }
+        th.join().unwrap();
+    }
+
     #[test]
     fn test_internal_key_filter() {
         let dir = tempfile::Builder::new()