From 3d1c3a80ae0fd2babe42d7fc2293cb2a058ff8cb Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:09:02 -0500 Subject: [PATCH] feat(pageserver): add compact queue http endpoint (#10173) ## Problem We cannot get the size of the compaction queue and access the info. Part of #9114 ## Summary of changes * Add an API endpoint to get the compaction queue. * gc_compaction test case now waits until the compaction finishes. --------- Signed-off-by: Alex Chi Z --- libs/pageserver_api/src/models.rs | 64 +++++++++++++++++++++++++ pageserver/src/http/routes.rs | 36 +++++++++++++- pageserver/src/tenant.rs | 23 +++++++-- pageserver/src/tenant/timeline.rs | 63 ++---------------------- test_runner/fixtures/pageserver/http.py | 13 ++++- test_runner/regress/test_compaction.py | 6 +++ 6 files changed, 139 insertions(+), 66 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5690b643f062..f3fc9fad760a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -6,6 +6,7 @@ pub mod utilization; use camino::Utf8PathBuf; pub use utilization::PageserverUtilization; +use core::ops::Range; use std::{ collections::HashMap, fmt::Display, @@ -28,6 +29,7 @@ use utils::{ }; use crate::{ + key::Key, reltag::RelTag, shard::{ShardCount, ShardStripeSize, TenantShardId}, }; @@ -210,6 +212,68 @@ pub enum TimelineState { Broken { reason: String, backtrace: String }, } +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct CompactLsnRange { + pub start: Lsn, + pub end: Lsn, +} + +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct CompactKeyRange { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub start: Key, + #[serde_as(as = "serde_with::DisplayFromStr")] + pub end: Key, +} + +impl From> for CompactLsnRange { + fn from(range: Range) -> Self { + Self { + start: range.start, + end: range.end, + } + } +} + +impl From> for CompactKeyRange { + fn from(range: Range) -> Self { + Self { + start: range.start, + end: range.end, + } + } +} + +impl From for Range { + fn from(range: CompactLsnRange) -> Self { + range.start..range.end + } +} + +impl From for Range { + fn from(range: CompactKeyRange) -> Self { + range.start..range.end + } +} + +impl CompactLsnRange { + pub fn above(lsn: Lsn) -> Self { + Self { + start: lsn, + end: Lsn::MAX, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct CompactInfoResponse { + pub compact_key_range: Option, + pub compact_lsn_range: Option, + pub sub_compaction: bool, +} + #[derive(Serialize, Deserialize, Clone)] pub struct TimelineCreateRequest { pub new_timeline_id: TimelineId, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index db7d29385641..60ef4c3702f4 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; use crate::DEFAULT_PG_VERSION; use crate::{disk_usage_eviction_task, tenant}; use pageserver_api::models::{ - StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest, - TimelineInfo, + CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, + TimelineGcRequest, TimelineInfo, }; use utils::{ auth::SwappableJwtAuth, @@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler( .await } +// Get compact info of a timeline +async fn timeline_compact_info_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + let state = get_state(&request); + async { + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + let res = tenant.get_scheduled_compaction_tasks(timeline_id); + let mut resp = Vec::new(); + for item in res { + resp.push(CompactInfoResponse { + compact_key_range: item.compact_key_range, + compact_lsn_range: item.compact_lsn_range, + sub_compaction: item.sub_compaction, + }); + } + json_response(StatusCode::OK, resp) + } + .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) + .await +} + // Run compaction immediately on given timeline. async fn timeline_compact_handler( mut request: Request, @@ -3400,6 +3428,10 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc", |r| api_handler(r, timeline_gc_handler), ) + .get( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact", + |r| api_handler(r, timeline_compact_info_handler), + ) .put( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact", |r| api_handler(r, timeline_compact_handler), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 99289d5f15f7..2e4c47c6e40f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3122,6 +3122,23 @@ impl Tenant { } } + pub(crate) fn get_scheduled_compaction_tasks( + &self, + timeline_id: TimelineId, + ) -> Vec { + use itertools::Itertools; + let guard = self.scheduled_compaction_tasks.lock().unwrap(); + guard + .get(&timeline_id) + .map(|tline_pending_tasks| { + tline_pending_tasks + .iter() + .map(|x| x.options.clone()) + .collect_vec() + }) + .unwrap_or_default() + } + /// Schedule a compaction task for a timeline. pub(crate) async fn schedule_compaction( &self, @@ -5759,13 +5776,13 @@ mod tests { use timeline::{CompactOptions, DeltaLayerTestDesc}; use utils::id::TenantId; + #[cfg(feature = "testing")] + use models::CompactLsnRange; #[cfg(feature = "testing")] use pageserver_api::record::NeonWalRecord; #[cfg(feature = "testing")] use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; #[cfg(feature = "testing")] - use timeline::CompactLsnRange; - #[cfg(feature = "testing")] use timeline::GcInfo; static TEST_KEY: Lazy = @@ -9634,7 +9651,7 @@ mod tests { #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { - use timeline::CompactLsnRange; + use models::CompactLsnRange; let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; let (tenant, ctx) = harness.load().await; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 87f5a0338252..e71cb4db80b9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -31,9 +31,9 @@ use pageserver_api::{ }, keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning}, models::{ - CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo, - DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, - LsnLease, TimelineState, + CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings, + DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, + InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState, }, reltag::BlockNumber, shard::{ShardIdentity, ShardNumber, TenantShardId}, @@ -788,63 +788,6 @@ pub(crate) struct CompactRequest { pub sub_compaction_max_job_size_mb: Option, } -#[serde_with::serde_as] -#[derive(Debug, Clone, serde::Deserialize)] -pub(crate) struct CompactLsnRange { - pub start: Lsn, - pub end: Lsn, -} - -#[serde_with::serde_as] -#[derive(Debug, Clone, serde::Deserialize)] -pub(crate) struct CompactKeyRange { - #[serde_as(as = "serde_with::DisplayFromStr")] - pub start: Key, - #[serde_as(as = "serde_with::DisplayFromStr")] - pub end: Key, -} - -impl From> for CompactLsnRange { - fn from(range: Range) -> Self { - Self { - start: range.start, - end: range.end, - } - } -} - -impl From> for CompactKeyRange { - fn from(range: Range) -> Self { - Self { - start: range.start, - end: range.end, - } - } -} - -impl From for Range { - fn from(range: CompactLsnRange) -> Self { - range.start..range.end - } -} - -impl From for Range { - fn from(range: CompactKeyRange) -> Self { - range.start..range.end - } -} - -impl CompactLsnRange { - #[cfg(test)] - #[cfg(feature = "testing")] - pub fn above(lsn: Lsn) -> Self { - Self { - start: lsn, - end: Lsn::MAX, - } - } -} - #[derive(Debug, Clone, Default)] pub(crate) struct CompactOptions { pub flags: EnumSet, diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index eabdeb10539c..378e56862252 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -738,6 +738,18 @@ def timeline_offload( res_json = res.json() assert res_json is None + def timeline_compact_info( + self, + tenant_id: TenantId | TenantShardId, + timeline_id: TimelineId, + ) -> Any: + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact", + ) + self.verbose_error(res) + res_json = res.json() + return res_json + def timeline_compact( self, tenant_id: TenantId | TenantShardId, @@ -749,7 +761,6 @@ def timeline_compact( enhanced_gc_bottom_most_compaction=False, body: dict[str, Any] | None = None, ): - self.is_testing_enabled_or_skip() query = {} if force_repartition: query["force_repartition"] = "true" diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index aef9a825ee36..ae48a8fc27d0 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -176,6 +176,12 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): workload.churn_rows(row_count, env.pageserver.id) + def compaction_finished(): + queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id)) + assert queue_depth == 0 + + wait_until(compaction_finished, timeout=60) + # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked) env.pageserver.assert_log_contains( "scheduled_compact_timeline.*picked .* layers for compaction"