pageserver: reorder upload queue when possible #10218

Draft: wants to merge 2 commits into base: main
148 changes: 107 additions & 41 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -188,6 +188,7 @@ use camino::Utf8Path;
 use chrono::{NaiveDateTime, Utc};
 
 pub(crate) use download::download_initdb_tar_zst;
+use itertools::Itertools as _;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::shard::{ShardIndex, TenantShardId};
 use regex::Regex;
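The new import is anonymous (`as _`) because only the trait's methods are needed, not its name: the `can_bypass` code below calls `Itertools::contains` on an iterator of layer names. A minimal standalone illustration of that import style (not part of the PR; assumes the `itertools` crate as a dependency):

```rust
// Illustrative only: `Itertools as _` brings the trait's methods (like `.contains()`)
// into scope without binding the trait name itself.
use itertools::Itertools as _;

fn main() {
    let names = ["layer-a", "layer-b"];
    // `.contains()` on an iterator is provided by the Itertools trait.
    assert!(names.iter().contains(&"layer-b"));
}
```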
@@ -1797,55 +1798,120 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
-    /// Pick next tasks from the queue, and start as many of them as possible without violating
-    /// the ordering constraints.
-    ///
-    /// The caller needs to already hold the `upload_queue` lock.
-    fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
-        while let Some(next_op) = upload_queue.queued_operations.front() {
-            // Can we run this task now?
-            let can_run_now = match next_op {
-                UploadOp::UploadLayer(..) => {
-                    // Can always be scheduled.
-                    true
-                }
-                UploadOp::UploadMetadata { .. } => {
-                    // These can only be performed after all the preceding operations
-                    // have finished.
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-                UploadOp::Delete(..) => {
-                    // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
-                    upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
-                }
-                UploadOp::Barrier(_) | UploadOp::Shutdown => {
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-            };
-
-            // If we cannot launch this task, don't look any further.
-            // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
-            // them now, but we don't try to do that currently. For example, if the frontmost task
-            // is an index-file upload that cannot proceed until preceding uploads have finished, we
-            // could still start layer uploads that were scheduled later.
-            if !can_run_now {
-                break;
-            }
-
-            if let UploadOp::Shutdown = next_op {
-                // leave the op in the queue but do not start more tasks; it will be dropped when
-                // the stop is called.
-                upload_queue.shutdown_ready.close();
-                break;
-            }
-
-            // We can launch this task. Remove it from the queue first.
-            let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
-
-            debug!("starting op: {}", next_op);
+    /// Returns true if a can bypass b, i.e. if the operations don't conflict. prev_index is the
+    /// active index below b.
+    ///
+    /// TODO: consider moving this and other associated logic into UploadOp and UploadQueue.
+    fn can_bypass(a: &UploadOp, b: &UploadOp, prev_index: &IndexPart) -> bool {
+        match (a, b) {
+            // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
+            (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
+            (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
+
+            // Uploads and deletes can bypass each other unless they're for the same file.
+            // TODO: index and memoize filenames.
+            (UploadOp::UploadLayer(a, _, _), UploadOp::UploadLayer(b, _, _)) => {
+                a.layer_desc().layer_name() != b.layer_desc().layer_name()
+            }
+            (UploadOp::UploadLayer(u, _, _), UploadOp::Delete(d))
+            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, _, _)) => !d
+                .layers
+                .iter()
+                .map(|(name, _)| name)
+                .contains(&u.layer_desc().layer_name()),
+
+            // Deletes are idempotent and can always bypass each other.
+            // TODO: verify this.
+            (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
+
+            // Uploads and deletes can bypass an index upload as long as neither the index nor the
+            // index below it references the file. A layer can't be modified or deleted while
+            // referenced by an index.
+            (UploadOp::UploadLayer(u, _, _), UploadOp::UploadMetadata { uploaded: i }) => {
+                let layer_name = u.layer_desc().layer_name();
+                !i.layer_metadata.contains_key(&layer_name)
+                    && !prev_index.layer_metadata.contains_key(&layer_name)
+            }
+            (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i }) => {
+                d.layers.iter().all(|(layer_name, _)| {
+                    !i.layer_metadata.contains_key(layer_name)
+                        && !prev_index.layer_metadata.contains_key(layer_name)
+                })
+            }
+
+            // An index can bypass an upload or delete if the index doesn't contain the modified
+            // files. They can't be referenced by the previous index either, since it's not legal to
+            // modify a referenced file.
+            (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, _, _)) => {
+                let layer_name = u.layer_desc().layer_name();
+                !i.layer_metadata.contains_key(&layer_name)
+                    && !prev_index.layer_metadata.contains_key(&layer_name)
+            }
+            (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
+                d.layers.iter().all(|(layer_name, _)| {
+                    !i.layer_metadata.contains_key(layer_name)
+                        && !prev_index.layer_metadata.contains_key(layer_name)
+                })
+            }
+
+            // Indexes can never bypass each other.
+            // TODO: they can coalesce though, consider this.
+            (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
+        }
+    }
+
+    /// Returns and removes the next operation from the queue, if any. This isn't necessarily the
+    /// first task in the queue, to avoid head-of-line blocking.
+    ///
+    /// TODO: consider limiting the number of in-progress tasks.
+    fn next_queued_task(self: &Arc<Self>, queue: &mut UploadQueueInitialized) -> Option<UploadOp> {
+        // For each queued operation, check if it can bypass the in-progress and queued operations
+        // ahead of it (if any).
+        //
+        // TODO: this loop is a bit ugly, rewrite the internals as an iterator chain.
+        // TODO: this is quadratic. Is that a problem? Consider optimizing.
+        'OUTER: for (i, op) in queue.queued_operations.iter().enumerate() {
+            // Visit the preceding operations in reverse order (back to front), to keep track of
+            // prev_index. The order really doesn't matter, since we must be able to bypass all.
+            //
+            // TODO: add assertions that an upload or delete is never referenced by the most recent
+            // index.
+            let in_progress = queue.inprogress_tasks.values().map(|task| &task.op);
+            let ahead = queue.queued_operations.iter().take(i).rev();
+            let mut prev_index = &queue.clean.0;
+            for other in in_progress.chain(ahead) {
+                if !Self::can_bypass(op, other, prev_index) {
+                    // Nothing can cross a barrier, so give up if we find one.
+                    if matches!(op, UploadOp::Barrier(_) | UploadOp::Shutdown) {
+                        return None;
+                    }
+                    continue 'OUTER;
+                }
+                if let UploadOp::UploadMetadata { ref uploaded } = other {
+                    prev_index = uploaded;
+                }
+            }
+
+            // We can bypass all preceding operations; go for it.
+            return queue.queued_operations.remove(i);
+        }
+        None
+    }
+
+    /// Pick next tasks from the queue, and start as many of them as possible without violating
+    /// the ordering constraints.
+    ///
+    /// The caller needs to already hold the `upload_queue` lock.
+    fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
+        // Check for a shutdown. Leave it in the queue, but don't start more tasks. It will be
+        // dropped on stop.
+        if let Some(UploadOp::Shutdown) = upload_queue.queued_operations.front() {
+            upload_queue.shutdown_ready.close();
+            return;
+        }
+
+        while let Some(mut next_op) = self.next_queued_task(upload_queue) {
+            debug!("starting op: {next_op}");
 
             // Update the counters and prepare
             match &mut next_op {
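Taken together, the diff replaces the strict "run only the front of the queue" policy with a scan that may pull a later operation forward when it conflicts with nothing ahead of it. Below is a minimal, self-contained sketch of that idea; the simplified types and names (`Op`, `can_bypass`, `next_op`, a `HashSet` standing in for an index's layer metadata) are illustrative assumptions, not the pageserver's actual API. The core rule is the same: an operation may jump the queue only if it can bypass every in-progress operation and every queued operation ahead of it, with index uploads acting as ordering points for the layers they reference.

```rust
// Illustrative sketch only -- simplified stand-ins for the PR's UploadOp/IndexPart types.
use std::collections::{HashSet, VecDeque};

#[derive(Debug)]
#[allow(dead_code)]
enum Op {
    UploadLayer(String),
    Delete(Vec<String>),
    /// An index upload, carrying the set of layer names it references.
    Index(HashSet<String>),
    Barrier,
}

/// Returns true if `a` may start before `b` completes, i.e. the two don't conflict.
/// `prev_index` is the set of layers referenced by the index active below `b`.
fn can_bypass(a: &Op, b: &Op, prev_index: &HashSet<String>) -> bool {
    use Op::*;
    match (a, b) {
        // Nothing crosses a barrier.
        (Barrier, _) | (_, Barrier) => false,
        // Layer uploads and deletes conflict only when they touch the same file.
        (UploadLayer(x), UploadLayer(y)) => x != y,
        (UploadLayer(x), Delete(names)) | (Delete(names), UploadLayer(x)) => !names.contains(x),
        (Delete(_), Delete(_)) => true,
        // A layer operation may cross an index only if neither that index nor the
        // index below it references the file (and vice versa).
        (UploadLayer(x), Index(idx)) | (Index(idx), UploadLayer(x)) => {
            !idx.contains(x) && !prev_index.contains(x)
        }
        (Delete(names), Index(idx)) | (Index(idx), Delete(names)) => names
            .iter()
            .all(|n| !idx.contains(n) && !prev_index.contains(n)),
        // Index uploads stay ordered relative to each other.
        (Index(_), Index(_)) => false,
    }
}

/// Removes and returns the first queued op that can bypass every in-progress op and
/// every queued op ahead of it, tracking the most recent index seen while scanning.
fn next_op(queue: &mut VecDeque<Op>, in_progress: &[Op], clean_index: &HashSet<String>) -> Option<Op> {
    'outer: for i in 0..queue.len() {
        let mut prev_index = clean_index;
        for other in in_progress.iter().chain(queue.iter().take(i)) {
            if !can_bypass(&queue[i], other, prev_index) {
                continue 'outer;
            }
            if let Op::Index(referenced) = other {
                prev_index = referenced;
            }
        }
        return queue.remove(i);
    }
    None
}

fn main() {
    // The last-uploaded ("clean") index references layer "a", and a re-upload of "a"
    // is still in flight.
    let clean_index: HashSet<String> = HashSet::from(["a".to_string()]);
    let in_progress = vec![Op::UploadLayer("a".to_string())];
    // The queued index references "a", so it must wait for the in-flight upload;
    // the unrelated upload of layer "c" behind it does not.
    let mut queue = VecDeque::from(vec![
        Op::Index(HashSet::from(["a".to_string(), "b".to_string()])),
        Op::UploadLayer("c".to_string()),
    ]);
    // Prints Some(UploadLayer("c")): the layer upload jumps ahead of the blocked index.
    println!("{:?}", next_op(&mut queue, &in_progress, &clean_index));
}
```

In the sketch, as in the PR, the scan is quadratic in the queue length, and an index upload is blocked until every operation touching a layer it references has completed, while unrelated layer uploads are free to proceed.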