feat(pageserver): gc-compaction split job and partial scheduler #9897

Merged: 5 commits, Dec 6, 2024
10 changes: 9 additions & 1 deletion pageserver/src/http/routes.rs
@@ -2036,15 +2036,23 @@ async fn timeline_compact_handler(
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);

let sub_compaction = compact_request
.as_ref()
.map(|r| r.sub_compaction)
.unwrap_or(false);
let options = CompactOptions {
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
flags,
sub_compaction,
};

let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);
let scheduled = compact_request
.as_ref()
.map(|r| r.scheduled)
.unwrap_or(false);

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
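For reference, the handler derives both `scheduled` and `sub_compaction` from the optional JSON request body; with the serde defaults added in timeline.rs below, both flags are false when the body omits them. A minimal sketch of that deserialization, using a stand-in struct rather than the real `CompactRequest` (illustrative, not part of the diff):

    // Illustrative stand-in for CompactRequest; the #[serde(default)]
    // attributes mirror the ones added in timeline.rs.
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct CompactRequestSketch {
        #[serde(default)]
        scheduled: bool,
        #[serde(default)]
        sub_compaction: bool,
    }

    fn main() {
        let body = r#"{"scheduled": true, "sub_compaction": true}"#;
        let req: CompactRequestSketch = serde_json::from_str(body).unwrap();
        assert!(req.scheduled && req.sub_compaction);
    }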
49 changes: 41 additions & 8 deletions pageserver/src/tenant.rs
@@ -49,6 +49,7 @@ use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -2987,10 +2988,16 @@ impl Tenant {
if has_pending_l0_compaction_task {
Some(true)
} else {
let has_pending_scheduled_compaction_task;
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
}
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
@@ -3007,6 +3014,32 @@
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
} else {
let _ = timeline
.compact_with_options(
@@ -9244,7 +9277,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -9481,7 +9514,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -9973,7 +10006,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10020,7 +10053,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10072,7 +10105,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10123,7 +10156,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10179,7 +10212,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
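The queueing logic above splits one scheduled gc-compaction task into several, and hands the completion channel only to the last sub-job, so a caller waiting on the schedule is signalled exactly once, after the final sub-job runs. A simplified, self-contained sketch of that fan-out pattern (names are illustrative, not the PR's exact types):

    use std::collections::VecDeque;
    use tokio::sync::oneshot;

    struct Task {
        payload: u32,
        // Only one task in a split carries the completion sender.
        result_tx: Option<oneshot::Sender<()>>,
    }

    fn enqueue_split(jobs: Vec<u32>, mut result_tx: Option<oneshot::Sender<()>>, queue: &mut VecDeque<Task>) {
        let jobs_len = jobs.len();
        for (idx, payload) in jobs.into_iter().enumerate() {
            queue.push_back(Task {
                payload,
                // The last sub-job sends the completion signal.
                result_tx: if idx == jobs_len - 1 { result_tx.take() } else { None },
            });
        }
    }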
7 changes: 7 additions & 0 deletions pageserver/src/tenant/timeline.rs
@@ -785,6 +785,9 @@ pub(crate) struct CompactRequest {
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}

#[serde_with::serde_as]
@@ -814,6 +817,9 @@ pub(crate) struct CompactOptions {
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
}

impl std::fmt::Debug for Timeline {
@@ -1629,6 +1635,7 @@ impl Timeline {
flags,
compact_range: None,
compact_below_lsn: None,
sub_compaction: false,
},
ctx,
)
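With the new field in place, the updated tests in tenant.rs fill unset options via `..Default::default()`. A caller opting into sub-compaction might look like the fragment below; it assumes the test-module context (EnumSet, get_key, and CompactOptions in scope) and is illustrative only:

    // Illustrative fragment: request gc compaction over a bounded key range,
    // letting the pageserver split the work into sub-compaction jobs.
    let options = CompactOptions {
        flags: EnumSet::new(),
        compact_range: Some((get_key(0)..get_key(10)).into()),
        sub_compaction: true,
        ..Default::default()
    };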
162 changes: 151 additions & 11 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -10,8 +10,8 @@ use std::sync::Arc;

use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
ImageLayerCreationMode, RecordedDuration, Timeline,
};

use anyhow::{anyhow, bail, Context};
@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -1751,6 +1750,116 @@ impl Timeline {
Ok(())
}

/// Split a gc-compaction job into multiple compaction jobs. Ideally, this function would return a vector of
/// `GcCompactionJobDesc`, but we want to keep the tenant scheduling side simple without exposing too much
/// ad-hoc information about gc-compaction itself.
pub(crate) async fn gc_compaction_split_jobs(
self: &Arc<Self>,
options: CompactOptions,
) -> anyhow::Result<Vec<CompactOptions>> {
if !options.sub_compaction {
return Ok(vec![options]);
}
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
start: Key::MIN,
end: Key::MAX,
});
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
let gc_info = self.gc_info.read().unwrap();
gc_info.cutoffs.select_min() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do more fine-grained partitioning
// by estimating the number of files read by each compaction job. We should also partition on LSN.
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
};
let ((dense_ks, sparse_ks), _) = &*partition;
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
source_end: &Key,
target_start: &Key,
target_end: &Key,
) -> Option<(Key, Key)> {
let start = source_start.max(target_start);
let end = source_end.min(target_end);
if start < end {
Some((*start, *end))
} else {
None
}
}
let mut split_key_ranges = Vec::new();
let ranges = dense_ks
.parts
.iter()
.map(|partition| partition.ranges.iter())
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
.flatten()
.cloned()
.collect_vec();
for range in ranges.iter() {
let Some((start, end)) = truncate_to(
&range.start,
&range.end,
&compact_range.start,
&compact_range.end,
) else {
continue;
};
split_key_ranges.push((start, end));
}
split_key_ranges.sort();
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split the compaction job into chunks of at most GC_COMPACT_MAX_SIZE_MB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be a configuration option in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
current_start = Some(start);
}
let start = current_start.unwrap();
if start >= end {
// We have already processed this partition.
continue;
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
let mut compact_options = options.clone();
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
.keys()
.map(|layer| layer.layer.key_range.end)
.min();
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
// In this case, we simply use the specified key range end.
let end = if let Some(extended_end) = extended_end {
extended_end.max(end)
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_options.compact_range = Some(CompactRange { start, end });
compact_options.compact_below_lsn = Some(compact_below_lsn);
compact_options.sub_compaction = false;
compact_jobs.push(compact_options);
current_start = Some(end);
}
}
drop(guard);
Ok(compact_jobs)
}

/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1773,6 +1882,36 @@
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if options.sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(options).await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(());
}
self.compact_with_gc_inner(cancel, options, ctx).await
}

async fn compact_with_gc_inner(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(
!options.sub_compaction,
"sub-compaction should be handled by the outer function"
);
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -1942,14 +2081,15 @@

// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
}
// The check is disabled for now because it needs to be adjusted for partial compaction; it will be re-enabled later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
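The splitting loop in gc_compaction_split_jobs walks the sorted partition boundaries, extends the current window, and cuts a job whenever the layers visible below the cutoff LSN exceed the size threshold, or when the last boundary is reached. A self-contained, simplified model of that strategy, with a closure standing in for the layer-map size estimate that the real code obtains from range_search:

    // Simplified model of the job-splitting strategy (illustrative; the real
    // code queries the layer map and extends ranges to cover whole layers).
    fn split_jobs<F>(boundaries: &[(u64, u64)], max_bytes: u64, estimate: F) -> Vec<(u64, u64)>
    where
        F: Fn(u64, u64) -> u64, // estimated bytes of layers overlapping start..end
    {
        let mut jobs = Vec::new();
        let mut current_start: Option<u64> = None;
        let n = boundaries.len();
        for (idx, &(start, end)) in boundaries.iter().enumerate() {
            let job_start = *current_start.get_or_insert(start);
            if job_start >= end {
                continue; // this partition is already covered by a previous job
            }
            // Cut a job once the window is big enough, or at the final boundary.
            if estimate(job_start, end) > max_bytes || idx + 1 == n {
                jobs.push((job_start, end));
                current_start = Some(end);
            }
        }
        jobs
    }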
1 change: 1 addition & 0 deletions test_runner/regress/test_compaction.py
@@ -159,6 +159,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this