diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 180ab5f0c530..c349e2b9bfbd 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -966,6 +966,8 @@ impl Service { let res = self.heartbeater.heartbeat(nodes).await; if let Ok(deltas) = res { + let mut to_handle = Vec::default(); + for (node_id, state) in deltas.0 { let new_availability = match state { PageserverState::Available { utilization, .. } => { @@ -997,14 +999,27 @@ impl Service { } }; + let node_lock = trace_exclusive_lock( + &self.node_op_locks, + node_id, + NodeOperations::Configure, + ) + .await; + // This is the code path for geniune availability transitions (i.e node // goes unavailable and/or comes back online). let res = self - .node_configure(node_id, Some(new_availability), None) + .node_state_configure(node_id, Some(new_availability), None, &node_lock) .await; match res { - Ok(()) => {} + Ok(transition) => { + // Keep hold of the lock until the availability transitions + // have been handled in + // [`Service::handle_node_availability_transitions`] in order to avoid + // racing with [`Service::external_node_configure`]. + to_handle.push((node_id, node_lock, transition)); + } Err(ApiError::NotFound(_)) => { // This should be rare, but legitimate since the heartbeats are done // on a snapshot of the nodes. @@ -1014,13 +1029,37 @@ impl Service { // Transition to active involves reconciling: if a node responds to a heartbeat then // becomes unavailable again, we may get an error here. tracing::error!( - "Failed to update node {} after heartbeat round: {}", + "Failed to update node state {} after heartbeat round: {}", node_id, err ); } } } + + // We collected all the transitions above and now we handle them. 
+ let res = self.handle_node_availability_transitions(to_handle).await; + if let Err(errs) = res { + for (node_id, err) in errs { + match err { + ApiError::NotFound(_) => { + // This should be rare, but legitimate since the heartbeats are done + // on a snapshot of the nodes. + tracing::info!( + "Node {} was not found after heartbeat round", + node_id + ); + } + err => { + tracing::error!( + "Failed to handle availability transition for {} after heartbeat round: {}", + node_id, + err + ); + } + } + } + } } } } @@ -5299,15 +5338,17 @@ impl Service { Ok(()) } - pub(crate) async fn node_configure( + /// Configure in-memory and persistent state of a node as requested + /// + /// Note that this function does not trigger any immediate side effects in response + /// to the changes. That part is handled by [`Self::handle_node_availability_transition`]. + async fn node_state_configure( &self, node_id: NodeId, availability: Option<NodeAvailability>, scheduling: Option<NodeSchedulingPolicy>, - ) -> Result<(), ApiError> { - let _node_lock = - trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await; - + node_lock: &TracingExclusiveGuard<NodeOperations>, + ) -> Result<AvailabilityTransition, ApiError> { if let Some(scheduling) = scheduling { // Scheduling is a persistent part of Node: we must write updates to the database before // applying them in memory @@ -5336,7 +5377,7 @@ impl Service { }; if matches!(availability_transition, AvailabilityTransition::ToActive) { - self.node_activate_reconcile(activate_node, &_node_lock) + self.node_activate_reconcile(activate_node, node_lock) .await?; } availability_transition @@ -5346,7 +5387,7 @@ impl Service { // Apply changes from the request to our in-memory state for the Node let mut locked = self.inner.write().unwrap(); - let (nodes, tenants, scheduler) = locked.parts_mut(); + let (nodes, _tenants, scheduler) = locked.parts_mut(); let mut new_nodes = (**nodes).clone(); @@ -5356,8 +5397,8 @@ impl Service { )); }; - if let Some(availability) = availability.as_ref() { - 
node.set_availability(availability.clone()); + if let Some(availability) = availability { + node.set_availability(availability); } if let Some(scheduling) = scheduling { @@ -5368,11 +5409,30 @@ impl Service { scheduler.node_upsert(node); let new_nodes = Arc::new(new_nodes); + locked.nodes = new_nodes; + Ok(availability_transition) + } + + /// Handle availability transition of one node + /// + /// Note that you should first call [`Self::node_state_configure`] to update + /// the in-memory state referencing that node. If you need to handle more than one transition + /// consider using [`Self::handle_node_availability_transitions`]. + async fn handle_node_availability_transition( + &self, + node_id: NodeId, + transition: AvailabilityTransition, + _node_lock: &TracingExclusiveGuard<NodeOperations>, + ) -> Result<(), ApiError> { // Modify scheduling state for any Tenants that are affected by a change in the node's availability state. - match availability_transition { + match transition { AvailabilityTransition::ToOffline => { tracing::info!("Node {} transition to offline", node_id); + + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + let mut tenants_affected: usize = 0; for (tenant_shard_id, tenant_shard) in tenants { @@ -5382,14 +5442,14 @@ impl Service { observed_loc.conf = None; } - if new_nodes.len() == 1 { + if nodes.len() == 1 { // Special case for single-node cluster: there is no point trying to reschedule // any tenant shards: avoid doing so, in order to avoid spewing warnings about // failures to schedule them. 
continue; } - if !new_nodes + if !nodes .values() .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_))) { @@ -5415,10 +5475,7 @@ impl Service { tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id); } Ok(()) => { - if self - .maybe_reconcile_shard(tenant_shard, &new_nodes) - .is_some() - { + if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() { tenants_affected += 1; }; } @@ -5433,9 +5490,13 @@ impl Service { } AvailabilityTransition::ToActive => { tracing::info!("Node {} transition to active", node_id); + + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, _scheduler) = locked.parts_mut(); + // When a node comes back online, we must reconcile any tenant that has a None observed // location on the node. - for tenant_shard in locked.tenants.values_mut() { + for tenant_shard in tenants.values_mut() { // If a reconciliation is already in progress, rely on the previous scheduling // decision and skip triggering a new reconciliation. if tenant_shard.reconciler.is_some() { @@ -5444,7 +5505,7 @@ impl Service { if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) { if observed_loc.conf.is_none() { - self.maybe_reconcile_shard(tenant_shard, &new_nodes); + self.maybe_reconcile_shard(tenant_shard, nodes); } } } @@ -5465,11 +5526,54 @@ impl Service { } } - locked.nodes = new_nodes; - Ok(()) } + /// Handle availability transition for multiple nodes + /// + /// Note that you should first call [`Self::node_state_configure`] for + /// all nodes being handled here for the handling to use fresh in-memory state. 
+ async fn handle_node_availability_transitions( + &self, + transitions: Vec<( + NodeId, + TracingExclusiveGuard<NodeOperations>, + AvailabilityTransition, + )>, + ) -> Result<(), Vec<(NodeId, ApiError)>> { + let mut errors = Vec::default(); + for (node_id, node_lock, transition) in transitions { + let res = self + .handle_node_availability_transition(node_id, transition, &node_lock) + .await; + if let Err(err) = res { + errors.push((node_id, err)); + } + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } + + pub(crate) async fn node_configure( + &self, + node_id: NodeId, + availability: Option<NodeAvailability>, + scheduling: Option<NodeSchedulingPolicy>, + ) -> Result<(), ApiError> { + let node_lock = + trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await; + + let transition = self + .node_state_configure(node_id, availability, scheduling, &node_lock) + .await?; + self.handle_node_availability_transition(node_id, transition, &node_lock) + .await + } + /// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing /// operation for HTTP api. pub(crate) async fn external_node_configure(