feat(pageserver): add compact queue http endpoint #10173

Merged: 2 commits, Dec 18, 2024
Changes from all commits
64 changes: 64 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -6,6 +6,7 @@ pub mod utilization;
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;

use core::ops::Range;
use std::{
    collections::HashMap,
    fmt::Display,
@@ -28,6 +29,7 @@ use utils::{
};

use crate::{
    key::Key,
    reltag::RelTag,
    shard::{ShardCount, ShardStripeSize, TenantShardId},
};
@@ -210,6 +212,68 @@ pub enum TimelineState {
    Broken { reason: String, backtrace: String },
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactLsnRange {
    pub start: Lsn,
    pub end: Lsn,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactKeyRange {
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub start: Key,
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub end: Key,
}

impl From<Range<Lsn>> for CompactLsnRange {
    fn from(range: Range<Lsn>) -> Self {
        Self {
            start: range.start,
            end: range.end,
        }
    }
}

impl From<Range<Key>> for CompactKeyRange {
    fn from(range: Range<Key>) -> Self {
        Self {
            start: range.start,
            end: range.end,
        }
    }
}

impl From<CompactLsnRange> for Range<Lsn> {
    fn from(range: CompactLsnRange) -> Self {
        range.start..range.end
    }
}

impl From<CompactKeyRange> for Range<Key> {
    fn from(range: CompactKeyRange) -> Self {
        range.start..range.end
    }
}

impl CompactLsnRange {
    pub fn above(lsn: Lsn) -> Self {
        Self {
            start: lsn,
            end: Lsn::MAX,
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct CompactInfoResponse {
    pub compact_key_range: Option<CompactKeyRange>,
    pub compact_lsn_range: Option<CompactLsnRange>,
    pub sub_compaction: bool,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
    pub new_timeline_id: TimelineId,
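
For orientation, the sketch below shows roughly what the new GET endpoint (registered in routes.rs below) returns once CompactInfoResponse is serialized: a JSON array with one entry per queued compaction job, with key ranges rendered as strings because of DisplayFromStr. The concrete values and the exact LSN/key string formats are illustrative assumptions, not output captured from a real pageserver.

# Illustrative decoded response from
# GET /v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact.
# Values are made up; key ranges serialize as strings via DisplayFromStr, and the
# LSN rendering is assumed to follow the pageserver's usual "X/Y" string format.
sample_compact_queue = [
    {
        "compact_key_range": {
            "start": "000000000000000000000000000000000000",
            "end": "030000000000000000000000000000000000",
        },
        "compact_lsn_range": {"start": "0/149F0D8", "end": "FFFFFFFF/FFFFFFFF"},
        "sub_compaction": True,
    },
    {
        # Ranges are optional; a job without them covers the whole timeline.
        "compact_key_range": None,
        "compact_lsn_range": None,
        "sub_compaction": False,
    },
]

# The compaction queue depth is simply the length of the array.
assert len(sample_compact_queue) == 2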
36 changes: 34 additions & 2 deletions pageserver/src/http/routes.rs
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
    StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
    TimelineInfo,
    CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
    TimelineGcRequest, TimelineInfo,
};
use utils::{
    auth::SwappableJwtAuth,
@@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
    .await
}

// Get compact info of a timeline
async fn timeline_compact_info_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let state = get_state(&request);
    async {
        let tenant = state
            .tenant_manager
            .get_attached_tenant_shard(tenant_shard_id)?;
        let res = tenant.get_scheduled_compaction_tasks(timeline_id);
        let mut resp = Vec::new();
        for item in res {
            resp.push(CompactInfoResponse {
                compact_key_range: item.compact_key_range,
                compact_lsn_range: item.compact_lsn_range,
                sub_compaction: item.sub_compaction,
            });
        }
        json_response(StatusCode::OK, resp)
    }
    .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
    .await
}

// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
    mut request: Request<Body>,
@@ -3400,6 +3428,10 @@ pub fn make_router(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
            |r| api_handler(r, timeline_gc_handler),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
            |r| api_handler(r, timeline_compact_info_handler),
        )
        .put(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
            |r| api_handler(r, timeline_compact_handler),
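
With this registration the same /compact path serves two verbs: PUT still triggers a compaction, and the new GET reports the pending queue. Below is a minimal client sketch of the GET side, assuming the pageserver management API listens on its default local port and needs no auth token; the helper name and the tenant/timeline IDs are hypothetical.

import requests

PAGESERVER_API = "http://localhost:9898"  # assumed default pageserver management address


def get_compaction_queue(tenant_shard_id: str, timeline_id: str) -> list:
    """Fetch the scheduled-compaction queue for one timeline (hypothetical helper)."""
    url = f"{PAGESERVER_API}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact"
    res = requests.get(url, timeout=10)
    res.raise_for_status()
    return res.json()  # list of CompactInfoResponse-shaped dicts


if __name__ == "__main__":
    # Hypothetical tenant/timeline IDs.
    queue = get_compaction_queue("1f359dd625e519a1a4e8d7509690f6fc", "de200bd42b49cc1814412c7e592dd6e9")
    print(f"{len(queue)} compaction job(s) queued")
    for job in queue:
        print(job.get("compact_lsn_range"), job.get("sub_compaction"))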
23 changes: 20 additions & 3 deletions pageserver/src/tenant.rs
@@ -3122,6 +3122,23 @@ impl Tenant {
        }
    }

    pub(crate) fn get_scheduled_compaction_tasks(
        &self,
        timeline_id: TimelineId,
    ) -> Vec<CompactOptions> {
        use itertools::Itertools;
        let guard = self.scheduled_compaction_tasks.lock().unwrap();
        guard
            .get(&timeline_id)
            .map(|tline_pending_tasks| {
                tline_pending_tasks
                    .iter()
                    .map(|x| x.options.clone())
                    .collect_vec()
            })
            .unwrap_or_default()
    }

    /// Schedule a compaction task for a timeline.
    pub(crate) async fn schedule_compaction(
        &self,
@@ -5759,13 +5776,13 @@ mod tests {
    use timeline::{CompactOptions, DeltaLayerTestDesc};
    use utils::id::TenantId;

    #[cfg(feature = "testing")]
    use models::CompactLsnRange;
    #[cfg(feature = "testing")]
    use pageserver_api::record::NeonWalRecord;
    #[cfg(feature = "testing")]
    use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
    #[cfg(feature = "testing")]
    use timeline::CompactLsnRange;
    #[cfg(feature = "testing")]
    use timeline::GcInfo;

    static TEST_KEY: Lazy<Key> =
@@ -9634,7 +9651,7 @@
    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
        use timeline::CompactLsnRange;
        use models::CompactLsnRange;

        let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
        let (tenant, ctx) = harness.load().await;
63 changes: 3 additions & 60 deletions pageserver/src/tenant/timeline.rs
@@ -31,9 +31,9 @@ use pageserver_api::{
    },
    keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
    models::{
        CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
        DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
        LsnLease, TimelineState,
        CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
        DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
        InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
    },
    reltag::BlockNumber,
    shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
    pub sub_compaction_max_job_size_mb: Option<u64>,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
    pub start: Lsn,
    pub end: Lsn,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub start: Key,
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub end: Key,
}

impl From<Range<Lsn>> for CompactLsnRange {
    fn from(range: Range<Lsn>) -> Self {
        Self {
            start: range.start,
            end: range.end,
        }
    }
}

impl From<Range<Key>> for CompactKeyRange {
    fn from(range: Range<Key>) -> Self {
        Self {
            start: range.start,
            end: range.end,
        }
    }
}

impl From<CompactLsnRange> for Range<Lsn> {
    fn from(range: CompactLsnRange) -> Self {
        range.start..range.end
    }
}

impl From<CompactKeyRange> for Range<Key> {
    fn from(range: CompactKeyRange) -> Self {
        range.start..range.end
    }
}

impl CompactLsnRange {
    #[cfg(test)]
    #[cfg(feature = "testing")]
    pub fn above(lsn: Lsn) -> Self {
        Self {
            start: lsn,
            end: Lsn::MAX,
        }
    }
}

#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
    pub flags: EnumSet<CompactFlags>,
13 changes: 12 additions & 1 deletion test_runner/fixtures/pageserver/http.py
@@ -738,6 +738,18 @@ def timeline_offload(
        res_json = res.json()
        assert res_json is None

    def timeline_compact_info(
        self,
        tenant_id: TenantId | TenantShardId,
        timeline_id: TimelineId,
    ) -> Any:
        res = self.get(
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
        )
        self.verbose_error(res)
        res_json = res.json()
        return res_json

    def timeline_compact(
        self,
        tenant_id: TenantId | TenantShardId,
@@ -749,7 +761,6 @@ def timeline_compact(
        enhanced_gc_bottom_most_compaction=False,
        body: dict[str, Any] | None = None,
    ):
        self.is_testing_enabled_or_skip()
        query = {}
        if force_repartition:
            query["force_repartition"] = "true"
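
The new fixture method makes it easy to wait for the queue to drain, which is what the regress test below does with wait_until. For readers outside the test harness, here is a standalone sketch of the same drain-wait pattern against the raw endpoint; the base URL, timeout, and poll interval are assumptions, and the helper name is hypothetical.

import time

import requests


def wait_for_compaction_queue_empty(
    base_url: str, tenant_shard_id: str, timeline_id: str, timeout_s: float = 60.0
) -> None:
    """Poll GET .../compact until no scheduled compaction jobs remain (hypothetical helper)."""
    url = f"{base_url}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact"
    deadline = time.monotonic() + timeout_s
    while True:
        res = requests.get(url, timeout=10)
        res.raise_for_status()
        if len(res.json()) == 0:
            return  # queue is empty
        if time.monotonic() > deadline:
            raise TimeoutError(f"compaction queue not drained within {timeout_s}s")
        time.sleep(1.0)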
6 changes: 6 additions & 0 deletions test_runner/regress/test_compaction.py
@@ -172,6 +172,12 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):

    workload.churn_rows(row_count, env.pageserver.id)

    def compaction_finished():
        queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
        assert queue_depth == 0

    wait_until(compaction_finished, timeout=60)

    # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
    env.pageserver.assert_log_contains(
        "scheduled_compact_timeline.*picked .* layers for compaction"