diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs
index b5e90491c681..bebc62ac2f70 100644
--- a/control_plane/attachment_service/src/compute_hook.rs
+++ b/control_plane/attachment_service/src/compute_hook.rs
@@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration};
 use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
 use control_plane::local_env::LocalEnv;
 use hyper::{Method, StatusCode};
-use pageserver_api::shard::{ShardIndex, ShardNumber, TenantShardId};
+use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
 use postgres_connection::parse_host_port;
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
@@ -19,8 +19,66 @@ const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
 
 pub(crate) const API_CONCURRENCY: usize = 32;
 
-pub(super) struct ComputeHookTenant {
-    shards: Vec<(ShardIndex, NodeId)>,
+struct ShardedComputeHookTenant {
+    stripe_size: ShardStripeSize,
+    shard_count: ShardCount,
+    shards: Vec<(ShardNumber, NodeId)>,
+}
+
+enum ComputeHookTenant {
+    Unsharded(NodeId),
+    Sharded(ShardedComputeHookTenant),
+}
+
+impl ComputeHookTenant {
+    /// Construct with at least one shard's information
+    fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
+        if tenant_shard_id.shard_count.count() > 1 {
+            Self::Sharded(ShardedComputeHookTenant {
+                shards: vec![(tenant_shard_id.shard_number, node_id)],
+                stripe_size,
+                shard_count: tenant_shard_id.shard_count,
+            })
+        } else {
+            Self::Unsharded(node_id)
+        }
+    }
+
+    /// Set one shard's location. If stripe size or shard count have changed, Self is reset
+    /// and drops existing content.
+    fn update(
+        &mut self,
+        tenant_shard_id: TenantShardId,
+        stripe_size: ShardStripeSize,
+        node_id: NodeId,
+    ) {
+        match self {
+            Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
+                *existing_node_id = node_id
+            }
+            Self::Sharded(sharded_tenant)
+                if sharded_tenant.stripe_size == stripe_size
+                    && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
+            {
+                if let Some(existing) = sharded_tenant
+                    .shards
+                    .iter()
+                    .position(|s| s.0 == tenant_shard_id.shard_number)
+                {
+                    sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
+                } else {
+                    sharded_tenant
+                        .shards
+                        .push((tenant_shard_id.shard_number, node_id));
+                    sharded_tenant.shards.sort_by_key(|s| s.0)
+                }
+            }
+            _ => {
+                // Shard count changed: reset struct.
+                *self = Self::new(tenant_shard_id, stripe_size, node_id);
+            }
+        }
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -33,6 +91,7 @@ struct ComputeHookNotifyRequestShard {
 #[derive(Serialize, Deserialize, Debug)]
 struct ComputeHookNotifyRequest {
     tenant_id: TenantId,
+    stripe_size: Option<ShardStripeSize>,
     shards: Vec<ComputeHookNotifyRequestShard>,
 }
 
@@ -63,42 +122,43 @@ pub(crate) enum NotifyError {
 }
 
 impl ComputeHookTenant {
-    async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
-        // Find the highest shard count and drop any shards that aren't
-        // for that shard count.
-        let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max();
-        let Some(shard_count) = shard_count else {
-            // No shards, nothing to do.
- tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards"); - return None; - }; - - self.shards.retain(|(k, _v)| k.shard_count == shard_count); - self.shards - .sort_by_key(|(shard, _node_id)| shard.shard_number); - - if self.shards.len() == shard_count.count() as usize || shard_count.is_unsharded() { - // We have pageservers for all the shards: emit a configuration update - return Some(ComputeHookNotifyRequest { + fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option { + match self { + Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest { tenant_id, - shards: self - .shards - .iter() - .map(|(shard, node_id)| ComputeHookNotifyRequestShard { - shard_number: shard.shard_number, - node_id: *node_id, - }) - .collect(), - }); - } else { - tracing::info!( - "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", - self.shards.len(), - shard_count.count() - ); - } + shards: vec![ComputeHookNotifyRequestShard { + shard_number: ShardNumber(0), + node_id: *node_id, + }], + stripe_size: None, + }), + Self::Sharded(sharded_tenant) + if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize => + { + Some(ComputeHookNotifyRequest { + tenant_id, + shards: sharded_tenant + .shards + .iter() + .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard { + shard_number: *shard_number, + node_id: *node_id, + }) + .collect(), + stripe_size: Some(sharded_tenant.stripe_size), + }) + } + Self::Sharded(sharded_tenant) => { + // Sharded tenant doesn't yet have information for all its shards - None + tracing::info!( + "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", + sharded_tenant.shards.len(), + sharded_tenant.shard_count.count() + ); + None + } + } } } @@ -139,7 +199,11 @@ impl ComputeHook { }; let cplane = ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane"); - let ComputeHookNotifyRequest { tenant_id, shards } = reconfigure_request; + let ComputeHookNotifyRequest { + tenant_id, + shards, + stripe_size, + } = reconfigure_request; let compute_pageservers = shards .into_iter() @@ -156,7 +220,9 @@ impl ComputeHook { for (endpoint_name, endpoint) in &cplane.endpoints { if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running { tracing::info!("Reconfiguring endpoint {}", endpoint_name,); - endpoint.reconfigure(compute_pageservers.clone()).await?; + endpoint + .reconfigure(compute_pageservers.clone(), stripe_size) + .await?; } } @@ -271,30 +337,26 @@ impl ComputeHook { &self, tenant_shard_id: TenantShardId, node_id: NodeId, + stripe_size: ShardStripeSize, cancel: &CancellationToken, ) -> Result<(), NotifyError> { let mut locked = self.state.lock().await; - let entry = locked - .entry(tenant_shard_id.tenant_id) - .or_insert_with(|| ComputeHookTenant { shards: Vec::new() }); - let shard_index = ShardIndex { - shard_count: tenant_shard_id.shard_count, - shard_number: tenant_shard_id.shard_number, - }; - - let mut set = false; - for (existing_shard, existing_node) in &mut entry.shards { - if *existing_shard == shard_index { - *existing_node = node_id; - set = true; + use std::collections::hash_map::Entry; + let tenant = match locked.entry(tenant_shard_id.tenant_id) { + Entry::Vacant(e) => e.insert(ComputeHookTenant::new( + tenant_shard_id, + stripe_size, + node_id, + )), + Entry::Occupied(e) => { + let tenant = e.into_mut(); + tenant.update(tenant_shard_id, stripe_size, node_id); + tenant } - } - if !set { - entry.shards.push((shard_index, node_id)); - } + }; - let reconfigure_request = 
+        let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
         let Some(reconfigure_request) = reconfigure_request else {
             // The tenant doesn't yet have pageservers for all its shards: we won't notify anything
             // until it does.
@@ -316,3 +378,85 @@ impl ComputeHook {
         }
     }
 }
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use pageserver_api::shard::{ShardCount, ShardNumber};
+    use utils::id::TenantId;
+
+    use super::*;
+
+    #[test]
+    fn tenant_updates() -> anyhow::Result<()> {
+        let tenant_id = TenantId::generate();
+        let mut tenant_state = ComputeHookTenant::new(
+            TenantShardId {
+                tenant_id,
+                shard_count: ShardCount::new(0),
+                shard_number: ShardNumber(0),
+            },
+            ShardStripeSize(12345),
+            NodeId(1),
+        );
+
+        // An unsharded tenant is always ready to emit a notification
+        assert!(tenant_state.maybe_reconfigure(tenant_id).is_some());
+        assert_eq!(
+            tenant_state
+                .maybe_reconfigure(tenant_id)
+                .unwrap()
+                .shards
+                .len(),
+            1
+        );
+        assert!(tenant_state
+            .maybe_reconfigure(tenant_id)
+            .unwrap()
+            .stripe_size
+            .is_none());
+
+        // Writing the first shard of a multi-sharded situation (i.e. in a split)
+        // resets the tenant state and puts it in a non-notifying state (need to
+        // see all shards)
+        tenant_state.update(
+            TenantShardId {
+                tenant_id,
+                shard_count: ShardCount::new(2),
+                shard_number: ShardNumber(1),
+            },
+            ShardStripeSize(32768),
+            NodeId(1),
+        );
+        assert!(tenant_state.maybe_reconfigure(tenant_id).is_none());
+
+        // Writing the second shard makes it ready to notify
+        tenant_state.update(
+            TenantShardId {
+                tenant_id,
+                shard_count: ShardCount::new(2),
+                shard_number: ShardNumber(0),
+            },
+            ShardStripeSize(32768),
+            NodeId(1),
+        );
+
+        assert!(tenant_state.maybe_reconfigure(tenant_id).is_some());
+        assert_eq!(
+            tenant_state
+                .maybe_reconfigure(tenant_id)
+                .unwrap()
+                .shards
+                .len(),
+            2
+        );
+        assert_eq!(
+            tenant_state
+                .maybe_reconfigure(tenant_id)
+                .unwrap()
+                .stripe_size,
+            Some(ShardStripeSize(32768))
+        );
+
+        Ok(())
+    }
+}
diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
index d4f940373f90..0fa6e8e2f8d5 100644
--- a/control_plane/attachment_service/src/reconciler.rs
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -565,7 +565,12 @@ impl Reconciler {
         if let Some(node_id) = self.intent.attached {
             let result = self
                 .compute_hook
-                .notify(self.tenant_shard_id, node_id, &self.cancel)
+                .notify(
+                    self.tenant_shard_id,
+                    node_id,
+                    self.shard.stripe_size,
+                    &self.cancel,
+                )
                 .await;
             if let Err(e) = &result {
                 // It is up to the caller whether they want to drop out on this error, but they don't have to:
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index bc34c9dcf6f7..ff35567ff388 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -283,7 +283,11 @@ impl Service {
                 // emit a compute notification for this. In the case where our observed state does not
                 // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
                 if let Some(attached_at) = tenant_state.stably_attached() {
-                    compute_notifications.push((*tenant_shard_id, attached_at));
+                    compute_notifications.push((
+                        *tenant_shard_id,
+                        attached_at,
+                        tenant_state.shard.stripe_size,
+                    ));
                 }
             }
         }
@@ -493,7 +497,7 @@ impl Service {
     /// Returns a set of any shards for which notifications where not acked within the deadline.
     async fn compute_notify_many(
         &self,
-        notifications: Vec<(TenantShardId, NodeId)>,
+        notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
         deadline: Instant,
     ) -> HashSet<TenantShardId> {
         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
@@ -504,11 +508,14 @@ impl Service {
         // Construct an async stream of futures to invoke the compute notify function: we do this
         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
         let mut stream = futures::stream::iter(notifications.into_iter())
-            .map(|(tenant_shard_id, node_id)| {
+            .map(|(tenant_shard_id, node_id, stripe_size)| {
                 let compute_hook = compute_hook.clone();
                 let cancel = self.cancel.clone();
                 async move {
-                    if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
+                    if let Err(e) = compute_hook
+                        .notify(tenant_shard_id, node_id, stripe_size, &cancel)
+                        .await
+                    {
                         tracing::error!(
                             %tenant_shard_id,
                             %node_id,
@@ -1396,7 +1403,10 @@ impl Service {
         // First check if this is a creation or an update
         let create_or_update = self.tenant_location_config_prepare(tenant_id, req);
 
-        let mut result = TenantLocationConfigResponse { shards: Vec::new() };
+        let mut result = TenantLocationConfigResponse {
+            shards: Vec::new(),
+            stripe_size: None,
+        };
         let waiters = match create_or_update {
             TenantCreateOrUpdate::Create((create_req, placement_policy)) => {
                 let (create_resp, waiters) =
@@ -1452,6 +1462,11 @@ impl Service {
                     continue;
                 };
 
+                // Update stripe size
+                if result.stripe_size.is_none() && shard.shard.count.count() > 1 {
+                    result.stripe_size = Some(shard.shard.stripe_size);
+                }
+
                 shard.policy = placement_policy;
                 shard.config = tenant_config;
                 if let Some(generation) = update_generation {
@@ -2456,7 +2471,7 @@ impl Service {
            // as at this point in the split process we have succeeded and this part is infallible:
            // we will never need to do any special recovery from this state.
-            child_locations.push((child, pageserver));
+            child_locations.push((child, pageserver, child_shard.stripe_size));
 
             tenants.insert(child, child_state);
             response.new_shards.push(child);
@@ -2466,8 +2481,11 @@
 
         // Send compute notifications for all the new shards
         let mut failed_notifications = Vec::new();
-        for (child_id, child_ps) in child_locations {
-            if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
+        for (child_id, child_ps, stripe_size) in child_locations {
+            if let Err(e) = compute_hook
+                .notify(child_id, child_ps, stripe_size, &self.cancel)
+                .await
+            {
                 tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
                         child_id, child_ps);
                 failed_notifications.push(child_id);
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index cf647a5f9bc3..1feec5cd9bac 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -1024,7 +1024,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
                     })
                     .collect::<Vec<_>>()
             };
-            endpoint.reconfigure(pageservers).await?;
+            endpoint.reconfigure(pageservers, None).await?;
         }
         "stop" => {
             let endpoint_id = sub_args
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 5a75bc2a1dcd..10e4c5d69fec 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -52,6 +52,7 @@ use compute_api::spec::RemoteExtSpec;
 use compute_api::spec::Role;
 use nix::sys::signal::kill;
 use nix::sys::signal::Signal;
+use pageserver_api::shard::ShardStripeSize;
 use serde::{Deserialize, Serialize};
 use url::Host;
 use utils::id::{NodeId, TenantId, TimelineId};
@@ -735,7 +736,11 @@ impl Endpoint {
         }
     }
 
-    pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
+    pub async fn reconfigure(
+        &self,
+        mut pageservers: Vec<(Host, u16)>,
+        stripe_size: Option<ShardStripeSize>,
+    ) -> Result<()> {
         let mut spec: ComputeSpec = {
             let spec_path = self.endpoint_path().join("spec.json");
             let file = std::fs::File::open(spec_path)?;
@@ -765,6 +770,9 @@ impl Endpoint {
         let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
         assert!(!pageserver_connstr.is_empty());
         spec.pageserver_connstring = Some(pageserver_connstr);
+        if stripe_size.is_some() {
+            spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
+        }
 
         let client = reqwest::Client::new();
         let response = client
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d583866290f3..57497e38313e 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -435,6 +435,8 @@ pub struct TenantShardLocation {
 #[serde(deny_unknown_fields)]
 pub struct TenantLocationConfigResponse {
     pub shards: Vec<TenantShardLocation>,
+    // If the shards' ShardCount count is >1, stripe_size will be set.
+    pub stripe_size: Option<ShardStripeSize>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 19b5fb7e799e..d924224a321f 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -1339,6 +1339,10 @@ components:
           type: array
           items:
             $ref: "#/components/schemas/TenantShardLocation"
+        stripe_size:
+          description: If multiple shards are present, this field contains the sharding stripe size, else it is null.
+          type: integer
+          nullable: true
     TenantShardLocation:
       type: object
       required:
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 9d92fbaee05f..6aaf1ab27e46 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1451,11 +1451,12 @@ async fn put_tenant_location_config_handler(
         tenant::SpawnMode::Eager
     };
 
-    let attached = state
+    let tenant = state
         .tenant_manager
         .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx)
-        .await?
-        .is_some();
+        .await?;
+    let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size());
+    let attached = tenant.is_some();
 
     if let Some(_flush_ms) = flush {
         match state
@@ -1477,12 +1478,20 @@ async fn put_tenant_location_config_handler(
     // This API returns a vector of pageservers where the tenant is attached: this is
     // primarily for use in the sharding service. For compatibilty, we also return this
     // when called directly on a pageserver, but the payload is always zero or one shards.
-    let mut response = TenantLocationConfigResponse { shards: Vec::new() };
+    let mut response = TenantLocationConfigResponse {
+        shards: Vec::new(),
+        stripe_size: None,
+    };
     if attached {
         response.shards.push(TenantShardLocation {
             shard_id: tenant_shard_id,
             node_id: state.conf.id,
-        })
+        });
+        if tenant_shard_id.shard_count.count() > 1 {
+            // Stripe size should be set if we are attached
+            debug_assert!(stripe_size.is_some());
+            response.stripe_size = stripe_size;
+        }
     }
 
     json_response(StatusCode::OK, response)
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 3423b50eaadc..b24c06c4daa3 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -22,6 +22,7 @@ use pageserver_api::models;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::WalRedoManagerStatus;
 use pageserver_api::shard::ShardIdentity;
+use pageserver_api::shard::ShardStripeSize;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
@@ -2086,6 +2087,10 @@ impl Tenant {
         &self.tenant_shard_id
     }
 
+    pub(crate) fn get_shard_stripe_size(&self) -> ShardStripeSize {
+        self.shard_identity.stripe_size
+    }
+
     pub(crate) fn get_generation(&self) -> Generation {
         self.generation
     }
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index bc77dfd084cd..aecc244a4723 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -1,7 +1,7 @@
 import time
 from collections import defaultdict
 from datetime import datetime, timezone
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Union
 
 import pytest
 from fixtures.log_helper import log
@@ -443,10 +443,12 @@ def handler(request: Request):
     # Initial notification from tenant creation
     assert len(notifications) == 1
-    expect = {
+    expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
         "tenant_id": str(env.initial_tenant),
+        "stripe_size": None,
         "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
     }
+
     assert notifications[0] == expect
 
     env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
 
     def node_evacuated(node_id: int) -> None:
@@ -460,6 +462,7 @@ def node_evacuated(node_id: int) -> None:
     log.info(f"notifications: {notifications}")
     expect = {
         "tenant_id": str(env.initial_tenant),
+        "stripe_size": None,
         "shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}],
     }
 
@@ -475,10 +478,27 @@ def received_migration_notification():
     def received_restart_notification():
         assert len(notifications) == 3
-        assert notifications[1] == expect
+        assert notifications[2] == expect
 
     wait_until(10, 1, received_restart_notification)
 
+    # Splitting a tenant should cause its stripe size to become visible in the compute notification
+    env.attachment_service.tenant_shard_split(env.initial_tenant, shard_count=2)
+    expect = {
+        "tenant_id": str(env.initial_tenant),
+        "stripe_size": 32768,
+        "shards": [
+            {"node_id": int(env.pageservers[1].id), "shard_number": 0},
+            {"node_id": int(env.pageservers[1].id), "shard_number": 1},
+        ],
+    }
+
+    def received_split_notification():
+        assert len(notifications) == 4
+        assert notifications[3] == expect
+
+    wait_until(10, 1, received_split_notification)
+
     env.attachment_service.consistency_check()
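
The regression test above exercises the new ComputeHookTenant behaviour end to end. As a quick illustration of the same invariant — a notification is only emitted once a pageserver is known for every shard, and stripe_size is only reported for sharded tenants — here is a small self-contained Rust sketch. It is not part of the patch: the NodeId, ShardNumber, ShardCount and ShardStripeSize types below are simplified stand-ins for the real pageserver_api and utils types, and ready_to_notify/reported_stripe_size are illustrative helpers rather than functions from compute_hook.rs.

// Standalone sketch, not part of the patch: simplified stand-ins for the real types.
#![allow(dead_code)]

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct NodeId(u64);
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct ShardNumber(u8);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardCount(u8);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardStripeSize(u32);

// Mirrors the Unsharded/Sharded split introduced in compute_hook.rs.
enum ComputeHookTenant {
    Unsharded(NodeId),
    Sharded {
        stripe_size: ShardStripeSize,
        shard_count: ShardCount,
        shards: Vec<(ShardNumber, NodeId)>,
    },
}

impl ComputeHookTenant {
    /// A notification may only be emitted once every shard's pageserver is known.
    fn ready_to_notify(&self) -> bool {
        match self {
            Self::Unsharded(_) => true,
            Self::Sharded {
                shard_count,
                shards,
                ..
            } => shards.len() == shard_count.0 as usize,
        }
    }

    /// Stripe size is only reported for sharded tenants, matching the optional
    /// stripe_size field on the notify request.
    fn reported_stripe_size(&self) -> Option<ShardStripeSize> {
        match self {
            Self::Unsharded(_) => None,
            Self::Sharded { stripe_size, .. } => Some(*stripe_size),
        }
    }
}

fn main() {
    // Only one shard of a two-shard tenant is known yet: stay quiet.
    let mut tenant = ComputeHookTenant::Sharded {
        stripe_size: ShardStripeSize(32768),
        shard_count: ShardCount(2),
        shards: vec![(ShardNumber(0), NodeId(1))],
    };
    assert!(!tenant.ready_to_notify());

    // The second shard arrives: a notification carrying the stripe size would go out.
    if let ComputeHookTenant::Sharded { shards, .. } = &mut tenant {
        shards.push((ShardNumber(1), NodeId(1)));
        shards.sort_by_key(|s| s.0);
    }
    assert!(tenant.ready_to_notify());
    assert_eq!(tenant.reported_stripe_size(), Some(ShardStripeSize(32768)));
}

Under those assumptions, this is why test_sharding_service.py only observes "stripe_size": 32768 in the notification payload after the shard split completes, and null before it.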