From 7ae8364b0b0746b335f1d6e7c0d409fc1a236ffe Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 12 Mar 2024 14:47:12 +0000 Subject: [PATCH] storage controller: register nodes in re-attach request (#7040) ## Problem Currently we manually register nodes with the storage controller, and use a script during deploy to register with the cloud control plane. Rather than extend that script further, nodes should just register on startup. ## Summary of changes - Extend the re-attach request to include an optional NodeRegisterRequest - If the `register` field is set, handle it like a normal node registration before executing the normal re-attach work. - Update tests/neon_local that used to rely on doing an explicit register step that could be enabled/disabled. --------- Co-authored-by: Christian Schwarz --- .../attachment_service/src/service.rs | 4 ++ control_plane/src/bin/neon_local.rs | 13 ++--- control_plane/src/endpoint.rs | 5 +- control_plane/src/pageserver.rs | 48 ++++++++-------- libs/pageserver_api/src/upcall_api.rs | 9 ++- pageserver/src/config.rs | 27 ++++++++- pageserver/src/control_plane_client.rs | 55 ++++++++++++++++++- pageserver/src/deletion_queue.rs | 5 +- pageserver/src/tenant/mgr.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 20 ++++--- test_runner/regress/test_compatibility.py | 2 +- .../regress/test_pageserver_generations.py | 4 +- test_runner/regress/test_sharding_service.py | 3 +- 13 files changed, 145 insertions(+), 52 deletions(-) diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 3f245b52559f..a8498a39b50e 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -922,6 +922,10 @@ impl Service { &self, reattach_req: ReAttachRequest, ) -> Result { + if let Some(register_req) = reattach_req.register { + self.node_register(register_req).await?; + } + // Take a re-attach as indication that the node is available: this is a precursor to proper // heartbeating in https://github.com/neondatabase/neon/issues/6844 self.node_configure(NodeConfigureRequest { diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 86b9c0085dcc..952229c4b7fe 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1100,9 +1100,8 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { - let register = subcommand_args.get_one::("register").unwrap_or(&true); if let Err(e) = get_pageserver(env, subcommand_args)? - .start(&pageserver_config_overrides(subcommand_args), *register) + .start(&pageserver_config_overrides(subcommand_args)) .await { eprintln!("pageserver start failed: {e}"); @@ -1131,7 +1130,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } if let Err(e) = pageserver - .start(&pageserver_config_overrides(subcommand_args), false) + .start(&pageserver_config_overrides(subcommand_args)) .await { eprintln!("pageserver start failed: {e}"); @@ -1293,7 +1292,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); if let Err(e) = pageserver - .start(&pageserver_config_overrides(sub_match), true) + .start(&pageserver_config_overrides(sub_match)) .await { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); @@ -1596,11 +1595,7 @@ fn cli() -> Command { .subcommand(Command::new("status")) .subcommand(Command::new("start") .about("Start local pageserver") - .arg(pageserver_config_args.clone()).arg(Arg::new("register") - .long("register") - .default_value("true").required(false) - .value_parser(value_parser!(bool)) - .value_name("register")) + .arg(pageserver_config_args.clone()) ) .subcommand(Command::new("stop") .about("Stop local pageserver") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 646bc2e8bc39..5206222961ce 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -774,7 +774,10 @@ impl Endpoint { spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize); } - let client = reqwest::Client::new(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .unwrap(); let response = client .post(format!( "http://{}:{}/configure", diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 021b9aca34b0..06ec942895f2 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -17,7 +17,6 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; use futures::SinkExt; -use pageserver_api::controller_api::NodeRegisterRequest; use pageserver_api::models::{ self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo, }; @@ -32,7 +31,6 @@ use utils::{ }; use crate::local_env::PageServerConf; -use crate::storage_controller::StorageController; use crate::{background_process, local_env::LocalEnv}; /// Directory within .neon which will be used by default for LocalFs remote storage. @@ -163,8 +161,8 @@ impl PageServerNode { .expect("non-Unicode path") } - pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> { - self.start_node(config_overrides, false, register).await + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> { + self.start_node(config_overrides, false).await } fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { @@ -202,6 +200,28 @@ impl PageServerNode { String::from_utf8_lossy(&init_output.stderr), ); + // Write metadata file, used by pageserver on startup to register itself with + // the storage controller + let metadata_path = datadir.join("metadata.json"); + + let (_http_host, http_port) = + parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr"); + let http_port = http_port.unwrap_or(9898); + // Intentionally hand-craft JSON: this acts as an implicit format compat test + // in case the pageserver-side structure is edited, and reflects the real life + // situation: the metadata is written by some other script. + std::fs::write( + metadata_path, + serde_json::to_vec(&serde_json::json!({ + "host": "localhost", + "port": self.pg_connection_config.port(), + "http_host": "localhost", + "http_port": http_port, + })) + .unwrap(), + ) + .expect("Failed to write metadata file"); + Ok(()) } @@ -209,27 +229,7 @@ impl PageServerNode { &self, config_overrides: &[&str], update_config: bool, - register: bool, ) -> anyhow::Result<()> { - // Register the node with the storage controller before starting pageserver: pageserver must be registered to - // successfully call /re-attach and finish starting up. - if register { - let storage_controller = StorageController::from_env(&self.env); - let (pg_host, pg_port) = - parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); - let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr) - .expect("Unable to parse listen_http_addr"); - storage_controller - .node_register(NodeRegisterRequest { - node_id: self.conf.id, - listen_pg_addr: pg_host.to_string(), - listen_pg_port: pg_port.unwrap_or(5432), - listen_http_addr: http_host.to_string(), - listen_http_port: http_port.unwrap_or(80), - }) - .await?; - } - // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( diff --git a/libs/pageserver_api/src/upcall_api.rs b/libs/pageserver_api/src/upcall_api.rs index 0acc3a7bb0ae..5472948091cb 100644 --- a/libs/pageserver_api/src/upcall_api.rs +++ b/libs/pageserver_api/src/upcall_api.rs @@ -6,11 +6,18 @@ use serde::{Deserialize, Serialize}; use utils::id::NodeId; -use crate::shard::TenantShardId; +use crate::{controller_api::NodeRegisterRequest, shard::TenantShardId}; +/// Upcall message sent by the pageserver to the configured `control_plane_api` on +/// startup. #[derive(Serialize, Deserialize)] pub struct ReAttachRequest { pub node_id: NodeId, + + /// Optional inline self-registration: this is useful with the storage controller, + /// if the node already has a node_id set. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub register: Option, } #[derive(Serialize, Deserialize)] diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 4adcedafd1c9..845b20c8db34 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -7,8 +7,9 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use pageserver_api::shard::TenantShardId; use remote_storage::{RemotePath, RemoteStorageConfig}; +use serde; use serde::de::IntoDeserializer; -use std::env; +use std::{collections::HashMap, env}; use storage_broker::Uri; use utils::crashsafe::path_with_suffix_extension; use utils::id::ConnectionId; @@ -304,6 +305,26 @@ impl BuilderValue { } } +// Certain metadata (e.g. externally-addressable name, AZ) is delivered +// as a separate structure. This information is not neeed by the pageserver +// itself, it is only used for registering the pageserver with the control +// plane and/or storage controller. +// +#[derive(serde::Deserialize)] +pub(crate) struct NodeMetadata { + #[serde(rename = "host")] + pub(crate) postgres_host: String, + #[serde(rename = "port")] + pub(crate) postgres_port: u16, + pub(crate) http_host: String, + pub(crate) http_port: u16, + + // Deployment tools may write fields to the metadata file beyond what we + // use in this type: this type intentionally only names fields that require. + #[serde(flatten)] + pub(crate) other: HashMap, +} + // needed to simplify config construction struct PageServerConfigBuilder { listen_pg_addr: BuilderValue, @@ -761,6 +782,10 @@ impl PageServerConf { self.workdir.join("deletion") } + pub fn metadata_path(&self) -> Utf8PathBuf { + self.workdir.join("metadata.json") + } + pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf { // Encode a version in the filename, so that if we ever switch away from JSON we can // increment this. diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 3fcf3a983b36..1b3d76335dbc 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use futures::Future; use pageserver_api::{ + controller_api::NodeRegisterRequest, shard::TenantShardId, upcall_api::{ ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse, @@ -12,7 +13,10 @@ use tokio_util::sync::CancellationToken; use url::Url; use utils::{backoff, generation::Generation, id::NodeId}; -use crate::config::PageServerConf; +use crate::{ + config::{NodeMetadata, PageServerConf}, + virtual_file::on_fatal_io_error, +}; /// The Pageserver's client for using the control plane API: this is a small subset /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md) @@ -32,6 +36,7 @@ pub enum RetryForeverError { pub trait ControlPlaneGenerationsApi { fn re_attach( &self, + conf: &PageServerConf, ) -> impl Future, RetryForeverError>> + Send; fn validate( &self, @@ -110,13 +115,59 @@ impl ControlPlaneClient { impl ControlPlaneGenerationsApi for ControlPlaneClient { /// Block until we get a successful response, or error out if we are shut down - async fn re_attach(&self) -> Result, RetryForeverError> { + async fn re_attach( + &self, + conf: &PageServerConf, + ) -> Result, RetryForeverError> { let re_attach_path = self .base_url .join("re-attach") .expect("Failed to build re-attach path"); + + // Include registration content in the re-attach request if a metadata file is readable + let metadata_path = conf.metadata_path(); + let register = match tokio::fs::read_to_string(&metadata_path).await { + Ok(metadata_str) => match serde_json::from_str::(&metadata_str) { + Ok(m) => { + // Since we run one time at startup, be generous in our logging and + // dump all metadata. + tracing::info!( + "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}", + m.postgres_host, + m.postgres_port, + m.http_host, + m.http_port, + m.other + ); + + Some(NodeRegisterRequest { + node_id: conf.id, + listen_pg_addr: m.postgres_host, + listen_pg_port: m.postgres_port, + listen_http_addr: m.http_host, + listen_http_port: m.http_port, + }) + } + Err(e) => { + tracing::error!("Unreadable metadata in {metadata_path}: {e}"); + None + } + }, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + // This is legal: we may have been deployed with some external script + // doing registration for us. + tracing::info!("Metadata file not found at {metadata_path}"); + } else { + on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}")) + } + None + } + }; + let request = ReAttachRequest { node_id: self.node_id, + register, }; fail::fail_point!("control-plane-client-re-attach"); diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 313eb2663d7c..b6aea8fae8db 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -831,7 +831,10 @@ mod test { } impl ControlPlaneGenerationsApi for MockControlPlane { - async fn re_attach(&self) -> Result, RetryForeverError> { + async fn re_attach( + &self, + _conf: &PageServerConf, + ) -> Result, RetryForeverError> { unimplemented!() } async fn validate( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index fc08b3c82e27..38274448b350 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -295,7 +295,7 @@ async fn init_load_generations( } else if let Some(client) = ControlPlaneClient::new(conf, cancel) { info!("Calling control plane API to re-attach tenants"); // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. - match client.re_attach().await { + match client.re_attach(conf).await { Ok(tenants) => tenants, Err(RetryForeverError::ShuttingDown) => { anyhow::bail!("Shut down while waiting for control plane re-attach response") diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b7196a25561a..975c6d865b0c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -519,9 +519,9 @@ def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEn self.env = NeonEnv(self) return self.env - def start(self): + def start(self, register_pageservers=False): assert self.env is not None, "environment is not already initialized, call init() first" - self.env.start() + self.env.start(register_pageservers=register_pageservers) def init_start( self, @@ -1112,7 +1112,7 @@ def __init__(self, config: NeonEnvBuilder): log.info(f"Config: {cfg}") self.neon_cli.init(cfg, force=config.config_init_force) - def start(self): + def start(self, register_pageservers=False): # storage controller starts first, so that pageserver /re-attach calls don't # bounce through retries on startup self.storage_controller.start() @@ -1124,6 +1124,11 @@ def storage_controller_ready(): # reconcile. wait_until(30, 1, storage_controller_ready) + if register_pageservers: + # Special case for forward compat tests, this can be removed later. + for pageserver in self.pageservers: + self.storage_controller.node_register(pageserver) + # Start up broker, pageserver and all safekeepers futs = [] with concurrent.futures.ThreadPoolExecutor( @@ -1712,10 +1717,8 @@ def pageserver_start( id: int, overrides: Tuple[str, ...] = (), extra_env_vars: Optional[Dict[str, str]] = None, - register: bool = True, ) -> "subprocess.CompletedProcess[str]": - register_str = "true" if register else "false" - start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"] + start_args = ["pageserver", "start", f"--id={id}", *overrides] storage = self.env.pageserver_remote_storage append_pageserver_param_overrides( params_to_update=start_args, @@ -2066,6 +2069,8 @@ def node_register(self, node: NeonPageserver): "node_id": int(node.id), "listen_http_addr": "localhost", "listen_http_port": node.service_port.http, + "listen_pg_addr": "localhost", + "listen_pg_port": node.service_port.pg, } log.info(f"node_register({body})") self.request( @@ -2233,7 +2238,6 @@ def start( self, overrides: Tuple[str, ...] = (), extra_env_vars: Optional[Dict[str, str]] = None, - register: bool = True, ) -> "NeonPageserver": """ Start the page server. @@ -2243,7 +2247,7 @@ def start( assert self.running is False self.env.neon_cli.pageserver_start( - self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register + self.id, overrides=overrides, extra_env_vars=extra_env_vars ) self.running = True return self diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 618ac637855e..5f815d3e6c52 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -242,7 +242,7 @@ def test_forward_compatibility( # everything else: our test code is written for latest CLI args. env.neon_local_binpath = neon_local_binpath - neon_env_builder.start() + neon_env_builder.start(register_pageservers=True) check_neon_works( env, diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index d1acb9817e5d..3ca13a904d9d 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -205,6 +205,9 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): sk.start() env.storage_controller.start() + # We will start a pageserver with no control_plane_api set, so it won't be able to self-register + env.storage_controller.node_register(env.pageserver) + env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',)) env.neon_cli.create_tenant( @@ -511,7 +514,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP env.pageserver.start( overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), - register=False, ) # The pageserver should provide service to clients diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 6b7cd9d8290d..7a0707b564ae 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -278,13 +278,12 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: env.pageservers[0].allowed_errors.append(".*Emergency mode!.*") env.pageservers[0].start( overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), - register=False, ) origin_ps = env.pageservers[0] # This is the pageserver managed by the sharding service, where the tenant # will be attached after onboarding - env.pageservers[1].start(register=True) + env.pageservers[1].start() dest_ps = env.pageservers[1] virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)