From 665647ca9047cce24d45a48acc41291703feeabf Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 13 Sep 2023 13:55:14 +0100 Subject: [PATCH] pageserver: implement new style attachment location config API --- libs/pageserver_api/src/models.rs | 46 +++++++++++ pageserver/src/http/routes.rs | 49 +++++++++++- pageserver/src/tenant.rs | 11 +++ pageserver/src/tenant/config.rs | 49 ++++++++++++ pageserver/src/tenant/mgr.rs | 128 ++++++++++++++++++++++++++++++ 5 files changed, 281 insertions(+), 2 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index f354296be223c..40dcb5be1ffff 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -10,6 +10,7 @@ use serde_with::{serde_as, DisplayFromStr}; use strum_macros; use utils::{ completion, + generation::Generation, history_buffer::HistoryBufferWithDropCounter, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, @@ -218,6 +219,8 @@ impl std::ops::Deref for TenantCreateRequest { } } +/// An alternative representation of `pageserver::tenant::TenantConf` with +/// simpler types. #[derive(Serialize, Deserialize, Debug, Default)] pub struct TenantConfig { pub checkpoint_distance: Option, @@ -243,6 +246,39 @@ pub struct TenantConfig { pub gc_feedback: Option, } +/// A flattened analog of a `pageserver::tenant::LocationMode`, which +/// lists out all possible states (and the virtual "Detached" state) +/// in a flat form rather than using rust-style enums. +#[derive(Serialize, Deserialize, Debug)] +pub enum LocationConfigMode { + AttachedSingle, + AttachedMulti, + AttachedStale, + Secondary, + Detached, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LocationConfigSecondary { + pub warm: bool, +} + +/// An alternative representation of `pageserver::tenant::LocationConf`, +/// for use in external-facing APIs. +#[derive(Serialize, Deserialize, Debug)] +pub struct LocationConfig { + pub mode: LocationConfigMode, + /// If attaching, in what generation? 
+ #[serde(default)] + pub generation: Option, + #[serde(default)] + pub secondary_conf: Option, + + // If requesting mode `Secondary`, configuration for that. + // Custom storage configuration for the tenant, if any + pub tenant_conf: TenantConfig, +} + #[serde_as] #[derive(Serialize, Deserialize)] #[serde(transparent)] @@ -253,6 +289,16 @@ pub struct StatusResponse { pub id: NodeId, } +#[serde_as] +#[derive(Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct TenantLocationConfigRequest { + #[serde_as(as = "DisplayFromStr")] + pub tenant_id: TenantId, + #[serde(flatten)] + pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it +} + #[serde_as] #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3625cadd76316..fec5f4119b19f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -9,7 +9,8 @@ use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; use pageserver_api::models::{ - DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest, + DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest, + TenantLoadRequest, TenantLocationConfigRequest, }; use remote_storage::GenericRemoteStorage; use tenant_size_model::{SizeResult, StorageModel}; @@ -27,7 +28,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; -use crate::tenant::config::TenantConfOpt; +use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::{ GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError, }; @@ -986,6 +987,47 @@ async fn update_tenant_config_handler( json_response(StatusCode::OK, ()) } +async fn 
put_tenant_location_config_handler( + mut request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; + let tenant_id = request_data.tenant_id; + check_permission(&request, Some(tenant_id))?; + + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let state = get_state(&request); + let conf = state.conf; + + // The `Detached` state is special, it doesn't upsert a tenant, it removes + // its local disk content and drops it from memory. + if let LocationConfigMode::Detached = request_data.config.mode { + mgr::detach_tenant(conf, tenant_id, true) + .instrument(info_span!("tenant_detach", %tenant_id)) + .await?; + return json_response(StatusCode::OK, ()); + } + + let location_conf = + LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?; + + mgr::upsert_location( + state.conf, + tenant_id, + location_conf, + state.broker_client.clone(), + state.remote_storage.clone(), + &ctx, + ) + .await + // TODO: badrequest assumes the caller was asking for something unreasonable, but in + // principle we might have hit something like concurrent API calls to the same tenant, + // which is not a 400 but a 409. + .map_err(ApiError::BadRequest)?; + + json_response(StatusCode::OK, ()) +} + /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`]. 
async fn handle_tenant_break( r: Request, @@ -1406,6 +1448,9 @@ pub fn make_router( .get("/v1/tenant/:tenant_id/config", |r| { api_handler(r, get_tenant_config_handler) }) + .put("/v1/tenant/:tenant_id/location_config", |r| { + api_handler(r, put_tenant_location_config_handler) + }) .get("/v1/tenant/:tenant_id/timeline", |r| { api_handler(r, timeline_list_handler) }) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cc54eec31474f..3b064e2488d21 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2197,6 +2197,17 @@ impl Tenant { } } + pub(crate) fn set_new_location_config(&self, new_conf: LocationConf) { + *self.tenant_conf.write().unwrap() = new_conf; + // Don't hold self.timelines.lock() during the notifies. + // There's no risk of deadlock right now, but there could be if we consolidate + // mutexes in struct Timeline in the future. + let timelines = self.list_timelines(); + for timeline in timelines { + timeline.tenant_conf_updated(); + } + } + /// Helper function to create a new Timeline struct. /// /// The returned Timeline is in Loading state. 
The caller is responsible for diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 5646cb98b41f2..f9adcf5877b63 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -196,6 +196,55 @@ impl LocationConf { } } +impl TryFrom<&'_ models::LocationConfig> for LocationConf { + type Error = anyhow::Error; + + fn try_from(conf: &'_ models::LocationConfig) -> Result { + let tenant_conf = TenantConfOpt::try_from(&conf.tenant_conf)?; + + fn get_generation(conf: &'_ models::LocationConfig) -> Result { + conf.generation + .ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching")) + } + + let mode = match &conf.mode { + models::LocationConfigMode::AttachedMulti => { + LocationMode::Attached(AttachedLocationConfig { + generation: get_generation(conf)?, + attach_mode: AttachmentMode::Multi, + }) + } + models::LocationConfigMode::AttachedSingle => { + LocationMode::Attached(AttachedLocationConfig { + generation: get_generation(conf)?, + attach_mode: AttachmentMode::Single, + }) + } + models::LocationConfigMode::AttachedStale => { + LocationMode::Attached(AttachedLocationConfig { + generation: get_generation(conf)?, + attach_mode: AttachmentMode::Stale, + }) + } + models::LocationConfigMode::Secondary => { + let warm = conf + .secondary_conf + .as_ref() + .map(|c| c.warm) + .unwrap_or(false); + LocationMode::Secondary(SecondaryLocationConfig { warm }) + } + models::LocationConfigMode::Detached => { + // Should not have been called: API code should translate this mode + // into a detach rather than trying to decode it as a LocationConf + return Err(anyhow::anyhow!("Cannot decode a Detached configuration")); + } + }; + + Ok(Self { mode, tenant_conf }) + } +} + impl Default for LocationConf { // TODO: this should be removed once tenant loading can guarantee that we are never // loading from a directory without a configuration. 
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d7cbbea9cd89f..186a89bdd276a 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -582,6 +582,110 @@ pub async fn set_new_tenant_config( Ok(()) } +#[instrument(skip_all, fields(tenant_id, new_location_config))] +pub(crate) async fn upsert_location( + conf: &'static PageServerConf, + tenant_id: TenantId, + new_location_config: LocationConf, + broker_client: storage_broker::BrokerClientChannel, + remote_storage: Option, + ctx: &RequestContext, +) -> Result<(), anyhow::Error> { + info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); + + let mut existing_tenant = match get_tenant(tenant_id, false).await { + Ok(t) => Some(t), + Err(GetTenantError::NotFound(_)) => None, + Err(e) => anyhow::bail!(e), + }; + + // If we need to shut down a Tenant, do that first + let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) { + (LocationMode::Secondary(_), Some(t)) => Some(t), + (LocationMode::Attached(attach_conf), Some(t)) => { + if attach_conf.generation != t.generation { + Some(t) + } else { + None + } + } + _ => None, + }; + + // TODO: reason about what happens if a concurrent API call lands + // while we are in the middle of this. 
We should not hold the + // TenantsMap lock across waiting for shutdown + + if let Some(tenant) = shutdown_tenant { + let (_guard, progress) = utils::completion::channel(); + info!("Shutting down attached tenant"); + match tenant.shutdown(progress, false).await { + Ok(()) => {} + Err(barrier) => { + info!("Shutdown already in progress, waiting for it to complete"); + barrier.wait().await; + } + } + existing_tenant = None; + } + + if let Some(tenant) = existing_tenant { + // Update the existing tenant + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + tenant.set_new_location_config(new_location_config); + } else { + // Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock, + // and re-check that the state of anything we are replacing is as expected. + tenant_map_upsert_slot(tenant_id, |old_value| async move { + if let Some(TenantSlot::Attached(t)) = old_value { + if !matches!(t.current_state(), TenantState::Stopping { .. }) { + anyhow::bail!("Tenant state changed during location configuration update"); + } + } + + let new_slot = match &new_location_config.mode { + LocationMode::Secondary(_) => TenantSlot::Secondary, + LocationMode::Attached(_attach_config) => { + // Do a schedule_local_tenant_processing + // FIXME: should avoid doing this disk I/O inside the TenantsMap lock, + // we have the same problem in load_tenant/attach_tenant. Probably + // need a lock in TenantSlot to fix this. 
+ Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + let tenant_path = conf.tenant_path(&tenant_id); + let resources = TenantSharedResources { + broker_client, + remote_storage, + }; + let new_tenant = schedule_local_tenant_processing( + conf, + tenant_id, + &tenant_path, + new_location_config, + resources, + None, + &TENANTS, + ctx, + ) + .with_context(|| { + format!("Failed to schedule tenant processing in path {tenant_path:?}") + })?; + + TenantSlot::Attached(new_tenant) + } + }; + + Ok(new_slot) + }) + .await?; + } + + Ok(()) +} + #[derive(Debug, thiserror::Error)] pub enum GetTenantError { #[error("Tenant {0} not found")] @@ -887,6 +991,30 @@ where } } +async fn tenant_map_upsert_slot<'a, F, R>( + tenant_id: TenantId, + upsert_fn: F, +) -> Result<(), TenantMapInsertError> +where + F: FnOnce(Option) -> R, + R: std::future::Future>, +{ + let mut guard = TENANTS.write().await; + let m = match &mut *guard { + TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), + TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), + TenantsMap::Open(m) => m, + }; + + match upsert_fn(m.remove(&tenant_id)).await { + Ok(upsert_val) => { + m.insert(tenant_id, upsert_val); + Ok(()) + } + Err(e) => Err(TenantMapInsertError::Closure(e)), + } +} + /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise. /// Allows to remove other tenant resources manually, via `tenant_cleanup`. /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal