diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index 6839ef69f592..ec7b81423a44 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -75,7 +75,7 @@ pub struct TenantPolicyRequest {
     pub scheduling: Option<ShardSchedulingPolicy>,
 }
 
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
 pub struct AvailabilityZone(pub String);
 
 impl Display for AvailabilityZone {
diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index ecc6b11e4758..51a4cf35be0a 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -742,6 +742,50 @@ impl Scheduler {
         self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
     }
 
+    /// For choosing which AZ to schedule a new shard into, use this. It will return the
+    /// AZ with the lowest median utilization.
+    ///
+    /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
+    /// node, because while tenants start out single sharded, when they grow and undergo
+    /// shard-split, they will occupy space on many nodes within an AZ.
+    ///
+    /// We use median rather than total free space or mean utilization, because
+    /// we wish to avoid preferring AZs that have low-load nodes resulting from
+    /// recent replacements.
+    ///
+    /// The practical result is that we will pick an AZ based on its median node, and
+    /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ.
+    pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
+        if self.nodes.is_empty() {
+            return None;
+        }
+
+        let mut scores_by_az = HashMap::new();
+        for (node_id, node) in &self.nodes {
+            let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new);
+            let score = match &node.may_schedule {
+                MaySchedule::Yes(utilization) => utilization.score(),
+                MaySchedule::No => PageserverUtilization::full().score(),
+            };
+            az_scores.push((node_id, node, score));
+        }
+
+        // Sort by utilization. Also include the node ID to break ties.
+        for scores in scores_by_az.values_mut() {
+            scores.sort_by_key(|i| (i.2, i.0));
+        }
+
+        let mut median_by_az = scores_by_az
+            .iter()
+            .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2))
+            .collect::<Vec<_>>();
+        // Sort by utilization. Also include the AZ to break ties.
+        median_by_az.sort_by_key(|i| (i.1, i.0));
+
+        // Return the AZ with the lowest median utilization
+        Some(median_by_az.first().unwrap().0.clone())
+    }
+
     /// Unit test access to internal state
     #[cfg(test)]
     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
@@ -1087,4 +1131,53 @@ mod tests {
             intent.clear(&mut scheduler);
         }
     }
+
+    #[test]
+    fn az_scheduling_for_new_tenant() {
+        let az_a_tag = AvailabilityZone("az-a".to_string());
+        let az_b_tag = AvailabilityZone("az-b".to_string());
+        let nodes = test_utils::make_test_nodes(
+            6,
+            &[
+                az_a_tag.clone(),
+                az_a_tag.clone(),
+                az_a_tag.clone(),
+                az_b_tag.clone(),
+                az_b_tag.clone(),
+                az_b_tag.clone(),
+            ],
+        );
+
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        /// Force the utilization of a node in Scheduler's state to a particular
+        /// shard count.
+        fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) {
+            let mut node = Node::new(
+                node_id,
+                "".to_string(),
+                0,
+                "".to_string(),
+                0,
+                scheduler.nodes.get(&node_id).unwrap().az.clone(),
+            );
+            node.set_availability(NodeAvailability::Active(test_utilization::simple(
+                shard_count,
+                0,
+            )));
+            scheduler.node_upsert(&node);
+        }
+
+        // Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
+
+        // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed
+        set_utilization(&mut scheduler, NodeId(1), 1000000);
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
+
+        // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler
+        // should prefer the other AZ.
+        set_utilization(&mut scheduler, NodeId(2), 1000000);
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
+    }
 }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 894b67fdc6bd..746177c08978 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1582,6 +1582,7 @@ impl Service {
                     attach_req.tenant_shard_id,
                     ShardIdentity::unsharded(),
                     PlacementPolicy::Attached(0),
+                    None,
                 ),
             );
             tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2109,6 +2110,16 @@ impl Service {
            )
        };
 
+        let preferred_az_id = {
+            let locked = self.inner.read().unwrap();
+            // Idempotency: take the existing value if the tenant already exists
+            if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
+                shard.preferred_az().cloned()
+            } else {
+                locked.scheduler.get_az_for_new_tenant()
+            }
+        };
+
         // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
         // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
         // during the creation, rather than risking leaving orphan objects in S3.
@@ -2128,7 +2139,7 @@ impl Service {
                splitting: SplitState::default(),
                scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
                    .unwrap(),
-                preferred_az_id: None,
+                preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
            })
            .collect();
@@ -2164,6 +2175,7 @@ impl Service {
                    &create_req.shard_parameters,
                    create_req.config.clone(),
                    placement_policy.clone(),
+                    preferred_az_id.as_ref(),
                    &mut schedule_context,
                )
                .await;
@@ -2177,44 +2189,6 @@ impl Service {
            }
        }
 
-        let preferred_azs = {
-            let locked = self.inner.read().unwrap();
-            response_shards
-                .iter()
-                .filter_map(|resp| {
-                    let az_id = locked
-                        .nodes
-                        .get(&resp.node_id)
-                        .map(|n| n.get_availability_zone_id().clone())?;
-
-                    Some((resp.shard_id, az_id))
-                })
-                .collect::<Vec<_>>()
-        };
-
-        // Note that we persist the preferred AZ for the new shards separately.
-        // In theory, we could "peek" the scheduler to determine where the shard will
-        // land, but the subsequent "real" call into the scheduler might select a different
-        // node. Hence, we do this awkward update to keep things consistent.
-        let updated = self
-            .persistence
-            .set_tenant_shard_preferred_azs(preferred_azs)
-            .await
-            .map_err(|err| {
-                ApiError::InternalServerError(anyhow::anyhow!(
-                    "Failed to persist preferred az ids: {err}"
-                ))
-            })?;
-
-        {
-            let mut locked = self.inner.write().unwrap();
-            for (tid, az_id) in updated {
-                if let Some(shard) = locked.tenants.get_mut(&tid) {
-                    shard.set_preferred_az(az_id);
-                }
-            }
-        }
-
         // If we failed to schedule shards, then they are still created in the controller,
         // but we return an error to the requester to avoid a silent failure when someone
         // tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2245,6 +2219,7 @@ impl Service {
 
     /// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
     /// case of a new tenant and a pre-existing one.
+    #[allow(clippy::too_many_arguments)]
     async fn do_initial_shard_scheduling(
         &self,
         tenant_shard_id: TenantShardId,
@@ -2252,6 +2227,7 @@ impl Service {
         shard_params: &ShardParameters,
         config: TenantConfig,
         placement_policy: PlacementPolicy,
+        preferred_az_id: Option<&AvailabilityZone>,
         schedule_context: &mut ScheduleContext,
     ) -> InitialShardScheduleOutcome {
         let mut locked = self.inner.write().unwrap();
@@ -2262,10 +2238,6 @@ impl Service {
             Entry::Occupied(mut entry) => {
                 tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");
 
-                // TODO: schedule() should take an anti-affinity expression that pushes
-                // attached and secondary locations (independently) away frorm those
-                // pageservers also holding a shard for this tenant.
-
                 if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
                     return InitialShardScheduleOutcome::ShardScheduleError(err);
                 }
@@ -2289,6 +2261,7 @@ impl Service {
                     tenant_shard_id,
                     ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
                     placement_policy,
+                    preferred_az_id.cloned(),
                 ));
                 state.generation = initial_generation;
 
@@ -4256,7 +4229,8 @@ impl Service {
                    },
                );
 
-                let mut child_state = TenantShard::new(child, child_shard, policy.clone());
+                let mut child_state =
+                    TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
                 child_state.intent = IntentState::single(scheduler, Some(pageserver));
                 child_state.observed = ObservedState {
                     locations: child_observed,
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 2eb98ee82545..f1b921646f44 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -472,6 +472,7 @@ impl TenantShard {
         tenant_shard_id: TenantShardId,
         shard: ShardIdentity,
         policy: PlacementPolicy,
+        preferred_az_id: Option<AvailabilityZone>,
     ) -> Self {
         metrics::METRICS_REGISTRY
             .metrics_group
@@ -495,7 +496,7 @@ impl TenantShard {
             last_error: Arc::default(),
             pending_compute_notification: false,
             scheduling_policy: ShardSchedulingPolicy::default(),
-            preferred_az_id: None,
+            preferred_az_id,
         }
     }
 
@@ -1571,6 +1572,7 @@ pub(crate) mod tests {
            )
            .unwrap(),
            policy,
+            None,
        )
    }
 
@@ -1597,7 +1599,7 @@ pub(crate) mod tests {
                    shard_number,
                    shard_count,
                };
-                let mut ts = TenantShard::new(
+                TenantShard::new(
                    tenant_shard_id,
                    ShardIdentity::new(
                        shard_number,
@@ -1606,13 +1608,8 @@ pub(crate) mod tests {
                    )
                    .unwrap(),
                    policy.clone(),
-                );
-
-                if let Some(az) = &preferred_az {
-                    ts.set_preferred_az(az.clone());
-                }
-
-                ts
+                    preferred_az.clone(),
+                )
            })
            .collect()
    }
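Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the selection rule that `Scheduler::get_az_for_new_tenant` implements in the diff above, so the median and tie-break behaviour can be exercised in isolation. The function name `az_with_lowest_median`, the `u64` node ids and scores, and the plain `String` AZ names are illustrative stand-ins for `NodeId`, `PageserverUtilization::score()` and `AvailabilityZone`, not the controller's real types.

```rust
use std::collections::HashMap;

/// Pick the AZ whose median node score is lowest; ties break on the AZ name.
/// `nodes` maps node id -> (az name, utilization score); these simple types are
/// hypothetical stand-ins for NodeId, AvailabilityZone and a utilization score.
fn az_with_lowest_median(nodes: &HashMap<u64, (String, u64)>) -> Option<String> {
    if nodes.is_empty() {
        return None;
    }

    // Group per-node scores by AZ, keeping the node id for deterministic tie-breaks.
    let mut scores_by_az: HashMap<&String, Vec<(u64, u64)>> = HashMap::new();
    for (node_id, (az, score)) in nodes {
        scores_by_az.entry(az).or_default().push((*score, *node_id));
    }

    // Within each AZ, sort by (score, node id) so the median index is well defined.
    for scores in scores_by_az.values_mut() {
        scores.sort();
    }

    // Take the upper-median score of each AZ (index len/2 of the sorted list),
    // then choose the AZ with the lowest median, breaking ties on the AZ name.
    scores_by_az
        .into_iter()
        .map(|(az, scores)| (scores[scores.len() / 2].0, az.clone()))
        .min()
        .map(|(_score, az)| az)
}

fn main() {
    let mut nodes = HashMap::new();
    // Three idle nodes in each of two AZs.
    for id in 1..=3u64 {
        nodes.insert(id, ("az-a".to_string(), 0));
        nodes.insert(id + 3, ("az-b".to_string(), 0));
    }
    // All medians tie, so the lexicographically smaller AZ name wins.
    assert_eq!(az_with_lowest_median(&nodes), Some("az-a".to_string()));

    // Loading one az-a node does not move that AZ's median...
    nodes.insert(1, ("az-a".to_string(), 100));
    assert_eq!(az_with_lowest_median(&nodes), Some("az-a".to_string()));

    // ...but loading a second node does, so az-b is preferred.
    nodes.insert(2, ("az-a".to_string(), 100));
    assert_eq!(az_with_lowest_median(&nodes), Some("az-b".to_string()));
    println!("median-based AZ selection matches the unit test in the diff");
}
```

Ties on the median score fall back to ordering the AZ identifiers themselves, which is the behaviour the `PartialOrd, Ord` derive added to `AvailabilityZone` in `controller_api.rs` makes possible in the real code.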