From 1fd52b2911f2dc0cea8ca5e5e9cb39bdc554529c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 16 Dec 2024 12:17:45 +0100 Subject: [PATCH] pageserver: upload flushed layers in parallel --- pageserver/src/tenant/timeline.rs | 220 ++++++++++++++++++++++-------- 1 file changed, 163 insertions(+), 57 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 87f5a0338252..d533fa95cf69 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -45,6 +45,7 @@ use storage_broker::BrokerClientChannel; use tokio::{ runtime::Handle, sync::{oneshot, watch}, + time::sleep, }; use tokio_util::sync::CancellationToken; use tracing::*; @@ -75,7 +76,7 @@ use crate::{ tenant::{ config::AttachmentMode, layer_map::{LayerMap, SearchResult}, - metadata::TimelineMetadata, + metadata::{MetadataUpdate, TimelineMetadata}, storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc}, }, walingest::WalLagCooldown, @@ -328,9 +329,9 @@ pub struct Timeline { /// the flush finishes. You can use that to wait for the flush to finish. /// - The LSN is updated to max() of its current value and the latest disk_consistent_lsn /// read by whoever sends an update - layer_flush_start_tx: tokio::sync::watch::Sender<(u64, Lsn)>, + layer_flush_start_tx: watch::Sender, /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel - layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>, + layer_flush_done_tx: watch::Sender, // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -660,6 +661,67 @@ impl From for CreateImageLayersError { } } +/// A layer flush request, which wakes the flush loop when changed. +#[derive(Copy, Clone)] +pub(crate) struct FlushRequest { + /// The sequence number of this flush request. Must be monotonically increasing. + seq: FlushRequestID, + /// The LSN to freeze and flush to disk. Later LSNs may also be flushed. + flush_lsn: Lsn, + /// The LSN to immediately upload indexes at. Otherwise, index uploads may be delayed by several + /// seconds to pipeline layer uploads. + upload_index_now_lsn: Lsn, +} + +pub(crate) type FlushRequestID = u64; + +impl FlushRequest { + /// Creates a new, empty flush request. + pub(crate) fn new(disk_consistent_lsn: Lsn) -> Self { + Self { + seq: 0, + flush_lsn: disk_consistent_lsn, + upload_index_now_lsn: Lsn(0), + } + } + + /// Bumps the flush request to the given flush LSN, and increases the sequence number. If + /// upload_index_now is true, an index upload is scheduled immediately after the flush + /// (otherwise it is deferred to pipeline layer uploads). Returns the sequence number. + pub(crate) fn bump(&mut self, flush_lsn: Lsn, upload_index_now: bool) -> FlushRequestID { + self.seq += 1; + self.flush_lsn = std::cmp::max(self.flush_lsn, flush_lsn); + if upload_index_now { + self.upload_index_now_lsn = std::cmp::max(self.upload_index_now_lsn, flush_lsn); + } + self.seq + } +} + +/// A layer flush response. +pub(crate) struct FlushResponse { + /// The latest request sequence number that has been flushed. + seq: FlushRequestID, + /// The last flush result, containing the flush LSN. An error may be replaced by a later + /// success. This is fine, as long as we're making progress. + result: Result, +} + +impl FlushResponse { + /// Creates a new flush response. + pub(crate) fn new(seq: FlushRequestID, result: Result) -> Self { + Self { seq, result } + } + + /// Creates an empty flush response. + pub(crate) fn empty() -> Self { + Self { + seq: 0, + result: Ok(Lsn(0)), + } + } +} + #[derive(thiserror::Error, Debug, Clone)] pub(crate) enum FlushLayerError { /// Timeline cancellation token was cancelled @@ -1521,7 +1583,7 @@ impl Timeline { pub(crate) async fn freeze0(&self) -> Result { let mut g = self.write_lock.lock().await; let to_lsn = self.get_last_record_lsn(); - self.freeze_inmem_layer_at(to_lsn, &mut g).await + self.freeze_inmem_layer_at(to_lsn, true, &mut g).await } // This exists to provide a non-span creating version of `freeze_and_flush` we can call without @@ -1586,7 +1648,7 @@ impl Timeline { // still hold the write_guard. let _ = async { let token = self - .freeze_inmem_layer_at(last_record_lsn, &mut write_guard) + .freeze_inmem_layer_at(last_record_lsn, true, &mut write_guard) .await?; self.wait_flush_completion(token).await } @@ -1640,7 +1702,7 @@ impl Timeline { // Upgrade to a write lock and freeze the layer drop(layers_guard); let res = self - .freeze_inmem_layer_at(current_lsn, &mut write_guard) + .freeze_inmem_layer_at(current_lsn, false, &mut write_guard) .await; if let Err(e) = res { @@ -2298,8 +2360,8 @@ impl Timeline { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(state); - let (layer_flush_start_tx, _) = tokio::sync::watch::channel((0, disk_consistent_lsn)); - let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(()))); + let (layer_flush_start_tx, _) = watch::channel(FlushRequest::new(disk_consistent_lsn)); + let (layer_flush_done_tx, _) = watch::channel(FlushResponse::empty()); let evictions_low_residence_duration_metric_threshold = { let loaded_tenant_conf = tenant_conf.load(); @@ -3573,6 +3635,7 @@ impl Timeline { async fn freeze_inmem_layer_at( &self, at: Lsn, + upload_index_now: bool, write_lock: &mut tokio::sync::MutexGuard<'_, Option>, ) -> Result { let frozen = { @@ -3599,11 +3662,8 @@ impl Timeline { return Err(FlushLayerError::NotRunning(flush_loop_state)); } - self.layer_flush_start_tx.send_modify(|(counter, lsn)| { - my_flush_request = *counter + 1; - *counter = my_flush_request; - *lsn = std::cmp::max(at, *lsn); - }); + self.layer_flush_start_tx + .send_modify(|req| my_flush_request = req.bump(at, upload_index_now)); assert_ne!(my_flush_request, 0); @@ -3613,27 +3673,51 @@ impl Timeline { /// Layer flusher task's main loop. async fn flush_loop( self: &Arc, - mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>, + mut layer_flush_start_rx: watch::Receiver, ctx: &RequestContext, ) { + /// To parallelize uploads, we only schedule metadata index uploads every few layers, or + /// when no flushes have happened in a while. Metadata uploads act as upload queue barriers. + const UPLOAD_METADATA_LAYERS: usize = 8; + const UPLOAD_METADATA_DELAY: Duration = Duration::from_secs(1); + let mut layers_since_metadata_upload = 0; + + let mut flushed_to_lsn = Lsn(0); + let mut uploaded_metadata_to_lsn = Lsn(0); + info!("started flush loop"); loop { tokio::select! { _ = self.cancel.cancelled() => { info!("shutting down layer flush task due to Timeline::cancel"); + // TODO: is this necessary? Upload queue may be shutting down too. + if layers_since_metadata_upload > 0 { + _ = self.schedule_metadata_upload(flushed_to_lsn); + } break; }, _ = layer_flush_start_rx.changed() => {} + _ = sleep(UPLOAD_METADATA_DELAY), if layers_since_metadata_upload > 0 => { + if let Err(err) = self.schedule_metadata_upload(flushed_to_lsn) { + error!("failed to schedule index upload: {err}"); + } else { + uploaded_metadata_to_lsn = flushed_to_lsn; + layers_since_metadata_upload = 0; + } + continue; + } } trace!("waking up"); - let (flush_counter, frozen_to_lsn) = *layer_flush_start_rx.borrow(); + let req = *layer_flush_start_rx.borrow(); // The highest LSN to which we flushed in the loop over frozen layers - let mut flushed_to_lsn = Lsn(0); - let result = loop { + let mut result = loop { if self.cancel.is_cancelled() { info!("dropping out of flush loop for timeline shutdown"); + if layers_since_metadata_upload > 0 { + _ = self.schedule_metadata_upload(self.disk_consistent_lsn.load()); + } // Note: we do not bother transmitting into [`layer_flush_done_tx`], because // anyone waiting on that will respect self.cancel as well: they will stop // waiting at the same time we as drop out of this loop. @@ -3660,7 +3744,7 @@ impl Timeline { // drop 'layers' lock to allow concurrent reads and writes }; let Some(layer_to_flush) = layer_to_flush else { - break Ok(()); + break Ok(flushed_to_lsn); }; if num_frozen_layers > std::cmp::max( @@ -3673,6 +3757,9 @@ impl Timeline { "too many frozen layers: {num_frozen_layers} layers with estimated in-mem size of {frozen_layer_total_size} bytes", ); } + + // Flush the layer and schedule it for upload, but don't update the index yet. The + // index update acts as an upload queue barrier and would prevent parallel uploads. match self.flush_frozen_layer(layer_to_flush, ctx).await { Ok(this_layer_to_lsn) => { flushed_to_lsn = std::cmp::max(flushed_to_lsn, this_layer_to_lsn); @@ -3681,16 +3768,25 @@ impl Timeline { info!("dropping out of flush loop for timeline shutdown"); return; } - err @ Err( - FlushLayerError::NotRunning(_) - | FlushLayerError::Other(_) - | FlushLayerError::CreateImageLayersError(_), + Err( + err @ FlushLayerError::NotRunning(_) + | err @ FlushLayerError::Other(_) + | err @ FlushLayerError::CreateImageLayersError(_), ) => { - error!("could not flush frozen layer: {err:?}"); - break err.map(|_| ()); + error!("could not flush frozen layer: {err}"); + break Err(err); } } timer.stop_and_record(); + + layers_since_metadata_upload += 1; + if layers_since_metadata_upload >= UPLOAD_METADATA_LAYERS { + if let Err(err) = self.schedule_metadata_upload(flushed_to_lsn) { + break Err(FlushLayerError::from_anyhow(self, err)); + } + layers_since_metadata_upload = 0; + uploaded_metadata_to_lsn = flushed_to_lsn; + } }; // Unsharded tenants should never advance their LSN beyond the end of the @@ -3698,29 +3794,38 @@ impl Timeline { // are only legal on sharded tenants. debug_assert!( self.shard_identity.count.count() > 1 - || flushed_to_lsn >= frozen_to_lsn + || flushed_to_lsn >= req.flush_lsn || !flushed_to_lsn.is_valid() ); - if flushed_to_lsn < frozen_to_lsn && self.shard_identity.count.count() > 1 { + if flushed_to_lsn < req.flush_lsn && self.shard_identity.count.count() > 1 { // If our layer flushes didn't carry disk_consistent_lsn up to the `to_lsn` advertised // to us via layer_flush_start_rx, then advance it here. // // This path is only taken for tenants with multiple shards: single sharded tenants should // never encounter a gap in the wal. - let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); - tracing::debug!("Advancing disk_consistent_lsn across layer gap {old_disk_consistent_lsn}->{frozen_to_lsn}"); - if self.set_disk_consistent_lsn(frozen_to_lsn) { - if let Err(e) = self.schedule_uploads(frozen_to_lsn, vec![]) { - tracing::warn!("Failed to schedule metadata upload after updating disk_consistent_lsn: {e}"); - } + debug!( + "advancing disk_consistent_lsn across layer gap {}->{}", + self.disk_consistent_lsn.load(), + req.flush_lsn + ); + self.set_disk_consistent_lsn(req.flush_lsn); + flushed_to_lsn = req.flush_lsn; + } + + // If requested, schedule an immediate metadata upload. + if result.is_ok() && req.upload_index_now_lsn >= uploaded_metadata_to_lsn { + if let Err(err) = self.schedule_metadata_upload(flushed_to_lsn) { + result = Err(FlushLayerError::from_anyhow(self, err)); + } else { + uploaded_metadata_to_lsn = flushed_to_lsn; } } - // Notify any listeners that we're done + // Notify any listeners that we're done. let _ = self .layer_flush_done_tx - .send_replace((flush_counter, result)); + .send_replace(FlushResponse::new(req.seq, result)); } } @@ -3729,16 +3834,9 @@ impl Timeline { let mut rx = self.layer_flush_done_tx.subscribe(); loop { { - let (last_result_counter, last_result) = &*rx.borrow(); - if *last_result_counter >= request { - if let Err(err) = last_result { - // We already logged the original error in - // flush_loop. We cannot propagate it to the caller - // here, because it might not be Cloneable - return Err(err.clone()); - } else { - return Ok(()); - } + let resp = &*rx.borrow(); + if resp.seq >= request { + return resp.result.clone().map(|_| ()); } } trace!("waiting for flush to complete"); @@ -3757,7 +3855,10 @@ impl Timeline { } } - /// Flush one frozen in-memory layer to disk, as a new delta layer. + /// Flush one frozen in-memory layer to disk, as a new delta layer, and schedule + /// it for upload. + /// + /// NB: this does not schedule a metadata index update, use schedule_metadata_upload(). /// /// Return value is the last lsn (inclusive) of the layer that was frozen. #[instrument(skip_all, fields(layer=%frozen_layer))] @@ -3886,8 +3987,8 @@ impl Timeline { ); if self.set_disk_consistent_lsn(disk_consistent_lsn) { - // Schedule remote uploads that will reflect our new disk_consistent_lsn - self.schedule_uploads(disk_consistent_lsn, layers_to_upload) + // Schedule remote uploads that will reflect our new disk_consistent_lsn. + self.schedule_layer_upload(layers_to_upload) .map_err(|e| FlushLayerError::from_anyhow(self, e))?; } // release lock on 'layers' @@ -3919,12 +4020,20 @@ impl Timeline { new_value != old_value } - /// Update metadata file - fn schedule_uploads( + /// Schedule upload of layer files. Note that this does not upload a new index, + /// see schedule_metadata_upload(). + fn schedule_layer_upload( &self, - disk_consistent_lsn: Lsn, - layers_to_upload: impl IntoIterator, + layers: impl IntoIterator, ) -> anyhow::Result<()> { + for layer in layers { + self.remote_client.schedule_layer_file_upload(layer)?; + } + Ok(()) + } + + /// Schedule upload of an index file based on a partial metadata update. + fn schedule_metadata_upload(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { // We can only save a valid 'prev_record_lsn' value on disk if we // flushed *all* in-memory changes to disk. We only track // 'prev_record_lsn' in memory for the latest processed record, so we @@ -3941,7 +4050,7 @@ impl Timeline { None }; - let update = crate::tenant::metadata::MetadataUpdate::new( + let update = MetadataUpdate::new( disk_consistent_lsn, ondisk_prev_record_lsn, *self.latest_gc_cutoff_lsn.read(), @@ -3952,9 +4061,6 @@ impl Timeline { x.unwrap() )); - for layer in layers_to_upload { - self.remote_client.schedule_layer_file_upload(layer)?; - } self.remote_client .schedule_index_upload_for_metadata_update(&update)?; @@ -5304,7 +5410,7 @@ impl Timeline { // This unconditionally schedules also an index_part.json update, even though, we will // be doing one a bit later with the unlinked gc'd layers. let disk_consistent_lsn = self.disk_consistent_lsn.load(); - self.schedule_uploads(disk_consistent_lsn, None) + self.schedule_metadata_upload(disk_consistent_lsn) .map_err(|e| { if self.cancel.is_cancelled() { GcError::TimelineCancelled @@ -5886,7 +5992,7 @@ impl TimelineWriter<'_> { // self.write_guard will be taken by the freezing self.tl - .freeze_inmem_layer_at(freeze_at, &mut self.write_guard) + .freeze_inmem_layer_at(freeze_at, false, &mut self.write_guard) .await?; assert!(self.write_guard.is_none());