Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support secondary directory for log writing. #261

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dabc636
Support secondary-dir configuration.
LykxSassinator Aug 11, 2022
3c927b2
[Enhancement] Supply the `StorageFull` judgement if the main dir was …
LykxSassinator Aug 15, 2022
221cc8f
Supply necessary corner cases when disk is full.
LykxSassinator Aug 15, 2022
b6570a7
Merge branch 'master' into multi-dir
LykxSassinator Aug 15, 2022
a97ca69
Polish codes and supply corner cases when `sub-dir` is set for use.
LykxSassinator Aug 16, 2022
4859a52
Merge branch 'master' into multi-dir
LykxSassinator Aug 16, 2022
3da8d13
Supply basic uts for StorageInfo to improve the codepath coverage ratio.
LykxSassinator Aug 16, 2022
5231a07
Supply examples for setting secondary-dir with uts in config.rs.
LykxSassinator Aug 17, 2022
2278f0e
Polish the design of StorageInfo to make the whole structure on secon…
LykxSassinator Aug 17, 2022
d424d76
Refine the codes.
LykxSassinator Aug 17, 2022
5704eb5
Refine the retry strategy in Engine::write(...).
LykxSassinator Aug 17, 2022
35fc5b5
Refine annotations.
LykxSassinator Aug 19, 2022
30e0802
Rename `sub-dir` with `secondary-dir` and polish annotations in `scan`.
LykxSassinator Aug 22, 2022
d769480
Merge branch 'master' into multi-dir
LykxSassinator Aug 24, 2022
a92873e
[Refinement]Refine the `purge` progress.
LykxSassinator Aug 29, 2022
703892f
[Bugfix]Fix recursive loop when trying to rewrite `LogBatch` by `Engi…
LykxSassinator Aug 29, 2022
34307bb
Refine the processing when `force_rotate == true`
LykxSassinator Aug 30, 2022
dcd0e24
Merge branch 'master' into multi-dir
LykxSassinator Aug 31, 2022
a0c977b
Bugfix for rewriting a LogBatch with V2 and tidy the implementation i…
LykxSassinator Sep 1, 2022
acbe904
Merge branch 'multi-dir' of github.com:LykxSassinator/raft-engine int…
LykxSassinator Sep 1, 2022
193d34f
Refactor the `rewrite` progress when `write` meets `NOSPC` error.
LykxSassinator Sep 1, 2022
f306981
Clean unnecessary testing dirs in uts.
LykxSassinator Sep 1, 2022
518621d
Merge branch 'master' into multi-dir
LykxSassinator Sep 6, 2022
00b7059
Tidy style of annotations.
LykxSassinator Sep 6, 2022
6607662
Merge branch 'master' into multi-dir
LykxSassinator Sep 8, 2022
0a0b705
Refine the code style, and refine the strategy for `write` LogBatchs.
LykxSassinator Sep 8, 2022
f5df93e
Merge branch 'master' into multi-dir
LykxSassinator Sep 15, 2022
6e67a29
Refine code styls of `scan` in pipe_build.rs
LykxSassinator Sep 15, 2022
966656f
Polish annotations.
LykxSassinator Sep 15, 2022
7e65af6
Merge branch 'master' into multi-dir
LykxSassinator Sep 22, 2022
6a276e5
Polish the implementation while doing recovery by scanning.
LykxSassinator Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use log::warn;
use serde::{Deserialize, Serialize};

use crate::pipe_log::Version;
use crate::util::dev_ext::{on_same_dev, validate_dir};
use crate::{util::ReadableSize, Result};

const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
Expand Down Expand Up @@ -31,6 +32,15 @@ pub struct Config {
/// Default: ""
pub dir: String,

/// Secondary directory to store log files. Will create on startup if
/// set and not exists.
///
/// Newly logs will be put into this dir when the main `dir` is full
/// or not accessible.
///
/// Default: ""
pub secondary_dir: Option<String>,

/// How to deal with file corruption during recovery.
///
/// Default: "tolerate-tail-corruption".
Expand Down Expand Up @@ -98,6 +108,7 @@ impl Default for Config {
#[allow(unused_mut)]
let mut cfg = Config {
dir: "".to_owned(),
secondary_dir: None,
recovery_mode: RecoveryMode::TolerateTailCorruption,
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
Expand Down Expand Up @@ -160,6 +171,26 @@ impl Config {
if self.memory_limit.is_some() {
warn!("memory-limit will be ignored because swap feature is disabled");
}
// Validate `dir`.
if let Err(e) = validate_dir(&self.dir) {
return Err(box_err!(
"dir ({}) is invalid, err: {}, please check it again",
self.dir,
e
));
}
// Validate `secondary-dir`.
if_chain::if_chain! {
if let Some(secondary_dir) = self.secondary_dir.as_ref();
if validate_dir(secondary_dir).is_ok();
if let Ok(true) = on_same_dev(&self.dir, secondary_dir);
then {
warn!(
"secondary-dir ({}) and dir ({}) are on same device, recommend setting it to another device",
secondary_dir, self.dir
);
}
}
Ok(())
}

Expand Down Expand Up @@ -187,6 +218,11 @@ impl Config {
mod tests {
use super::*;

fn remove_dir(dir: &str) -> Result<()> {
std::fs::remove_dir_all(dir)?;
Ok(())
}

#[test]
fn test_serde() {
let value = Config::default();
Expand Down Expand Up @@ -219,13 +255,15 @@ mod tests {
#[test]
fn test_invalid() {
let hard_error = r#"
dir = "./"
target-file-size = "5MB"
purge-threshold = "3MB"
"#;
let mut hard_load: Config = toml::from_str(hard_error).unwrap();
assert!(hard_load.sanitize().is_err());

let soft_error = r#"
dir = "./"
recovery-read-block-size = "1KB"
recovery-threads = 0
target-file-size = "5000MB"
Expand All @@ -245,6 +283,7 @@ mod tests {
assert!(soft_sanitized.enable_log_recycle);

let recycle_error = r#"
dir = "./"
enable-log-recycle = true
format-version = 1
"#;
Expand All @@ -256,6 +295,7 @@ mod tests {
fn test_backward_compactibility() {
// Upgrade from older version.
let old = r#"
dir = "./"
recovery-mode = "tolerate-corrupted-tail-records"
"#;
let mut load: Config = toml::from_str(old).unwrap();
Expand All @@ -265,4 +305,36 @@ mod tests {
.unwrap()
.contains("tolerate-corrupted-tail-records"));
}

#[test]
fn test_validate_dir_setting() {
{
let dir_list = r#""#;
let mut load: Config = toml::from_str(dir_list).unwrap();
assert!(load.sanitize().is_err());
}
{
// Set the sub-dir same with main dir
let dir_list = r#"
dir = "./test_validate_dir_setting/"
secondary-dir = "./test_validate_dir_setting/"
"#;
let mut load: Config = toml::from_str(dir_list).unwrap();
load.sanitize().unwrap();
assert!(load.secondary_dir.is_some());
assert!(remove_dir(&load.dir).is_ok());
}
{
// Set the sub-dir with `"..."`
let dir_list = r#"
dir = "./test_validate_dir_setting"
secondary-dir = "./test_validate_dir_setting_secondary"
"#;
let mut load: Config = toml::from_str(dir_list).unwrap();
load.sanitize().unwrap();
assert!(load.secondary_dir.is_some());
assert!(remove_dir(&load.dir).is_ok());
assert!(remove_dir(&load.secondary_dir.unwrap()).is_ok());
}
}
}
63 changes: 57 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{mpsc, Arc, Mutex};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};

use log::{error, info};
use log::{error, info, warn};
use protobuf::{parse_from_bytes, Message};

use crate::config::{Config, RecoveryMode};
Expand All @@ -25,6 +25,8 @@ use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Max retry count for `write`.
const MAX_WRITE_RETRY_COUNT: u64 = 2;

pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
where
Expand Down Expand Up @@ -142,7 +144,16 @@ where
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
debug_assert!(len > 0);
let block_handle = {

let mut attempt_count = 0_u64;
// Flag on whether force to rotate the current active file or not.
let mut force_rotate = false;
let block_handle = loop {
// Max retry count is limited to `2`. If the first `append` retry because of
// `NOSPC` error, the next `append` should success, unless there exists
// several abnormal cases in the IO device. In that case, `Engine::write`
// must return `Err`.
attempt_count += 1;
let mut writer = Writer::new(log_batch, sync);
// Snapshot and clear the current perf context temporarily, so the write group
// leader will collect the perf context diff later.
Expand All @@ -154,14 +165,34 @@ where
for writer in group.iter_mut() {
writer.entered_time = Some(now);
sync |= writer.sync;

let log_batch = writer.mut_payload();
let res = self.pipe_log.append(LogQueue::Append, log_batch);
let res = self
.pipe_log
.append(LogQueue::Append, log_batch, force_rotate);
// If we found that there is no spare space for the next LogBatch in the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what I meant by "rotate inside pipe_log". The decision to rotate should be put inside pipe_log.

fn append() {
  let mut writer = self.writer.lock();
  let r = writer.write(bytes);
  if r.errorno == NOSPC {
    self.rotate_imp(&mut writer)?;
    return Err(TryAgain);
  }
  r
}

// current active file, we will mark the `force_rotate` with `true` to
// notify the leader do `rotate` immediately.
if let Err(Error::Other(e)) = res {
warn!(
"Cannot append, err: {}, try to re-append this log_batch into next log",
e
);
// Notify the next `append` to rotate current file.
force_rotate = true;
writer.set_output(Err(Error::Other(box_err!(
"Failed to append logbatch, try to dump it to other dir"
))));
continue;
}
force_rotate = false;
writer.set_output(res);
}
perf_context!(log_write_duration).observe_since(now);
if sync {
// As per trait protocol, this error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
// As per trait protocol, this error should be retriable. But we panic
// anyway to save the trouble of propagating it to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the comment line break is changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tackled in the new pr.

// other group members.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
}
// Pass the perf context diff to all the writers.
Expand All @@ -178,7 +209,27 @@ where
debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
perf_context += &writer.perf_context_diff;
set_perf_context(perf_context);
writer.finish()?
// Retry if `writer.finish()` returns a special 'Error::Other', remarking that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add another error type TryAgain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// there still exists free space for this `LogBatch`.
match writer.finish() {
Ok(handle) => {
break handle;
}
Err(Error::Other(_)) => {
// A special err, we will retry this LogBatch `append` by appending
// this writer to the next write group, and the current write leader
// will not hang on this write and will return timely.
if attempt_count >= MAX_WRITE_RETRY_COUNT {
return Err(Error::Other(box_err!(
"Failed to write logbatch, exceed max_retry_count: ({})",
MAX_WRITE_RETRY_COUNT
)));
}
}
Err(e) => {
return Err(e);
}
}
};
let mut now = Instant::now();
log_batch.finish_write(block_handle);
Expand Down
14 changes: 13 additions & 1 deletion src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,26 @@ impl LogFd {
let bytes = match pwrite(self.0, &content[written..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(e) => return Err(from_nix_error(e, "pwrite")),
Err(e) => {
return {
if e == Errno::ENOSPC {
// no space left
Err(from_nix_error(e, "nospace"))
} else {
Err(from_nix_error(e, "pwrite"))
}
};
}
};
if bytes == 0 {
break;
}
written += bytes;
offset += bytes;
}
fail_point!("log_fd::write::no_space_err", |_| {
Err(from_nix_error(nix::Error::ENOSPC, "nospace"))
});
fail_point!("log_fd::write::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
Expand Down
Loading