Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storcon: do az aware scheduling #9083

Merged
merged 7 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions control_plane/storcon_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
Expand Down Expand Up @@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id,
availability_zone_id: AvailabilityZone(availability_zone_id),
}),
)
.await?;
Expand Down
14 changes: 12 additions & 2 deletions libs/pageserver_api/src/controller_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -57,7 +58,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,

pub availability_zone_id: String,
pub availability_zone_id: AvailabilityZone,
}

#[derive(Serialize, Deserialize)]
Expand All @@ -74,10 +75,19 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AvailabilityZone(pub String);

impl Display for AvailabilityZone {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
#[serde(flatten)]
pub preferred_az_ids: HashMap<TenantShardId, String>,
pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
}

#[derive(Serialize, Deserialize)]
Expand Down
6 changes: 3 additions & 3 deletions pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use futures::Future;
use pageserver_api::{
controller_api::NodeRegisterRequest,
controller_api::{AvailabilityZone, NodeRegisterRequest},
shard::TenantShardId,
upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
Expand Down Expand Up @@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));

match az_id_from_metadata {
Some(az_id) => Some(az_id),
Some(az_id) => Some(AvailabilityZone(az_id)),
None => {
tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
conf.availability_zone.clone()
conf.availability_zone.clone().map(AvailabilityZone)
}
}
};
Expand Down
16 changes: 8 additions & 8 deletions storage_controller/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration};

use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard,
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
NodeSchedulingPolicy, TenantLocateResponseShard,
},
shard::TenantShardId,
};
Expand Down Expand Up @@ -36,7 +36,7 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,

availability_zone_id: String,
availability_zone_id: AvailabilityZone,

// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
Expand Down Expand Up @@ -64,8 +64,8 @@ impl Node {
}

#[allow(unused)]
pub(crate) fn get_availability_zone_id(&self) -> &str {
self.availability_zone_id.as_str()
pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
&self.availability_zone_id
}

pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
Expand Down Expand Up @@ -181,7 +181,7 @@ impl Node {
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: String,
availability_zone_id: AvailabilityZone,
) -> Self {
Self {
id,
Expand All @@ -204,7 +204,7 @@ impl Node {
listen_http_port: self.listen_http_port as i32,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
availability_zone_id: self.availability_zone_id.clone(),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}

Expand All @@ -219,7 +219,7 @@ impl Node {
listen_http_port: np.listen_http_port as u16,
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
availability_zone_id: np.availability_zone_id,
availability_zone_id: AvailabilityZone(np.availability_zone_id),
cancel: CancellationToken::new(),
}
}
Expand Down
7 changes: 4 additions & 3 deletions storage_controller/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
Expand Down Expand Up @@ -667,8 +668,8 @@ impl Persistence {

pub(crate) async fn set_tenant_shard_preferred_azs(
&self,
preferred_azs: Vec<(TenantShardId, String)>,
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
use crate::schema::tenant_shards::dsl::*;

self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
Expand All @@ -679,7 +680,7 @@ impl Persistence {
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az))
.set(preferred_az_id.eq(preferred_az.0.clone()))
.execute(conn)?;

if updated == 1 {
Expand Down
Loading
Loading