diff --git a/Cargo.lock b/Cargo.lock index 3c862241a49d..4a6d52b8c1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "backtrace", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "archery" version = "0.5.0" @@ -1185,15 +1191,15 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.4.0" +version = "5.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" dependencies = [ "cfg-if", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "lock_api", "once_cell", - "parking_lot_core 0.9.7", + "parking_lot_core 0.9.8", ] [[package]] @@ -1642,6 +1648,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + [[package]] name = "hashlink" version = "0.8.2" @@ -2057,9 +2069,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "lock_api" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" dependencies = [ "autocfg", "scopeguard", @@ -2323,9 +2335,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.1" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "oorandom" @@ -2515,6 +2527,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "async-compression", "async-stream", "async-trait", @@ -2528,6 +2541,7 @@ dependencies = [ "crc32c", "criterion", "crossbeam-utils", + "dashmap", "either", "enum-map", "enumset", @@ -2622,7 +2636,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", + "parking_lot_core 0.9.8", ] [[package]] @@ -2641,15 +2655,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.7" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.2.16", + "redox_syscall 0.3.5", "smallvec", - "windows-sys 0.45.0", + "windows-targets 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 44d49d95e8b1..64e0c1a22fa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ license = "Apache-2.0" ## All dependency versions, used in the project [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } +arc-swap = "1.6" async-compression = { version = "0.4.0", features = ["tokio", "gzip"] } flate2 = "1.0.26" async-stream = "0.3" @@ -54,6 +55,7 @@ comfy-table = "6.1" const_format = "0.2" crc32c = "0.6" crossbeam-utils = 
"0.8.5" +dashmap = "5.5" either = "1.8" enum-map = "2.4.2" enumset = "1.0.12" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 27e90ea97ddb..3b42a9406486 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,6 +12,7 @@ testing = ["fail/failpoints"] [dependencies] anyhow.workspace = true +arc-swap.workspace = true async-compression.workspace = true async-stream.workspace = true async-trait.workspace = true @@ -24,6 +25,7 @@ const_format.workspace = true consumption_metrics.workspace = true crc32c.workspace = true crossbeam-utils.workspace = true +dashmap.workspace = true either.workspace = true flate2.workspace = true fail.workspace = true diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 2908d3a83cfe..2e3cfaab9a3f 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -65,7 +65,7 @@ use super::storage_layer::PersistentLayerDesc; /// /// LayerMap tracks what layers exist on a timeline. /// -#[derive(Default)] +#[derive(Default, Clone)] pub struct LayerMap { // // 'open_layer' holds the current InMemoryLayer that is accepting new @@ -727,7 +727,7 @@ mod tests { // and can remove it in the future. let _map = LayerMap::default(); - let mut mapping = TestLayerFileManager::new(); + let mapping = TestLayerFileManager::new(); mapping .replace_and_verify(not_found, new_version) @@ -742,7 +742,7 @@ mod tests { let downloaded = Arc::new(LayerObject::new(skeleton)); let mut map = LayerMap::default(); - let mut mapping = LayerFileManager::new(); + let mapping = LayerFileManager::new(); // two disjoint Arcs in different lifecycle phases. even if it seems they must be the // same layer, we use LayerMap::compare_arced_layers as the identity of layers. diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index 347490c1ba1c..5fea1b8d69ed 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -60,6 +60,7 @@ impl From<&PersistentLayerDesc> for LayerKey { /// Allows answering layer map queries very efficiently, /// but doesn't allow retroactive insertion, which is /// sometimes necessary. See BufferedHistoricLayerCoverage. +#[derive(Clone)] pub struct HistoricLayerCoverage { /// The latest state head: LayerCoverageTuple, @@ -412,6 +413,7 @@ fn test_persistent_overlapping() { /// /// See this for more on persistent and retroactive techniques: /// +#[derive(Clone)] pub struct BufferedHistoricLayerCoverage { /// A persistent layer map that we rebuild when we need to retroactively update historic_coverage: HistoricLayerCoverage, diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs index 1d9101d3d16b..75bb88d60c9e 100644 --- a/pageserver/src/tenant/layer_map/layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync; /// /// NOTE The struct is parameterized over Value for easier /// testing, but in practice it's some sort of layer. +#[derive(Clone)] pub struct LayerCoverage { /// For every change in coverage (as we sweep the key space) /// we store (lsn.end, value). @@ -138,6 +139,7 @@ impl LayerCoverage { } /// Image and delta coverage at a specific LSN. 
+#[derive(Clone)] pub struct LayerCoverageTuple<Value> { pub image_coverage: LayerCoverage<Value>, pub delta_coverage: LayerCoverage<Value>, }
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index c6d1a0052a37..53da60683556 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -41,7 +41,7 @@ pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; -use super::timeline::layer_manager::LayerManager; +use super::timeline::layer_manager::LayerManagerWriteGuard; pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool where @@ -176,7 +176,7 @@ impl LayerAccessStats { /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad /// [`record_residence_event`]: Self::record_residence_event pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &LayerManager, + layer_map_lock_held_witness: &LayerManagerWriteGuard, status: LayerResidenceStatus, ) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); @@ -197,7 +197,7 @@ impl LayerAccessStats { /// [`record_residence_event`]: Self::record_residence_event pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &LayerManager, + layer_map_lock_held_witness: &LayerManagerWriteGuard, new_status: LayerResidenceStatus, ) -> LayerAccessStats { let clone = { @@ -229,7 +229,7 @@ impl LayerAccessStats { /// pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &LayerManager, + _layer_map_lock_held_witness: &LayerManagerWriteGuard, status: LayerResidenceStatus, reason: LayerResidenceEventReason, ) {
diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index d3c40d93bb8c..117cbdc30989 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -6,7 +6,7 @@ use crate::context::RequestContext; use crate::repository::Key; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; -use crate::tenant::timeline::layer_manager::LayerManager; +use crate::tenant::timeline::layer_manager::LayerManagerWriteGuard; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -226,7 +226,7 @@ impl RemoteLayer { /// Create a Layer struct representing this layer, after it has been downloaded. pub fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &LayerManager, + layer_map_lock_held_witness: &LayerManagerWriteGuard, conf: &'static PageServerConf, file_size: u64, ) -> Arc<dyn PersistentLayer> {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c6ceae500b54..31e8cd378590 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -81,7 +81,7 @@ use crate::{is_temporary, task_mgr}; pub(super) use self::eviction_task::EvictionTaskTenantState; use self::eviction_task::EvictionTaskTimelineState; -use self::layer_manager::LayerManager; +use self::layer_manager::{LayerManager, LayerManagerReadGuard, LayerManagerWriteGuard}; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; @@ -125,13 +125,13 @@ impl PartialOrd for Hole { /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. /// Can be removed after all refactors are done.
-fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) { +fn drop_rlock(rlock: LayerManagerReadGuard) { drop(rlock) } /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. /// Can be removed after all refactors are done. -fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { +fn drop_wlock(rlock: LayerManagerWriteGuard) { drop(rlock) } pub struct Timeline { @@ -162,7 +162,7 @@ pub struct Timeline { /// /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`, /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`. - pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>, + pub(crate) layers: Arc<LayerManager>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -1109,7 +1109,7 @@ impl Timeline { &self, _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, local_layer: &Arc<dyn PersistentLayer>, - layer_mgr: &mut LayerManager, + layer_mgr: &mut LayerManagerWriteGuard, ) -> Result<(), EvictionError> { if local_layer.is_remote_layer() { return Err(EvictionError::CannotEvictRemoteLayer); } @@ -1349,7 +1349,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), + layers: Arc::new(LayerManager::create()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -2561,13 +2561,15 @@ impl Timeline { /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> { let mut guard = self.layers.write().await; - let layer = guard.get_layer_for_write( lsn, self.get_last_record_lsn(), self.conf, self.timeline_id, self.tenant_id, )?; + let layer = guard + .get_layer_for_write( + lsn, + self.get_last_record_lsn(), + self.conf, + self.timeline_id, + self.tenant_id, + ) + .await?; Ok(layer) } @@ -2600,7 +2602,9 @@ impl Timeline { Some(self.write_lock.lock().await) }; let mut guard = self.layers.write().await; - guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at); + guard + .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at) + .await; } /// Layer flusher task's main loop.
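[Editor's note: the `.await`s appearing at these `Timeline` call sites come from the new `LayerManager` introduced further down: readers take a cheap `ArcSwap` snapshot of the layer map, while writers clone the map, mutate the clone, and atomically swap it back in while holding a writer mutex. A minimal, self-contained sketch of that clone-and-swap pattern, assuming arc-swap 1.x and tokio; the names are illustrative, not the pageserver's actual API:]

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

/// Illustrative stand-in for `LayerMap`.
#[derive(Clone, Default)]
struct State {
    epoch: u64,
}

struct Manager {
    /// Readers load this without ever blocking writers.
    state: ArcSwap<State>,
    /// Serializes writers; readers never touch it.
    write_lock: tokio::sync::Mutex<()>,
}

impl Manager {
    /// Copy-on-write update: clone the current snapshot, mutate the
    /// clone, then publish it with an atomic swap.
    async fn update<T>(&self, f: impl FnOnce(State) -> (State, T)) -> T {
        let _guard = self.write_lock.lock().await;
        let old = self.state.load_full();
        let (new, ret) = f(State::clone(&*old));
        let prev = self.state.swap(Arc::new(new));
        // With writers serialized, nothing can have swapped in between.
        debug_assert!(Arc::ptr_eq(&prev, &old));
        ret
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager {
        state: ArcSwap::from(Arc::new(State::default())),
        write_lock: tokio::sync::Mutex::new(()),
    };
    // A reader's snapshot is unaffected by later updates.
    let snapshot = mgr.state.load_full();
    let epoch = mgr
        .update(|mut s| {
            s.epoch += 1;
            let e = s.epoch;
            (s, e)
        })
        .await;
    assert_eq!(snapshot.epoch, 0);
    assert_eq!(epoch, 1);
}
```

[The `debug_assert` mirrors the race check in the PR's `update`/`initialize_update`: as long as every writer goes through the mutex, the snapshot taken at the start of an update must still be the published value at swap time.]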
@@ -2772,7 +2776,9 @@ impl Timeline { self.metrics.persistent_bytes_written.inc_by(sz); } - guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer); + guard + .finish_flush_l0_layer(delta_layer_to_add, &frozen_layer) + .await; // release lock on 'layers' } @@ -3151,7 +3157,7 @@ impl Timeline { LayerResidenceEventReason::LayerCreate, ); } - guard.track_new_image_layers(image_layers); + guard.track_new_image_layers(image_layers).await; drop_wlock(guard); timer.stop_and_record(); @@ -3315,7 +3321,7 @@ impl Timeline { fn compact_level0_phase1( self: Arc<Self>, _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, - guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>, + guard: LayerManagerReadGuard, mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, ctx: &RequestContext, @@ -3739,7 +3745,7 @@ impl Timeline { }; let begin = tokio::time::Instant::now(); - let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await; + let phase1_layers_locked = self.layers.read().await; let now = tokio::time::Instant::now(); stats.read_lock_acquisition_micros = DurationRecorder::Recorded(RecordedDuration(now - begin), now); @@ -3837,12 +3843,14 @@ impl Timeline { remove_layers.push(guard.get_from_desc(&ldesc)); } - guard.finish_compact_l0( layer_removal_cs, remove_layers, insert_layers, &self.metrics, )?; + guard + .finish_compact_l0( + layer_removal_cs, + remove_layers, + insert_layers, + &self.metrics, + ) + .await?; drop_wlock(guard); @@ -4175,7 +4183,9 @@ impl Timeline { layer_names_to_delete.push(doomed_layer.filename()); result.layers_removed += 1; } - let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?; + guard + .finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics) + .await?; if result.layers_removed != 0 { fail_point!("after-timeline-gc-removed-layers"); @@ -4184,8 +4194,6 @@ if let Some(remote_client) = &self.remote_client { remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; } - - apply.flush(); } info!(
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index f6f0d533d101..828e202912d6 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -1,5 +1,7 @@ use anyhow::{bail, ensure, Context, Result}; -use std::{collections::HashMap, sync::Arc}; +use arc_swap::ArcSwap; +use dashmap::DashMap; +use std::sync::Arc; use tracing::trace; use utils::{ id::{TenantId, TimelineId}, @@ -21,94 +23,235 @@ use crate::{ /// Provides semantic APIs to manipulate the layer map. pub struct LayerManager { - layer_map: LayerMap, - layer_fmgr: LayerFileManager, + /// The layer map that tracks all layers in the timeline at the current time. + layer_map: ArcSwap<LayerMap>, + /// Ensure there is only one thread that can modify the layer map at a time. + state_lock: Arc<tokio::sync::Mutex<()>>, + /// Layer file manager that manages the layer files in the local / remote file system. + layer_fmgr: Arc<LayerFileManager>, + /// The lock to mock the original behavior of `RwLock`. Can be removed if + /// #4509 is implemented. + pseudo_lock: Arc<tokio::sync::RwLock<()>>, } -/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after -/// scheduling deletes in remote client. -pub struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>); +struct LayerSnapshot { + /// The current snapshot of the layer map. Immutable. + layer_map: Arc<LayerMap>, + /// Reference to the file manager. This is mutable and its contents might change while
+ /// the snapshot is held. + layer_fmgr: Arc<LayerFileManager>, +} -impl ApplyGcResultGuard<'_> { - pub fn flush(self) { - self.0.flush(); - } +pub struct LayerManagerReadGuard { + snapshot: LayerSnapshot, + /// Mock the behavior of the layer map lock. + _pseudo_lock: tokio::sync::OwnedRwLockReadGuard<()>, +} + +pub struct LayerManagerWriteGuard { + snapshot: LayerSnapshot, + /// Semantic layer operations will need to modify the layer content. + layer_manager: Arc<LayerManager>, + /// Mock the behavior of the layer map lock. + _pseudo_lock: tokio::sync::OwnedRwLockWriteGuard<()>, } impl LayerManager { pub fn create() -> Self { Self { - layer_map: LayerMap::default(), - layer_fmgr: LayerFileManager::new(), + layer_map: ArcSwap::from(Arc::new(LayerMap::default())), + state_lock: Arc::new(tokio::sync::Mutex::new(())), + layer_fmgr: Arc::new(LayerFileManager::new()), + pseudo_lock: Arc::new(tokio::sync::RwLock::new(())), } } - pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> { + /// Take a snapshot of the layer map and return a read guard. The read guard prevents + /// the layer map from being modified. + pub async fn read(&self) -> LayerManagerReadGuard { + // take the lock before taking snapshot + let pseudo_lock = self.pseudo_lock.clone().read_owned().await; + LayerManagerReadGuard { + snapshot: LayerSnapshot { + layer_map: self.layer_map.load_full(), + layer_fmgr: Arc::clone(&self.layer_fmgr), + }, + _pseudo_lock: pseudo_lock, + } + } + + /// Take a snapshot of the layer map and return a write guard. The write guard prevents + /// the layer map from being read or modified elsewhere. + pub async fn write(self: &Arc<Self>) -> LayerManagerWriteGuard { + // take the lock before taking snapshot + let pseudo_lock = self.pseudo_lock.clone().write_owned().await; + LayerManagerWriteGuard { + snapshot: LayerSnapshot { + layer_map: self.layer_map.load_full(), + layer_fmgr: Arc::clone(&self.layer_fmgr), + }, + _pseudo_lock: pseudo_lock, + layer_manager: self.clone(), + } + } + + pub fn try_write( + self: &Arc<Self>, + ) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> { + self.pseudo_lock + .clone() + .try_write_owned() + .map(|pseudo_lock| LayerManagerWriteGuard { + snapshot: LayerSnapshot { + layer_map: self.layer_map.load_full(), + layer_fmgr: Arc::clone(&self.layer_fmgr), + }, + _pseudo_lock: pseudo_lock, + layer_manager: self.clone(), + }) + } + + /// Make an update to the layer map. This function will NOT take the state lock and should ONLY + /// be used when no concurrent access is possible, i.e., when initializing the layer map. An error is + /// returned only if the update function fails. + fn initialize_update(&self, f: impl FnOnce(LayerMap) -> Result<LayerMap>) -> Result<()> { + let snapshot = self.layer_map.load_full(); + let new_layer_map = f(LayerMap::clone(&*snapshot))?; + let old_layer_map = self.layer_map.swap(Arc::new(new_layer_map)); + debug_assert_eq!( + Arc::as_ptr(&snapshot) as usize, + Arc::as_ptr(&old_layer_map) as usize, + "race detected when modifying layer map, use `update` instead of `initialize_update`." + ); + Ok(()) + } + + /// Make an update to the layer map. An error is returned only if the update function fails.
+ async fn update<T>(&self, f: impl FnOnce(LayerMap) -> Result<(LayerMap, T)>) -> Result<T> { + let _guard = self.state_lock.lock().await; + let snapshot = self.layer_map.load_full(); + let (new_layer_map, data) = f(LayerMap::clone(&*snapshot))?; + let old_layer_map = self.layer_map.swap(Arc::new(new_layer_map)); + debug_assert_eq!( + Arc::as_ptr(&snapshot) as usize, + Arc::as_ptr(&old_layer_map) as usize, + "race detected when modifying layer map, please check if `initialize_update` is used on the `update` code path." + ); + Ok(data) + } +} + +impl LayerSnapshot { + fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> { self.layer_fmgr.get_from_desc(desc) } /// Get an immutable reference to the layer map. /// - /// We expect users only to be able to get an immutable layer map. If users want to make modifications, - /// they should use the below semantic APIs. This design makes us step closer to immutable storage state. - pub fn layer_map(&self) -> &LayerMap { + /// If a user needs to modify the layer map, they should get a write guard and use the semantic + /// functions. + fn layer_map(&self) -> &LayerMap { &self.layer_map } - /// Get a mutable reference to the layer map. This function will be removed once `flush_frozen_layer` - /// gets a refactor. - pub fn layer_map_mut(&mut self) -> &mut LayerMap { - &mut self.layer_map + fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool { + self.layer_fmgr.contains(layer) + } +} + +impl LayerManagerReadGuard { + pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> { + self.snapshot.get_from_desc(desc) + } + + /// Get an immutable reference to the layer map. + pub fn layer_map(&self) -> &LayerMap { + self.snapshot.layer_map() + } +} + +impl LayerManagerWriteGuard { + /// Check if the layer file manager contains a layer. This should ONLY be used in the compaction + /// code path where we need to check if a layer already exists on disk. With the immutable layer + /// map design, it is possible that the layer map snapshot does not contain the layer, but the layer file + /// manager does. Therefore, use this function with caution. + pub(super) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool { + self.snapshot.contains(layer) + } + + pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> { + self.snapshot.get_from_desc(desc) + } + + /// Get an immutable reference to the layer map. + pub fn layer_map(&self) -> &LayerMap { + self.snapshot.layer_map() + } /// Replace layers in the layer file manager, used in evictions and layer downloads. - pub fn replace_and_verify( + pub(crate) fn replace_and_verify( &mut self, expected: Arc<dyn PersistentLayer>, new: Arc<dyn PersistentLayer>, ) -> Result<()> { - self.layer_fmgr.replace_and_verify(expected, new) + self.snapshot.layer_fmgr.replace_and_verify(expected, new) } /// Called from `load_layer_map`. Initialize the layer manager with: /// 1. all on-disk layers /// 2.
next open layer (at the `disk_consistent_lsn` LSN) - pub fn initialize_local_layers( + pub(crate) fn initialize_local_layers( &mut self, on_disk_layers: Vec<Arc<dyn PersistentLayer>>, next_open_layer_at: Lsn, ) { - let mut updates = self.layer_map.batch_update(); - for layer in on_disk_layers { - Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - updates.flush(); - self.layer_map.next_open_layer_at = Some(next_open_layer_at); + self.layer_manager + .initialize_update(|mut layer_map| { + let mut updates = layer_map.batch_update(); + for layer in on_disk_layers { + Self::insert_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr); + } + updates.flush(); + layer_map.next_open_layer_at = Some(next_open_layer_at); + Ok(layer_map) + }) + .unwrap(); } /// Initialize when creating a new timeline, called in `init_empty_layer_map`. - pub fn initialize_empty(&mut self, next_open_layer_at: Lsn) { - self.layer_map.next_open_layer_at = Some(next_open_layer_at); + pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) { + self.layer_manager + .initialize_update(|mut layer_map| { + layer_map.next_open_layer_at = Some(next_open_layer_at); + Ok(layer_map) + }) + .unwrap(); } - pub fn initialize_remote_layers( + pub(crate) fn initialize_remote_layers( &mut self, corrupted_local_layers: Vec<Arc<dyn PersistentLayer>>, remote_layers: Vec<Arc<RemoteLayer>>, ) { - let mut updates = self.layer_map.batch_update(); - for layer in corrupted_local_layers { - Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - for layer in remote_layers { - Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - updates.flush(); + self.layer_manager + .initialize_update(|mut layer_map| { + let mut updates = layer_map.batch_update(); + for layer in corrupted_local_layers { + Self::remove_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr); + } + for layer in remote_layers { + Self::insert_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr); + } + updates.flush(); + Ok(layer_map) + }) + .unwrap(); } /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, - /// called within `get_layer_for_write`. - pub fn get_layer_for_write( + /// called within `get_layer_for_write`. Only ONE thread can call this function at a time, which is guaranteed by + /// `write_lock` in `Timeline`. + pub(crate) async fn get_layer_for_write( &mut self, lsn: Lsn, last_record_lsn: Lsn, @@ -127,7 +270,7 @@ impl LayerManager { ); // Do we have a layer open for writing already? - let layer = if let Some(open_layer) = &self.layer_map.open_layer { + let layer = if let Some(open_layer) = &self.snapshot.layer_map.open_layer { if open_layer.get_lsn_range().start > lsn { bail!( "unexpected open layer in the future: open layers starts at {}, write lsn {}", Arc::clone(open_layer) } else { - // No writeable layer yet. Create one. - let start_lsn = self - .layer_map - .next_open_layer_at - .context("No next open layer found")?; - - trace!( - "creating in-memory layer at {}/{} for record at {}", - timeline_id, - start_lsn, - lsn - ); - - let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?; - let layer = Arc::new(new_layer); - - self.layer_map.open_layer = Some(layer.clone()); - self.layer_map.next_open_layer_at = None; - - layer + self.layer_manager + .update(|mut layer_map| { + // No writeable layer yet. Create one.
+ let start_lsn = layer_map + .next_open_layer_at + .context("No next open layer found")?; + + trace!( + "creating in-memory layer at {}/{} for record at {}", + timeline_id, + start_lsn, + lsn + ); + + let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?; + let layer = Arc::new(new_layer); + + layer_map.open_layer = Some(layer.clone()); + layer_map.next_open_layer_at = None; + + Ok((layer_map, layer)) + }) + .await? }; Ok(layer) } /// Called from `freeze_inmem_layer`, returns true if successfully frozen. - pub fn try_freeze_in_memory_layer( + pub(crate) async fn try_freeze_in_memory_layer( &mut self, Lsn(last_record_lsn): Lsn, last_freeze_at: &AtomicLsn, ) { let end_lsn = Lsn(last_record_lsn + 1); - if let Some(open_layer) = &self.layer_map.open_layer { + if let Some(open_layer) = &self.snapshot.layer_map.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? open_layer.freeze(end_lsn); // The layer is no longer open, update the layer map to reflect this. // We will replace it with on-disk historics below. - self.layer_map.frozen_layers.push_back(open_layer_rc); - self.layer_map.open_layer = None; - self.layer_map.next_open_layer_at = Some(end_lsn); + self.layer_manager + .update(|mut layer_map| { + layer_map.frozen_layers.push_back(open_layer_rc); + layer_map.open_layer = None; + layer_map.next_open_layer_at = Some(end_lsn); + Ok((layer_map, ())) + }) + .await + .unwrap(); + last_freeze_at.store(end_lsn); } } /// Add image layers to the layer map, called from `create_image_layers`. - pub fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) { - let mut updates = self.layer_map.batch_update(); - for layer in image_layers { - Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr); - } - updates.flush(); + pub(crate) async fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) { + self.layer_manager + .update(|mut layer_map| { + let mut updates: BatchedUpdates<'_> = layer_map.batch_update(); + for layer in image_layers { + Self::insert_historic_layer( + Arc::new(layer), + &mut updates, + &self.snapshot.layer_fmgr, + ); + } + updates.flush(); + Ok((layer_map, ())) + }) + .await + .unwrap(); } /// Flush a frozen layer and add the written delta layer to the layer map. - pub fn finish_flush_l0_layer( + pub(crate) async fn finish_flush_l0_layer( &mut self, delta_layer: Option<DeltaLayer>, frozen_layer_for_check: &Arc<InMemoryLayer>, ) { - let l = self.layer_map.frozen_layers.pop_front(); - let mut updates = self.layer_map.batch_update(); - - // Only one thread may call this function at a time (for this - // timeline). If two threads tried to flush the same frozen - // layer to disk at the same time, that would not work. - assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check)); - - if let Some(delta_layer) = delta_layer { - Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr); - } - updates.flush(); + self.layer_manager + .update(|mut layer_map| { + let l = layer_map.frozen_layers.pop_front(); + let mut updates = layer_map.batch_update(); + + // Only one thread may call this function at a time (for this + // timeline). If two threads tried to flush the same frozen + // layer to disk at the same time, that would not work.
+ assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check)); + + if let Some(delta_layer) = delta_layer { + Self::insert_historic_layer( + Arc::new(delta_layer), + &mut updates, + &self.snapshot.layer_fmgr, + ); + } + updates.flush(); + Ok((layer_map, ())) + }) + .await + .unwrap(); } /// Called when compaction is completed. - pub fn finish_compact_l0( + pub(crate) async fn finish_compact_l0( &mut self, layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, compact_from: Vec<Arc<dyn PersistentLayer>>, compact_to: Vec<Arc<dyn PersistentLayer>>, metrics: &TimelineMetrics, ) -> Result<()> { - let mut updates = self.layer_map.batch_update(); - for l in compact_to { - Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr); - } - for l in compact_from { - // NB: the layer file identified by descriptor `l` is guaranteed to be present - // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire - // time, even though we dropped `Timeline::layers` inbetween. - Self::delete_historic_layer( layer_removal_cs.clone(), l, &mut updates, metrics, &mut self.layer_fmgr, )?; - } - updates.flush(); - Ok(()) + self.layer_manager + .update(|mut layer_map| { + let mut updates = layer_map.batch_update(); + for l in compact_to { + Self::insert_historic_layer(l, &mut updates, &self.snapshot.layer_fmgr); + } + for l in compact_from { + // NB: the layer file identified by descriptor `l` is guaranteed to be present + // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire + // time, even though we dropped `Timeline::layers` inbetween. + if let Err(e) = Self::delete_historic_layer( + layer_removal_cs.clone(), + l, + &mut updates, + metrics, + &self.snapshot.layer_fmgr, + ) { + // If deletion fails, we still install the partially modified layer map and + // hand the error back to the caller. Eventually, file deletion should be + // decoupled from layer map updates so that this part can move out of the + // `update` section. + updates.flush(); + return Ok((layer_map, Err(e))); + } + } + updates.flush(); + Ok((layer_map, Ok(()))) + }) + .await + .unwrap() // unwrap the outer `Result`, which is always `Ok`; the inner one goes to the caller } - /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map. - pub fn finish_gc_timeline( + /// Called when garbage-collecting the timeline. The removals are applied to the layer map directly. + pub(crate) async fn finish_gc_timeline( &mut self, layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, gc_layers: Vec<Arc<dyn PersistentLayer>>, metrics: &TimelineMetrics, - ) -> Result<ApplyGcResultGuard> { - let mut updates = self.layer_map.batch_update(); - for doomed_layer in gc_layers { - Self::delete_historic_layer( layer_removal_cs.clone(), doomed_layer, &mut updates, metrics, &mut self.layer_fmgr, )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch? - } - Ok(ApplyGcResultGuard(updates)) + ) -> Result<()> { + self.layer_manager + .update(|mut layer_map| { + let mut updates = layer_map.batch_update(); + for doomed_layer in gc_layers { + // TODO: decouple deletion and layer map modification + if let Err(e) = Self::delete_historic_layer( + layer_removal_cs.clone(), + doomed_layer, + &mut updates, + metrics, + &self.snapshot.layer_fmgr, + ) + // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch? + { + updates.flush(); + return Ok((layer_map, Err(e))); + } + } + updates.flush(); + Ok((layer_map, Ok(()))) + }) + .await + .unwrap() // unwrap the outer `Result`, which is always `Ok` } /// Helper function to insert a layer into the layer map and file manager.
fn insert_historic_layer( layer: Arc<dyn PersistentLayer>, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &LayerFileManager, ) { updates.insert_historic(layer.layer_desc().clone()); mapping.insert(layer); @@ -276,7 +472,7 @@ impl LayerManager { fn remove_historic_layer( layer: Arc<dyn PersistentLayer>, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &LayerFileManager, ) { updates.remove_historic(layer.layer_desc().clone()); mapping.remove(layer); @@ -290,7 +486,7 @@ layer: Arc<dyn PersistentLayer>, updates: &mut BatchedUpdates<'_>, metrics: &TimelineMetrics, - mapping: &mut LayerFileManager, + mapping: &LayerFileManager, ) -> anyhow::Result<()> { if !layer.is_remote_layer() { layer.delete_resident_layer_file()?; @@ -308,18 +504,18 @@ Ok(()) } - - pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool { - self.layer_fmgr.contains(layer) - } } +/// Manages the layer files in the local / remote file system. This is a wrapper around `DashMap`. +/// +/// Developer notes: `DashMap` can deadlock when a reference into the map is held while another +/// operation locks the same shard. Ensure each function holds at most one reference into the map at a time. pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>( - HashMap<PersistentLayerKey, Arc<T>>, + DashMap<PersistentLayerKey, Arc<T>>, ); impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> { - fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> { + pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> { // The assumption for the `expect()` is that all code maintains the following invariant: // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. self.0 .clone() } - pub(crate) fn insert(&mut self, layer: Arc<T>) { + pub(crate) fn insert(&self, layer: Arc<T>) { let present = self.0.insert(layer.layer_desc().key(), layer.clone()); if present.is_some() && cfg!(debug_assertions) { panic!("overwriting a layer: {:?}", layer.layer_desc()) } } pub(crate) fn new() -> Self { - Self(HashMap::new()) + Self(DashMap::new()) } - pub(crate) fn remove(&mut self, layer: Arc<T>) { + pub(crate) fn remove(&self, layer: Arc<T>) { let present = self.0.remove(&layer.layer_desc().key()); if present.is_none() && cfg!(debug_assertions) { panic!( ) } } - pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> { + pub(crate) fn replace_and_verify(&self, expected: Arc<T>, new: Arc<T>) -> Result<()> { let key = expected.layer_desc().key(); let other = new.layer_desc().key(); "one layer is l0 while the other is not: {expected_l0} != {new_l0}" ); - if let Some(layer) = self.0.get_mut(&key) { + if let Some(mut layer) = self.0.get_mut(&key) { anyhow::ensure!( - compare_arced_layers(&expected, layer), + compare_arced_layers(&expected, &*layer), "another layer was found instead of expected, expected={expected:?}, new={new:?}", expected = Arc::as_ptr(&expected), - new = Arc::as_ptr(layer), + new = Arc::as_ptr(&*layer), ); *layer = new; Ok(())
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 12c5dc8281d2..28bf5ccfa2fd 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 12c5dc8281d20b5bd636e1097eea80a7bc609591 +Subproject commit 28bf5ccfa2fda9677566a25abd450e714d9ed055
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index e3fbfc4d143b..553f2d3618a6 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit e3fbfc4d143b2d3c3c1813ce747f8af35aa9405e +Subproject
commit 553f2d3618a6d4893bde67f1c065926ee8a3a118
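[Editor's note on the `LayerFileManager` developer comment above: each `DashMap` operation locks one of the map's internal shards, and a live `Ref`/`RefMut` guard keeps that shard locked, so touching the map again while a guard is held can deadlock. A hedged sketch of the hazard and the safe pattern, assuming dashmap 5.x; keys and values are illustrative:]

```rust
use dashmap::DashMap;

fn main() {
    let map: DashMap<u32, String> = DashMap::new();
    map.insert(1, "one".to_string());

    // RISKY: `guard` pins a shard lock; if key 2 lands on the same
    // shard, the insert below blocks forever. Left commented out.
    //
    // let guard = map.get(&1).unwrap();
    // map.insert(2, "two".to_string());
    // drop(guard);

    // SAFE: copy out what is needed, drop the guard, then mutate.
    let value = map.get(&1).map(|entry| entry.value().clone());
    if value.is_some() {
        map.insert(2, "two".to_string());
    }
    assert_eq!(map.len(), 2);
}
```

[This is also why the patched `replace_and_verify` does all of its work through a single `get_mut` guard: it follows the "at most one reference into the map per function" rule stated in the developer note.]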