storcon: include preferred AZ in compute notifications (#9953)
## Problem

It is unreliable for the control plane to infer a compute's AZ from where
the tenant is currently attached: if the tenant happens to be in a
degraded state, or a release is in progress when the compute starts, the
AZ it is attached to can differ from the AZ where it will run long-term,
and the control plane does not check back later to restart the compute.

This can land in parallel with
#9947

## Summary of changes

- Thread the preferred AZ through to the compute hook code via the
reconciler
- Include the preferred AZ in the body of compute hook notifications
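
For concreteness, the sketch below shows roughly what the notification body looks like once the preferred AZ is included. It uses local stand-in structs mirroring the crate-private `ComputeHookNotifyRequest` / `ComputeHookNotifyRequestShard` types in the `compute_hook.rs` diff below; the `shard_number` field on the shard entries and all example values are assumptions for illustration, and only `preferred_az` is the new addition.

```rust
// Cargo assumptions: serde = { version = "1", features = ["derive"] }, serde_json = "1".
use serde::Serialize;

// Local stand-ins for the crate-private ComputeHookNotifyRequest /
// ComputeHookNotifyRequestShard types changed below; the shard_number field
// and the example values are assumptions for illustration.
#[derive(Serialize)]
struct NotifyShard {
    node_id: u64,
    shard_number: u8,
}

#[derive(Serialize)]
struct NotifyRequest {
    tenant_id: String,
    // New in this commit: the tenant's preferred AZ, if one is set.
    preferred_az: Option<String>,
    // None for unsharded tenants, Some(stripe size) once sharded.
    stripe_size: Option<u32>,
    shards: Vec<NotifyShard>,
}

fn main() {
    let body = NotifyRequest {
        tenant_id: "00000000000000000000000000000001".to_string(),
        preferred_az: Some("us-east-2b".to_string()),
        stripe_size: Some(32768),
        shards: vec![
            NotifyShard { node_id: 1, shard_number: 0 },
            NotifyShard { node_id: 2, shard_number: 1 },
        ],
    };
    // Prints the JSON body the control plane would receive, along the lines of
    // {"tenant_id":"…","preferred_az":"us-east-2b","stripe_size":32768,"shards":[…]}
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}
```

For an unsharded tenant the controller sends `stripe_size: null` and a single shard entry, with `preferred_az` populated the same way.
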
jcsp authored Dec 17, 2024
1 parent 93e9583 commit fd23022
Showing 7 changed files with 139 additions and 53 deletions.
138 changes: 98 additions & 40 deletions storage_controller/src/compute_hook.rs
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::error::Error as _;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
@@ -6,6 +7,7 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
@@ -28,6 +30,9 @@ struct UnshardedComputeHookTenant {
// Which node is this tenant attached to
node_id: NodeId,

// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,

// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
@@ -36,6 +41,9 @@ struct ShardedComputeHookTenant {
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,

// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,

// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
// updates by only sending when there is a change since our last successful send.
@@ -64,17 +72,24 @@ enum ComputeHookTenant {

impl ComputeHookTenant {
/// Construct with at least one shard's information
fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
fn new(
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
preferred_az: Option<AvailabilityZone>,
node_id: NodeId,
) -> Self {
if tenant_shard_id.shard_count.count() > 1 {
Self::Sharded(ShardedComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
preferred_az,
send_lock: Arc::default(),
})
} else {
Self::Unsharded(UnshardedComputeHookTenant {
node_id,
preferred_az,
send_lock: Arc::default(),
})
}
@@ -120,15 +135,20 @@ impl ComputeHookTenant {

/// Set one shard's location. If stripe size or shard count have changed, Self is reset
/// and drops existing content.
fn update(
&mut self,
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
node_id: NodeId,
) {
fn update(&mut self, shard_update: ShardUpdate) {
let tenant_shard_id = shard_update.tenant_shard_id;
let node_id = shard_update.node_id;
let stripe_size = shard_update.stripe_size;
let preferred_az = shard_update.preferred_az;

match self {
Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
unsharded_tenant.node_id = node_id
unsharded_tenant.node_id = node_id;
if unsharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
}
Self::Sharded(sharded_tenant)
if sharded_tenant.stripe_size == stripe_size
@@ -146,10 +166,21 @@
.push((tenant_shard_id.shard_number, node_id));
sharded_tenant.shards.sort_by_key(|s| s.0)
}

if sharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
}
_ => {
// Shard count changed: reset struct.
*self = Self::new(tenant_shard_id, stripe_size, node_id);
*self = Self::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
);
}
}
}
@@ -165,6 +196,7 @@ struct ComputeHookNotifyRequestShard {
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
@@ -238,6 +270,10 @@ impl ComputeHookTenant {
node_id: unsharded_tenant.node_id,
}],
stripe_size: None,
preferred_az: unsharded_tenant
.preferred_az
.as_ref()
.map(|az| az.0.clone()),
}),
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
@@ -253,6 +289,7 @@
})
.collect(),
stripe_size: Some(sharded_tenant.stripe_size),
preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
})
}
Self::Sharded(sharded_tenant) => {
@@ -313,6 +350,17 @@ pub(super) struct ComputeHook {
client: reqwest::Client,
}

/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}

impl ComputeHook {
pub(super) fn new(config: Config) -> Self {
let authorization_header = config
@@ -363,6 +411,7 @@ impl ComputeHook {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = reconfigure_request;

let compute_pageservers = shards
@@ -503,24 +552,30 @@ impl ComputeHook {
}

/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_prepare(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
) -> MaybeSendResult {
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();

use std::collections::hash_map::Entry;
let tenant_shard_id = shard_update.tenant_shard_id;

let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Vacant(e) => {
let ShardUpdate {
tenant_shard_id,
node_id,
stripe_size,
preferred_az,
} = shard_update;
e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
))
}
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant.update(shard_update);
tenant
}
};
@@ -608,13 +663,14 @@ impl ComputeHook {
/// if something failed.
pub(super) fn notify_background(
self: &Arc<Self>,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
notifications: Vec<ShardUpdate>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
cancel: &CancellationToken,
) {
let mut maybe_sends = Vec::new();
for (tenant_shard_id, node_id, stripe_size) in notifications {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
for shard_update in notifications {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}

@@ -678,15 +734,14 @@ impl ComputeHook {
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify<'a>(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
shard_update: ShardUpdate<'a>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
@@ -739,6 +794,7 @@ pub(crate) mod tests {
shard_number: ShardNumber(0),
},
ShardStripeSize(12345),
None,
NodeId(1),
);

@@ -765,30 +821,32 @@
// Writing the first shard of a multi-sharded situation (i.e. in a split)
// resets the tenant state and puts it in a non-notifying state (need to
// see all shards)
tenant_state.update(
TenantShardId {
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(1),
},
ShardStripeSize(32768),
NodeId(1),
);
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});
assert!(matches!(
tenant_state.maybe_send(tenant_id, None),
MaybeSendResult::Noop
));

// Writing the second shard makes it ready to notify
tenant_state.update(
TenantShardId {
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(0),
},
ShardStripeSize(32768),
NodeId(1),
);
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});

let send_result = tenant_state.maybe_send(tenant_id, None);
let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
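
Since the update/reset rules above are spread across several hunks, here is a compressed, self-contained model of how the per-tenant hook state now carries the preferred AZ: it is stored alongside the shard map, refreshed on every shard update, and carried over when a shard-count change forces a reset. The types below are illustrative stand-ins for the crate-private `ComputeHookTenant` and `ShardUpdate`; stripe-size handling and the send lock are omitted.

```rust
// Illustrative stand-ins only: the real (crate-private) types are
// ComputeHookTenant and ShardUpdate in compute_hook.rs; stripe-size checks
// and the send lock are omitted here.
#[derive(Clone, Debug)]
struct AvailabilityZone(String);

#[derive(Debug)]
enum HookTenantModel {
    Unsharded {
        node_id: u64,
        preferred_az: Option<AvailabilityZone>,
    },
    Sharded {
        shard_count: u8,
        shards: Vec<(u8, u64)>, // (shard_number, node_id), kept sorted
        preferred_az: Option<AvailabilityZone>,
    },
}

struct ShardUpdateModel {
    shard_number: u8,
    shard_count: u8,
    node_id: u64,
    preferred_az: Option<AvailabilityZone>,
}

impl HookTenantModel {
    fn new(u: &ShardUpdateModel) -> Self {
        if u.shard_count > 1 {
            Self::Sharded {
                shard_count: u.shard_count,
                shards: vec![(u.shard_number, u.node_id)],
                preferred_az: u.preferred_az.clone(),
            }
        } else {
            Self::Unsharded {
                node_id: u.node_id,
                preferred_az: u.preferred_az.clone(),
            }
        }
    }

    fn update(&mut self, u: ShardUpdateModel) {
        match self {
            Self::Unsharded {
                node_id,
                preferred_az,
            } if u.shard_count == 1 => {
                *node_id = u.node_id;
                *preferred_az = u.preferred_az;
            }
            Self::Sharded {
                shard_count,
                shards,
                preferred_az,
            } if *shard_count == u.shard_count => {
                // Upsert this shard's attachment, then refresh the AZ.
                match shards.iter().position(|(n, _)| *n == u.shard_number) {
                    Some(i) => shards[i].1 = u.node_id,
                    None => {
                        shards.push((u.shard_number, u.node_id));
                        shards.sort_by_key(|s| s.0);
                    }
                }
                *preferred_az = u.preferred_az;
            }
            _ => {
                // Shard count changed (e.g. a split): reset the whole state,
                // carrying the preferred AZ over from the incoming update.
                *self = Self::new(&u);
            }
        }
    }
}

fn main() {
    let az = Some(AvailabilityZone("us-east-2a".to_string()));
    let mut t = HookTenantModel::new(&ShardUpdateModel {
        shard_number: 0,
        shard_count: 1,
        node_id: 1,
        preferred_az: az.clone(),
    });
    // A split to shard_count = 2 resets the state; the preferred AZ survives
    // because it rides along on every update.
    t.update(ShardUpdateModel {
        shard_number: 1,
        shard_count: 2,
        node_id: 2,
        preferred_az: az,
    });
    println!("{t:?}");
}
```
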
15 changes: 10 additions & 5 deletions storage_controller/src/reconciler.rs
@@ -1,13 +1,14 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
use pageserver_api::controller_api::PlacementPolicy;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use reqwest::StatusCode;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -45,6 +46,7 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,

pub(crate) config: TenantConfig,
pub(crate) preferred_az: Option<AvailabilityZone>,

/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
@@ -834,9 +836,12 @@ impl Reconciler {
let result = self
.compute_hook
.notify(
self.tenant_shard_id,
node.get_id(),
self.shard.stripe_size,
compute_hook::ShardUpdate {
tenant_shard_id: self.tenant_shard_id,
node_id: node.get_id(),
stripe_size: self.shard.stripe_size,
preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
},
&self.cancel,
)
.await;
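
A brief note on the `Cow<'a, AvailabilityZone>` used in the call above: the reconciler can lend its `preferred_az` by reference for a one-shot notification, while the startup path in `service.rs` (next file) clones into an owned value, presumably because the collected `ShardUpdate` list has to outlive the borrowed shard state. A minimal sketch of that pattern with stand-in types:

```rust
use std::borrow::Cow;

// Illustrative stand-ins; the real ShardUpdate in compute_hook.rs also carries
// tenant_shard_id, node_id and stripe_size.
#[derive(Clone, Debug)]
struct AvailabilityZone(String);

struct ShardUpdate<'a> {
    preferred_az: Option<Cow<'a, AvailabilityZone>>,
}

fn notify(update: ShardUpdate<'_>) {
    // into_owned() only clones when the caller passed a borrow.
    let az = update.preferred_az.map(|az| az.into_owned());
    println!("notify with preferred_az = {az:?}");
}

fn main() {
    let az = AvailabilityZone("eu-west-1a".to_string());

    // Reconciler-style call: lend a borrow for a single, immediate notification.
    notify(ShardUpdate { preferred_az: Some(Cow::Borrowed(&az)) });

    // Bulk/background style: the updates are collected into a Vec that outlives
    // the borrowed shard state, so hand over an owned clone instead.
    notify(ShardUpdate { preferred_az: Some(Cow::Owned(az.clone())) });
}
```
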
25 changes: 18 additions & 7 deletions storage_controller/src/service.rs
@@ -18,7 +18,7 @@ use crate::{
background_node_operations::{
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
},
compute_hook::NotifyError,
compute_hook::{self, NotifyError},
drain_utils::{self, TenantShardDrain, TenantShardIterator},
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
@@ -656,11 +656,14 @@ impl Service {
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
if let Some(attached_at) = tenant_shard.stably_attached() {
compute_notifications.push((
*tenant_shard_id,
attached_at,
tenant_shard.shard.stripe_size,
));
compute_notifications.push(compute_hook::ShardUpdate {
tenant_shard_id: *tenant_shard_id,
node_id: attached_at,
stripe_size: tenant_shard.shard.stripe_size,
preferred_az: tenant_shard
.preferred_az()
.map(|az| Cow::Owned(az.clone())),
});
}
}
}
@@ -4786,7 +4789,15 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify(child_id, child_ps, stripe_size, &self.cancel)
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,
stripe_size,
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
},
&self.cancel,
)
.await
{
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
1 change: 1 addition & 0 deletions storage_controller/src/tenant_shard.rs
@@ -1198,6 +1198,7 @@ impl TenantShard {
detach,
reconciler_config,
config: self.config.clone(),
preferred_az: self.preferred_az_id.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
(diffs for the remaining changed files are not shown)

1 comment on commit fd23022

@github-actions


7095 tests run: 6794 passed, 3 failed, 298 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_small_inmemory_layers[debug-pg17-True] or test_idle_checkpoints[debug-pg17] or test_pageserver_small_inmemory_layers[debug-pg17-False]"
Flaky tests (7)

Postgres 17

Postgres 16

  • test_delete_timeline_exercise_crash_safety_failpoints[Check.RETRY_WITHOUT_RESTART-timeline-delete-before-rm]: release-arm64

Postgres 15

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64
  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

Postgres 14

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64
  • test_pgdata_import_smoke[8-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Test coverage report is not available

The comment gets automatically updated with the latest test results
fd23022 at 2024-12-17T21:51:17.031Z :recycle:
