Skip to content

Commit

Permalink
feat(pageserver): add compact queue http endpoint (#10173)
Browse files Browse the repository at this point in the history
## Problem

We cannot get the size of the compaction queue and access the info.

Part of #9114 

## Summary of changes

* Add an API endpoint to get the compaction queue.
* gc_compaction test case now waits until the compaction finishes.

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Dec 18, 2024
1 parent 835287b commit 3d1c3a8
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 66 deletions.
64 changes: 64 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod utilization;
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;

use core::ops::Range;
use std::{
collections::HashMap,
fmt::Display,
Expand All @@ -28,6 +29,7 @@ use utils::{
};

use crate::{
key::Key,
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
Expand Down Expand Up @@ -210,6 +212,68 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}

impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}

impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}

impl CompactLsnRange {
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}

#[derive(Debug, Clone, Serialize)]
pub struct CompactInfoResponse {
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
pub sub_compaction: bool,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
Expand Down
36 changes: 34 additions & 2 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
Expand Down Expand Up @@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
.await
}

// Get compact info of a timeline
async fn timeline_compact_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
let mut resp = Vec::new();
for item in res {
resp.push(CompactInfoResponse {
compact_key_range: item.compact_key_range,
compact_lsn_range: item.compact_lsn_range,
sub_compaction: item.sub_compaction,
});
}
json_response(StatusCode::OK, resp)
}
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}

// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
Expand Down Expand Up @@ -3400,6 +3428,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_info_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
Expand Down
23 changes: 20 additions & 3 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3122,6 +3122,23 @@ impl Tenant {
}
}

pub(crate) fn get_scheduled_compaction_tasks(
&self,
timeline_id: TimelineId,
) -> Vec<CompactOptions> {
use itertools::Itertools;
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.get(&timeline_id)
.map(|tline_pending_tasks| {
tline_pending_tasks
.iter()
.map(|x| x.options.clone())
.collect_vec()
})
.unwrap_or_default()
}

/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
Expand Down Expand Up @@ -5759,13 +5776,13 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;

#[cfg(feature = "testing")]
use models::CompactLsnRange;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
#[cfg(feature = "testing")]
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
#[cfg(feature = "testing")]
use timeline::CompactLsnRange;
#[cfg(feature = "testing")]
use timeline::GcInfo;

static TEST_KEY: Lazy<Key> =
Expand Down Expand Up @@ -9634,7 +9651,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
use timeline::CompactLsnRange;
use models::CompactLsnRange;

let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;
Expand Down
63 changes: 3 additions & 60 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
Expand Down Expand Up @@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
pub sub_compaction_max_job_size_mb: Option<u64>,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}

impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}

impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}

impl CompactLsnRange {
#[cfg(test)]
#[cfg(feature = "testing")]
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
Expand Down
13 changes: 12 additions & 1 deletion test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,18 @@ def timeline_offload(
res_json = res.json()
assert res_json is None

def timeline_compact_info(
self,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
)
self.verbose_error(res)
res_json = res.json()
return res_json

def timeline_compact(
self,
tenant_id: TenantId | TenantShardId,
Expand All @@ -749,7 +761,6 @@ def timeline_compact(
enhanced_gc_bottom_most_compaction=False,
body: dict[str, Any] | None = None,
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"
Expand Down
6 changes: 6 additions & 0 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):

workload.churn_rows(row_count, env.pageserver.id)

def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0

wait_until(compaction_finished, timeout=60)

# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"
Expand Down

1 comment on commit 3d1c3a8

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7095 tests run: 6796 passed, 1 failed, 298 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_small_inmemory_layers[debug-pg17-True]"

Test coverage report is not available

The comment gets automatically updated with the latest test results
3d1c3a8 at 2024-12-18T19:04:47.637Z :recycle:

Please sign in to comment.