feat(pageserver): add manual trigger for bottom-most compaction
Signed-off-by: Alex Chi Z <[email protected]>
skyzh committed Jun 18, 2024
1 parent b37d988 commit daea4b1
Showing 9 changed files with 78 additions and 16 deletions.
9 changes: 9 additions & 0 deletions pageserver/src/http/routes.rs
@@ -1771,6 +1771,15 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
flags |= CompactFlags::ForceImageLayerCreation;
}
if Some(true) == parse_query_param::<_, bool>(&request, "enhanced_gc_bottom_most_compaction")? {
if !cfg!(feature = "testing") {
return Err(ApiError::InternalServerError(
anyhow!("enhanced_gc_bottom_most_compaction is only available in testing mode")
.into(),
));
}
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
}
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

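For reference, a minimal sketch of how the new query parameter can be driven end to end through the Python fixture extended later in this commit. It assumes the usual NeonEnv test setup and a pageserver built with the testing feature (the handler above rejects the flag otherwise); the environment and ID variables are illustrative.

# Illustrative usage only; assumes a NeonEnv fixture with testing mode enabled.
client = env.pageserver.http_client()
client.timeline_compact(
    tenant_id,
    timeline_id,
    enhanced_gc_bottom_most_compaction=True,  # sent as ?enhanced_gc_bottom_most_compaction=true
)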
1 change: 0 additions & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -928,7 +928,6 @@ impl DeltaLayerInner {
}

/// Load all key-values in the delta layer; this should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
1 change: 0 additions & 1 deletion pageserver/src/tenant/storage_layer/image_layer.rs
@@ -486,7 +486,6 @@ impl ImageLayerInner {
}

/// Load all key-values in the image layer; this should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
2 changes: 0 additions & 2 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -389,7 +389,6 @@ impl Layer {
}

/// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
#[cfg(test)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
@@ -1774,7 +1773,6 @@ impl DownloadedLayer {
}
}

#[cfg(test)]
async fn load_key_values(
&self,
owner: &Arc<LayerInner>,
2 changes: 1 addition & 1 deletion pageserver/src/tenant/timeline.rs
@@ -683,6 +683,7 @@ pub enum GetLogicalSizePriority {
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
EnhancedGcBottomMostCompaction,
}

impl std::fmt::Debug for Timeline {
@@ -1121,7 +1122,6 @@ impl Timeline {
/// scan iterator interface. We could optimize this interface later to avoid some checks in the vectored
get path to maintain and split the probing and to-be-probed keyspace. We also need to ensure that
/// the scan operation will not cause OOM in the future.
#[allow(dead_code)]
pub(crate) async fn scan(
&self,
keyspace: KeySpace,
46 changes: 37 additions & 9 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -47,10 +47,14 @@ impl Timeline {
/// TODO: cancellation
pub(crate) async fn compact_legacy(
self: &Arc<Self>,
_cancel: &CancellationToken,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
return self.compact_with_gc(cancel, ctx).await;
}

// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -959,15 +963,16 @@ impl Timeline {
/// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
and creates delta layers with all deltas >= the gc horizon.
#[cfg(test)]
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
use crate::tenant::storage_layer::ValueReconstructState;
use std::collections::BTreeSet;

use crate::tenant::storage_layer::ValueReconstructState;
info!("running enhanced gc bottom-most compaction");

// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
// The layer selection has the following properties:
// 1. If a layer is in the selection, all layers below it are in the selection.
@@ -987,6 +992,11 @@
}
(selected_layers, gc_cutoff)
};
info!(
"picked {} layers for compaction with gc_cutoff={}",
layer_selection.len(),
gc_cutoff
);
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
let mut all_key_values = Vec::new();
let mut delta_split_points = BTreeSet::new();
@@ -1059,24 +1069,34 @@
} else if *lsn <= horizon {
match val {
crate::repository::Value::Image(image) => {
if lsn <= &horizon {
base_image = Some((*lsn, image.clone()));
break;
}
base_image = Some((*lsn, image.clone()));
break;
}
crate::repository::Value::WalRecord(wal) => {
delta_above_base_image.push((*lsn, wal.clone()));
}
}
}
}
delta_above_base_image.reverse();
// do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records
keys_above_horizon.reverse();
let img_lsn = base_image.as_ref().map(|img| img.0);
let delta_lsns = delta_above_base_image
.iter()
.map(|delta| delta.0)
.collect_vec();
let full_history = accumulated_values
.iter()
.map(|(_, lsn, _)| lsn)
.collect_vec();
let state = ValueReconstructState {
img: base_image,
records: delta_above_base_image,
};
let img = tline.reconstruct_value(key, horizon, state).await?;
let img = tline
.reconstruct_value(key, horizon, state)
.await
.with_context(|| format!("when reconstructing {img_lsn:?} {delta_lsns:?} {full_history:?} at {horizon}"))?;
Ok((keys_above_horizon, img))
}

@@ -1183,6 +1203,11 @@ impl Timeline {
);

let image_layer = image_layer_writer.finish(self, ctx).await?;
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
1
);
let mut compact_to = Vec::new();
compact_to.extend(delta_layers);
compact_to.push(image_layer);
@@ -1191,6 +1216,9 @@ impl Timeline {
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};

self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
Ok(())
}
}
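To make the per-key effect of compact_with_gc concrete, here is a small, self-contained Python model of the collapse it performs for a single key, as described in the doc comment above: history strictly above the GC horizon is retained as deltas, while the history at or below the horizon is reconstructed into one image emitted at the horizon. All names and types below are illustrative, not the pageserver's real APIs.

# Simplified sketch of the per-key collapse done by the bottom-most compaction.
# 'history' is a list of (lsn, kind, payload) sorted by LSN ascending, where kind is
# "image" (full value) or "delta" (incremental change); 'apply_delta' replays one
# delta on top of an image.
def collapse_key_history(history, horizon, apply_delta):
    above = [entry for entry in history if entry[0] > horizon]   # kept verbatim as deltas
    below = [entry for entry in history if entry[0] <= horizon]  # collapsed into one image
    image = None
    for _lsn, kind, payload in below:  # replay oldest to newest
        if kind == "image":
            image = payload            # a full image resets the reconstruction
        else:
            image = apply_delta(image, payload)
    return (horizon, image), above     # one image at the horizon, plus retained deltas

# Example: with horizon=25, the image at LSN 10 and the delta at LSN 20 fold into a
# single image at LSN 25, while the delta at LSN 30 survives unchanged.
full_image, kept_deltas = collapse_key_history(
    [(10, "image", "A"), (20, "delta", "+b"), (30, "delta", "+c")],
    horizon=25,
    apply_delta=lambda img, delta: (img or "") + delta,
)
assert full_image == (25, "A+b")
assert kept_deltas == [(30, "delta", "+c")]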
1 change: 0 additions & 1 deletion pageserver/src/tenant/timeline/layer_manager.rs
@@ -227,7 +227,6 @@ impl LayerManager {
}

/// Called when a GC-compaction is completed.
#[cfg(test)]
pub(crate) fn finish_gc_compaction(
&mut self,
compact_from: &[Layer],
3 changes: 3 additions & 0 deletions test_runner/fixtures/pageserver/http.py
@@ -594,6 +594,7 @@ def timeline_compact(
force_repartition=False,
force_image_layer_creation=False,
wait_until_uploaded=False,
enhanced_gc_bottom_most_compaction=False,
):
self.is_testing_enabled_or_skip()
query = {}
@@ -603,6 +604,8 @@
query["force_image_layer_creation"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
if enhanced_gc_bottom_most_compaction:
query["enhanced_gc_bottom_most_compaction"] = "true"

log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
29 changes: 28 additions & 1 deletion test_runner/performance/test_gc_feedback.py
@@ -33,7 +33,7 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
"pitr_interval": "60 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
@@ -99,6 +99,33 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
MetricReport.LOWER_IS_BETTER,
)

client.timeline_compact(tenant_id, timeline_id, enhanced_gc_bottom_most_compaction=True)

max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
for key_range in client.perf_info(tenant_id, timeline_id):
max_total_num_of_deltas = max(max_total_num_of_deltas, key_range["total_num_of_deltas"])
max_num_of_deltas_above_image = max(
max_num_of_deltas_above_image, key_range["num_of_deltas_above_image"]
)
zenbenchmark.record("logical_size_after_bottom_most_compaction", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size_after_bottom_most_compaction", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio after bottom_most_compaction", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"max_total_num_of_deltas_after_bottom_most_compaction", max_total_num_of_deltas, "", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"max_num_of_deltas_above_image_after_bottom_most_compaction",
max_num_of_deltas_above_image,
"",
MetricReport.LOWER_IS_BETTER,
)

with endpoint.cursor() as cur:
cur.execute(f"SELECT * FROM t") # ensure data is not corrupted

layer_map_path = env.repo_dir / "layer-map.json"
log.info(f"Writing layer map to {layer_map_path}")
with layer_map_path.open("w") as f:
