diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index fa3c8c5f..60e33efe 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -19,7 +19,7 @@ jobs:
         uses: actions-rs/toolchain@v1
         with:
           profile: minimal
-          toolchain: nightly-2022-05-01
+          toolchain: nightly-2022-07-13
           override: true
           components: rustfmt, clippy, rust-src
       - uses: Swatinem/rust-cache@v1
@@ -29,24 +29,24 @@ jobs:
         if: ${{ matrix.os == 'ubuntu-latest' }}
         run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install grcov; fi
       - name: Format
-        run: cargo fmt --all -- --check
+        run: |
+          make format
+          git diff --exit-code
       - name: Clippy
-        run: cargo clippy --all --all-features --all-targets -- -D clippy::all
+        run: make clippy
       - name: Run tests
-        run: |
-          cargo test --all --features all_except_failpoints --verbose -- --nocapture
-          cargo test --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
+        run: make test
         env:
           RUST_BACKTRACE: 1
+          EXTRA_CARGO_ARGS: '--verbose'
       - name: Run asan tests
         if: ${{ matrix.os == 'ubuntu-latest' }}
-        run: |
-          cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --all --features all_except_failpoints --verbose -- --nocapture
-          cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --test failpoints --all-features --verbose -- --test-threads 1 --nocapture
+        run: make test
         env:
           RUST_BACKTRACE: 1
           RUSTFLAGS: '-Zsanitizer=address'
           RUSTDOCFLAGS: '-Zsanitizer=address'
+          EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
   stable:
     runs-on: ${{ matrix.os }}
     strategy:
@@ -66,16 +66,16 @@ jobs:
       - uses: Swatinem/rust-cache@v1
         with:
           sharedKey: ${{ matrix.os }}-stable
-      - name: Format
-        run: cargo fmt --all -- --check
       - name: Clippy
-        run: cargo clippy --all --features all_stable --all-targets -- -D clippy::all
+        run: make clippy
+        env:
+          WITH_STABLE_TOOLCHAIN: 'true'
       - name: Run tests
-        run: |
-          cargo test --all --features all_stable_except_failpoints --verbose -- --nocapture
-          cargo test --test failpoints --features all_stable --verbose -- --test-threads 1 --nocapture
+        run: make test
         env:
           RUST_BACKTRACE: 1
+          EXTRA_CARGO_ARGS: '--verbose'
+          WITH_STABLE_TOOLCHAIN: 'true'
   coverage:
     runs-on: ubuntu-latest
     needs: nightly
@@ -87,7 +87,7 @@ jobs:
         uses: actions-rs/toolchain@v1
         with:
           profile: minimal
-          toolchain: nightly-2022-05-01
+          toolchain: nightly-2022-07-13
           override: true
           components: llvm-tools-preview
       - uses: Swatinem/rust-cache@v1
@@ -97,13 +97,12 @@ jobs:
         run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
       - name: Run tests
         run: |
-          cargo test --all --features all_except_failpoints
-          cargo test --test failpoints --all-features -- --test-threads 1
-          cargo test --all --features all_stable_except_failpoints
-          cargo test --test failpoints --features all_stable -- --test-threads 1
+          make test
+          env WITH_STABLE_TOOLCHAIN=true make test
         env:
           RUSTFLAGS: '-Zinstrument-coverage'
           LLVM_PROFILE_FILE: '%p-%m.profraw'
+          EXTRA_CARGO_ARGS: '--verbose'
       - name: Run grcov
         run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
       - name: Upload
diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..aaeddcf0
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,34 @@
+# Makefile
+
+## Additional arguments passed to cargo.
+EXTRA_CARGO_ARGS ?=
+## Whether to disable nightly-only features. [true/false]
+WITH_STABLE_TOOLCHAIN ?=
+
+.PHONY: format clippy test
+
+all: format clippy test
+
+## Format code in-place using rustfmt.
+format:
+	cargo fmt --all
+
+## Run clippy.
+ifeq ($(WITH_STABLE_TOOLCHAIN), true)
+clippy:
+	cargo clippy --all --features all_stable --all-targets -- -D clippy::all
+else
+clippy:
+	cargo clippy --all --all-features --all-targets -- -D clippy::all
+endif
+
+## Run tests.
+ifeq ($(WITH_STABLE_TOOLCHAIN), true)
+test:
+	cargo test --all --features all_stable_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
+	cargo test --test failpoints --features all_stable ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
+else
+test:
+	cargo test --all --features all_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
+	cargo test --test failpoints --all-features ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
+endif
diff --git a/README.md b/README.md
index 9698348b..33c1e8ac 100644
--- a/README.md
+++ b/README.md
@@ -75,10 +75,12 @@ Contributions are always welcome! Here are a few tips for making a PR:
 - Tests are automatically run against the changes, some of them can be run locally:
 
 ```
-cargo fmt --all -- --check
-cargo +nightly clippy --all --all-features --all-targets -- -D clippy::all
-cargo +nightly test --all --features all_except_failpoints
-cargo +nightly test --test failpoints --all-features -- --test-threads 1
+# rustup default nightly
+make
+# rustup default stable
+env WITH_STABLE_TOOLCHAIN=true make
+# filter a specific test case
+env EXTRA_CARGO_ARGS=<testname> make test
 ```
 
 - For changes that might induce performance effects, please quote the targeted benchmark results in the PR description. In addition to micro-benchmarks, there is a standalone [stress test tool](https://github.com/tikv/raft-engine/tree/master/stress) which you can use to demonstrate the system performance.
diff --git a/src/config.rs b/src/config.rs
index ce82f3af..94b791e4 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -9,7 +9,7 @@ use crate::{util::ReadableSize, Result};
 const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
 const MIN_RECOVERY_THREADS: usize = 1;
 
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "kebab-case")]
 pub enum RecoveryMode {
     AbsoluteConsistency,
@@ -84,12 +84,11 @@ pub struct Config {
     /// Default: None
     pub memory_limit: Option<ReadableSize>,
 
-    /// Whether to recycle stale logs.
-    /// If `true`, `purge` operations on logs will firstly put stale
-    /// files into a list for recycle. It's only available if
-    /// `format_version` >= `2`.
+    /// Whether to recycle stale log files.
+    /// If `true`, logically purged log files will be reserved for recycling.
+    /// Only available for `format_version` 2 and above.
     ///
-    /// Default: false,
+    /// Default: false
     pub enable_log_recycle: bool,
 }
 
@@ -152,19 +151,19 @@ impl Config {
         if self.enable_log_recycle {
             if !self.format_version.has_log_signing() {
                 return Err(box_err!(
-                    "format_version: {:?} is invalid when 'enable_log_recycle' on, setting it to V2",
+                    "format version {} doesn't support log recycle, use 2 or above",
                     self.format_version
                 ));
            }
            if self.purge_threshold.0 / self.target_file_size.0 >= std::u32::MAX as u64 {
                 return Err(box_err!(
-                    "File count exceed UINT32_MAX, calculated by 'purge-threshold / target-file-size'"
+                    "File count exceeds u32::MAX, calculated by `purge-threshold / target-file-size`"
                 ));
            }
         }
         #[cfg(not(feature = "swap"))]
         if self.memory_limit.is_some() {
-            warn!("memory-limit will be ignored because swap feature is not enabled");
+            warn!("memory-limit will be ignored because swap feature is disabled");
         }
         Ok(())
     }
diff --git a/src/engine.rs b/src/engine.rs
index 9a19845b..0ff84b77 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -151,7 +151,7 @@ where
         for writer in group.iter_mut() {
             writer.entered_time = Some(now);
             sync |= writer.sync;
-            let log_batch = writer.get_mut_payload();
+            let log_batch = writer.mut_payload();
             let res = if !log_batch.is_empty() {
                 log_batch.prepare_write(&file_context)?;
                 self.pipe_log
@@ -228,27 +228,31 @@ where
         Ok(None)
     }
 
+    /// Iterates over [start_key, end_key) range of Raft Group key-values and
+    /// yields messages of the required type. Unparsable items are skipped.
     pub fn scan_messages<S, C>(
         &self,
         region_id: u64,
         start_key: Option<&[u8]>,
         end_key: Option<&[u8]>,
         reverse: bool,
-        callback: C,
+        mut callback: C,
     ) -> Result<()>
     where
         S: Message,
         C: FnMut(&[u8], S) -> bool,
     {
-        let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
-        if let Some(memtable) = self.memtables.get(region_id) {
-            memtable
-                .read()
-                .scan_messages(start_key, end_key, reverse, callback)?;
-        }
-        Ok(())
+        self.scan_raw_messages(region_id, start_key, end_key, reverse, move |k, raw_v| {
+            if let Ok(v) = parse_from_bytes(raw_v) {
+                callback(k, v)
+            } else {
+                true
+            }
+        })
     }
 
+    /// Iterates over [start_key, end_key) range of Raft Group key-values and
+    /// yields all key value pairs as bytes.
     pub fn scan_raw_messages<C>(
         &self,
         region_id: u64,
@@ -264,7 +268,7 @@ where
         if let Some(memtable) = self.memtables.get(region_id) {
             memtable
                 .read()
-                .scan_raw_messages(start_key, end_key, reverse, callback)?;
+                .scan(start_key, end_key, reverse, callback)?;
         }
         Ok(())
     }
@@ -361,8 +365,8 @@ where
         self.memtables.is_empty()
     }
 
-    /// Returns the range of sequence number of `active` files of the
-    /// specific `LogQueue`.
+    /// Returns the sequence number range of active log files in the specific
+    /// log queue.
     /// For testing only.
     pub fn file_span(&self, queue: LogQueue) -> (u64, u64) {
         self.pipe_log.file_span(queue)
     }
@@ -443,8 +447,7 @@ where
         script: String,
         file_system: Arc<F>,
     ) -> Result<()> {
-        use crate::file_pipe_log::{LogFileFormat, RecoveryConfig, ReplayMachine};
-        use crate::pipe_log::DataLayout;
+        use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};
 
         if !path.exists() {
             return Err(Error::InvalidArgument(format!(
@@ -459,7 +462,6 @@ where
             ..Default::default()
         };
         let recovery_mode = cfg.recovery_mode;
-        let file_format = LogFileFormat::new(cfg.format_version, DataLayout::NoAlignment);
         let read_block_size = cfg.recovery_read_block_size.0;
         let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
         builder.scan()?;
@@ -471,7 +473,6 @@ where
                 RecoveryConfig {
                     queue: LogQueue::Append,
                     mode: recovery_mode,
-                    file_format,
                     concurrency: 1,
                     read_block_size,
                 },
@@ -484,7 +485,6 @@ where
                 RecoveryConfig {
                     queue: LogQueue::Rewrite,
                     mode: recovery_mode,
-                    file_format,
                     concurrency: 1,
                     read_block_size,
                 },
@@ -834,6 +834,99 @@ mod tests {
         run_steps(&[Some((1, 5)), None]);
     }
 
+    #[test]
+    fn test_key_value_scan() {
+        fn key(i: u64) -> Vec<u8> {
+            format!("k{}", i).as_bytes().to_vec()
+        }
+        fn value(i: u64) -> Vec<u8> {
+            format!("v{}", i).as_bytes().to_vec()
+        }
+        fn rich_value(i: u64) -> RaftLocalState {
+            RaftLocalState {
+                last_index: i,
+                ..Default::default()
+            }
+        }
+
+        let dir = tempfile::Builder::new()
+            .prefix("test_key_value_scan")
+            .tempdir()
+            .unwrap();
+        let cfg = Config {
+            dir: dir.path().to_str().unwrap().to_owned(),
+            target_file_size: ReadableSize(1),
+            ..Default::default()
+        };
+        let rid = 1;
+        let engine =
+            RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
+                .unwrap();
+
+        engine
+            .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
+                panic!("unexpected message.");
+            })
+            .unwrap();
+
+        let mut batch = LogBatch::default();
+        let mut res = Vec::new();
+        let mut rich_res = Vec::new();
+        batch.put(rid, key(1), value(1));
+        batch.put(rid, key(2), value(2));
+        batch.put(rid, key(3), value(3));
+        engine.write(&mut batch, false).unwrap();
+
+        engine
+            .scan_raw_messages(rid, None, None, false, |k, v| {
+                res.push((k.to_vec(), v.to_vec()));
+                true
+            })
+            .unwrap();
+        assert_eq!(
+            res,
+            vec![(key(1), value(1)), (key(2), value(2)), (key(3), value(3))]
+        );
+        res.clear();
+        engine
+            .scan_raw_messages(rid, None, None, true, |k, v| {
+                res.push((k.to_vec(), v.to_vec()));
+                true
+            })
+            .unwrap();
+        assert_eq!(
+            res,
+            vec![(key(3), value(3)), (key(2), value(2)), (key(1), value(1))]
+        );
+        res.clear();
+        engine
+            .scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
+                panic!("unexpected message.")
+            })
+            .unwrap();
+
+        batch.put_message(rid, key(22), &rich_value(22)).unwrap();
+        batch.put_message(rid, key(33), &rich_value(33)).unwrap();
+        engine.write(&mut batch, false).unwrap();
+
+        engine
+            .scan_messages(rid, None, None, false, |k, v| {
+                rich_res.push((k.to_vec(), v));
+                false
+            })
+            .unwrap();
+        assert_eq!(rich_res, vec![(key(22), rich_value(22))]);
+        rich_res.clear();
+        engine
+            .scan_messages(rid, None, None, true, |k, v| {
+                rich_res.push((k.to_vec(), v));
+                false
+            })
+            .unwrap();
+        assert_eq!(rich_res, vec![(key(33), rich_value(33))]);
+        rich_res.clear();
+    }
+
     #[test]
     fn test_delete_key_value() {
         let dir = tempfile::Builder::new()
@@ -855,16 +948,28 @@ mod tests {
         let mut delete_batch = LogBatch::default();
         delete_batch.delete(rid, key.clone());
 
-        // put | delete
-        //     ^ rewrite
         let engine =
             RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
                 .unwrap();
+        assert_eq!(
+            engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
+            None
+        );
+        assert_eq!(engine.get(rid, &key), None);
+
+        // put | delete
+        //     ^ rewrite
         engine.write(&mut batch_1.clone(), true).unwrap();
+        assert!(engine.get_message::<RaftLocalState>(rid, &key).is_err());
         engine.purge_manager.must_rewrite_append_queue(None, None);
         engine.write(&mut delete_batch.clone(), true).unwrap();
         let engine = engine.reopen();
         assert_eq!(engine.get(rid, &key), None);
+        assert_eq!(
+            engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
+            None
+        );
 
         // Incomplete purge.
         engine.write(&mut batch_1.clone(), true).unwrap();
@@ -921,14 +1026,6 @@ mod tests {
         engine.write(&mut batch_2.clone(), true).unwrap();
         let engine = engine.reopen();
         assert_eq!(engine.get(rid, &key).unwrap(), v2);
-        let mut res = vec![];
-        engine
-            .scan_raw_messages(rid, Some(&key), None, false, |key, value| {
-                res.push((key.to_vec(), value.to_vec()));
-                true
-            })
-            .unwrap();
-        assert_eq!(res, vec![(key.clone(), v2.clone())]);
 
         // put | delete | put |
         //                    ^ rewrite
@@ -1901,76 +1998,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_filesystem_move_file() {
-        use std::io::{Read, Write};
-
-        let dir = tempfile::Builder::new()
-            .prefix("test_filesystem_move_file")
-            .tempdir()
-            .unwrap();
-        let path = dir.path().to_str().unwrap();
-        let entry_data = vec![b'x'; 128];
-        let fs = Arc::new(DeleteMonitoredFileSystem::new());
-        // Move file from src to dst by `rename`
-        {
-            let src_file_id = FileId {
-                seq: 12,
-                queue: LogQueue::Append,
-            };
-            let dst_file_id = FileId {
-                seq: src_file_id.seq + 1,
-                ..src_file_id
-            };
-            let src_path = src_file_id.build_file_path(path); // src filepath
-            let dst_path = dst_file_id.build_file_path(path); // dst filepath
-            {
-                // Create file and write data with DeleteMonitoredFileSystem
-                let fd = Arc::new(fs.create(&src_file_id.build_file_path(path)).unwrap());
-                let mut writer = fs.new_writer(fd).unwrap();
-                writer.write_all(&entry_data[..]).unwrap();
-            }
-            fs.rename(&src_path, &dst_path).unwrap();
-            {
-                // Reopen the file and check data
-                let mut buf = vec![0; 1024];
-                let fd = Arc::new(fs.open(&dst_file_id.build_file_path(path)).unwrap());
-                let mut new_reader = fs.new_reader(fd).unwrap();
-                let actual_len = new_reader.read(&mut buf[..]).unwrap();
-                assert_eq!(actual_len, 1);
-                assert!(buf[0] == entry_data[0]);
-            }
-        }
-        // Move file from src to dst by `reuse`
-        {
-            let src_file_id = FileId {
-                seq: 14,
-                queue: LogQueue::Append,
-            };
-            let dst_file_id = FileId {
-                seq: src_file_id.seq + 1,
-                ..src_file_id
-            };
-            let src_path = src_file_id.build_file_path(path); // src filepath
-            let dst_path = dst_file_id.build_file_path(path); // dst filepath
-            {
-                // Create file and write data with DeleteMonitoredFileSystem
-                let fd = Arc::new(fs.create(&src_file_id.build_file_path(path)).unwrap());
-                let mut writer = fs.new_writer(fd).unwrap();
-                writer.write_all(&entry_data[..]).unwrap();
-            }
-            fs.reuse(&src_path, &dst_path).unwrap();
-            {
-                // Reopen the file and check whether the file is empty
-                let mut buf = vec![0; 1024];
-                let fd = Arc::new(fs.open(&dst_file_id.build_file_path(path)).unwrap());
-                let mut new_reader = fs.new_reader(fd).unwrap();
-                let actual_len = new_reader.read(&mut buf[..]).unwrap();
-                assert_eq!(actual_len, 0);
-            }
-        }
-    }
-
     #[test]
     fn test_managed_file_deletion() {
         let dir = tempfile::Builder::new()
@@ -2154,6 +2181,7 @@ mod tests {
             enable_log_recycle: true,
             ..Default::default()
         };
+        assert_eq!(cfg_v2.recycle_capacity(), 15);
         // Prepare files with format_version V1
         {
             let engine =
                 RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap();
@@ -2169,9 +2197,7 @@ mod tests {
             engine.append(rid, 11, 20, Some(&entry_data));
         }
         // Mark region_id -> 6 obsolete.
-        for rid in 6..=6 {
-            engine.clean(rid);
-        }
+        engine.clean(6);
         // the [1, 12] files are recycled
         engine.purge_expired_files().unwrap();
         assert_eq!(engine.file_count(Some(LogQueue::Append)), 5);
diff --git a/src/env/mod.rs b/src/env/mod.rs
index a6c23e7d..50cce189 100644
--- a/src/env/mod.rs
+++ b/src/env/mod.rs
@@ -24,10 +24,8 @@ pub trait FileSystem: Send + Sync {
 
     fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> Result<()>;
 
-    /// Reuse the old `src_path` with new `dst_path` filepath.
-    ///
-    /// It's an open interface for user-defined implementation.
-    /// Default implemented by `self.rename(...)`.
+    /// Reuses file at `src_path` as a new file at `dst_path`. The default
+    /// implementation simply renames the file.
     fn reuse<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> Result<()> {
         self.rename(src_path, dst_path)
     }
diff --git a/src/file_pipe_log/format.rs b/src/file_pipe_log/format.rs
index 902943f8..b1c7e4c3 100644
--- a/src/file_pipe_log/format.rs
+++ b/src/file_pipe_log/format.rs
@@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
 use num_traits::{FromPrimitive, ToPrimitive};
 
 use crate::codec::{self, NumberEncoder};
-use crate::pipe_log::{DataLayout, FileId, LogQueue, Version};
+use crate::pipe_log::{FileId, LogQueue, Version};
 use crate::{Error, Result};
 
 /// Width to format log sequence number.
@@ -20,7 +20,7 @@ const LOG_REWRITE_SUFFIX: &str = ".rewrite";
 /// File header.
 const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";
 
-/// Check whether the given `buf` is a valid padding or not.
+/// Checks whether the given `buf` is padded with zeros.
 ///
 /// To simplify the checking strategy, we just check the first
 /// and last byte in the `buf`.
@@ -30,7 +30,7 @@ const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";
 /// in the disk, might pass through this rule, but will failed in
 /// followed processing. So, we can just keep it simplistic.
 #[inline]
-pub(crate) fn is_valid_paddings(buf: &[u8]) -> bool {
+pub(crate) fn is_zero_padded(buf: &[u8]) -> bool {
     buf.is_empty() || (buf[0] == 0 && buf[buf.len() - 1] == 0)
 }
 
@@ -93,119 +93,66 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
 }
 
 /// In-memory representation of `Format` in log files.
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
 pub struct LogFileFormat {
-    version: Version,
-    data_layout: DataLayout,
-}
-
-impl Default for LogFileFormat {
-    fn default() -> Self {
-        Self {
-            version: Version::default(),
-            data_layout: DataLayout::NoAlignment,
-        }
-    }
+    pub version: Version,
+    pub alignment: u64,
 }
 
 impl LogFileFormat {
-    pub fn new(version: Version, data_layout: DataLayout) -> Self {
-        Self {
-            version,
-            data_layout,
-        }
-    }
-
-    /// Length of whole `LogFileFormat` written on storage.
-    pub fn enc_len(&self) -> usize {
-        Self::header_len() + Self::payload_len(self.version)
+    pub fn new(version: Version, alignment: u64) -> Self {
+        Self { version, alignment }
     }
 
     /// Length of header written on storage.
-    pub const fn header_len() -> usize {
+    const fn header_len() -> usize {
         LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::<Version>()
     }
 
-    /// Length of serialized `DataLayout` written on storage.
-    pub const fn payload_len(version: Version) -> usize {
+    const fn payload_len(version: Version) -> usize {
         match version {
             Version::V1 => 0,
-            Version::V2 => DataLayout::len(),
+            Version::V2 => std::mem::size_of::<u64>(),
         }
     }
 
-    pub fn from_version(version: Version) -> Self {
-        Self {
-            version,
-            data_layout: DataLayout::NoAlignment,
-        }
+    pub const fn max_encode_len() -> usize {
+        Self::header_len() + Self::payload_len(Version::V2)
     }
 
-    pub fn version(&self) -> Version {
-        self.version
-    }
-
-    pub fn data_layout(&self) -> DataLayout {
-        self.data_layout
+    /// Length of whole `LogFileFormat` written on storage.
+    pub fn encode_len(version: Version) -> usize {
+        Self::header_len() + Self::payload_len(version)
     }
 
     /// Decodes a slice of bytes into a `LogFileFormat`.
     pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
-        let buf_len = buf.len();
-        if buf_len < Self::header_len() {
-            return Err(Error::Corruption("log file header too short".to_owned()));
-        }
+        let mut format = LogFileFormat::default();
         if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
             return Err(Error::Corruption(
                 "log file magic header mismatch".to_owned(),
             ));
         }
         buf.consume(LOG_FILE_MAGIC_HEADER.len());
-        // Parse `Version` of LogFileFormat from header of the file.
-        let version = {
-            let dec_version = codec::decode_u64(buf)?;
-            if let Some(v) = Version::from_u64(dec_version) {
-                v
-            } else {
-                return Err(Error::Corruption(format!(
-                    "unrecognized log file version: {}",
-                    dec_version
-                )));
-            }
-        };
-        // Parse `DataLayout` of LogFileFormat from header of the file.
-        let payload_len = Self::payload_len(version);
-        if payload_len == 0 {
-            // No alignment.
-            return Ok(Self {
-                version,
-                data_layout: DataLayout::NoAlignment,
-            });
-        }
-        if_chain::if_chain! {
-            if payload_len > 0;
-            if buf_len >= Self::header_len() + payload_len;
-            if let Ok(layout_block_size) = codec::decode_u64(buf);
-            then {
-                // If the decoded `payload_len > 0`, serialized data_layout
-                // should be extracted from the file.
-                Ok(Self {
-                    version,
-                    data_layout: if layout_block_size == 0 {
-                        DataLayout::NoAlignment
-                    } else {
-                        DataLayout::Alignment(layout_block_size)
-                    },
-                })
-            } else {
-                // Here, we mark this special err, that is, corrupted `payload`,
-                // with InvalidArgument.
-                Err(Error::InvalidArgument(format!(
-                    "invalid dataload in the header, len: {}, expected len: {}",
-                    buf_len - Self::header_len(), Self::payload_len(version)
-                )))
-            }
+
+        let version_u64 = codec::decode_u64(buf)?;
+        if let Some(version) = Version::from_u64(version_u64) {
+            format.version = version;
+        } else {
+            return Err(Error::Corruption(format!(
+                "unrecognized log file version: {}",
+                version_u64
+            )));
+        }
+
+        let payload_len = Self::payload_len(format.version);
+        if buf.len() < payload_len {
+            return Err(Error::Corruption("missing header payload".to_owned()));
+        } else if payload_len > 0 {
+            format.alignment = codec::decode_u64(buf)?;
         }
+
+        Ok(format)
     }
 
     /// Encodes this header and appends the bytes to the provided buffer.
@@ -213,7 +160,9 @@ impl LogFileFormat {
         buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
         buf.encode_u64(self.version.to_u64().unwrap())?;
         if Self::payload_len(self.version) > 0 {
-            buf.encode_u64(self.data_layout.to_u64())?;
+            buf.encode_u64(self.alignment)?;
+        } else {
+            assert_eq!(self.alignment, 0);
         }
         #[cfg(feature = "failpoints")]
         {
@@ -223,33 +172,28 @@ impl LogFileFormat {
                 false
             };
             // Set abnormal DataLayout.
-            let force_abnormal_data_layout = || {
-                fail::fail_point!("log_file_header::force_abnormal_data_layout", |_| true);
+            let too_large = || {
+                fail::fail_point!("log_file_header::too_large", |_| true);
                 false
             };
             // Set corrupted DataLayout for `payload`.
-            let corrupted_data_layout = || {
-                fail::fail_point!("log_file_header::corrupted_data_layout", |_| true);
+            let too_small = || {
+                fail::fail_point!("log_file_header::too_small", |_| true);
                 false
             };
             if corrupted() {
                 buf[0] += 1;
             }
-            if force_abnormal_data_layout() {
+            assert!(!(too_large() && too_small()));
+            if too_large() {
                 buf.encode_u64(0_u64)?;
             }
-            if corrupted_data_layout() {
+            if too_small() {
                 buf.pop();
             }
         }
         Ok(())
     }
-
-    /// Return the aligned block size.
-    #[inline]
-    pub fn get_aligned_block_size(&self) -> usize {
-        self.data_layout.to_u64() as usize
-    }
 }
 
 #[cfg(test)]
@@ -263,19 +207,19 @@ mod tests {
         // normal buffer
         let mut buf = vec![0; 128];
         // len < 8
-        assert!(is_valid_paddings(&buf[0..6]));
+        assert!(is_zero_padded(&buf[0..6]));
         // len == 8
-        assert!(is_valid_paddings(&buf[120..]));
+        assert!(is_zero_padded(&buf[120..]));
         // len > 8
-        assert!(is_valid_paddings(&buf[..]));
+        assert!(is_zero_padded(&buf));
 
         // abnormal buffer
         buf[127] = 3_u8;
-        assert!(is_valid_paddings(&buf[0..110]));
-        assert!(is_valid_paddings(&buf[120..125]));
-        assert!(!is_valid_paddings(&buf[124..128]));
-        assert!(!is_valid_paddings(&buf[120..]));
-        assert!(!is_valid_paddings(&buf[..]));
+        assert!(is_zero_padded(&buf[0..110]));
+        assert!(is_zero_padded(&buf[120..125]));
+        assert!(!is_zero_padded(&buf[124..128]));
+        assert!(!is_zero_padded(&buf[120..]));
+        assert!(!is_zero_padded(&buf));
     }
 
     #[test]
@@ -310,59 +254,26 @@ mod tests {
         assert_eq!(version, version2);
     }
 
-    #[test]
-    fn test_data_layout() {
-        assert_eq!(DataLayout::NoAlignment.to_u64(), 0);
-        assert_eq!(DataLayout::Alignment(16).to_u64(), 16);
-        assert_eq!(DataLayout::from_u64(0), DataLayout::NoAlignment);
-        assert_eq!(DataLayout::from_u64(4096), DataLayout::Alignment(4096));
-        assert_eq!(DataLayout::len(), 8);
-    }
-
-    #[test]
-    fn test_file_header() {
-        let header1 = LogFileFormat::default();
-        assert_eq!(header1.version().to_u64().unwrap(), 1);
-        assert_eq!(header1.data_layout().to_u64(), 0);
-        let header2 = LogFileFormat::from_version(Version::default());
-        assert_eq!(header2.version().to_u64(), header1.version().to_u64());
-        assert_eq!(header1.data_layout().to_u64(), 0);
-        let header3 = LogFileFormat::from_version(header1.version());
-        assert_eq!(header3.version(), header1.version());
-        assert_eq!(header1.data_layout().to_u64(), 0);
-        assert_eq!(header1.enc_len(), LogFileFormat::header_len());
-        assert_eq!(header2.enc_len(), LogFileFormat::header_len());
-        assert_eq!(header3.enc_len(), LogFileFormat::header_len());
-        let header4 = LogFileFormat {
-            version: Version::V2,
-            data_layout: DataLayout::Alignment(16),
-        };
-        assert_eq!(
-            header4.enc_len(),
-            LogFileFormat::header_len() + LogFileFormat::payload_len(header4.version)
-        );
-    }
-
     #[test]
     fn test_encoding_decoding_file_format() {
         fn enc_dec_file_format(file_format: LogFileFormat) -> Result<LogFileFormat> {
             let mut buf = Vec::with_capacity(
                 LogFileFormat::header_len() + LogFileFormat::payload_len(file_format.version),
             );
-            assert!(file_format.encode(&mut buf).is_ok());
+            file_format.encode(&mut buf).unwrap();
             LogFileFormat::decode(&mut &buf[..])
         }
         // header with aligned-sized data_layout
         {
             let mut buf = Vec::with_capacity(LogFileFormat::header_len());
             let version = Version::V2;
-            let data_layout = DataLayout::Alignment(4096);
+            let alignment = 4096;
             buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
-            assert!(buf.encode_u64(version.to_u64().unwrap()).is_ok());
-            assert!(buf.encode_u64(data_layout.to_u64()).is_ok());
+            buf.encode_u64(version.to_u64().unwrap()).unwrap();
+            buf.encode_u64(alignment).unwrap();
             assert_eq!(
                 LogFileFormat::decode(&mut &buf[..]).unwrap(),
-                LogFileFormat::new(version, data_layout)
+                LogFileFormat::new(version, alignment)
             );
         }
         // header with abnormal version
@@ -370,37 +281,28 @@ mod tests {
             let mut buf = Vec::with_capacity(LogFileFormat::header_len());
             let abnormal_version = 4_u64; /* abnormal version */
             buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
-            assert!(buf.encode_u64(abnormal_version).is_ok());
-            assert!(buf.encode_u64(16).is_ok());
+            buf.encode_u64(abnormal_version).unwrap();
+            buf.encode_u64(16).unwrap();
             assert!(LogFileFormat::decode(&mut &buf[..]).is_err());
         }
-        // header with Version::default and DataLayout::Alignment(_)
         {
-            let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(0));
-            assert_eq!(
-                LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
-                enc_dec_file_format(file_format).unwrap()
-            );
-            let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(4096));
+            let file_format = LogFileFormat::new(Version::default(), 0);
             assert_eq!(
-                LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
+                LogFileFormat::new(Version::default(), 0),
                 enc_dec_file_format(file_format).unwrap()
             );
-        }
-        // header with Version::V2 and DataLayout::Alignment(0)
-        {
-            let file_format = LogFileFormat::new(Version::V2, DataLayout::Alignment(0));
-            catch_unwind_silent(|| enc_dec_file_format(file_format)).unwrap_err();
+            let file_format = LogFileFormat::new(Version::default(), 4096);
+            assert!(catch_unwind_silent(|| enc_dec_file_format(file_format)).is_err());
         }
     }
 
     #[test]
     fn test_file_context() {
         let mut file_context =
-            LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default());
+            LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
         assert_eq!(file_context.get_signature(), None);
         file_context.id.seq = 10;
-        file_context.format.version = Version::V2;
+        file_context.version = Version::V2;
         assert_eq!(file_context.get_signature().unwrap(), 10);
         let abnormal_seq = (file_context.id.seq << 32) as u64 + 100_u64;
         file_context.id.seq = abnormal_seq;
diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs
index e7327c85..5a621ff6 100644
--- a/src/file_pipe_log/log_file.rs
+++ b/src/file_pipe_log/log_file.rs
@@ -10,7 +10,7 @@ use log::warn;
 
 use crate::env::{FileSystem, Handle, WriteExt};
 use crate::metrics::*;
-use crate::pipe_log::{FileBlockHandle, LogFileContext, Version};
+use crate::pipe_log::FileBlockHandle;
 use crate::{Error, Result};
 
 use super::format::LogFileFormat;
@@ -18,14 +18,9 @@ use super::format::LogFileFormat;
 /// Maximum number of bytes to allocate ahead.
 const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024;
 
-/// Combination of `[Handle]` and `[Version]`, specifying a handler of a file.
-#[derive(Debug)]
-pub struct FileHandler<F: FileSystem> {
-    pub handle: Arc<F::Handle>,
-    pub context: LogFileContext,
-}
-
-/// Build a file writer.
+/// Builds a file writer.
+///
+/// # Arguments
 ///
 /// * `handle`: standard handle of a log file.
 /// * `format`: format infos of the log file.
@@ -40,10 +35,8 @@ pub(super) fn build_file_writer<F: FileSystem>(
     LogFileWriter::open(handle, writer, format, force_reset)
 }
 
-/// Append-only writer for log file.
+/// Append-only writer for log file. It also handles the file header write.
 pub struct LogFileWriter<F: FileSystem> {
-    /// header of file
-    pub header: LogFileFormat,
     writer: F::Writer,
     written: usize,
     capacity: usize,
@@ -59,26 +52,26 @@ impl<F: FileSystem> LogFileWriter<F> {
     ) -> Result<Self> {
         let file_size = handle.file_size()?;
         let mut f = Self {
-            header: format,
             writer,
             written: file_size,
             capacity: file_size,
             last_sync: file_size,
         };
-        if file_size < LogFileFormat::header_len() || force_reset {
-            f.write_header()?;
+        // TODO: add tests for file_size in [header_len, max_encode_len].
+        if file_size < LogFileFormat::encode_len(format.version) || force_reset {
+            f.write_header(format)?;
         } else {
             f.writer.seek(SeekFrom::Start(file_size as u64))?;
         }
         Ok(f)
     }
 
-    fn write_header(&mut self) -> Result<()> {
+    fn write_header(&mut self, format: LogFileFormat) -> Result<()> {
         self.writer.seek(SeekFrom::Start(0))?;
         self.last_sync = 0;
         self.written = 0;
-        let mut buf = Vec::with_capacity(LogFileFormat::header_len());
-        self.header.encode(&mut buf)?;
+        let mut buf = Vec::with_capacity(LogFileFormat::encode_len(format.version));
+        format.encode(&mut buf)?;
         self.write(&buf, 0)
     }
 
@@ -148,25 +141,16 @@ impl<F: FileSystem> LogFileWriter<F> {
 }
 
 /// Build a file reader.
-///
-/// Attention please, the reader do not need a specified `[LogFileFormat]` from
-/// users.
-///
-/// * `handle`: standard handle of a log file.
-/// * `format`: if `[None]`, reloads the log file header and parse
-///   the relevant `LogFileFormat` before building the `reader`.
 pub(super) fn build_file_reader<F: FileSystem>(
     system: &F,
     handle: Arc<F::Handle>,
-    format: Option<LogFileFormat>,
 ) -> Result<LogFileReader<F>> {
     let reader = system.new_reader(handle.clone())?;
-    LogFileReader::open(handle, reader, format)
+    Ok(LogFileReader::open(handle, reader))
 }
 
 /// Random-access reader for log file.
 pub struct LogFileReader<F: FileSystem> {
-    format: LogFileFormat,
     handle: Arc<F::Handle>,
     reader: F::Reader,
 
@@ -174,33 +158,28 @@ pub struct LogFileReader<F: FileSystem> {
 }
 
 impl<F: FileSystem> LogFileReader<F> {
-    fn open(
-        handle: Arc<F::Handle>,
-        reader: F::Reader,
-        format: Option<LogFileFormat>,
-    ) -> Result<LogFileReader<F>> {
-        match format {
-            Some(fmt) => Ok(Self {
-                format: fmt,
-                handle,
-                reader,
-                // Set to an invalid offset to force a reseek at first read.
-                offset: u64::MAX,
-            }),
-            None => {
-                let mut reader = Self {
-                    format: LogFileFormat::default(),
-                    handle,
-                    reader,
-                    // Set to an invalid offset to force a reseek at first read.
-                    offset: u64::MAX,
-                };
-                reader.parse_format()?;
-                Ok(reader)
-            }
+    fn open(handle: Arc<F::Handle>, reader: F::Reader) -> LogFileReader<F> {
+        Self {
+            handle,
+            reader,
+            // Set to an invalid offset to force a reseek at first read.
+            offset: u64::MAX,
         }
     }
 
+    /// Function for reading the header of the log file, and return a
+    /// `[LogFileFormat]`.
+    ///
+    /// Attention please, this function would move the `reader.offset`
+    /// to `0`, that is, the beginning of the file, to parse the
+    /// related `[LogFileFormat]`.
+    pub fn parse_format(&mut self) -> Result<LogFileFormat> {
+        let mut container = vec![0; LogFileFormat::max_encode_len()];
+        let size = self.read_to(0, &mut container)?;
+        container.truncate(size);
+        LogFileFormat::decode(&mut container.as_slice())
+    }
+
     pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
         let mut buf = vec![0; handle.len as usize];
         let size = self.read_to(handle.offset, &mut buf)?;
@@ -229,126 +208,8 @@ impl<F: FileSystem> LogFileReader<F> {
         Ok((self.offset - offset) as usize)
     }
 
-    /// Function for reading the header of the log file, and return a
-    /// `[LogFileFormat]`.
-    ///
-    /// Attention please, this function would move the `reader.offset`
-    /// to `0`, that is, the beginning of the file, to parse the
-    /// related `[LogFileFormat]`.
-    pub fn parse_format(&mut self) -> Result<LogFileFormat> {
-        // Here, the caller expected that the given `handle` has pointed to
-        // a log file with valid format. Otherwise, it should return with
-        // `Err`.
-        let file_size = self.handle.file_size()?;
-        // [1] If the length lessed than the standard `LogFileFormat::header_len()`.
-        let header_len = LogFileFormat::header_len();
-        if file_size < header_len {
-            return Err(Error::Corruption("Invalid header of LogFile!".to_owned()));
-        }
-        // [2] Parse the format of the file.
-        let mut container =
-            vec![0; LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2)];
-        let size = self.read_to(0, &mut container[..])?;
-        container.truncate(size);
-        self.format = LogFileFormat::decode(&mut container.as_slice())?;
-        Ok(self.format)
-    }
-
     #[inline]
     pub fn file_size(&self) -> Result<usize> {
         Ok(self.handle.file_size()?)
     }
-
-    #[inline]
-    pub fn file_format(&self) -> &LogFileFormat {
-        &self.format
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use num_traits::ToPrimitive;
-    use strum::IntoEnumIterator;
-    use tempfile::Builder;
-
-    use crate::env::DefaultFileSystem;
-    use crate::file_pipe_log::format::{FileNameExt, LogFileFormat};
-    use crate::pipe_log::{DataLayout, FileId, LogQueue, Version};
-    use crate::util::ReadableSize;
-
-    fn prepare_mocked_log_file<F: FileSystem>(
-        file_system: &F,
-        path: &str,
-        file_id: FileId,
-        format: LogFileFormat,
-        file_size: usize,
-    ) -> Result<LogFileWriter<F>> {
-        let data = vec![b'm'; 1024];
-        let fd = Arc::new(file_system.create(&file_id.build_file_path(path))?);
-        let mut writer = build_file_writer(file_system, fd, format, true)?;
-        let mut offset: usize = 0;
-        while offset < file_size {
-            writer.write(&data[..], file_size)?;
-            offset += data.len();
-        }
-        Ok(writer)
-    }
-
-    fn read_data_from_mocked_log_file<F: FileSystem>(
-        file_system: &F,
-        path: &str,
-        file_id: FileId,
-        format: Option<LogFileFormat>,
-        file_block_handle: FileBlockHandle,
-    ) -> Result<Vec<u8>> {
-        let fd = Arc::new(file_system.open(&file_id.build_file_path(path))?);
-        let mut reader = build_file_reader(file_system, fd, format)?;
-        reader.read(file_block_handle)
-    }
-
-    #[test]
-    fn test_log_file_write_read() {
-        let dir = Builder::new()
-            .prefix("test_log_file_write_read")
-            .tempdir()
-            .unwrap();
-        let path = dir.path().to_str().unwrap();
-        let target_file_size = ReadableSize::mb(4);
-        let file_system = Arc::new(DefaultFileSystem);
-        let fs_block_size = 32768;
-
-        for version in Version::iter() {
-            for data_layout in [DataLayout::NoAlignment, DataLayout::Alignment(64)] {
-                let file_format = LogFileFormat::new(version, data_layout);
-                let file_id = FileId {
-                    seq: version.to_u64().unwrap(),
-                    queue: LogQueue::Append,
-                };
-                assert!(prepare_mocked_log_file(
-                    file_system.as_ref(),
-                    path,
-                    file_id,
-                    file_format,
-                    target_file_size.0 as usize,
-                )
-                .is_ok());
-                // mocked file_block_handle
-                let file_block_handle = FileBlockHandle {
-                    id: file_id,
-                    offset: (fs_block_size + 77) as u64,
-                    len: fs_block_size + 77,
-                };
-                let buf = read_data_from_mocked_log_file(
-                    file_system.as_ref(),
-                    path,
-                    file_id,
-                    None,
-                    file_block_handle,
-                )
-                .unwrap();
-                assert_eq!(buf.len(), file_block_handle.len);
-            }
-        }
-    }
 }
diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs
index 2e060c29..b75a7168 100644
--- a/src/file_pipe_log/mod.rs
+++ b/src/file_pipe_log/mod.rs
@@ -10,7 +10,7 @@ mod pipe;
 mod pipe_builder;
 mod reader;
 
-pub use format::{FileNameExt, LogFileFormat};
+pub use format::FileNameExt;
 pub use pipe::DualPipes as FilePipeLog;
 pub use pipe_builder::{
     DefaultMachineFactory, DualPipesBuilder as FilePipeLogBuilder, RecoveryConfig, ReplayMachine,
@@ -47,7 +47,7 @@ pub mod debug {
             file_system.open(path)?
         };
         let fd = Arc::new(fd);
-        super::log_file::build_file_writer(file_system, fd, format, create)
+        super::log_file::build_file_writer(file_system, fd, format, create /* force_reset */)
     }
 
     /// Opens a log file for read.
@@ -56,7 +56,7 @@ pub mod debug {
         path: &Path,
     ) -> Result<LogItemReader<F>> {
         let fd = Arc::new(file_system.open(path)?);
-        super::log_file::build_file_reader(file_system, fd, None)
+        super::log_file::build_file_reader(file_system, fd)
     }
 
     /// An iterator over the log items in log files.
@@ -157,8 +157,9 @@ pub mod debug {
 
         fn find_next_readable_file(&mut self) -> Result<()> {
             while let Some((file_id, path)) = self.files.pop_front() {
-                self.batch_reader
-                    .open(file_id, build_file_reader(self.system.as_ref(), &path)?)?;
+                let mut reader = build_file_reader(self.system.as_ref(), &path)?;
+                let format = reader.parse_format()?;
+                self.batch_reader.open(file_id, format, reader)?;
                 if let Some(b) = self.batch_reader.next()? {
                     self.items.extend(b.into_items());
                     break;
@@ -174,7 +175,7 @@ pub mod debug {
         use crate::env::DefaultFileSystem;
         use crate::log_batch::{Command, LogBatch};
         use crate::pipe_log::{FileBlockHandle, LogFileContext, LogQueue, Version};
-        use crate::test_util::generate_entries;
+        use crate::test_util::{generate_entries, PanicGuard};
         use raft::eraftpb::Entry;
 
         #[test]
@@ -216,13 +217,13 @@ pub mod debug {
                 true, /* create */
             )
             .unwrap();
-            let log_file_format = LogFileContext::new(file_id, LogFileFormat::default());
+            let log_file_format = LogFileContext::new(file_id, Version::default());
             for batch in bs.iter_mut() {
                 let offset = writer.offset() as u64;
                 let len = batch
                     .finish_populate(1 /* compression_threshold */)
                     .unwrap();
-                assert!(batch.prepare_write(&log_file_format).is_ok());
+                batch.prepare_write(&log_file_format).unwrap();
                 writer
                     .write(batch.encoded_bytes(), 0 /* target_file_hint */)
                     .unwrap();
@@ -233,7 +234,6 @@ pub mod debug {
                 });
             }
             writer.close().unwrap();
-            assert_eq!(writer.header.version(), Version::default());
 
             // Read and verify.
             let mut reader =
                 LogItemReader::new_file_reader(file_system.clone(), &file_path).unwrap();
@@ -291,11 +291,66 @@ pub mod debug {
             assert!(
                 LogItemReader::new_directory_reader(file_system.clone(), &empty_file_path).is_err()
             );
-            assert!(LogItemReader::new_file_reader(file_system.clone(), &empty_file_path).is_ok());
+            LogItemReader::new_file_reader(file_system.clone(), &empty_file_path).unwrap();
 
             let mut reader = LogItemReader::new_directory_reader(file_system, dir.path()).unwrap();
             assert!(reader.next().unwrap().is_err());
             assert!(reader.next().is_none());
         }
+
+        #[test]
+        fn test_recover_from_partial_write() {
+            let dir = tempfile::Builder::new()
+                .prefix("test_debug_file_overwrite")
+                .tempdir()
+                .unwrap();
+            let file_system = Arc::new(DefaultFileSystem);
+
+            let path = FileId::dummy(LogQueue::Append).build_file_path(dir.path());
+
+            let formats = [
+                LogFileFormat::new(Version::V1, 0),
+                LogFileFormat::new(Version::V2, 1),
+            ];
+            for from in formats {
+                for to in formats {
+                    for shorter in [true, false] {
+                        if LogFileFormat::encode_len(to.version)
+                            < LogFileFormat::encode_len(from.version)
+                        {
+                            continue;
+                        }
+                        let _guard = PanicGuard::with_prompt(format!(
+                            "case: [{:?}, {:?}, {:?}]",
+                            from, to, shorter
+                        ));
+                        let mut writer = build_file_writer(
+                            file_system.as_ref(),
+                            &path,
+                            from,
+                            true, /* create */
+                        )
+                        .unwrap();
+                        let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
+                        let len = writer.offset();
+                        writer.close().unwrap();
+                        if shorter {
+                            f.set_len(len as u64 - 1).unwrap();
+                        }
+                        let mut writer = build_file_writer(
+                            file_system.as_ref(),
+                            &path,
+                            to,
+                            false, /* create */
+                        )
+                        .unwrap();
+                        writer.close().unwrap();
+                        let mut reader = build_file_reader(file_system.as_ref(), &path).unwrap();
+                        assert_eq!(reader.parse_format().unwrap(), to);
+                        std::fs::remove_file(&path).unwrap();
+                    }
+                }
+            }
+        }
     }
 }
diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs
index 9f645762..f5f6e4b1 100644
--- a/src/file_pipe_log/pipe.rs
+++ b/src/file_pipe_log/pipe.rs
@@ -14,69 +14,62 @@ use crate::config::Config;
 use crate::env::FileSystem;
 use crate::event_listener::EventListener;
 use crate::metrics::*;
-use crate::pipe_log::{
-    DataLayout, FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog,
-};
+use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog};
 use crate::{perf_context, Error, Result};
 
 use super::format::{FileNameExt, LogFileFormat};
-use super::log_file::{build_file_reader, build_file_writer, FileHandler, LogFileWriter};
+use super::log_file::{build_file_reader, build_file_writer, LogFileWriter};
+
+#[derive(Debug)]
+pub struct FileWithFormat<F: FileSystem> {
+    pub handle: Arc<F::Handle>,
+    pub format: LogFileFormat,
+}
 
 struct FileCollection<F: FileSystem> {
     /// Sequence number of the first file.
     first_seq: FileSeq,
     /// Sequence number of the first file that is in use.
     first_seq_in_use: FileSeq,
-    fds: VecDeque<FileHandler<F>>,
+    fds: VecDeque<FileWithFormat<F>>,
     /// `0` => no capbility for recycling stale files
     /// `_` => finite volume for recycling stale files
     capacity: usize,
 }
 
-#[cfg(test)]
-impl<F: FileSystem> Default for FileCollection<F> {
-    fn default() -> Self {
-        Self {
-            first_seq: 0,
-            first_seq_in_use: 0,
-            fds: VecDeque::new(),
-            capacity: 0,
-        }
-    }
-}
-
 impl<F: FileSystem> FileCollection<F> {
-    /// Recycle the first obsolete(stale) file and renewed with new FileId.
+    /// Recycles the first obsolete file and renames it to the given `FileId`.
     ///
     /// Attention please, the recycled file would be automatically `renamed` in
     /// this func.
-    pub fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
-        if self.capacity == 0 || self.first_seq >= self.first_seq_in_use {
-            return false;
-        }
+    fn recycle_one_file(&mut self, file_system: &F, dir_path: &str, dst_fd: FileId) -> bool {
+        debug_assert!(self.first_seq <= self.first_seq_in_use);
         debug_assert!(!self.fds.is_empty());
-        let first_file_id = FileId {
-            queue: dst_fd.queue,
-            seq: self.first_seq,
-        };
-        let src_path = first_file_id.build_file_path(dir_path); // src filepath
-        let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
-        if let Err(e) = file_system.reuse(&src_path, &dst_path) {
-            error!("error while trying to recycle one expired file: {}", e);
-            false
-        } else {
-            // Only if `rename` made sense, could we update the first_seq and return
-            // success.
-            self.fds.pop_front().unwrap();
-            self.first_seq += 1;
-            true
+        if self.first_seq < self.first_seq_in_use {
+            let first_file_id = FileId {
+                queue: dst_fd.queue,
+                seq: self.first_seq,
+            };
+            let src_path = first_file_id.build_file_path(dir_path); // src filepath
+            let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
+            if let Err(e) = file_system.reuse(&src_path, &dst_path) {
+                error!("error while trying to recycle one expired file: {}", e);
+            } else {
+                // Only if `rename` made sense, could we update the first_seq and return
+                // success.
+                self.fds.pop_front().unwrap();
+                self.first_seq += 1;
+                return true;
+            }
         }
+        false
     }
 }
 
 struct ActiveFile<F: FileSystem> {
     seq: FileSeq,
     writer: LogFileWriter<F>,
+    format: LogFileFormat,
 }
 
 /// A file-based log storage that arranges files as one single queue.
@@ -93,8 +86,8 @@ pub(super) struct SinglePipe<F: FileSystem> {
     files: CachePadded<RwLock<FileCollection<F>>>,
     /// The log file opened for write.
     ///
-    /// `active_file` must be locked first to acquire
-    /// both `files` and `active_file`.
+    /// `active_file` must be locked first to acquire both `files` and
+    /// `active_file`.
     active_file: CachePadded<Mutex<ActiveFile<F>>>,
 }
 
@@ -132,24 +125,24 @@ impl<F: FileSystem> SinglePipe<F> {
         listeners: Vec<Arc<dyn EventListener>>,
         queue: LogQueue,
         mut first_seq: FileSeq,
-        mut fds: VecDeque<FileHandler<F>>,
+        mut fds: VecDeque<FileWithFormat<F>>,
         capacity: usize,
     ) -> Result<Self> {
-        let data_layout = {
+        #[allow(unused_mut)]
+        let mut alignment = 0;
+        #[cfg(feature = "failpoints")]
+        {
             let force_set_aligned_layout = || {
-                // TODO: needs to get the block_size from file_system.
-                fail_point!("file_pipe_log::open::force_set_aligned_layout", |_| { 16 });
-                0
+                fail_point!("file_pipe_log::open::force_set_aligned_layout", |_| {
+                    true
+                });
+                false
             };
-            // TODO: also need to check whether DIO is open or not. If DIO
-            // == `on`, we could set the data_layout with `Alignment(_)`.
-            let fs_block_size = force_set_aligned_layout();
-            if fs_block_size > 0 && LogFileFormat::payload_len(cfg.format_version) > 0 {
-                DataLayout::Alignment(fs_block_size as u64)
-            } else {
-                DataLayout::NoAlignment
+            if force_set_aligned_layout() {
+                alignment = 16;
             }
-        };
+        }
+
         let create_file = first_seq == 0;
         let active_seq = if create_file {
             first_seq = 1;
            let file_id = FileId {
                queue,
                 seq: first_seq,
             };
             let fd = Arc::new(file_system.create(&file_id.build_file_path(&cfg.dir))?);
-            fds.push_back(FileHandler {
+            fds.push_back(FileWithFormat {
                 handle: fd,
-                context: LogFileContext::new(
-                    file_id,
-                    LogFileFormat::new(cfg.format_version, data_layout),
-                ),
+                format: LogFileFormat::new(cfg.format_version, alignment),
             });
             first_seq
         } else {
            first_seq + fds.len() as u64 - 1
        };

        let active_fd = fds.back().unwrap();
        let active_file = ActiveFile {
            seq: active_seq,
             writer: build_file_writer(
                 file_system.as_ref(),
                 active_fd.handle.clone(),
-                active_fd.context.format,
+                active_fd.format,
                 false, /* force_reset */
             )?,
+            format: active_fd.format,
         };
 
         let total_files = fds.len();
         let pipe = Self {
             queue,
             dir: cfg.dir.clone(),
-            file_format: LogFileFormat::new(cfg.format_version, data_layout),
+            file_format: LogFileFormat::new(cfg.format_version, alignment),
             target_file_size: cfg.target_file_size.0 as usize,
             bytes_per_sync: cfg.bytes_per_sync.0 as usize,
             file_system,
@@ -229,17 +220,6 @@ impl<F: FileSystem> SinglePipe<F> {
             .clone())
     }
 
-    /// Returns a shared [`Version`] for the specified file sequence number.
-    fn get_file_format(&self, file_seq: FileSeq) -> Result<LogFileFormat> {
-        let files = self.files.read();
-        if file_seq < files.first_seq || (file_seq >= files.first_seq + files.fds.len() as u64) {
-            return Err(Error::Corruption("file seqno out of range".to_owned()));
-        }
-        Ok(files.fds[(file_seq - files.first_seq) as usize]
-            .context
-            .format)
-    }
-
     /// Creates a new file for write, and rotates the active log file.
     ///
     /// This operation is atomic in face of errors.
@@ -270,7 +250,7 @@ impl<F: FileSystem> SinglePipe<F> {
         };
         let mut new_file = ActiveFile {
             seq,
-            // The file might generated from a recycled stale-file, we should reset the file
+            // The file might be generated from a recycled stale-file, always reset the file
             // header of it.
             writer: build_file_writer(
                 self.file_system.as_ref(),
                 fd.clone(),
                 self.file_format,
                 true, /* force_reset */
             )?,
+            format: self.file_format,
         };
         // File header must be persisted. This way we can recover gracefully if power
         // loss before a new entry is written.
         new_file.writer.sync()?;
         self.sync_dir()?;
-        let active_file_format_version = new_file.writer.header.version();
-        let active_file_format_data_layout = new_file.writer.header.data_layout();
+        let version = new_file.format.version;
+        let alignment = new_file.format.alignment;
         **active_file = new_file;
 
         let len = {
             let mut files = self.files.write();
             debug_assert!(files.first_seq + files.fds.len() as u64 == seq);
-            files.fds.push_back(FileHandler {
+            files.fds.push_back(FileWithFormat {
                 handle: fd,
-                context: LogFileContext::new(
-                    FileId {
-                        seq,
-                        queue: self.queue,
-                    },
-                    LogFileFormat::new(active_file_format_version, active_file_format_data_layout),
-                ),
+                format: LogFileFormat::new(version, alignment),
             });
             for listener in &self.listeners {
                 listener.post_new_log_file(FileId {
@@ -326,11 +301,7 @@ impl<F: FileSystem> SinglePipe<F> {
         let fd = self.get_fd(handle.id.seq)?;
         // As the header of each log file already parsed in the processing of loading
         // log files, we just need to build the `LogFileReader`.
-            for recycle_idx in (purged..obsolete_files).rev() {
-                if !files.fds[recycle_idx]
-                    .context
-                    .format
-                    .version()
-                    .has_log_signing()
-                {
-                    purged = recycle_idx + 1;
+            for i in (purged..obsolete_files).rev() {
+                if !files.fds[i].format.version.has_log_signing() {
+                    purged = i + 1;
                     break;
                 }
             }
             // Update metadata of files
+            let old_first_seq = files.first_seq;
             files.first_seq += purged as u64;
             files.first_seq_in_use = file_seq;
             files.fds.drain(..purged);
-            (files.first_seq - purged as u64, purged, files.fds.len())
+            (old_first_seq, purged, files.fds.len())
         };
         self.flush_metrics(remained);
         for seq in first_purge_seq..first_purge_seq + purged as u64 {
@@ -494,7 +454,10 @@ impl<F: FileSystem> SinglePipe<F> {
 
     fn fetch_active_file(&self) -> LogFileContext {
         let files = self.files.read();
-        files.fds.back().unwrap().context.clone()
+        LogFileContext {
+            id: FileId::new(self.queue, files.first_seq + files.fds.len() as u64 - 1),
+            version: files.fds.back().unwrap().format.version,
+        }
     }
 }
 
@@ -579,7 +542,7 @@ mod tests {
     use super::super::pipe_builder::lock_dir;
     use super::*;
     use crate::env::{DefaultFileSystem, WriteExt};
-    use crate::pipe_log::{DataLayout, Version};
+    use crate::pipe_log::Version;
     use crate::util::ReadableSize;
     use std::io::{Read, Seek, SeekFrom, Write};
 
@@ -639,7 +602,7 @@ mod tests {
         let pipe_log = new_test_pipes(&cfg).unwrap();
         assert_eq!(pipe_log.file_span(queue), (1, 1));
 
-        let header_size = LogFileFormat::header_len() as u64;
+        let header_size = LogFileFormat::encode_len(Version::default()) as u64;
 
         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
@@ -694,13 +657,12 @@ mod tests {
         assert!(abnormal_content_readed.is_err());
 
         // leave only 1 file to truncate
-        assert!(pipe_log.purge_to(FileId { queue, seq: 3 }).is_ok());
+        pipe_log.purge_to(FileId { queue, seq: 3 }).unwrap();
         assert_eq!(pipe_log.file_span(queue), (3, 3));
 
         // fetch active file
         let file_context = pipe_log.fetch_active_file(LogQueue::Append);
-        assert_eq!(file_context.format.version(), Version::default());
-        assert_eq!(file_context.format.data_layout(), DataLayout::NoAlignment);
+        assert_eq!(file_context.version, Version::default());
         assert_eq!(file_context.id.seq, 3);
     }
 
@@ -729,13 +691,23 @@ mod tests {
             let mut buf = vec![0; 1024];
             let fd = Arc::new(file_system.open(&file_id.build_file_path(path)).unwrap());
             let mut new_reader = file_system.new_reader(fd).unwrap();
-            let actual_len = new_reader.read(&mut buf[..]).unwrap();
+            let actual_len = new_reader.read(&mut buf).unwrap();
             Ok(if actual_len != data_len {
                 false
             } else {
-                buf[..] == expected_data[..]
+                buf == expected_data
             })
         }
+        fn new_file_handler(path: &str, file_id: FileId) -> FileWithFormat<DefaultFileSystem> {
+            FileWithFormat {
+                handle: Arc::new(
+                    DefaultFileSystem
+                        .open(&file_id.build_file_path(path))
+                        .unwrap(),
+                ),
+                format: LogFileFormat::default(),
+            }
+        }
         let dir = Builder::new()
             .prefix("test_recycle_file_collections")
             .tempdir()
             .unwrap();
         let path = dir.path().to_str().unwrap();
         let data = vec![b'x'; 1024];
         let file_system = Arc::new(DefaultFileSystem);
-        // test FileCollection with Default(Invalid)
-        {
-            let mut recycle_collections = FileCollection::<DefaultFileSystem>::default();
-            assert_eq!(recycle_collections.first_seq, 0);
-            assert_eq!(recycle_collections.first_seq_in_use, 0);
-            assert_eq!(recycle_collections.capacity, 0);
-            assert_eq!(recycle_collections.fds.len(), 0);
-            assert!(!recycle_collections.recycle_one_file(
-                &file_system,
-                path,
-                FileId::dummy(LogQueue::Append)
-            ));
-        }
         // test FileCollection with a valid file
         {
             // mock
            let new_file_id = FileId {
                queue: LogQueue::Append,
                 seq: cur_file_id.seq + 1,
             };
-            let _ = prepare_file(file_system.as_ref(), path, old_file_id, &data[..]); // prepare old file
+            let _ = prepare_file(file_system.as_ref(), path, old_file_id, &data); // prepare old file
             let mut recycle_collections = FileCollection::<DefaultFileSystem> {
                 first_seq: old_file_id.seq,
                 first_seq_in_use: old_file_id.seq,
                 capacity: 3,
-                ..Default::default()
+                fds: vec![new_file_handler(path, old_file_id)].into(),
             };
-            recycle_collections.fds.push_back(FileHandler {
-                handle: Arc::new(
-                    file_system
-                        .open(&old_file_id.build_file_path(path))
-                        .unwrap(),
-                ),
-                context: LogFileContext::new(
-                    FileId::dummy(LogQueue::Append),
-                    LogFileFormat::default(),
-                ),
-            });
             // recycle an old file
             assert!(!recycle_collections.recycle_one_file(&file_system, path, new_file_id));
             // update the reycle collection
             {
-                recycle_collections.fds.push_back(FileHandler {
-                    handle: Arc::new(
-                        file_system
-                            .open(&old_file_id.build_file_path(path))
-                            .unwrap(),
-                    ),
-                    context: LogFileContext::new(
-                        FileId::dummy(LogQueue::Append),
-                        LogFileFormat::default(),
-                    ),
-                });
+                recycle_collections
+                    .fds
+                    .push_back(new_file_handler(path, old_file_id));
                 recycle_collections.first_seq_in_use = cur_file_id.seq;
             }
             // recycle an old file
             assert!(recycle_collections.recycle_one_file(&file_system, path, new_file_id));
             // validate the content of recycled file
             assert!(
-                validate_content_of_file(file_system.as_ref(), path, new_file_id, &data[..])
-                    .unwrap()
+                validate_content_of_file(file_system.as_ref(), path, new_file_id, &data).unwrap()
             );
             // rewrite then rename with validation on the content.
             {
                 let refreshed_data = vec![b'm'; 1024];
                 let fd = Arc::new(
                     file_system
                         .open(&new_file_id.build_file_path(path))
                         .unwrap(),
                 );
                 let mut new_writer = file_system.new_writer(fd).unwrap();
-                assert!(new_writer.seek(SeekFrom::Start(0)).is_ok());
-                assert_eq!(new_writer.write(&refreshed_data[..]).unwrap(), 1024);
-                assert!(new_writer.sync().is_ok());
+                new_writer.seek(SeekFrom::Start(0)).unwrap();
+                assert_eq!(new_writer.write(&refreshed_data).unwrap(), 1024);
+                new_writer.sync().unwrap();
                 assert!(validate_content_of_file(
                     file_system.as_ref(),
                     path,
                     new_file_id,
-                    &refreshed_data[..]
+                    &refreshed_data
                 )
                 .unwrap());
             }
         }
         // test FileCollection with abnormal file.
         {
             let fake_file_id = FileId {
                 queue: LogQueue::Append,
                 seq: 11,
             };
-            let _ = prepare_file(file_system.as_ref(), path, fake_file_id, &data[..]); // prepare old file
+            let _ = prepare_file(file_system.as_ref(), path, fake_file_id, &data); // prepare old file
             let mut recycle_collections = FileCollection::<DefaultFileSystem> {
                 first_seq: fake_file_id.seq,
                 first_seq_in_use: fake_file_id.seq + 1,
                 capacity: 2,
-                ..Default::default()
+                fds: vec![new_file_handler(path, fake_file_id)].into(),
             };
-            recycle_collections.fds.push_back(FileHandler {
-                handle: Arc::new(
-                    file_system
-                        .open(&fake_file_id.build_file_path(path))
-                        .unwrap(),
-                ),
-                context: LogFileContext::new(fake_file_id, LogFileFormat::default()),
-            });
-            let first_file_id = recycle_collections.fds.front().unwrap().context.id;
-            assert_eq!(first_file_id, fake_file_id);
             // mock the failure on `rename`
-            assert!(file_system
+            file_system
                 .delete(&fake_file_id.build_file_path(path))
-                .is_ok());
+                .unwrap();
             let new_file_id = FileId {
                 queue: LogQueue::Append,
                 seq: 13,
             };
             // `recycle_one_file` should return false, as the file is failed to be
             // recycled.
             assert!(!recycle_collections.recycle_one_file(&file_system, path, new_file_id));
             assert_eq!(recycle_collections.fds.len(), 1);
-            assert_eq!(first_file_id, fake_file_id);
             // rebuild the file for recycle
-            prepare_file(file_system.as_ref(), path, fake_file_id, &data[..]).unwrap();
+            prepare_file(file_system.as_ref(), path, fake_file_id, &data).unwrap();
             assert!(recycle_collections.recycle_one_file(&file_system, path, new_file_id));
             assert!(recycle_collections.fds.is_empty());
         }
@@ -898,8 +826,7 @@ mod tests {
         let pipe_log = new_test_pipes(&cfg).unwrap();
         assert_eq!(pipe_log.file_span(queue), (1, 1));
 
-        let header_size =
-            (LogFileFormat::header_len() + LogFileFormat::payload_len(cfg.format_version)) as u64;
+        let header_size = LogFileFormat::encode_len(cfg.format_version) as u64;
 
         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
@@ -954,13 +881,12 @@ mod tests {
         assert!(abnormal_content_readed.is_err());
 
         // leave only 1 file to truncate
-        assert!(pipe_log.purge_to(FileId { queue, seq: 3 }).is_ok());
+        pipe_log.purge_to(FileId { queue, seq: 3 }).unwrap();
         assert_eq!(pipe_log.file_span(queue), (3, 3));
 
         // fetch active file
         let file_context = pipe_log.fetch_active_file(LogQueue::Append);
-        assert_eq!(file_context.format.version(), Version::V2);
-        assert_eq!(file_context.format.data_layout(), DataLayout::NoAlignment);
+        assert_eq!(file_context.version, Version::V2);
         assert_eq!(file_context.id.seq, 3);
     }
 }
diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs
index e6b61929..8272dbe5 100644
--- a/src/file_pipe_log/pipe_builder.rs
+++ b/src/file_pipe_log/pipe_builder.rs
@@ -16,13 +16,13 @@ use crate::config::{Config, RecoveryMode};
 use crate::env::FileSystem;
 use crate::event_listener::EventListener;
 use crate::log_batch::LogItemBatch;
-use crate::pipe_log::{DataLayout, FileId, FileSeq, LogFileContext, LogQueue};
+use crate::pipe_log::{FileId, FileSeq, LogQueue};
 use crate::util::Factory;
 use crate::{Error, Result};
 
 use super::format::{lock_file_path, FileNameExt, LogFileFormat};
-use super::log_file::{build_file_reader, FileHandler};
-use super::pipe::{DualPipes, SinglePipe};
+use super::log_file::build_file_reader;
+use super::pipe::{DualPipes, FileWithFormat, SinglePipe};
 use super::reader::LogItemBatchFileReader;
 use crate::env::Handle;
 
@@ -59,8 +59,6 @@ impl<M: ReplayMachine + Default> Factory<M> for DefaultMachineFactory<M> {
 pub struct RecoveryConfig {
     pub queue: LogQueue,
     pub mode: RecoveryMode,
TODO: This opt should defined by whether open DIO or not. - pub file_format: LogFileFormat, pub concurrency: usize, pub read_block_size: u64, } @@ -233,7 +231,6 @@ impl<F: FileSystem> DualPipesBuilder<F> { let append_recovery_cfg = RecoveryConfig { queue: LogQueue::Append, mode: self.cfg.recovery_mode, - file_format: LogFileFormat::new(self.cfg.format_version, DataLayout::NoAlignment), concurrency: append_concurrency, read_block_size: self.cfg.recovery_read_block_size.0, }; @@ -285,7 +282,6 @@ impl<F: FileSystem> DualPipesBuilder<F> { let queue = recovery_cfg.queue; let concurrency = recovery_cfg.concurrency; let recovery_mode = recovery_cfg.mode; - let file_format = recovery_cfg.file_format; let recovery_read_block_size = recovery_cfg.read_block_size as usize; let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1); @@ -301,8 +297,10 @@ impl<F: FileSystem> DualPipesBuilder<F> { let file_count = chunk.len(); for (i, f) in chunk.iter_mut().enumerate() { let is_last_file = index == chunk_count - 1 && i == file_count - 1; - match build_file_reader(file_system.as_ref(), f.handle.clone(), None) { + let mut file_reader = build_file_reader(file_system.as_ref(), f.handle.clone())?; + match file_reader.parse_format() { Err(e) => { + // TODO: More reliable tail detection. if recovery_mode == RecoveryMode::TolerateAnyCorruption || recovery_mode == RecoveryMode::TolerateTailCorruption && is_last_file { @@ -311,7 +309,7 @@ impl<F: FileSystem> DualPipesBuilder<F> { queue, f.seq, e ); f.handle.truncate(0)?; - f.format = Some(file_format); + f.format = Some(LogFileFormat::default()); continue; } else { error!( @@ -321,12 +319,11 @@ impl<F: FileSystem> DualPipesBuilder<F> { return Err(e); } }, - Ok(file_reader) => { - reader.open(FileId { queue, seq: f.seq }, file_reader)?; + Ok(format) => { + f.format = Some(format); + reader.open(FileId { queue, seq: f.seq }, format, file_reader)?; } } - // Update file format of each log file. - f.format = reader.file_format(); loop { match reader.next() { Ok(Some(item_batch)) => { @@ -406,14 +403,11 @@ impl<F: FileSystem> DualPipesBuilder<F> { LogQueue::Rewrite => &self.rewrite_files, }; let first_seq = files.first().map(|f| f.seq).unwrap_or(0); - let files: VecDeque<FileHandler<F>> = files + let files: VecDeque<FileWithFormat<F>> = files .iter() - .map(|f| FileHandler { + .map(|f| FileWithFormat { handle: f.handle.clone(), - context: LogFileContext::new( - FileId { seq: f.seq, queue }, - *f.format.as_ref().unwrap(), - ), + format: f.format.unwrap(), }) .collect(); SinglePipe::open( diff --git a/src/file_pipe_log/reader.rs b/src/file_pipe_log/reader.rs index 0f9897e0..c5ad7e62 100644 --- a/src/file_pipe_log/reader.rs +++ b/src/file_pipe_log/reader.rs @@ -2,16 +2,17 @@ use crate::env::FileSystem; use crate::log_batch::{LogBatch, LogItemBatch, LOG_BATCH_HEADER_LEN}; -use crate::pipe_log::{DataLayout, FileBlockHandle, FileId, LogFileContext}; +use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext}; use crate::util::round_up; use crate::{Error, Result}; -use super::format::{is_valid_paddings, LogFileFormat}; +use super::format::{is_zero_padded, LogFileFormat}; use super::log_file::LogFileReader; /// A reusable reader over [`LogItemBatch`]s in a log file. pub(super) struct LogItemBatchFileReader<F: FileSystem> { - file_context: Option<LogFileContext>, + file_id: Option<FileId>, + format: Option<LogFileFormat>, reader: Option<LogFileReader<F>>, size: usize, @@ -29,7 +30,8 @@ impl<F: FileSystem> LogItemBatchFileReader<F> { /// Creates a new reader.
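/// A freshly created reader holds no file; it only becomes useful once
/// [`open`] attaches one. `read_block_size` sets a lower bound on the size
/// of each internal read.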
pub fn new(read_block_size: usize) -> Self { Self { - file_context: None, + file_id: None, + format: None, reader: None, size: 0, @@ -42,9 +44,15 @@ } /// Opens a file that can be accessed through the given reader. - pub fn open(&mut self, file_id: FileId, reader: LogFileReader<F>) -> Result<()> { - self.valid_offset = reader.file_format().enc_len(); - self.file_context = Some(LogFileContext::new(file_id, *reader.file_format())); + pub fn open( + &mut self, + file_id: FileId, + format: LogFileFormat, + reader: LogFileReader<F>, + ) -> Result<()> { + self.valid_offset = LogFileFormat::encode_len(format.version); + self.file_id = Some(file_id); + self.format = Some(format); self.size = reader.file_size()?; self.reader = Some(reader); self.buffer.clear(); @@ -54,12 +62,13 @@ /// Closes any ongoing file access. pub fn reset(&mut self) { + self.file_id = None; + self.format = None; self.reader = None; self.size = 0; self.buffer.clear(); self.buffer_offset = 0; self.valid_offset = 0; - self.file_context = None; } /// Returns the next [`LogItemBatch`] in current opened file. Returns @@ -70,24 +79,23 @@ // is open, and the following reading strategy should tolerate reading broken // blocks until it finds an accessible header of `LogBatch`. while self.valid_offset < self.size { - let data_layout = self.file_context.as_ref().unwrap().format.data_layout(); + let alignment = self.format.unwrap().alignment; if self.valid_offset < LOG_BATCH_HEADER_LEN { return Err(Error::Corruption( "attempt to read file with broken header".to_owned(), )); } - let header_parser = LogBatch::decode_header(&mut self.peek( + let r = LogBatch::decode_header(&mut self.peek( self.valid_offset, LOG_BATCH_HEADER_LEN, 0, )?); if_chain::if_chain! { - if header_parser.is_err(); - if let DataLayout::Alignment(fs_block_size) = data_layout; - if fs_block_size > 0; - let aligned_next_offset = round_up(self.valid_offset, fs_block_size as usize); + if r.is_err(); + if alignment > 0; + let aligned_next_offset = round_up(self.valid_offset, alignment as usize); if self.valid_offset != aligned_next_offset; - if is_valid_paddings(self.peek(self.valid_offset, aligned_next_offset - self.valid_offset, 0)?); + if is_zero_padded(self.peek(self.valid_offset, aligned_next_offset - self.valid_offset, 0)?); then { // In DataLayout::Alignment mode, tail data in the previous block // may be aligned with paddings, that is '0'. So, we need to @@ -100,16 +108,19 @@ // it means that the header is broken or the padding is filled // with non-zero bytes, and the err will be returned.
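// For illustration with assumed numbers: with alignment = 4096 and
// valid_offset = 5000, round_up yields 8192, so the reader peeks bytes
// [5000, 8192) and resumes parsing at 8192 only if that whole span is
// zero padding; otherwise the original decode error surfaces below.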
} - let (footer_offset, compression_type, len) = header_parser?; + let (footer_offset, compression_type, len) = r?; if self.valid_offset + len > self.size { return Err(Error::Corruption("log batch header broken".to_owned())); } - let file_context = self.file_context.as_ref().unwrap().clone(); let handle = FileBlockHandle { - id: file_context.id, + id: self.file_id.unwrap(), offset: (self.valid_offset + LOG_BATCH_HEADER_LEN) as u64, len: footer_offset - LOG_BATCH_HEADER_LEN, }; + let context = LogFileContext { + id: self.file_id.unwrap(), + version: self.format.unwrap().version, + }; let item_batch = LogItemBatch::decode( &mut self.peek( self.valid_offset + footer_offset, @@ -118,7 +129,7 @@ )?, handle, compression_type, - &file_context, + &context, )?; self.valid_offset += len; return Ok(Some(item_batch)); @@ -175,8 +186,4 @@ pub fn valid_offset(&self) -> usize { self.valid_offset } - - pub fn file_format(&self) -> Option<LogFileFormat> { - self.reader.as_ref().map(|reader| *reader.file_format()) - } } diff --git a/src/filter.rs b/src/filter.rs index 920b74f4..74609895 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -19,7 +19,7 @@ use crate::{Error, Result}; /// `FilterResult` determines how to alter the existing log items in /// `RhaiFilterMachine`. -#[derive(PartialEq)] +#[derive(PartialEq, Eq)] enum FilterResult { /// Apply in the usual way. Default, @@ -275,13 +275,10 @@ impl RhaiFilterMachine { }), )); let mut reader = build_file_reader(system, &bak_path)?; - let mut writer = build_file_writer( - system, - &target_path, - *reader.file_format(), - true, /* create */ - )?; - let log_file_context = LogFileContext::new(f.file_id, *reader.file_format()); + let format = reader.parse_format()?; + let mut writer = + build_file_writer(system, &target_path, format, true /* create */)?; + let log_file_context = LogFileContext::new(f.file_id, format.version); // Write out new log file.
for item in f.items.into_iter() { match item.content { diff --git a/src/lib.rs b/src/lib.rs index 6cf5662c..9facd4ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,7 +67,7 @@ pub use engine::Engine; pub use errors::{Error, Result}; pub use log_batch::{Command, LogBatch, MessageExt}; pub use metrics::{get_perf_context, set_perf_context, take_perf_context, PerfContext}; -pub use pipe_log::{DataLayout, Version}; +pub use pipe_log::Version; pub use util::ReadableSize; #[cfg(feature = "internals")] @@ -80,7 +80,6 @@ pub mod internals { pub use crate::pipe_log::*; #[cfg(feature = "swap")] pub use crate::swappy_allocator::*; - pub use crate::util::{round_down, round_up}; pub use crate::write_barrier::*; } diff --git a/src/log_batch.rs b/src/log_batch.rs index e791213e..5d3f0676 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -67,7 +67,7 @@ type SliceReader<'a> = &'a [u8]; // Format: // { count | first index | [ tail offsets ] } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct EntryIndexes(pub Vec<EntryIndex>); impl EntryIndexes { @@ -114,7 +114,7 @@ // Format: // { type | (index) } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Command { Clean, Compact { index: u64 }, @@ -157,7 +157,7 @@ impl Command { } #[repr(u8)] -#[derive(Debug, PartialEq, Copy, Clone)] +#[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum OpType { Put = 1, Del = 2, @@ -179,7 +179,7 @@ impl OpType { // Format: // { op_type | key len | key | ( value len | value ) } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct KeyValue { pub op_type: OpType, pub key: Vec<u8>, @@ -238,13 +238,13 @@ impl KeyValue { // Format: // { 8 byte region id | 1 byte type | item } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LogItem { pub raft_group_id: u64, pub content: LogItemContent, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum LogItemContent { EntryIndexes(EntryIndexes), Command(Command), @@ -342,7 +342,7 @@ pub(crate) type LogItemDrain<'a> = std::vec::Drain<'a, LogItem>; /// A lean batch of log item, without entry data. // Format: // { item count | [items] | crc32 } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct LogItemBatch { items: Vec<LogItem>, item_size: usize, @@ -505,7 +505,7 @@ impl LogItemBatch { file_context: &LogFileContext, ) -> Result<LogItemBatch> { // Validate the checksum of each LogItemBatch by the signature. - verify_checksum_with_context(buf, file_context)?; + verify_checksum_with_signature(buf, file_context.get_signature())?; *buf = &buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN]; let count = codec::decode_var_u64(buf)?; let mut items = LogItemBatch::with_capacity(count as usize); @@ -535,7 +535,7 @@ impl LogItemBatch { } } -#[derive(Copy, Clone, Debug, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] enum BufState { /// Buffer contains header and optionally entries. /// # Invariants @@ -573,7 +573,7 @@ enum BufState { /// limits.
// Calling protocol: // Insert log items -> [`finish_populate`] -> [`finish_write`] -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct LogBatch { item_batch: LogItemBatch, buf_state: BufState, @@ -770,16 +770,9 @@ impl LogBatch { fail::fail_point!("log_batch::corrupted_items", |_| true); false }; - let corrupted_entries = || { - fail::fail_point!("log_batch::corrupted_entries", |_| true); - false - }; if corrupted_items() { self.buf[footer_roffset] += 1; } - if corrupted_entries() && footer_roffset > LOG_BATCH_HEADER_LEN { - self.buf[footer_roffset - 1] += 1; - } } self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN); @@ -894,7 +887,7 @@ impl LogBatch { compression: CompressionType, ) -> Result<Vec<u8>> { if handle.len > 0 { - verify_checksum(&buf[0..handle.len])?; + verify_checksum_with_signature(&buf[0..handle.len], None)?; match compression { CompressionType::None => Ok(buf[..handle.len - LOG_BATCH_CHECKSUM_LEN].to_owned()), CompressionType::Lz4 => { @@ -910,28 +903,8 @@ impl LogBatch { } /// Verifies the checksum of a slice of bytes that sequentially holds data and -/// checksum. -fn verify_checksum(buf: &[u8]) -> Result<()> { - if buf.len() <= LOG_BATCH_CHECKSUM_LEN { - return Err(Error::Corruption(format!( - "Content too short {}", - buf.len() - ))); - } - let expected = codec::decode_u32_le(&mut &buf[buf.len() - LOG_BATCH_CHECKSUM_LEN..])?; - let actual = crc32(&buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN]); - if actual != expected { - return Err(Error::Corruption(format!( - "Checksum expected {} but got {}", - expected, actual - ))); - } - Ok(()) -} - -/// Verifies the checksum of a slice of bytes that sequentially holds data and -/// checksum generated with the given `file_context`. -fn verify_checksum_with_context(buf: &[u8], file_context: &LogFileContext) -> Result<()> { +/// checksum. The checksum field may be signed by XOR-ing with a u32.
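+///
+/// For illustration with assumed values: under a signing version, a file with
+/// `seq = 5` stores `crc32(data) ^ 5` in the checksum field, so the verifier
+/// must be passed `signature = Some(5)` to reproduce the stored value.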
+fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<()> { if buf.len() <= LOG_BATCH_CHECKSUM_LEN { return Err(Error::Corruption(format!( "Content too short {}", @@ -940,15 +913,13 @@ fn verify_checksum_with_context(buf: &[u8], file_context: &LogFileContext) -> Re } let expected = codec::decode_u32_le(&mut &buf[buf.len() - LOG_BATCH_CHECKSUM_LEN..])?; let mut actual = crc32(&buf[..buf.len() - LOG_BATCH_CHECKSUM_LEN]); - if let Some(signature) = file_context.get_signature() { + if let Some(signature) = signature { actual ^= signature; } if actual != expected { return Err(Error::Corruption(format!( - "Checksum expected {} but got {}, format_version: {:?}", - expected, - actual, - file_context.format.version() + "Checksum expected {} but got {}", + expected, actual ))); } Ok(()) @@ -957,7 +928,6 @@ fn verify_checksum_with_context(buf: &[u8], file_context: &LogFileContext) -> Re #[cfg(test)] mod tests { use super::*; - use crate::file_pipe_log::LogFileFormat; use crate::pipe_log::{LogQueue, Version}; use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt}; use protobuf::parse_from_bytes; @@ -1139,7 +1109,7 @@ let mut encoded_batch = vec![]; batch.encode(&mut encoded_batch).unwrap(); let file_context = - LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default()); + LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()); let decoded_batch = LogItemBatch::decode( &mut encoded_batch.as_slice(), FileBlockHandle::dummy(LogQueue::Append), @@ -1178,9 +1148,8 @@ assert_eq!(batch.approximate_size(), len); let mut batch_handle = mocked_file_block_handle; batch_handle.len = len; - let file_context = - LogFileContext::new(batch_handle.id, LogFileFormat::from_version(version)); - assert!(batch.prepare_write(&file_context).is_ok()); + let file_context = LogFileContext::new(batch_handle.id, version); + batch.prepare_write(&file_context).unwrap(); batch.finish_write(batch_handle); let encoded = batch.encoded_bytes(); assert_eq!(encoded.len(), len); @@ -1196,7 +1165,7 @@ let item_batch = batch.item_batch.clone(); // decode item batch - let mut bytes_slice = &*encoded; + let mut bytes_slice = encoded; let (offset, compression_type, len) = LogBatch::decode_header(&mut bytes_slice).unwrap(); assert_eq!(len, encoded.len()); @@ -1204,8 +1173,46 @@ let mut entries_handle = mocked_file_block_handle; entries_handle.offset = LOG_BATCH_HEADER_LEN as u64; entries_handle.len = offset - LOG_BATCH_HEADER_LEN; - let file_context = - LogFileContext::new(entries_handle.id, LogFileFormat::from_version(version)); + let file_context = LogFileContext::new(entries_handle.id, version); + { + // Decoding with wrong compression type is okay. + LogItemBatch::decode( + &mut &encoded[offset..], + entries_handle, + if compression_type == CompressionType::None { + CompressionType::Lz4 + } else { + CompressionType::None + }, + &file_context, + ) + .unwrap(); + // Decode with wrong file number. + if version.has_log_signing() { + LogItemBatch::decode( + &mut &encoded[offset..], + entries_handle, + compression_type, + &LogFileContext::new(FileId::new(LogQueue::Append, u64::MAX), version), + ) + .unwrap_err(); + } + // Decode with wrong version.
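+            // Flipping V1 <-> V2 toggles `has_log_signing`, and with it the
+            // seq-based XOR on the checksum, so this decode is expected to fail.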
+ LogItemBatch::decode( + &mut &encoded[offset..], + entries_handle, + compression_type, + &LogFileContext::new( + file_context.id, + if version == Version::V1 { + Version::V2 + } else { + Version::V1 + }, + ), + ) + .unwrap_err(); + } let decoded_item_batch = LogItemBatch::decode( &mut &encoded[offset..], entries_handle, @@ -1257,9 +1264,10 @@ // Validate with different Versions for version in Version::iter() { - for (batch, entry_data) in batches.clone().into_iter() { - decode_and_encode(batch.clone(), true, version, &entry_data); - decode_and_encode(batch, false, version, &entry_data); + for compress in [true, false] { + for (batch, entry_data) in batches.clone().into_iter() { + decode_and_encode(batch, compress, version, &entry_data); + } } } } @@ -1271,7 +1279,7 @@ let mut kvs = Vec::new(); let data = vec![b'x'; 1024]; let file_id = FileId::dummy(LogQueue::Append); - let file_context = LogFileContext::new(file_id, LogFileFormat::default()); + let file_context = LogFileContext::new(file_id, Version::default()); let mut batch1 = LogBatch::default(); entries.push(generate_entries(1, 11, Some(&data))); @@ -1287,6 +1295,8 @@ kvs.push((k, v)); } + batch1.merge(&mut LogBatch::default()).unwrap(); + let mut batch2 = LogBatch::default(); entries.push(generate_entries(11, 21, Some(&data))); batch2 @@ -1305,7 +1315,7 @@ assert!(batch2.is_empty()); let len = batch1.finish_populate(0).unwrap(); - assert!(batch1.prepare_write(&file_context).is_ok()); + batch1.prepare_write(&file_context).unwrap(); let encoded = batch1.encoded_bytes(); assert_eq!(len, encoded.len()); @@ -1354,71 +1364,6 @@ assert!(batch.is_empty()); } - #[test] - fn test_verify_checksum() { - use rand::{thread_rng, Rng}; - // Invalid header - let invalid_data: Vec<u8> = (0..LOG_BATCH_CHECKSUM_LEN) - .map(|_| thread_rng().gen()) - .collect(); - assert!(verify_checksum_with_context( - &invalid_data[..], - &LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default()) - ) - .is_err()); - { - // Sign checksum and verify it with Version::V1 - let mut data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect(); - let checksum = crc32(&data[..]); - data.encode_u32_le(checksum).unwrap(); - assert!(verify_checksum_with_context( - &data[..], - &LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default()) - ) - .is_ok()); - // file_context.signature() == 0 - assert!(verify_checksum_with_context( - &data[..], - &LogFileContext::new( - FileId::dummy(LogQueue::Rewrite), - LogFileFormat::from_version(Version::V2), - ), - ) - .is_ok()); - let file_context = LogFileContext::new( - FileId { - seq: 11, - queue: LogQueue::Rewrite, - }, - LogFileFormat::default(), - ); - assert!(verify_checksum_with_context(&data[..], &file_context).is_ok()); - } - { - // Sign checksum and verify it with Version::V2 - let file_context_v1 = LogFileContext::new( - FileId { - seq: 11, - queue: LogQueue::Rewrite, - }, - LogFileFormat::from_version(Version::V1), - ); - let file_context_v2 = LogFileContext::new( - FileId { - seq: 11, - queue: LogQueue::Rewrite, - }, - LogFileFormat::from_version(Version::V2), - ); - let mut data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect(); - let checksum = crc32(&data[..]); - data.encode_u32_le(checksum ^ file_context_v2.get_signature().unwrap()) - .unwrap(); - assert!(verify_checksum_with_context(&data[..], &file_context_v1).is_err()); - assert!(verify_checksum_with_context(&data[..], &file_context_v2).is_ok()); - } - } - #[cfg(feature = 
"nightly")] #[bench] fn bench_log_batch_add_entry_and_encode(b: &mut test::Bencher) { diff --git a/src/memtable.rs b/src/memtable.rs index 88afeab7..2c6fc055 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -9,7 +9,6 @@ use std::sync::Arc; use fail::fail_point; use hashbrown::HashMap; use parking_lot::{Mutex, RwLock}; -use protobuf::{parse_from_bytes, Message}; use crate::config::Config; use crate::file_pipe_log::ReplayMachine; @@ -73,7 +72,7 @@ const CAPACITY_INIT: usize = 32 - 1; const MEMTABLE_SLOT_COUNT: usize = 128; /// Location of a log entry. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct EntryIndex { /// Logical index. pub index: u64, @@ -113,7 +112,7 @@ impl EntryIndex { } } -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct ThinEntryIndex { entries: Option, compression_type: CompressionType, @@ -261,41 +260,9 @@ impl MemTable { self.kvs.get(key).map(|v| v.0.clone()) } - /// Iterator over [start_key, end_key) range. - pub fn scan_messages( - &self, - start_key: Option<&[u8]>, - end_key: Option<&[u8]>, - reverse: bool, - mut f: F, - ) -> Result<()> - where - S: Message, - F: FnMut(&[u8], S) -> bool, - { - let lower = start_key.map(Bound::Included).unwrap_or(Bound::Unbounded); - let upper = end_key.map(Bound::Excluded).unwrap_or(Bound::Unbounded); - let iter = self.kvs.range::<[u8], _>((lower, upper)); - if reverse { - for (key, (v, _)) in iter.rev() { - let value = parse_from_bytes(v)?; - if !f(key, value) { - break; - } - } - } else { - for (key, (v, _)) in iter { - let value = parse_from_bytes(v)?; - if !f(key, value) { - break; - } - } - } - Ok(()) - } - - /// Iterator over [start_key, end_key) range. - pub fn scan_raw_messages( + /// Iterates over [start_key, end_key) range and yields all key value pairs + /// as bytes. 
+ pub fn scan<F>( &self, start_key: Option<&[u8]>, end_key: Option<&[u8]>, @@ -1715,83 +1682,75 @@ #[test] fn test_memtable_kv_operations() { + fn key(i: u64) -> Vec<u8> { + format!("k{}", i).as_bytes().to_vec() + } + fn value(i: u64) -> Vec<u8> { + format!("v{}", i).as_bytes().to_vec() + } + let region_id = 8; let mut memtable = MemTable::new(region_id, Arc::new(GlobalStats::default())); - let (k1, v1) = (b"key1", b"value1"); - let (k5, v5) = (b"key5", b"value5"); - memtable.put(k1.to_vec(), v1.to_vec(), FileId::new(LogQueue::Append, 1)); - memtable.put(k5.to_vec(), v5.to_vec(), FileId::new(LogQueue::Append, 5)); + memtable.put(key(1), value(1), FileId::new(LogQueue::Append, 1)); + memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 5)); assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1); assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 5); - assert_eq!(memtable.get(k1.as_ref()), Some(v1.to_vec())); - assert_eq!(memtable.get(k5.as_ref()), Some(v5.to_vec())); + assert_eq!(memtable.get(&key(1)), Some(value(1))); + assert_eq!(memtable.get(&key(5)), Some(value(5))); - let mut res = vec![]; + let mut res = Vec::new(); memtable - .scan_raw_messages(None, None, false, |key, value| { - res.push((key.to_vec(), value.to_vec())); - true + .scan(None, None, false, |k, v| { + res.push((k.to_vec(), v.to_vec())); + false }) .unwrap(); - assert_eq!( - res, - vec![(k1.to_vec(), v1.to_vec()), (k5.to_vec(), v5.to_vec())] - ); + assert_eq!(res, vec![(key(1), value(1))]); res.clear(); memtable - .scan_raw_messages(None, None, true, |key, value| { - res.push((key.to_vec(), value.to_vec())); - true + .scan(None, None, true, |k, v| { + res.push((k.to_vec(), v.to_vec())); + false }) .unwrap(); - assert_eq!( - res, - vec![(k5.to_vec(), v5.to_vec()), (k1.to_vec(), v1.to_vec())] - ); + assert_eq!(res, vec![(key(5), value(5))]); res.clear(); memtable - .scan_raw_messages(None, Some(b"key1"), false, |key, value| { + .scan(Some(&key(5)), None, false, |key, value| { res.push((key.to_vec(), value.to_vec())); true }) .unwrap(); - assert_eq!(res, vec![]); - memtable - .scan_raw_messages(Some(b"key5"), None, false, |key, value| { - res.push((key.to_vec(), value.to_vec())); - true - }) - .unwrap(); - assert_eq!(res, vec![(k5.to_vec(), v5.to_vec())]); + assert_eq!(res, vec![(key(5), value(5))]); res.clear(); memtable - .scan_raw_messages(Some(b"key1"), Some(b"key5"), false, |key, value| { + .scan(Some(&key(1)), Some(&key(5)), false, |key, value| { res.push((key.to_vec(), value.to_vec())); true }) .unwrap(); - assert_eq!(res, vec![(k1.to_vec(), v1.to_vec())]); + assert_eq!(res, vec![(key(1), value(1))]); - memtable.delete(k5.as_ref()); - assert_eq!(memtable.get(k5.as_ref()), None); + memtable.delete(&key(5)); + assert_eq!(memtable.get(&key(5)), None); assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 1); assert_eq!(memtable.max_file_seq(LogQueue::Append).unwrap(), 1); - memtable.put(k1.to_vec(), v1.to_vec(), FileId::new(LogQueue::Rewrite, 2)); - memtable.put(k5.to_vec(), v5.to_vec(), FileId::new(LogQueue::Rewrite, 3)); + memtable.put(key(1), value(1), FileId::new(LogQueue::Rewrite, 2)); + memtable.put(key(5), value(5), FileId::new(LogQueue::Rewrite, 3)); assert_eq!(memtable.min_file_seq(LogQueue::Append), None); assert_eq!(memtable.max_file_seq(LogQueue::Append), None); assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 2); assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3); assert_eq!(memtable.global_stats.rewrite_entries(), 2); - 
memtable.delete(k1.as_ref()); + memtable.delete(&key(1)); assert_eq!(memtable.min_file_seq(LogQueue::Rewrite).unwrap(), 3); assert_eq!(memtable.max_file_seq(LogQueue::Rewrite).unwrap(), 3); assert_eq!(memtable.global_stats.deleted_rewrite_entries(), 1); - memtable.put(k5.to_vec(), v5.to_vec(), FileId::new(LogQueue::Append, 7)); + memtable.put(key(5), value(5), FileId::new(LogQueue::Append, 7)); assert_eq!(memtable.min_file_seq(LogQueue::Rewrite), None); assert_eq!(memtable.max_file_seq(LogQueue::Rewrite), None); assert_eq!(memtable.min_file_seq(LogQueue::Append).unwrap(), 7); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 500370bb..ff392cb6 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -3,13 +3,14 @@ //! A generic log storage. use std::cmp::Ordering; +use std::fmt::{self, Display}; use fail::fail_point; use num_derive::{FromPrimitive, ToPrimitive}; +use num_traits::ToPrimitive; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::file_pipe_log::LogFileFormat; use crate::Result; /// The type of log queue. @@ -24,7 +25,7 @@ pub enum LogQueue { pub type FileSeq = u64; /// A unique identifier for a log file. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FileId { pub queue: LogQueue, pub seq: FileSeq, @@ -61,7 +62,7 @@ impl std::cmp::PartialOrd for FileId { } /// A logical pointer to a chunk of log file data. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FileBlockHandle { pub id: FileId, pub offset: u64, @@ -101,7 +102,7 @@ pub enum Version { impl Version { pub fn has_log_signing(&self) -> bool { - fail_point!("pipe_log::version::force_enable", |_| { true }); + fail_point!("pipe_log::version::force_enable_log_signing", |_| { true }); match self { Version::V1 => false, Version::V2 => true, @@ -115,60 +116,28 @@ impl Default for Version { } } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum DataLayout { - NoAlignment, - /// Alignment(block_size) mode in memory for DataLayout will make sense - /// when DIO is open. - /// - /// `block_size` will be dumped into the header of each log file. - Alignment(u64), -} - -impl DataLayout { - pub fn from_u64(val: u64) -> Self { - match val { - 0 => DataLayout::NoAlignment, - aligned => DataLayout::Alignment(aligned), - } - } - - pub fn to_u64(self) -> u64 { - match self { - DataLayout::NoAlignment => 0, - DataLayout::Alignment(aligned) => { - debug_assert!(aligned > 0); - aligned - } - } - } - - pub const fn len() -> usize { - std::mem::size_of::<u64>() /* serialized in u64. */ +impl Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.to_u64().unwrap()) } } #[derive(Debug, Clone)] pub struct LogFileContext { pub id: FileId, - pub format: LogFileFormat, + pub version: Version, } impl LogFileContext { - pub fn new(file_id: FileId, format: LogFileFormat) -> Self { - Self { - id: file_id, - format, - } + pub fn new(id: FileId, version: Version) -> Self { - Self { id, version } + } - /// Return the `signature` in `Option<u32>` format. - /// - /// `None` will be returned only if `self.version` is invalid. + /// Returns the `signature` in `u32` format. pub fn get_signature(&self) -> Option<u32> { - if self.format.version().has_log_signing() { + if self.version.has_log_signing() { // Here, the count of files will always be limited to less than - // `UINT32_MAX`. So, we just use the low 32 bit as the `signature` + // `u32::MAX`. 
So, we just use the low 32 bit as the `signature` // by default. Some(self.id.seq as u32) } else { @@ -185,10 +154,7 @@ pub trait PipeLog: Sized { /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. /// - /// Incoming `bytes` will be appended to the `active_file`, which should - /// only be held by one thread, that is, the leader of the writer group. - /// Also, it's not permitted to change the `active_file` until the leader - /// confirms all bytes have been dumped into the file. + /// The result of `fetch_active_file` will not be affected by this method. fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle>; /// Hints it to synchronize buffered writes. The synchronization is @@ -229,7 +195,7 @@ pub trait PipeLog: Sized { /// Returns the number of deleted files. fn purge_to(&self, file_id: FileId) -> Result<usize>; - /// Returns `[LogFileContext]` of the active file in the specific + /// Returns [`LogFileContext`] of the active file in the specific /// log queue. fn fetch_active_file(&self, queue: LogQueue) -> LogFileContext; } diff --git a/src/purge.rs b/src/purge.rs index 183c062f..9be81c5b 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -104,7 +104,7 @@ where should_compact.extend(self.rewrite_or_compact_append_queue( rewrite_watermark, compact_watermark, - &mut *rewrite_candidate_regions, + &mut rewrite_candidate_regions, )?); if append_queue_barrier == first_append && first_append < latest_append { diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index b103f791..e1767b71 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -1067,7 +1067,7 @@ mod tests { // test_drain_leak static mut DROPS: i32 = 0; - #[derive(Debug, PartialEq)] + #[derive(Debug, PartialEq, Eq)] struct D(u32, bool); impl Drop for D { diff --git a/src/util.rs b/src/util.rs index 55ac534e..05bb15e6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -19,7 +19,7 @@ pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE; pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE; pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE; -#[derive(Clone, Debug, Copy, PartialEq, PartialOrd)] +#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)] pub struct ReadableSize(pub u64); impl ReadableSize { @@ -325,13 +325,11 @@ pub trait Factory<Target>: Send + Sync { fn new_target(&self) -> Target; } -/// Return an aligned `offset`. +/// Returns an aligned `offset`. /// /// # Example: /// -/// ``` -/// use raft_engine::internals::round_up; -/// +/// ```ignore /// assert_eq!(round_up(18, 4), 20); /// assert_eq!(round_up(64, 16), 64); /// ``` @@ -340,13 +338,11 @@ pub fn round_up(offset: usize, alignment: usize) -> usize { (offset + alignment - 1) / alignment * alignment } -/// Return an aligned `offset`. +/// Returns an aligned `offset`. /// /// # Example: /// -/// ``` -/// use raft_engine::internals::round_down; -/// +/// ```ignore /// assert_eq!(round_down(18, 4), 16); /// assert_eq!(round_down(64, 16), 64); /// ``` diff --git a/src/write_barrier.rs b/src/write_barrier.rs index 3652b8ee..3d365456 100644 --- a/src/write_barrier.rs +++ b/src/write_barrier.rs @@ -46,14 +46,8 @@ impl<P> Writer<P> { } } - /// Returns an immutable reference to the payload. - #[cfg(test)] - pub fn get_payload(&self) -> &P { - unsafe { &*self.payload } - } - /// Returns a mutable reference to the payload.
- pub fn get_mut_payload(&mut self) -> &mut P { + pub fn mut_payload(&mut self) -> &mut P { unsafe { &mut *self.payload } } @@ -243,12 +237,11 @@ mod tests { #[test] fn test_sequential_groups() { let barrier: WriteBarrier<(), u32> = Default::default(); - let mut payload = (); let mut leaders = 0; let mut processed_writers = 0; for _ in 0..4 { - let mut writer = Writer::new(&mut payload, false); + let mut writer = Writer::new(&mut (), false); { let mut wg = barrier.enter(&mut writer).unwrap(); leaders += 1; @@ -306,7 +299,8 @@ mod tests { leader_enter_tx.send(()).unwrap(); let mut n = 0; for w in wg.iter_mut() { - w.set_output(*w.get_payload()); + let p = *w.mut_payload(); + w.set_output(p); n += 1; } assert_eq!(n, 1); @@ -340,7 +334,8 @@ mod tests { leader_enter_tx_clone.send(()).unwrap(); let mut idx = 0; for w in wg.iter_mut() { - w.set_output(*w.get_payload()); + let p = *w.mut_payload(); + w.set_output(p); idx += 1; } assert_eq!(idx, n as u32); diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 2a578af8..e973c609 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -8,10 +8,10 @@ edition = "2018" clap = { version = "3.1", features = ["derive", "cargo"] } const_format = "0.2.13" hdrhistogram = "7.4" +num-traits = "0.2" parking_lot_core = "0.9" raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } raft-engine = { path = "..", features = ["internals"] } rand = "0.8" rand_distr = "0.4" -serde_json = "1.0" statistical = "1.0.0" diff --git a/stress/src/main.rs b/stress/src/main.rs index 2adae063..f5086f4a 100644 --- a/stress/src/main.rs +++ b/stress/src/main.rs @@ -9,9 +9,9 @@ use std::thread::{sleep, Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; use clap::{crate_authors, crate_version, Parser}; - use const_format::formatcp; use hdrhistogram::Histogram; +use num_traits::FromPrimitive; use parking_lot_core::SpinWait; use raft::eraftpb::Entry; use raft_engine::internals::{EventListener, FileBlockHandle}; @@ -236,7 +236,7 @@ struct ControlOpt { default_value = "1", help = "Format version of log files" )] - format_version: String, + format_version: u64, #[clap( long = "enable-log-recycle", @@ -591,7 +591,7 @@ fn main() { config.batch_compression_threshold = ReadableSize::from_str(&opts.batch_compression_threshold).unwrap(); config.enable_log_recycle = opts.enable_log_recycle; - config.format_version = serde_json::from_str::<Version>(&opts.format_version).unwrap(); + config.format_version = Version::from_u64(opts.format_version).unwrap(); args.time = Duration::from_secs(opts.time); args.regions = opts.regions; args.purge_interval = Duration::from_secs(opts.purge_interval); diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index bb498fc3..93faf640 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -450,11 +450,11 @@ fn test_tail_corruption() { let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); append(&engine, rid, 1, 5, Some(&data)); drop(engine); - assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + Engine::open_with_file_system(cfg, fs.clone()).unwrap(); } // Version::V1 in header has an abnormal DataLayout.
{ - let _f = FailGuard::new("log_file_header::force_abnormal_data_layout", "return"); + let _f = FailGuard::new("log_file_header::too_large", "return"); let dir = tempfile::Builder::new() .prefix("test_tail_corruption_4") .tempdir() @@ -472,11 +472,11 @@ fn test_tail_corruption() { let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); append(&engine, rid, 1, 5, Some(&data)); drop(engine); - assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + Engine::open_with_file_system(cfg, fs.clone()).unwrap(); } // DataLayout in header is corrupted for Version::V2 { - let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let _f = FailGuard::new("log_file_header::too_small", "return"); let dir = tempfile::Builder::new() .prefix("test_tail_corruption_5") .tempdir() @@ -488,11 +488,11 @@ fn test_tail_corruption() { }; let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); drop(engine); - assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + Engine::open_with_file_system(cfg, fs.clone()).unwrap(); } // DataLayout in header is abnormal for Version::V2 { - let _f = FailGuard::new("log_file_header::force_abnormal_data_layout", "return"); + let _f = FailGuard::new("log_file_header::too_large", "return"); let dir = tempfile::Builder::new() .prefix("test_tail_corruption_6") .tempdir() @@ -504,11 +504,11 @@ fn test_tail_corruption() { }; let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); drop(engine); - assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + Engine::open_with_file_system(cfg, fs.clone()).unwrap(); } // DataLayout in header is corrupted for Version::V2, followed with records { - let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let _f = FailGuard::new("log_file_header::too_small", "return"); let dir = tempfile::Builder::new() .prefix("test_tail_corruption_7") .tempdir() @@ -616,7 +616,7 @@ fn test_recycle_with_stale_logbatch_at_tail() { // Force open Engine with `enable_log_recycle == true` and // `format_version == Version::V1`. let engine = { - let _f = FailGuard::new("pipe_log::version::force_enable", "return"); + let _f = FailGuard::new("pipe_log::version::force_enable_log_signing", "return"); Engine::open(cfg_err.clone()).unwrap() }; // Do not truncate the active_file when exit @@ -681,7 +681,7 @@ fn test_build_engine_with_multi_datalayout() { append(&engine, rid, 11, 20, Some(&data)); } drop(engine); - assert!(Engine::open(cfg_v2).is_ok()); + Engine::open(cfg_v2).unwrap(); } #[test] @@ -706,7 +706,7 @@ fn test_build_engine_with_datalayout_abnormal() { append(&engine, 2, 1, 11, Some(&data)); { // Set failpoint to dump content with invalid paddings into log file. - let _f1 = FailGuard::new("file_pipe_log::append::force_abnormal_paddings", "return"); + let _f1 = FailGuard::new("file_pipe_log::append::corrupted_padding", "return"); append(&engine, 3, 1, 11, Some(&data)); drop(engine); assert!(Engine::open(cfg.clone()).is_err()); @@ -720,6 +720,6 @@ fn test_build_engine_with_datalayout_abnormal() { append(&engine, rid, 1, 11, Some(&data)); } drop(engine); - assert!(Engine::open(cfg).is_ok()); + Engine::open(cfg).unwrap(); } }