storcon: refine logic for choosing AZ on tenant creation (#10054)
## Problem

When we update our scheduler/optimization code to respect AZs properly
(#9916), the choice of AZ
becomes a much higher-stakes decision. We will pretty much always run a
tenant in its preferred AZ, and that AZ is fixed for the lifetime of the
tenant (unless a human intervenes).

Eventually, when we do auto-balancing based on utilization, I anticipate
that part of that will be to automatically change the AZ of tenants if
our original scheduling decisions have caused imbalance. As an interim
measure, we can at least avoid making this scheduling decision based
purely on which AZ contains the emptiest node.

This is a precursor to #9947

## Summary of changes

- When creating a tenant, instead of scheduling a shard and then reading
its preferred AZ back, make the AZ decision first.
- Instead of choosing the AZ based on which node is emptiest, use the median
utilization of nodes in each AZ to pick the AZ to use. This avoids bad
AZ decisions during periods when some node has very low utilization
(such as after replacing a dead node); a sketch of this rule is below.
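
As a rough, self-contained illustration of the median rule (the real
implementation is `Scheduler::get_az_for_new_tenant` in
`storage_controller/src/scheduler.rs`, shown in the diff below; the node
scores here are invented):

```rust
use std::collections::HashMap;

/// Pick the AZ whose *median* node utilization score is lowest
/// (lower score = emptier). Ties break on the AZ name, as in the PR.
fn pick_az(nodes: &[(&str, u64)]) -> Option<String> {
    let mut by_az: HashMap<&str, Vec<u64>> = HashMap::new();
    for &(az, score) in nodes {
        by_az.entry(az).or_default().push(score);
    }

    by_az
        .into_iter()
        .map(|(az, mut scores)| {
            scores.sort_unstable();
            // Upper-middle element, mirroring the `len() / 2` indexing in the PR.
            (scores[scores.len() / 2], az.to_string())
        })
        .min()
        .map(|(_, az)| az)
}

fn main() {
    // AZ "a" has one nearly empty node (e.g. a freshly replaced one), but its
    // median node is busier than every node in AZ "b".
    let nodes = [
        ("a", 5), ("a", 80), ("a", 85),
        ("b", 50), ("b", 55), ("b", 60),
    ];
    // Emptiest-node logic would pick "a"; the median rule picks "b".
    assert_eq!(pick_az(&nodes), Some("b".to_string()));
}
```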

I considered also making the selection a weighted pseudo-random choice
based on utilization, but wanted to avoid destabilising tests with that
for now.
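
For comparison, a weighted pseudo-random pick might look roughly like the
sketch below. This is not part of this change; the headroom-based weighting
and the caller-supplied random value are assumptions made purely for
illustration.

```rust
/// Hypothetical alternative (not implemented here): choose an AZ at random,
/// weighted towards AZs whose median utilization leaves more headroom.
/// `rand01` is a caller-supplied value in [0, 1) so the sketch needs no
/// external crates.
fn pick_az_weighted(median_by_az: &[(&str, u64)], rand01: f64) -> Option<String> {
    const FULL_SCORE: u64 = 100; // assumed "completely full" score for this sketch

    // Weight each AZ by its remaining headroom, so emptier AZs are more likely.
    let weights: Vec<f64> = median_by_az
        .iter()
        .map(|&(_, median)| FULL_SCORE.saturating_sub(median) as f64)
        .collect();
    let total: f64 = weights.iter().sum();
    if total <= 0.0 {
        return None; // every AZ is full (or the input was empty)
    }

    // Walk the cumulative weights until the random threshold is crossed.
    let mut threshold = rand01 * total;
    for (&(az, _), w) in median_by_az.iter().zip(&weights) {
        if threshold < *w {
            return Some(az.to_string());
        }
        threshold -= *w;
    }
    median_by_az.last().map(|&(az, _)| az.to_string())
}
```

With medians of 80 for `az-a` and 55 for `az-b`, the weights would be 20 and
45, so `az-b` would be picked roughly 70% of the time.
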
jcsp authored Dec 12, 2024
1 parent 6d56875 commit a93e3d3
Showing 4 changed files with 118 additions and 54 deletions.
2 changes: 1 addition & 1 deletion libs/pageserver_api/src/controller_api.rs
@@ -75,7 +75,7 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
pub struct AvailabilityZone(pub String);

impl Display for AvailabilityZone {
93 changes: 93 additions & 0 deletions storage_controller/src/scheduler.rs
@@ -742,6 +742,50 @@ impl Scheduler {
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
}

/// For choosing which AZ to schedule a new shard into, use this. It will return the
/// AZ with the lowest median utilization.
///
/// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
/// node, because while tenants start out single sharded, when they grow and undergo
/// shard-split, they will occupy space on many nodes within an AZ.
///
/// We use median rather than total free space or mean utilization, because
/// we wish to avoid preferring AZs that have low-load nodes resulting from
/// recent replacements.
///
/// The practical result is that we will pick an AZ based on its median node, and
/// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ.
pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
if self.nodes.is_empty() {
return None;
}

let mut scores_by_az = HashMap::new();
for (node_id, node) in &self.nodes {
let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new);
let score = match &node.may_schedule {
MaySchedule::Yes(utilization) => utilization.score(),
MaySchedule::No => PageserverUtilization::full().score(),
};
az_scores.push((node_id, node, score));
}

// Sort by utilization. Also include the node ID to break ties.
for scores in scores_by_az.values_mut() {
scores.sort_by_key(|i| (i.2, i.0));
}

let mut median_by_az = scores_by_az
.iter()
.map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2))
.collect::<Vec<_>>();
// Sort by utilization. Also include the AZ to break ties.
median_by_az.sort_by_key(|i| (i.1, i.0));

// Return the AZ with the lowest median utilization
Some(median_by_az.first().unwrap().0.clone())
}

/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
@@ -1087,4 +1131,53 @@ mod tests {
intent.clear(&mut scheduler);
}
}

#[test]
fn az_scheduling_for_new_tenant() {
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let nodes = test_utils::make_test_nodes(
6,
&[
az_a_tag.clone(),
az_a_tag.clone(),
az_a_tag.clone(),
az_b_tag.clone(),
az_b_tag.clone(),
az_b_tag.clone(),
],
);

let mut scheduler = Scheduler::new(nodes.values());

/// Force the utilization of a node in Scheduler's state to a particular
/// number of bytes used.
fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) {
let mut node = Node::new(
node_id,
"".to_string(),
0,
"".to_string(),
0,
scheduler.nodes.get(&node_id).unwrap().az.clone(),
);
node.set_availability(NodeAvailability::Active(test_utilization::simple(
shard_count,
0,
)));
scheduler.node_upsert(&node);
}

// Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));

// Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed
set_utilization(&mut scheduler, NodeId(1), 1000000);
assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));

// Put some utilization on a second node in AZ A: now the median has changed, so the scheduler
// should prefer the other AZ.
set_utilization(&mut scheduler, NodeId(2), 1000000);
assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
}
}
62 changes: 18 additions & 44 deletions storage_controller/src/service.rs
@@ -1582,6 +1582,7 @@ impl Service {
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2109,6 +2110,16 @@ impl Service {
)
};

let preferred_az_id = {
let locked = self.inner.read().unwrap();
// Idempotency: take the existing value if the tenant already exists
if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
shard.preferred_az().cloned()
} else {
locked.scheduler.get_az_for_new_tenant()
}
};

// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
// during the creation, rather than risking leaving orphan objects in S3.
@@ -2128,7 +2139,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
})
.collect();

@@ -2164,6 +2175,7 @@ impl Service {
&create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
preferred_az_id.as_ref(),
&mut schedule_context,
)
.await;
@@ -2177,44 +2189,6 @@ impl Service {
}
}

let preferred_azs = {
let locked = self.inner.read().unwrap();
response_shards
.iter()
.filter_map(|resp| {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().clone())?;

Some((resp.shard_id, az_id))
})
.collect::<Vec<_>>()
};

// Note that we persist the preferred AZ for the new shards separately.
// In theory, we could "peek" the scheduler to determine where the shard will
// land, but the subsequent "real" call into the scheduler might select a different
// node. Hence, we do this awkward update to keep things consistent.
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
})?;

{
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}

// If we failed to schedule shards, then they are still created in the controller,
// but we return an error to the requester to avoid a silent failure when someone
// tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2245,13 +2219,15 @@ impl Service {

/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
/// case of a new tenant and a pre-existing one.
#[allow(clippy::too_many_arguments)]
async fn do_initial_shard_scheduling(
&self,
tenant_shard_id: TenantShardId,
initial_generation: Option<Generation>,
shard_params: &ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
preferred_az_id: Option<&AvailabilityZone>,
schedule_context: &mut ScheduleContext,
) -> InitialShardScheduleOutcome {
let mut locked = self.inner.write().unwrap();
@@ -2262,10 +2238,6 @@ impl Service {
Entry::Occupied(mut entry) => {
tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");

// TODO: schedule() should take an anti-affinity expression that pushes
// attached and secondary locations (independently) away frorm those
// pageservers also holding a shard for this tenant.

if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
return InitialShardScheduleOutcome::ShardScheduleError(err);
}
@@ -2289,6 +2261,7 @@ impl Service {
tenant_shard_id,
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
placement_policy,
preferred_az_id.cloned(),
));

state.generation = initial_generation;
@@ -4256,7 +4229,8 @@ impl Service {
},
);

let mut child_state = TenantShard::new(child, child_shard, policy.clone());
let mut child_state =
TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
15 changes: 6 additions & 9 deletions storage_controller/src/tenant_shard.rs
@@ -472,6 +472,7 @@ impl TenantShard {
tenant_shard_id: TenantShardId,
shard: ShardIdentity,
policy: PlacementPolicy,
preferred_az_id: Option<AvailabilityZone>,
) -> Self {
metrics::METRICS_REGISTRY
.metrics_group
@@ -495,7 +496,7 @@ impl TenantShard {
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_az_id: None,
preferred_az_id,
}
}

@@ -1571,6 +1572,7 @@ pub(crate) mod tests {
)
.unwrap(),
policy,
None,
)
}

@@ -1597,7 +1599,7 @@ pub(crate) mod tests {
shard_number,
shard_count,
};
let mut ts = TenantShard::new(
TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
Expand All @@ -1606,13 +1608,8 @@ pub(crate) mod tests {
)
.unwrap(),
policy.clone(),
);

if let Some(az) = &preferred_az {
ts.set_preferred_az(az.clone());
}

ts
preferred_az.clone(),
)
})
.collect()
}

1 comment on commit a93e3d3

@github-actions

7740 tests run: 7407 passed, 2 failed, 331 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_idle_checkpoints[debug-pg17] or test_pageserver_small_inmemory_layers[debug-pg17-True]"
Flaky tests (4)

Postgres 17

Postgres 15

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Test coverage report is not available

The comment gets automatically updated with the latest test results
a93e3d3 at 2024-12-12T20:31:01.936Z :recycle:
