storcon: shutdown with clean observed state (#8494)
## Problem
Storcon shutdown did not produce a clean observed state. This is not a
problem at the moment, but we will need to stop all reconciles with a
clean observed state for rolling restarts.

I tried to test this by collecting the observed state during shutdown
and comparing it with the in-memory observed state, but that does not
work because many tests use the cursed attach hook to create tenants
directly through the pageserver.

## Summary of Changes
Rework storcon shutdown as follows (a minimal sketch of the resulting
shutdown sequence is included below):
* Reconcilers get a separate cancellation token which is a child token
of the global `Service::cancel`.
* Reconcilers get a separate gate.
* Add a mechanism to drain the reconciler result queue before the result
processing loop exits.
* Put all of this together into a clean shutdown sequence.

Related neondatabase/cloud#14701
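
The sequence this produces can be sketched outside the storage controller. The snippet below is not the storcon code: it is a minimal, self-contained illustration of the same shape, using `tokio_util`'s `CancellationToken` and `TaskTracker` (the latter standing in for the crate's internal `Gate`). Only the `ReconcileResultRequest` names mirror the diff; everything else is illustrative and assumes `tokio` plus `tokio-util` with the `rt` feature.

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

#[derive(Debug)]
struct ReconcileResult; // stand-in for the real reconcile result payload

enum ReconcileResultRequest {
    ReconcileResult(ReconcileResult),
    Stop,
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let reconcilers_cancel = cancel.child_token(); // cancelling `cancel` also cancels this
    let reconcilers = TaskTracker::new(); // plays the role of `reconcilers_gate`
    let background = TaskTracker::new(); // plays the role of `gate`

    let (result_tx, mut result_rx) = mpsc::unbounded_channel::<ReconcileResultRequest>();

    // A "reconciler": runs until the reconciler token fires, then reports a result.
    let token = reconcilers_cancel.clone();
    let tx = result_tx.clone();
    reconcilers.spawn(async move {
        token.cancelled().await;
        tx.send(ReconcileResultRequest::ReconcileResult(ReconcileResult))
            .ok();
    });

    // A generic background task: exits only on the global token.
    let token = cancel.clone();
    background.spawn(async move { token.cancelled().await });

    // The result processor: drains the queue until it sees Stop, then drops the
    // receiver, which is what lets `result_tx.closed()` below resolve.
    background.spawn(async move {
        while let Some(req) = result_rx.recv().await {
            match req {
                ReconcileResultRequest::ReconcileResult(r) => println!("observed: {r:?}"),
                ReconcileResultRequest::Stop => break,
            }
        }
    });

    // Shutdown, in the same order as the diff below:
    reconcilers_cancel.cancel(); // 1. stop reconcilers only (child token)...
    reconcilers.close();
    reconcilers.wait().await; //    ...and wait for them to finish

    result_tx.send(ReconcileResultRequest::Stop).ok(); // 2. drain queued results
    result_tx.closed().await; //    resolves once the processor drops the receiver

    cancel.cancel(); // 3. now stop the remaining background tasks
    background.close();
    background.wait().await;
}
```
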
VladLazar authored Jul 25, 2024
1 parent 775c0c8 commit 3977e0a
Showing 2 changed files with 42 additions and 23 deletions.
58 changes: 37 additions & 21 deletions storage_controller/src/service.rs
@@ -278,7 +278,7 @@ pub struct Service {
     config: Config,
     persistence: Arc<Persistence>,
     compute_hook: Arc<ComputeHook>,
-    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
 
     heartbeater: Heartbeater,

@@ -308,9 +308,15 @@ pub struct Service {
     // Process shutdown will fire this token
     cancel: CancellationToken,
 
+    // Child token of [`Service::cancel`] used by reconcilers
+    reconcilers_cancel: CancellationToken,
+
     // Background tasks will hold this gate
     gate: Gate,
 
+    // Reconcilers background tasks will hold this gate
+    reconcilers_gate: Gate,
+
     /// This waits for initial reconciliation with pageservers to complete. Until this barrier
     /// passes, it isn't safe to do any actions that mutate tenants.
     pub(crate) startup_complete: Barrier,
@@ -397,6 +403,11 @@ struct ShardUpdate {
     generation: Option<Generation>,
 }
 
+pub(crate) enum ReconcileResultRequest {
+    ReconcileResult(ReconcileResult),
+    Stop,
+}
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
@@ -753,7 +764,7 @@ impl Service {
         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
 
         let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
-        while !self.cancel.is_cancelled() {
+        while !self.reconcilers_cancel.is_cancelled() {
             tokio::select! {
                 _ = interval.tick() => {
                     let reconciles_spawned = self.reconcile_all();
@@ -766,7 +777,7 @@
                         }
                     }
                 }
-                _ = self.cancel.cancelled() => return
+                _ = self.reconcilers_cancel.cancelled() => return
             }
         }
     }
@@ -937,7 +948,7 @@ impl Service {
 
     async fn process_results(
         &self,
-        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
+        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResultRequest>,
         mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
             Result<(), (TenantShardId, NotifyError)>,
         >,
@@ -947,8 +958,8 @@
             tokio::select! {
                 r = result_rx.recv() => {
                     match r {
-                        Some(result) => {self.process_result(result);},
-                        None => {break;}
+                        Some(ReconcileResultRequest::ReconcileResult(result)) => {self.process_result(result);},
+                        None | Some(ReconcileResultRequest::Stop) => {break;}
                     }
                 }
                 _ = async{
@@ -974,9 +985,6 @@
                 }
             };
         }
-
-        // We should only fall through on shutdown
-        assert!(self.cancel.is_cancelled());
     }
 
     async fn process_aborts(
@@ -1153,6 +1161,8 @@ impl Service {
             tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
 
         let cancel = CancellationToken::new();
+        let reconcilers_cancel = cancel.child_token();
+
         let heartbeater = Heartbeater::new(
             config.jwt_token.clone(),
             config.max_offline_interval,
@@ -1178,7 +1188,9 @@
             abort_tx,
             startup_complete: startup_complete.clone(),
             cancel,
+            reconcilers_cancel,
             gate: Gate::default(),
+            reconcilers_gate: Gate::default(),
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
         });
@@ -5132,7 +5144,7 @@ impl Service {
             }
         };
 
-        let Ok(gate_guard) = self.gate.enter() else {
+        let Ok(gate_guard) = self.reconcilers_gate.enter() else {
            // Gate closed: we're shutting down, drop out.
            return None;
        };
@@ -5145,7 +5157,7 @@
             &self.persistence,
             units,
             gate_guard,
-            &self.cancel,
+            &self.reconcilers_cancel,
         )
     }

@@ -5592,17 +5604,21 @@ impl Service {
     }
 
     pub async fn shutdown(&self) {
-        // Note that this already stops processing any results from reconciles: so
-        // we do not expect that our [`TenantShard`] objects will reach a neat
-        // final state.
+        // Cancel all on-going reconciles and wait for them to exit the gate.
+        tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
+        self.reconcilers_cancel.cancel();
+        self.reconcilers_gate.close().await;
+
+        // Signal the background loop in [`Service::process_results`] to exit once
+        // it has processed the results from all the reconciles we cancelled earlier.
+        tracing::info!("Shutting down: processing results from previously in-flight reconciles");
+        self.result_tx.send(ReconcileResultRequest::Stop).ok();
+        self.result_tx.closed().await;
+
+        // Background tasks hold gate guards: this notifies them of the cancellation and
+        // waits for them all to complete.
+        tracing::info!("Shutting down: cancelling and waiting for background tasks to exit");
         self.cancel.cancel();
-
-        // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
-        // of our cancellation token, so we do not need to explicitly cancel each of
-        // them.
-
-        // Background tasks and reconcilers hold gate guards: this waits for them all
-        // to complete.
         self.gate.close().await;
     }

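The ordering above relies on a property of `tokio_util`'s `CancellationToken` that the removed comment used to spell out: child tokens observe the parent's cancellation, but not the other way around, so cancelling `reconcilers_cancel` first leaves the rest of the service running. A tiny standalone check of that behaviour (illustrative, not part of the patch):

```rust
use tokio_util::sync::CancellationToken;

fn main() {
    let parent = CancellationToken::new();
    let child = parent.child_token();

    // Cancelling a child does not propagate upwards...
    child.cancel();
    assert!(child.is_cancelled());
    assert!(!parent.is_cancelled());

    // ...but cancelling the parent cancels its existing children.
    let sibling = parent.child_token();
    parent.cancel();
    assert!(sibling.is_cancelled());
}
```
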
7 changes: 5 additions & 2 deletions storage_controller/src/tenant_shard.rs
@@ -9,6 +9,7 @@ use crate::{
     persistence::TenantShardPersistence,
     reconciler::ReconcileUnits,
     scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
+    service::ReconcileResultRequest,
 };
 use pageserver_api::controller_api::{
     NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
@@ -1059,7 +1060,7 @@ impl TenantShard {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn spawn_reconciler(
         &mut self,
-        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
         service_config: &service::Config,
@@ -1183,7 +1184,9 @@ impl TenantShard {
                     pending_compute_notification: reconciler.compute_notify_failure,
                 };
 
-                result_tx.send(result).ok();
+                result_tx
+                    .send(ReconcileResultRequest::ReconcileResult(result))
+                    .ok();
             }
             .instrument(reconciler_span),
         );

1 comment on commit 3977e0a

@github-actions

3132 tests run: 3011 passed, 0 failed, 121 skipped (full report)


Code coverage* (full report)

  • functions: 32.7% (7000 of 21414 functions)
  • lines: 50.1% (55665 of 111165 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
3977e0a at 2024-07-25T15:23:44.034Z :recycle:
