diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 5dcca31e419d..0ca77faa4d32 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1797,57 +1797,23 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
-    ///
     /// Pick next tasks from the queue, and start as many of them as possible without violating
     /// the ordering constraints.
     ///
-    /// The caller needs to already hold the `upload_queue` lock.
+    /// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does,
+    /// to avoid tenants starving other tenants.
     fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
-        while let Some(next_op) = upload_queue.queued_operations.front() {
-            // Can we run this task now?
-            let can_run_now = match next_op {
-                UploadOp::UploadLayer(..) => {
-                    // Can always be scheduled.
-                    true
-                }
-                UploadOp::UploadMetadata { .. } => {
-                    // These can only be performed after all the preceding operations
-                    // have finished.
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-                UploadOp::Delete(..) => {
-                    // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
-                    upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
-                }
-
-                UploadOp::Barrier(_) | UploadOp::Shutdown => {
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-            };
-
-            // If we cannot launch this task, don't look any further.
-            //
-            // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
-            // them now, but we don't try to do that currently. For example, if the frontmost task
-            // is an index-file upload that cannot proceed until preceding uploads have finished, we
-            // could still start layer uploads that were scheduled later.
-            if !can_run_now {
-                break;
-            }
-
-            if let UploadOp::Shutdown = next_op {
-                // leave the op in the queue but do not start more tasks; it will be dropped when
-                // the stop is called.
-                upload_queue.shutdown_ready.close();
-                break;
-            }
-
-            // We can launch this task. Remove it from the queue first.
-            let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
+        // Check for a shutdown. Leave it in the queue, but don't start more tasks. It will be
+        // dropped on stop.
+        if let Some(UploadOp::Shutdown) = upload_queue.queued_operations.front() {
+            upload_queue.shutdown_ready.close();
+            return;
+        }
 
-            debug!("starting op: {}", next_op);
+        while let Some(mut next_op) = upload_queue.next_ready() {
+            debug!("starting op: {next_op}");
 
-            // Update the counters and prepare
+            // Prepare upload.
             match &mut next_op {
                 UploadOp::UploadLayer(layer, meta, mode) => {
                     if upload_queue
@@ -1858,18 +1824,14 @@ impl RemoteTimelineClient {
                     } else {
                         *mode = Some(OpType::MayReorder)
                     }
-                    upload_queue.num_inprogress_layer_uploads += 1;
-                }
-                UploadOp::UploadMetadata { .. } => {
-                    upload_queue.num_inprogress_metadata_uploads += 1;
                 }
+                UploadOp::UploadMetadata { .. } => {}
                 UploadOp::Delete(Delete { layers }) => {
                     for (name, meta) in layers {
                         upload_queue
                             .recently_deleted
                             .insert((name.clone(), meta.generation));
                     }
-                    upload_queue.num_inprogress_deletions += 1;
                 }
                 UploadOp::Barrier(sender) => {
                     sender.send_replace(());
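
The rewritten loop above delegates all readiness decisions to `UploadQueue::next_ready()` (added in `upload_queue.rs` further below), removing the old head-of-line blocking: an operation may now start ahead of a queued operation it doesn't conflict with. A minimal standalone sketch of the idea, using toy types rather than the pageserver's `UploadOp`, and omitting barriers, shutdown, and the active-index tracking that the real `is_ready()` performs:

```rust
use std::collections::VecDeque;

/// Toy stand-ins for UploadOp (not the pageserver types): uploads and deletes
/// name a file; an index upload references a set of files.
#[derive(Debug, PartialEq)]
enum Op {
    Upload(&'static str),
    Delete(&'static str),
    Index(&'static [&'static str]),
}

/// Toy conflict rule, mirroring can_bypass() below: uploads and deletes of the
/// same file conflict, deletes are mutually idempotent, an index conflicts with
/// ops on files it references, and two indexes always conflict (stay ordered).
fn conflicts(a: &Op, b: &Op) -> bool {
    match (a, b) {
        (Op::Index(_), Op::Index(_)) => true,
        (Op::Index(refs), Op::Upload(f) | Op::Delete(f))
        | (Op::Upload(f) | Op::Delete(f), Op::Index(refs)) => refs.contains(f),
        (Op::Delete(_), Op::Delete(_)) => false,
        (Op::Upload(x) | Op::Delete(x), Op::Upload(y) | Op::Delete(y)) => x == y,
    }
}

/// Pop the first queued op that conflicts with nothing ahead of it, whether
/// in-progress or queued earlier -- the essence of next_ready()/is_ready().
fn next_ready(in_progress: &[Op], queue: &mut VecDeque<Op>) -> Option<Op> {
    for i in 0..queue.len() {
        let ready = in_progress
            .iter()
            .chain(queue.iter().take(i))
            .all(|ahead| !conflicts(&queue[i], ahead));
        if ready {
            return queue.remove(i);
        }
    }
    None
}

fn main() {
    // An upload of "a" is in flight. The queued index referencing "a" must wait
    // for it, but the unrelated upload of "b" behind that index can now jump
    // the queue; the old front-of-queue check would have blocked it too.
    let in_progress = [Op::Upload("a")];
    let mut queue = VecDeque::from([Op::Index(&["a"]), Op::Upload("b")]);
    assert_eq!(next_ready(&in_progress, &mut queue), Some(Op::Upload("b")));
    assert_eq!(next_ready(&in_progress, &mut queue), None); // index still blocked
}
```
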
@@ -1967,6 +1929,8 @@ impl RemoteTimelineClient {
 
         let upload_result: anyhow::Result<()> = match &task.op {
             UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
+                // TODO: check if this mechanism can be removed now that can_bypass() performs
+                // conflict checks during scheduling.
                 if let Some(OpType::FlushDeletion) = mode {
                     if self.config.read().unwrap().block_deletions {
                         // Of course, this is not efficient... but usually the queue should be empty.
@@ -2189,13 +2153,8 @@ impl RemoteTimelineClient {
         upload_queue.inprogress_tasks.remove(&task.task_id);
 
         let lsn_update = match task.op {
-            UploadOp::UploadLayer(_, _, _) => {
-                upload_queue.num_inprogress_layer_uploads -= 1;
-                None
-            }
+            UploadOp::UploadLayer(_, _, _) => None,
             UploadOp::UploadMetadata { ref uploaded } => {
-                upload_queue.num_inprogress_metadata_uploads -= 1;
-
                 // the task id is reused as a monotonicity check for storing the "clean"
                 // IndexPart.
                 let last_updater = upload_queue.clean.1;
@@ -2229,10 +2188,7 @@ impl RemoteTimelineClient {
                     None
                 }
             }
-            UploadOp::Delete(_) => {
-                upload_queue.num_inprogress_deletions -= 1;
-                None
-            }
+            UploadOp::Delete(_) => None,
             UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
         };
 
@@ -2356,9 +2312,6 @@ impl RemoteTimelineClient {
                     visible_remote_consistent_lsn: initialized
                         .visible_remote_consistent_lsn
                         .clone(),
-                    num_inprogress_layer_uploads: 0,
-                    num_inprogress_metadata_uploads: 0,
-                    num_inprogress_deletions: 0,
                     inprogress_tasks: HashMap::default(),
                     queued_operations: VecDeque::default(),
                     #[cfg(feature = "testing")]
@@ -2385,14 +2338,6 @@ impl RemoteTimelineClient {
             }
         };
 
-        // consistency check
-        assert_eq!(
-            qi.num_inprogress_layer_uploads
-                + qi.num_inprogress_metadata_uploads
-                + qi.num_inprogress_deletions,
-            qi.inprogress_tasks.len()
-        );
-
         // We don't need to do anything here for in-progress tasks. They will finish
         // on their own, decrement the unfinished-task counter themselves, and observe
         // that the queue is Stopped.
@@ -2531,6 +2476,20 @@ pub fn remote_layer_path(
     RemotePath::from_string(&path).expect("Failed to construct path")
 }
 
+/// Returns true if a and b have the same layer path within a tenant/timeline.
+///
+/// TODO: there should be a variant of LayerName for the physical path that contains information
+/// about the shard and generation, such that this could be replaced by a simple comparison.
+/// Effectively remote_layer_path(a) == remote_layer_path(b) but without the string allocations.
+pub fn is_same_layer_path(
+    aname: &LayerName,
+    ameta: &LayerFileMetadata,
+    bname: &LayerName,
+    bmeta: &LayerFileMetadata,
+) -> bool {
+    aname == bname && ameta.shard == bmeta.shard && ameta.generation == bmeta.generation
+}
+
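
As a quick illustration of the helper's semantics (hypothetical values, assuming the pageserver types and imports used by the `upload_queue.rs` tests below are in scope): identity is decided by name, shard, and generation only, so the same `LayerName` re-uploaded under a new generation is a different remote object and does not conflict.

```rust
use std::str::FromStr as _;

fn is_same_layer_path_example() {
    let name = LayerName::from_str(
        "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\
         __00000000016B59D8-00000000016B5A51",
    )
    .expect("valid layer name");
    let meta = |g: u32| LayerFileMetadata {
        generation: Generation::Valid(g),
        shard: ShardIndex::unsharded(),
        file_size: 0, // deliberately ignored by the comparison
    };
    // Same name, shard, and generation: same remote path.
    assert!(is_same_layer_path(&name, &meta(1), &name, &meta(1)));
    // A new generation is a different remote object, so ops on it don't conflict.
    assert!(!is_same_layer_path(&name, &meta(1), &name, &meta(2)));
}
```

Both `IndexPart::references()` (rewritten below to call this helper) and `can_bypass()` rely on exactly this notion of identity.
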
 pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
     RemotePath::from_string(&format!(
         "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
@@ -2824,8 +2783,8 @@ mod tests {
         let mut guard = client.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut().unwrap();
         assert!(upload_queue.queued_operations.is_empty());
-        assert!(upload_queue.inprogress_tasks.len() == 2);
-        assert!(upload_queue.num_inprogress_layer_uploads == 2);
+        assert_eq!(upload_queue.inprogress_tasks.len(), 2);
+        assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);
 
         // also check that `latest_file_changes` was updated
         assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
@@ -2895,8 +2854,8 @@ mod tests {
         // Deletion schedules upload of the index file, and the file deletion itself
         assert_eq!(upload_queue.queued_operations.len(), 2);
         assert_eq!(upload_queue.inprogress_tasks.len(), 1);
-        assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
-        assert_eq!(upload_queue.num_inprogress_deletions, 0);
+        assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
+        assert_eq!(upload_queue.num_inprogress_deletions(), 0);
         assert_eq!(
             upload_queue.latest_files_changes_since_metadata_upload_scheduled,
             0
diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs
index fe5af8825497..5d9c87483782 100644
--- a/pageserver/src/tenant/remote_timeline_client/index.rs
+++ b/pageserver/src/tenant/remote_timeline_client/index.rs
@@ -152,7 +152,7 @@ impl IndexPart {
         let Some(index_metadata) = self.layer_metadata.get(name) else {
             return false;
         };
-        metadata.shard == index_metadata.shard && metadata.generation == index_metadata.generation
+        super::is_same_layer_path(name, metadata, name, index_metadata)
     }
 }
 
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index ef3aa759f303..a00aa20c636c 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -1,3 +1,5 @@
+use super::remote_timeline_client::is_same_layer_path;
+use super::storage_layer::AsLayerDesc as _;
 use super::storage_layer::LayerName;
 use super::storage_layer::ResidentLayer;
 use crate::tenant::metadata::TimelineMetadata;
@@ -70,11 +72,6 @@ pub(crate) struct UploadQueueInitialized {
     /// we skip validation)
     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
 
-    // Breakdown of different kinds of tasks currently in-progress
-    pub(crate) num_inprogress_layer_uploads: usize,
-    pub(crate) num_inprogress_metadata_uploads: usize,
-    pub(crate) num_inprogress_deletions: usize,
-
     /// Tasks that are currently in-progress. In-progress means that a tokio Task
     /// has been launched for it. An in-progress task can be busy uploading, but it can
     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
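
Dropping the three counters makes `inprogress_tasks` the single source of truth for what is in flight: the totals were only ever a cached projection of that map, which is also why the consistency-check assert in `remote_timeline_client.rs` above could be deleted outright rather than preserved. The test-only accessors in the next hunk recompute the same numbers on demand with a linear scan, an acceptable cost since production code no longer reads them.
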
@@ -122,6 +119,98 @@ impl UploadQueueInitialized {
         let lsn = self.clean.0.metadata.disk_consistent_lsn();
         self.clean.1.map(|_| lsn)
     }
+
+    /// Returns the number of in-progress deletion operations.
+    #[cfg(test)]
+    pub(crate) fn num_inprogress_deletions(&self) -> usize {
+        self.inprogress_tasks
+            .iter()
+            .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
+            .count()
+    }
+
+    /// Returns the number of in-progress layer uploads.
+    #[cfg(test)]
+    pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
+        self.inprogress_tasks
+            .iter()
+            .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
+            .count()
+    }
+
+    /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
+    /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
+    /// the queue if it doesn't conflict with operations ahead of it.
+    ///
+    /// None may be returned even if the queue isn't empty, if no operations are ready yet.
+    pub(crate) fn next_ready(&mut self) -> Option<UploadOp> {
+        // TODO: this is quadratic. Is that a problem? Consider optimizing.
+        for (i, candidate) in self.queued_operations.iter().enumerate() {
+            // If this candidate is ready, go for it. Otherwise, try the next one.
+            if self.is_ready(i) {
+                return self.queued_operations.remove(i);
+            }
+
+            // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
+            if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
+                return None;
+            }
+        }
+        None
+    }
+
+    /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
+    /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
+    /// allowed to skip the queue when it's safe to do so, to increase parallelism.
+    ///
+    /// The position must be valid for the queue size.
+    fn is_ready(&self, pos: usize) -> bool {
+        let candidate = self.queued_operations.get(pos).expect("invalid position");
+        self
+            // Look at in-progress operations, in random order.
+            .inprogress_tasks
+            .values()
+            .map(|task| &task.op)
+            // Then queued operations ahead of the candidate, front-to-back.
+            .chain(self.queued_operations.iter().take(pos))
+            // Keep track of the active index ahead of each operation. This is used to ensure that
+            // an upload doesn't skip the queue too far, such that it modifies a layer that's
+            // referenced by the active index.
+            //
+            // It's okay that in-progress operations are emitted in random order above, since at
+            // most one of them can be an index upload (enforced by can_bypass).
+            .scan(&self.clean.0, |next_active_index, op| {
+                let active_index = *next_active_index;
+                if let UploadOp::UploadMetadata { ref uploaded } = op {
+                    *next_active_index = uploaded; // stash index for next operation after this
+                }
+                Some((op, active_index))
+            })
+            // Check if the candidate can bypass all of them.
+            .all(|(op, active_index)| candidate.can_bypass(op, active_index))
+    }
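
The `scan` above threads the "active index" through the sequence of in-progress and queued operations: each operation is paired with the index that would be live at the moment it runs, which is what `can_bypass()` consults to decide whether the candidate may touch a layer. A standalone sketch of just that bookkeeping, with hypothetical simplified stand-ins for the real types:

```rust
// Pair each op with the index active when it runs, as is_ready() does.
#[derive(Debug, PartialEq)]
enum SeqOp {
    Layer(&'static str),
    Index(u32),
}

fn main() {
    let clean = 0u32; // last index known to be uploaded (self.clean.0)
    let ops = [SeqOp::Index(1), SeqOp::Layer("x"), SeqOp::Index(2), SeqOp::Layer("y")];
    let paired: Vec<(&SeqOp, u32)> = ops
        .iter()
        .scan(clean, |active, op| {
            let seen = *active; // the index active ahead of this op
            if let SeqOp::Index(v) = op {
                *active = *v; // ops after this one run under the new index
            }
            Some((op, seen))
        })
        .collect();
    assert_eq!(paired[1], (&ops[1], 1)); // Layer("x") runs under index 1
    assert_eq!(paired[3], (&ops[3], 2)); // Layer("y") runs under index 2
}
```
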
+
+    /// Test helper that schedules all ready operations in inprogress_tasks, and returns references
+    /// to them. In particular, this ensures that any ready operations that conflict with
+    /// inprogress_tasks won't be returned yet.
+    ///
+    /// TODO: the corresponding logic should be moved from RemoteTimelineClient into UploadQueue,
+    /// such that we could use the regular code paths.
+    #[cfg(test)]
+    fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
+        let mut tasks = Vec::new();
+        while let Some(op) = self.next_ready() {
+            self.task_counter += 1;
+            let task = Arc::new(UploadTask {
+                task_id: self.task_counter,
+                op,
+                retries: 0.into(),
+            });
+            self.inprogress_tasks.insert(task.task_id, task.clone());
+            tasks.push(task);
+        }
+        tasks
+    }
 }
 
 #[derive(Clone, Copy)]
@@ -185,9 +274,6 @@ impl UploadQueue {
             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
             // what follows are boring default initializations
             task_counter: 0,
-            num_inprogress_layer_uploads: 0,
-            num_inprogress_metadata_uploads: 0,
-            num_inprogress_deletions: 0,
             inprogress_tasks: HashMap::new(),
             queued_operations: VecDeque::new(),
             #[cfg(feature = "testing")]
@@ -227,9 +313,6 @@ impl UploadQueue {
             ),
             // what follows are boring default initializations
             task_counter: 0,
-            num_inprogress_layer_uploads: 0,
-            num_inprogress_metadata_uploads: 0,
-            num_inprogress_deletions: 0,
             inprogress_tasks: HashMap::new(),
             queued_operations: VecDeque::new(),
             #[cfg(feature = "testing")]
@@ -291,7 +374,7 @@ pub(crate) struct Delete {
     pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub(crate) enum UploadOp {
     /// Upload a layer file. The last field indicates the last operation for the file.
     UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
@@ -338,3 +421,117 @@ impl std::fmt::Display for UploadOp {
         }
     }
 }
+
+impl UploadOp {
+    /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
+    /// active index when other would be uploaded -- if we allow self to bypass other, this would
+    /// be the active index when self is uploaded.
+    pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
+        match (self, other) {
+            // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
+            (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
+            (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
+
+            // Uploads and deletes can bypass each other unless they're for the same file.
+            (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
+                let aname = &a.layer_desc().layer_name();
+                let bname = &b.layer_desc().layer_name();
+                !is_same_layer_path(aname, ameta, bname, bmeta)
+            }
+            (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
+            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
+                d.layers.iter().all(|(dname, dmeta)| {
+                    !is_same_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
+                })
+            }
+
+            // Deletes are idempotent and can always bypass each other.
+            (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
+
+            // Uploads and deletes can bypass an index upload as long as neither the uploaded index
+            // nor the active index below it references the file. A layer can't be modified or
+            // deleted while referenced by an index.
+            //
+            // Similarly, index uploads can bypass uploads and deletes as long as neither the
+            // uploaded index nor the active index references the file (the latter would be
+            // incorrect use by the caller).
+            (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
+                let uname = u.layer_desc().layer_name();
+                !i.references(&uname, umeta) && !index.references(&uname, umeta)
+            }
+            (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
+                d.layers.iter().all(|(dname, dmeta)| {
+                    !i.references(dname, dmeta) && !index.references(dname, dmeta)
+                })
+            }
+
+            // Indexes can never bypass each other.
+            // TODO: we could coalesce them though, by only uploading the newest ready index. This
+            // is left for later, out of caution.
+            (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::str::FromStr as _;
+    use utils::shard::ShardIndex;
+
+    /// Test helper to construct layer names and metadata.
+    fn make_layer(name: &str) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
+        let Ok(name) = LayerName::from_str(name) else {
+            anyhow::bail!("invalid layer name: {name}");
+        };
+        let metadata = LayerFileMetadata {
+            generation: Generation::Valid(1),
+            shard: ShardIndex::unsharded(),
+            file_size: 0, // dummy
+        };
+        Ok((name, metadata))
+    }
+
+    /// Deletes can be scheduled concurrently, even if they're for the same file.
+    #[test]
+    fn delete_concurrent() -> anyhow::Result<()> {
+        let index = IndexPart::empty(TimelineMetadata::example());
+        let mut queue = UploadQueue::Uninitialized;
+        let queue = queue.initialize_with_current_remote_index_part(&index)?;
+
+        // Enqueue a bunch of deletes, some with conflicting names.
+        let ops = [
+            UploadOp::Delete(Delete {
+                layers: vec![
+                    make_layer("000000000000000000000000000000000000-800000000000000000000000000000000000__00000000016B59D8-00000000016B5A51")?,
+                    make_layer("800000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51")?,
+                ],
+            }),
+            UploadOp::Delete(Delete {
+                layers: vec![
+                    make_layer("000000000000000000000000000000000000-800000000000000000000000000000000000__00000000016B59D8-00000000016B5A51")?,
+                ],
+            }),
+            UploadOp::Delete(Delete {
+                layers: vec![
+                    make_layer("800000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51")?,
+                ],
+            }),
+            UploadOp::Delete(Delete {
+                layers: vec![
+                    make_layer("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B5A51-0000000002000000")?,
+                ],
+            }),
+        ];
+
+        queue.queued_operations.extend(ops.clone());
+
+        // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
+        // TODO: there should be a better way to compare operations.
+        let tasks = queue.schedule_ready();
+        assert_eq!(tasks.len(), ops.len());
+        for (task, op) in tasks.into_iter().zip(ops) {
+            match (&task.op, &op) {
+                (UploadOp::Delete(a), UploadOp::Delete(b)) => assert_eq!(a.layers, b.layers),
+                (a, b) => panic!("unexpected ops {a:?} and {b:?}"),
+            }
+        }
+        assert!(queue.queued_operations.is_empty());
+
+        Ok(())
+    }
+}
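
`delete_concurrent` covers the happy path where nothing conflicts. A hypothetical follow-up case (not part of this patch) exercising the conflict path could look like the sketch below; it assumes `IndexPart::layer_metadata` is writable from this module and that `UploadMetadata`'s `uploaded` field holds the `IndexPart` by value, as the scheduling code above suggests:

```rust
/// Hypothetical follow-up test: a delete must not bypass a queued index upload
/// that still references the deleted layer.
#[test]
fn delete_waits_for_referencing_index() -> anyhow::Result<()> {
    let index = IndexPart::empty(TimelineMetadata::example());
    let mut queue = UploadQueue::Uninitialized;
    let queue = queue.initialize_with_current_remote_index_part(&index)?;

    let (name, meta) = make_layer(
        "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51",
    )?;

    // Queue an index upload that references the layer, then a delete of it.
    let mut referencing = IndexPart::empty(TimelineMetadata::example());
    referencing.layer_metadata.insert(name.clone(), meta.clone());
    queue
        .queued_operations
        .push_back(UploadOp::UploadMetadata { uploaded: referencing });
    queue
        .queued_operations
        .push_back(UploadOp::Delete(Delete { layers: vec![(name, meta)] }));

    // Only the index upload is scheduled; the delete can't bypass it while the
    // uploaded index still references the layer.
    let tasks = queue.schedule_ready();
    assert_eq!(tasks.len(), 1);
    assert!(matches!(tasks[0].op, UploadOp::UploadMetadata { .. }));
    assert_eq!(queue.queued_operations.len(), 1);
    Ok(())
}
```
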