storcon: track pageserver availability zone (#8852)
## Problem
In order to build AZ aware scheduling, the storage controller needs to
know what AZ pageservers are in.

Related #8848

## Summary of changes
This patch set adds a new nullable column to the `nodes` table:
`availability_zone_id`. The node registration
request is extended to include the AZ id (pageservers already have this
in their `metadata.json` file).
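
For illustration, here is a minimal, self-contained sketch (not part of this commit) of how the AZ id can be read out of the free-form section of `metadata.json`, mirroring the `control_plane_client.rs` change below; it only assumes a `serde_json` dependency, and the sample AZ value is hypothetical:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical extra fields from a pageserver's metadata.json; only the
    // "availability_zone_id" key is assumed by this change.
    let other: HashMap<String, serde_json::Value> =
        serde_json::from_str(r#"{ "availability_zone_id": "eu-west-1a" }"#)
            .expect("valid JSON");

    // The same Option-producing lookup as in control_plane_client.rs below.
    let az_id: Option<String> = other
        .get("availability_zone_id")
        .and_then(|jv| jv.as_str().map(|s| s.to_owned()));

    assert_eq!(az_id.as_deref(), Some("eu-west-1a"));
}
```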

If the node is already registered, then we update the persistent and
in-memory state with the provided AZ.
Otherwise, we add the node with the AZ to begin with.
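
A minimal sketch of that rule, reduced to plain `Option`s (the real code in `service.rs` below works on the controller's `Node` and `NodeRegisterRequest` types, and the AZ strings here are hypothetical): once a re-registration has matched the stored record, only a node with no AZ recorded yet gets backfilled from the request.

```rust
/// What to do with the AZ of an already-registered, matching node, given what
/// is currently stored and what the re-registration request carries.
#[derive(Debug, PartialEq)]
enum AzAction {
    /// Persist the AZ to the database, then refresh the in-memory node.
    Backfill(String),
    /// Leave the stored AZ (or lack of one) untouched.
    Keep,
}

fn az_action(stored: Option<&str>, requested: Option<String>) -> AzAction {
    match (stored, requested) {
        (None, Some(az)) => AzAction::Backfill(az),
        _ => AzAction::Keep,
    }
}

fn main() {
    // A node registered before the column existed picks its AZ up on re-registration.
    assert_eq!(
        az_action(None, Some("eu-west-1a".into())),
        AzAction::Backfill("eu-west-1a".into())
    );
    // A node that already has an AZ, or a request without one, changes nothing.
    assert_eq!(az_action(Some("eu-west-1a"), None), AzAction::Keep);
}
```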

A couple of assumptions are made here:
1. Pageserver AZ ids are stable
2. AZ ids do not change over time

Once all pageservers have a configured AZ, we can remove the optionals
in the code and make the database column not nullable.
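
Until then, the comparison added to `Node::registration_match` only treats AZ ids as a hard constraint when both sides actually carry one, so records and requests from before this change still match. A self-contained sketch of just that comparison (values hypothetical):

```rust
/// AZ comparison used when deciding whether a registration request matches an
/// existing node: disagree only if both sides actually carry an AZ id.
fn az_ids_match(stored: Option<&str>, requested: Option<&str>) -> bool {
    match (stored, requested) {
        (Some(current_az), Some(register_req_az)) => current_az == register_req_az,
        _ => true,
    }
}

fn main() {
    assert!(az_ids_match(None, Some("eu-west-1a"))); // pre-change node, new request
    assert!(az_ids_match(Some("eu-west-1a"), None)); // request omitted the AZ
    assert!(!az_ids_match(Some("eu-west-1a"), Some("eu-west-1b"))); // genuine mismatch
}
```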
VladLazar authored Aug 28, 2024
1 parent a889a49 commit 793b506
Showing 11 changed files with 143 additions and 19 deletions.
4 changes: 4 additions & 0 deletions control_plane/storcon_cli/src/main.rs
@@ -41,6 +41,8 @@ enum Command {
listen_http_addr: String,
#[arg(long)]
listen_http_port: u16,
#[arg(long)]
availability_zone_id: String,
},

/// Modify a node's configuration in the storage controller
@@ -322,6 +324,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id,
} => {
storcon_client
.dispatch::<_, ()>(
@@ -333,6 +336,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
availability_zone_id: Some(availability_zone_id),
}),
)
.await?;
2 changes: 2 additions & 0 deletions libs/pageserver_api/src/controller_api.rs
@@ -56,6 +56,8 @@ pub struct NodeRegisterRequest {

pub listen_http_addr: String,
pub listen_http_port: u16,

pub availability_zone_id: Option<String>,
}

#[derive(Serialize, Deserialize)]
6 changes: 6 additions & 0 deletions pageserver/src/control_plane_client.rs
@@ -141,12 +141,18 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
m.other
);

let az_id = m
.other
.get("availability_zone_id")
.and_then(|jv| jv.as_str().map(|str| str.to_owned()));

Some(NodeRegisterRequest {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
availability_zone_id: az_id,
})
}
Err(e) => {
@@ -0,0 +1 @@
ALTER TABLE nodes DROP availability_zone_id;
@@ -0,0 +1 @@
ALTER TABLE nodes ADD availability_zone_id VARCHAR;
23 changes: 22 additions & 1 deletion storage_controller/src/node.rs
@@ -36,6 +36,8 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,

availability_zone_id: Option<String>,

// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
#[serde(skip)]
@@ -61,6 +63,10 @@ impl Node {
self.id
}

pub(crate) fn get_availability_zone_id(&self) -> Option<&str> {
self.availability_zone_id.as_deref()
}

pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
self.scheduling
}
@@ -72,7 +78,18 @@ impl Node {
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
self.id == register_req.node_id
let az_ids_match = {
match (
self.availability_zone_id.as_deref(),
register_req.availability_zone_id.as_deref(),
) {
(Some(current_az), Some(register_req_az)) => current_az == register_req_az,
_ => true,
}
};

az_ids_match
&& self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
&& self.listen_pg_addr == register_req.listen_pg_addr
@@ -173,6 +190,7 @@ impl Node {
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: Option<String>,
) -> Self {
Self {
id,
@@ -182,6 +200,7 @@ impl Node {
listen_pg_port,
scheduling: NodeSchedulingPolicy::Active,
availability: NodeAvailability::Offline,
availability_zone_id,
cancel: CancellationToken::new(),
}
}
@@ -194,6 +213,7 @@ impl Node {
listen_http_port: self.listen_http_port as i32,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
availability_zone_id: self.availability_zone_id.clone(),
}
}

@@ -208,6 +228,7 @@ impl Node {
listen_http_port: np.listen_http_port as u16,
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
availability_zone_id: np.availability_zone_id,
cancel: CancellationToken::new(),
}
}
27 changes: 27 additions & 0 deletions storage_controller/src/persistence.rs
@@ -103,6 +103,7 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthOutdated,
GetLeader,
UpdateLeader,
SetNodeAzId,
}

#[must_use]
@@ -315,6 +316,31 @@ impl Persistence {
}
}

pub(crate) async fn set_node_availability_zone_id(
&self,
input_node_id: NodeId,
input_az_id: String,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::SetNodeAzId, move |conn| {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((availability_zone_id.eq(input_az_id.clone()),))
.execute(conn)?;
Ok(updated)
})
.await?;

if updated != 1 {
Err(DatabaseError::Logical(format!(
"Node {node_id:?} not found for setting az id",
)))
} else {
Ok(())
}
}

/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -974,6 +1000,7 @@ pub(crate) struct NodePersistence {
pub(crate) listen_http_port: i32,
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: i32,
pub(crate) availability_zone_id: Option<String>,
}

/// Tenant metadata health status that are stored durably.
1 change: 1 addition & 0 deletions storage_controller/src/scheduler.rs
@@ -528,6 +528,7 @@ pub(crate) mod test_utils {
80 + i as u16,
format!("pghost-{i}"),
5432 + i as u16,
None,
);
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
assert!(node.is_available());
1 change: 1 addition & 0 deletions storage_controller/src/schema.rs
@@ -25,6 +25,7 @@ diesel::table! {
listen_http_port -> Int4,
listen_pg_addr -> Varchar,
listen_pg_port -> Int4,
availability_zone_id -> Nullable<Varchar>,
}
}

93 changes: 75 additions & 18 deletions storage_controller/src/service.rs
@@ -1257,6 +1257,7 @@ impl Service {
123,
"".to_string(),
123,
None,
);

scheduler.node_upsert(&node);
@@ -4683,29 +4684,84 @@ impl Service {
)
.await;

{
if register_req.availability_zone_id.is_none() {
tracing::warn!(
"Node {} registering without specific availability zone id",
register_req.node_id
);
}

enum RegistrationStatus {
Matched(Node),
Mismatched,
New,
}

let registration_status = {
let locked = self.inner.read().unwrap();
if let Some(node) = locked.nodes.get(&register_req.node_id) {
// Note that we do not do a total equality of the struct, because we don't require
// the availability/scheduling states to agree for a POST to be idempotent.
if node.registration_match(&register_req) {
tracing::info!(
"Node {} re-registered with matching address",
register_req.node_id
);
return Ok(());
RegistrationStatus::Matched(node.clone())
} else {
// TODO: decide if we want to allow modifying node addresses without removing and re-adding
// the node. Safest/simplest thing is to refuse it, and usually we deploy with
// a fixed address through the lifetime of a node.
tracing::warn!(
"Node {} tried to register with different address",
register_req.node_id
);
return Err(ApiError::Conflict(
"Node is already registered with different address".to_string(),
));
RegistrationStatus::Mismatched
}
} else {
RegistrationStatus::New
}
};

match registration_status {
RegistrationStatus::Matched(node) => {
tracing::info!(
"Node {} re-registered with matching address",
register_req.node_id
);

if node.get_availability_zone_id().is_none() {
if let Some(az_id) = register_req.availability_zone_id.clone() {
tracing::info!("Extracting availability zone id from registration request for node {}: {}",
register_req.node_id, az_id);

// Persist to the database and update in memory state. See comment below
// on ordering.
self.persistence
.set_node_availability_zone_id(register_req.node_id, az_id)
.await?;
let node_with_az = Node::new(
register_req.node_id,
register_req.listen_http_addr,
register_req.listen_http_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.availability_zone_id,
);

let mut locked = self.inner.write().unwrap();
let mut new_nodes = (*locked.nodes).clone();

locked.scheduler.node_upsert(&node_with_az);
new_nodes.insert(register_req.node_id, node_with_az);

locked.nodes = Arc::new(new_nodes);
}
}

return Ok(());
}
RegistrationStatus::Mismatched => {
// TODO: decide if we want to allow modifying node addresses without removing and re-adding
// the node. Safest/simplest thing is to refuse it, and usually we deploy with
// a fixed address through the lifetime of a node.
tracing::warn!(
"Node {} tried to register with different address",
register_req.node_id
);
return Err(ApiError::Conflict(
"Node is already registered with different address".to_string(),
));
}
RegistrationStatus::New => {
// fallthrough
}
}

@@ -4742,6 +4798,7 @@ impl Service {
register_req.listen_http_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.availability_zone_id,
);

// TODO: idempotency if the node already exists in the database
3 changes: 3 additions & 0 deletions test_runner/fixtures/pageserver/allowed_errors.py
@@ -109,6 +109,9 @@ def scan_pageserver_log_for_errors(
# controller's attempts to notify the endpoint).
".*reconciler.*neon_local notification hook failed.*",
".*reconciler.*neon_local error.*",
# Neon local does not provide pageserver with an AZ
# TODO: remove this once neon local does so
".*registering without specific availability zone id.*",
]



1 comment on commit 793b506

@github-actions


3780 tests run: 3672 passed, 2 failed, 106 skipped (full report)


Failures on Postgres 14

  • test_delete_timeline_exercise_crash_safety_failpoints[Check.RETRY_WITHOUT_RESTART-timeline-delete-after-rm]: release-arm64
  • test_delete_timeline_exercise_crash_safety_failpoints[Check.RETRY_WITH_RESTART-timeline-delete-before-index-delete]: release-arm64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_delete_timeline_exercise_crash_safety_failpoints[release-pg14-Check.RETRY_WITHOUT_RESTART-timeline-delete-after-rm] or test_delete_timeline_exercise_crash_safety_failpoints[release-pg14-Check.RETRY_WITH_RESTART-timeline-delete-before-index-delete]"
Flaky tests (2)


Test coverage report is not available

The comment gets automatically updated with the latest test results
793b506 at 2024-08-28T18:17:21.195Z :recycle:
