Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/cargo/clap-4.5
Browse files Browse the repository at this point in the history
  • Loading branch information
LykxSassinator authored Jun 7, 2024
2 parents f1e5a4e + cf5d1b9 commit ab61cfc
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 50 deletions.
17 changes: 10 additions & 7 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-07-01
toolchain: nightly-2023-12-31
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
Expand All @@ -34,6 +34,8 @@ jobs:
git diff --exit-code
- name: Clippy
run: make clippy
env:
EXTRA_CARGO_ARGS: '--fix'
- name: Run tests
run: make test
env:
Expand All @@ -60,7 +62,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.67.1
toolchain: 1.75.0
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
Expand All @@ -87,7 +89,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-07-01
toolchain: nightly-2023-12-31
override: true
components: llvm-tools-preview
- uses: Swatinem/rust-cache@v1
Expand All @@ -96,15 +98,16 @@ jobs:
- name: Install grcov
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
- name: Run tests
run: |
make test_matrix
run: make test_matrix
env:
RUSTFLAGS: '-Zinstrument-coverage'
RUSTFLAGS: '-Cinstrument-coverage'
LLVM_PROFILE_FILE: '%p-%m.profraw'
EXTRA_CARGO_ARGS: '--verbose'
- name: Run grcov
run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
- name: Upload
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
file: coverage.lcov
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

## [Unreleased]

### New Features

* Add a new Prometheus metric `raft_engine_write_compression_ratio` to track compression ratio of write #358

## [0.4.2] - 2024-04-16

### Behavior Changes

* Periodically flush unsynced bytes when rewriting to avoid I/O jitters if flushing too many bytes impede the foreground writes. (#347)
* Errors will be returned if rewriting fails, instread of `panic` directly. (#343)

## [0.4.1] - 2023-09-14

### Behavior Changes
Expand Down
36 changes: 17 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "raft-engine"
version = "0.4.1"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.66.0"
rust-version = "1.75.0"
description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
repository = "https://github.com/tikv/raft-engine"
Expand Down Expand Up @@ -42,7 +42,10 @@ hex = "0.4"
if_chain = "1.0"
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
log = { version = "0.4", features = [
"max_level_trace",
"release_max_level_debug",
] }
lz4-sys = "1.9"
memmap2 = { version = "0.9", optional = true }
nix = "0.26"
Expand All @@ -57,15 +60,19 @@ rhai = { version = "1.7", features = ["sync"], optional = true }
scopeguard = "1.1"
serde = { version = "1.0", features = ["derive"] }
serde_repr = "0.1"
strum = { version = "0.25.0", features = ["derive"] }
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0"

[dev-dependencies]
criterion = "0.4"
ctor = "0.2"
env_logger = "0.10"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
"protobuf-codec",
] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
"protobuf-codec",
] }
rand = "0.8"
rand_distr = "0.4"
tempfile = "3.6"
Expand All @@ -74,19 +81,10 @@ toml = "0.8"
[features]
default = ["internals", "scripting"]
internals = []
nightly = [
"prometheus/nightly",
]
failpoints = [
"fail/failpoints",
]
scripting = [
"rhai",
]
swap = [
"nightly",
"memmap2",
]
nightly = ["prometheus/nightly"]
failpoints = ["fail/failpoints"]
scripting = ["rhai"]
swap = ["nightly", "memmap2"]
std_fs = []

nightly_group = ["nightly", "swap"]
Expand Down
9 changes: 6 additions & 3 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
[package]
name = "raft-engine-ctl"
version = "0.4.1"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.61.0"
rust-version = "1.75.0"
description = "A control tool for Raft Engine"
repository = "https://github.com/tikv/raft-engine"
license = "Apache-2.0"

[dependencies]
clap = { version = "4.5", features = ["derive", "cargo"] }
env_logger = "0.10"
raft-engine = { path = "..", version = "0.4.1", features = ["scripting", "internals"] }
raft-engine = { path = "..", version = "0.4.1", features = [
"scripting",
"internals",
] }
3 changes: 2 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(
let (len, compression_ratio) = log_batch.finish_populate(
self.cfg.batch_compression_threshold.0 as usize,
self.cfg.compression_level,
)?;
Expand Down Expand Up @@ -225,6 +225,7 @@ where
now = end;
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM.observe(compression_ratio);
Ok(len)
}

Expand Down
4 changes: 2 additions & 2 deletions src/env/log_fd/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl LogFd {
while readed < buf.len() {
let bytes = match pread(self.0, &mut buf[readed..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(Errno::EINTR) => continue,
Err(e) => return Err(from_nix_error(e, "pread")),
};
// EOF
Expand All @@ -106,7 +106,7 @@ impl LogFd {
while written < content.len() {
let bytes = match pwrite(self.0, &content[written..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(Errno::EINTR) => continue,
Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
Err(e) => return Err(from_nix_error(e, "pwrite")),
};
Expand Down
2 changes: 1 addition & 1 deletion src/file_pipe_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub mod debug {
let log_file_format = LogFileContext::new(file_id, Version::default());
for batch in bs.iter_mut() {
let offset = writer.offset() as u64;
let len = batch
let (len, _) = batch
.finish_populate(1 /* compression_threshold */, None)
.unwrap();
batch.prepare_write(&log_file_format).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ mod tests {
// Retire files.
assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first);
// Try to read recycled file.
for (_, handle) in handles.into_iter().enumerate() {
for handle in handles.into_iter() {
assert!(pipe_log.read_bytes(handle).is_err());
}
// Try to reuse.
Expand Down
28 changes: 17 additions & 11 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,28 +767,32 @@ impl LogBatch {
&mut self,
compression_threshold: usize,
compression_level: Option<usize>,
) -> Result<usize> {
) -> Result<(usize, f64)> {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Encoded(self.buf.len(), 0);
return Ok(0);
return Ok((0, 0.0));
}
self.buf_state = BufState::Incomplete;

// entries
let (header_offset, compression_type) = if compression_threshold > 0
let (header_offset, compression_type, compression_ratio) = if compression_threshold > 0
&& self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
{
let buf_len = self.buf.len();
lz4::append_compress_block(
let compression_ratio = lz4::append_compress_block(
&mut self.buf,
LOG_BATCH_HEADER_LEN,
compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
)?;
(buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
(
buf_len - LOG_BATCH_HEADER_LEN,
CompressionType::Lz4,
compression_ratio,
)
} else {
(0, CompressionType::None)
(0, CompressionType::None, 0.0)
};

// checksum
Expand Down Expand Up @@ -830,7 +834,7 @@ impl LogBatch {
}

self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
Ok(self.buf.len() - header_offset)
Ok((self.buf.len() - header_offset, compression_ratio))
}

/// Make preparations for the write of `LogBatch`.
Expand Down Expand Up @@ -1328,7 +1332,7 @@ mod tests {
offset: 0,
};
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(usize::from(compress), None).unwrap();
let (len, _) = batch.finish_populate(usize::from(compress), None).unwrap();
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let mut batch_handle = mocked_file_block_handle;
Expand Down Expand Up @@ -1493,7 +1497,7 @@ mod tests {
batch1.merge(&mut batch2).unwrap();
assert!(batch2.is_empty());

let len = batch1.finish_populate(0, None).unwrap();
let (len, _) = batch1.finish_populate(0, None).unwrap();
batch1.prepare_write(&file_context).unwrap();
let encoded = batch1.encoded_bytes();
assert_eq!(len, encoded.len());
Expand Down Expand Up @@ -1549,7 +1553,8 @@ mod tests {
offset: 0,
};
let buf_len = batch.buf.len();
let len = batch.finish_populate(1, None).unwrap();
let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
assert!(compression_ratio == 0.0);
assert!(len == 0);
assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
Expand Down Expand Up @@ -1671,7 +1676,8 @@ mod tests {
},
];
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(1, None).unwrap();
let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
assert!(compression_ratio > 0.0);
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let checksum = batch.item_batch.checksum;
Expand Down
6 changes: 6 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ lazy_static! {
exponential_buckets(256.0, 1.8, 22).unwrap()
)
.unwrap();
pub static ref ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM: Histogram = register_histogram!(
"raft_engine_write_compression_ratio",
"Bucketed histogram of Raft Engine write compression ratio",
exponential_buckets(0.0005, 1.8, 16).unwrap()
)
.unwrap();
pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
"raft_engine_allocate_log_duration_seconds",
"Bucketed histogram of Raft Engine allocate log duration",
Expand Down
1 change: 1 addition & 0 deletions src/swappy_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl Page {
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(path)
.map_err(|e| error!("Failed to open swap file: {e}"))
.ok()?;
Expand Down
14 changes: 10 additions & 4 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,10 @@ pub mod lz4 {
pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;

/// Compress content in `buf[skip..]`, and append output to `buf`.
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<f64> {
let buf_len = buf.len();
let content_len = buf_len - skip;
let mut compression_ratio = 0.0;
if content_len > 0 {
if content_len > i32::MAX as usize {
return Err(Error::InvalidArgument(format!(
Expand Down Expand Up @@ -256,10 +257,11 @@ pub mod lz4 {
if compressed == 0 {
return Err(Error::Other(box_err!("Compression failed")));
}
compression_ratio = compressed as f64 / content_len as f64;
buf.set_len(buf_len + 4 + compressed as usize);
}
}
Ok(())
Ok(compression_ratio)
}

pub fn decompress_block(src: &[u8]) -> Result<Vec<u8>> {
Expand Down Expand Up @@ -301,8 +303,12 @@ pub mod lz4 {
let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
for mut vec in vecs.into_iter() {
let uncompressed_len = vec.len();
super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
.unwrap();
let compression_ratio =
super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
.unwrap();
if uncompressed_len == 0 {
assert_eq!(compression_ratio, 0.0);
}
let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
assert_eq!(res, vec[..uncompressed_len].to_owned());
}
Expand Down
2 changes: 1 addition & 1 deletion stress/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stress"
version = "0.4.1"
version = "0.4.2"
authors = ["The TiKV Authors"]
edition = "2018"

Expand Down

0 comments on commit ab61cfc

Please sign in to comment.