Skip to content

Commit

Permalink
storcon: set scheduling policy Active when detaching
Browse files Browse the repository at this point in the history
Closes: #9957
  • Loading branch information
jcsp committed Dec 4, 2024
1 parent 9a4157d commit cee8984
Showing 1 changed file with 103 additions and 109 deletions.
212 changes: 103 additions & 109 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigRequest, TimelineArchivalConfigRequest,
Expand Down Expand Up @@ -468,7 +468,6 @@ struct ShardSplitParams {
policy: PlacementPolicy,
config: TenantConfig,
shard_ident: ShardIdentity,
preferred_az_id: Option<AvailabilityZone>,
}

// When preparing for a shard split, we may either choose to proceed with the split,
Expand Down Expand Up @@ -513,6 +512,9 @@ struct ShardUpdate {

/// If this is None, generation is not updated.
generation: Option<Generation>,

/// If this is None, scheduling policy is not updated.
scheduling_policy: Option<ShardSchedulingPolicy>,
}

enum StopReconciliationsReason {
Expand Down Expand Up @@ -2376,6 +2378,23 @@ impl Service {
}
};

// Ordinarily we do not update scheduling policy, but when making major changes
// like detaching or demoting to secondary-only, we need to force the scheduling
// mode to Active, or the caller's expected outcome (detach it) will not happen.
let scheduling_policy = match req.config.mode {
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
// Special case: when making major changes like detaching or demoting to secondary-only,
// we need to force the scheduling mode to Active, or nothing will happen.
Some(ShardSchedulingPolicy::Active)
}
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// While attached, continue to respect whatever the existing scheduling mode is.
None
}
};

let mut create = true;
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
Expand All @@ -2401,6 +2420,7 @@ impl Service {
placement_policy: placement_policy.clone(),
tenant_config: req.config.tenant_conf.clone(),
generation: set_generation,
scheduling_policy,
});
}

Expand Down Expand Up @@ -2492,11 +2512,13 @@ impl Service {
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.
let mut schedule_context = ScheduleContext::default();

for ShardUpdate {
tenant_shard_id,
placement_policy,
tenant_config,
generation,
scheduling_policy,
} in &updates
{
self.persistence
Expand All @@ -2505,7 +2527,7 @@ impl Service {
Some(placement_policy.clone()),
Some(tenant_config.clone()),
*generation,
None,
*scheduling_policy,
)
.await?;
}
Expand All @@ -2521,6 +2543,7 @@ impl Service {
placement_policy,
tenant_config,
generation: update_generation,
scheduling_policy,
} in updates
{
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
Expand All @@ -2539,6 +2562,10 @@ impl Service {
shard.generation = Some(generation);
}

if let Some(scheduling_policy) = scheduling_policy {
shard.set_scheduling_policy(scheduling_policy);
}

shard.schedule(scheduler, &mut schedule_context)?;

let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
Expand Down Expand Up @@ -4104,7 +4131,7 @@ impl Service {
for parent_id in parent_ids {
let child_ids = parent_id.split(new_shard_count);

let (pageserver, generation, policy, parent_ident, config, preferred_az) = {
let (pageserver, generation, policy, parent_ident, config) = {
let mut old_state = tenants
.remove(&parent_id)
.expect("It was present, we just split it");
Expand All @@ -4123,7 +4150,6 @@ impl Service {
old_state.policy.clone(),
old_state.shard,
old_state.config.clone(),
old_state.preferred_az().cloned(),
)
};

Expand Down Expand Up @@ -4156,9 +4182,6 @@ impl Service {
};
child_state.generation = Some(generation);
child_state.config = config.clone();
if let Some(preferred_az) = &preferred_az {
child_state.set_preferred_az(preferred_az.clone());
}

// The child's TenantShard::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
Expand Down Expand Up @@ -4351,7 +4374,6 @@ impl Service {
let mut policy = None;
let mut config = None;
let mut shard_ident = None;
let mut preferred_az_id = None;
// Validate input, and calculate which shards we will create
let (old_shard_count, targets) =
{
Expand Down Expand Up @@ -4410,9 +4432,6 @@ impl Service {
if config.is_none() {
config = Some(shard.config.clone());
}
if preferred_az_id.is_none() {
preferred_az_id = shard.preferred_az().cloned();
}

if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
tracing::info!(
Expand Down Expand Up @@ -4483,7 +4502,6 @@ impl Service {
policy,
config,
shard_ident,
preferred_az_id,
})))
}

Expand All @@ -4506,7 +4524,6 @@ impl Service {
policy,
config,
shard_ident,
preferred_az_id,
} = *params;

// Drop any secondary locations: pageservers do not support splitting these, and in any case the
Expand Down Expand Up @@ -4580,7 +4597,7 @@ impl Service {
// Scheduling policies and preferred AZ do not carry through to children
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: preferred_az_id.as_ref().map(|az| az.0.clone()),
preferred_az_id: None,
});
}

Expand Down Expand Up @@ -4700,6 +4717,47 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);

// Now that we have scheduled the child shards, attempt to set their preferred AZ
// to that of the pageserver they've been attached on.
let preferred_azs = {
let locked = self.inner.read().unwrap();
child_locations
.iter()
.filter_map(|(tid, node_id, _stripe_size)| {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().clone())?;

Some((*tid, az_id))
})
.collect::<Vec<_>>()
};

let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
});

match updated {
Ok(updated) => {
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
Err(err) => {
tracing::warn!("Failed to persist preferred AZs after split: {err}");
}
}

// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {
Expand Down Expand Up @@ -5128,38 +5186,34 @@ impl Service {
*nodes = Arc::new(nodes_mut);
}

for (_tenant_id, mut schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
{
for shard in shards {
if shard.deref_node(node_id) {
if let Err(e) = shard.schedule(scheduler, &mut schedule_context) {
// TODO: implement force flag to remove a node even if we can't reschedule
// a tenant
tracing::error!(
"Refusing to delete node, shard {} can't be rescheduled: {e}",
shard.tenant_shard_id
);
return Err(e.into());
} else {
tracing::info!(
"Rescheduled shard {} away from node during deletion",
shard.tenant_shard_id
)
}
for (tenant_shard_id, shard) in tenants {
if shard.deref_node(node_id) {
// FIXME: we need to build a ScheduleContext that reflects this shard's peers, otherwise
// it won't properly do anti-affinity.
let mut schedule_context = ScheduleContext::default();

self.maybe_reconcile_shard(shard, nodes);
if let Err(e) = shard.schedule(scheduler, &mut schedule_context) {
// TODO: implement force flag to remove a node even if we can't reschedule
// a tenant
tracing::error!("Refusing to delete node, shard {tenant_shard_id} can't be rescheduled: {e}");
return Err(e.into());
} else {
tracing::info!(
"Rescheduled shard {tenant_shard_id} away from node during deletion"
)
}

// Here we remove an existing observed location for the node we're removing, and it will
// not be re-added by a reconciler's completion because we filter out removed nodes in
// process_result.
//
// Note that we update the shard's observed state _after_ calling maybe_reconcile_shard: that
// means any reconciles we spawned will know about the node we're deleting, enabling them
// to do live migrations if it's still online.
shard.observed.locations.remove(&node_id);
self.maybe_reconcile_shard(shard, nodes);
}

// Here we remove an existing observed location for the node we're removing, and it will
// not be re-added by a reconciler's completion because we filter out removed nodes in
// process_result.
//
// Note that we update the shard's observed state _after_ calling maybe_reconcile_shard: that
// means any reconciles we spawned will know about the node we're deleting, enabling them
// to do live migrations if it's still online.
shard.observed.locations.remove(&node_id);
}

scheduler.node_remove(node_id);
Expand Down Expand Up @@ -5681,7 +5735,7 @@ impl Service {
}

match node_policy {
NodeSchedulingPolicy::Active => {
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
.await?;

Expand Down Expand Up @@ -6253,14 +6307,6 @@ impl Service {
> DOWNLOAD_FRESHNESS_THRESHOLD
{
tracing::info!("Skipping migration of {tenant_shard_id} to {node} because secondary isn't ready: {progress:?}");

#[cfg(feature = "testing")]
if progress.heatmap_mtime.is_none() {
// No heatmap might mean the attached location has never uploaded one, or that
// the secondary download hasn't happened yet. This is relatively unusual in the field,
// but fairly common in tests.
self.kick_secondary_download(tenant_shard_id).await;
}
} else {
// Location looks ready: proceed
tracing::info!(
Expand All @@ -6275,58 +6321,6 @@ impl Service {
validated_work
}

/// Some aspects of scheduling optimisation wait for secondary locations to be warm. This
/// happens on multi-minute timescales in the field, which is fine because optimisation is meant
/// to be a lazy background thing. However, when testing, it is not practical to wait around, so
/// we have this helper to move things along faster.
#[cfg(feature = "testing")]
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
let (attached_node, secondary_node) = {
let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
return;
};
let (Some(attached), Some(secondary)) = (
shard.intent.get_attached(),
shard.intent.get_secondary().first(),
) else {
return;
};
(
locked.nodes.get(attached).unwrap().clone(),
locked.nodes.get(secondary).unwrap().clone(),
)
};

// Make remote API calls to upload + download heatmaps: we ignore errors because this is just
// a 'kick' to let scheduling optimisation run more promptly.
attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;

secondary_node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1)))
.await
},
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
}

/// Look for shards which are oversized and in need of splitting
async fn autosplit_tenants(self: &Arc<Self>) {
let Some(split_threshold) = self.config.split_threshold else {
Expand Down

0 comments on commit cee8984

Please sign in to comment.