Support secondary directory for log writing. #261
Conversation
This commit builds a prototype to support secondary dir configuration. Signed-off-by: Lucasliang <[email protected]>
…full but the secondary dir was free to flush data. Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
It's a prototype for closing #257, and all reasonable suggestions are acceptable. Please hold on; I want to refine it and make it more coherent.
Codecov Report: Base: 97.64% // Head: 97.60% // Decreases project coverage by 0.04%.
Additional details and impacted files:

```
@@           Coverage Diff            @@
##           master    #261     +/-  ##
========================================
- Coverage   97.64%   97.60%   -0.04%
========================================
  Files          30       30
  Lines       10655    11250     +595
========================================
+ Hits        10404    10981     +577
- Misses        251      269      +18
```
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
@tabokie Please take a look; this is a prototype for implementing
Signed-off-by: Lucasliang <[email protected]>
Please hold on; the design should be refined and polished to simplify the whole structure.
src/engine.rs (Outdated)

```rust
writer.finish()?
// Retry if `writer.finish()` returns a special err, remarking there still
// exists free space for this `LogBatch`.
let ret = writer.finish();
```
This is not enough. fsync failure will panic. You should propagate the `maybe_sync` error as well.

I think letting the leader do all the retrying is simpler. When a write fails with NOSPC, the leader will close the current active file (it internally truncates un-synced parts), then create a new file and retry the whole write group.
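The leader-driven retry described above can be sketched with toy types (these are not the raft-engine API): on a no-space error the leader rotates to a fresh file (standing in for the secondary directory) and retries the whole write group.

```rust
// Toy sketch of leader-side retry on a no-space error. `Pipe` and the
// error type here are illustrative stand-ins, not raft-engine's types.
#[derive(Debug, PartialEq)]
enum WriteError {
    NoSpace,
}

struct Pipe {
    // Remaining capacity of the active file; rotation resets it.
    active_capacity: usize,
    rotations: usize,
}

impl Pipe {
    fn write(&mut self, bytes: &[u8]) -> Result<usize, WriteError> {
        if bytes.len() > self.active_capacity {
            return Err(WriteError::NoSpace);
        }
        self.active_capacity -= bytes.len();
        Ok(bytes.len())
    }

    // Close the current active file (the real engine truncates un-synced
    // parts) and open a new one, e.g. in the secondary directory.
    fn rotate(&mut self) {
        self.active_capacity = 1024;
        self.rotations += 1;
    }
}

// The leader retries the whole write group after rotating. Assumes the
// group fits into a fresh file, otherwise this toy would loop forever.
fn leader_write(pipe: &mut Pipe, group: &[&[u8]]) -> Result<usize, WriteError> {
    loop {
        let mut written = 0;
        let mut failed = false;
        for bytes in group {
            match pipe.write(bytes) {
                Ok(n) => written += n,
                Err(WriteError::NoSpace) => {
                    failed = true;
                    break;
                }
            }
        }
        if !failed {
            return Ok(written);
        }
        pipe.rotate(); // switch to a fresh file and retry the group
    }
}
```

The point of the design is that followers never see the NOSPC error at all; only the leader observes it, rotates, and replays the group.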
…dary-dir more readable and concise. Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
I hold a different view on it.
Signed-off-by: Lucasliang <[email protected]>
```
@@ -99,108 +101,201 @@ impl<F: FileSystem> DualPipesBuilder<F> {
    /// Scans for all log files under the working directory. The directory will
    /// be created if not exists.
    pub fn scan(&mut self) -> Result<()> {
```
Why change so much in this function? Couldn't it work by just doing two `fs::read_dir(path)?.for_each`?
The modifications just tidy the code and split the procedure of `scan` into three parts. It's not appropriate to code it with two `fs::read_dir(path)?.for_each`s, as the definition of `cfg.secondary-dir` is `Option<String>`.
Let's not change the main code, because the updated version isn't more readable IMO. You can change the original `scan` to `scan_dir`, and inside it insert the file handle into a vector. After two `scan_dir`s, you can sort and validate the vector.
The change is still too big and I don't see a clear purpose. You can keep the `scan_dir` simple and short as before, do the file handle initialization early, then sort the `FileToRecover` list afterwards.
In my view, it's clearer than before. `scan` has been split into the following steps:

- Scan dirs to get the file_seq range of append logs and rewrite logs: `scan_dir` in the main dir, then `scan_dir` in the secondary dir if it has been specified by `cfg.secondary-dir`;
- Check and clear stale metadata of logs: `clear_stale_metadata` in the main dir, then `clear_stale_metadata` in the secondary dir if it has been specified by `cfg.secondary-dir`;
- Build the file_list vector.

Still confusing?
There are two things here. One is to keep the diff as short as possible. I do refactor all the time, but it's usually centered around a change of abstraction (i.e. interaction between different modules, usually involving changes to function interfaces and data types). I don't often refactor the internal implementation unless it significantly improves readability.

Then let's go to the code itself: is the readability improved? IMO it is not. The LOC increased 100%, and you introduced several types for the sake of refactoring (adding glue types usually indicates a bad abstraction). In particular, I don't think the `min_id`/`max_id` approach is suitable anymore.
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
…ne::write` Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
PTAL cc @tabokie, several modifications have been made according to the previous suggestions.
…n `scan`. Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
src/log_batch.rs (Outdated)

```rust
}

/// Prepare the `rewrite` by resetting the `signature` in the `LogBatch`.
pub(crate) fn prepare_rewrite(buf: &mut Vec<u8>, signature: Option<u32>) -> Result<()> {
```
On second thought, let's move the checksum-ing of the footer to `LogBatch` from `LogItemBatch`. This way we don't need to create two functions in `LogItemBatch`, and we keep all the details together in `LogBatch`.
src/file_pipe_log/pipe.rs (Outdated)

```rust
/// Represents the info of storage dirs, including `main dir` and
/// `secondary dir`.
struct StorageInfo {
```
Rename it `StorageManager`, and use `path_id` to refer to the path instead of `storage_type`.
`path_id` as in an integer. You can make the `StorageManager` manage a vector of directories, and you can also let it take care of the directory lock as well (pass it to `Pipe::open`).
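A minimal sketch of the suggested `StorageManager` (illustrative names, not the actual raft-engine types): it owns a vector of directories and refers to each by an integer `path_id`.

```rust
use std::path::{Path, PathBuf};

// Sketch of the suggested `StorageManager`: it owns all storage
// directories and hands them out by integer `path_id` (0 = main dir,
// 1 = secondary dir). In the full suggestion it would also hold the
// per-directory locks.
type PathId = usize;

struct StorageManager {
    paths: Vec<PathBuf>,
}

impl StorageManager {
    fn new() -> Self {
        StorageManager { paths: Vec::new() }
    }

    // Registers a directory and returns its `path_id`.
    fn add(&mut self, dir: impl Into<PathBuf>) -> PathId {
        self.paths.push(dir.into());
        self.paths.len() - 1
    }

    fn path(&self, id: PathId) -> &Path {
        &self.paths[id]
    }

    fn len(&self) -> usize {
        self.paths.len()
    }
}
```

Code that previously branched on a `storage_type` enum can instead loop over `0..manager.len()`, which keeps the single-dir and dual-dir cases on one code path.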
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
Signed-off-by: Lucasliang <[email protected]>
src/engine.rs (Outdated)

```rust
let res = self
    .pipe_log
    .append(LogQueue::Append, log_batch, force_rotate);
// If we found that there is no spare space for the next LogBatch in the
```
This is not what I meant by "rotate inside pipe_log". The decision to rotate should be put inside pipe_log.
```rust
fn append() {
    let mut writer = self.writer.lock();
    let r = writer.write(bytes);
    if r.errorno == NOSPC {
        self.rotate_imp(&mut writer)?;
        return Err(TryAgain);
    }
    r
}
```
src/file_pipe_log/pipe_builder.rs (Outdated)

```rust
let (min_id, max_id) = (files[0].seq, files[files.len() - 1].seq);
debug_assert!(min_id > 0);
let mut cleared = 0_u64;
for seq in (0..min_id).rev() {
```
Your new code won't work. Assuming it fails to clean up metadata for file N, then on the next startup it will not attempt to clean up the metadata for any file n < N.
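A toy model of the failure mode (not the actual code): a backward walk that stops at the first missing metadata leaves holes behind a previously failed deletion, whereas visiting every sequence below the first live file does not.

```rust
use std::collections::BTreeSet;

// Toy model of startup metadata cleanup. `meta` holds file seqs that
// still have stale metadata; `min_id` is the first live log file.

// Broken variant: walk backwards from `min_id` and stop at the first seq
// with no metadata, assuming everything below it was already cleaned
// (not always true if a previous cleanup failed partway).
fn cleanup_backwards(meta: &mut BTreeSet<u64>, min_id: u64) {
    for seq in (0..min_id).rev() {
        if !meta.remove(&seq) {
            break;
        }
    }
}

// Robust variant: visit every seq below `min_id`.
fn cleanup_forward(meta: &mut BTreeSet<u64>, min_id: u64) {
    for seq in 0..min_id {
        meta.remove(&seq);
    }
}
```

For example, if a previous run deleted metadata for seq 4 but crashed before reaching 1..=3, the backward walk from `min_id = 5` hits the already-deleted seq 4 first and stops, so seqs 1..=3 are never cleaned.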
```rust
fs::create_dir(dir)?;
self.dir_lock = Some(lock_dir(dir)?);
return Ok(());
// Scan main `dir` and `secondary-dir`, if `secondary-dir` is valid.
```
Let me be more verbose this time:

```rust
struct PipeBuilder {
    // Only available after a successful `scan`.
    dir_manager: Option<Arc<DirectoryManager>>,
}

struct DirectoryManager {
    paths: Vec<PathBuf>,
    locks: Vec<File>,
}

fn scan(&self) {
    // Set up directories.
    let mut dirs = DirectoryManager::new();
    dirs.add(self.cfg.dir)?;
    dirs.add(self.second_dir)?;
    for path_id in 0..dirs.len() {
        self.scan_dir(path_id, dirs[path_id]);
    }
    self.rewrite_files.sort();
    self.append_files.sort();
    for (queue, files) in [/* both queues */] {
        if files.is_empty() { continue; }
        // Check consecutiveness.
        let mut invalid_files = 0;
        let mut current_seq = files[0].seq;
        for (i, f) in files.enumerate() {
            if f.seq > current_seq {
                warn!("hole");
                invalid_files = i;
            } else if f.seq < current_seq {
                return Error::InvalidArgument("Duplicate file");
            }
            current_seq = f.seq + 1;
        }
        files.drain(..invalid_files);
        if files.is_empty() { continue; }
        // Clean up metadata.
        let delete_start = {...};
        'cleanup: for seq in delete_start..files[0].seq {
            for path_id in 0..dirs.len() {
                if self.file_system.exists_metadata(dirs.path(path_id)) {
                    if let Err(e) = self.file_system.delete_metadata() { ...; break 'cleanup; }
                }
            }
        }
    }
    self.dir_manager = Arc::new(dirs);
}

fn scan_dir(&self, path_id: u64, dir: &Path) {
    for f in fs::read_dir(dir) {
        self.rewrite_files.push(...);
    }
}

fn build(self) {
    SinglePipe::new(self.dir_manager.clone(), ...);
}
```
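The consecutiveness check from the sketch above can be written as a small self-contained function (a toy version where file sequences are plain `u64`s sorted ascending): drop everything before the last hole and reject duplicates.

```rust
// Toy version of the consecutiveness check: given file sequence numbers
// sorted ascending, drop all files before the last hole (stale leftovers)
// and reject duplicate sequence numbers outright.
fn check_consecutive(seqs: &mut Vec<u64>) -> Result<(), String> {
    if seqs.is_empty() {
        return Ok(());
    }
    let mut invalid = 0;
    let mut expected = seqs[0];
    for (i, &seq) in seqs.iter().enumerate() {
        if seq > expected {
            // A hole: everything before index `i` is unrecoverable.
            invalid = i;
        } else if seq < expected {
            return Err(format!("duplicate file seq {}", seq));
        }
        expected = seq + 1;
    }
    seqs.drain(..invalid);
    Ok(())
}
```

For instance, `[3, 4, 6, 7]` has a hole between 4 and 6, so only `[6, 7]` survive; `[3, 3, 4]` is rejected as a duplicate.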
Signed-off-by: Lucasliang <[email protected]>
This PR is based on a base that is too obsolete. So, I've reviewed the previous comments and re-built another PR for tackling this issue.
(Some pending comments from months ago)
```
@@ -551,10 +529,10 @@ enum BufState {
    /// state only briefly exists between encoding and writing, user operation
    /// will panic under this state.
    /// # Content
    /// (header_offset, entries_len)
    /// (header_offset, entries_len, signature)
```
signature -> original checksum
This change on `BufState::Sealed()` has been refactored in the new PR.
```rust
        sign_checksum(&mut self.buf, Some(old ^ new))?;
    }
    (Some(old), None) => {
        sign_checksum(&mut self.buf, Some(old))?;
```
Lack coverage.
Same as above.
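The `Some(old ^ new)` trick in the quoted diff can be illustrated with a self-contained toy (this is not the actual raft-engine code; a simple XOR-rotate fold stands in for the real CRC32): because XOR is self-inverse, re-stamping the stored value with `old ^ new` cancels the old signature and applies the new one without recomputing the payload checksum.

```rust
// Toy sketch of a checksum "signed" by XOR-ing a per-file signature into
// it, so a rewritten batch can be re-stamped under a new signature.
// A simple XOR-rotate fold stands in for the real CRC32.
fn checksum(buf: &[u8]) -> u32 {
    buf.iter().fold(0u32, |acc, &b| acc.rotate_left(8) ^ b as u32)
}

/// Writes `checksum(payload) ^ signature` into the last 4 bytes of `buf`.
fn sign_checksum(buf: &mut [u8], signature: Option<u32>) {
    let n = buf.len() - 4;
    let mut c = checksum(&buf[..n]);
    if let Some(sig) = signature {
        c ^= sig;
    }
    buf[n..].copy_from_slice(&c.to_le_bytes());
}

fn verify_checksum(buf: &[u8], signature: Option<u32>) -> bool {
    let n = buf.len() - 4;
    let mut expected = checksum(&buf[..n]);
    if let Some(sig) = signature {
        expected ^= sig;
    }
    u32::from_le_bytes(buf[n..].try_into().unwrap()) == expected
}

/// Re-stamps a buffer signed with `old` so it verifies under `new`,
/// mirroring the `sign_checksum(&mut self.buf, Some(old ^ new))` idea:
/// XOR-ing `old ^ new` onto the stored value cancels the old signature.
fn restamp(buf: &mut [u8], old: u32, new: u32) {
    let n = buf.len() - 4;
    let stored = u32::from_le_bytes(buf[n..].try_into().unwrap());
    buf[n..].copy_from_slice(&(stored ^ old ^ new).to_le_bytes());
}
```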
```rust
    return Ok(());
}
if !path.is_dir() {
    return Err(box_err!("Not directory: {}", dir));
```
Lack coverage.
Has been refactored here.
```rust
// As per trait protocol, this error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
// As per trait protocol, this error should be retriable. But we panic
// anyway to save the trouble of propagating it to
```
Why is the comment line break changed?
Tackled in the new pr.
```
@@ -178,7 +189,27 @@ where
    debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
    perf_context += &writer.perf_context_diff;
    set_perf_context(perf_context);
    writer.finish()?
    // Retry if `writer.finish()` returns a special 'Error::Other', remarking that
```
Add another error type TryAgain.
Done.
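As a hedged sketch of the `TryAgain` idea (illustrative names only, not raft-engine's actual `Error` enum), the variant lets the pipe signal a retriable condition after it has rotated internally, and the caller simply retries:

```rust
// Sketch of a retriable error variant: the pipe rotates internally on
// a no-space condition and returns `TryAgain`; the caller retries.
#[derive(Debug)]
enum Error {
    TryAgain(String),
    Other(String),
}

// Stand-in for one write attempt: the first attempt hits a full disk,
// the pipe rotates (e.g. to the secondary directory) and asks for a retry.
fn write_once(attempt: usize) -> Result<usize, Error> {
    if attempt == 0 {
        Err(Error::TryAgain("no space in active file; rotated".into()))
    } else {
        Ok(128)
    }
}

fn write_with_retry(max_retries: usize) -> Result<usize, Error> {
    let mut attempt = 0;
    loop {
        match write_once(attempt) {
            Err(Error::TryAgain(_)) if attempt < max_retries => attempt += 1,
            other => return other,
        }
    }
}
```

Compared to overloading `Error::Other`, a dedicated variant lets callers match on it explicitly instead of inspecting error strings.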
Brief Introduction

This PR supports the secondary directory feature, referring to issue #257. It contains:

- A new configuration option `sub-dir` to enable this feature. Default is `None`; if set to `Some(...)`, this directory will be used when the main dir (specified by the `dir` option) is full.
- A new `StorageInfo` in `pipe.rs`, to make this feature compatible with other existing features, i.e. log recycling, recovery and so on.
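As a hedged illustration only (the `sub-dir` option name comes from this PR's description and may differ in the final version), enabling the feature in a TOML config might look like:

```toml
# Main directory for raft-engine log files.
dir = "/data/raft-engine"
# Hypothetical secondary directory, used when the main dir is full.
sub-dir = "/mnt/spare/raft-engine"
```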