Skip to content

Commit

Permalink
refactor(pageserver): make image layer creation atomic
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Oct 31, 2024
1 parent 9761b6a commit 8ee5588
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 35 deletions.
2 changes: 2 additions & 0 deletions pageserver/src/tenant/storage_layer/batch_split_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ impl BatchLayerWriter {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
assert_eq!(writer.key_range().start, layer_key.key_range.start);
assert_eq!(writer.lsn(), layer_key.lsn_range.start);
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
Expand Down
9 changes: 9 additions & 0 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ impl ImageLayerWriterInner {
}

let final_key_range = if let Some(end_key) = end_key {
assert!(end_key <= self.key_range.end);
self.key_range.start..end_key
} else {
self.key_range.clone()
Expand Down Expand Up @@ -1033,6 +1034,14 @@ impl ImageLayerWriter {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}

pub(crate) fn key_range(&self) -> Range<Key> {
self.inner.as_ref().unwrap().key_range.clone()
}

pub(crate) fn lsn(&self) -> Lsn {
self.inner.as_ref().unwrap().lsn
}
}

impl Drop for ImageLayerWriter {
Expand Down
100 changes: 66 additions & 34 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use handle::ShardTimelineId;
use itertools::Itertools;
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::{
Expand Down Expand Up @@ -65,13 +66,14 @@ use std::{
};
use std::{pin::pin, sync::OnceLock};

use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
config::AttachmentMode,
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
storage_layer::inmemory_layer::IndexEntry,
},
walingest::WalLagCooldown,
walredo,
Expand Down Expand Up @@ -104,7 +106,6 @@ use crate::{
virtual_file::{MaybeFatalIo, VirtualFile},
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;

use crate::config::PageServerConf;
Expand Down Expand Up @@ -142,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf};

use super::{
config::TenantConf,
storage_layer::{inmemory_layer, LayerVisibilityHint},
storage_layer::{
batch_split_writer::{BatchLayerWriter, BatchWriterResult},
inmemory_layer, LayerVisibilityHint,
},
upload_queue::NotInitialized,
MaybeOffloaded,
};
Expand Down Expand Up @@ -856,7 +860,7 @@ pub(crate) enum ShutdownMode {
}

struct ImageLayerCreationOutcome {
image: Option<ResidentLayer>,
image: Option<ImageLayerWriter>,
next_start_key: Key,
}

Expand Down Expand Up @@ -4053,11 +4057,14 @@ impl Timeline {
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("created image layer for rel {}", image_layer.local_path());
info!(
"flushed image layer for rel key_start={} key_end={} lsn={}",
image_layer_writer.key_range().start,
image_layer_writer.key_range().end,
image_layer_writer.lsn()
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
image: Some(image_layer_writer),
next_start_key: img_range.end,
})
} else {
Expand Down Expand Up @@ -4143,14 +4150,14 @@ impl Timeline {
if wrote_any_image {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!(
"created image layer for metadata {}",
image_layer.local_path()
"flushed image layer for metadata key_start={} key_end={} lsn={}",
image_layer_writer.key_range().start,
image_layer_writer.key_range().end,
image_layer_writer.lsn()
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
image: Some(image_layer_writer),
next_start_key: img_range.end,
})
} else {
Expand Down Expand Up @@ -4227,7 +4234,6 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();

// We need to avoid holes between generated image layers.
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
Expand All @@ -4242,6 +4248,8 @@ impl Timeline {

let check_for_image_layers = self.should_check_if_image_layers_required(lsn);

let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;

for partition in partitioning.parts.iter() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
Expand Down Expand Up @@ -4272,24 +4280,6 @@ impl Timeline {
continue;
}
}
if let ImageLayerCreationMode::Force = mode {
// When forced to create image layers, we might try and create them where they already
// exist. This mode is only used in tests/debug.
let layers = self.layers.read().await;
if layers.contains_key(&PersistentLayerKey {
key_range: img_range.clone(),
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
}) {
tracing::info!(
"Skipping image layer at {lsn} {}..{}, already exists",
img_range.start,
img_range.end
);
start = img_range.end;
continue;
}
}

let image_layer_writer = ImageLayerWriter::new(
self.conf,
Expand Down Expand Up @@ -4323,7 +4313,11 @@ impl Timeline {
.await?;

start = next_start_key;
image_layers.extend(image);
if let Some(image) = image {
let key_range = image.key_range();
let lsn = image.lsn();
batch_image_writer.add_unfinished_image_writer(image, key_range, lsn);
}
} else {
let ImageLayerCreationOutcome {
image,
Expand All @@ -4340,10 +4334,48 @@ impl Timeline {
)
.await?;
start = next_start_key;
image_layers.extend(image);
if let Some(image) = image {
let key_range = image.key_range();
let lsn = image.lsn();
batch_image_writer.add_unfinished_image_writer(image, key_range, lsn);
}
}
}

let image_layers = batch_image_writer
.finish_with_discard_fn(self, ctx, |key| {
// TODO: remove this clone when Rust Edition 2024 is available, closure should capture this
// lifetime.
let key = key.clone();
async move {
// When forced to create image layers, we might try and create them where they already
// exist. The force mode is only used in tests/debug.
let layers = self.layers.read().await;
if layers.contains_key(&key) {
tracing::info!(
"Skipping image layer at {} {}..{}, already exists",
key.lsn_range.start,
key.key_range.start,
key.key_range.end
);
true
} else {
false
}
}
})
.await?;
let image_layers = image_layers
.into_iter()
.filter_map(|x| {
if let BatchWriterResult::Produced(x) = x {
Some(x)
} else {
None
}
})
.collect_vec();

let mut guard = self.layers.write().await;

// FIXME: we could add the images to be uploaded *before* returning from here, but right
Expand Down
5 changes: 4 additions & 1 deletion pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,10 @@ impl TimelineAdaptor {
)
.await?;

if let Some(image_layer) = image {
if let Some(image_layer_writer) = image {
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);
}

Expand Down

0 comments on commit 8ee5588

Please sign in to comment.