diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index fee11bc742bf..3ff108aca2ad 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -188,6 +188,7 @@ use camino::Utf8Path; use chrono::{NaiveDateTime, Utc}; pub(crate) use download::download_initdb_tar_zst; +use itertools::Itertools as _; use pageserver_api::models::TimelineArchivalState; use pageserver_api::shard::{ShardIndex, TenantShardId}; use regex::Regex; @@ -1797,55 +1798,120 @@ impl RemoteTimelineClient { Ok(()) } + /// Returns true if a can bypass b, i.e. if the operations don't conflict. prev_index is the + /// active index below b. /// - /// Pick next tasks from the queue, and start as many of them as possible without violating - /// the ordering constraints. - /// - /// The caller needs to already hold the `upload_queue` lock. - fn launch_queued_tasks(self: &Arc, upload_queue: &mut UploadQueueInitialized) { - while let Some(next_op) = upload_queue.queued_operations.front() { - // Can we run this task now? - let can_run_now = match next_op { - UploadOp::UploadLayer(..) => { - // Can always be scheduled. - true - } - UploadOp::UploadMetadata { .. } => { - // These can only be performed after all the preceding operations - // have finished. - upload_queue.inprogress_tasks.is_empty() - } - UploadOp::Delete(..) => { - // Wait for preceding uploads to finish. Concurrent deletions are OK, though. - upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() - } + /// TODO: consider moving this and other associated logic into UploadOp and UploadQueue. + fn can_bypass(a: &UploadOp, b: &UploadOp, prev_index: &IndexPart) -> bool { + match (a, b) { + // Nothing can bypass a barrier or shutdown, and it can't bypass anything. + (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false, + (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false, + + // Uploads and deletes can bypass each other unless they're for the same file. + // TODO: index and memoize filenames. + (UploadOp::UploadLayer(a, _, _), UploadOp::UploadLayer(b, _, _)) => { + a.layer_desc().layer_name() != b.layer_desc().layer_name() + } + (UploadOp::UploadLayer(u, _, _), UploadOp::Delete(d)) + | (UploadOp::Delete(d), UploadOp::UploadLayer(u, _, _)) => !d + .layers + .iter() + .map(|(name, _)| name) + .contains(&u.layer_desc().layer_name()), + + // Deletes are idempotent and can always bypass each other. + // TODO: verify this. + (UploadOp::Delete(_), UploadOp::Delete(_)) => true, + + // Uploads and deletes can bypass an index upload as long as the index and the index + // below it doesn't reference the file. A layer can't be modified or deleted while + // referenced by an index. + (UploadOp::UploadLayer(u, _, _), UploadOp::UploadMetadata { uploaded: i }) => { + let layer_name = u.layer_desc().layer_name(); + !i.layer_metadata.contains_key(&layer_name) + && !prev_index.layer_metadata.contains_key(&layer_name) + } + (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i }) => { + d.layers.iter().all(|(layer_name, _)| { + !i.layer_metadata.contains_key(layer_name) + && !prev_index.layer_metadata.contains_key(layer_name) + }) + } - UploadOp::Barrier(_) | UploadOp::Shutdown => { - upload_queue.inprogress_tasks.is_empty() - } - }; + // An index can bypass an upload or delete if the index doesn't contain the modified + // files. They can't be referenced by the previous index either, since it's not legal to + // modify a referenced file. + (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, _, _)) => { + let layer_name = u.layer_desc().layer_name(); + !i.layer_metadata.contains_key(&layer_name) + && !prev_index.layer_metadata.contains_key(&layer_name) + } + (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => { + d.layers.iter().all(|(layer_name, _)| { + !i.layer_metadata.contains_key(layer_name) + && !prev_index.layer_metadata.contains_key(layer_name) + }) + } - // If we cannot launch this task, don't look any further. + // Indexes can never bypass each other. + // TODO: they can coalesce though, consider this. + (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false, + } + } + + /// Returns and removes the next operation from the queue, if any. This isn't necessarily the + /// first task in the queue, to avoid head-of-line blocking. + /// + /// TODO: consider limiting the number of in-progress tasks. + fn next_queued_task(self: &Arc, queue: &mut UploadQueueInitialized) -> Option { + // For each queued operation, check if it can bypass the in-progress and queued operations + // ahead of it (if any). + // + // TODO: this loop is a bit ugly, rewrite the internals as an iterator chain. + // TODO: this is quadratic. Is that a problem? Consider optimizing. + 'OUTER: for (i, op) in queue.queued_operations.iter().enumerate() { + // Visit the preceding operations in reverse order (front to back), to keep track of + // prev_index. The order really doesn't matter, since we must be able to bypass all. // - // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch - // them now, but we don't try to do that currently. For example, if the frontmost task - // is an index-file upload that cannot proceed until preceding uploads have finished, we - // could still start layer uploads that were scheduled later. - if !can_run_now { - break; + // TODO: add assertions that an upload or delete is never referenced by the most recent + // index. + let in_progress = queue.inprogress_tasks.values().map(|task| &task.op); + let ahead = queue.queued_operations.iter().take(i).rev(); + let mut prev_index = &queue.clean.0; + for other in in_progress.chain(ahead) { + if !Self::can_bypass(op, other, prev_index) { + // Nothing can cross a barrier, so give up if we find one. + if matches!(op, UploadOp::Barrier(_) | UploadOp::Shutdown) { + return None; + } + continue 'OUTER; + } + if let UploadOp::UploadMetadata { ref uploaded } = other { + prev_index = uploaded; + } } - if let UploadOp::Shutdown = next_op { - // leave the op in the queue but do not start more tasks; it will be dropped when - // the stop is called. - upload_queue.shutdown_ready.close(); - break; - } + // We can bypass all preceding operations; go for it. + return queue.queued_operations.remove(i); + } + None + } - // We can launch this task. Remove it from the queue first. - let mut next_op = upload_queue.queued_operations.pop_front().unwrap(); + /// Pick next tasks from the queue, and start as many of them as possible without violating + /// the ordering constraints. + /// + /// The caller needs to already hold the `upload_queue` lock. + fn launch_queued_tasks(self: &Arc, upload_queue: &mut UploadQueueInitialized) { + // Check for a shutdown. Leave it in the queue, but don't start more tasks. It will be + // dropped on stop. + if let Some(UploadOp::Shutdown) = upload_queue.queued_operations.front() { + upload_queue.shutdown_ready.close(); + return; + } - debug!("starting op: {}", next_op); + while let Some(mut next_op) = self.next_queued_task(upload_queue) { + debug!("starting op: {next_op}"); // Update the counters and prepare match &mut next_op {