From 9ec5a77371cca74c3e513a13e84d5864317357f9 Mon Sep 17 00:00:00 2001
From: tabokie
Date: Tue, 2 Aug 2022 13:11:59 +0800
Subject: [PATCH] refactor a bit and add makefile

Signed-off-by: tabokie
---
 .github/workflows/rust.yml |  32 +++++-----
 README.md                  |   8 +--
 src/file_pipe_log/pipe.rs  | 126 +++++++++++++------------------
 3 files changed, 62 insertions(+), 104 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 850128c5..93458a4e 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -29,24 +29,24 @@ jobs:
       if: ${{ matrix.os == 'ubuntu-latest' }}
       run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install grcov; fi
     - name: Format
-      run: cargo fmt --all -- --check
+      run: |
+        make format
+        git diff --quiet || git diff
     - name: Clippy
-      run: cargo clippy --all --all-features --all-targets -- -D clippy::all
+      run: make clippy
     - name: Run tests
-      run: |
-        cargo test --all --features all_except_failpoints --verbose -- --nocapture
-        cargo test --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
+      run: make test_nightly
       env:
         RUST_BACKTRACE: 1
+        EXTRA_CARGO_ARGS: '--verbose'
     - name: Run asan tests
       if: ${{ matrix.os == 'ubuntu-latest' }}
-      run: |
-        cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --all --features all_except_failpoints --verbose -- --nocapture
-        cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
+      run: make test_nightly
       env:
         RUST_BACKTRACE: 1
         RUSTFLAGS: '-Zsanitizer=address'
         RUSTDOCFLAGS: '-Zsanitizer=address'
+        EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
   stable:
     runs-on: ${{ matrix.os }}
     strategy:
@@ -66,16 +66,13 @@ jobs:
     - uses: Swatinem/rust-cache@v1
       with:
        sharedKey: ${{ matrix.os }}-stable
-    - name: Format
-      run: cargo fmt --all -- --check
     - name: Clippy
-      run: cargo clippy --all --features all_stable --all-targets -- -D clippy::all
+      run: make clippy
     - name: Run tests
-      run: |
-        cargo test --all --features all_stable_except_failpoints --verbose -- --nocapture
-        cargo test --test failpoints --features all_stable --verbose -- --test-threads 1 --nocapture
+      run: make test
       env:
         RUST_BACKTRACE: 1
+        EXTRA_CARGO_ARGS: '--verbose'
   coverage:
     runs-on: ubuntu-latest
     needs: nightly
@@ -97,13 +94,12 @@ jobs:
       run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
     - name: Run tests
       run: |
-        cargo test --all --features all_except_failpoints
-        cargo test --test failpoints --all-features -- --test-threads 1
-        cargo test --all --features all_stable_except_failpoints
-        cargo test --test failpoints --features all_stable -- --test-threads 1
+        make test
+        make test_nightly
       env:
         RUSTFLAGS: '-Zinstrument-coverage'
         LLVM_PROFILE_FILE: '%p-%m.profraw'
+        EXTRA_CARGO_ARGS: '--verbose'
     - name: Run grcov
       run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
     - name: Upload
diff --git a/README.md b/README.md
index 9698348b..999aec8a 100644
--- a/README.md
+++ b/README.md
@@ -75,10 +75,10 @@ Contributions are always welcome! Here are a few tips for making a PR:
 - Tests are automatically run against the changes, some of them can be run locally:
 
 ```
-cargo fmt --all -- --check
-cargo +nightly clippy --all --all-features --all-targets -- -D clippy::all
-cargo +nightly test --all --features all_except_failpoints
-cargo +nightly test --test failpoints --all-features -- --test-threads 1
+# rustup default nightly
+make
+# filter a specific test case
+env EXTRA_CARGO_ARGS= make test_nightly
 ```
 
 - For changes that might induce performance effects, please quote the targeted benchmark results in the PR description. In addition to micro-benchmarks, there is a standalone [stress test tool](https://github.com/tikv/raft-engine/tree/master/stress) which you can use to demonstrate the system performance.
diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs
index 4e74b769..f6e575eb 100644
--- a/src/file_pipe_log/pipe.rs
+++ b/src/file_pipe_log/pipe.rs
@@ -33,44 +33,32 @@ struct FileCollection<F: FileSystem> {
     capacity: usize,
 }
 
-#[cfg(test)]
-impl<F: FileSystem> Default for FileCollection<F> {
-    fn default() -> Self {
-        Self {
-            first_seq: 0,
-            first_seq_in_use: 0,
-            fds: VecDeque::new(),
-            capacity: 0,
-        }
-    }
-}
-
 impl<F: FileSystem> FileCollection<F> {
-    /// Recycle the first obsolete(stale) file and renewed with new FileId.
+    /// Recycles the first obsolete file by renaming it to the given `FileId`.
     ///
     /// Attention please, the recycled file would be automatically `renamed` in
     /// this func.
-    pub fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
-        if self.capacity == 0 || self.first_seq >= self.first_seq_in_use {
-            return false;
-        }
+    fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
+        debug_assert!(self.first_seq <= self.first_seq_in_use);
         debug_assert!(!self.fds.is_empty());
-        let first_file_id = FileId {
-            queue: dst_fd.queue,
-            seq: self.first_seq,
-        };
-        let src_path = first_file_id.build_file_path(dir_path); // src filepath
-        let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
-        if let Err(e) = file_system.reuse(&src_path, &dst_path) {
-            error!("error while trying to recycle one expired file: {}", e);
-            false
-        } else {
-            // Only if `rename` made sense, could we update the first_seq and return
-            // success.
-            self.fds.pop_front().unwrap();
-            self.first_seq += 1;
-            true
+        if self.first_seq < self.first_seq_in_use {
+            let first_file_id = FileId {
+                queue: dst_fd.queue,
+                seq: self.first_seq,
+            };
+            let src_path = first_file_id.build_file_path(dir_path); // src filepath
+            let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
+            if let Err(e) = file_system.reuse(&src_path, &dst_path) {
+                error!("error while trying to recycle one expired file: {}", e);
+            } else {
+                // Only if the `rename` succeeded can we update `first_seq` and
+                // return success.
+                self.fds.pop_front().unwrap();
+                self.first_seq += 1;
+                return true;
+            }
         }
+        false
     }
 }
 
@@ -252,7 +240,7 @@ impl<F: FileSystem> SinglePipe<F> {
         };
         let mut new_file = ActiveFile {
             seq,
-            // The file might generated from a recycled stale-file, we should reset the file
+            // The file might be generated from a recycled stale file, so always reset the file
             // header of it.
             writer: build_file_writer(
                 self.file_system.as_ref(),
@@ -265,8 +253,6 @@ impl<F: FileSystem> SinglePipe<F> {
         // loss before a new entry is written.
         new_file.writer.sync()?;
         self.sync_dir()?;
-        let active_file_format_version = new_file.writer.header.version();
-        **active_file = new_file;
 
         let len = {
             let mut files = self.files.write();
@@ -278,9 +264,10 @@ impl<F: FileSystem> SinglePipe<F> {
                         seq,
                         queue: self.queue,
                     },
-                    active_file_format_version,
+                    self.format_version,
                 ),
             });
+            **active_file = new_file;
             for listener in &self.listeners {
                 listener.post_new_log_file(FileId {
                     queue: self.queue,
@@ -396,6 +383,7 @@ impl<F: FileSystem> SinglePipe<F> {
             return Ok(0);
         }
 
+        // TODO: move these under FileCollection.
         // Remove some obsolete files if capacity is exceeded.
         let obsolete_files = (file_seq - files.first_seq) as usize;
         // When capacity is zero, always remove logically deleted files.
@@ -404,17 +392,18 @@ impl<F: FileSystem> SinglePipe<F> {
             // The files with format_version `V1` cannot be chosen as recycle
             // candidates, which should also be removed.
            // Find the newest obsolete `V1` file and refresh purge count.
-            for recycle_idx in (purged..obsolete_files).rev() {
-                if !files.fds[recycle_idx].context.version.has_log_signing() {
-                    purged = recycle_idx + 1;
+            for i in (purged..obsolete_files).rev() {
+                if !files.fds[i].context.version.has_log_signing() {
+                    purged = i + 1;
                     break;
                 }
             }
             // Update metadata of files
+            let old_first_seq = files.first_seq;
             files.first_seq += purged as u64;
             files.first_seq_in_use = file_seq;
             files.fds.drain(..purged);
-            (files.first_seq - purged as u64, purged, files.fds.len())
+            (old_first_seq, purged, files.fds.len())
         };
         self.flush_metrics(remained);
         for seq in first_purge_seq..first_purge_seq + purged as u64 {
@@ -681,6 +670,16 @@ mod tests {
                 buf[..] == expected_data[..]
             })
         }
+        fn new_file_handler(path: &str, file_id: FileId) -> FileHandler<DefaultFileSystem> {
+            FileHandler {
+                handle: Arc::new(
+                    DefaultFileSystem
+                        .open(&file_id.build_file_path(path))
+                        .unwrap(),
+                ),
+                context: LogFileContext::new(file_id, Version::default()),
+            }
+        }
         let dir = Builder::new()
             .prefix("test_recycle_file_collections")
             .tempdir()
@@ -687,20 +686,7 @@ mod tests {
            .unwrap();
         let path = dir.path().to_str().unwrap();
         let data = vec![b'x'; 1024];
         let file_system = Arc::new(DefaultFileSystem);
-        // test FileCollection with Default(Invalid)
-        {
-            let mut recycle_collections = FileCollection::<DefaultFileSystem>::default();
-            assert_eq!(recycle_collections.first_seq, 0);
-            assert_eq!(recycle_collections.first_seq_in_use, 0);
-            assert_eq!(recycle_collections.capacity, 0);
-            assert_eq!(recycle_collections.fds.len(), 0);
-            assert!(!recycle_collections.recycle_one_file(
-                &file_system,
-                path,
-                FileId::dummy(LogQueue::Append)
-            ));
-        }
         // test FileCollection with a valid file
         {
             // mock
@@ -722,31 +708,15 @@ mod tests {
                 first_seq: old_file_id.seq,
                 first_seq_in_use: old_file_id.seq,
                 capacity: 3,
-                ..Default::default()
+                fds: vec![new_file_handler(path, old_file_id)].into(),
             };
-            recycle_collections.fds.push_back(FileHandler {
-                handle: Arc::new(
-                    file_system
-                        .open(&old_file_id.build_file_path(path))
-                        .unwrap(),
-                ),
-                context: LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()),
-            });
             // recycle an old file
             assert!(!recycle_collections.recycle_one_file(&file_system, path, new_file_id));
             // update the recycle collection
             {
-                recycle_collections.fds.push_back(FileHandler {
-                    handle: Arc::new(
-                        file_system
-                            .open(&old_file_id.build_file_path(path))
-                            .unwrap(),
-                    ),
-                    context: LogFileContext::new(
-                        FileId::dummy(LogQueue::Append),
-                        Version::default(),
-                    ),
-                });
+                recycle_collections
+                    .fds
+                    .push_back(new_file_handler(path, old_file_id));
                 recycle_collections.first_seq_in_use = cur_file_id.seq;
             }
             // recycle an old file
@@ -787,16 +757,8 @@ mod tests {
                 first_seq: fake_file_id.seq,
                 first_seq_in_use: fake_file_id.seq + 1,
                 capacity: 2,
-                ..Default::default()
+                fds: vec![new_file_handler(path, fake_file_id)].into(),
             };
-            recycle_collections.fds.push_back(FileHandler {
-                handle: Arc::new(
-                    file_system
-                        .open(&fake_file_id.build_file_path(path))
-                        .unwrap(),
-                ),
-                context: LogFileContext::new(fake_file_id, Version::default()),
-            });
             let first_file_id = recycle_collections.fds.front().unwrap().context.id;
             assert_eq!(first_file_id, fake_file_id);
             // mock the failure on `rename`
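
Note on the referenced Makefile: the targets invoked above (`make format`, `make clippy`, `make test`, `make test_nightly`) and the `EXTRA_CARGO_ARGS` hook are visible in the workflow, but the Makefile itself is not among the hunks shown here. A minimal sketch of what it could look like, assuming each recipe simply wraps the cargo commands this patch removes (the recipe bodies and the `all` default target are inferred, not the committed file):

```makefile
# Sketch only: target names and EXTRA_CARGO_ARGS come from the workflow above;
# recipe bodies are inferred from the removed cargo invocations.
EXTRA_CARGO_ARGS ?=

all: format clippy test

format:
	cargo fmt --all

clippy:
	cargo clippy --all --all-features --all-targets -- -D clippy::all

test:
	cargo test --all --features all_stable_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
	cargo test --test failpoints --features all_stable ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture

test_nightly:
	cargo test --all --features all_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
	cargo test --test failpoints --all-features ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
```

Threading flags through a variable is what lets every CI job reuse one target: the nightly job runs the equivalent of `EXTRA_CARGO_ARGS='--verbose' make test_nightly`, and the asan job passes `--verbose -Zbuild-std --target x86_64-unknown-linux-gnu` through the same hook.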
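The `recycle_one_file` rewrite also pins down its invariant: `first_seq <= first_seq_in_use` always holds, and only files in the half-open window `[first_seq, first_seq_in_use)` (logically deleted but still on disk) are recycling candidates, so the old `capacity == 0` guard collapses into a single range check. A self-contained sketch of that bookkeeping (illustrative type and method names, not the crate's API):

```rust
// Models FileCollection's two sequence numbers; the field names mirror the
// patch, everything else is simplified for illustration.
struct StaleWindow {
    first_seq: u64,        // oldest physical file still on disk
    first_seq_in_use: u64, // oldest file that still holds live data
}

impl StaleWindow {
    // Mirrors the new guard in recycle_one_file: a file may be renamed and
    // reused only if it is strictly older than `first_seq_in_use`.
    fn try_recycle(&mut self) -> bool {
        debug_assert!(self.first_seq <= self.first_seq_in_use);
        if self.first_seq < self.first_seq_in_use {
            // The real code renames the stale file here (file_system.reuse)
            // and only advances the window when that rename succeeds.
            self.first_seq += 1;
            return true;
        }
        false
    }
}

fn main() {
    let mut w = StaleWindow { first_seq: 12, first_seq_in_use: 13 };
    assert!(w.try_recycle()); // one stale file was available
    assert!(!w.try_recycle()); // window exhausted, nothing to recycle
}
```

The same window explains the purge fix above: the purge routine must report the pre-update `first_seq` (captured as `old_first_seq`), because `files.first_seq` has already been advanced by the time the return value is built.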