Skip to content

Commit

Permalink
Check pushes against the previous index
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Dec 20, 2024
1 parent c2215f1 commit c2ea6a3
Showing 1 changed file with 53 additions and 26 deletions.
79 changes: 53 additions & 26 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,10 +1798,11 @@ impl RemoteTimelineClient {
Ok(())
}

/// Returns true if a can bypass b, i.e. if the operations don't conflict.
/// Returns true if a can bypass b, i.e. if the operations don't conflict. prev_index is the
/// active index below b.
///
/// TODO: consider moving this and other associated logic into UploadOp and UploadQueue.
fn can_bypass(a: &UploadOp, b: &UploadOp) -> bool {
fn can_bypass(a: &UploadOp, b: &UploadOp, prev_index: &IndexPart) -> bool {
match (a, b) {
// Nothing can bypass a barrier or shutdown, and it can't bypass anything.
(UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
Expand All @@ -1819,27 +1820,39 @@ impl RemoteTimelineClient {
.map(|(name, _)| name)
.contains(&u.layer_desc().layer_name()),

// Deletes are idempotent and can always bypass each other
// Deletes are idempotent and can always bypass each other.
// TODO: verify this.
(UploadOp::Delete(_), UploadOp::Delete(_)) => true,

// Uploads and indexes can bypass each other unless they overlap.
// TODO: what if the current index references the file? Can we clobber it?
(UploadOp::UploadLayer(u, _, _), UploadOp::UploadMetadata { uploaded: i })
| (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, _, _)) => {
!i.layer_metadata.contains_key(&u.layer_desc().layer_name())
// Uploads and deletes can bypass an index upload as long as the index and the index
// below it doesn't reference the file. A layer can't be modified or deleted while
// referenced by an index.
(UploadOp::UploadLayer(u, _, _), UploadOp::UploadMetadata { uploaded: i }) => {
let layer_name = u.layer_desc().layer_name();
!i.layer_metadata.contains_key(&layer_name)
&& !prev_index.layer_metadata.contains_key(&layer_name)
}
(UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i }) => {
d.layers.iter().all(|(layer_name, _)| {
!i.layer_metadata.contains_key(layer_name)
&& !prev_index.layer_metadata.contains_key(layer_name)
})
}

// A delete can't bypass an index. The queued index likely doesn't reference the file,
// but the current index may still reference it.
(UploadOp::Delete(_), UploadOp::UploadMetadata { .. }) => false,

// An index can bypass a delete if the index doesn't contain the deleted files. The
// existing index can't reference these files, otherwise they couldn't be deleted.
(UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => !d
.layers
.iter()
.any(|(name, _)| i.layer_metadata.contains_key(name)),
// An index can bypass an upload or delete if the index doesn't contain the modified
// files. They can't be referenced by the previous index either, since it's not legal to
// modify a referenced file.
(UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, _, _)) => {
let layer_name = u.layer_desc().layer_name();
!i.layer_metadata.contains_key(&layer_name)
&& !prev_index.layer_metadata.contains_key(&layer_name)
}
(UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
d.layers.iter().all(|(layer_name, _)| {
!i.layer_metadata.contains_key(layer_name)
&& !prev_index.layer_metadata.contains_key(layer_name)
})
}

// Indexes can never bypass each other.
// TODO: they can coalesce though, consider this.
Expand All @@ -1855,18 +1868,32 @@ impl RemoteTimelineClient {
// For each queued operation, check if it can bypass the in-progress and queued operations
// ahead of it (if any).
//
// TODO: this loop is a bit ugly, rewrite the internals as an iterator chain.
// TODO: this is quadratic. Is that a problem? Consider optimizing.
for (i, op) in queue.queued_operations.iter().enumerate() {
'OUTER: for (i, op) in queue.queued_operations.iter().enumerate() {
// Visit the preceding operations in reverse order (front to back), to keep track of
// prev_index. The order really doesn't matter, since we must be able to bypass all.
//
// TODO: add assertions that an upload or delete is never referenced by the most recent
// index.
let in_progress = queue.inprogress_tasks.values().map(|task| &task.op);
let ahead = queue.queued_operations.iter().take(i);
if ahead.chain(in_progress).all(|q| Self::can_bypass(op, q)) {
return queue.queued_operations.remove(i);
let ahead = queue.queued_operations.iter().take(i).rev();
let mut prev_index = &queue.clean.0;
for other in in_progress.chain(ahead) {
if !Self::can_bypass(op, other, prev_index) {
// Nothing can cross a barrier, so give up if we find one.
if matches!(op, UploadOp::Barrier(_) | UploadOp::Shutdown) {
return None;
}
continue 'OUTER;
}
if let UploadOp::UploadMetadata { ref uploaded } = other {
prev_index = uploaded;
}
}

// Nothing can cross a barrier, so give up if we didn't already return it above.
if matches!(op, UploadOp::Barrier(_) | UploadOp::Shutdown) {
return None;
}
// We can bypass all preceding operations; go for it.
return queue.queued_operations.remove(i);
}
None
}
Expand Down

0 comments on commit c2ea6a3

Please sign in to comment.