diff --git a/src/engine.rs b/src/engine.rs index b369d5c0..d8f21b41 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -71,7 +71,7 @@ where cfg: Config, file_system: Arc, ) -> Result>> { - Engine::open_with(cfg, file_system, vec![]) + Self::open_with(cfg, file_system, vec![]) } fn open_with( @@ -739,7 +739,8 @@ pub(crate) mod tests { dir: sub_dir.to_str().unwrap().to_owned(), ..Default::default() }; - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); } #[test] @@ -757,7 +758,7 @@ pub(crate) mod tests { ..Default::default() }; - let engine = Engine::open_with_file_system( + let engine = RaftLogEngine::open_with_file_system( cfg.clone(), Arc::new(ObfuscatedFileSystem::default()), ) @@ -811,7 +812,7 @@ pub(crate) mod tests { target_file_size: ReadableSize(1), ..Default::default() }; - let engine = Engine::open_with_file_system( + let engine = RaftLogEngine::open_with_file_system( cfg.clone(), Arc::new(ObfuscatedFileSystem::default()), ) @@ -885,7 +886,8 @@ pub(crate) mod tests { }; let rid = 1; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); engine .scan_messages::(rid, None, None, false, |_, _| { @@ -973,7 +975,8 @@ pub(crate) mod tests { delete_batch.delete(rid, key.clone()); let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); assert_eq!( engine.get_message::(rid, &key).unwrap(), None @@ -1083,7 +1086,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 1024]; // rewrite:[1 ..10] @@ -1195,7 +1199,8 @@ pub(crate) mod tests { }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 1024]; for index in 0..100 { engine.append(1, index, index + 1, Some(&data)); @@ -1255,7 +1260,8 @@ pub(crate) mod tests { }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 1024]; // write 50 small entries into region 1~3, it should trigger force compact. for rid in 1..=3 { @@ -1309,7 +1315,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 1024]; // Put 100 entries into 10 regions. @@ -1374,7 +1381,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let mut log_batch = LogBatch::default(); let empty_entry = Entry::new(); @@ -1433,7 +1441,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 16]; let cases = [[false, false], [false, true], [true, true]]; for (i, writes) in cases.iter().enumerate() { @@ -1460,7 +1469,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 1024]; for rid in 1..21 { @@ -1492,7 +1502,8 @@ pub(crate) mod tests { ..Default::default() }; let engine = - Engine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); let data = vec![b'x'; 2 * 1024 * 1024]; for rid in 1..=3 { @@ -1650,7 +1661,7 @@ pub(crate) mod tests { ..Default::default() }; - let engine = Engine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); for bs in batches.iter_mut() { for batch in bs.iter_mut() { engine.write(batch, false).unwrap(); @@ -1711,7 +1722,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1748,7 +1759,7 @@ pub(crate) mod tests { ) .unwrap(); - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); for rid in 1..25 { engine.scan_entries(rid, 1, 6, |_, _, d| { assert_eq!(d, &entry_data); @@ -1776,7 +1787,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1810,7 +1821,7 @@ pub(crate) mod tests { ) .unwrap(); - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); for rid in 1..25 { if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) { continue; @@ -1857,7 +1868,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1878,11 +1889,11 @@ pub(crate) mod tests { // Corrupt a log batch. f.set_len(f.metadata().unwrap().len() - 1).unwrap(); - Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); // Corrupt the file header. f.set_len(1).unwrap(); - Engine::open_with_file_system(cfg, fs).unwrap(); + RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); } #[test] @@ -1899,7 +1910,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -1907,7 +1918,7 @@ pub(crate) mod tests { assert!(RaftLogEngine::open(cfg.clone()).is_err()); - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); for rid in 1..10 { engine.scan_entries(rid, 1, 11, |_, _, d| { assert_eq!(d, &entry_data); @@ -1959,7 +1970,7 @@ pub(crate) mod tests { let fs = Arc::new(ObfuscatedFileSystem::default()); let rid = 1; - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); assert!(engine.is_empty()); engine.append(rid, 1, 11, Some(&entry_data)); assert!(!engine.is_empty()); @@ -2096,7 +2107,7 @@ pub(crate) mod tests { ..Default::default() }; let fs = Arc::new(DeleteMonitoredFileSystem::new()); - let engine = Engine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -2154,7 +2165,7 @@ pub(crate) mod tests { }; let recycle_capacity = cfg.recycle_capacity() as u64; let fs = Arc::new(DeleteMonitoredFileSystem::new()); - let engine = Engine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); let reserved_start = *fs.reserved_metadata.lock().unwrap().first().unwrap(); for rid in 1..=10 { @@ -2262,14 +2273,14 @@ pub(crate) mod tests { assert!(cfg_v2.recycle_capacity() > 0); // Prepare files with format_version V1 { - let engine = Engine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } } // Reopen the Engine with V2 and purge { - let engine = Engine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); for rid in 6..=10 { engine.append(rid, 11, 20, Some(&entry_data)); @@ -2283,7 +2294,7 @@ pub(crate) mod tests { } // Reopen the Engine with V1 -> V2 and purge { - let engine = Engine::open_with_file_system(cfg_v1, fs.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_v1, fs.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); for rid in 6..=10 { engine.append(rid, 20, 30, Some(&entry_data)); @@ -2297,7 +2308,7 @@ pub(crate) mod tests { assert_eq!(engine.file_span(LogQueue::Append).0, start); let file_count = engine.file_count(Some(LogQueue::Append)); drop(engine); - let engine = Engine::open_with_file_system(cfg_v2, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_v2, fs).unwrap(); assert_eq!(engine.file_span(LogQueue::Append).0, start); assert_eq!(engine.file_count(Some(LogQueue::Append)), file_count); // Mark all regions obsolete. @@ -2328,7 +2339,7 @@ pub(crate) mod tests { enable_log_recycle: false, ..Default::default() }; - let engine = Engine::open_with_file_system(cfg, file_system.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); // Only one valid file left, the last one => active_file. assert_eq!(engine.file_count(Some(LogQueue::Append)), 1); @@ -2350,7 +2361,8 @@ pub(crate) mod tests { prefill_for_recycle: true, ..Default::default() }; - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); let (start, end) = engine.file_span(LogQueue::Append); // Only one valid file left, the last one => active_file. assert_eq!(start, end); @@ -2373,7 +2385,8 @@ pub(crate) mod tests { purge_threshold: ReadableSize(50), ..cfg }; - let engine = Engine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap(); assert_eq!(engine.file_span(LogQueue::Append), (start, end)); assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None)); // Recycled files have filled the LogQueue::Append, purge_expired_files won't @@ -2397,7 +2410,7 @@ pub(crate) mod tests { prefill_for_recycle: false, ..cfg_v2 }; - let engine = Engine::open_with_file_system(cfg_v3, file_system.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap(); assert_eq!(file_system.inner.file_count(), engine.file_count(None)); } @@ -2418,7 +2431,7 @@ pub(crate) mod tests { let key = vec![b'x'; 2]; let value = vec![b'y'; 8]; - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); let mut data = HashSet::new(); let mut rid = 1; // Directly write to pipe log. @@ -2585,7 +2598,7 @@ pub(crate) mod tests { ..Default::default() }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); let value = vec![b'y'; 8]; let mut log_batch = LogBatch::default(); log_batch.put_unchecked(1, crate::make_internal_key(&[1]), value.clone()); @@ -2685,7 +2698,8 @@ pub(crate) mod tests { }; // Step 1: write data into the main directory. - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2699,7 +2713,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize(40), ..cfg }; - let engine = Engine::open_with_file_system(cfg_2, file_system).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system).unwrap(); assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 1); @@ -2748,7 +2762,8 @@ pub(crate) mod tests { }; // Step 1: write data into the main directory. - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2774,7 +2789,8 @@ pub(crate) mod tests { // abnormal case - Empty second dir { std::fs::remove_dir_all(sec_dir.path()).unwrap(); - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // All files in first dir are copied to second dir assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2796,7 +2812,8 @@ pub(crate) mod tests { file_count += 1; } } - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // Missing append files are copied assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2818,7 +2835,8 @@ pub(crate) mod tests { file_count += 1; } } - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // Missing rewrite files are copied assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2836,7 +2854,8 @@ pub(crate) mod tests { file_count += 1; } } - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // Missing reserve files are copied assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2855,7 +2874,8 @@ pub(crate) mod tests { .unwrap(); } } - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // Extra files are untouched. assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2874,7 +2894,8 @@ pub(crate) mod tests { f.write_all(b"corrupted").unwrap(); } } - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); // Corrupted files are untouched. assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path())); assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path())); @@ -2911,7 +2932,8 @@ pub(crate) mod tests { }; { // Step 1: write data into the main directory. - let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2948,7 +2970,7 @@ pub(crate) mod tests { ..cfg.clone() }; let recycle_capacity = cfg_2.recycle_capacity() as u64; - let engine = Engine::open_with_file_system(cfg_2, file_system.clone()).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap(); assert!(number_of_files(spill_dir.path()) > 0); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 1); @@ -2975,7 +2997,7 @@ pub(crate) mod tests { ..cfg }; drop(engine); - let engine = Engine::open_with_file_system(cfg_3, file_system).unwrap(); + let engine = RaftLogEngine::open_with_file_system(cfg_3, file_system).unwrap(); assert!(number_of_files(spill_dir.path()) > 0); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 20);