diff --git a/Cargo.lock b/Cargo.lock index b104a35bf563..313222cf3c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "RustyXML" @@ -5415,6 +5415,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", + "tikv-jemallocator", "tokio", "tokio-io-timeout", "tokio-postgres", @@ -7069,6 +7070,7 @@ dependencies = [ "rand 0.8.5", "regex", "routerify", + "scopeguard", "sentry", "serde", "serde_assert", diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 6b670de2ea85..b178d7abd6d6 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; use std::fs::File; use std::path::Path; use std::process::exit; +use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; use std::{thread, time::Duration}; @@ -322,8 +323,15 @@ fn wait_spec( } else { spec_set = false; } + let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?; + let conn_conf = postgres::config::Config::from_str(connstr.as_str()) + .context("cannot build postgres config from connstr")?; + let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str()) + .context("cannot build tokio postgres config from connstr")?; let compute_node = ComputeNode { - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, + connstr, + conn_conf, + tokio_conn_conf, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), pgversion: get_pg_version_string(pgbin), diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index 6716cc623412..b6db3eb11abc 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -21,7 +21,7 @@ //! - Build the image with the following command: //! //! ```bash -//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com +//! docker buildx build --platform linux/amd64 --build-arg DEBIAN_VERSION=bullseye --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/compute-node.Dockerfile . //! docker push localhost:3030/localregistry/compute-node-v14:latest //! 
``` @@ -132,7 +132,8 @@ pub(crate) async fn main() -> anyhow::Result<()> { // // Initialize pgdata // - let pg_version = match get_pg_version(pg_bin_dir.as_str()) { + let pgbin = pg_bin_dir.join("postgres"); + let pg_version = match get_pg_version(pgbin.as_ref()) { PostgresMajorVersion::V14 => 14, PostgresMajorVersion::V15 => 15, PostgresMajorVersion::V16 => 16, @@ -155,7 +156,7 @@ pub(crate) async fn main() -> anyhow::Result<()> { // // Launch postgres process // - let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres")) + let mut postgres_proc = tokio::process::Command::new(pgbin) .arg("-D") .arg(&pgdata_dir) .args(["-c", "wal_level=minimal"]) diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 08ae8bf44d71..72198a947944 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -6,7 +6,6 @@ use tokio::{ process::Command, spawn, }; -use tokio_postgres::connect; use tokio_stream::{self as stream, StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::warn; @@ -16,10 +15,8 @@ use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgr use compute_api::responses::CatalogObjects; pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> { - let connstr = compute.connstr.clone(); - - let (client, connection): (tokio_postgres::Client, _) = - connect(connstr.as_str(), NoTls).await?; + let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles")); + let (client, connection): (tokio_postgres::Client, _) = conf.connect(NoTls).await?; spawn(async move { if let Err(e) = connection.await { diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index cec2b1bed833..62d61a8bc987 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -9,7 +9,8 @@ use crate::compute::ComputeNode; #[instrument(skip_all)] pub async fn check_writability(compute: &ComputeNode) -> Result<()> { // Connect to the database. - let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?; + let conf = compute.get_tokio_conn_conf(Some("compute_ctl:availability_checker")); + let (client, connection) = conf.connect(NoTls).await?; if client.is_closed() { return Err(anyhow!("connection to postgres closed")); } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 1a026a40143a..da1caf1a9b2f 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,8 +20,9 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use nix::unistd::Pid; +use postgres; use postgres::error::SqlState; -use postgres::{Client, NoTls}; +use postgres::NoTls; use tracing::{debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -58,6 +59,10 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0); pub struct ComputeNode { // Url type maintains proper escaping pub connstr: url::Url, + // We connect to Postgres from many different places, so build configs once + // and reuse them where needed. + pub conn_conf: postgres::config::Config, + pub tokio_conn_conf: tokio_postgres::config::Config, pub pgdata: String, pub pgbin: String, pub pgversion: String, @@ -800,10 +805,10 @@ impl ComputeNode { /// version. In the future, it may upgrade all 3rd-party extensions.
#[instrument(skip_all)] pub fn post_apply_config(&self) -> Result<()> { - let connstr = self.connstr.clone(); + let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config")); thread::spawn(move || { let func = || { - let mut client = Client::connect(connstr.as_str(), NoTls)?; + let mut client = conf.connect(NoTls)?; handle_neon_extension_upgrade(&mut client) .context("handle_neon_extension_upgrade")?; Ok::<_, anyhow::Error>(()) @@ -815,12 +820,27 @@ impl ComputeNode { Ok(()) } + pub fn get_conn_conf(&self, application_name: Option<&str>) -> postgres::Config { + let mut conf = self.conn_conf.clone(); + if let Some(application_name) = application_name { + conf.application_name(application_name); + } + conf + } + + pub fn get_tokio_conn_conf(&self, application_name: Option<&str>) -> tokio_postgres::Config { + let mut conf = self.tokio_conn_conf.clone(); + if let Some(application_name) = application_name { + conf.application_name(application_name); + } + conf + } + async fn get_maintenance_client( conf: &tokio_postgres::Config, ) -> Result { let mut conf = conf.clone(); - - conf.application_name("apply_config"); + conf.application_name("compute_ctl:apply_config"); let (client, conn) = match conf.connect(NoTls).await { // If connection fails, it may be the old node with `zenith_admin` superuser. @@ -837,6 +857,7 @@ impl ComputeNode { e ); let mut zenith_admin_conf = postgres::config::Config::from(conf.clone()); + zenith_admin_conf.application_name("compute_ctl:apply_config"); zenith_admin_conf.user("zenith_admin"); let mut client = @@ -1134,8 +1155,7 @@ impl ComputeNode { /// Do initial configuration of the already started Postgres. #[instrument(skip_all)] pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap(); - conf.application_name("apply_config"); + let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); let conf = Arc::new(conf); let spec = Arc::new( @@ -1161,7 +1181,7 @@ impl ComputeNode { thread::spawn(move || { let conf = conf.as_ref().clone(); let mut conf = postgres::config::Config::from(conf); - conf.application_name("migrations"); + conf.application_name("compute_ctl:migrations"); let mut client = conf.connect(NoTls)?; handle_migrations(&mut client).context("apply_config handle_migrations") @@ -1369,9 +1389,9 @@ impl ComputeNode { } self.post_apply_config()?; - let connstr = self.connstr.clone(); + let conf = self.get_conn_conf(None); thread::spawn(move || { - let res = get_installed_extensions(&connstr); + let res = get_installed_extensions(conf); match res { Ok(extensions) => { info!( @@ -1510,7 +1530,8 @@ impl ComputeNode { /// Select `pg_stat_statements` data and return it as a stringified JSON pub async fn collect_insights(&self) -> String { let mut result_rows: Vec = Vec::new(); - let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await; + let conf = self.get_tokio_conn_conf(Some("compute_ctl:collect_insights")); + let connect_result = conf.connect(NoTls).await; let (client, connection) = connect_result.unwrap(); tokio::spawn(async move { if let Err(e) = connection.await { @@ -1636,10 +1657,9 @@ LIMIT 100", privileges: &[Privilege], role_name: &PgIdent, ) -> Result<()> { - use tokio_postgres::config::Config; use tokio_postgres::NoTls; - let mut conf = Config::from_str(self.connstr.as_str()).unwrap(); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:set_role_grants")); conf.dbname(db_name); let (db_client, conn) = conf @@ 
-1676,10 +1696,9 @@ LIMIT 100", db_name: &PgIdent, ext_version: ExtVersion, ) -> Result { - use tokio_postgres::config::Config; use tokio_postgres::NoTls; - let mut conf = Config::from_str(self.connstr.as_str()).unwrap(); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:install_extension")); conf.dbname(db_name); let (db_client, conn) = conf diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index a6c6cff20af5..7fa6426d8f9e 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -295,12 +295,11 @@ async fn routes(req: Request, compute: &Arc) -> Response render_json(Body::from(serde_json::to_string(&res).unwrap())), diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index f473c29a558e..5f62f08858a7 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -10,8 +10,6 @@ use metrics::core::Collector; use metrics::{register_uint_gauge_vec, UIntGaugeVec}; use once_cell::sync::Lazy; -use crate::pg_helpers::postgres_conf_for_db; - /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. /// @@ -41,14 +39,16 @@ fn list_dbs(client: &mut Client) -> Result> { /// /// Same extension can be installed in multiple databases with different versions, /// we only keep the highest and lowest version across all databases. -pub fn get_installed_extensions(connstr: &url::Url) -> Result { - let mut client = Client::connect(connstr.as_str(), NoTls)?; +pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result { + conf.application_name("compute_ctl:get_installed_extensions"); + let mut client = conf.connect(NoTls)?; + let databases: Vec = list_dbs(&mut client)?; let mut extensions_map: HashMap = HashMap::new(); for db in databases.iter() { - let config = postgres_conf_for_db(connstr, db)?; - let mut db_client = config.connect(NoTls)?; + conf.dbname(db); + let mut db_client = conf.connect(NoTls)?; let extensions: Vec<(String, String)> = db_client .query( "SELECT extname, extversion FROM pg_catalog.pg_extension;", @@ -82,7 +82,7 @@ pub fn get_installed_extensions(connstr: &url::Url) -> Result = None; @@ -57,7 +54,7 @@ fn watch_compute_activity(compute: &ComputeNode) { info!("connection to Postgres is closed, trying to reconnect"); // Connection is closed, reconnect and try again. - client = Client::connect(connstr, NoTls); + client = conf.connect(NoTls); continue; } @@ -196,7 +193,7 @@ fn watch_compute_activity(compute: &ComputeNode) { debug!("could not connect to Postgres: {}, retrying", e); // Establish a new connection and try again. 
- client = Client::connect(connstr, NoTls); + client = conf.connect(NoTls); } } } diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 50e0e9e50429..e49d15ba87a0 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -442,12 +442,7 @@ impl Default for ConfigToml { tenant_config: TenantConfigToml::default(), no_sync: None, wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL, - page_service_pipelining: PageServicePipeliningConfig::Pipelined( - PageServicePipeliningConfigPipelined { - max_batch_size: NonZeroUsize::new(32).unwrap(), - execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures, - }, - ), + page_service_pipelining: PageServicePipeliningConfig::Serial, } } } diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 840917ef6812..8d1962fa29ba 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -35,6 +35,7 @@ use utils::backoff; use utils::backoff::exponential_backoff_duration_seconds; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; +use crate::DownloadKind; use crate::{ config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata, @@ -49,10 +50,17 @@ pub struct AzureBlobStorage { concurrency_limiter: ConcurrencyLimiter, // Per-request timeout. Accessible for tests. pub timeout: Duration, + + // Alternative timeout used for metadata objects which are expected to be small + pub small_timeout: Duration, } impl AzureBlobStorage { - pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result { + pub fn new( + azure_config: &AzureConfig, + timeout: Duration, + small_timeout: Duration, + ) -> Result { debug!( "Creating azure remote storage for azure container {}", azure_config.container_name @@ -94,6 +102,7 @@ impl AzureBlobStorage { max_keys_per_list_response, concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()), timeout, + small_timeout, }) } @@ -133,6 +142,7 @@ impl AzureBlobStorage { async fn download_for_builder( &self, builder: GetBlobBuilder, + timeout: Duration, cancel: &CancellationToken, ) -> Result { let kind = RequestKind::Get; @@ -156,7 +166,7 @@ impl AzureBlobStorage { .map_err(to_download_error); // apply per request timeout - let response = tokio_stream::StreamExt::timeout(response, self.timeout); + let response = tokio_stream::StreamExt::timeout(response, timeout); // flatten let response = response.map(|res| match res { @@ -415,7 +425,7 @@ impl RemoteStorage for AzureBlobStorage { let blob_client = self.client.blob_client(self.relative_path_to_name(key)); let properties_future = blob_client.get_properties().into_future(); - let properties_future = tokio::time::timeout(self.timeout, properties_future); + let properties_future = tokio::time::timeout(self.small_timeout, properties_future); let res = tokio::select! 
{ res = properties_future => res, @@ -521,7 +531,12 @@ impl RemoteStorage for AzureBlobStorage { }); } - self.download_for_builder(builder, cancel).await + let timeout = match opts.kind { + DownloadKind::Small => self.small_timeout, + DownloadKind::Large => self.timeout, + }; + + self.download_for_builder(builder, timeout, cancel).await } async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index e99ae4f747d9..f6ef31077c76 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -24,6 +24,13 @@ pub struct RemoteStorageConfig { skip_serializing_if = "is_default_timeout" )] pub timeout: Duration, + /// Alternative timeout used for metadata objects which are expected to be small + #[serde( + with = "humantime_serde", + default = "default_small_timeout", + skip_serializing_if = "is_default_small_timeout" + )] + pub small_timeout: Duration, } impl RemoteStorageKind { @@ -40,10 +47,18 @@ fn default_timeout() -> Duration { RemoteStorageConfig::DEFAULT_TIMEOUT } +fn default_small_timeout() -> Duration { + RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT +} + fn is_default_timeout(d: &Duration) -> bool { *d == RemoteStorageConfig::DEFAULT_TIMEOUT } +fn is_default_small_timeout(d: &Duration) -> bool { + *d == RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT +} + /// A kind of a remote storage to connect to, with its connection configuration. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(untagged)] @@ -184,6 +199,7 @@ fn serialize_storage_class( impl RemoteStorageConfig { pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120); + pub const DEFAULT_SMALL_TIMEOUT: Duration = std::time::Duration::from_secs(30); pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result { Ok(utils::toml_edit_ext::deserialize_item(toml)?) @@ -219,7 +235,8 @@ timeout = '5s'"; storage: RemoteStorageKind::LocalFs { local_path: Utf8PathBuf::from(".") }, - timeout: Duration::from_secs(5) + timeout: Duration::from_secs(5), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } @@ -247,7 +264,8 @@ timeout = '5s'"; max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, upload_storage_class: Some(StorageClass::IntelligentTiering), }), - timeout: Duration::from_secs(7) + timeout: Duration::from_secs(7), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } @@ -299,7 +317,8 @@ timeout = '5s'"; concurrency_limit: default_remote_storage_azure_concurrency_limit(), max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, }), - timeout: Duration::from_secs(7) + timeout: Duration::from_secs(7), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT } ); } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 719608dd5f1b..0ece29d99e17 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -178,6 +178,15 @@ pub struct DownloadOpts { /// The end of the byte range to download, or unbounded. Must be after the /// start bound. 
pub byte_end: Bound<u64>, + /// Indicate whether we're downloading something small or large: this indirectly controls + /// timeouts: for something like an index/manifest/heatmap, we should time out faster than + /// for layer files + pub kind: DownloadKind, +} + +pub enum DownloadKind { + Large, + Small, } impl Default for DownloadOpts { @@ -186,6 +195,7 @@ impl Default for DownloadOpts { etag: Default::default(), byte_start: Bound::Unbounded, byte_end: Bound::Unbounded, + kind: DownloadKind::Large, } } } @@ -584,6 +594,10 @@ impl GenericRemoteStorage> { impl GenericRemoteStorage { pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> { let timeout = storage_config.timeout; + + // If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically + let small_timeout = std::cmp::min(storage_config.small_timeout, timeout); + Ok(match &storage_config.storage { RemoteStorageKind::LocalFs { local_path: path } => { info!("Using fs root '{path}' as a remote storage"); @@ -606,7 +620,11 @@ impl GenericRemoteStorage { .unwrap_or(""); info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'", azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container); - Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?)) + Self::AzureBlob(Arc::new(AzureBlobStorage::new( + azure_config, + timeout, + small_timeout, + )?)) } }) } diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 3a20649490ba..92d579fec866 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -219,7 +219,8 @@ async fn create_azure_client( concurrency_limit: NonZeroUsize::new(100).unwrap(), max_keys_per_list_response, }), - timeout: Duration::from_secs(120), + timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config) diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 3e99a65fac0b..e60ec18c93d9 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -396,6 +396,7 @@ async fn create_s3_client( upload_storage_class: None, }), timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; Ok(Arc::new( GenericRemoteStorage::from_config(&remote_storage_config) diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index a52d953d662e..5648072a83c2 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -46,6 +46,7 @@ tracing.workspace = true tracing-error.workspace = true tracing-subscriber = { workspace = true, features = ["json", "registry"] } rand.workspace = true +scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true url.workspace = true diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index a33f8097fc11..b44f766ef01f 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -115,6 +115,9 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { + scopeguard::defer! { + self.state.wake_receiver.notify() + }; let Ok(mut guard) = self.state.value.lock() else { return; }; @@ -179,6 +182,9 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { + scopeguard::defer!
{ + self.state.wake_sender.notify() + }; let Ok(mut guard) = self.state.value.lock() else { return; }; @@ -401,4 +407,46 @@ mod tests { let result = receiver.recv().await; assert!(matches!(result, Err(RecvError::SenderGone))); } + + #[tokio::test(start_paused = true)] + async fn test_receiver_drops_after_sender_went_to_sleep() { + let (mut sender, receiver) = channel(); + let state = receiver.state.clone(); + + sender.send(23, |_, _| unreachable!()).await.unwrap(); + + let send_task = tokio::spawn(async move { sender.send(42, |_, v| Err(v)).await }); + + tokio::time::sleep(FOREVER).await; + + assert!(matches!( + &*state.value.lock().unwrap(), + &State::SenderWaitsForReceiverToConsume(_) + )); + + drop(receiver); + + let err = send_task + .await + .unwrap() + .expect_err("should unblock immediately"); + assert!(matches!(err, SendError::ReceiverGone)); + } + + #[tokio::test(start_paused = true)] + async fn test_sender_drops_after_receiver_went_to_sleep() { + let (sender, mut receiver) = channel::(); + let state = sender.state.clone(); + + let recv_task = tokio::spawn(async move { receiver.recv().await }); + + tokio::time::sleep(FOREVER).await; + + assert!(matches!(&*state.value.lock().unwrap(), &State::NoData)); + + drop(sender); + + let err = recv_task.await.unwrap().expect_err("should error"); + assert!(matches!(err, RecvError::SenderGone)); + } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index e74c8ecf5a9e..1d508f5fe962 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -838,6 +838,7 @@ mod test { local_path: remote_fs_dir.clone(), }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; let storage = GenericRemoteStorage::from_config(&storage_config) .await diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2b70e4673d7a..944b42ddd685 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1270,41 +1270,37 @@ impl PageServerHandler { let cancel_batcher = self.cancel.child_token(); let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); - let read_messages = pipeline_stage!( - "read_messages", - cancel_batcher.clone(), - move |cancel_batcher| { - let ctx = ctx.attached_child(); - async move { - let mut pgb_reader = pgb_reader; - let mut exit = false; - while !exit { - let read_res = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel_batcher, - &ctx, - request_span.clone(), - ) + let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| { + let ctx = ctx.attached_child(); + async move { + let mut pgb_reader = pgb_reader; + let mut exit = false; + while !exit { + let read_res = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel_batcher, + &ctx, + request_span.clone(), + ) + .await; + let Some(read_res) = read_res.transpose() else { + debug!("client-initiated shutdown"); + break; + }; + exit |= read_res.is_err(); + let could_send = batch_tx + .send(read_res, |batch, res| { + Self::pagestream_do_batch(max_batch_size, batch, res) + }) .await; - let Some(read_res) = read_res.transpose() else { - debug!("client-initiated shutdown"); - break; - }; - exit |= read_res.is_err(); - let could_send = batch_tx - .send(read_res, |batch, res| { - Self::pagestream_do_batch(max_batch_size, batch, res) - }) - .await; - exit |= could_send.is_err(); - } - (pgb_reader, 
timeline_handles) + exit |= could_send.is_err(); } + (pgb_reader, timeline_handles) } - ); + }); // // Executor @@ -1341,11 +1337,11 @@ impl PageServerHandler { match execution { PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => { - tokio::join!(read_messages, executor) + tokio::join!(batcher, executor) } PageServiceProtocolPipelinedExecutionStrategy::Tasks => { // These tasks are not tracked anywhere. - let read_messages_task = tokio::spawn(read_messages); + let read_messages_task = tokio::spawn(batcher); let (read_messages_task_res, executor_res_) = tokio::join!(read_messages_task, executor,); ( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 339a3ca1bb98..cd0690bb1a57 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5423,6 +5423,7 @@ pub(crate) mod harness { local_path: remote_fs_dir.clone(), }, timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }; let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap(); let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone())); diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index d632e595ada0..739615be9cef 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -30,7 +30,9 @@ use crate::tenant::Generation; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; -use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath}; +use remote_storage::{ + DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, +}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use utils::pausable_failpoint; @@ -345,12 +347,13 @@ pub async fn list_remote_timelines( async fn do_download_remote_path_retry_forever( storage: &GenericRemoteStorage, remote_path: &RemotePath, + download_opts: DownloadOpts, cancel: &CancellationToken, ) -> Result<(Vec, SystemTime), DownloadError> { download_retry_forever( || async { let download = storage - .download(remote_path, &DownloadOpts::default(), cancel) + .download(remote_path, &download_opts, cancel) .await?; let mut bytes = Vec::new(); @@ -377,8 +380,13 @@ async fn do_download_tenant_manifest( ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> { let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation); + let download_opts = DownloadOpts { + kind: DownloadKind::Small, + ..Default::default() + }; + let (manifest_bytes, manifest_bytes_mtime) = - do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?; + do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?; let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes) .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}")) @@ -398,8 +406,13 @@ async fn do_download_index_part( timeline_id.expect("A timeline ID is always provided when downloading an index"); let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation); + let download_opts = DownloadOpts { + kind: DownloadKind::Small, + ..Default::default() + }; + let (index_part_bytes, index_part_mtime) = - do_download_remote_path_retry_forever(storage, 
&remote_path, cancel).await?; + do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) .with_context(|| format!("deserialize index part file at {remote_path:?}")) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 7443261a9c00..8d771dc40535 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -49,7 +49,7 @@ use futures::Future; use metrics::UIntGauge; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; -use remote_storage::{DownloadError, DownloadOpts, Etag, GenericRemoteStorage}; +use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage}; use tokio_util::sync::CancellationToken; use tracing::{info_span, instrument, warn, Instrument}; @@ -946,6 +946,7 @@ impl<'a> TenantDownloader<'a> { let cancel = &self.secondary_state.cancel; let opts = DownloadOpts { etag: prev_etag.cloned(), + kind: DownloadKind::Small, ..Default::default() }; diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs index 8d5ab1780f70..bc4d148a2942 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -4,7 +4,8 @@ use anyhow::Context; use bytes::Bytes; use postgres_ffi::ControlFileData; use remote_storage::{ - Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath, + Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing, + ListingObject, RemotePath, }; use serde::de::DeserializeOwned; use tokio_util::sync::CancellationToken; @@ -239,6 +240,7 @@ impl RemoteStorageWrapper { .download( path, &DownloadOpts { + kind: DownloadKind::Large, etag: None, byte_start: Bound::Included(start_inclusive), byte_end: Bound::Excluded(end_exclusive) diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index e328c6de7938..b375eb886e09 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -486,6 +486,7 @@ mod tests { upload_storage_class: None, }), timeout: RemoteStorageConfig::DEFAULT_TIMEOUT, + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT, }) ); assert_eq!(parquet_upload.parquet_upload_row_group_size, 100); @@ -545,6 +546,7 @@ mod tests { local_path: tmpdir.to_path_buf(), }, timeout: std::time::Duration::from_secs(120), + small_timeout: std::time::Duration::from_secs(30), }; let storage = GenericRemoteStorage::from_config(&remote_storage_config) .await diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 635a9222e122..0422c46ab10c 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -41,6 +41,7 @@ serde_json.workspace = true strum.workspace = true strum_macros.workspace = true thiserror.workspace = true +tikv-jemallocator.workspace = true tokio = { workspace = true, features = ["fs"] } tokio-util = { workspace = true } tokio-io-timeout.workspace = true diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index c637b4fb24d1..8c4281cf527e 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -6,6 +6,7 @@ mod benchutils; use std::io::Write as _; use benchutils::Env; +use bytes::BytesMut; use camino_tempfile::tempfile; use 
criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion}; use itertools::Itertools as _; @@ -23,6 +24,9 @@ const KB: usize = 1024; const MB: usize = 1024 * KB; const GB: usize = 1024 * MB; +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + // Register benchmarks with Criterion. criterion_group!( name = benches; @@ -30,7 +34,8 @@ criterion_group!( targets = bench_process_msg, bench_wal_acceptor, bench_wal_acceptor_throughput, - bench_file_write + bench_file_write, + bench_bytes_reserve, ); criterion_main!(benches); @@ -341,3 +346,26 @@ fn bench_file_write(c: &mut Criterion) { Ok(()) } } + +/// Benchmarks the cost of memory allocations when receiving WAL messages. This emulates the logic +/// in FeMessage::parse, which extends the read buffer. It is primarily intended to test jemalloc. +fn bench_bytes_reserve(c: &mut Criterion) { + let mut g = c.benchmark_group("bytes_reserve"); + for size in [1, 64, KB, 8 * KB, 128 * KB] { + g.throughput(criterion::Throughput::Bytes(size as u64)); + g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap()); + } + + fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> { + let mut bytes = BytesMut::new(); + let data = vec![0; size]; + + b.iter(|| { + bytes.reserve(size); + bytes.extend_from_slice(&data); + bytes.split_to(size).freeze(); + }); + + Ok(()) + } +} diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 1248428d3393..3659bcd7e048 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -48,6 +48,9 @@ use utils::{ tcp_listener, }; +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + const PID_FILE_NAME: &str = "safekeeper.pid"; const ID_FILE_NAME: &str = "safekeeper.id"; diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 2414d95eb89b..ecc6b11e4758 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -305,7 +305,7 @@ impl std::ops::Add for AffinityScore { /// Hint for whether this is a sincere attempt to schedule, or a speculative /// check for where we _would_ schedule (done during optimization) -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum ScheduleMode { Normal, Speculative, @@ -319,7 +319,7 @@ impl Default for ScheduleMode { // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling // it for many shards in the same tenant. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub(crate) struct ScheduleContext { /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`] pub(crate) nodes: HashMap, @@ -331,6 +331,14 @@ pub(crate) struct ScheduleContext { } impl ScheduleContext { + pub(crate) fn new(mode: ScheduleMode) -> Self { + Self { + nodes: HashMap::new(), + attached_nodes: HashMap::new(), + mode, + } + } + /// Input is a list of nodes we would like to avoid using again within this context. The more /// times a node is passed into this call, the less inclined we are to use it. 
pub(crate) fn avoid(&mut self, nodes: &[NodeId]) { @@ -355,6 +363,11 @@ impl ScheduleContext { pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize { self.attached_nodes.get(&node_id).copied().unwrap_or(0) } + + #[cfg(test)] + pub(crate) fn attach_count(&self) -> usize { + self.attached_nodes.values().sum() + } } pub(crate) enum RefCountUpdate { diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 446c476b99c5..636ccf11a120 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1,3 +1,6 @@ +pub mod chaos_injector; +mod context_iterator; + use hyper::Uri; use std::{ borrow::Cow, @@ -95,7 +98,7 @@ use crate::{ }, }; -pub mod chaos_injector; +use context_iterator::TenantShardContextIterator; // For operations that should be quick, like attaching a new tenant const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5); @@ -5498,49 +5501,51 @@ impl Service { let mut tenants_affected: usize = 0; - for (tenant_shard_id, tenant_shard) in tenants { - if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) { - // When a node goes offline, we set its observed configuration to None, indicating unknown: we will - // not assume our knowledge of the node's configuration is accurate until it comes back online - observed_loc.conf = None; - } - - if nodes.len() == 1 { - // Special case for single-node cluster: there is no point trying to reschedule - // any tenant shards: avoid doing so, in order to avoid spewing warnings about - // failures to schedule them. - continue; - } + for (_tenant_id, mut schedule_context, shards) in + TenantShardContextIterator::new(tenants, ScheduleMode::Normal) + { + for tenant_shard in shards { + let tenant_shard_id = tenant_shard.tenant_shard_id; + if let Some(observed_loc) = + tenant_shard.observed.locations.get_mut(&node_id) + { + // When a node goes offline, we set its observed configuration to None, indicating unknown: we will + // not assume our knowledge of the node's configuration is accurate until it comes back online + observed_loc.conf = None; + } - if !nodes - .values() - .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_))) - { - // Special case for when all nodes are unavailable and/or unschedulable: there is no point - // trying to reschedule since there's nowhere else to go. Without this - // branch we incorrectly detach tenants in response to node unavailability. - continue; - } + if nodes.len() == 1 { + // Special case for single-node cluster: there is no point trying to reschedule + // any tenant shards: avoid doing so, in order to avoid spewing warnings about + // failures to schedule them. + continue; + } - if tenant_shard.intent.demote_attached(scheduler, node_id) { - tenant_shard.sequence = tenant_shard.sequence.next(); + if !nodes + .values() + .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_))) + { + // Special case for when all nodes are unavailable and/or unschedulable: there is no point + // trying to reschedule since there's nowhere else to go. Without this + // branch we incorrectly detach tenants in response to node unavailability. 
+ continue; + } - // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters - // for tenants without secondary locations: if they have a secondary location, then this - // schedule() call is just promoting an existing secondary) - let mut schedule_context = ScheduleContext::default(); + if tenant_shard.intent.demote_attached(scheduler, node_id) { + tenant_shard.sequence = tenant_shard.sequence.next(); - match tenant_shard.schedule(scheduler, &mut schedule_context) { - Err(e) => { - // It is possible that some tenants will become unschedulable when too many pageservers - // go offline: in this case there isn't much we can do other than make the issue observable. - // TODO: give TenantShard a scheduling error attribute to be queried later. - tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id); - } - Ok(()) => { - if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() { - tenants_affected += 1; - }; + match tenant_shard.schedule(scheduler, &mut schedule_context) { + Err(e) => { + // It is possible that some tenants will become unschedulable when too many pageservers + // go offline: in this case there isn't much we can do other than make the issue observable. + // TODO: give TenantShard a scheduling error attribute to be queried later. + tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id); + } + Ok(()) => { + if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() { + tenants_affected += 1; + }; + } } } } @@ -6011,14 +6016,8 @@ impl Service { let (nodes, tenants, _scheduler) = locked.parts_mut(); let pageservers = nodes.clone(); - let mut schedule_context = ScheduleContext::default(); - let mut reconciles_spawned = 0; - for (tenant_shard_id, shard) in tenants.iter_mut() { - if tenant_shard_id.is_shard_zero() { - schedule_context = ScheduleContext::default(); - } - + for shard in tenants.values_mut() { // Skip checking if this shard is already enqueued for reconciliation if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 { // If there is something delayed, then return a nonzero count so that @@ -6033,8 +6032,6 @@ impl Service { if self.maybe_reconcile_shard(shard, &pageservers).is_some() { reconciles_spawned += 1; } - - schedule_context.avoid(&shard.intent.all_pageservers()); } reconciles_spawned @@ -6103,95 +6100,62 @@ impl Service { } fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> { - let mut schedule_context = ScheduleContext::default(); - - let mut tenant_shards: Vec<&TenantShard> = Vec::new(); - // How many candidate optimizations we will generate, before evaluating them for readniess: setting // this higher than the execution limit gives us a chance to execute some work even if the first // few optimizations we find are not ready. 
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8; let mut work = Vec::new(); - let mut locked = self.inner.write().unwrap(); let (nodes, tenants, scheduler) = locked.parts_mut(); - for (tenant_shard_id, shard) in tenants.iter() { - if tenant_shard_id.is_shard_zero() { - // Reset accumulators on the first shard in a tenant - schedule_context = ScheduleContext::default(); - schedule_context.mode = ScheduleMode::Speculative; - tenant_shards.clear(); - } - - if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { - break; - } - match shard.get_scheduling_policy() { - ShardSchedulingPolicy::Active => { - // Ok to do optimization + for (_tenant_id, schedule_context, shards) in + TenantShardContextIterator::new(tenants, ScheduleMode::Speculative) + { + for shard in shards { + if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS { + break; } - ShardSchedulingPolicy::Essential - | ShardSchedulingPolicy::Pause - | ShardSchedulingPolicy::Stop => { - // Policy prevents optimizing this shard. - continue; + match shard.get_scheduling_policy() { + ShardSchedulingPolicy::Active => { + // Ok to do optimization + } + ShardSchedulingPolicy::Essential + | ShardSchedulingPolicy::Pause + | ShardSchedulingPolicy::Stop => { + // Policy prevents optimizing this shard. + continue; + } } - } - - // Accumulate the schedule context for all the shards in a tenant: we must have - // the total view of all shards before we can try to optimize any of them. - schedule_context.avoid(&shard.intent.all_pageservers()); - if let Some(attached) = shard.intent.get_attached() { - schedule_context.push_attached(*attached); - } - tenant_shards.push(shard); - // Once we have seen the last shard in the tenant, proceed to search across all shards - // in the tenant for optimizations - if shard.shard.number.0 == shard.shard.count.count() - 1 { - if tenant_shards.iter().any(|s| s.reconciler.is_some()) { + if !matches!(shard.splitting, SplitState::Idle) + || matches!(shard.policy, PlacementPolicy::Detached) + || shard.reconciler.is_some() + { // Do not start any optimizations while another change to the tenant is ongoing: this // is not necessary for correctness, but simplifies operations and implicitly throttles // optimization changes to happen in a "trickle" over time. continue; } - if tenant_shards.iter().any(|s| { - !matches!(s.splitting, SplitState::Idle) - || matches!(s.policy, PlacementPolicy::Detached) - }) { - // Never attempt to optimize a tenant that is currently being split, or - // a tenant that is meant to be detached - continue; - } - // TODO: optimization calculations are relatively expensive: create some fast-path for // the common idle case (avoiding the search on tenants that we have recently checked) - - for shard in &tenant_shards { - if let Some(optimization) = - // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to - // its primary location based on soft constraints, cut it over. - shard.optimize_attachment(nodes, &schedule_context) - { - work.push((shard.tenant_shard_id, optimization)); - break; - } else if let Some(optimization) = - // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be - // better placed on another node, based on ScheduleContext, then adjust it. This - // covers cases like after a shard split, where we might have too many shards - // in the same tenant with secondary locations on the node where they originally split. 
- shard.optimize_secondary(scheduler, &schedule_context) - { - work.push((shard.tenant_shard_id, optimization)); - break; - } - - // TODO: extend this mechanism to prefer attaching on nodes with fewer attached - // tenants (i.e. extend schedule state to distinguish attached from secondary counts), - // for the total number of attachments on a node (not just within a tenant.) + if let Some(optimization) = + // If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to + // its primary location based on soft constraints, cut it over. + shard.optimize_attachment(nodes, &schedule_context) + { + work.push((shard.tenant_shard_id, optimization)); + break; + } else if let Some(optimization) = + // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be + // better placed on another node, based on ScheduleContext, then adjust it. This + // covers cases like after a shard split, where we might have too many shards + // in the same tenant with secondary locations on the node where they originally split. + shard.optimize_secondary(scheduler, &schedule_context) + { + work.push((shard.tenant_shard_id, optimization)); + break; } } } diff --git a/storage_controller/src/service/context_iterator.rs b/storage_controller/src/service/context_iterator.rs new file mode 100644 index 000000000000..d38010a27eca --- /dev/null +++ b/storage_controller/src/service/context_iterator.rs @@ -0,0 +1,139 @@ +use std::collections::BTreeMap; + +use utils::id::TenantId; +use utils::shard::TenantShardId; + +use crate::scheduler::{ScheduleContext, ScheduleMode}; +use crate::tenant_shard::TenantShard; + +/// When making scheduling decisions, it is useful to have the ScheduleContext for a whole +/// tenant while considering the individual shards within it. This iterator is a helper +/// that gathers all the shards in a tenant and then yields them together with a ScheduleContext +/// for the tenant.
+pub(super) struct TenantShardContextIterator<'a> { + schedule_mode: ScheduleMode, + inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>, +} + +impl<'a> TenantShardContextIterator<'a> { + pub(super) fn new( + tenants: &'a mut BTreeMap<TenantShardId, TenantShard>, + schedule_mode: ScheduleMode, + ) -> Self { + Self { + schedule_mode, + inner: tenants.iter_mut(), + } + } +} + +impl<'a> Iterator for TenantShardContextIterator<'a> { + type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>); + + fn next(&mut self) -> Option<Self::Item> { + let mut tenant_shards = Vec::new(); + let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone()); + loop { + let (tenant_shard_id, shard) = self.inner.next()?; + + if tenant_shard_id.is_shard_zero() { + // Cleared on last shard of previous tenant + assert!(tenant_shards.is_empty()); + } + + // Accumulate the schedule context for all the shards in a tenant + schedule_context.avoid(&shard.intent.all_pageservers()); + if let Some(attached) = shard.intent.get_attached() { + schedule_context.push_attached(*attached); + } + tenant_shards.push(shard); + + if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 { + return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards)); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, str::FromStr}; + + use pageserver_api::controller_api::PlacementPolicy; + use utils::shard::{ShardCount, ShardNumber}; + + use crate::{ + scheduler::test_utils::make_test_nodes, service::Scheduler, + tenant_shard::tests::make_test_tenant_with_id, + }; + + use super::*; + + #[test] + fn test_context_iterator() { + // Hand-crafted tenant IDs to ensure they appear in the expected order when put into + // a btreemap & iterated + let mut t_1_shards = make_test_tenant_with_id( + TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(), + PlacementPolicy::Attached(1), + ShardCount(1), + None, + ); + let t_2_shards = make_test_tenant_with_id( + TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(), + PlacementPolicy::Attached(1), + ShardCount(4), + None, + ); + let mut t_3_shards = make_test_tenant_with_id( + TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(), + PlacementPolicy::Attached(1), + ShardCount(1), + None, + ); + + let t1_id = t_1_shards[0].tenant_shard_id.tenant_id; + let t2_id = t_2_shards[0].tenant_shard_id.tenant_id; + let t3_id = t_3_shards[0].tenant_shard_id.tenant_id; + + let mut tenants = BTreeMap::new(); + tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap()); + for shard in t_2_shards { + tenants.insert(shard.tenant_shard_id, shard); + } + tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap()); + + let nodes = make_test_nodes(3, &[]); + let mut scheduler = Scheduler::new(nodes.values()); + let mut context = ScheduleContext::default(); + for shard in tenants.values_mut() { + shard.schedule(&mut scheduler, &mut context).unwrap(); + } + + let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative); + let (tenant_id, context, shards) = iter.next().unwrap(); + assert_eq!(tenant_id, t1_id); + assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); + assert_eq!(shards.len(), 1); + assert_eq!(context.attach_count(), 1); + + let (tenant_id, context, shards) = iter.next().unwrap(); + assert_eq!(tenant_id, t2_id); + assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); + assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2)); + assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3)); + assert_eq!(shards.len(), 4); + assert_eq!(context.attach_count(), 4); + + let (tenant_id, context, shards) = iter.next().unwrap(); + assert_eq!(tenant_id, t3_id); + assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); + assert_eq!(shards.len(), 1); + assert_eq!(context.attach_count(), 1); + + for shard in tenants.values_mut() { + shard.intent.clear(&mut scheduler); + } + } +} diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 27c97d3b864e..2eb98ee82545 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -1574,13 +1574,20 @@ pub(crate) mod tests { ) } - fn make_test_tenant( + pub(crate) fn make_test_tenant( policy: PlacementPolicy, shard_count: ShardCount, preferred_az: Option, ) -> Vec { - let tenant_id = TenantId::generate(); + make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az) + } + pub(crate) fn make_test_tenant_with_id( + tenant_id: TenantId, + policy: PlacementPolicy, + shard_count: ShardCount, + preferred_az: Option, + ) -> Vec { (0..shard_count.count()) .map(|i| { let shard_number = ShardNumber(i); diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 284ae56be239..c1989c934d46 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 284ae56be2397fd3eaf20777fa220b2d0ad968f5 +Subproject commit c1989c934d46e04e78b3c496c8a34bcd40ddceeb diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index aed79ee87b94..d929b9a8b9f3 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit aed79ee87b94779cc52ec13e3b74eba6ada93f05 +Subproject commit d929b9a8b9f32f6fe5a0eac3e6e963f0e44e27e6 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index f5cfc6fa8985..13e9e3539419 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit f5cfc6fa898544050e821ac688adafece1ac3cff +Subproject commit 13e9e3539419003e79bd9aa29e1bc44f3fd555dd diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index 3c15b6565f6c..faebe5e5aff5 160000 --- a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit 3c15b6565f6c8d36d169ed9ea7412cf90cfb2a8f +Subproject commit faebe5e5aff5687908504453623778f8515529db diff --git a/vendor/revisions.json b/vendor/revisions.json index 4dae88e73dc1..abeddcadf73c 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,18 +1,18 @@ { "v17": [ "17.2", - "3c15b6565f6c8d36d169ed9ea7412cf90cfb2a8f" + "faebe5e5aff5687908504453623778f8515529db" ], "v16": [ "16.6", - "f5cfc6fa898544050e821ac688adafece1ac3cff" + "13e9e3539419003e79bd9aa29e1bc44f3fd555dd" ], "v15": [ "15.10", - "aed79ee87b94779cc52ec13e3b74eba6ada93f05" + "d929b9a8b9f32f6fe5a0eac3e6e963f0e44e27e6" ], "v14": [ "14.15", - "284ae56be2397fd3eaf20777fa220b2d0ad968f5" + "c1989c934d46e04e78b3c496c8a34bcd40ddceeb" ] }
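The `Drop` changes in `libs/utils/src/sync/spsc_fold.rs` above use `scopeguard::defer!` so the peer's waker is notified on every exit path, including the early return taken when the mutex is poisoned. A minimal standalone sketch of that pattern, not part of the patch: the function name and the `Mutex<i32>` are illustrative only, and it assumes the `scopeguard` crate that the patch adds to `libs/utils`.

```rust
use std::sync::Mutex;

// Mirrors the shape of the Drop impls above: the deferred block is registered
// first, so it runs even if we bail out early on a poisoned lock.
fn notify_on_every_exit_path(lock: &Mutex<i32>) {
    scopeguard::defer! {
        println!("peer waker notified");
    };
    let Ok(mut guard) = lock.lock() else {
        return; // the defer! block above still runs here
    };
    *guard += 1;
}

fn main() {
    notify_on_every_exit_path(&Mutex::new(0));
}
```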