Skip to content

Commit

Permalink
Initial implementations of the TPU Vortexor (#3258)
Browse files Browse the repository at this point in the history
* Initial implementations of the TPU Vortexor
  • Loading branch information
lijunwangs authored Dec 22, 2024
1 parent a2d88af commit f090e77
Show file tree
Hide file tree
Showing 12 changed files with 837 additions and 73 deletions.
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ members = [
"upload-perf",
"validator",
"version",
"vortexor",
"vote",
"watchtower",
"wen-restart",
Expand Down
48 changes: 3 additions & 45 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1525,8 +1525,8 @@ pub mod test {
nonblocking::{
quic::compute_max_allowed_uni_streams,
testing_utilities::{
get_client_config, make_client_endpoint, setup_quic_server,
SpawnTestServerResult, TestServerConfig,
check_multiple_streams, get_client_config, make_client_endpoint,
setup_quic_server, SpawnTestServerResult, TestServerConfig,
},
},
quic::DEFAULT_TPU_COALESCE,
Expand Down Expand Up @@ -1589,48 +1589,6 @@ pub mod test {
}
}

pub async fn check_multiple_streams(
receiver: Receiver<PacketBatch>,
server_address: SocketAddr,
) {
let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);
let conn2 = Arc::new(make_client_endpoint(&server_address, None).await);
let mut num_expected_packets = 0;
for i in 0..10 {
info!("sending: {}", i);
let c1 = conn1.clone();
let c2 = conn2.clone();
let mut s1 = c1.open_uni().await.unwrap();
let mut s2 = c2.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().unwrap();
s2.write_all(&[0u8]).await.unwrap();
s2.finish().unwrap();
num_expected_packets += 2;
sleep(Duration::from_millis(200)).await;
}
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.try_recv() {
total_packets += packets.len();
all_packets.push(packets)
} else {
sleep(Duration::from_secs(1)).await;
}
if total_packets == num_expected_packets {
break;
}
}
for batch in all_packets {
for p in batch.iter() {
assert_eq!(p.meta().size, 1);
}
}
assert_eq!(total_packets, num_expected_packets);
}

pub async fn check_multiple_writes(
receiver: Receiver<PacketBatch>,
server_address: SocketAddr,
Expand Down Expand Up @@ -2049,7 +2007,7 @@ pub mod test {
)
.unwrap();

check_multiple_streams(receiver, server_address).await;
check_multiple_streams(receiver, server_address, None).await;
assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
assert_eq!(stats.total_new_streams.load(Ordering::Relaxed), 20);
assert_eq!(stats.total_connections.load(Ordering::Relaxed), 2);
Expand Down
96 changes: 71 additions & 25 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
},
streamer::StakedNodes,
},
crossbeam_channel::unbounded,
crossbeam_channel::{unbounded, Receiver},
quinn::{
crypto::rustls::QuicClientConfig, ClientConfig, Connection, EndpointConfig, IdleTimeout,
TokioRuntime, TransportConfig,
Expand All @@ -25,8 +25,9 @@ use {
std::{
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, RwLock},
time::{Duration, Instant},
},
tokio::task::JoinHandle,
tokio::{task::JoinHandle, time::sleep},
};

pub fn get_client_config(keypair: &Keypair) -> ClientConfig {
Expand Down Expand Up @@ -79,33 +80,35 @@ pub struct SpawnTestServerResult {
pub stats: Arc<StreamerStats>,
}

pub fn create_quic_server_sockets() -> Vec<UdpSocket> {
#[cfg(not(target_os = "windows"))]
{
use {
solana_net_utils::bind_to,
std::net::{IpAddr, Ipv4Addr},
};
(0..10)
.map(|_| {
bind_to(
IpAddr::V4(Ipv4Addr::LOCALHOST),
/*port*/ 0,
/*reuseport:*/ true,
)
.unwrap()
})
.collect::<Vec<_>>()
}
#[cfg(target_os = "windows")]
{
vec![bind_to_localhost().unwrap()]
}
}

pub fn setup_quic_server(
option_staked_nodes: Option<StakedNodes>,
config: TestServerConfig,
) -> SpawnTestServerResult {
let sockets = {
#[cfg(not(target_os = "windows"))]
{
use {
solana_net_utils::bind_to,
std::net::{IpAddr, Ipv4Addr},
};
(0..10)
.map(|_| {
bind_to(
IpAddr::V4(Ipv4Addr::LOCALHOST),
/*port*/ 0,
/*reuseport:*/ true,
)
.unwrap()
})
.collect::<Vec<_>>()
}
#[cfg(target_os = "windows")]
{
vec![bind_to_localhost().unwrap()]
}
};
let sockets = create_quic_server_sockets();
setup_quic_server_with_sockets(sockets, option_staked_nodes, config)
}

Expand Down Expand Up @@ -180,3 +183,46 @@ pub async fn make_client_endpoint(
.await
.expect("Test server should be already listening on 'localhost'")
}

pub async fn check_multiple_streams(
receiver: Receiver<PacketBatch>,
server_address: SocketAddr,
client_keypair: Option<&Keypair>,
) {
let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair).await);
let conn2 = Arc::new(make_client_endpoint(&server_address, client_keypair).await);
let mut num_expected_packets = 0;
for i in 0..10 {
info!("sending: {}", i);
let c1 = conn1.clone();
let c2 = conn2.clone();
let mut s1 = c1.open_uni().await.unwrap();
let mut s2 = c2.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().unwrap();
s2.write_all(&[0u8]).await.unwrap();
s2.finish().unwrap();
num_expected_packets += 2;
sleep(Duration::from_millis(200)).await;
}
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.try_recv() {
total_packets += packets.len();
all_packets.push(packets)
} else {
sleep(Duration::from_secs(1)).await;
}
if total_packets == num_expected_packets {
break;
}
}
for batch in all_packets {
for p in batch.iter() {
assert_eq!(p.meta().size, 1);
}
}
assert_eq!(total_packets, num_expected_packets);
}
10 changes: 7 additions & 3 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ pub fn spawn_server(
)
}

#[derive(Clone)]
pub struct QuicServerParams {
pub max_connections_per_peer: usize,
pub max_staked_connections: usize,
Expand Down Expand Up @@ -632,8 +633,11 @@ pub fn spawn_server_multi(
#[cfg(test)]
mod test {
use {
super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded,
solana_net_utils::bind_to_localhost, std::net::SocketAddr,
super::*,
crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
crossbeam_channel::unbounded,
solana_net_utils::bind_to_localhost,
std::net::SocketAddr,
};

fn setup_quic_server() -> (
Expand Down Expand Up @@ -723,7 +727,7 @@ mod test {
.unwrap();

let runtime = rt("solQuicTestRt".to_string());
runtime.block_on(check_multiple_streams(receiver, server_address));
runtime.block_on(check_multiple_streams(receiver, server_address, None));
exit.store(true, Ordering::Relaxed);
t.join().unwrap();
}
Expand Down
61 changes: 61 additions & 0 deletions vortexor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[package]
name = "solana-vortexor"
description = "Solana TPU Vortexor"
documentation = "https://docs.rs/solana-vortexor"
default-run = "solana-vortexor"
publish = false
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
governor = { workspace = true }
histogram = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
nix = { workspace = true, features = ["net"] }
pem = { workspace = true }
percentage = { workspace = true }
quinn = { workspace = true }
quinn-proto = { workspace = true }
rand = { workspace = true }
rustls = { workspace = true }
smallvec = { workspace = true }
socket2 = { workspace = true }
solana-clap-utils = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
solana-version = { workspace = true }

thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }

[lib]
crate-type = ["lib"]
name = "solana_vortexor"

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
Loading

0 comments on commit f090e77

Please sign in to comment.