diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 31e8cd378590..1eb20a363368 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3780,7 +3780,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut guard = self.layers.write().await; + let guard = self.layers.modify().await; let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction. @@ -3844,7 +3844,7 @@ impl Timeline { } guard - .finish_compact_l0( + .finish_compact_l0_consume_guard( layer_removal_cs, remove_layers, insert_layers, @@ -3852,8 +3852,6 @@ impl Timeline { ) .await?; - drop_wlock(guard); - // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 6999678baeff..f7ce74fc530f 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -1,8 +1,9 @@ use anyhow::{bail, ensure, Context, Result}; use arc_swap::ArcSwap; use dashmap::DashMap; +use either::Either; use std::sync::Arc; -use tracing::trace; +use tracing::{log::warn, trace}; use utils::{ id::{TenantId, TimelineId}, lsn::{AtomicLsn, Lsn}, @@ -55,7 +56,8 @@ pub struct LayerManagerWriteGuard { layer_manager: Arc, /// Mock the behavior of the layer map lock. #[allow(dead_code)] - pseudo_lock: tokio::sync::OwnedRwLockWriteGuard<()>, + pseudo_lock: + Either, tokio::sync::OwnedRwLockReadGuard<()>>, } impl LayerManager { @@ -92,7 +94,22 @@ impl LayerManager { layer_map: self.layer_map.load_full(), layer_fmgr: Arc::clone(&self.layer_fmgr), }, - pseudo_lock, + pseudo_lock: Either::Left(pseudo_lock), + layer_manager: self.clone(), + } + } + + /// Take the snapshot of the layer map and return a write guard. With the `modify` call, the guard + /// will only hold a read lock instead of write lock. + pub async fn modify(self: &Arc) -> LayerManagerWriteGuard { + // take the lock before taking snapshot + let pseudo_lock = self.pseudo_lock.clone().read_owned().await; + LayerManagerWriteGuard { + snapshot: LayerSnapshot { + layer_map: self.layer_map.load_full(), + layer_fmgr: Arc::clone(&self.layer_fmgr), + }, + pseudo_lock: Either::Right(pseudo_lock), layer_manager: self.clone(), } } @@ -108,7 +125,7 @@ impl LayerManager { layer_map: self.layer_map.load_full(), layer_fmgr: Arc::clone(&self.layer_fmgr), }, - pseudo_lock, + pseudo_lock: Either::Left(pseudo_lock), layer_manager: self.clone(), }) } @@ -207,6 +224,11 @@ impl LayerManagerWriteGuard { on_disk_layers: Vec>, next_open_layer_at: Lsn, ) { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); + self.layer_manager .initialize_update(|mut layer_map| { let mut updates = layer_map.batch_update(); @@ -235,6 +257,11 @@ impl LayerManagerWriteGuard { corrupted_local_layers: Vec>, remote_layers: Vec>, ) { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); + self.layer_manager .initialize_update(|mut layer_map| { let mut updates = layer_map.batch_update(); @@ -261,6 +288,11 @@ impl LayerManagerWriteGuard { timeline_id: TimelineId, tenant_id: TenantId, ) -> Result> { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); + ensure!(lsn.is_aligned()); ensure!( @@ -317,6 +349,11 @@ impl LayerManagerWriteGuard { Lsn(last_record_lsn): Lsn, last_freeze_at: &AtomicLsn, ) { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); + let end_lsn = Lsn(last_record_lsn + 1); if let Some(open_layer) = &self.snapshot.layer_map.open_layer { @@ -342,6 +379,10 @@ impl LayerManagerWriteGuard { /// Add image layers to the layer map, called from `create_image_layers`. pub(crate) async fn track_new_image_layers(&mut self, image_layers: Vec) { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); self.layer_manager .update(|mut layer_map| { let mut updates: BatchedUpdates<'_> = layer_map.batch_update(); @@ -365,6 +406,10 @@ impl LayerManagerWriteGuard { delta_layer: Option, frozen_layer_for_check: &Arc, ) { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); self.layer_manager .update(|mut layer_map| { let l = layer_map.frozen_layers.pop_front(); @@ -390,42 +435,53 @@ impl LayerManagerWriteGuard { } /// Called when compaction is completed. - pub(crate) async fn finish_compact_l0( - &mut self, - layer_removal_cs: Arc>, + pub(crate) async fn finish_compact_l0_consume_guard( + self, + _layer_removal_cs: Arc>, compact_from: Vec>, compact_to: Vec>, metrics: &TimelineMetrics, ) -> Result<()> { - self.layer_manager + assert!( + self.pseudo_lock.is_right(), + "should use `modify` guard for this function." + ); + let compact_from = self + .layer_manager .update(|mut layer_map| { let mut updates = layer_map.batch_update(); for l in compact_to { Self::insert_historic_layer(l, &mut updates, &self.snapshot.layer_fmgr); } - for l in compact_from { - // NB: the layer file identified by descriptor `l` is guaranteed to be present - // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire - // time, even though we dropped `Timeline::layers` inbetween. - if let Err(e) = Self::delete_historic_layer( - layer_removal_cs.clone(), - l, - &mut updates, - metrics, - &self.snapshot.layer_fmgr, - ) { - // If this fails, we will need to return the "partially" modified layer map - // now. Eventually, we should decouple file deletion and layer map updates, so - // that this part can be moved out of the `update` section. - updates.flush(); - return Ok((layer_map, Err(e))); - } + for l in &compact_from { + // only remove from the layer map, not from file manager + updates.remove_historic(l.layer_desc().clone()); } updates.flush(); - Ok((layer_map, Ok(()))) + Ok((layer_map, Ok::<_, anyhow::Error>(compact_from))) }) - .await - .unwrap() // unwrap the first level error, which is always Ok. + .await??; + drop(self.pseudo_lock); + // acquire the write lock so that all read threads are blocked, and once this lock is acquired, + // new reads will be based on the updated layer map. + let guard = self.layer_manager.pseudo_lock.write().await; + drop(guard); + // now that no one has access to the old layer map, we can safely remove the layers from disk. + for layer in compact_from { + self.snapshot.layer_fmgr.remove(layer.clone()); + // NB: the layer file identified by descriptor `l` is guaranteed to be present + // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire + // time, even though we dropped `Timeline::layers` inbetween. + if !layer.is_remote_layer() { + if let Err(e) = layer.delete_resident_layer_file() { + warn!("Failed to delete resident layer file: {}", e); + } else { + let layer_file_size = layer.file_size(); + metrics.resident_physical_size_gauge.sub(layer_file_size); + } + } + } + Ok(()) } /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map. @@ -435,6 +491,10 @@ impl LayerManagerWriteGuard { gc_layers: Vec>, metrics: &TimelineMetrics, ) -> Result<()> { + assert!( + self.pseudo_lock.is_left(), + "should use `write` guard for this function." + ); self.layer_manager .update(|mut layer_map| { let mut updates = layer_map.batch_update();