
Commit

Merge branch 'master' of https://github.com/tikv/raft-engine into cleanup-220715

Signed-off-by: tabokie <[email protected]>
tabokie committed Aug 2, 2022
2 parents 9ec5a77 + c3a6156 commit 7ab1af2
Showing 16 changed files with 827 additions and 138 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -35,6 +35,7 @@ fail = "0.5"
fs2 = "0.4"
hashbrown = "0.12"
hex = "0.4"
if_chain = "1.0"
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
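The `if_chain = "1.0"` dependency added above supplies the `if_chain!` macro used later in `LogFileFormat::decode`. A minimal, self-contained sketch of the macro (illustrative only, not code from this commit; the values are made up):

    use if_chain::if_chain;

    fn main() {
        let payload_len = 8usize;
        let buf: Option<&[u8]> = Some(&[0u8; 16]);
        // Flattens what would otherwise be three nested `if`s.
        if_chain! {
            if payload_len > 0;
            if let Some(b) = buf;
            if b.len() >= payload_len;
            then {
                println!("payload of {} bytes is available", payload_len);
            } else {
                println!("payload missing or too short");
            }
        }
    }
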
2 changes: 1 addition & 1 deletion src/config.rs
@@ -37,7 +37,7 @@ pub struct Config {
pub recovery_mode: RecoveryMode,
/// Minimum I/O size for reading log files during recovery.
///
/// Default: "4KB". Minimum: "512B".
/// Default: "16KB". Minimum: "512B".
pub recovery_read_block_size: ReadableSize,
    /// The number of threads used to scan and recover log files.
///
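For context, the `recovery_read_block_size` default above (now 16KB) can still be overridden through the public `Config` struct. A minimal sketch, illustrative only and not code from this commit; it assumes `ReadableSize` is reachable from the crate root and provides a `kb` constructor, which may differ:

    use raft_engine::{Config, ReadableSize};

    fn main() {
        let mut cfg = Config::default();
        // Default is now 16KB; anything down to the 512B minimum is accepted.
        cfg.recovery_read_block_size = ReadableSize::kb(32);
        assert_eq!(cfg.recovery_read_block_size.0, 32 * 1024);
    }
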
6 changes: 5 additions & 1 deletion src/engine.rs
@@ -447,7 +447,8 @@ where
script: String,
file_system: Arc<F>,
) -> Result<()> {
        use crate::file_pipe_log::{LogFileFormat, RecoveryConfig, ReplayMachine};
use crate::pipe_log::DataLayout;

if !path.exists() {
return Err(Error::InvalidArgument(format!(
@@ -462,6 +463,7 @@
..Default::default()
};
let recovery_mode = cfg.recovery_mode;
let file_format = LogFileFormat::new(cfg.format_version, DataLayout::NoAlignment);
let read_block_size = cfg.recovery_read_block_size.0;
let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
builder.scan()?;
@@ -473,6 +475,7 @@
RecoveryConfig {
queue: LogQueue::Append,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
@@ -485,6 +488,7 @@
RecoveryConfig {
queue: LogQueue::Rewrite,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
254 changes: 233 additions & 21 deletions src/file_pipe_log/format.rs
@@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
use num_traits::{FromPrimitive, ToPrimitive};

use crate::codec::{self, NumberEncoder};
use crate::pipe_log::{DataLayout, FileId, LogQueue, Version};
use crate::{Error, Result};

/// Width to format log sequence number.
@@ -20,6 +20,20 @@ const LOG_REWRITE_SUFFIX: &str = ".rewrite";
/// File header.
const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";

/// Checks whether the given `buf` is valid padding.
///
/// To keep the check cheap, only the first and last bytes of `buf`
/// are inspected.
///
/// In the common case, paddings are filled with `0`. A few corner
/// cases, where corrupted blocks exist on disk, may pass this check,
/// but they will fail in subsequent processing, so the check can
/// stay simple.
#[inline]
pub(crate) fn is_valid_paddings(buf: &[u8]) -> bool {
buf.is_empty() || (buf[0] == 0 && buf[buf.len() - 1] == 0)
}
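// Illustrative sketch, not part of this commit: a block read from an aligned
// region may end with zero padding, which the check above accepts, while a
// tail touched by corruption is rejected.
#[allow(dead_code)]
fn _padding_check_sketch() {
    let mut block = vec![0u8; 4096];
    block[..100].copy_from_slice(&[7u8; 100]); // pretend payload
    assert!(is_valid_paddings(&block[100..])); // zero-filled tail passes
    block[4095] = 0xff;
    assert!(!is_valid_paddings(&block[100..])); // corrupted tail fails
}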

/// `FileNameExt` offers file name formatting extensions to [`FileId`].
pub trait FileNameExt: Sized {
fn parse_file_name(file_name: &str) -> Option<Self>;
@@ -79,28 +93,66 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
}

/// In-memory representation of `Format` in log files.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct LogFileFormat {
version: Version,
data_layout: DataLayout,
}

impl Default for LogFileFormat {
fn default() -> Self {
Self {
version: Version::default(),
data_layout: DataLayout::NoAlignment,
}
}
}

impl LogFileFormat {
pub fn new(version: Version, data_layout: DataLayout) -> Self {
Self {
version,
data_layout,
}
}

/// Length of whole `LogFileFormat` written on storage.
pub fn enc_len(&self) -> usize {
Self::header_len() + Self::payload_len(self.version)
}

/// Length of header written on storage.
    pub const fn header_len() -> usize {
LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::<Version>()
}

/// Length of serialized `DataLayout` written on storage.
pub const fn payload_len(version: Version) -> usize {
match version {
Version::V1 => 0,
Version::V2 => DataLayout::len(),
}
}
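    // Illustrative sketch, not part of this commit: `enc_len` is `header_len`
    // plus a version-dependent payload, so a V1 header and a V2 header differ
    // by exactly `DataLayout::len()` bytes on disk.
    #[allow(dead_code)]
    fn _enc_len_sketch() {
        let v1 = Self::new(Version::V1, DataLayout::NoAlignment);
        let v2 = Self::new(Version::V2, DataLayout::Alignment(4096));
        assert_eq!(v1.enc_len(), Self::header_len());
        assert_eq!(v2.enc_len(), Self::header_len() + DataLayout::len());
    }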

    pub fn from_version(version: Version) -> Self {
        Self {
            version,
            data_layout: DataLayout::NoAlignment,
        }
    }

pub fn version(&self) -> Version {
self.version
}

pub fn data_layout(&self) -> DataLayout {
self.data_layout
}

/// Decodes a slice of bytes into a `LogFileFormat`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
        let buf_len = buf.len();
        if buf_len < Self::header_len() {
return Err(Error::Corruption("log file header too short".to_owned()));
}
if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
@@ -109,36 +161,122 @@ impl LogFileFormat {
));
}
buf.consume(LOG_FILE_MAGIC_HEADER.len());
        // Parse `Version` of LogFileFormat from header of the file.
        let version = {
            let dec_version = codec::decode_u64(buf)?;
            if let Some(v) = Version::from_u64(dec_version) {
                v
            } else {
                return Err(Error::Corruption(format!(
                    "unrecognized log file version: {}",
                    dec_version
                )));
            }
        };
// Parse `DataLayout` of LogFileFormat from header of the file.
let payload_len = Self::payload_len(version);
if payload_len == 0 {
// No alignment.
return Ok(Self {
version,
data_layout: DataLayout::NoAlignment,
});
}
if_chain::if_chain! {
if payload_len > 0;
if buf_len >= Self::header_len() + payload_len;
if let Ok(layout_block_size) = codec::decode_u64(buf);
then {
                // A non-zero `payload_len` means a serialized data_layout
                // follows the header and must be parsed from the file.
Ok(Self {
version,
data_layout: if layout_block_size == 0 {
DataLayout::NoAlignment
} else {
DataLayout::Alignment(layout_block_size)
},
})
} else {
                // A corrupted `payload` is reported as InvalidArgument to
                // distinguish it from other corruption errors.
                Err(Error::InvalidArgument(format!(
                    "invalid data layout in the header, len: {}, expected len: {}",
buf_len - Self::header_len(), Self::payload_len(version)
)))
}
}
}

/// Encodes this header and appends the bytes to the provided buffer.
pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
buf.encode_u64(self.version.to_u64().unwrap())?;
        if Self::payload_len(self.version) > 0 {
            buf.encode_u64(self.data_layout.to_u64())?;
        }
#[cfg(feature = "failpoints")]
{
            // Fail point: corrupt the header.
let corrupted = || {
fail::fail_point!("log_file_header::corrupted", |_| true);
false
};
            // Fail point: append an abnormal DataLayout.
let force_abnormal_data_layout = || {
fail::fail_point!("log_file_header::force_abnormal_data_layout", |_| true);
false
};
            // Fail point: truncate the DataLayout `payload`.
let corrupted_data_layout = || {
fail::fail_point!("log_file_header::corrupted_data_layout", |_| true);
false
};
if corrupted() {
buf[0] += 1;
}
if force_abnormal_data_layout() {
buf.encode_u64(0_u64)?;
}
if corrupted_data_layout() {
buf.pop();
}
}
Ok(())
}

/// Return the aligned block size.
#[inline]
pub fn get_aligned_block_size(&self) -> usize {
self.data_layout.to_u64() as usize
}
}
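
// Illustrative sketch, not part of this commit: an `Alignment(n)` data layout
// is typically consumed by rounding write offsets up to the block size
// reported by `get_aligned_block_size()`.
#[allow(dead_code)]
fn _alignment_sketch() {
    fn round_up(offset: usize, align: usize) -> usize {
        if align == 0 {
            offset
        } else {
            (offset + align - 1) / align * align
        }
    }
    let format = LogFileFormat::new(Version::V2, DataLayout::Alignment(4096));
    assert_eq!(format.get_aligned_block_size(), 4096);
    assert_eq!(round_up(5000, format.get_aligned_block_size()), 8192);
}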

#[cfg(test)]
mod tests {
use super::*;
use crate::pipe_log::LogFileContext;
use crate::test_util::catch_unwind_silent;

#[test]
fn test_check_paddings_is_valid() {
// normal buffer
let mut buf = vec![0; 128];
// len < 8
assert!(is_valid_paddings(&buf[0..6]));
// len == 8
assert!(is_valid_paddings(&buf[120..]));
// len > 8
assert!(is_valid_paddings(&buf[..]));

// abnormal buffer
buf[127] = 3_u8;
assert!(is_valid_paddings(&buf[0..110]));
assert!(is_valid_paddings(&buf[120..125]));
assert!(!is_valid_paddings(&buf[124..128]));
assert!(!is_valid_paddings(&buf[120..]));
assert!(!is_valid_paddings(&buf[..]));
}

#[test]
fn test_file_name() {
Expand Down Expand Up @@ -172,23 +310,97 @@ mod tests {
assert_eq!(version, version2);
}

#[test]
fn test_data_layout() {
assert_eq!(DataLayout::NoAlignment.to_u64(), 0);
assert_eq!(DataLayout::Alignment(16).to_u64(), 16);
assert_eq!(DataLayout::from_u64(0), DataLayout::NoAlignment);
assert_eq!(DataLayout::from_u64(4096), DataLayout::Alignment(4096));
assert_eq!(DataLayout::len(), 8);
}

#[test]
fn test_file_header() {
let header1 = LogFileFormat::default();
assert_eq!(header1.version().to_u64().unwrap(), 1);
assert_eq!(header1.data_layout().to_u64(), 0);
        let header2 = LogFileFormat::from_version(Version::default());
        assert_eq!(header2.version().to_u64(), header1.version().to_u64());
        assert_eq!(header2.data_layout().to_u64(), 0);
        let header3 = LogFileFormat::from_version(header1.version());
        assert_eq!(header3.version(), header1.version());
        assert_eq!(header3.data_layout().to_u64(), 0);
assert_eq!(header1.enc_len(), LogFileFormat::header_len());
assert_eq!(header2.enc_len(), LogFileFormat::header_len());
assert_eq!(header3.enc_len(), LogFileFormat::header_len());
let header4 = LogFileFormat {
version: Version::V2,
data_layout: DataLayout::Alignment(16),
};
assert_eq!(
header4.enc_len(),
LogFileFormat::header_len() + LogFileFormat::payload_len(header4.version)
);
}

#[test]
fn test_encoding_decoding_file_format() {
fn enc_dec_file_format(file_format: LogFileFormat) -> Result<LogFileFormat> {
let mut buf = Vec::with_capacity(
LogFileFormat::header_len() + LogFileFormat::payload_len(file_format.version),
);
assert!(file_format.encode(&mut buf).is_ok());
LogFileFormat::decode(&mut &buf[..])
}
// header with aligned-sized data_layout
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let version = Version::V2;
let data_layout = DataLayout::Alignment(4096);
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(version.to_u64().unwrap()).is_ok());
assert!(buf.encode_u64(data_layout.to_u64()).is_ok());
assert_eq!(
LogFileFormat::decode(&mut &buf[..]).unwrap(),
LogFileFormat::new(version, data_layout)
);
}
// header with abnormal version
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let abnormal_version = 4_u64; /* abnormal version */
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(abnormal_version).is_ok());
assert!(buf.encode_u64(16).is_ok());
assert!(LogFileFormat::decode(&mut &buf[..]).is_err());
}
// header with Version::default and DataLayout::Alignment(_)
{
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(0));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(4096));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
}
// header with Version::V2 and DataLayout::Alignment(0)
{
let file_format = LogFileFormat::new(Version::V2, DataLayout::Alignment(0));
catch_unwind_silent(|| enc_dec_file_format(file_format)).unwrap_err();
}
}

#[test]
fn test_file_context() {
let mut file_context =
            LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default());
assert_eq!(file_context.get_signature(), None);
file_context.id.seq = 10;
        file_context.format.version = Version::V2;
assert_eq!(file_context.get_signature().unwrap(), 10);
let abnormal_seq = (file_context.id.seq << 32) as u64 + 100_u64;
file_context.id.seq = abnormal_seq;
