Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading block based log format #249

Merged
merged 25 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
44abe11
[New feature]Build basics for DIO.
LykxSassinator Jul 20, 2022
c27d043
Supply extra supports for building the basic structure for different …
LykxSassinator Jul 21, 2022
53b363a
Supply the reading strategy when `cfg.format_data_layout == Alignment`.
LykxSassinator Jul 21, 2022
4ccf139
Supply `alignment-mode` flag in `stress` tool for supporting setting …
LykxSassinator Jul 21, 2022
5e15c90
Code-style cleanning.
LykxSassinator Jul 21, 2022
884826c
Refactor the DataLayout for compatibilities to the future fragmented …
LykxSassinator Jul 22, 2022
a1be483
Supply abnormal testcases.
LykxSassinator Jul 22, 2022
d095195
Bugfixes for uts in format.rs.
LykxSassinator Jul 22, 2022
94232a8
Bugfix for the checking in `LogFileReader::read()`.
LykxSassinator Jul 22, 2022
8e075ba
Refactor the DataLayout and remove over-designed parts of codes.
LykxSassinator Jul 22, 2022
2337263
Add extra annotations and refinement to `next()` in `reader.rs`.
LykxSassinator Jul 22, 2022
3855fee
Bugfix on strategy for aligned writting and reading.
LykxSassinator Jul 25, 2022
07405da
Fix code-style errs.
LykxSassinator Jul 25, 2022
19b50cd
Bugfix for reading tail paddings when DataLayout == Alignment.
LykxSassinator Jul 25, 2022
bf75542
Merge branch 'tikv:master' into basics_for_dio
LykxSassinator Jul 26, 2022
0cf8875
Serialized the DataLayout in u64 and dump it into header of log files.
LykxSassinator Jul 26, 2022
a218c73
Supply etra and necessary fail_points to make production code path sa…
LykxSassinator Jul 27, 2022
8aa79a0
Fix the bug while parsing the header of log file and make code-style …
LykxSassinator Jul 28, 2022
248a3cc
Refine the code style.
LykxSassinator Jul 28, 2022
7c5893c
Bugfix when read corrupted header with Version::V1.
LykxSassinator Jul 28, 2022
e430d1f
Refine the info of Error::InvalidArguement in format.rs when parse an…
LykxSassinator Jul 29, 2022
35410d6
Supply extra corner cases for testing, especially the case that recov…
LykxSassinator Jul 29, 2022
3b251ec
Refine the code-style and design of uts.
LykxSassinator Aug 1, 2022
c867868
Bugfix for doctest.
LykxSassinator Aug 1, 2022
4287b8d
Refine the strategy for tolerating tail corruptions.
LykxSassinator Aug 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fail = "0.5"
fs2 = "0.4"
hashbrown = "0.12"
hex = "0.4"
if_chain = "1.0"
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Config {
pub recovery_mode: RecoveryMode,
/// Minimum I/O size for reading log files during recovery.
///
/// Default: "4KB". Minimum: "512B".
/// Default: "16KB". Minimum: "512B".
tabokie marked this conversation as resolved.
Show resolved Hide resolved
pub recovery_read_block_size: ReadableSize,
/// The number of threads used to scan and recovery log files.
///
Expand Down
6 changes: 5 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ where
script: String,
file_system: Arc<F>,
) -> Result<()> {
use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};
use crate::file_pipe_log::{LogFileFormat, RecoveryConfig, ReplayMachine};
use crate::pipe_log::DataLayout;

if !path.exists() {
return Err(Error::InvalidArgument(format!(
Expand All @@ -458,6 +459,7 @@ where
..Default::default()
};
let recovery_mode = cfg.recovery_mode;
let file_format = LogFileFormat::new(cfg.format_version, DataLayout::NoAlignment);
let read_block_size = cfg.recovery_read_block_size.0;
let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
builder.scan()?;
Expand All @@ -469,6 +471,7 @@ where
RecoveryConfig {
queue: LogQueue::Append,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
Expand All @@ -481,6 +484,7 @@ where
RecoveryConfig {
queue: LogQueue::Rewrite,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
Expand Down
254 changes: 233 additions & 21 deletions src/file_pipe_log/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
use num_traits::{FromPrimitive, ToPrimitive};

use crate::codec::{self, NumberEncoder};
use crate::pipe_log::{FileId, LogQueue, Version};
use crate::pipe_log::{DataLayout, FileId, LogQueue, Version};
use crate::{Error, Result};

/// Width to format log sequence number.
Expand All @@ -20,6 +20,20 @@ const LOG_REWRITE_SUFFIX: &str = ".rewrite";
/// File header.
const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";

/// Check whether the given `buf` is a valid padding or not.
///
/// To simplify the checking strategy, we just check the first
/// and last byte in the `buf`.
///
/// In most common cases, the paddings will be filled with `0`,
/// and several corner cases, where there exists corrupted blocks
/// in the disk, might pass through this rule, but will failed in
/// followed processing. So, we can just keep it simplistic.
#[inline]
pub(crate) fn is_valid_paddings(buf: &[u8]) -> bool {
buf.is_empty() || (buf[0] == 0 && buf[buf.len() - 1] == 0)
}

/// `FileNameExt` offers file name formatting extensions to [`FileId`].
pub trait FileNameExt: Sized {
fn parse_file_name(file_name: &str) -> Option<Self>;
Expand Down Expand Up @@ -79,28 +93,66 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
}

/// In-memory representation of `Format` in log files.
#[derive(Clone, Default)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct LogFileFormat {
version: Version,
data_layout: DataLayout,
}

impl Default for LogFileFormat {
fn default() -> Self {
Self {
version: Version::default(),
data_layout: DataLayout::NoAlignment,
}
}
}

impl LogFileFormat {
pub fn new(version: Version, data_layout: DataLayout) -> Self {
Self {
version,
data_layout,
}
}

/// Length of whole `LogFileFormat` written on storage.
pub fn enc_len(&self) -> usize {
Self::header_len() + Self::payload_len(self.version)
}

/// Length of header written on storage.
pub const fn len() -> usize {
pub const fn header_len() -> usize {
LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::<Version>()
}

/// Length of serialized `DataLayout` written on storage.
pub const fn payload_len(version: Version) -> usize {
match version {
Version::V1 => 0,
Version::V2 => DataLayout::len(),
}
}

pub fn from_version(version: Version) -> Self {
Self { version }
Self {
version,
data_layout: DataLayout::NoAlignment,
}
}

pub fn version(&self) -> Version {
self.version
}

pub fn data_layout(&self) -> DataLayout {
self.data_layout
}

/// Decodes a slice of bytes into a `LogFileFormat`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
if buf.len() < Self::len() {
let buf_len = buf.len();
if buf_len < Self::header_len() {
return Err(Error::Corruption("log file header too short".to_owned()));
}
if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
Expand All @@ -109,36 +161,122 @@ impl LogFileFormat {
));
}
buf.consume(LOG_FILE_MAGIC_HEADER.len());
let v = codec::decode_u64(buf)?;
if let Some(version) = Version::from_u64(v) {
Ok(Self { version })
} else {
Err(Error::Corruption(format!(
"unrecognized log file version: {}",
// Parse `Version` of LogFileFormat from header of the file.
let version = {
let dec_version = codec::decode_u64(buf)?;
if let Some(v) = Version::from_u64(dec_version) {
v
)))
} else {
return Err(Error::Corruption(format!(
"unrecognized log file version: {}",
dec_version
)));
}
};
// Parse `DataLayout` of LogFileFormat from header of the file.
let payload_len = Self::payload_len(version);
if payload_len == 0 {
// No alignment.
return Ok(Self {
version,
data_layout: DataLayout::NoAlignment,
});
}
if_chain::if_chain! {
if payload_len > 0;
if buf_len >= Self::header_len() + payload_len;
if let Ok(layout_block_size) = codec::decode_u64(buf);
then {
// If the decoded `payload_len > 0`, serialized data_layout
// should be extracted from the file.
Ok(Self {
version,
data_layout: if layout_block_size == 0 {
DataLayout::NoAlignment
} else {
DataLayout::Alignment(layout_block_size)
},
})
} else {
// Here, we mark this special err, that is, corrupted `payload`,
// with InvalidArgument.
Err(Error::InvalidArgument(format!(
"invalid dataload in the header, len: {}, expected len: {}",
buf_len - Self::header_len(), Self::payload_len(version)
)))
}
}
}

/// Encodes this header and appends the bytes to the provided buffer.
pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
buf.encode_u64(self.version.to_u64().unwrap())?;
let corrupted = || {
fail::fail_point!("log_file_header::corrupted", |_| true);
false
};
if corrupted() {
buf[0] += 1;
if Self::payload_len(self.version) > 0 {
buf.encode_u64(self.data_layout.to_u64())?;
}
#[cfg(feature = "failpoints")]
{
// Set header corrupted.
let corrupted = || {
fail::fail_point!("log_file_header::corrupted", |_| true);
false
};
// Set abnormal DataLayout.
let force_abnormal_data_layout = || {
fail::fail_point!("log_file_header::force_abnormal_data_layout", |_| true);
false
};
// Set corrupted DataLayout for `payload`.
let corrupted_data_layout = || {
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
fail::fail_point!("log_file_header::corrupted_data_layout", |_| true);
false
};
if corrupted() {
buf[0] += 1;
}
if force_abnormal_data_layout() {
buf.encode_u64(0_u64)?;
}
if corrupted_data_layout() {
buf.pop();
}
}
Ok(())
}

/// Return the aligned block size.
#[inline]
pub fn get_aligned_block_size(&self) -> usize {
self.data_layout.to_u64() as usize
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pipe_log::LogFileContext;
use crate::test_util::catch_unwind_silent;

#[test]
fn test_check_paddings_is_valid() {
// normal buffer
let mut buf = vec![0; 128];
// len < 8
assert!(is_valid_paddings(&buf[0..6]));
// len == 8
assert!(is_valid_paddings(&buf[120..]));
// len > 8
assert!(is_valid_paddings(&buf[..]));

// abnormal buffer
buf[127] = 3_u8;
assert!(is_valid_paddings(&buf[0..110]));
assert!(is_valid_paddings(&buf[120..125]));
assert!(!is_valid_paddings(&buf[124..128]));
assert!(!is_valid_paddings(&buf[120..]));
assert!(!is_valid_paddings(&buf[..]));
}

#[test]
fn test_file_name() {
Expand Down Expand Up @@ -172,23 +310,97 @@ mod tests {
assert_eq!(version, version2);
}

#[test]
fn test_data_layout() {
assert_eq!(DataLayout::NoAlignment.to_u64(), 0);
assert_eq!(DataLayout::Alignment(16).to_u64(), 16);
assert_eq!(DataLayout::from_u64(0), DataLayout::NoAlignment);
assert_eq!(DataLayout::from_u64(4096), DataLayout::Alignment(4096));
assert_eq!(DataLayout::len(), 8);
}

#[test]
fn test_file_header() {
let header1 = LogFileFormat::default();
assert_eq!(header1.version().to_u64().unwrap(), 1);
assert_eq!(header1.data_layout().to_u64(), 0);
let header2 = LogFileFormat::from_version(Version::default());
assert_eq!(header2.version().to_u64(), header1.version().to_u64());
let header3 = LogFileFormat::from_version(Version::default());
assert_eq!(header1.data_layout().to_u64(), 0);
let header3 = LogFileFormat::from_version(header1.version());
assert_eq!(header3.version(), header1.version());
assert_eq!(header1.data_layout().to_u64(), 0);
assert_eq!(header1.enc_len(), LogFileFormat::header_len());
assert_eq!(header2.enc_len(), LogFileFormat::header_len());
assert_eq!(header3.enc_len(), LogFileFormat::header_len());
let header4 = LogFileFormat {
version: Version::V2,
data_layout: DataLayout::Alignment(16),
};
assert_eq!(
header4.enc_len(),
LogFileFormat::header_len() + LogFileFormat::payload_len(header4.version)
);
}

#[test]
fn test_encoding_decoding_file_format() {
fn enc_dec_file_format(file_format: LogFileFormat) -> Result<LogFileFormat> {
let mut buf = Vec::with_capacity(
LogFileFormat::header_len() + LogFileFormat::payload_len(file_format.version),
);
assert!(file_format.encode(&mut buf).is_ok());
LogFileFormat::decode(&mut &buf[..])
}
// header with aligned-sized data_layout
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let version = Version::V2;
let data_layout = DataLayout::Alignment(4096);
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(version.to_u64().unwrap()).is_ok());
assert!(buf.encode_u64(data_layout.to_u64()).is_ok());
assert_eq!(
LogFileFormat::decode(&mut &buf[..]).unwrap(),
LogFileFormat::new(version, data_layout)
);
}
// header with abnormal version
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let abnormal_version = 4_u64; /* abnormal version */
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(abnormal_version).is_ok());
assert!(buf.encode_u64(16).is_ok());
assert!(LogFileFormat::decode(&mut &buf[..]).is_err());
}
// header with Version::default and DataLayout::Alignment(_)
{
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(0));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(4096));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
}
// header with Version::V2 and DataLayout::Alignment(0)
{
let file_format = LogFileFormat::new(Version::V2, DataLayout::Alignment(0));
catch_unwind_silent(|| enc_dec_file_format(file_format)).unwrap_err();
}
}

#[test]
fn test_file_context() {
let mut file_context =
LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default());
assert_eq!(file_context.get_signature(), None);
file_context.id.seq = 10;
file_context.version = Version::V2;
file_context.format.version = Version::V2;
assert_eq!(file_context.get_signature().unwrap(), 10);
let abnormal_seq = (file_context.id.seq << 32) as u64 + 100_u64;
file_context.id.seq = abnormal_seq;
Expand Down
Loading