Commit

refactor a bit and add makefile
Signed-off-by: tabokie <[email protected]>
tabokie committed Aug 2, 2022
1 parent 02ed068 commit 9ec5a77
Showing 3 changed files with 62 additions and 104 deletions.
32 changes: 14 additions & 18 deletions .github/workflows/rust.yml
@@ -29,24 +29,24 @@ jobs:
if: ${{ matrix.os == 'ubuntu-latest' }}
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install grcov; fi
- name: Format
run: cargo fmt --all -- --check
run: |
make format
git diff --quiet || git diff
- name: Clippy
run: cargo clippy --all --all-features --all-targets -- -D clippy::all
run: make clippy
- name: Run tests
run: |
cargo test --all --features all_except_failpoints --verbose -- --nocapture
cargo test --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
run: make test_nightly
env:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
- name: Run asan tests
if: ${{ matrix.os == 'ubuntu-latest' }}
run: |
cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --all --features all_except_failpoints --verbose -- --nocapture
cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
run: make test_nightly
env:
RUST_BACKTRACE: 1
RUSTFLAGS: '-Zsanitizer=address'
RUSTDOCFLAGS: '-Zsanitizer=address'
EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
stable:
runs-on: ${{ matrix.os }}
strategy:
@@ -66,16 +66,13 @@ jobs:
- uses: Swatinem/rust-cache@v1
with:
sharedKey: ${{ matrix.os }}-stable
- name: Format
run: cargo fmt --all -- --check
- name: Clippy
run: cargo clippy --all --features all_stable --all-targets -- -D clippy::all
run: make clippy
- name: Run tests
run: |
cargo test --all --features all_stable_except_failpoints --verbose -- --nocapture
cargo test --test failpoints --features all_stable --verbose -- --test-threads 1 --nocapture
run: make test
env:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
coverage:
runs-on: ubuntu-latest
needs: nightly
@@ -97,13 +94,12 @@ jobs:
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
- name: Run tests
run: |
cargo test --all --features all_except_failpoints
cargo test --test failpoints --all-features -- --test-threads 1
cargo test --all --features all_stable_except_failpoints
cargo test --test failpoints --features all_stable -- --test-threads 1
make test
make test_nightly
env:
RUSTFLAGS: '-Zinstrument-coverage'
LLVM_PROFILE_FILE: '%p-%m.profraw'
EXTRA_CARGO_ARGS: '--verbose'
- name: Run grcov
run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
- name: Upload
8 changes: 4 additions & 4 deletions README.md
@@ -75,10 +75,10 @@ Contributions are always welcome! Here are a few tips for making a PR:
- Tests are automatically run against the changes; some of them can be run locally:

```
cargo fmt --all -- --check
cargo +nightly clippy --all --all-features --all-targets -- -D clippy::all
cargo +nightly test --all --features all_except_failpoints
cargo +nightly test --test failpoints --all-features -- --test-threads 1
# rustup default nightly
make
# filter a specific test case
env EXTRA_CARGO_ARGS=<testname> make test_nightly
```

- For changes that might affect performance, please quote the relevant benchmark results in the PR description. In addition to micro-benchmarks, there is a standalone [stress test tool](https://github.com/tikv/raft-engine/tree/master/stress) that you can use to demonstrate the system performance.
126 changes: 44 additions & 82 deletions src/file_pipe_log/pipe.rs
@@ -33,44 +33,32 @@ struct FileCollection<F: FileSystem> {
capacity: usize,
}

#[cfg(test)]
impl<F: FileSystem> Default for FileCollection<F> {
fn default() -> Self {
Self {
first_seq: 0,
first_seq_in_use: 0,
fds: VecDeque::new(),
capacity: 0,
}
}
}

impl<F: FileSystem> FileCollection<F> {
/// Recycle the first obsolete(stale) file and renewed with new FileId.
/// Recycles the first obsolete file and renews it with a new `FileId`.
///
/// Note that the recycled file is automatically renamed in this function.
pub fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
if self.capacity == 0 || self.first_seq >= self.first_seq_in_use {
return false;
}
fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
debug_assert!(self.first_seq <= self.first_seq_in_use);
debug_assert!(!self.fds.is_empty());
let first_file_id = FileId {
queue: dst_fd.queue,
seq: self.first_seq,
};
let src_path = first_file_id.build_file_path(dir_path); // src filepath
let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
if let Err(e) = file_system.reuse(&src_path, &dst_path) {
error!("error while trying to recycle one expired file: {}", e);
false
} else {
// Only if `rename` made sense, could we update the first_seq and return
// success.
self.fds.pop_front().unwrap();
self.first_seq += 1;
true
if self.first_seq < self.first_seq_in_use {
let first_file_id = FileId {
queue: dst_fd.queue,
seq: self.first_seq,
};
let src_path = first_file_id.build_file_path(dir_path); // src filepath
let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
if let Err(e) = file_system.reuse(&src_path, &dst_path) {
error!("error while trying to recycle one expired file: {}", e);
} else {
// Only when the rename succeeds can we update `first_seq` and return
// success.
self.fds.pop_front().unwrap();
self.first_seq += 1;
return true;
}
}
false
}
}
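
The refactored `recycle_one_file` above relies on the invariant captured by the `debug_assert!`: sequence numbers in `[first_seq, first_seq_in_use)` denote files that are logically deleted but still present on disk, and only those files may be recycled. A minimal, self-contained sketch of that bookkeeping (the `SeqWindow` type and its fields are illustrative stand-ins, not the actual raft-engine structs):

```rust
use std::collections::VecDeque;

// Simplified model: files with seq in [first_seq, first_seq_in_use) are
// logically deleted and eligible for recycling; recycling pops the front
// entry and bumps first_seq, mirroring `recycle_one_file` above.
struct SeqWindow {
    first_seq: u64,
    first_seq_in_use: u64,
    fds: VecDeque<u64>, // stand-in for the real FileHandler queue
}

impl SeqWindow {
    fn recycle_front(&mut self) -> Option<u64> {
        if self.first_seq >= self.first_seq_in_use {
            return None; // no stale files left to reuse
        }
        self.fds.pop_front();
        let seq = self.first_seq;
        self.first_seq += 1;
        Some(seq)
    }
}

fn main() {
    let mut w = SeqWindow {
        first_seq: 10,
        first_seq_in_use: 12, // files 10 and 11 are stale
        fds: (10..15).collect(),
    };
    assert_eq!(w.recycle_front(), Some(10));
    assert_eq!(w.recycle_front(), Some(11));
    assert_eq!(w.recycle_front(), None); // window exhausted
}
```

Because recycling only ever pops from the front and bumps `first_seq`, the stale window shrinks from the left; the purge logic later in this file has to keep every file remaining in that window a valid recycle candidate.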

@@ -252,7 +240,7 @@ impl<F: FileSystem> SinglePipe<F> {
};
let mut new_file = ActiveFile {
seq,
// The file might generated from a recycled stale-file, we should reset the file
// The file might be generated from a recycled stale file, so always reset its
// header.
writer: build_file_writer(
self.file_system.as_ref(),
@@ -265,8 +253,6 @@
// loss before a new entry is written.
new_file.writer.sync()?;
self.sync_dir()?;
let active_file_format_version = new_file.writer.header.version();
**active_file = new_file;

let len = {
let mut files = self.files.write();
@@ -278,9 +264,10 @@
seq,
queue: self.queue,
},
active_file_format_version,
self.format_version,
),
});
**active_file = new_file;
for listener in &self.listeners {
listener.post_new_log_file(FileId {
queue: self.queue,
@@ -396,6 +383,7 @@ impl<F: FileSystem> SinglePipe<F> {
return Ok(0);
}

// TODO: move these under FileCollection.
// Remove some obsolete files if capacity is exceeded.
let obsolete_files = (file_seq - files.first_seq) as usize;
// When capacity is zero, always remove logically deleted files.
@@ -404,17 +392,18 @@
// Files with format_version `V1` cannot be chosen as recycle
// candidates and should be removed as well.
// Find the newest obsolete `V1` file and refresh the purge count.
for recycle_idx in (purged..obsolete_files).rev() {
if !files.fds[recycle_idx].context.version.has_log_signing() {
purged = recycle_idx + 1;
for i in (purged..obsolete_files).rev() {
if !files.fds[i].context.version.has_log_signing() {
purged = i + 1;
break;
}
}
// Update metadata of files
let old_first_seq = files.first_seq;
files.first_seq += purged as u64;
files.first_seq_in_use = file_seq;
files.fds.drain(..purged);
(files.first_seq - purged as u64, purged, files.fds.len())
(old_first_seq, purged, files.fds.len())
};
self.flush_metrics(remained);
for seq in first_purge_seq..first_purge_seq + purged as u64 {
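
The reverse scan above locates the newest obsolete `V1` file; everything up to and including it is physically removed, so that every file remaining in the stale window supports log signing and can safely be recycled. A standalone sketch of that computation, with a plain boolean slice standing in for `fds[i].context.version.has_log_signing()` (the function name and signature here are illustrative, not the actual API):

```rust
// Hypothetical helper: `signing[i]` stands in for
// `fds[i].context.version.has_log_signing()`.
fn refresh_purge_count(signing: &[bool], obsolete_files: usize, mut purged: usize) -> usize {
    // Scan obsolete files from newest to oldest; stop at the newest
    // non-signing (V1) file and purge everything up to and including it.
    for i in (purged..obsolete_files).rev() {
        if !signing[i] {
            purged = i + 1;
            break;
        }
    }
    purged
}

fn main() {
    // Five obsolete files; index 2 is V1, so files 0..=2 must be purged
    // and only files 3 and 4 stay recyclable.
    let signing = [true, true, false, true, true];
    assert_eq!(refresh_purge_count(&signing, 5, 0), 3);
    // No V1 file in the scanned range: the initial count is kept.
    assert_eq!(refresh_purge_count(&signing, 5, 3), 3);
}
```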
@@ -681,26 +670,23 @@ mod tests {
buf[..] == expected_data[..]
})
}
fn new_file_handler(path: &str, file_id: FileId) -> FileHandler<DefaultFileSystem> {
FileHandler {
handle: Arc::new(
DefaultFileSystem
.open(&file_id.build_file_path(path))
.unwrap(),
),
context: LogFileContext::new(file_id, Version::default()),
}
}
let dir = Builder::new()
.prefix("test_recycle_file_collections")
.tempdir()
.unwrap();
let path = dir.path().to_str().unwrap();
let data = vec![b'x'; 1024];
let file_system = Arc::new(DefaultFileSystem);
// test FileCollection with Default(Invalid)
{
let mut recycle_collections = FileCollection::<DefaultFileSystem>::default();
assert_eq!(recycle_collections.first_seq, 0);
assert_eq!(recycle_collections.first_seq_in_use, 0);
assert_eq!(recycle_collections.capacity, 0);
assert_eq!(recycle_collections.fds.len(), 0);
assert!(!recycle_collections.recycle_one_file(
&file_system,
path,
FileId::dummy(LogQueue::Append)
));
}
// test FileCollection with a valid file
{
// mock
@@ -721,31 +707,15 @@
first_seq: old_file_id.seq,
first_seq_in_use: old_file_id.seq,
capacity: 3,
..Default::default()
fds: vec![new_file_handler(path, old_file_id)].into(),
};
recycle_collections.fds.push_back(FileHandler {
handle: Arc::new(
file_system
.open(&old_file_id.build_file_path(path))
.unwrap(),
),
context: LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()),
});
// recycle an old file
assert!(!recycle_collections.recycle_one_file(&file_system, path, new_file_id));
// update the recycle collection
{
recycle_collections.fds.push_back(FileHandler {
handle: Arc::new(
file_system
.open(&old_file_id.build_file_path(path))
.unwrap(),
),
context: LogFileContext::new(
FileId::dummy(LogQueue::Append),
Version::default(),
),
});
recycle_collections
.fds
.push_back(new_file_handler(path, old_file_id));
recycle_collections.first_seq_in_use = cur_file_id.seq;
}
// recycle an old file
@@ -787,16 +757,8 @@
first_seq: fake_file_id.seq,
first_seq_in_use: fake_file_id.seq + 1,
capacity: 2,
..Default::default()
fds: vec![new_file_handler(path, fake_file_id)].into(),
};
recycle_collections.fds.push_back(FileHandler {
handle: Arc::new(
file_system
.open(&fake_file_id.build_file_path(path))
.unwrap(),
),
context: LogFileContext::new(fake_file_id, Version::default()),
});
let first_file_id = recycle_collections.fds.front().unwrap().context.id;
assert_eq!(first_file_id, fake_file_id);
// mock the failure on `rename`
