storcon: tip off control plane about preferred AZ in compute hook
jcsp committed Nov 29, 2024
1 parent d9aa383 commit eac3044
Showing 4 changed files with 62 additions and 10 deletions.
58 changes: 50 additions & 8 deletions storage_controller/src/compute_hook.rs
@@ -5,6 +5,7 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
@@ -27,6 +28,9 @@ struct UnshardedComputeHookTenant {
// Which node is this tenant attached to
node_id: NodeId,

// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,

// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
@@ -35,6 +39,9 @@ struct ShardedComputeHookTenant {
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,

// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,

// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
updates by only sending when there is a change since our last successful send.
@@ -63,17 +70,24 @@ enum ComputeHookTenant {

impl ComputeHookTenant {
/// Construct with at least one shard's information
- fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
+ fn new(
+ tenant_shard_id: TenantShardId,
+ stripe_size: ShardStripeSize,
+ preferred_az: Option<AvailabilityZone>,
+ node_id: NodeId,
+ ) -> Self {
if tenant_shard_id.shard_count.count() > 1 {
Self::Sharded(ShardedComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
preferred_az,
send_lock: Arc::default(),
})
} else {
Self::Unsharded(UnshardedComputeHookTenant {
node_id,
preferred_az,
send_lock: Arc::default(),
})
}
@@ -123,11 +137,15 @@ impl ComputeHookTenant {
&mut self,
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
preferred_az: Option<&AvailabilityZone>,
node_id: NodeId,
) {
match self {
Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
- unsharded_tenant.node_id = node_id
+ unsharded_tenant.node_id = node_id;
+ if unsharded_tenant.preferred_az.as_ref() != preferred_az {
+ unsharded_tenant.preferred_az = preferred_az.cloned();
+ }
}
Self::Sharded(sharded_tenant)
if sharded_tenant.stripe_size == stripe_size
@@ -145,10 +163,14 @@
.push((tenant_shard_id.shard_number, node_id));
sharded_tenant.shards.sort_by_key(|s| s.0)
}

if sharded_tenant.preferred_az.as_ref() != preferred_az {
sharded_tenant.preferred_az = preferred_az.cloned();
}
}
_ => {
// Shard count changed: reset struct.
- *self = Self::new(tenant_shard_id, stripe_size, node_id);
+ *self = Self::new(tenant_shard_id, stripe_size, preferred_az.cloned(), node_id);
}
}
}
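
For illustration, a hedged usage sketch of the updated constructor and update method, not part of this commit: unsharded_id and split_id stand in for hypothetical TenantShardId values with shard counts 1 and 2, and the sketch assumes AvailabilityZone wraps a String (as suggested by the az.0.clone() calls later in this file); only the signatures come from this diff.

    // Hypothetical usage sketch, not part of this commit.
    let az = Some(AvailabilityZone("us-east-2a".to_string()));
    let mut entry = ComputeHookTenant::new(
        unsharded_id, // a TenantShardId with shard_count == 1: builds Unsharded
        ShardStripeSize(32768),
        az.clone(),
        NodeId(1),
    );
    // Same shard count: the Unsharded arm updates node_id and preferred_az in place.
    entry.update(unsharded_id, ShardStripeSize(32768), az.as_ref(), NodeId(2));
    // Changed shard count: the wildcard arm rebuilds the struct via Self::new,
    // carrying the preferred AZ along via preferred_az.cloned().
    entry.update(split_id, ShardStripeSize(32768), az.as_ref(), NodeId(2));
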
Expand All @@ -164,6 +186,7 @@ struct ComputeHookNotifyRequestShard {
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
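
Because the request derives Serialize, the preferred AZ now rides along in the notification body sent to the control plane. A minimal sketch of the payload, assuming ComputeHookNotifyRequestShard carries node_id and shard_number fields (they are elided in this hunk) and that serde's default field naming applies; the AZ name is hypothetical.

    // Illustrative only: an unsharded tenant pinned to a hypothetical AZ.
    let request = ComputeHookNotifyRequest {
        tenant_id,
        preferred_az: Some("us-east-2a".to_string()),
        stripe_size: None,
        shards: vec![ComputeHookNotifyRequestShard {
            node_id: NodeId(1),
            shard_number: ShardNumber(0),
        }],
    };
    // Serializes to roughly:
    // {"tenant_id":"...","preferred_az":"us-east-2a","stripe_size":null,
    //  "shards":[{"node_id":1,"shard_number":0}]}
    println!("{}", serde_json::to_string(&request).unwrap());
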
@@ -237,6 +260,10 @@ impl ComputeHookTenant {
node_id: unsharded_tenant.node_id,
}],
stripe_size: None,
preferred_az: unsharded_tenant
.preferred_az
.as_ref()
.map(|az| az.0.clone()),
}),
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
@@ -252,6 +279,7 @@
})
.collect(),
stripe_size: Some(sharded_tenant.stripe_size),
preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
})
}
Self::Sharded(sharded_tenant) => {
@@ -362,6 +390,7 @@ impl ComputeHook {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = reconfigure_request;

let compute_pageservers = shards
@@ -507,6 +536,7 @@ impl ComputeHook {
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
preferred_az: Option<&AvailabilityZone>,
) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();

@@ -515,11 +545,12 @@
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
preferred_az.cloned(),
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
- tenant.update(tenant_shard_id, stripe_size, node_id);
+ tenant.update(tenant_shard_id, stripe_size, preferred_az, node_id);
tenant
}
};
@@ -607,13 +638,19 @@ impl ComputeHook {
/// if something failed.
pub(super) fn notify_background(
self: &Arc<Self>,
- notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
+ notifications: Vec<(
+ TenantShardId,
+ NodeId,
+ ShardStripeSize,
+ Option<AvailabilityZone>,
+ )>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
cancel: &CancellationToken,
) {
let mut maybe_sends = Vec::new();
- for (tenant_shard_id, node_id, stripe_size) in notifications {
- let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
+ for (tenant_shard_id, node_id, stripe_size, preferred_az) in notifications {
+ let maybe_send_result =
+ self.notify_prepare(tenant_shard_id, node_id, stripe_size, preferred_az.as_ref());
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
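
Each queued notification now carries the shard's preferred AZ as a fourth tuple element, matching the call site added in service.rs later in this commit. A hedged sketch of how such a batch might be built, with hypothetical identifiers and AZ name:

    // Hypothetical call-site sketch: queue one notification carrying the AZ.
    let notifications = vec![(
        tenant_shard_id,
        NodeId(1),
        ShardStripeSize(32768),
        Some(AvailabilityZone("us-east-2a".to_string())),
    )];
    compute_hook.notify_background(notifications, result_tx, &cancel);
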

@@ -683,9 +720,11 @@ impl ComputeHook {
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
preferred_az: Option<&AvailabilityZone>,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
- let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
+ let maybe_send_result =
+ self.notify_prepare(tenant_shard_id, node_id, stripe_size, preferred_az);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
@@ -738,6 +777,7 @@ pub(crate) mod tests {
shard_number: ShardNumber(0),
},
ShardStripeSize(12345),
None,
NodeId(1),
);

@@ -771,6 +811,7 @@
shard_number: ShardNumber(1),
},
ShardStripeSize(32768),
None,
NodeId(1),
);
assert!(matches!(
@@ -786,6 +827,7 @@
shard_number: ShardNumber(0),
},
ShardStripeSize(32768),
None,
NodeId(1),
);

4 changes: 3 additions & 1 deletion storage_controller/src/reconciler.rs
@@ -1,7 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
- use pageserver_api::controller_api::PlacementPolicy;
+ use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -45,6 +45,7 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,

pub(crate) config: TenantConfig,
pub(crate) preferred_az: Option<AvailabilityZone>,

/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
@@ -837,6 +838,7 @@ impl Reconciler {
self.tenant_shard_id,
node.get_id(),
self.shard.stripe_size,
self.preferred_az.as_ref(),
&self.cancel,
)
.await;
9 changes: 8 additions & 1 deletion storage_controller/src/service.rs
@@ -654,6 +654,7 @@ impl Service {
*tenant_shard_id,
attached_at,
tenant_shard.shard.stripe_size,
tenant_shard.preferred_az().cloned(),
));
}
}
@@ -4705,7 +4706,13 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
- .notify(child_id, child_ps, stripe_size, &self.cancel)
+ .notify(
+ child_id,
+ child_ps,
+ stripe_size,
+ preferred_az_id.as_ref(),
+ &self.cancel,
+ )
.await
{
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
1 change: 1 addition & 0 deletions storage_controller/src/tenant_shard.rs
@@ -1197,6 +1197,7 @@ impl TenantShard {
detach,
reconciler_config,
config: self.config.clone(),
preferred_az: self.preferred_az_id.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
