Skip to content

Commit

Permalink
feat(pageserver): support schedule gc-compaction (#9809)
Browse files Browse the repository at this point in the history
## Problem

part of #9114

gc-compaction can take a long time. This patch adds support for
scheduling a gc-compaction job. The compaction loop will first handle
L0->L1 compaction, and then gc compaction. The scheduled jobs are stored
in a non-persistent queue within the tenant structure.

This will be the building block for the partial compaction trigger -- if
the system determines that we need to do a gc compaction, it will
partition the keyspace and schedule several jobs. Each of these jobs
will run for a short amount of time (i.e, 1 min). L0 compaction will be
prioritized over gc compaction.

## Summary of changes
 
* Add compaction scheduler in tenant.
* Run scheduled compaction in integration tests.
* Change the manual compaction API to allow schedule a compaction
instead of immediately doing it.
* Add LSN upper bound as gc-compaction parameter. If we schedule partial
compactions, gc_cutoff might move across different runs. Therefore, we
need to pass a pre-determined gc_cutoff beforehand. (TODO: support LSN
lower bound so that we can compact arbitrary "rectangle" in the layer
map)
* Refactor the gc_compaction internal interface.

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Christian Schwarz <[email protected]>
  • Loading branch information
skyzh and problame authored Dec 5, 2024
1 parent c0ba416 commit 71f38d1
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 74 deletions.
66 changes: 55 additions & 11 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
Expand Down Expand Up @@ -1978,6 +1978,26 @@ async fn timeline_gc_handler(
json_response(StatusCode::OK, gc_result)
}

// Cancel scheduled compaction tasks
async fn timeline_cancel_compact_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.cancel_scheduled_compaction(timeline_id);
json_response(StatusCode::OK, ())
}
.instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}

// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
Expand All @@ -1987,7 +2007,7 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
let compact_request = json_request_maybe::<Option<CompactRequest>>(&mut request).await?;

let state = get_state(&request);

Expand All @@ -2012,22 +2032,42 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

let wait_until_scheduled_compaction_done =
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);

let options = CompactOptions {
compact_range,
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
flags,
};

let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
if scheduled {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
}
} else {
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
}
}
json_response(StatusCode::OK, ())
}
Expand Down Expand Up @@ -3301,6 +3341,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
)
.delete(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_cancel_compact_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
Expand Down
171 changes: 147 additions & 24 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::ScheduledCompactionTask;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
Expand Down Expand Up @@ -339,6 +343,11 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

/// Scheduled compaction tasks. Currently, this can only be populated by triggering
/// a manual gc-compaction from the manual compaction API.
scheduled_compaction_tasks:
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,

/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
Expand Down Expand Up @@ -2953,27 +2962,68 @@ impl Tenant {

for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
let pending_task_left = if *can_compact {
Some(
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
let has_pending_l0_compaction_task = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
if has_pending_l0_compaction_task {
Some(true)
} else {
let has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
} else {
has_pending_scheduled_compaction_task = false;
None
}
};
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
{
if !next_scheduled_compaction_task
.options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else {
let _ = timeline
.compact_with_options(
cancel,
next_scheduled_compaction_task.options,
ctx,
)
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
.await?;
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
// TODO: we can send compaction statistics in the future
tx.send(()).ok();
}
})?,
)
}
}
Some(has_pending_scheduled_compaction_task)
}
} else {
None
};
Expand All @@ -2993,6 +3043,36 @@ impl Tenant {
Ok(has_pending_task)
}

/// Cancel scheduled compaction tasks
pub(crate) fn cancel_scheduled_compaction(
&self,
timeline_id: TimelineId,
) -> Vec<ScheduledCompactionTask> {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
current_tline_pending_tasks.into_iter().collect()
} else {
Vec::new()
}
}

/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
});
rx
}

// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
Expand Down Expand Up @@ -4005,6 +4085,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
cancel: CancellationToken::default(),
Expand Down Expand Up @@ -9163,6 +9244,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
},
&ctx,
)
Expand Down Expand Up @@ -9399,6 +9481,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
},
&ctx,
)
Expand Down Expand Up @@ -9885,7 +9968,15 @@ mod tests {

// Do a partial compaction on key range 0..2
tline
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9924,7 +10015,15 @@ mod tests {

// Do a partial compaction on key range 2..4
tline
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9968,7 +10067,15 @@ mod tests {

// Do a partial compaction on key range 4..9
tline
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -10011,7 +10118,15 @@ mod tests {

// Do a partial compaction on key range 9..10
tline
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -10059,7 +10174,15 @@ mod tests {

// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
tline
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down
Loading

1 comment on commit 71f38d1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7166 tests run: 6847 passed, 0 failed, 319 skipped (full report)


Flaky tests (5)

Postgres 17

Postgres 16

Postgres 15

Code coverage* (full report)

  • functions: 31.4% (8321 of 26499 functions)
  • lines: 47.8% (65446 of 137023 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
71f38d1 at 2024-12-05T21:29:03.615Z :recycle:

Please sign in to comment.