Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): support schedule gc-compaction #9809

Merged
merged 12 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
Expand Down Expand Up @@ -1962,6 +1962,26 @@ async fn timeline_gc_handler(
json_response(StatusCode::OK, gc_result)
}

// Cancel scheduled compaction tasks
async fn timeline_cancel_compact_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.cancel_scheduled_compaction(timeline_id);
json_response(StatusCode::OK, ())
}
.instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}

// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
Expand All @@ -1971,7 +1991,7 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
let compact_request = json_request_maybe::<Option<CompactRequest>>(&mut request).await?;

let state = get_state(&request);

Expand All @@ -1996,22 +2016,42 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

let wait_until_scheduled_compaction_done =
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);

let options = CompactOptions {
compact_range,
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
flags,
};

let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
if scheduled {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
}
} else {
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
}
}
json_response(StatusCode::OK, ())
}
Expand Down Expand Up @@ -3285,6 +3325,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
)
.delete(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_cancel_compact_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
Expand Down
171 changes: 147 additions & 24 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::ScheduledCompactionTask;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
Expand Down Expand Up @@ -339,6 +343,11 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

/// Scheduled compaction tasks. Currently, this can only be populated by triggering
/// a manual gc-compaction from the manual compaction API.
scheduled_compaction_tasks:
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,

/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
Expand Down Expand Up @@ -2953,27 +2962,68 @@ impl Tenant {

for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
let pending_task_left = if *can_compact {
Some(
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
let has_pending_l0_compaction_task = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
if has_pending_l0_compaction_task {
Some(true)
} else {
let has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
} else {
has_pending_scheduled_compaction_task = false;
None
}
};
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
{
if !next_scheduled_compaction_task
.options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else {
let _ = timeline
.compact_with_options(
cancel,
next_scheduled_compaction_task.options,
ctx,
)
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
.await?;
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
// TODO: we can send compaction statistics in the future
tx.send(()).ok();
}
})?,
)
}
}
Some(has_pending_scheduled_compaction_task)
}
} else {
None
};
Expand All @@ -2993,6 +3043,36 @@ impl Tenant {
Ok(has_pending_task)
}

/// Cancel scheduled compaction tasks
pub(crate) fn cancel_scheduled_compaction(
&self,
timeline_id: TimelineId,
) -> Vec<ScheduledCompactionTask> {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
current_tline_pending_tasks.into_iter().collect()
} else {
Vec::new()
}
}

/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
});
rx
}

// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
Expand Down Expand Up @@ -3993,6 +4073,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
cancel: CancellationToken::default(),
Expand Down Expand Up @@ -9149,6 +9230,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
},
&ctx,
)
Expand Down Expand Up @@ -9385,6 +9467,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
},
&ctx,
)
Expand Down Expand Up @@ -9871,7 +9954,15 @@ mod tests {

// Do a partial compaction on key range 0..2
tline
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9910,7 +10001,15 @@ mod tests {

// Do a partial compaction on key range 2..4
tline
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9954,7 +10053,15 @@ mod tests {

// Do a partial compaction on key range 4..9
tline
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9997,7 +10104,15 @@ mod tests {

// Do a partial compaction on key range 9..10
tline
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -10045,7 +10160,15 @@ mod tests {

// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
tline
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
compact_below_lsn: None,
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down
Loading
Loading