diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6f770471..01571c5e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,7 +10,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ ubuntu-latest, macos-latest ] + os: [ ubuntu-22.04, macos-13 ] steps: - uses: actions/checkout@v2 with: @@ -26,7 +26,7 @@ jobs: with: sharedKey: ${{ matrix.os }} - name: Cache dependencies - if: ${{ matrix.os == 'ubuntu-latest' }} + if: ${{ matrix.os == 'ubuntu-22.04' }} run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov --version 0.8.9; fi - name: Format run: | @@ -42,7 +42,7 @@ jobs: RUST_BACKTRACE: 1 EXTRA_CARGO_ARGS: '--verbose' - name: Run asan tests - if: ${{ matrix.os == 'ubuntu-latest' }} + if: ${{ matrix.os == 'ubuntu-22.04' }} run: make test env: RUST_BACKTRACE: 1 @@ -53,7 +53,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ ubuntu-latest ] + os: [ ubuntu-22.04 ] steps: - uses: actions/checkout@v2 with: @@ -79,7 +79,7 @@ jobs: EXTRA_CARGO_ARGS: '--verbose' WITH_STABLE_TOOLCHAIN: 'force' coverage: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 needs: nightly steps: - uses: actions/checkout@v2 @@ -94,7 +94,7 @@ jobs: components: llvm-tools-preview - uses: Swatinem/rust-cache@v1 with: - sharedKey: ubuntu-latest + sharedKey: ubuntu-22.04 - name: Install grcov run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov --version 0.8.9; fi - name: Run tests diff --git a/src/engine.rs b/src/engine.rs index a9d368df..64a43354 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -331,9 +331,12 @@ where let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize); - memtable - .read() - .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; + // Ensure that the corresponding memtable is locked with a read lock before + // completing the fetching of entries from the raft logs. This + // prevents the scenario where the index could become stale while + // being concurrently updated by the `rewrite` operation. + let immutable = memtable.read(); + immutable.fetch_entries_to(begin, end, max_size, &mut ents_idx)?; for i in ents_idx.iter() { vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } @@ -632,9 +635,11 @@ pub(crate) mod tests { use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; + use rand::{thread_rng, Rng}; use std::collections::{BTreeSet, HashSet}; use std::fs::OpenOptions; use std::path::PathBuf; + use std::sync::atomic::{AtomicBool, Ordering}; pub(crate) type RaftLogEngine = Engine; impl RaftLogEngine { @@ -1925,8 +1930,6 @@ pub(crate) mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_engine_fetch_entries(b: &mut test::Bencher) { - use rand::{thread_rng, Rng}; - let dir = tempfile::Builder::new() .prefix("bench_engine_fetch_entries") .tempdir() @@ -2548,6 +2551,53 @@ pub(crate) mod tests { assert!(data.is_empty()); } + #[test] + fn test_fetch_with_concurrently_rewrite() { + let dir = tempfile::Builder::new() + .prefix("test_fetch_with_concurrently_rewrite") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(2048), + ..Default::default() + }; + let fs = Arc::new(DeleteMonitoredFileSystem::new()); + let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs).unwrap()); + let entry_data = vec![b'x'; 128]; + // Set up a concurrent write with purge, and fetch. + let mut vec: Vec = Vec::new(); + let fetch_engine = engine.clone(); + let flag = Arc::new(AtomicBool::new(false)); + let start_flag = flag.clone(); + let th = std::thread::spawn(move || { + while !start_flag.load(Ordering::Acquire) { + std::thread::sleep(Duration::from_millis(10)); + } + for _ in 0..10 { + let region_id = thread_rng().gen_range(1..=10); + // Should not return file seqno out of range error. + let _ = fetch_engine + .fetch_entries_to::(region_id, 1, 101, None, &mut vec) + .map_err(|e| { + assert!(!format!("{e}").contains("file seqno out of")); + }); + vec.clear(); + } + }); + for i in 0..10 { + for rid in 1..=10 { + engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data)); + } + flag.store(true, Ordering::Release); + for rid in 1..=10 { + engine.clean(rid); + } + engine.purge_expired_files().unwrap(); + } + th.join().unwrap(); + } + #[test] fn test_internal_key_filter() { let dir = tempfile::Builder::new()