rewrite: optimize the interval of sync when rewriting memtables.
In a cloud environment, skipping sync operations while rewriting memtables
can let unsynced bytes accumulate in the write buffer. That backlog can then
stall foreground write progress when a sync is eventually issued.

This pull request introduces a periodic sync during the rewrite: once the
accumulated unsynced bytes exceed a predefined threshold (four times the
maximum rewrite batch size, i.e. 512 KiB by default), the next batch is
written with sync enabled, bounding the backlog and the resulting stall.

Signed-off-by: lucasliang <[email protected]>
LykxSassinator committed Dec 31, 2023
1 parent d043b4a commit 2318ea0
17 changes: 16 additions & 1 deletion src/purge.rs
@@ -27,6 +27,7 @@ const REWRITE_RATIO: f64 = 0.7;
 const MAX_REWRITE_ENTRIES_PER_REGION: usize = 32;
 const MAX_COUNT_BEFORE_FORCE_REWRITE: u32 = 9;
 
+#[inline]
 fn max_batch_bytes() -> usize {
     fail_point!("max_rewrite_batch_bytes", |s| s
         .unwrap()
@@ -35,6 +36,10 @@ fn max_batch_bytes() -> usize {
     128 * 1024
 }
 
+fn max_forcely_sync_bytes() -> usize {
+    max_batch_bytes() * 4
+}
+
 pub struct PurgeManager<P>
 where
     P: PipeLog,
@@ -354,6 +359,7 @@ where
         let mut current_entry_indexes = Vec::new();
         let mut current_entries = Vec::new();
         let mut current_size = 0;
+        let mut unsynced_size = 0;
         // Split the entries into smaller chunks, so that we don't OOM, and the
         // compression overhead is not too high.
         let mut entry_indexes = entry_indexes.into_iter().peekable();
@@ -362,6 +368,7 @@ where
             current_size += entry.len();
             current_entries.push(entry);
             current_entry_indexes.push(ei);
+            unsynced_size += current_size;
             // If this is the last entry, we handle them outside the loop.
             if entry_indexes.peek().is_some()
                 && current_size + previous_size > max_batch_bytes()
@@ -396,7 +403,15 @@ where
                 )?;
                 current_size = 0;
                 previous_size = 0;
-                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                let sync = if unsynced_size >= max_forcely_sync_bytes() {
+                    // Avoiding too many unsynced size can make the later `fdatasync` in
+                    // the append progress blocked for too long.
+                    unsynced_size = 0;
+                    true
+                } else {
+                    false
+                };
+                let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                 if needs_atomicity && atomic_group_start.is_none() {
                     atomic_group_start = Some(handle.id.seq);
                 }
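
For intuition, here is a minimal, self-contained sketch of the pattern this
commit applies: write entries in bounded batches, and force a durable sync
once the bytes written since the last sync cross a threshold. All names here
(rewrite_entries, write_batch, MAX_FORCED_SYNC_BYTES) are illustrative
stand-ins rather than the raft-engine API, and the byte accounting is
simplified relative to the diff above.

// Illustrative threshold-triggered periodic sync; not the raft-engine API.
const MAX_BATCH_BYTES: usize = 128 * 1024; // mirrors max_batch_bytes()
const MAX_FORCED_SYNC_BYTES: usize = MAX_BATCH_BYTES * 4; // mirrors max_forcely_sync_bytes()

fn rewrite_entries(entries: &[Vec<u8>], mut write_batch: impl FnMut(&[Vec<u8>], bool)) {
    let mut batch: Vec<Vec<u8>> = Vec::new();
    let mut batch_size = 0usize;
    let mut unsynced = 0usize;
    for entry in entries {
        batch_size += entry.len();
        batch.push(entry.clone());
        if batch_size > MAX_BATCH_BYTES {
            unsynced += batch_size;
            // Sync once enough unsynced bytes have piled up, so a later
            // fdatasync on the append path never flushes an unbounded backlog.
            let sync = unsynced >= MAX_FORCED_SYNC_BYTES;
            if sync {
                unsynced = 0;
            }
            write_batch(&batch, sync);
            batch.clear();
            batch_size = 0;
        }
    }
    if !batch.is_empty() {
        // Tail batch: a real implementation decides sync for this flush too.
        write_batch(&batch, true);
    }
}

fn main() {
    // 64 entries of 40 KiB each: a batch flushes every 4 entries (160 KiB),
    // and every 4th flush crosses the 512 KiB threshold and requests a sync.
    let entries: Vec<Vec<u8>> = (0..64u8).map(|i| vec![i; 40 * 1024]).collect();
    rewrite_entries(&entries, |batch, sync| {
        println!("wrote {} entries, sync = {}", batch.len(), sync);
    });
}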
