Skip to content

Commit

Permalink
feat(pageserver): split delta writer automatically determines key range
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Sep 9, 2024
1 parent 723c097 commit 23ce178
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 55 deletions.
6 changes: 3 additions & 3 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7091,13 +7091,13 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: Key::MIN..Key::MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
// The delta layer below the horizon
PersistentLayerKey {
key_range: Key::MIN..Key::NON_L0_MAX,
key_range: get_key(3)..get_key(4),
lsn_range: Lsn(0x30)..Lsn(0x48),
is_delta: true
},
Expand Down
151 changes: 103 additions & 48 deletions pageserver/src/tenant/storage_layer/split_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,45 +204,34 @@ impl SplitImageLayerWriter {
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: DeltaLayerWriter,
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
start_key: Key,
}

impl SplitDeltaLayerWriter {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
#[allow(unused)] ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
inner: None,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
start_key,
})
}

Expand All @@ -265,9 +254,26 @@ impl SplitDeltaLayerWriter {
//
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
// strategy. https://github.com/neondatabase/neon/issues/8837

if self.inner.is_none() {
self.inner = Some((
key,
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();

let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
if key != self.last_key_written {
let next_delta_writer = DeltaLayerWriter::new(
Expand All @@ -279,13 +285,13 @@ impl SplitDeltaLayerWriter {
ctx,
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.start_key = key;
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
Expand All @@ -296,17 +302,18 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
self.inner.estimated_size()
inner.estimated_size()
);
}
}
self.last_key_written = key;
self.inner.put_value(key, lsn, val, ctx).await
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
}

pub async fn put_value(
Expand All @@ -325,7 +332,6 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
Expand All @@ -337,11 +343,15 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
Expand All @@ -360,15 +370,14 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
}
}

Expand Down Expand Up @@ -432,7 +441,6 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
Expand Down Expand Up @@ -460,11 +468,22 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 1);
assert_eq!(
layers
.into_iter()
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}

#[tokio::test]
Expand Down Expand Up @@ -501,7 +520,6 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
Expand Down Expand Up @@ -533,10 +551,7 @@ mod tests {
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
Expand All @@ -555,6 +570,14 @@ mod tests {
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
Expand Down Expand Up @@ -602,7 +625,6 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
Expand Down Expand Up @@ -644,11 +666,35 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(layers.len(), 2);
let mut layers_iter = layers.into_iter();
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(1)..get_key(2),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}

#[tokio::test]
Expand All @@ -668,7 +714,6 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&ctx,
Expand All @@ -689,10 +734,20 @@ mod tests {
.await
.unwrap();
}
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
assert_eq!(delta_layers.len(), 1);
let delta_layer = delta_layers
.into_iter()
.next()
.unwrap()
.into_resident_layer();
assert_eq!(
delta_layer.layer_desc().key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
is_delta: true
}
);
}
}
6 changes: 2 additions & 4 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,6 @@ impl Timeline {
.unwrap();
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
// as an L0 layer.
let hack_end_key = Key::NON_L0_MAX;
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
let mut downloaded_layers = Vec::new();
Expand Down Expand Up @@ -1855,7 +1854,6 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
Key::MIN,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
ctx,
Expand Down Expand Up @@ -1965,7 +1963,7 @@ impl Timeline {
let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
let (layers, _) = writer.take()?;
Expand All @@ -1978,7 +1976,7 @@ impl Timeline {

let produced_delta_layers = if !dry_run {
delta_layer_writer
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.finish_with_discard_fn(self, ctx, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;
Expand Down

0 comments on commit 23ce178

Please sign in to comment.