From f090e77a4fc2182e3bce1591b62359a52e7040ae Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sat, 21 Dec 2024 17:39:20 -0800 Subject: [PATCH] Initial implementations of the TPU Vortexor (#3258) * Initial implementations of the TPU Vortexor --- Cargo.lock | 42 +++++ Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 48 +---- streamer/src/nonblocking/testing_utilities.rs | 96 +++++++--- streamer/src/quic.rs | 10 +- vortexor/Cargo.toml | 61 ++++++ vortexor/Readme.md | 144 ++++++++++++++ vortexor/src/cli.rs | 177 ++++++++++++++++++ vortexor/src/lib.rs | 2 + vortexor/src/main.rs | 82 ++++++++ vortexor/src/vortexor.rs | 171 +++++++++++++++++ vortexor/tests/vortexor.rs | 76 ++++++++ 12 files changed, 837 insertions(+), 73 deletions(-) create mode 100644 vortexor/Cargo.toml create mode 100644 vortexor/Readme.md create mode 100644 vortexor/src/cli.rs create mode 100644 vortexor/src/lib.rs create mode 100644 vortexor/src/main.rs create mode 100644 vortexor/src/vortexor.rs create mode 100644 vortexor/tests/vortexor.rs diff --git a/Cargo.lock b/Cargo.lock index 9a856b3f1a31bb..9185a0fcf82398 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9953,6 +9953,48 @@ dependencies = [ "solana-serde-varint", ] +[[package]] +name = "solana-vortexor" +version = "2.2.0" +dependencies = [ + "assert_matches", + "async-channel", + "bytes", + "clap 2.33.3", + "crossbeam-channel", + "dashmap", + "futures 0.3.31", + "futures-util", + "governor", + "histogram", + "indexmap 2.7.0", + "itertools 0.12.1", + "libc", + "log", + "nix", + "pem", + "percentage", + "quinn", + "quinn-proto", + "rand 0.8.5", + "rustls 0.23.20", + "smallvec", + "socket2 0.5.8", + "solana-clap-utils", + "solana-logger", + "solana-measure", + "solana-metrics", + "solana-net-utils", + "solana-perf", + "solana-sdk", + "solana-streamer", + "solana-transaction-metrics-tracker", + "solana-version", + "thiserror 2.0.6", + "tokio", + "x509-parser", +] + [[package]] name = "solana-vote" 
version = "2.2.0" diff --git a/Cargo.toml b/Cargo.toml index 038041726e73c4..161a2843707ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -225,6 +225,7 @@ members = [ "upload-perf", "validator", "version", + "vortexor", "vote", "watchtower", "wen-restart", diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 002a00bb12674a..d873dd77925ed4 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1525,8 +1525,8 @@ pub mod test { nonblocking::{ quic::compute_max_allowed_uni_streams, testing_utilities::{ - get_client_config, make_client_endpoint, setup_quic_server, - SpawnTestServerResult, TestServerConfig, + check_multiple_streams, get_client_config, make_client_endpoint, + setup_quic_server, SpawnTestServerResult, TestServerConfig, }, }, quic::DEFAULT_TPU_COALESCE, @@ -1589,48 +1589,6 @@ pub mod test { } } - pub async fn check_multiple_streams( - receiver: Receiver, - server_address: SocketAddr, - ) { - let conn1 = Arc::new(make_client_endpoint(&server_address, None).await); - let conn2 = Arc::new(make_client_endpoint(&server_address, None).await); - let mut num_expected_packets = 0; - for i in 0..10 { - info!("sending: {}", i); - let c1 = conn1.clone(); - let c2 = conn2.clone(); - let mut s1 = c1.open_uni().await.unwrap(); - let mut s2 = c2.open_uni().await.unwrap(); - s1.write_all(&[0u8]).await.unwrap(); - s1.finish().unwrap(); - s2.write_all(&[0u8]).await.unwrap(); - s2.finish().unwrap(); - num_expected_packets += 2; - sleep(Duration::from_millis(200)).await; - } - let mut all_packets = vec![]; - let now = Instant::now(); - let mut total_packets = 0; - while now.elapsed().as_secs() < 10 { - if let Ok(packets) = receiver.try_recv() { - total_packets += packets.len(); - all_packets.push(packets) - } else { - sleep(Duration::from_secs(1)).await; - } - if total_packets == num_expected_packets { - break; - } - } - for batch in all_packets { - for p in batch.iter() { - assert_eq!(p.meta().size, 1); - } - } 
- assert_eq!(total_packets, num_expected_packets); - } - pub async fn check_multiple_writes( receiver: Receiver, server_address: SocketAddr, @@ -2049,7 +2007,7 @@ pub mod test { ) .unwrap(); - check_multiple_streams(receiver, server_address).await; + check_multiple_streams(receiver, server_address, None).await; assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0); assert_eq!(stats.total_new_streams.load(Ordering::Relaxed), 20); assert_eq!(stats.total_connections.load(Ordering::Relaxed), 2); diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index fc0c3a801784e0..78adfd3171b52c 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -12,7 +12,7 @@ use { }, streamer::StakedNodes, }, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, quinn::{ crypto::rustls::QuicClientConfig, ClientConfig, Connection, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, @@ -25,8 +25,9 @@ use { std::{ net::{SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc, RwLock}, + time::{Duration, Instant}, }, - tokio::task::JoinHandle, + tokio::{task::JoinHandle, time::sleep}, }; pub fn get_client_config(keypair: &Keypair) -> ClientConfig { @@ -79,33 +80,35 @@ pub struct SpawnTestServerResult { pub stats: Arc, } +pub fn create_quic_server_sockets() -> Vec { + #[cfg(not(target_os = "windows"))] + { + use { + solana_net_utils::bind_to, + std::net::{IpAddr, Ipv4Addr}, + }; + (0..10) + .map(|_| { + bind_to( + IpAddr::V4(Ipv4Addr::LOCALHOST), + /*port*/ 0, + /*reuseport:*/ true, + ) + .unwrap() + }) + .collect::>() + } + #[cfg(target_os = "windows")] + { + vec![bind_to_localhost().unwrap()] + } +} + pub fn setup_quic_server( option_staked_nodes: Option, config: TestServerConfig, ) -> SpawnTestServerResult { - let sockets = { - #[cfg(not(target_os = "windows"))] - { - use { - solana_net_utils::bind_to, - std::net::{IpAddr, Ipv4Addr}, - }; - (0..10) 
- .map(|_| { - bind_to( - IpAddr::V4(Ipv4Addr::LOCALHOST), - /*port*/ 0, - /*reuseport:*/ true, - ) - .unwrap() - }) - .collect::>() - } - #[cfg(target_os = "windows")] - { - vec![bind_to_localhost().unwrap()] - } - }; + let sockets = create_quic_server_sockets(); setup_quic_server_with_sockets(sockets, option_staked_nodes, config) } @@ -180,3 +183,46 @@ pub async fn make_client_endpoint( .await .expect("Test server should be already listening on 'localhost'") } + +pub async fn check_multiple_streams( + receiver: Receiver, + server_address: SocketAddr, + client_keypair: Option<&Keypair>, +) { + let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair).await); + let conn2 = Arc::new(make_client_endpoint(&server_address, client_keypair).await); + let mut num_expected_packets = 0; + for i in 0..10 { + info!("sending: {}", i); + let c1 = conn1.clone(); + let c2 = conn2.clone(); + let mut s1 = c1.open_uni().await.unwrap(); + let mut s2 = c2.open_uni().await.unwrap(); + s1.write_all(&[0u8]).await.unwrap(); + s1.finish().unwrap(); + s2.write_all(&[0u8]).await.unwrap(); + s2.finish().unwrap(); + num_expected_packets += 2; + sleep(Duration::from_millis(200)).await; + } + let mut all_packets = vec![]; + let now = Instant::now(); + let mut total_packets = 0; + while now.elapsed().as_secs() < 10 { + if let Ok(packets) = receiver.try_recv() { + total_packets += packets.len(); + all_packets.push(packets) + } else { + sleep(Duration::from_secs(1)).await; + } + if total_packets == num_expected_packets { + break; + } + } + for batch in all_packets { + for p in batch.iter() { + assert_eq!(p.meta().size, 1); + } + } + assert_eq!(total_packets, num_expected_packets); +} diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 85760f096f1932..4a290d17c43714 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -564,6 +564,7 @@ pub fn spawn_server( ) } +#[derive(Clone)] pub struct QuicServerParams { pub max_connections_per_peer: usize, pub 
max_staked_connections: usize, @@ -632,8 +633,11 @@ pub fn spawn_server_multi( #[cfg(test)] mod test { use { - super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded, - solana_net_utils::bind_to_localhost, std::net::SocketAddr, + super::*, + crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams}, + crossbeam_channel::unbounded, + solana_net_utils::bind_to_localhost, + std::net::SocketAddr, }; fn setup_quic_server() -> ( @@ -723,7 +727,7 @@ mod test { .unwrap(); let runtime = rt("solQuicTestRt".to_string()); - runtime.block_on(check_multiple_streams(receiver, server_address)); + runtime.block_on(check_multiple_streams(receiver, server_address, None)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); } diff --git a/vortexor/Cargo.toml b/vortexor/Cargo.toml new file mode 100644 index 00000000000000..f87a457269ce25 --- /dev/null +++ b/vortexor/Cargo.toml @@ -0,0 +1,61 @@ +[package] +name = "solana-vortexor" +description = "Solana TPU Vortexor" +documentation = "https://docs.rs/solana-vortexor" +default-run = "solana-vortexor" +publish = false +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +async-channel = { workspace = true } +bytes = { workspace = true } +clap = { workspace = true } +crossbeam-channel = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +futures-util = { workspace = true } +governor = { workspace = true } +histogram = { workspace = true } +indexmap = { workspace = true } +itertools = { workspace = true } +libc = { workspace = true } +log = { workspace = true } +nix = { workspace = true, features = ["net"] } +pem = { workspace = true } +percentage = { workspace = true } +quinn = { workspace = true } +quinn-proto = { workspace = true } +rand = { workspace = true } +rustls = { workspace = true } +smallvec = 
{ workspace = true } +socket2 = { workspace = true } +solana-clap-utils = { workspace = true } +solana-measure = { workspace = true } +solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } +solana-streamer = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } +solana-version = { workspace = true } + +thiserror = { workspace = true } +tokio = { workspace = true, features = ["full"] } +x509-parser = { workspace = true } + +[dev-dependencies] +assert_matches = { workspace = true } +solana-logger = { workspace = true } +solana-streamer = { workspace = true, features = ["dev-context-only-utils"] } + +[lib] +crate-type = ["lib"] +name = "solana_vortexor" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/vortexor/Readme.md b/vortexor/Readme.md new file mode 100644 index 00000000000000..f12462d9019e0d --- /dev/null +++ b/vortexor/Readme.md @@ -0,0 +1,144 @@ +# Introduction +The Vortexor is a service that can offload the tasks of receiving transactions +from the public, performing signature verifications, and deduplications from the +core validator, enabling it to focus on processing and executing the +transactions. The verified and filtered transactions will then be forwarded to +the validators linked with the Vortexor. This setup makes the TPU transaction +ingestion and verification more scalable compared to a single-node solution. + +# Architecture +Figure 1 describes the architecture diagram of the Vortexor and its +relationship with the validator. 
+ + +---------------------+ + | Solana | + | RPC / Web Socket | + | Service | + +---------------------+ + | + v + +--------------------- VORTEXOR ------------------------+ + | | | + | +------------------+ | + | | StakedKeyUpdater | | + | +------------------+ | + | | | + | v | + | +-------------+ +--------------------+ | + TPU --> | | TPU Streamer| -----> | SigVerifier/Dedup | | + /QUIC | +-------------+ +--------------------+ | + | | | | + | v v | + | +----------------+ +------------------------+ | + | | Subscription |<----| VerifiedPacketForwarder| | + | | Management | +------------------------+ | + | +----------------+ | | + +--------------------------------|----------------------+ + ^ | (UDP/QUIC) + Heartbeat/subscriptions | | + | v + +-------------------- AGAVE VALIDATOR ------------------+ + | | + | +----------------+ +-----------------------+ | + Config-> | | Subscription | | VerifiedPacketReceiver| | + Admin RPC | | Management | | | | + | +----------------+ +-----------------------+ | + | | | | + | | v | + | v +-----------+ | + | +--------------------+ | Banking | | + Gossip <--------|--| Gossip/Contact Info| | Stage | | + | +--------------------+ +-----------+ | + +-------------------------------------------------------+ + + Figure 1. + +The Vortexor is a new executable that can be deployed on nodes separate from +the core Agave validator. It can also be deployed on the same node as the core +validator if the node has sufficient performance bandwidth. + +It has the following major components: + +1. **The TPU Streamer** – This is built from the existing QUIC-based TPU streamer. +2. **The SigVerify/Dedup** – This is refactored from the existing SigVerify component. +3. **Subscription Management** – Responsible for managing subscriptions + from the validator. Actions include subscribing to transactions and canceling subscriptions. +4. **VerifiedPacketForwarder** – Responsible for forwarding verified + transaction packets to subscribed validators. 
It uses UDP/QUIC to send transactions. + Validators can bind to private addresses for receiving the verified packets. + Firewalls can also restrict transactions to the chosen Vortexor. +5. **The Vortexor StakedKeyUpdater** – Retrieves the stake map from the network and makes + it available to the TPU streamer for stake-weighted QoS. + +Validators include a new component that receives verified packets sent from +the Vortexor and directly sends them to the banking stage. The validator's +Admin RPC is enhanced to configure peering with the Vortexor. The ContactInfo of +the validator updates with the Vortexor's address when linked. + +# Relationship of Validator and Vortexor +The validator broadcasts one TPU address served by a Vortexor. A validator can +switch its paired Vortexor to another. A Vortexor, depending on its performance, +can serve one or more validators. The architecture also supports multiple +Vortexors sharing the TPU address behind a load balancer for scalability: + + Load Balancer + | + v + __________________________ + | | | + | | | + Vortexor Vortexor Vortexor + | | | + | | | + __________________________ + | + v + Validator + + Figure 2. + +When the validator is in 'Paired' mode, receiving active transactions or +heartbeat messages from the Vortexor, it receives TPU transactions solely from +the Vortexor. It publishes the TPU address via gossip. The regular TPU and TPU +forward services are disabled for security and performance reasons. + +The design assumes a trust relationship between the Vortexor and the validator, +achieved through a private network, firewall rules, or TLS verification. QUIC, +used for the VerifiedPacketReceiver, supports QoS to prioritize Vortexor traffic. + +Heartbeat messages from the Vortexor inform the validator of its status. If no +transactions or heartbeats are received within a configurable timeout, the +validator may switch to another Vortexor or revert to its built-in TPU streamer. 
+ +# Deployment Considerations +Using a Vortexor enhances validator scalability but introduces complexities: + +1. **Deployment Complexity**: For validators not using a Vortexor, there is no + impact. For those using a Vortexor, additional setup is required. To minimize + complexity, the Vortexor and validator require minimal configuration changes + and provide clear documentation for pairing. Automatic fallback ensures + continued operation if the connection between the Vortexor and validator + breaks. + +2. **Latency**: An additional hop exists between the original client and the + leader validator. Latency is minimized by deploying the Vortexor on a node + with low-latency connections to the validator. UDP forwarding is supported + for speed. + +3. **Security**: The implicit trust between the validator and Vortexor is + safeguarded by private networks, firewalls, and QUIC with public key-based + rules. Validators can optionally enforce re-verification of transactions. + +4. **Compatibility**: The solution is compatible with existing setups, such as + jito-relayers. The Vortexor CLI mimics jito-relayer's CLI to reduce friction + for migrating users. + +5. **Networking**: The Vortexor can be exposed directly to the internet or + placed behind a load balancer. Communication with the validator is + encouraged via private networks for security and performance. + +# Upgrade Considerations +Operators can decide whether to adopt Vortexors without concerns about network +protocol changes. Upgrading involves specifying the Vortexor's TPU address and +verified packet receiver network address via CLI or Admin RPC. The transition is +designed to be seamless for operators. 
diff --git a/vortexor/src/cli.rs b/vortexor/src/cli.rs new file mode 100644 index 00000000000000..f813fb8e01bb1d --- /dev/null +++ b/vortexor/src/cli.rs @@ -0,0 +1,177 @@ +use { + clap::{crate_description, crate_name, App, AppSettings, Arg}, + solana_clap_utils::input_validators::{is_keypair_or_ask_keyword, is_parsable}, + solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}, + solana_sdk::quic::QUIC_PORT_OFFSET, + solana_streamer::{ + nonblocking::quic::{ + DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS, + }, + quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + }, +}; + +pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; +pub const DEFAULT_NUM_QUIC_ENDPOINTS: usize = 8; + +pub struct DefaultArgs { + pub bind_address: String, + pub dynamic_port_range: String, + pub max_connections_per_peer: String, + pub max_tpu_staked_connections: String, + pub max_tpu_unstaked_connections: String, + pub max_fwd_staked_connections: String, + pub max_fwd_unstaked_connections: String, + pub max_streams_per_ms: String, + pub max_connections_per_ipaddr_per_min: String, + pub num_quic_endpoints: String, +} + +impl Default for DefaultArgs { + fn default() -> Self { + Self { + bind_address: "0.0.0.0".to_string(), + dynamic_port_range: format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1), + max_connections_per_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER.to_string(), + max_tpu_staked_connections: MAX_STAKED_CONNECTIONS.to_string(), + max_tpu_unstaked_connections: MAX_UNSTAKED_CONNECTIONS.to_string(), + max_fwd_staked_connections: MAX_STAKED_CONNECTIONS + .saturating_add(MAX_UNSTAKED_CONNECTIONS) + .to_string(), + max_fwd_unstaked_connections: 0.to_string(), + max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS.to_string(), + max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE + .to_string(), + num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS.to_string(), + } + } +} + +fn 
port_range_validator(port_range: String) -> Result<(), String> { + if let Some((start, end)) = solana_net_utils::parse_port_range(&port_range) { + if end.saturating_sub(start) < MINIMUM_VALIDATOR_PORT_RANGE_WIDTH { + Err(format!( + "Port range is too small. Try --dynamic-port-range {}-{}", + start, + start.saturating_add(MINIMUM_VALIDATOR_PORT_RANGE_WIDTH) + )) + } else if end.checked_add(QUIC_PORT_OFFSET).is_none() { + Err("Invalid dynamic_port_range.".to_string()) + } else { + Ok(()) + } + } else { + Err("Invalid port range".to_string()) + } +} + +pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { + return App::new(crate_name!()) + .about(crate_description!()) + .version(version) + .global_setting(AppSettings::ColoredHelp) + .global_setting(AppSettings::InferSubcommands) + .global_setting(AppSettings::UnifiedHelpMessage) + .global_setting(AppSettings::VersionlessSubcommands) + .arg( + Arg::with_name("identity") + .short("i") + .long("identity") + .value_name("KEYPAIR") + .takes_value(true) + .validator(is_keypair_or_ask_keyword) + .help("Vortexor identity keypair"), + ) + .arg( + Arg::with_name("bind_address") + .long("bind-address") + .value_name("HOST") + .takes_value(true) + .validator(solana_net_utils::is_host) + .default_value(&default_args.bind_address) + .help("IP address to bind the validator ports"), + ) + .arg( + Arg::with_name("dynamic_port_range") + .long("dynamic-port-range") + .value_name("MIN_PORT-MAX_PORT") + .takes_value(true) + .default_value(&default_args.dynamic_port_range) + .validator(port_range_validator) + .help("Range to use for dynamically assigned ports"), + ) + .arg( + Arg::with_name("max_connections_per_peer") + .long("max-connections-per-peer") + .takes_value(true) + .default_value(&default_args.max_connections_per_peer) + .validator(is_parsable::) + .help("Controls the max concurrent connections per IpAddr."), + ) + .arg( + Arg::with_name("max_tpu_staked_connections") + 
.long("max-tpu-staked-connections") + .takes_value(true) + .default_value(&default_args.max_tpu_staked_connections) + .validator(is_parsable::) + .help("Controls the max concurrent connections for TPU from staked nodes."), + ) + .arg( + Arg::with_name("max_tpu_unstaked_connections") + .long("max-tpu-unstaked-connections") + .takes_value(true) + .default_value(&default_args.max_tpu_unstaked_connections) + .validator(is_parsable::) + .help("Controls the max concurrent connections fort TPU from unstaked nodes."), + ) + .arg( + Arg::with_name("max_fwd_staked_connections") + .long("max-fwd-staked-connections") + .takes_value(true) + .default_value(&default_args.max_fwd_staked_connections) + .validator(is_parsable::) + .help("Controls the max concurrent connections for TPU-forward from staked nodes."), + ) + .arg( + Arg::with_name("max_fwd_unstaked_connections") + .long("max-fwd-unstaked-connections") + .takes_value(true) + .default_value(&default_args.max_fwd_unstaked_connections) + .validator(is_parsable::) + .help("Controls the max concurrent connections for TPU-forward from unstaked nodes."), + ) + .arg( + Arg::with_name("max_connections_per_ipaddr_per_minute") + .long("max-connections-per-ipaddr-per-minute") + .takes_value(true) + .default_value(&default_args.max_connections_per_ipaddr_per_min) + .validator(is_parsable::) + .help("Controls the rate of the clients connections per IpAddr per minute."), + ) + .arg( + Arg::with_name("num_quic_endpoints") + .long("num-quic-endpoints") + .takes_value(true) + .default_value(&default_args.num_quic_endpoints) + .validator(is_parsable::) + .help("The number of QUIC endpoints used for TPU and TPU-Forward. 
It can be increased to \ + increase network ingest throughput, at the expense of higher CPU and general \ + validator load."), + ) + .arg( + Arg::with_name("max_streams_per_ms") + .long("max-streams-per-ms") + .takes_value(true) + .default_value(&default_args.max_streams_per_ms) + .validator(is_parsable::) + .help("Max streams per millisecond for a streamer."), + ) + .arg( + Arg::with_name("tpu_coalesce_ms") + .long("tpu-coalesce-ms") + .value_name("MILLISECS") + .takes_value(true) + .validator(is_parsable::) + .help("Milliseconds to wait in the TPU receiver for packet coalescing."), + ); +} diff --git a/vortexor/src/lib.rs b/vortexor/src/lib.rs new file mode 100644 index 00000000000000..e92563196fb2d4 --- /dev/null +++ b/vortexor/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli; +pub mod vortexor; diff --git a/vortexor/src/main.rs b/vortexor/src/main.rs new file mode 100644 index 00000000000000..44f79b262f0113 --- /dev/null +++ b/vortexor/src/main.rs @@ -0,0 +1,82 @@ +use { + clap::{value_t, value_t_or_exit}, + crossbeam_channel::unbounded, + solana_clap_utils::input_parsers::keypair_of, + solana_sdk::net::DEFAULT_TPU_COALESCE, + solana_streamer::streamer::StakedNodes, + solana_vortexor::{ + cli::{app, DefaultArgs}, + vortexor::Vortexor, + }, + std::{ + sync::{atomic::AtomicBool, Arc, RwLock}, + time::Duration, + }, +}; + +pub fn main() { + let default_args = DefaultArgs::default(); + let solana_version = solana_version::version!(); + let cli_app = app(solana_version, &default_args); + let matches = cli_app.get_matches(); + + let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| { + clap::Error::with_description( + "The --identity argument is required", + clap::ErrorKind::ArgumentNotFound, + ) + .exit(); + }); + + let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) + .expect("invalid bind_address"); + let max_connections_per_peer = value_t_or_exit!(matches, "max_connections_per_peer", u64); + let 
max_tpu_staked_connections = value_t_or_exit!(matches, "max_tpu_staked_connections", u64); + let max_fwd_staked_connections = value_t_or_exit!(matches, "max_fwd_staked_connections", u64); + let max_fwd_unstaked_connections = + value_t_or_exit!(matches, "max_fwd_unstaked_connections", u64); + + let max_tpu_unstaked_connections = + value_t_or_exit!(matches, "max_tpu_unstaked_connections", u64); + + let max_connections_per_ipaddr_per_min = + value_t_or_exit!(matches, "max_connections_per_ipaddr_per_minute", u64); + let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", u64); + let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_TPU_COALESCE); + + let dynamic_port_range = + solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) + .expect("invalid dynamic_port_range"); + + let max_streams_per_ms = value_t_or_exit!(matches, "max_streams_per_ms", u64); + let exit = Arc::new(AtomicBool::new(false)); + // To be linked with the Tpu sigverify and forwarder service + let (tpu_sender, _tpu_receiver) = unbounded(); + let (tpu_fwd_sender, _tpu_fwd_receiver) = unbounded(); + + let tpu_sockets = + Vortexor::create_tpu_sockets(bind_address, dynamic_port_range, num_quic_endpoints); + + // To be linked with StakedNodes service. 
+ let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + + let vortexor = Vortexor::create_vortexor( + tpu_sockets, + staked_nodes, + tpu_sender, + tpu_fwd_sender, + max_connections_per_peer, + max_tpu_staked_connections, + max_tpu_unstaked_connections, + max_fwd_staked_connections, + max_fwd_unstaked_connections, + max_streams_per_ms, + max_connections_per_ipaddr_per_min, + tpu_coalesce, + &identity_keypair, + exit, + ); + vortexor.join().unwrap(); +} diff --git a/vortexor/src/vortexor.rs b/vortexor/src/vortexor.rs new file mode 100644 index 00000000000000..8a86d15ecec055 --- /dev/null +++ b/vortexor/src/vortexor.rs @@ -0,0 +1,171 @@ +use { + crossbeam_channel::Sender, + solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig}, + solana_perf::packet::PacketBatch, + solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair}, + solana_streamer::{ + nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + quic::{spawn_server_multi, EndpointKeyUpdater, QuicServerParams}, + streamer::StakedNodes, + }, + std::{ + net::UdpSocket, + sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + thread::{self, JoinHandle}, + time::Duration, + }, +}; + +pub struct TpuSockets { + pub tpu_quic: Vec, + pub tpu_quic_fwd: Vec, +} + +pub struct Vortexor { + thread_handles: Vec>, + key_update_notifier: Arc, +} + +struct KeyUpdateNotifier { + key_updaters: Mutex>>, +} + +impl KeyUpdateNotifier { + fn new(key_updaters: Vec>) -> Self { + Self { + key_updaters: Mutex::new(key_updaters), + } + } +} + +impl NotifyKeyUpdate for KeyUpdateNotifier { + fn update_key(&self, key: &Keypair) -> Result<(), Box> { + let updaters = self.key_updaters.lock().unwrap(); + for updater in updaters.iter() { + updater.update_key(key)? 
+ } + Ok(()) + } +} + +impl Vortexor { + pub fn create_tpu_sockets( + bind_address: std::net::IpAddr, + dynamic_port_range: (u16, u16), + num_quic_endpoints: u64, + ) -> TpuSockets { + let quic_config = SocketConfig { reuseport: true }; + + let (_, tpu_quic) = + bind_in_range_with_config(bind_address, dynamic_port_range, quic_config.clone()) + .expect("expected bind to succeed"); + + let tpu_quic_port = tpu_quic.local_addr().unwrap().port(); + let tpu_quic = bind_more_with_config( + tpu_quic, + num_quic_endpoints.try_into().unwrap(), + quic_config.clone(), + ) + .unwrap(); + + let (_, tpu_quic_fwd) = bind_in_range_with_config( + bind_address, + (tpu_quic_port.saturating_add(1), dynamic_port_range.1), + quic_config.clone(), + ) + .expect("expected bind to succeed"); + + let tpu_quic_fwd = bind_more_with_config( + tpu_quic_fwd, + num_quic_endpoints.try_into().unwrap(), + quic_config, + ) + .unwrap(); + + TpuSockets { + tpu_quic, + tpu_quic_fwd, + } + } + + #[allow(clippy::too_many_arguments)] + pub fn create_vortexor( + tpu_sockets: TpuSockets, + staked_nodes: Arc>, + tpu_sender: Sender, + tpu_fwd_sender: Sender, + max_connections_per_peer: u64, + max_tpu_staked_connections: u64, + max_tpu_unstaked_connections: u64, + max_fwd_staked_connections: u64, + max_fwd_unstaked_connections: u64, + max_streams_per_ms: u64, + max_connections_per_ipaddr_per_min: u64, + tpu_coalesce: Duration, + identity_keypair: &Keypair, + exit: Arc, + ) -> Self { + let mut quic_server_params = QuicServerParams { + max_connections_per_peer: max_connections_per_peer.try_into().unwrap(), + max_staked_connections: max_tpu_staked_connections.try_into().unwrap(), + max_unstaked_connections: max_tpu_unstaked_connections.try_into().unwrap(), + max_streams_per_ms, + max_connections_per_ipaddr_per_min, + wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + coalesce: tpu_coalesce, + }; + + let TpuSockets { + tpu_quic, + tpu_quic_fwd, + } = tpu_sockets; + + let tpu_result = spawn_server_multi( + 
"solVtxTpu", + "quic_vortexor_tpu", + tpu_quic, + identity_keypair, + tpu_sender.clone(), + exit.clone(), + staked_nodes.clone(), + quic_server_params.clone(), + ) + .unwrap(); + + // Fot TPU forward -- we disallow unstaked connections. Allocate all connection resources + // for staked connections: + quic_server_params.max_staked_connections = max_fwd_staked_connections.try_into().unwrap(); + quic_server_params.max_unstaked_connections = + max_fwd_unstaked_connections.try_into().unwrap(); + let tpu_fwd_result = spawn_server_multi( + "solVtxTpuFwd", + "quic_vortexor_tpu_forwards", + tpu_quic_fwd, + identity_keypair, + tpu_fwd_sender, + exit.clone(), + staked_nodes.clone(), + quic_server_params, + ) + .unwrap(); + + Self { + thread_handles: vec![tpu_result.thread, tpu_fwd_result.thread], + key_update_notifier: Arc::new(KeyUpdateNotifier::new(vec![ + tpu_result.key_updater, + tpu_fwd_result.key_updater, + ])), + } + } + + pub fn get_key_update_notifier(&self) -> Arc { + self.key_update_notifier.clone() + } + + pub fn join(self) -> thread::Result<()> { + for t in self.thread_handles { + t.join()? 
+ } + Ok(()) + } +} diff --git a/vortexor/tests/vortexor.rs b/vortexor/tests/vortexor.rs new file mode 100644 index 00000000000000..8d2f683c9e2006 --- /dev/null +++ b/vortexor/tests/vortexor.rs @@ -0,0 +1,76 @@ +use { + crossbeam_channel::unbounded, + solana_net_utils::VALIDATOR_PORT_RANGE, + solana_sdk::{net::DEFAULT_TPU_COALESCE, pubkey::Pubkey, signature::Keypair, signer::Signer}, + solana_streamer::{ + nonblocking::{ + quic::{DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS}, + testing_utilities::check_multiple_streams, + }, + quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + streamer::StakedNodes, + }, + solana_vortexor::{ + cli::{DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER, DEFAULT_NUM_QUIC_ENDPOINTS}, + vortexor::Vortexor, + }, + std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + }, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_vortexor() { + solana_logger::setup(); + + let bind_address = solana_net_utils::parse_host("127.0.0.1").expect("invalid bind_address"); + let keypair = Keypair::new(); + let exit = Arc::new(AtomicBool::new(false)); + + let (tpu_sender, tpu_receiver) = unbounded(); + let (tpu_fwd_sender, tpu_fwd_receiver) = unbounded(); + let tpu_sockets = Vortexor::create_tpu_sockets( + bind_address, + VALIDATOR_PORT_RANGE, + DEFAULT_NUM_QUIC_ENDPOINTS.try_into().unwrap(), + ); + + let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap(); + let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap(); + + let stakes = HashMap::from([(keypair.pubkey(), 10000)]); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::new( + Arc::new(stakes), + HashMap::::default(), // overrides + ))); + + let vortexor = Vortexor::create_vortexor( + tpu_sockets, + staked_nodes, + tpu_sender, + tpu_fwd_sender, + DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER.try_into().unwrap(), + MAX_STAKED_CONNECTIONS.try_into().unwrap(), + MAX_UNSTAKED_CONNECTIONS.try_into().unwrap(), + 
MAX_STAKED_CONNECTIONS + .saturating_add(MAX_UNSTAKED_CONNECTIONS) + .try_into() + .unwrap(), // max_fwd_staked_connections + 0, // max_fwd_unstaked_connections + DEFAULT_MAX_STREAMS_PER_MS, + DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, + DEFAULT_TPU_COALESCE, + &keypair, + exit.clone(), + ); + + check_multiple_streams(tpu_receiver, tpu_address, Some(&keypair)).await; + check_multiple_streams(tpu_fwd_receiver, tpu_fwd_address, Some(&keypair)).await; + + exit.store(true, Ordering::Relaxed); + vortexor.join().unwrap(); +}