diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f6d967c6..1ca6f12f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-07-01 + toolchain: nightly-2023-12-31 override: true components: rustfmt, clippy, rust-src - uses: Swatinem/rust-cache@v1 @@ -34,6 +34,8 @@ jobs: git diff --exit-code - name: Clippy run: make clippy + env: + EXTRA_CARGO_ARGS: '--fix' - name: Run tests run: make test env: @@ -60,7 +62,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: 1.67.1 + toolchain: 1.75.0 override: true components: rustfmt, clippy, rust-src - uses: Swatinem/rust-cache@v1 @@ -87,7 +89,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-07-01 + toolchain: nightly-2023-12-31 override: true components: llvm-tools-preview - uses: Swatinem/rust-cache@v1 @@ -96,15 +98,16 @@ jobs: - name: Install grcov run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi - name: Run tests - run: | - make test_matrix + run: make test_matrix env: - RUSTFLAGS: '-Zinstrument-coverage' + RUSTFLAGS: '-Cinstrument-coverage' LLVM_PROFILE_FILE: '%p-%m.profraw' EXTRA_CARGO_ARGS: '--verbose' - name: Run grcov run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . 
-t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov - name: Upload - uses: codecov/codecov-action@v2 + uses: codecov/codecov-action@v3 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} with: file: coverage.lcov diff --git a/CHANGELOG.md b/CHANGELOG.md index e2aaa484..fb0c9b30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ ## [Unreleased] +### New Features + +* Add a new Prometheus metric `raft_engine_write_compression_ratio` to track compression ratio of writes. (#358) + +## [0.4.2] - 2024-04-16 + +### Behavior Changes + +* Periodically flush unsynced bytes when rewriting to avoid I/O jitters if flushing too many bytes impedes the foreground writes. (#347) +* Errors will be returned if rewriting fails, instead of `panic` directly. (#343) + ## [0.4.1] - 2023-09-14 ### Behavior Changes diff --git a/Cargo.toml b/Cargo.toml index 4f6f4846..fe1e4853 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "raft-engine" -version = "0.4.1" +version = "0.4.2" authors = ["The TiKV Project Developers"] edition = "2018" -rust-version = "1.66.0" +rust-version = "1.75.0" description = "A persistent storage engine for Multi-Raft logs" readme = "README.md" repository = "https://github.com/tikv/raft-engine" @@ -42,7 +42,10 @@ hex = "0.4" if_chain = "1.0" lazy_static = "1.3" libc = "0.2" -log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] } +log = { version = "0.4", features = [ + "max_level_trace", + "release_max_level_debug", +] } lz4-sys = "1.9" memmap2 = { version = "0.9", optional = true } nix = "0.26" @@ -57,15 +60,19 @@ rhai = { version = "1.7", features = ["sync"], optional = true } scopeguard = "1.1" serde = { version = "1.0", features = ["derive"] } serde_repr = "0.1" -strum = { version = "0.25.0", features = ["derive"] } +strum = { version = "0.26.2", features = ["derive"] } thiserror = "1.0" [dev-dependencies] criterion = "0.4" ctor = "0.2" env_logger = "0.10" -kvproto = { 
git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] } -raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } +kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [ + "protobuf-codec", +] } +raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [ + "protobuf-codec", +] } rand = "0.8" rand_distr = "0.4" tempfile = "3.6" @@ -74,19 +81,10 @@ toml = "0.8" [features] default = ["internals", "scripting"] internals = [] -nightly = [ - "prometheus/nightly", -] -failpoints = [ - "fail/failpoints", -] -scripting = [ - "rhai", -] -swap = [ - "nightly", - "memmap2", -] +nightly = ["prometheus/nightly"] +failpoints = ["fail/failpoints"] +scripting = ["rhai"] +swap = ["nightly", "memmap2"] std_fs = [] nightly_group = ["nightly", "swap"] diff --git a/ctl/Cargo.toml b/ctl/Cargo.toml index 674b46dc..0b95c8ab 100644 --- a/ctl/Cargo.toml +++ b/ctl/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "raft-engine-ctl" -version = "0.4.1" +version = "0.4.2" authors = ["The TiKV Project Developers"] edition = "2018" -rust-version = "1.61.0" +rust-version = "1.75.0" description = "A control tool for Raft Engine" repository = "https://github.com/tikv/raft-engine" license = "Apache-2.0" @@ -11,4 +11,7 @@ license = "Apache-2.0" [dependencies] clap = { version = "4.5", features = ["derive", "cargo"] } env_logger = "0.10" -raft-engine = { path = "..", version = "0.4.1", features = ["scripting", "internals"] } +raft-engine = { path = "..", version = "0.4.2", features = [ + "scripting", + "internals", +] } diff --git a/src/engine.rs b/src/engine.rs index 1a09b397..10416a29 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -142,7 +142,7 @@ where return Ok(0); } let start = Instant::now(); - let len = log_batch.finish_populate( 
self.cfg.batch_compression_threshold.0 as usize, self.cfg.compression_level, )?; @@ -225,6 +225,7 @@ where now = end; ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64()); ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64); + ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM.observe(compression_ratio); Ok(len) } diff --git a/src/env/log_fd/unix.rs b/src/env/log_fd/unix.rs index 608cca70..e9b75542 100644 --- a/src/env/log_fd/unix.rs +++ b/src/env/log_fd/unix.rs @@ -83,7 +83,7 @@ impl LogFd { while readed < buf.len() { let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, + Err(Errno::EINTR) => continue, Err(e) => return Err(from_nix_error(e, "pread")), }; // EOF @@ -106,7 +106,7 @@ impl LogFd { while written < content.len() { let bytes = match pwrite(self.0, &content[written..], offset as i64) { Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, + Err(Errno::EINTR) => continue, Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), Err(e) => return Err(from_nix_error(e, "pwrite")), }; diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 64042e01..c65515a3 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -218,7 +218,7 @@ pub mod debug { let log_file_format = LogFileContext::new(file_id, Version::default()); for batch in bs.iter_mut() { let offset = writer.offset() as u64; - let len = batch + let (len, _) = batch .finish_populate(1 /* compression_threshold */, None) .unwrap(); batch.prepare_write(&log_file_format).unwrap(); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 43b3483a..27ea8267 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -713,7 +713,7 @@ mod tests { // Retire files. assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first); // Try to read recycled file. 
- for (_, handle) in handles.into_iter().enumerate() { + for handle in handles.into_iter() { assert!(pipe_log.read_bytes(handle).is_err()); } // Try to reuse. diff --git a/src/log_batch.rs b/src/log_batch.rs index c6ce147c..537609dc 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -767,28 +767,32 @@ impl LogBatch { &mut self, compression_threshold: usize, compression_level: Option, - ) -> Result { + ) -> Result<(usize, f64)> { let _t = StopWatch::new(perf_context!(log_populating_duration)); debug_assert!(self.buf_state == BufState::Open); if self.is_empty() { self.buf_state = BufState::Encoded(self.buf.len(), 0); - return Ok(0); + return Ok((0, 0.0)); } self.buf_state = BufState::Incomplete; // entries - let (header_offset, compression_type) = if compression_threshold > 0 + let (header_offset, compression_type, compression_ratio) = if compression_threshold > 0 && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold { let buf_len = self.buf.len(); - lz4::append_compress_block( + let compression_ratio = lz4::append_compress_block( &mut self.buf, LOG_BATCH_HEADER_LEN, compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL), )?; - (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4) + ( + buf_len - LOG_BATCH_HEADER_LEN, + CompressionType::Lz4, + compression_ratio, + ) } else { - (0, CompressionType::None) + (0, CompressionType::None, 0.0) }; // checksum @@ -830,7 +834,7 @@ impl LogBatch { } self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN); - Ok(self.buf.len() - header_offset) + Ok((self.buf.len() - header_offset, compression_ratio)) } /// Make preparations for the write of `LogBatch`. 
@@ -1328,7 +1332,7 @@ mod tests { offset: 0, }; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(usize::from(compress), None).unwrap(); + let (len, _) = batch.finish_populate(usize::from(compress), None).unwrap(); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let mut batch_handle = mocked_file_block_handle; @@ -1493,7 +1497,7 @@ mod tests { batch1.merge(&mut batch2).unwrap(); assert!(batch2.is_empty()); - let len = batch1.finish_populate(0, None).unwrap(); + let (len, _) = batch1.finish_populate(0, None).unwrap(); batch1.prepare_write(&file_context).unwrap(); let encoded = batch1.encoded_bytes(); assert_eq!(len, encoded.len()); @@ -1549,7 +1553,8 @@ mod tests { offset: 0, }; let buf_len = batch.buf.len(); - let len = batch.finish_populate(1, None).unwrap(); + let (len, compression_ratio) = batch.finish_populate(1, None).unwrap(); + assert!(compression_ratio == 0.0); assert!(len == 0); assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0)); let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2); @@ -1671,7 +1676,8 @@ mod tests { }, ]; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(1, None).unwrap(); + let (len, compression_ratio) = batch.finish_populate(1, None).unwrap(); + assert!(compression_ratio > 0.0); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let checksum = batch.item_batch.checksum; diff --git a/src/metrics.rs b/src/metrics.rs index 6ca10940..3fcf692c 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -201,6 +201,12 @@ lazy_static! 
{ exponential_buckets(256.0, 1.8, 22).unwrap() ) .unwrap(); + pub static ref ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM: Histogram = register_histogram!( + "raft_engine_write_compression_ratio", + "Bucketed histogram of Raft Engine write compression ratio", + exponential_buckets(0.0005, 1.8, 16).unwrap() + ) + .unwrap(); pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!( "raft_engine_allocate_log_duration_seconds", "Bucketed histogram of Raft Engine allocate log duration", diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index 8baa4835..0cb8db9b 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -288,6 +288,7 @@ impl Page { .read(true) .write(true) .create(true) + .truncate(true) .open(path) .map_err(|e| error!("Failed to open swap file: {e}")) .ok()?; diff --git a/src/util.rs b/src/util.rs index 2e35a83e..7e1d09c0 100644 --- a/src/util.rs +++ b/src/util.rs @@ -226,9 +226,10 @@ pub mod lz4 { pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1; /// Compress content in `buf[skip..]`, and append output to `buf`. 
- pub fn append_compress_block(buf: &mut Vec, skip: usize, level: usize) -> Result<()> { + pub fn append_compress_block(buf: &mut Vec, skip: usize, level: usize) -> Result { let buf_len = buf.len(); let content_len = buf_len - skip; + let mut compression_ratio = 0.0; if content_len > 0 { if content_len > i32::MAX as usize { return Err(Error::InvalidArgument(format!( @@ -256,10 +257,11 @@ pub mod lz4 { if compressed == 0 { return Err(Error::Other(box_err!("Compression failed"))); } + compression_ratio = compressed as f64 / content_len as f64; buf.set_len(buf_len + 4 + compressed as usize); } } - Ok(()) + Ok(compression_ratio) } pub fn decompress_block(src: &[u8]) -> Result> { @@ -301,8 +303,12 @@ pub mod lz4 { let vecs: Vec> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()]; for mut vec in vecs.into_iter() { let uncompressed_len = vec.len(); - super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL) - .unwrap(); + let compression_ratio = + super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL) + .unwrap(); + if uncompressed_len == 0 { + assert_eq!(compression_ratio, 0.0); + } let res = super::decompress_block(&vec[uncompressed_len..]).unwrap(); assert_eq!(res, vec[..uncompressed_len].to_owned()); } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 482a3986..ae834050 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stress" -version = "0.4.1" +version = "0.4.2" authors = ["The TiKV Authors"] edition = "2018"