diff --git a/Cargo.lock b/Cargo.lock index 6ae5aac12797..865fb3388960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -666,34 +666,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core 0.3.4", - "bitflags 1.3.2", - "bytes", - "futures-util", - "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.30", - "itoa", - "matchit 0.7.0", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 0.1.2", - "tower", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.7.5" @@ -701,7 +673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", - "axum-core 0.4.5", + "axum-core", "base64 0.21.1", "bytes", "futures-util", @@ -731,23 +703,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 0.2.9", - "http-body 0.4.5", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.4.5" @@ -971,7 +926,7 @@ dependencies = [ "clang-sys", "itertools 0.12.1", "log", - "prettyplease 0.2.17", + "prettyplease", "proc-macro2", "quote", "regex", @@ -2454,15 +2409,6 @@ dependencies = [ "digest", ] -[[package]] -name = "home" -version = "0.5.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "hostname" version = "0.4.0" @@ -2657,14 +2603,15 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "pin-project-lite", "tokio", - "tokio-io-timeout", + "tower-service", ] [[package]] @@ -3470,7 +3417,7 @@ dependencies = [ "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", - "prost 0.13.3", + "prost", "reqwest 0.12.4", "thiserror", ] @@ -3483,8 +3430,8 @@ checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" dependencies = [ "opentelemetry", "opentelemetry_sdk", - "prost 0.13.3", - "tonic 0.12.3", + "prost", + "tonic", ] [[package]] @@ -4178,16 +4125,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "prettyplease" -version = "0.1.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" -dependencies = [ - "proc-macro2", - "syn 1.0.109", -] - [[package]] name = "prettyplease" version = "0.2.17" @@ -4258,16 +4195,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "prost" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" -dependencies = [ - "bytes", - "prost-derive 0.11.9", -] - [[package]] name = "prost" version = "0.13.3" @@ -4275,42 +4202,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" dependencies = [ "bytes", - "prost-derive 0.13.3", + "prost-derive", ] [[package]] name = "prost-build" -version = "0.11.9" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.4.1", - "itertools 0.10.5", - "lazy_static", + "heck 0.5.0", + "itertools 0.12.1", "log", "multimap", + "once_cell", "petgraph", - "prettyplease 0.1.25", - "prost 0.11.9", + "prettyplease", + "prost", "prost-types", "regex", - "syn 1.0.109", + "syn 2.0.52", "tempfile", - "which", -] - -[[package]] -name = "prost-derive" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" -dependencies = [ - "anyhow", - "itertools 0.10.5", - "proc-macro2", - "quote", - "syn 1.0.109", ] [[package]] @@ -4328,11 +4241,11 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.9" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" dependencies = [ - "prost 0.11.9", + "prost", ] [[package]] @@ -5094,6 +5007,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebbbdb961df0ad3f2652da8f3fdc4b36122f568f968f45ad3316f26c025c677b" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.2", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.2" @@ -5119,6 +5047,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.1", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.2" @@ -5194,6 +5135,7 @@ dependencies = [ "fail", "futures", "hex", + "http 1.1.0", "humantime", "hyper 0.14.30", "metrics", @@ -5750,19 +5692,22 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", + "bytes", "clap", "const_format", "futures", "futures-core", "futures-util", + "http-body-util", "humantime", - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "metrics", "once_cell", "parking_lot 0.12.1", - "prost 0.11.9", + "prost", "tokio", - "tonic 0.9.2", + "tonic", "tonic-build", "tracing", "utils", @@ -6306,6 +6251,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.7", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -6395,54 +6351,34 @@ dependencies = [ "winnow", ] -[[package]] -name = "tonic" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" -dependencies = [ - "async-stream", - "async-trait", - "axum 0.6.20", - "base64 0.21.1", - "bytes", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.30", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost 0.11.9", - "rustls-native-certs 0.6.2", - "rustls-pemfile 1.0.2", - "tokio", - "tokio-rustls 0.24.0", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tonic" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ + "async-stream", "async-trait", + "axum", "base64 0.22.1", "bytes", + "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", "http-body-util", + "hyper 1.4.1", + "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", - "prost 0.13.3", + "prost", + "rustls-native-certs 0.8.0", + "rustls-pemfile 2.1.1", + "socket2", + "tokio", + "tokio-rustls 0.26.0", "tokio-stream", + "tower", "tower-layer", "tower-service", "tracing", @@ -6450,15 +6386,16 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.9.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" dependencies = [ - "prettyplease 0.1.25", + "prettyplease", "proc-macro2", "prost-build", + "prost-types", "quote", - "syn 1.0.109", + "syn 2.0.52", ] [[package]] @@ -6864,7 +6801,7 @@ name = "vm_monitor" version = "0.1.0" dependencies = [ "anyhow", - "axum 0.7.5", + "axum", "cgroups-rs", "clap", "futures", @@ -7095,18 +7032,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "which" -version = "4.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" -dependencies = [ - "either", - "home", - "once_cell", - "rustix", -] - [[package]] name = "whoami" version = "1.5.1" @@ -7335,9 +7260,10 @@ version = "0.1.0" dependencies = [ "ahash", "anyhow", + "axum", + "axum-core", "base64 0.21.1", "base64ct", - "bitflags 2.4.1", "bytes", "camino", "cc", @@ -7365,7 +7291,6 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "indexmap 1.9.3", - "itertools 0.10.5", "itertools 0.12.1", "lazy_static", "libc", @@ -7377,15 +7302,15 @@ dependencies = [ "num-traits", "once_cell", "parquet", + "prettyplease", "proc-macro2", - "prost 0.11.9", + "prost", "quote", "rand 0.8.5", "regex", "regex-automata 0.4.3", "regex-syntax 0.8.2", "reqwest 0.12.4", - "rustls 0.21.11", "scopeguard", "serde", "serde_json", @@ -7401,9 +7326,10 @@ dependencies = [ "time", "time-macros", "tokio", - "tokio-rustls 0.24.0", + "tokio-stream", "tokio-util", "toml_edit", + "tonic", "tower", "tracing", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index ed7dda235a0f..dde80f502087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,7 @@ pbkdf2 = { version = "0.12.1", features = ["simple", "std"] } pin-project-lite = "0.2" procfs = "0.16" prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency -prost = "0.11" +prost = "0.13" rand = "0.8" redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] } regex = "1.10.2" @@ -178,7 +178,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = {version = "0.9", features = ["tls", "tls-roots"]} +tonic = {version = "0.12.3", features = ["tls", "tls-roots"]} tower-service = "0.3.2" tracing = "0.1" tracing-error = "0.2" @@ -246,7 +246,7 @@ criterion = "0.5.1" rcgen = "0.12" rstest = "0.18" camino-tempfile = "1.0.2" -tonic-build = "0.9" +tonic-build = "0.12" [patch.crates-io] diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 109d315d67a4..284db005c86c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -402,8 +402,7 @@ fn start_postgres( ) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); - state.status = ComputeStatus::Init; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::Init, &compute.state_changed); info!( "running compute with features: {:?}", diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ba7b4f37df0f..3e558a7d3ca0 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -109,6 +109,18 @@ impl ComputeState { metrics: ComputeMetrics::default(), } } + + pub fn set_status(&mut self, status: ComputeStatus, state_changed: &Condvar) { + let prev = self.status; + info!("Changing compute status from {} to {}", prev, status); + self.status = status; + state_changed.notify_all(); + } + + pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) { + self.error = Some(format!("{err:?}")); + self.set_status(ComputeStatus::Failed, state_changed); + } } impl Default for ComputeState { @@ -303,15 +315,12 @@ impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); - state.status = status; - self.state_changed.notify_all(); + state.set_status(status, &self.state_changed); } pub fn set_failed_status(&self, err: anyhow::Error) { let mut state = self.state.lock().unwrap(); - state.error = Some(format!("{err:?}")); - state.status = ComputeStatus::Failed; - self.state_changed.notify_all(); + state.set_failed_status(err, &self.state_changed); } pub fn get_status(&self) -> ComputeStatus { diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 7bd0e4938df8..a2043529a192 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -24,8 +24,7 @@ fn configurator_main_loop(compute: &Arc) { // Re-check the status after waking up if state.status == ComputeStatus::ConfigurationPending { info!("got configuration request"); - state.status = ComputeStatus::Configuration; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::Configuration, &compute.state_changed); drop(state); let mut new_status = ComputeStatus::Failed; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 43d29402bcfd..fade3bbe6d85 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -288,8 +288,7 @@ async fn handle_configure_request( return Err((msg, StatusCode::PRECONDITION_FAILED)); } state.pspec = Some(parsed_spec); - state.status = ComputeStatus::ConfigurationPending; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); drop(state); info!("set new spec and notified waiters"); } @@ -362,15 +361,15 @@ async fn handle_terminate_request(compute: &Arc) -> Result<(), (Str } if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { let msg = format!( - "invalid compute status for termination request: {:?}", - state.status.clone() + "invalid compute status for termination request: {}", + state.status ); return Err((msg, StatusCode::PRECONDITION_FAILED)); } - state.status = ComputeStatus::TerminationPending; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::TerminationPending, &compute.state_changed); drop(state); } + forward_termination_signal(); info!("sent signal and notified waiters"); @@ -384,7 +383,8 @@ async fn handle_terminate_request(compute: &Arc) -> Result<(), (Str while state.status != ComputeStatus::Terminated { state = c.state_changed.wait(state).unwrap(); info!( - "waiting for compute to become Terminated, current status: {:?}", + "waiting for compute to become {}, current status: {:?}", + ComputeStatus::Terminated, state.status ); } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index aa9405d28d5e..73f3d1006a11 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::fs::File; use std::path::Path; use std::str::FromStr; @@ -189,6 +190,15 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { let mut xact = client.transaction()?; let existing_roles: Vec = get_existing_roles(&mut xact)?; + let mut jwks_roles = HashSet::new(); + if let Some(local_proxy) = &spec.local_proxy_config { + for jwks_setting in local_proxy.jwks.iter().flatten() { + for role_name in &jwks_setting.role_names { + jwks_roles.insert(role_name.clone()); + } + } + } + // Print a list of existing Postgres roles (only in debug mode) if span_enabled!(Level::INFO) { let mut vec = Vec::new(); @@ -308,6 +318,9 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser", name.pg_quote() ); + if jwks_roles.contains(name.as_str()) { + query = format!("CREATE ROLE {}", name.pg_quote()); + } info!("running role create query: '{}'", &query); query.push_str(&role.to_pg_options()); xact.execute(query.as_str(), &[])?; diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index d05d625b0ad1..3f055b914a39 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -1,5 +1,7 @@ //! Structs representing the JSON formats used in the compute_ctl's HTTP API. +use std::fmt::Display; + use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize, Serializer}; @@ -58,6 +60,21 @@ pub enum ComputeStatus { Terminated, } +impl Display for ComputeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ComputeStatus::Empty => f.write_str("empty"), + ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"), + ComputeStatus::Init => f.write_str("init"), + ComputeStatus::Running => f.write_str("running"), + ComputeStatus::Configuration => f.write_str("configuration"), + ComputeStatus::Failed => f.write_str("failed"), + ComputeStatus::TerminationPending => f.write_str("termination-pending"), + ComputeStatus::Terminated => f.write_str("terminated"), + } + } +} + fn rfc3339_serialize(x: &Option>, s: S) -> Result where S: Serializer, diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index e4754ec7ea82..6b4b355672a8 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -7,6 +7,7 @@ #define NEON_PGVERSIONCOMPAT_H #include "fmgr.h" +#include "storage/buf_internals.h" #if PG_MAJORVERSION_NUM < 17 #define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId) @@ -20,11 +21,24 @@ NInfoGetRelNumber(a) == NInfoGetRelNumber(b) \ ) -/* buftag population & RelFileNode/RelFileLocator rework */ +/* These macros were turned into static inline functions in v16 */ #if PG_MAJORVERSION_NUM < 16 +static inline bool +BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2) +{ + return BUFFERTAGS_EQUAL(*tag1, *tag2); +} + +static inline void +InitBufferTag(BufferTag *tag, const RelFileNode *rnode, + ForkNumber forkNum, BlockNumber blockNum) +{ + INIT_BUFFERTAG(*tag, *rnode, forkNum, blockNum); +} +#endif -#define InitBufferTag(tag, rfn, fn, bn) INIT_BUFFERTAG(*tag, *rfn, fn, bn) - +/* RelFileNode -> RelFileLocator rework */ +#if PG_MAJORVERSION_NUM < 16 #define USE_RELFILENODE #define RELFILEINFO_HDR "storage/relfilenode.h" @@ -73,8 +87,6 @@ #define USE_RELFILELOCATOR -#define BUFFERTAGS_EQUAL(a, b) BufferTagsEqual(&(a), &(b)) - #define RELFILEINFO_HDR "storage/relfilelocator.h" #define NRelFileInfo RelFileLocator diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 49ac16158e9b..672f70c1446d 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -215,7 +215,7 @@ typedef struct PrfHashEntry sizeof(BufferTag) \ ) -#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag)) +#define SH_EQUAL(tb, a, b) (BufferTagsEqual(&(a)->buftag, &(b)->buftag)) #define SH_SCOPE static inline #define SH_DEFINE #define SH_DECLARE @@ -853,7 +853,7 @@ prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns, Assert(slot->status != PRFS_UNUSED); Assert(MyPState->ring_last <= ring_index && ring_index < MyPState->ring_unused); - Assert(BUFFERTAGS_EQUAL(slot->buftag, hashkey.buftag)); + Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag)); /* * If the caller specified a request LSN to use, only accept diff --git a/pgxn/neon_walredo/walredoproc.c b/pgxn/neon_walredo/walredoproc.c index f98aa1cbe7ec..6a656c8d3849 100644 --- a/pgxn/neon_walredo/walredoproc.c +++ b/pgxn/neon_walredo/walredoproc.c @@ -992,7 +992,7 @@ redo_block_filter(XLogReaderState *record, uint8 block_id) * If this block isn't one we are currently restoring, then return 'true' * so that this gets ignored */ - return !BUFFERTAGS_EQUAL(target_tag, target_redo_tag); + return !BufferTagsEqual(&target_tag, &target_redo_tag); } /* diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 78a3129abacb..ec08d022400a 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -23,6 +23,7 @@ crc32c.workspace = true fail.workspace = true hex.workspace = true humantime.workspace = true +http.workspace = true hyper0.workspace = true futures.workspace = true once_cell.workspace = true diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index a05c2d4559c2..047b4be8fadf 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -13,7 +13,7 @@ use desim::{ node_os::NodeOs, proto::{AnyMessage, NetEvent, NodeEvent}, }; -use hyper0::Uri; +use http::Uri; use safekeeper::{ safekeeper::{ProposerAcceptorMessage, SafeKeeper, ServerInfo, UNKNOWN_SERVER_VERSION}, state::{TimelinePersistentState, TimelineState}, diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index 849707fbc425..2d19472c362b 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -10,13 +10,16 @@ bench = [] [dependencies] anyhow.workspace = true async-stream.workspace = true +bytes.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true futures.workspace = true futures-core.workspace = true futures-util.workspace = true humantime.workspace = true -hyper0 = { workspace = true, features = ["full"] } +hyper = { workspace = true, features = ["full"] } +http-body-util.workspace = true +hyper-util = "0.1" once_cell.workspace = true parking_lot.workspace = true prost.workspace = true diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index 9c56e9fab546..1fbb651656c8 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -10,16 +10,15 @@ //! //! Only safekeeper message is supported, but it is not hard to add something //! else with generics. - -extern crate hyper0 as hyper; - use clap::{command, Parser}; use futures_core::Stream; use futures_util::StreamExt; +use http_body_util::Full; +use hyper::body::Incoming; use hyper::header::CONTENT_TYPE; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, StatusCode}; +use hyper::service::service_fn; +use hyper::{Method, StatusCode}; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use parking_lot::RwLock; use std::collections::HashMap; use std::convert::Infallible; @@ -27,9 +26,11 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::time; +use tonic::body::{self, empty_body, BoxBody}; use tonic::codegen::Service; use tonic::transport::server::Connected; use tonic::Code; @@ -48,9 +49,7 @@ use storage_broker::proto::{ FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage, }; -use storage_broker::{ - parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, -}; +use storage_broker::{parse_proto_ttid, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR}; use utils::id::TenantTimelineId; use utils::logging::{self, LogFormat}; use utils::sentry_init::init_sentry; @@ -602,8 +601,8 @@ impl BrokerService for Broker { // We serve only metrics and healthcheck through http1. async fn http1_handler( - req: hyper::Request, -) -> Result, Infallible> { + req: hyper::Request, +) -> Result, Infallible> { let resp = match (req.method(), req.uri().path()) { (&Method::GET, "/metrics") => { let mut buffer = vec![]; @@ -614,16 +613,16 @@ async fn http1_handler( hyper::Response::builder() .status(StatusCode::OK) .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) + .body(body::boxed(Full::new(bytes::Bytes::from(buffer)))) .unwrap() } (&Method::GET, "/status") => hyper::Response::builder() .status(StatusCode::OK) - .body(Body::empty()) + .body(empty_body()) .unwrap(), _ => hyper::Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::empty()) + .body(empty_body()) .unwrap(), }; Ok(resp) @@ -665,52 +664,76 @@ async fn main() -> Result<(), Box> { }; let storage_broker_server = BrokerServiceServer::new(storage_broker_impl); - info!("listening on {}", &args.listen_addr); - // grpc is served along with http1 for metrics on a single port, hence we // don't use tonic's Server. - hyper::Server::bind(&args.listen_addr) - .http2_keep_alive_interval(Some(args.http2_keepalive_interval)) - .serve(make_service_fn(move |conn: &AddrStream| { - let storage_broker_server_cloned = storage_broker_server.clone(); - let connect_info = conn.connect_info(); - async move { - Ok::<_, Infallible>(service_fn(move |mut req| { - // That's what tonic's MakeSvc.call does to pass conninfo to - // the request handler (and where its request.remote_addr() - // expects it to find). - req.extensions_mut().insert(connect_info.clone()); - - // Technically this second clone is not needed, but consume - // by async block is apparently unavoidable. BTW, error - // message is enigmatic, see - // https://github.com/rust-lang/rust/issues/68119 - // - // We could get away without async block at all, but then we - // need to resort to futures::Either to merge the result, - // which doesn't caress an eye as well. - let mut storage_broker_server_svc = storage_broker_server_cloned.clone(); - async move { - if req.headers().get("content-type").map(|x| x.as_bytes()) - == Some(b"application/grpc") - { - let res_resp = storage_broker_server_svc.call(req).await; - // Grpc and http1 handlers have slightly different - // Response types: it is UnsyncBoxBody for the - // former one (not sure why) and plain hyper::Body - // for the latter. Both implement HttpBody though, - // and EitherBody is used to merge them. - res_resp.map(|resp| resp.map(EitherBody::Left)) - } else { - let res_resp = http1_handler(req).await; - res_resp.map(|resp| resp.map(EitherBody::Right)) - } + let tcp_listener = TcpListener::bind(&args.listen_addr).await?; + info!("listening on {}", &args.listen_addr); + loop { + let (stream, addr) = match tcp_listener.accept().await { + Ok(v) => v, + Err(e) => { + info!("couldn't accept connection: {e}"); + continue; + } + }; + + let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + builder.http1().timer(TokioTimer::new()); + builder + .http2() + .timer(TokioTimer::new()) + .keep_alive_interval(Some(args.http2_keepalive_interval)) + // This matches the tonic server default. It allows us to support production-like workloads. + .max_concurrent_streams(None); + + let storage_broker_server_cloned = storage_broker_server.clone(); + let connect_info = stream.connect_info(); + let service_fn_ = async move { + service_fn(move |mut req| { + // That's what tonic's MakeSvc.call does to pass conninfo to + // the request handler (and where its request.remote_addr() + // expects it to find). + req.extensions_mut().insert(connect_info.clone()); + + // Technically this second clone is not needed, but consume + // by async block is apparently unavoidable. BTW, error + // message is enigmatic, see + // https://github.com/rust-lang/rust/issues/68119 + // + // We could get away without async block at all, but then we + // need to resort to futures::Either to merge the result, + // which doesn't caress an eye as well. + let mut storage_broker_server_svc = storage_broker_server_cloned.clone(); + async move { + if req.headers().get("content-type").map(|x| x.as_bytes()) + == Some(b"application/grpc") + { + let res_resp = storage_broker_server_svc.call(req).await; + // Grpc and http1 handlers have slightly different + // Response types: it is UnsyncBoxBody for the + // former one (not sure why) and plain hyper::Body + // for the latter. Both implement HttpBody though, + // and `Either` is used to merge them. + res_resp.map(|resp| resp.map(http_body_util::Either::Left)) + } else { + let res_resp = http1_handler(req).await; + res_resp.map(|resp| resp.map(http_body_util::Either::Right)) } - })) + } + }) + } + .await; + + tokio::task::spawn(async move { + let res = builder + .serve_connection(TokioIo::new(stream), service_fn_) + .await; + + if let Err(e) = res { + info!("error serving connection from {addr}: {e}"); } - })) - .await?; - Ok(()) + }); + } } #[cfg(test)] diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 447591f89876..bc632a39f7bc 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -1,8 +1,3 @@ -extern crate hyper0 as hyper; - -use hyper::body::HttpBody; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use tonic::codegen::StdError; use tonic::transport::{ClientTlsConfig, Endpoint}; @@ -96,56 +91,3 @@ pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result; - -// Provides impl HttpBody for two different types implementing it. Inspired by -// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs -pub enum EitherBody { - Left(A), - Right(B), -} - -impl HttpBody for EitherBody -where - A: HttpBody + Send + Unpin, - B: HttpBody + Send + Unpin, - A::Error: Into, - B::Error: Into, -{ - type Data = A::Data; - type Error = Box; - - fn is_end_stream(&self) -> bool { - match self { - EitherBody::Left(b) => b.is_end_stream(), - EitherBody::Right(b) => b.is_end_stream(), - } - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - match self.get_mut() { - EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), - EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.get_mut() { - EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), - EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), - } - } -} - -fn map_option_err>(err: Option>) -> Option> { - err.map(|e| e.map_err(Into::into)) -} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index ddfc14ce1b75..5bed02df4e11 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -17,9 +17,10 @@ license.workspace = true [dependencies] ahash = { version = "0.8" } anyhow = { version = "1", features = ["backtrace"] } +axum = { version = "0.7", features = ["ws"] } +axum-core = { version = "0.4", default-features = false, features = ["tracing"] } base64 = { version = "0.21", features = ["alloc"] } base64ct = { version = "1", default-features = false, features = ["std"] } -bitflags = { version = "2", default-features = false, features = ["std"] } bytes = { version = "1", features = ["serde"] } camino = { version = "1", default-features = false, features = ["serde1"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } @@ -43,11 +44,10 @@ hashbrown = { version = "0.14", features = ["raw"] } hex = { version = "0.4", features = ["serde"] } hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["full"] } -hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["http1", "http2", "server"] } -hyper-util = { version = "0.1", features = ["http1", "http2", "server", "tokio"] } +hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] } +hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] } indexmap = { version = "1", default-features = false, features = ["std"] } -itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" } -itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } +itertools = { version = "0.12" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits", "use_std"] } log = { version = "0.4", default-features = false, features = ["std"] } @@ -58,13 +58,12 @@ num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } -prost = { version = "0.11" } +prost = { version = "0.13", features = ["prost-derive"] } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive"] } serde_json = { version = "1", features = ["raw_value"] } @@ -77,10 +76,11 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures tikv-jemalloc-sys = { version = "0.5" } time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } -tokio-rustls = { version = "0.24" } +tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } -tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] } +tonic = { version = "0.12", features = ["tls-roots"] } +tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } url = { version = "2", features = ["serde"] } @@ -92,7 +92,6 @@ zstd-sys = { version = "2", default-features = false, features = ["legacy", "std [build-dependencies] ahash = { version = "0.8" } anyhow = { version = "1", features = ["backtrace"] } -bitflags = { version = "2", default-features = false, features = ["std"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } @@ -101,9 +100,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] } half = { version = "2", default-features = false, features = ["num-traits"] } hashbrown = { version = "0.14", features = ["raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } -itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" } -itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } -lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } +itertools = { version = "0.12" } libc = { version = "0.2", features = ["extra_traits", "use_std"] } log = { version = "0.4", default-features = false, features = ["std"] } memchr = { version = "2" } @@ -113,8 +110,9 @@ num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } +prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] } proc-macro2 = { version = "1" } -prost = { version = "0.11" } +prost = { version = "0.13", features = ["prost-derive"] } quote = { version = "1" } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }