test disable stream timeout #34497

Closed
Changes from all commits
62 commits
0ef05f4
Add a test to use quic connect to a particular server
lijunwangs Aug 28, 2023
1881f0a
Add a test to use quic connect to a particular server
lijunwangs Aug 28, 2023
5b706d8
Target the dev cluster node
lijunwangs Aug 29, 2023
a0b9bf9
Temporarily setting connection limit to 8000 to investigate memory im…
lijunwangs Aug 29, 2023
e808264
Dump connection count in test and change count to 8000
Aug 29, 2023
c226fe4
Changed validator port range to 20k
lijunwangs Aug 29, 2023
77d95cf
do not send for cache warming
lijunwangs Aug 29, 2023
5552e89
tweak quic stream counts
lijunwangs Aug 30, 2023
37a2d1d
tweak quic stream counts
lijunwangs Aug 31, 2023
fddb7e9
Use connection cache
lijunwangs Sep 6, 2023
f074d16
Connection cache size on server side
lijunwangs Sep 6, 2023
c26facd
use PACKET_DATA_SIZE-1
Sep 8, 2023
55deec4
talk to the local server
lijunwangs Nov 16, 2023
a3ae3f4
Fixed a comp issue with tests
lijunwangs Nov 16, 2023
a112068
Do not finalize stream intentionally
lijunwangs Nov 20, 2023
9d6ee04
Debug connection -- why only 1 active connections?
lijunwangs Nov 20, 2023
268d485
Use write instead of write_all and see if it can hold the stream
lijunwangs Nov 20, 2023
0fc1ee0
Use a vector to hold the streams to avoid being dropped
lijunwangs Nov 20, 2023
7f92536
There is a hung when sending streams -- added debugs
lijunwangs Nov 20, 2023
c2dd30b
Change threads to 16
lijunwangs Nov 21, 2023
b82885c
Change quic-client worker threads to 4 to debug hung
lijunwangs Nov 21, 2023
a3be6a4
Change both thread count to 1
lijunwangs Nov 21, 2023
9fe1abd
Print send result
lijunwangs Nov 21, 2023
6e2c483
Print send result
lijunwangs Nov 21, 2023
1bc9100
Print send result
lijunwangs Nov 21, 2023
9c6fe41
Print thread
lijunwangs Nov 21, 2023
457bdfc
Print thread
lijunwangs Nov 21, 2023
cfaf738
Print thread
lijunwangs Nov 21, 2023
0fb7a35
More prints
lijunwangs Nov 21, 2023
3edc687
More prints
lijunwangs Nov 21, 2023
5a485c7
More prints
lijunwangs Nov 21, 2023
c6da838
More prints
lijunwangs Nov 21, 2023
87c4159
More prints
lijunwangs Nov 21, 2023
d70dd41
disable async connection cache create and random fetch. maybe I shoul…
lijunwangs Nov 21, 2023
aeed896
Changed chunk timeout to 3600 seconds
lijunwangs Nov 21, 2023
9bbad7e
Use quic_client directly
lijunwangs Nov 22, 2023
0690ac1
revert async change in cache
lijunwangs Nov 22, 2023
8b9012f
Use correct port
lijunwangs Nov 22, 2023
c2bc3e7
Use correct port: port forward
lijunwangs Nov 22, 2023
0e192b5
Limit streams to 512 exactly
lijunwangs Nov 22, 2023
e6346cd
Changes waiting for streams to 1 hour to maximize connections
lijunwangs Nov 22, 2023
2a965b7
Use 8010
lijunwangs Nov 22, 2023
d5f8762
Do not remove connection
lijunwangs Nov 22, 2023
6473214
Add debug message on timeout
lijunwangs Nov 22, 2023
f4f37ed
Use 8009
lijunwangs Nov 22, 2023
e446af7
Timeout handling
lijunwangs Nov 22, 2023
dc11354
Timeout handling; break -- it is causing hung in the runtime when the…
lijunwangs Nov 22, 2023
3516b18
Changed QUIC_MAX_TIMEOUT to 3600s
lijunwangs Nov 22, 2023
efb2206
Revert thread changes from client side
lijunwangs Nov 23, 2023
4ad75f0
Do not use timeout in handle_connection to see if it makes difference…
lijunwangs Nov 27, 2023
49dd96c
Do not use timeout in handle_connection to see if it makes difference…
lijunwangs Nov 27, 2023
6195cbf
try tokio::select
lijunwangs Dec 14, 2023
a7ce4b6
Try latest tokio version
lijunwangs Dec 14, 2023
67b8f06
Set worker threads to 4 for quic-server for ease of debug
lijunwangs Dec 16, 2023
7862296
disable sleep in receive chunk
lijunwangs Dec 17, 2023
5913fc5
Use 10 sleep interval
lijunwangs Dec 18, 2023
192abda
disable sleep
lijunwangs Dec 18, 2023
a6fdfea
disable sleep test
lijunwangs Dec 18, 2023
46d4e7c
Revert "Try latest tokio version"
lijunwangs Dec 18, 2023
bee5c4a
change sleep interval to 3600
lijunwangs Dec 19, 2023
8102675
change sleep interval to 60
lijunwangs Dec 19, 2023
c231570
revert server side hacks of timeouts
lijunwangs Dec 20, 2023
1 change: 1 addition & 0 deletions Cargo.lock

2 changes: 1 addition & 1 deletion core/src/tpu.rs
@@ -50,7 +50,7 @@ use {
};

// allow multiple connections for NAT and any open/close overlap
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8000;

pub struct TpuSockets {
pub transactions: Vec<UdpSocket>,
2 changes: 1 addition & 1 deletion net-utils/src/lib.rs
@@ -28,7 +28,7 @@ pub struct UdpSocketPair {

pub type PortRange = (u16, u16);

pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 20_000);
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14; // VALIDATOR_PORT_RANGE must be at least this wide

pub(crate) const HEADER_LENGTH: usize = 4;
1 change: 1 addition & 0 deletions quic-client/Cargo.toml
@@ -32,5 +32,6 @@ tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
crossbeam-channel = { workspace = true }
rayon = { workspace = true }
solana-logger = { workspace = true }
solana-perf = { workspace = true }
35 changes: 27 additions & 8 deletions quic-client/src/nonblocking/quic_client.rs
@@ -1,6 +1,7 @@
//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
//! and provides an interface for sending data which is restricted by the
//! server's flow control.

use {
async_mutex::Mutex,
async_trait::async_trait,
@@ -9,7 +10,7 @@ use {
log::*,
quinn::{
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig,
IdleTimeout, TokioRuntime, TransportConfig, WriteError,
IdleTimeout, SendStream, TokioRuntime, TransportConfig, WriteError,
},
solana_connection_cache::{
client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
@@ -252,6 +253,7 @@ pub struct QuicClient {
addr: SocketAddr,
stats: Arc<ClientStats>,
chunk_size: usize,
streams: Mutex<Vec<SendStream>>,
}

impl QuicClient {
@@ -266,17 +268,26 @@ impl QuicClient {
addr,
stats: Arc::new(ClientStats::default()),
chunk_size,
streams: Vec::default().into(),
}
}

async fn _send_buffer_using_conn(
&self,
data: &[u8],
connection: &Connection,
) -> Result<(), QuicError> {
println!("Opening stream...");
let mut send_stream = connection.open_uni().await?;

send_stream.write_all(data).await?;
send_stream.finish().await?;
println!("Writing data...");

send_stream.write(data).await?;
// intentionally holding off finalizing the stream
// send_stream.finish().await?;
println!("Taking a lock");
let mut lock = self.streams.lock().await;
lock.push(send_stream);
println!("Stored send_stream");
Ok(())
}
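Note on the change above: quinn implicitly resets a unidirectional stream when its SendStream handle is dropped without finish(), so pushing each handle into self.streams keeps every stream open (and counted against the server's per-connection stream limits) for as long as the connection lives, which is the point of this memory/timeout experiment. A minimal standalone sketch of the same idea, assuming a quinn 0.10-style API; open_and_hold is a hypothetical helper, not code from this PR:

    use quinn::{Connection, SendStream};

    // Open a unidirectional stream, write once, and deliberately keep the handle
    // alive instead of finishing it, so the peer never observes the stream ending.
    async fn open_and_hold(
        connection: &Connection,
        data: &[u8],
        held: &mut Vec<SendStream>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut stream = connection.open_uni().await?;
        // write() may accept only a prefix of `data`; write_all() would loop until done.
        let _accepted = stream.write(data).await?;
        // No finish() here: dropping `stream` would implicitly reset it, so stash the
        // handle to keep the stream open.
        held.push(stream);
        Ok(())
    }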

@@ -339,12 +350,13 @@
match conn {
Ok(conn) => {
*conn_guard = Some(conn.clone());
info!(
"Made connection to {} id {} try_count {}, from connection cache warming?: {}",
println!(
"Made connection to {} id {} try_count {}, from connection cache warming?: {} {:?}",
self.addr,
conn.connection.stable_id(),
connection_try_count,
data.is_empty(),
std::thread::current().id()
);
connection_try_count += 1;
conn.connection.clone()
@@ -395,7 +407,11 @@ impl QuicClient {
last_connection_id = connection.stable_id();
measure_prepare_connection.stop();

match Self::_send_buffer_using_conn(data, &connection).await {
println!(
"To call _send_buffer_using_conn... {:?}",
std::thread::current().id()
);
match self._send_buffer_using_conn(data, &connection).await {
Ok(()) => {
measure_send_packet.stop();
stats.successful_packets.fetch_add(1, Ordering::Relaxed);
@@ -500,7 +516,7 @@ impl QuicClient {
join_all(
buffs
.into_iter()
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
.map(|buf| self._send_buffer_using_conn(buf.as_ref(), connection_ref)),
)
})
.collect();
@@ -568,12 +584,15 @@ impl ClientConnection for QuicClientConnection {
}

async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
println!("Executing send_data_batch");
let stats = ClientStats::default();
let len = buffers.len();
let res = self
.client
.send_batch(buffers, &stats, self.connection_stats.clone())
.await;

println!("Result send_data_batch {res:?}");
self.connection_stats
.add_client_stats(&stats, len, res.is_ok());
res?;
1 change: 1 addition & 0 deletions quic-client/src/quic_client.rs
@@ -69,6 +69,7 @@ lazy_static! {
static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
//.worker_threads(1)
.thread_name("quic-client")
.enable_all()
.build()
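The commented-out worker_threads(1) above appears to be the knob used while chasing the hang mentioned in the commit log: pinning the dedicated quic-client runtime to a single worker makes any accidental blocking inside async code stall the whole runtime immediately instead of intermittently. A hedged sketch of building that runtime with the thread count pinned, using the standard tokio builder API; build_client_runtime is a hypothetical helper, not this crate's code:

    fn build_client_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            // One worker: a single blocked task now freezes the runtime, so a hang
            // reproduces deterministically and is easier to capture in a backtrace.
            .worker_threads(1)
            .thread_name("quic-client")
            .enable_all()
            .build()
            .expect("failed to build quic-client runtime")
    }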
116 changes: 115 additions & 1 deletion quic-client/tests/quic_client.rs
@@ -3,12 +3,16 @@ mod tests {
use {
crossbeam_channel::{unbounded, Receiver},
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache_stats::ConnectionCacheStats,
solana_perf::packet::PacketBatch,
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_sdk::{
net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE,
quic::QUIC_MAX_STAKED_CONCURRENT_STREAMS, signature::Keypair,
},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
@@ -319,4 +323,114 @@
response_recv_thread.join().unwrap();
info!("Response receiver exited!");
}

#[tokio::test]
async fn test_connection_cache_memory_usage() {
use {
solana_connection_cache::client_connection::ClientConnection,
solana_quic_client::quic_client::QuicClientConnection,
};
solana_logger::setup();

let addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = 8009;
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
let mut clients = Vec::default();

let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(512)
.thread_name(|i| format!("concame{i:02}"))
.build()
.unwrap();

for i in 0..8000 {
println!("Connection {i}");
let client = QuicClientConnection::new(
Arc::new(QuicLazyInitializedEndpoint::default()),
tpu_addr,
connection_cache_stats.clone(),
);

// Send a full size packet with single byte writes.
let num_expected_packets: usize = 1;

thread_pool.install(|| {
(0..QUIC_MAX_STAKED_CONCURRENT_STREAMS)
.into_par_iter()
.for_each(|i| {
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
let rslt = client.send_data_batch(&packets);
if let Err(rslt) = rslt {
info!("Connection {i} error {rslt:?}");
}
});
});

clients.push(client);
}
}

#[tokio::test]
async fn test_connection_cache_memory_usage_2() {
use {
solana_connection_cache::client_connection::ClientConnection,
solana_quic_client::quic_client::QuicClientConnection,
};

solana_logger::setup();

// Request Sender, it uses the same endpoint as the response receiver:
let addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = 8009;
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());

let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Failed to initialize QUIC client certificates");
let client_certificate = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});

let endpoint = Arc::new(QuicLazyInitializedEndpoint::new(client_certificate, None));

let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(128)
.thread_name(|i| format!("concame{i:02}"))
.build()
.unwrap();

let mut clients = Vec::default();
for i in 0..8000 {
println!("Connection {i}");
let client = QuicClientConnection::new(
endpoint.clone(),
tpu_addr,
connection_cache_stats.clone(),
);
// Send a full size packet with single byte writes.
let num_expected_packets: usize = 1;

thread_pool.install(|| {
(0..QUIC_MAX_STAKED_CONCURRENT_STREAMS)
.into_par_iter()
.for_each(|_i| {
let packets = vec![
vec![0u8; PACKET_DATA_SIZE-1 /*PACKET_DATA_SIZE*/];
num_expected_packets
];
println!("Sending data in thread {:?}", std::thread::current().id());
let rslt = client.send_data_batch(&packets);
println!("Sent data: {rslt:?}");
if let Err(rslt) = rslt {
info!("Connection {i} error {_i} {rslt:?}");
}
});
});

clients.push(client);
}
}
}
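The two tests above are ad-hoc load generators rather than pass/fail assertions: each opens 8000 client connections to a QUIC server that must already be listening on 127.0.0.1:8009 (for example a locally running validator TPU port forwarded to 8009) and floods it with held-open streams so server-side memory and connection counts can be observed. Assuming the crate name is solana-quic-client and the test target keeps the file name quic_client, one way to run them with the debug prints visible would be:

    cargo test -p solana-quic-client --test quic_client \
        test_connection_cache_memory_usage -- --nocapture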
8 changes: 4 additions & 4 deletions sdk/src/quic.rs
@@ -6,18 +6,18 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 512;
pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 512;

pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;
pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 1_024_000;

// Set the maximum concurrent stream numbers to avoid excessive streams.
// The value was lowered from 2048 to reduce contention of the limited
// receive_window among the streams which is observed in CI bench-tests with
// forwarded packets from staked nodes.
pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;

pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2);
pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(3600);
pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1);

// Based on commonly-used handshake timeouts for various TCP
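For context on the QUIC_MAX_TIMEOUT change: a Duration constant like this is what normally gets handed to quinn as the connection idle timeout, so raising it from 2 s to 3600 s lets a connection sit idle for an hour (with its held-open streams) before being timed out. A sketch of the usual wiring, assuming quinn's TransportConfig/IdleTimeout API and not taken from this diff:

    use {
        quinn::{IdleTimeout, TransportConfig},
        std::time::Duration,
    };

    const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(3600);

    fn transport_config_with_long_idle_timeout() -> TransportConfig {
        let mut config = TransportConfig::default();
        // IdleTimeout implements TryFrom<Duration>; values beyond QUIC's varint
        // range would fail the conversion here.
        let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).expect("valid idle timeout");
        config.max_idle_timeout(Some(timeout));
        config
    }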