Skip to content

Commit

Permalink
storage controller: register nodes in re-attach request (#7040)
Browse files Browse the repository at this point in the history
## Problem

Currently we manually register nodes with the storage controller, and
use a script during deploy to register with the cloud control plane.
Rather than extend that script further, nodes should just register on
startup.

## Summary of changes

- Extend the re-attach request to include an optional
NodeRegisterRequest
- If the `register` field is set, handle it like a normal node
registration before executing the normal re-attach work.
- Update tests/neon_local that used to rely on doing an explicit
register step that could be enabled/disabled.

---------

Co-authored-by: Christian Schwarz <[email protected]>
  • Loading branch information
jcsp and problame authored Mar 12, 2024
1 parent 1f7d54f commit 7ae8364
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 52 deletions.
4 changes: 4 additions & 0 deletions control_plane/attachment_service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,10 @@ impl Service {
&self,
reattach_req: ReAttachRequest,
) -> Result<ReAttachResponse, ApiError> {
if let Some(register_req) = reattach_req.register {
self.node_register(register_req).await?;
}

// Take a re-attach as indication that the node is available: this is a precursor to proper
// heartbeating in https://github.com/neondatabase/neon/issues/6844
self.node_configure(NodeConfigureRequest {
Expand Down
13 changes: 4 additions & 9 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,9 +1100,8 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args), *register)
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
Expand Down Expand Up @@ -1131,7 +1130,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
}

if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args), false)
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
Expand Down Expand Up @@ -1293,7 +1292,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
.start(&pageserver_config_overrides(sub_match), true)
.start(&pageserver_config_overrides(sub_match))
.await
{
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
Expand Down Expand Up @@ -1596,11 +1595,7 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
.long("register")
.default_value("true").required(false)
.value_parser(value_parser!(bool))
.value_name("register"))
.arg(pageserver_config_args.clone())
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
Expand Down
5 changes: 4 additions & 1 deletion control_plane/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,10 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
let response = client
.post(format!(
"http://{}:{}/configure",
Expand Down
48 changes: 24 additions & 24 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::controller_api::NodeRegisterRequest;
use pageserver_api::models::{
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
};
Expand All @@ -32,7 +31,6 @@ use utils::{
};

use crate::local_env::PageServerConf;
use crate::storage_controller::StorageController;
use crate::{background_process, local_env::LocalEnv};

/// Directory within .neon which will be used by default for LocalFs remote storage.
Expand Down Expand Up @@ -163,8 +161,8 @@ impl PageServerNode {
.expect("non-Unicode path")
}

pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
self.start_node(config_overrides, false, register).await
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
}

fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
Expand Down Expand Up @@ -202,34 +200,36 @@ impl PageServerNode {
String::from_utf8_lossy(&init_output.stderr),
);

// Write metadata file, used by pageserver on startup to register itself with
// the storage controller
let metadata_path = datadir.join("metadata.json");

let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
std::fs::write(
metadata_path,
serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": self.pg_connection_config.port(),
"http_host": "localhost",
"http_port": http_port,
}))
.unwrap(),
)
.expect("Failed to write metadata file");

Ok(())
}

async fn start_node(
&self,
config_overrides: &[&str],
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
// successfully call /re-attach and finish starting up.
if register {
let storage_controller = StorageController::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
storage_controller
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}

// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
Expand Down
9 changes: 8 additions & 1 deletion libs/pageserver_api/src/upcall_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@
use serde::{Deserialize, Serialize};
use utils::id::NodeId;

use crate::shard::TenantShardId;
use crate::{controller_api::NodeRegisterRequest, shard::TenantShardId};

/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
#[derive(Serialize, Deserialize)]
pub struct ReAttachRequest {
pub node_id: NodeId,

/// Optional inline self-registration: this is useful with the storage controller,
/// if the node already has a node_id set.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub register: Option<NodeRegisterRequest>,
}

#[derive(Serialize, Deserialize)]
Expand Down
27 changes: 26 additions & 1 deletion pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
use std::env;
use std::{collections::HashMap, env};
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
Expand Down Expand Up @@ -304,6 +305,26 @@ impl<T> BuilderValue<T> {
}
}

// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(serde::Deserialize)]
pub(crate) struct NodeMetadata {
#[serde(rename = "host")]
pub(crate) postgres_host: String,
#[serde(rename = "port")]
pub(crate) postgres_port: u16,
pub(crate) http_host: String,
pub(crate) http_port: u16,

// Deployment tools may write fields to the metadata file beyond what we
// use in this type: this type intentionally only names fields that require.
#[serde(flatten)]
pub(crate) other: HashMap<String, serde_json::Value>,
}

// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
Expand Down Expand Up @@ -761,6 +782,10 @@ impl PageServerConf {
self.workdir.join("deletion")
}

pub fn metadata_path(&self) -> Utf8PathBuf {
self.workdir.join("metadata.json")
}

pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
Expand Down
55 changes: 53 additions & 2 deletions pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use futures::Future;
use pageserver_api::{
controller_api::NodeRegisterRequest,
shard::TenantShardId,
upcall_api::{
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
Expand All @@ -12,7 +13,10 @@ use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{backoff, generation::Generation, id::NodeId};

use crate::config::PageServerConf;
use crate::{
config::{NodeMetadata, PageServerConf},
virtual_file::on_fatal_io_error,
};

/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
Expand All @@ -32,6 +36,7 @@ pub enum RetryForeverError {
pub trait ControlPlaneGenerationsApi {
fn re_attach(
&self,
conf: &PageServerConf,
) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
fn validate(
&self,
Expand Down Expand Up @@ -110,13 +115,59 @@ impl ControlPlaneClient {

impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
async fn re_attach(
&self,
conf: &PageServerConf,
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("re-attach")
.expect("Failed to build re-attach path");

// Include registration content in the re-attach request if a metadata file is readable
let metadata_path = conf.metadata_path();
let register = match tokio::fs::read_to_string(&metadata_path).await {
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);

Some(NodeRegisterRequest {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
})
}
Err(e) => {
tracing::error!("Unreadable metadata in {metadata_path}: {e}");
None
}
},
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// This is legal: we may have been deployed with some external script
// doing registration for us.
tracing::info!("Metadata file not found at {metadata_path}");
} else {
on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
}
None
}
};

let request = ReAttachRequest {
node_id: self.node_id,
register,
};

fail::fail_point!("control-plane-client-re-attach");
Expand Down
5 changes: 4 additions & 1 deletion pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,10 @@ mod test {
}

impl ControlPlaneGenerationsApi for MockControlPlane {
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
async fn re_attach(
&self,
_conf: &PageServerConf,
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
unimplemented!()
}
async fn validate(
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async fn init_load_generations(
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await {
match client.re_attach(conf).await {
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
anyhow::bail!("Shut down while waiting for control plane re-attach response")
Expand Down
20 changes: 12 additions & 8 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,9 @@ def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEn
self.env = NeonEnv(self)
return self.env

def start(self):
def start(self, register_pageservers=False):
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
self.env.start(register_pageservers=register_pageservers)

def init_start(
self,
Expand Down Expand Up @@ -1112,7 +1112,7 @@ def __init__(self, config: NeonEnvBuilder):
log.info(f"Config: {cfg}")
self.neon_cli.init(cfg, force=config.config_init_force)

def start(self):
def start(self, register_pageservers=False):
# storage controller starts first, so that pageserver /re-attach calls don't
# bounce through retries on startup
self.storage_controller.start()
Expand All @@ -1124,6 +1124,11 @@ def storage_controller_ready():
# reconcile.
wait_until(30, 1, storage_controller_ready)

if register_pageservers:
# Special case for forward compat tests, this can be removed later.
for pageserver in self.pageservers:
self.storage_controller.node_register(pageserver)

# Start up broker, pageserver and all safekeepers
futs = []
with concurrent.futures.ThreadPoolExecutor(
Expand Down Expand Up @@ -1712,10 +1717,8 @@ def pageserver_start(
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "subprocess.CompletedProcess[str]":
register_str = "true" if register else "false"
start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
start_args = ["pageserver", "start", f"--id={id}", *overrides]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
Expand Down Expand Up @@ -2066,6 +2069,8 @@ def node_register(self, node: NeonPageserver):
"node_id": int(node.id),
"listen_http_addr": "localhost",
"listen_http_port": node.service_port.http,
"listen_pg_addr": "localhost",
"listen_pg_port": node.service_port.pg,
}
log.info(f"node_register({body})")
self.request(
Expand Down Expand Up @@ -2233,7 +2238,6 @@ def start(
self,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "NeonPageserver":
"""
Start the page server.
Expand All @@ -2243,7 +2247,7 @@ def start(
assert self.running is False

self.env.neon_cli.pageserver_start(
self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
self.id, overrides=overrides, extra_env_vars=extra_env_vars
)
self.running = True
return self
Expand Down
Loading

1 comment on commit 7ae8364

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2588 tests run: 2453 passed, 1 failed, 134 skipped (full report)


Failures on Postgres 14

  • test_bulk_insert[neon-github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_bulk_insert[neon-release-pg14-github-actions-selfhosted]"
Flaky tests (2)

Postgres 16

  • test_crafted_wal_end[last_wal_record_crossing_segment]: debug

Postgres 14

  • test_ondemand_activation: debug

Code coverage* (full report)

  • functions: 28.7% (7027 of 24471 functions)
  • lines: 47.5% (43410 of 91403 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
7ae8364 at 2024-03-12T15:46:14.944Z :recycle:

Please sign in to comment.