From 92cfd0cf6a32c29085e8a36aea788a0680294f2a Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 12:23:41 +0100 Subject: [PATCH 01/10] Add tpu-client-next to the root Cargo.toml --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 8778a641a570dd..b7f09cf260278a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -539,6 +539,7 @@ solana-test-validator = { path = "test-validator", version = "=2.2.0" } solana-thin-client = { path = "thin-client", version = "=2.2.0" } solana-transaction-error = { path = "sdk/transaction-error", version = "=2.2.0" } solana-tpu-client = { path = "tpu-client", version = "=2.2.0", default-features = false } +solana-tpu-client-next = { path = "tpu-client-next", version = "=2.2.0" } solana-transaction-status = { path = "transaction-status", version = "=2.2.0" } solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.2.0" } solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.2.0" } From 20a9c5299f8f844baf2e9dfb08b7b118e6d1d319 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 14:11:07 +0100 Subject: [PATCH 02/10] Change LeaderUpdater trait to accept mut self --- tpu-client-next/src/leader_updater.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs index 5e07b9b0bfe612..63d0938da46d51 100644 --- a/tpu-client-next/src/leader_updater.rs +++ b/tpu-client-next/src/leader_updater.rs @@ -35,7 +35,7 @@ pub trait LeaderUpdater: Send { /// If the current leader estimation is incorrect and transactions are sent to /// only one estimated leader, there is a risk of losing all the transactions, /// depending on the forwarding policy. - fn next_leaders(&self, lookahead_slots: u64) -> Vec; + fn next_leaders(&mut self, lookahead_slots: u64) -> Vec; /// Stop [`LeaderUpdater`] and releases all associated resources. async fn stop(&mut self); @@ -98,7 +98,7 @@ struct LeaderUpdaterService { #[async_trait] impl LeaderUpdater for LeaderUpdaterService { - fn next_leaders(&self, lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, lookahead_slots: u64) -> Vec { self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) } @@ -116,7 +116,7 @@ struct PinnedLeaderUpdater { #[async_trait] impl LeaderUpdater for PinnedLeaderUpdater { - fn next_leaders(&self, _lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, _lookahead_slots: u64) -> Vec { self.address.clone() } From bb3b34f6ac0e1a21f81f2a04c4e8800e083047f9 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Nov 2024 10:38:11 +0100 Subject: [PATCH 03/10] add fanout to the tpu-client-next --- .../src/connection_workers_scheduler.rs | 91 +++++++++++++------ .../connection_workers_scheduler_test.rs | 6 +- 2 files changed, 65 insertions(+), 32 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index 82b038827b48eb..c822a623eed4d6 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -39,6 +39,15 @@ pub enum ConnectionWorkersSchedulerError { LeaderReceiverDropped, } +/// This enum defines to how many discovered leaders we will send transactions. +pub enum LeadersFanout { + /// Send transactions to all the leaders discovered by the `next_leaders` + /// call. + All, + /// Send transactions to the first selected number of leaders. 
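+    ///
+    /// For example, `Selected(2)` sends each batch to the first two leaders
+    /// returned by `next_leaders`, while the remaining leaders are only used
+    /// to warm up connections in advance.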
+ Selected(usize), +} + /// Configuration for the [`ConnectionWorkersScheduler`]. /// /// This struct holds the necessary settings to initialize and manage connection @@ -70,6 +79,9 @@ pub struct ConnectionWorkersSchedulerConfig { /// procedure. Determines how far into the future leaders are estimated, /// allowing connections to be established with those leaders in advance. pub lookahead_slots: u64, + + /// The number of leaders to send transactions to. + pub leaders_fanout: LeadersFanout, } impl ConnectionWorkersScheduler { @@ -91,6 +103,7 @@ impl ConnectionWorkersScheduler { worker_channel_size, max_reconnect_attempts, lookahead_slots, + leaders_fanout, }: ConnectionWorkersSchedulerConfig, mut leader_updater: Box, mut transaction_receiver: mpsc::Receiver, @@ -115,39 +128,41 @@ impl ConnectionWorkersScheduler { } }; let updated_leaders = leader_updater.next_leaders(lookahead_slots); - let new_leader = &updated_leaders[0]; - let future_leaders = &updated_leaders[1..]; - if !workers.contains(new_leader) { - debug!("No existing workers for {new_leader:?}, starting a new one."); - let worker = Self::spawn_worker( - &endpoint, - new_leader, - worker_channel_size, - skip_check_transaction_age, - max_reconnect_attempts, - ); - workers.push(*new_leader, worker).await; - } - tokio::select! { - send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res { - Ok(()) => (), - Err(WorkersCacheError::ShutdownError) => { - debug!("Connection to {new_leader} was closed, worker cache shutdown"); - } - Err(err) => { - warn!("Connection to {new_leader} was closed, worker error: {err}"); - // If we has failed to send batch, it will be dropped. - } - }, - () = cancel.cancelled() => { - debug!("Cancelled: Shutting down"); - break; + let (new_leaders, future_leaders) = split_leaders(&updated_leaders, &leaders_fanout); + for new_leader in new_leaders { + if !workers.contains(new_leader) { + debug!("No existing workers for {new_leader:?}, starting a new one."); + let worker = Self::spawn_worker( + &endpoint, + new_leader, + worker_channel_size, + skip_check_transaction_age, + max_reconnect_attempts, + ); + workers.push(*new_leader, worker).await; } - }; - // Regardless of who is leader, add future leaders to the cache to - // hide the latency of opening the connection. + tokio::select! { + send_res = workers.send_transactions_to_address(new_leader, transaction_batch.clone()) => match send_res { + Ok(()) => (), + Err(WorkersCacheError::ShutdownError) => { + debug!("Connection to {new_leader} was closed, worker cache shutdown"); + } + Err(err) => { + warn!("Connection to {new_leader} was closed, worker error: {err}"); + // If we has failed to send batch, it will be dropped. + } + }, + () = cancel.cancelled() => { + debug!("Cancelled: Shutting down"); + break; + } + }; + } + + // add future leaders to the cache to hide the latency of opening the + // connection. for peer in future_leaders { if !workers.contains(peer) { let worker = Self::spawn_worker( @@ -211,3 +226,19 @@ impl ConnectionWorkersScheduler { WorkerInfo::new(txs_sender, handle, cancel) } } + +/// Splits the input vector of leaders into two parts based on the `fanout` configuration: +/// * the first vector contains the leaders to which transactions will be sent. +/// * the second vector contains the remaining leaders, used to warm up connections. 
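+///
+/// A rough usage sketch (illustrative only, doc-test ignored; assumes four
+/// discovered leaders on localhost):
+///
+/// ```ignore
+/// let leaders: Vec<SocketAddr> = (0u16..4)
+///     .map(|i| SocketAddr::from(([127, 0, 0, 1], 8000 + i)))
+///     .collect();
+/// // With `Selected(2)`, the first two leaders receive the transactions and
+/// // the remaining two are only used to warm up connections.
+/// let (send_to, warm_up) = split_leaders(&leaders, &LeadersFanout::Selected(2));
+/// assert_eq!(send_to.len(), 2);
+/// assert_eq!(warm_up.len(), 2);
+/// ```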
+fn split_leaders<'a>( + leaders: &'a [SocketAddr], + fanout: &'a LeadersFanout, +) -> (Vec<&'a SocketAddr>, Vec<&'a SocketAddr>) { + match fanout { + LeadersFanout::All => (leaders.iter().collect(), Vec::new()), // All elements go to the first vector + LeadersFanout::Selected(count) => { + let (selected, remaining) = leaders.split_at((*count).min(leaders.len())); // Split at the specified count or max length + (selected.iter().collect(), remaining.iter().collect()) + } + } +} diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 4944a80542ce87..be83992f0a6601 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -16,8 +16,9 @@ use { streamer::StakedNodes, }, solana_tpu_client_next::{ - connection_workers_scheduler::ConnectionWorkersSchedulerConfig, - leader_updater::create_leader_updater, transaction_batch::TransactionBatch, + connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout::Selected}, + leader_updater::create_leader_updater, + transaction_batch::TransactionBatch, ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, SendTransactionStatsPerAddr, }, @@ -49,6 +50,7 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule worker_channel_size: 2, max_reconnect_attempts: 4, lookahead_slots: 1, + leaders_fanout: Selected(1), } } From 42755bc6db0feb776a4f7de4f8394862f42e9d96 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Nov 2024 12:30:38 +0100 Subject: [PATCH 04/10] Shutdown in separate task --- tpu-client-next/src/connection_worker.rs | 26 +-- .../src/connection_workers_scheduler.rs | 22 ++- tpu-client-next/src/send_transaction_stats.rs | 186 +++++++++++------- tpu-client-next/src/workers_cache.rs | 83 ++++---- .../connection_workers_scheduler_test.rs | 34 ++-- 5 files changed, 204 insertions(+), 147 deletions(-) diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs index 7d77bc3f6ed2a2..25d2a087c0f8e6 100644 --- a/tpu-client-next/src/connection_worker.rs +++ b/tpu-client-next/src/connection_worker.rs @@ -14,7 +14,10 @@ use { clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, timing::timestamp, }, - std::net::SocketAddr, + std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc}, + }, tokio::{ sync::mpsc, time::{sleep, Duration}, @@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker { connection: ConnectionState, skip_check_transaction_age: bool, max_reconnect_attempts: usize, - send_txs_stats: SendTransactionStats, + send_txs_stats: Arc, cancel: CancellationToken, } @@ -93,6 +96,7 @@ impl ConnectionWorker { transactions_receiver: mpsc::Receiver, skip_check_transaction_age: bool, max_reconnect_attempts: usize, + send_txs_stats: Arc, ) -> (Self, CancellationToken) { let cancel = CancellationToken::new(); @@ -103,7 +107,7 @@ impl ConnectionWorker { connection: ConnectionState::NotSetup, skip_check_transaction_age, max_reconnect_attempts, - send_txs_stats: SendTransactionStats::default(), + send_txs_stats, cancel: cancel.clone(), }; @@ -155,11 +159,6 @@ impl ConnectionWorker { } } - /// Retrieves the statistics for transactions sent by this worker. - pub fn transaction_stats(&self) -> &SendTransactionStats { - &self.send_txs_stats - } - /// Sends a batch of transactions using the provided `connection`. 
/// /// Each transaction in the batch is sent over the QUIC streams one at the @@ -183,11 +182,12 @@ impl ConnectionWorker { if let Err(error) = result { trace!("Failed to send transaction over stream with error: {error}."); - record_error(error, &mut self.send_txs_stats); + record_error(error, &self.send_txs_stats); self.connection = ConnectionState::Retry(0); } else { - self.send_txs_stats.successfully_sent = - self.send_txs_stats.successfully_sent.saturating_add(1); + self.send_txs_stats + .successfully_sent + .fetch_add(1, Ordering::Relaxed); } } measure_send.stop(); @@ -221,14 +221,14 @@ impl ConnectionWorker { } Err(err) => { warn!("Connection error {}: {}", self.peer, err); - record_error(err.into(), &mut self.send_txs_stats); + record_error(err.into(), &self.send_txs_stats); self.connection = ConnectionState::Retry(max_retries_attempt.saturating_add(1)); } } } Err(connecting_error) => { - record_error(connecting_error.clone().into(), &mut self.send_txs_stats); + record_error(connecting_error.clone().into(), &self.send_txs_stats); match connecting_error { ConnectError::EndpointStopping => { debug!("Endpoint stopping, exit connection worker."); diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index c822a623eed4d6..3e89a01c939b4c 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -10,6 +10,7 @@ use { }, transaction_batch::TransactionBatch, workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError}, + SendTransactionStats, }, log::*, quinn::Endpoint, @@ -112,6 +113,7 @@ impl ConnectionWorkersScheduler { let endpoint = Self::setup_endpoint(bind, validator_identity)?; debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); let mut workers = WorkersCache::new(num_connections, cancel.clone()); + let mut send_stats_per_addr = SendTransactionStatsPerAddr::new(); loop { let transaction_batch = tokio::select! { @@ -133,14 +135,25 @@ impl ConnectionWorkersScheduler { for new_leader in new_leaders { if !workers.contains(new_leader) { debug!("No existing workers for {new_leader:?}, starting a new one."); + let stats = send_stats_per_addr.entry(new_leader.ip()).or_default(); let worker = Self::spawn_worker( &endpoint, new_leader, worker_channel_size, skip_check_transaction_age, max_reconnect_attempts, + stats.clone(), ); - workers.push(*new_leader, worker).await; + let shutdown_worker = workers.push(*new_leader, worker).await; + if let Some(shutdown_worker) = shutdown_worker { + tokio::spawn(async move { + let leader = shutdown_worker.leader(); + let res = shutdown_worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } + }); + } } tokio::select! { @@ -165,12 +178,14 @@ impl ConnectionWorkersScheduler { // connection. for peer in future_leaders { if !workers.contains(peer) { + let stats = send_stats_per_addr.entry(peer.ip()).or_default(); let worker = Self::spawn_worker( &endpoint, peer, worker_channel_size, skip_check_transaction_age, max_reconnect_attempts, + stats.clone(), ); workers.push(*peer, worker).await; } @@ -181,7 +196,7 @@ impl ConnectionWorkersScheduler { endpoint.close(0u32.into(), b"Closing connection"); leader_updater.stop().await; - Ok(workers.transaction_stats().clone()) + Ok(send_stats_per_addr) } /// Sets up the QUIC endpoint for the scheduler to handle connections. 
@@ -206,6 +221,7 @@ impl ConnectionWorkersScheduler { worker_channel_size: usize, skip_check_transaction_age: bool, max_reconnect_attempts: usize, + stats: Arc, ) -> WorkerInfo { let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size); let endpoint = endpoint.clone(); @@ -217,10 +233,10 @@ impl ConnectionWorkersScheduler { txs_receiver, skip_check_transaction_age, max_reconnect_attempts, + stats, ); let handle = tokio::spawn(async move { worker.run().await; - worker.transaction_stats().clone() }); WorkerInfo::new(txs_sender, handle, cancel) diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs index abe68b8bf60213..43711f3e385851 100644 --- a/tpu-client-next/src/send_transaction_stats.rs +++ b/tpu-client-next/src/send_transaction_stats.rs @@ -4,86 +4,118 @@ use { super::QuicError, quinn::{ConnectError, ConnectionError, WriteError}, - std::{collections::HashMap, fmt, net::IpAddr}, + std::{ + collections::HashMap, + fmt, + net::IpAddr, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + }, }; /// [`SendTransactionStats`] aggregates counters related to sending transactions. -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Debug, Default)] pub struct SendTransactionStats { - pub successfully_sent: u64, - pub connect_error_cids_exhausted: u64, - pub connect_error_invalid_remote_address: u64, - pub connect_error_other: u64, - pub connection_error_application_closed: u64, - pub connection_error_cids_exhausted: u64, - pub connection_error_connection_closed: u64, - pub connection_error_locally_closed: u64, - pub connection_error_reset: u64, - pub connection_error_timed_out: u64, - pub connection_error_transport_error: u64, - pub connection_error_version_mismatch: u64, - pub write_error_closed_stream: u64, - pub write_error_connection_lost: u64, - pub write_error_stopped: u64, - pub write_error_zero_rtt_rejected: u64, + pub successfully_sent: AtomicU64, + pub connect_error_cids_exhausted: AtomicU64, + pub connect_error_invalid_remote_address: AtomicU64, + pub connect_error_other: AtomicU64, + pub connection_error_application_closed: AtomicU64, + pub connection_error_cids_exhausted: AtomicU64, + pub connection_error_connection_closed: AtomicU64, + pub connection_error_locally_closed: AtomicU64, + pub connection_error_reset: AtomicU64, + pub connection_error_timed_out: AtomicU64, + pub connection_error_transport_error: AtomicU64, + pub connection_error_version_mismatch: AtomicU64, + pub write_error_closed_stream: AtomicU64, + pub write_error_connection_lost: AtomicU64, + pub write_error_stopped: AtomicU64, + pub write_error_zero_rtt_rejected: AtomicU64, } #[allow(clippy::arithmetic_side_effects)] -pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { +pub fn record_error(err: QuicError, stats: &SendTransactionStats) { match err { QuicError::Connect(ConnectError::EndpointStopping) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::CidsExhausted) => { - stats.connect_error_cids_exhausted += 1; + stats + .connect_error_cids_exhausted + .fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::InvalidServerName(_)) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::InvalidRemoteAddress(_)) => { - stats.connect_error_invalid_remote_address += 1; + stats + .connect_error_invalid_remote_address + .fetch_add(1, Ordering::Relaxed); } 
QuicError::Connect(ConnectError::NoDefaultClientConfig) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::UnsupportedVersion) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::VersionMismatch) => { - stats.connection_error_version_mismatch += 1; + stats + .connection_error_version_mismatch + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::TransportError(_)) => { - stats.connection_error_transport_error += 1; + stats + .connection_error_transport_error + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::ConnectionClosed(_)) => { - stats.connection_error_connection_closed += 1; + stats + .connection_error_connection_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::ApplicationClosed(_)) => { - stats.connection_error_application_closed += 1; + stats + .connection_error_application_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::Reset) => { - stats.connection_error_reset += 1; + stats.connection_error_reset.fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::TimedOut) => { - stats.connection_error_timed_out += 1; + stats + .connection_error_timed_out + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::LocallyClosed) => { - stats.connection_error_locally_closed += 1; + stats + .connection_error_locally_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::CidsExhausted) => { - stats.connection_error_cids_exhausted += 1; + stats + .connection_error_cids_exhausted + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::Stopped(_)) => { - stats.write_error_stopped += 1; + stats.write_error_stopped.fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ConnectionLost(_)) => { - stats.write_error_connection_lost += 1; + stats + .write_error_connection_lost + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ClosedStream) => { - stats.write_error_closed_stream += 1; + stats + .write_error_closed_stream + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ZeroRttRejected) => { - stats.write_error_zero_rtt_rejected += 1; + stats + .write_error_zero_rtt_rejected + .fetch_add(1, Ordering::Relaxed); } // Endpoint is created on the scheduler level and handled separately // No counters are used for this case. @@ -91,39 +123,7 @@ pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { } } -pub type SendTransactionStatsPerAddr = HashMap; - -macro_rules! add_fields { - ($self:ident += $other:ident for: $( $field:ident ),* $(,)? 
) => { - $( - $self.$field = $self.$field.saturating_add($other.$field); - )* - }; -} - -impl SendTransactionStats { - pub fn add(&mut self, other: &SendTransactionStats) { - add_fields!( - self += other for: - successfully_sent, - connect_error_cids_exhausted, - connect_error_invalid_remote_address, - connect_error_other, - connection_error_application_closed, - connection_error_cids_exhausted, - connection_error_connection_closed, - connection_error_locally_closed, - connection_error_reset, - connection_error_timed_out, - connection_error_transport_error, - connection_error_version_mismatch, - write_error_closed_stream, - write_error_connection_lost, - write_error_stopped, - write_error_zero_rtt_rejected, - ); - } -} +pub type SendTransactionStatsPerAddr = HashMap>; macro_rules! display_send_transaction_stats_body { ($self:ident, $f:ident, $($field:ident),* $(,)?) => { @@ -135,7 +135,7 @@ macro_rules! display_send_transaction_stats_body { "\x20 ", stringify!($field), ": {},\n", )* ), - $($self.$field),* + $($self.$field.load(Ordering::Relaxed)),* ) }; } @@ -164,3 +164,47 @@ impl fmt::Display for SendTransactionStats { ) } } + +/// For tests it is useful to be have PartialEq but we cannot have it on top of +/// atomics. This macro creates a structure with the same attributes but of type +/// u64. +macro_rules! define_non_atomic_struct { + ($name:ident, $atomic_name:ident, {$($field:ident),* $(,)?}) => { + #[derive(Debug, Default, PartialEq)] + pub struct $name { + $(pub $field: u64),* + } + + impl $atomic_name { + pub fn to_non_atomic(&self) -> $name { + $name { + $($field: self.$field.load(Ordering::Relaxed)),* + } + } + } + }; +} + +// Define the non-atomic struct and the `to_non_atomic` conversion method +define_non_atomic_struct!( + SendTransactionStatsNonAtomic, + SendTransactionStats, + { + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected + } +); diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs index 90d2954b669d7f..ed9b68721cae26 100644 --- a/tpu-client-next/src/workers_cache.rs +++ b/tpu-client-next/src/workers_cache.rs @@ -3,14 +3,10 @@ //! batches, and gathering send transaction statistics. use { - super::SendTransactionStats, crate::transaction_batch::TransactionBatch, log::*, lru::LruCache, - std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - }, + std::net::SocketAddr, thiserror::Error, tokio::{sync::mpsc, task::JoinHandle}, tokio_util::sync::CancellationToken, @@ -20,14 +16,14 @@ use { /// transaction batches. pub(crate) struct WorkerInfo { pub sender: mpsc::Sender, - pub handle: JoinHandle, + pub handle: JoinHandle<()>, pub cancel: CancellationToken, } impl WorkerInfo { pub fn new( sender: mpsc::Sender, - handle: JoinHandle, + handle: JoinHandle<()>, cancel: CancellationToken, ) -> Self { Self { @@ -50,14 +46,13 @@ impl WorkerInfo { /// Closes the worker by dropping the sender and awaiting the worker's /// statistics. 
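+    ///
+    /// Note: statistics are now accumulated in the shared
+    /// `Arc<SendTransactionStats>` owned by the worker, so they are no longer
+    /// returned from this call.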
- async fn shutdown(self) -> Result { + async fn shutdown(self) -> Result<(), WorkersCacheError> { self.cancel.cancel(); drop(self.sender); - let stats = self - .handle + self.handle .await .map_err(|_| WorkersCacheError::TaskJoinFailure)?; - Ok(stats) + Ok(()) } } @@ -65,7 +60,6 @@ impl WorkerInfo { /// manage workers. It also tracks transaction statistics for each peer. pub(crate) struct WorkersCache { workers: LruCache, - send_stats_per_addr: HashMap, /// Indicates that the `WorkersCache` is been `shutdown()`, interrupting any outstanding /// `send_txs()` invocations. @@ -86,32 +80,35 @@ pub enum WorkersCacheError { } impl WorkersCache { - pub fn new(capacity: usize, cancel: CancellationToken) -> Self { + pub(crate) fn new(capacity: usize, cancel: CancellationToken) -> Self { Self { workers: LruCache::new(capacity), - send_stats_per_addr: HashMap::new(), cancel, } } - pub fn contains(&self, peer: &SocketAddr) -> bool { + pub(crate) fn contains(&self, peer: &SocketAddr) -> bool { self.workers.contains(peer) } - pub async fn push(&mut self, peer: SocketAddr, peer_worker: WorkerInfo) { - // Although there might be concerns about the performance implications - // of waiting for the worker to be closed when trying to add a new - // worker, the idea is that these workers are almost always created in - // advance so the latency is hidden. + pub(crate) async fn push( + &mut self, + peer: SocketAddr, + peer_worker: WorkerInfo, + ) -> Option { if let Some((leader, popped_worker)) = self.workers.push(peer, peer_worker) { - self.shutdown_worker(leader, popped_worker).await; + return Some(ShutdownWorker { + leader, + worker: popped_worker, + }); } + None } /// Sends a batch of transactions to the worker for a given peer. If the /// worker for the peer is disconnected or fails, it is removed from the /// cache. - pub async fn send_transactions_to_address( + pub(crate) async fn send_transactions_to_address( &mut self, peer: &SocketAddr, txs_batch: TransactionBatch, @@ -148,37 +145,35 @@ impl WorkersCache { } } - pub fn transaction_stats(&self) -> &HashMap { - &self.send_stats_per_addr - } - /// Closes and removes all workers in the cache. This is typically done when /// shutting down the system. - pub async fn shutdown(&mut self) { - // Interrupt any outstanding `send_txs()` calls. + pub(crate) async fn shutdown(&mut self) { + // Interrupt any outstanding `send_transactions()` calls. self.cancel.cancel(); while let Some((leader, worker)) = self.workers.pop_lru() { - self.shutdown_worker(leader, worker).await; + let res = worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } } } +} - /// Shuts down a worker for a given peer by closing the worker and gathering - /// its transaction statistics. - async fn shutdown_worker(&mut self, leader: SocketAddr, worker: WorkerInfo) { - let res = worker.shutdown().await; +/// [`ShutdownWorker`] takes care of stopping the worker. It's method +/// `shutdown()` should be executed in a separate task to hide the latency of +/// finishing worker gracefully. 
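+///
+/// A minimal usage sketch (illustrative; mirrors how the scheduler uses it):
+///
+/// ```ignore
+/// if let Some(shutdown_worker) = workers.push(peer, worker).await {
+///     tokio::spawn(async move {
+///         if let Err(err) = shutdown_worker.shutdown().await {
+///             debug!("Error while shutting down worker: {err}");
+///         }
+///     });
+/// }
+/// ```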
+pub(crate) struct ShutdownWorker { + leader: SocketAddr, + worker: WorkerInfo, +} - let stats = match res { - Ok(stats) => stats, - Err(err) => { - debug!("Error while shutting down worker for {leader}: {err}"); - return; - } - }; +impl ShutdownWorker { + pub(crate) fn leader(&self) -> SocketAddr { + self.leader + } - self.send_stats_per_addr - .entry(leader.ip()) - .and_modify(|e| e.add(&stats)) - .or_insert(stats); + pub(crate) async fn shutdown(self) -> Result<(), WorkersCacheError> { + self.worker.shutdown().await } } diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index be83992f0a6601..dad037a0867ca7 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -18,6 +18,7 @@ use { solana_tpu_client_next::{ connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout::Selected}, leader_updater::create_leader_updater, + send_transaction_stats::SendTransactionStatsNonAtomic, transaction_batch::TransactionBatch, ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, SendTransactionStatsPerAddr, @@ -91,7 +92,7 @@ async fn join_scheduler( scheduler_handle: JoinHandle< Result, >, -) -> SendTransactionStats { +) -> Arc { let stats_per_ip = scheduler_handle .await .unwrap() @@ -238,10 +239,10 @@ async fn test_basic_transactions_sending() { // Stop sending tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -315,7 +316,7 @@ async fn test_connection_denied_until_allowed() { // Wait for the exchange to finish. tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); // in case of pruning, server closes the connection with code 1 and error // message b"dropped". This might lead to connection error // (ApplicationClosed::ApplicationClose) or to stream error @@ -374,7 +375,7 @@ async fn test_connection_pruned_and_reopened() { // Wait for the exchange to finish. tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); // in case of pruning, server closes the connection with code 1 and error // message b"dropped". This might lead to connection error // (ApplicationClosed::ApplicationClose) or to stream error @@ -435,10 +436,10 @@ async fn test_staked_connection() { // Wait for the exchange to finish. 
tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -482,10 +483,10 @@ async fn test_connection_throttling() { // Stop sending tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -526,13 +527,14 @@ async fn test_no_host() { tx_sender_shutdown.await; // While attempting to establish a connection with a nonexistent host, we fill the worker's - // channel. Transactions from this channel will never be sent and will eventually be dropped - // without increasing the `SendTransactionStats` counters. + // channel. let stats = scheduler_handle .await .expect("Scheduler should stop successfully") .expect("Scheduler execution was successful"); - assert_eq!(stats, HashMap::new()); + let stats = stats.get(&server_ip).unwrap().to_non_atomic(); + // `5` because `config.max_reconnect_attempts` is 4 + assert_eq!(stats.connect_error_invalid_remote_address, 5); } // Check that when the client is rate-limited by server, we update counters @@ -582,13 +584,13 @@ async fn test_rate_limiting() { // And the scheduler. scheduler_cancel.cancel(); - let localhost_stats = join_scheduler(scheduler_handle).await; + let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); // We do not expect to see any errors, as the connection is in the pending state still, when we // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would // start seeing a `connection_error_timed_out` incremented to 1. Potentially, we may want to // accept both 0 and 1 as valid values for it. - assert_eq!(localhost_stats, SendTransactionStats::default()); + assert_eq!(localhost_stats, SendTransactionStatsNonAtomic::default()); // Stop the server. exit.store(true, Ordering::Relaxed); @@ -646,7 +648,7 @@ async fn test_rate_limiting_establish_connection() { // And the scheduler. scheduler_cancel.cancel(); - let mut localhost_stats = join_scheduler(scheduler_handle).await; + let mut localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); assert!( localhost_stats.connection_error_timed_out > 0, "As the quinn timeout is below 1 minute, a few connections will fail to connect during \ @@ -665,7 +667,7 @@ async fn test_rate_limiting_establish_connection() { // All the rest of the error counters should be 0. localhost_stats.connection_error_timed_out = 0; localhost_stats.successfully_sent = 0; - assert_eq!(localhost_stats, SendTransactionStats::default()); + assert_eq!(localhost_stats, SendTransactionStatsNonAtomic::default()); // Stop the server. 
exit.store(true, Ordering::Relaxed); From 85e8ae48afa70180ea2db0edfccb9ac673e28248 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Nov 2024 17:00:43 +0100 Subject: [PATCH 05/10] Use try_send instead, minor impromenets --- .../src/connection_workers_scheduler.rs | 64 ++++++------- tpu-client-next/src/workers_cache.rs | 89 +++++++++++-------- .../connection_workers_scheduler_test.rs | 6 +- 3 files changed, 81 insertions(+), 78 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index 3e89a01c939b4c..3739b900179c19 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -9,7 +9,7 @@ use { create_client_config, create_client_endpoint, QuicClientCertificate, QuicError, }, transaction_batch::TransactionBatch, - workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError}, + workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError}, SendTransactionStats, }, log::*, @@ -45,8 +45,9 @@ pub enum LeadersFanout { /// Send transactions to all the leaders discovered by the `next_leaders` /// call. All, - /// Send transactions to the first selected number of leaders. - Selected(usize), + /// Send transactions to the first selected number of leaders discovered by + /// the `next_leaders` call. + Next(usize), } /// Configuration for the [`ConnectionWorkersScheduler`]. @@ -144,34 +145,25 @@ impl ConnectionWorkersScheduler { max_reconnect_attempts, stats.clone(), ); - let shutdown_worker = workers.push(*new_leader, worker).await; - if let Some(shutdown_worker) = shutdown_worker { - tokio::spawn(async move { - let leader = shutdown_worker.leader(); - let res = shutdown_worker.shutdown().await; - if let Err(err) = res { - debug!("Error while shutting down worker for {leader}: {err}"); - } - }); - } + maybe_shutdown_worker(workers.push(*new_leader, worker)); } - tokio::select! { - send_res = workers.send_transactions_to_address(new_leader, transaction_batch.clone()) => match send_res { - Ok(()) => (), - Err(WorkersCacheError::ShutdownError) => { - debug!("Connection to {new_leader} was closed, worker cache shutdown"); - } - Err(err) => { - warn!("Connection to {new_leader} was closed, worker error: {err}"); - // If we has failed to send batch, it will be dropped. - } - }, - () = cancel.cancelled() => { - debug!("Cancelled: Shutting down"); - break; + let send_res = + workers.try_send_transactions_to_address(new_leader, transaction_batch.clone()); + match send_res { + Ok(()) => (), + Err(WorkersCacheError::ShutdownError) => { + debug!("Connection to {new_leader} was closed, worker cache shutdown"); + } + Err(WorkersCacheError::ReceiverDropped) => { + // Remove the worker from the cache, if the peer has disconnected. + maybe_shutdown_worker(workers.pop(*new_leader)); + } + Err(err) => { + warn!("Connection to {new_leader} was closed, worker error: {err}"); + // If we has failed to send batch, it will be dropped. 
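+                        // This also covers a full worker channel: `try_send`
+                        // never blocks the scheduler on a slow leader, the
+                        // batch is simply dropped instead.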
} - }; + } } // add future leaders to the cache to hide the latency of opening the @@ -187,7 +179,7 @@ impl ConnectionWorkersScheduler { max_reconnect_attempts, stats.clone(), ); - workers.push(*peer, worker).await; + maybe_shutdown_worker(workers.push(*peer, worker)); } } } @@ -249,12 +241,10 @@ impl ConnectionWorkersScheduler { fn split_leaders<'a>( leaders: &'a [SocketAddr], fanout: &'a LeadersFanout, -) -> (Vec<&'a SocketAddr>, Vec<&'a SocketAddr>) { - match fanout { - LeadersFanout::All => (leaders.iter().collect(), Vec::new()), // All elements go to the first vector - LeadersFanout::Selected(count) => { - let (selected, remaining) = leaders.split_at((*count).min(leaders.len())); // Split at the specified count or max length - (selected.iter().collect(), remaining.iter().collect()) - } - } +) -> (&'a [SocketAddr], &'a [SocketAddr]) { + let count = match fanout { + LeadersFanout::All => leaders.len(), + LeadersFanout::Next(count) => (*count).min(leaders.len()), + }; + leaders.split_at(count) } diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs index ed9b68721cae26..39965fa7c9a113 100644 --- a/tpu-client-next/src/workers_cache.rs +++ b/tpu-client-next/src/workers_cache.rs @@ -15,9 +15,9 @@ use { /// [`WorkerInfo`] holds information about a worker responsible for sending /// transaction batches. pub(crate) struct WorkerInfo { - pub sender: mpsc::Sender, - pub handle: JoinHandle<()>, - pub cancel: CancellationToken, + sender: mpsc::Sender, + handle: JoinHandle<()>, + cancel: CancellationToken, } impl WorkerInfo { @@ -33,14 +33,11 @@ impl WorkerInfo { } } - async fn send_transactions( - &self, - txs_batch: TransactionBatch, - ) -> Result<(), WorkersCacheError> { - self.sender - .send(txs_batch) - .await - .map_err(|_| WorkersCacheError::ReceiverDropped)?; + fn try_send_transactions(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> { + self.sender.try_send(txs_batch).map_err(|err| match err { + mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel, + mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, + })?; Ok(()) } @@ -72,6 +69,9 @@ pub enum WorkersCacheError { #[error("Work receiver has been dropped unexpectedly.")] ReceiverDropped, + #[error("Worker's channel is full.")] + FullChannel, + #[error("Task failed to join.")] TaskJoinFailure, @@ -91,12 +91,22 @@ impl WorkersCache { self.workers.contains(peer) } - pub(crate) async fn push( + pub(crate) fn push( &mut self, - peer: SocketAddr, + leader: SocketAddr, peer_worker: WorkerInfo, ) -> Option { - if let Some((leader, popped_worker)) = self.workers.push(peer, peer_worker) { + if let Some((leader, popped_worker)) = self.workers.push(leader, peer_worker) { + return Some(ShutdownWorker { + leader, + worker: popped_worker, + }); + } + None + } + + pub(crate) fn pop(&mut self, leader: SocketAddr) -> Option { + if let Some(popped_worker) = self.workers.pop(&leader) { return Some(ShutdownWorker { leader, worker: popped_worker, @@ -108,7 +118,7 @@ impl WorkersCache { /// Sends a batch of transactions to the worker for a given peer. If the /// worker for the peer is disconnected or fails, it is removed from the /// cache. - pub(crate) async fn send_transactions_to_address( + pub(crate) fn try_send_transactions_to_address( &mut self, peer: &SocketAddr, txs_batch: TransactionBatch, @@ -116,33 +126,24 @@ impl WorkersCache { let Self { workers, cancel, .. 
} = self; + if cancel.is_cancelled() { + return Err(WorkersCacheError::ShutdownError); + } - let body = async move { - let current_worker = workers.get(peer).expect( - "Failed to fetch worker for peer {peer}.\n\ + let current_worker = workers.get(peer).expect( + "Failed to fetch worker for peer {peer}.\n\ Peer existence must be checked before this call using `contains` method.", - ); - let send_res = current_worker.send_transactions(txs_batch).await; - - if let Err(WorkersCacheError::ReceiverDropped) = send_res { - // Remove the worker from the cache, if the peer has disconnected. - if let Some(current_worker) = workers.pop(peer) { - // To avoid obscuring the error from send, ignore a possible - // `TaskJoinFailure`. - let close_result = current_worker.shutdown().await; - if let Err(error) = close_result { - error!("Error while closing worker: {error}."); - } - } - } - - send_res - }; + ); + let send_res = current_worker.try_send_transactions(txs_batch); - tokio::select! { - send_res = body => send_res, - () = cancel.cancelled() => Err(WorkersCacheError::ShutdownError), + if let Err(WorkersCacheError::ReceiverDropped) = send_res { + warn!( + "Failed to deliver transaction batch for leader {}, drop batch.", + peer.ip() + ); } + + send_res } /// Closes and removes all workers in the cache. This is typically done when @@ -177,3 +178,15 @@ impl ShutdownWorker { self.worker.shutdown().await } } + +pub(crate) fn maybe_shutdown_worker(worker: Option) { + if let Some(worker) = worker { + tokio::spawn(async move { + let leader = worker.leader(); + let res = worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } + }); + } +} diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index dad037a0867ca7..4448e958999c18 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -16,7 +16,7 @@ use { streamer::StakedNodes, }, solana_tpu_client_next::{ - connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout::Selected}, + connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout}, leader_updater::create_leader_updater, send_transaction_stats::SendTransactionStatsNonAtomic, transaction_batch::TransactionBatch, @@ -48,10 +48,10 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule stake_identity: validator_identity, num_connections: 1, skip_check_transaction_age: false, - worker_channel_size: 2, + worker_channel_size: 100, max_reconnect_attempts: 4, lookahead_slots: 1, - leaders_fanout: Selected(1), + leaders_fanout: LeadersFanout::Next(1), } } From 244c4c26310995304f34c76656e3149069f3177f Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Nov 2024 17:28:18 +0100 Subject: [PATCH 06/10] fix LeaderUpdaterError traits --- tpu-client-next/src/leader_updater.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs index 63d0938da46d51..0405dd18dc357d 100644 --- a/tpu-client-next/src/leader_updater.rs +++ b/tpu-client-next/src/leader_updater.rs @@ -22,6 +22,7 @@ use { Arc, }, }, + thiserror::Error, }; /// [`LeaderUpdater`] trait abstracts out functionality required for the @@ -42,6 +43,7 @@ pub trait LeaderUpdater: Send { } /// Error type for [`LeaderUpdater`]. 
+#[derive(Error, PartialEq)] pub struct LeaderUpdaterError; impl fmt::Display for LeaderUpdaterError { From 612b61126c91ee2755bf35c5ebf628a0e07a33f2 Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Thu, 7 Nov 2024 08:39:08 +0100 Subject: [PATCH 07/10] improve lifetimes in split_leaders Co-authored-by: Illia Bobyr --- tpu-client-next/src/connection_workers_scheduler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index 3739b900179c19..8c40c647b5ba46 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -238,10 +238,10 @@ impl ConnectionWorkersScheduler { /// Splits the input vector of leaders into two parts based on the `fanout` configuration: /// * the first vector contains the leaders to which transactions will be sent. /// * the second vector contains the remaining leaders, used to warm up connections. -fn split_leaders<'a>( - leaders: &'a [SocketAddr], - fanout: &'a LeadersFanout, -) -> (&'a [SocketAddr], &'a [SocketAddr]) { +fn split_leaders<'leaders>( + leaders: &'leaders [SocketAddr], + fanout: &LeadersFanout, +) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) { let count = match fanout { LeadersFanout::All => leaders.len(), LeadersFanout::Next(count) => (*count).min(leaders.len()), From ecc721d32085f46a63698de40d309749afb87873 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 7 Nov 2024 14:20:34 +0100 Subject: [PATCH 08/10] address PR comments --- tpu-client-next/src/send_transaction_stats.rs | 4 +-- tpu-client-next/src/workers_cache.rs | 28 +++++++++++-------- .../connection_workers_scheduler_test.rs | 26 +++++++++-------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs index 43711f3e385851..fe16e2546ea411 100644 --- a/tpu-client-next/src/send_transaction_stats.rs +++ b/tpu-client-next/src/send_transaction_stats.rs @@ -168,7 +168,7 @@ impl fmt::Display for SendTransactionStats { /// For tests it is useful to be have PartialEq but we cannot have it on top of /// atomics. This macro creates a structure with the same attributes but of type /// u64. -macro_rules! define_non_atomic_struct { +macro_rules! define_non_atomic_struct_for { ($name:ident, $atomic_name:ident, {$($field:ident),* $(,)?}) => { #[derive(Debug, Default, PartialEq)] pub struct $name { @@ -186,7 +186,7 @@ macro_rules! 
define_non_atomic_struct { } // Define the non-atomic struct and the `to_non_atomic` conversion method -define_non_atomic_struct!( +define_non_atomic_struct_for!( SendTransactionStatsNonAtomic, SendTransactionStats, { diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs index 39965fa7c9a113..d3d25223dfcbfd 100644 --- a/tpu-client-next/src/workers_cache.rs +++ b/tpu-client-next/src/workers_cache.rs @@ -8,7 +8,10 @@ use { lru::LruCache, std::net::SocketAddr, thiserror::Error, - tokio::{sync::mpsc, task::JoinHandle}, + tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinHandle, + }, tokio_util::sync::CancellationToken, }; @@ -35,8 +38,8 @@ impl WorkerInfo { fn try_send_transactions(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> { self.sender.try_send(txs_batch).map_err(|err| match err { - mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel, - mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, + TrySendError::Full(_) => WorkersCacheError::FullChannel, + TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, })?; Ok(()) } @@ -180,13 +183,14 @@ impl ShutdownWorker { } pub(crate) fn maybe_shutdown_worker(worker: Option) { - if let Some(worker) = worker { - tokio::spawn(async move { - let leader = worker.leader(); - let res = worker.shutdown().await; - if let Err(err) = res { - debug!("Error while shutting down worker for {leader}: {err}"); - } - }); - } + let Some(worker) = worker else { + return; + }; + tokio::spawn(async move { + let leader = worker.leader(); + let res = worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } + }); } diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 4448e958999c18..829c4e40f247cc 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -20,8 +20,7 @@ use { leader_updater::create_leader_updater, send_transaction_stats::SendTransactionStatsNonAtomic, transaction_batch::TransactionBatch, - ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, - SendTransactionStatsPerAddr, + ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr, }, std::{ collections::HashMap, @@ -48,6 +47,11 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule stake_identity: validator_identity, num_connections: 1, skip_check_transaction_age: false, + // At the moment we have only one strategy to send transactions: we try + // to put to worker channel transaction batch and in case of failure + // just drop it. This requires to use large channels here. In the + // future, we are planning to add an option to send with backpressure at + // the speed of fastest leader. worker_channel_size: 100, max_reconnect_attempts: 4, lookahead_slots: 1, @@ -92,7 +96,7 @@ async fn join_scheduler( scheduler_handle: JoinHandle< Result, >, -) -> Arc { +) -> SendTransactionStatsNonAtomic { let stats_per_ip = scheduler_handle .await .unwrap() @@ -100,7 +104,7 @@ async fn join_scheduler( stats_per_ip .get(&IpAddr::from_str("127.0.0.1").unwrap()) .expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1") - .clone() + .to_non_atomic() } // Specify the pessimistic time to finish generation and result checks. 
@@ -239,7 +243,7 @@ async fn test_basic_transactions_sending() { // Stop sending tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, SendTransactionStatsNonAtomic { @@ -316,7 +320,7 @@ async fn test_connection_denied_until_allowed() { // Wait for the exchange to finish. tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; // in case of pruning, server closes the connection with code 1 and error // message b"dropped". This might lead to connection error // (ApplicationClosed::ApplicationClose) or to stream error @@ -375,7 +379,7 @@ async fn test_connection_pruned_and_reopened() { // Wait for the exchange to finish. tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; // in case of pruning, server closes the connection with code 1 and error // message b"dropped". This might lead to connection error // (ApplicationClosed::ApplicationClose) or to stream error @@ -436,7 +440,7 @@ async fn test_staked_connection() { // Wait for the exchange to finish. tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, SendTransactionStatsNonAtomic { @@ -483,7 +487,7 @@ async fn test_connection_throttling() { // Stop sending tx_sender_shutdown.await; - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, SendTransactionStatsNonAtomic { @@ -584,7 +588,7 @@ async fn test_rate_limiting() { // And the scheduler. scheduler_cancel.cancel(); - let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let localhost_stats = join_scheduler(scheduler_handle).await; // We do not expect to see any errors, as the connection is in the pending state still, when we // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would @@ -648,7 +652,7 @@ async fn test_rate_limiting_establish_connection() { // And the scheduler. 
scheduler_cancel.cancel(); - let mut localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); + let mut localhost_stats = join_scheduler(scheduler_handle).await; assert!( localhost_stats.connection_error_timed_out > 0, "As the quinn timeout is below 1 minute, a few connections will fail to connect during \ From afbc3daf046cb1edb7860016d9a1b44b88ed1d41 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 7 Nov 2024 15:13:22 +0100 Subject: [PATCH 09/10] create connections in advance --- .../src/connection_workers_scheduler.rs | 83 +++++++++++-------- .../connection_workers_scheduler_test.rs | 7 +- 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index 8c40c647b5ba46..abb0f4275d1df3 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -40,6 +40,19 @@ pub enum ConnectionWorkersSchedulerError { LeaderReceiverDropped, } +/// [`Fanout`] is a configuration struct that specifies how many leaders should +/// be targeted when sending transactions and connecting. +/// +/// Assumption is that `send_next` <= `connect_next`. The idea is to hide latency +/// of creating connections by doing this in advance. +pub struct Fanout { + /// The number of leaders to target for sending transactions. + pub send_next: usize, + + /// The number of leaders to target for establishing connections. + pub connect_next: usize, +} + /// This enum defines to how many discovered leaders we will send transactions. pub enum LeadersFanout { /// Send transactions to all the leaders discovered by the `next_leaders` @@ -47,7 +60,7 @@ pub enum LeadersFanout { All, /// Send transactions to the first selected number of leaders discovered by /// the `next_leaders` call. - Next(usize), + Next(Fanout), } /// Configuration for the [`ConnectionWorkersScheduler`]. @@ -132,20 +145,29 @@ impl ConnectionWorkersScheduler { }; let updated_leaders = leader_updater.next_leaders(lookahead_slots); - let (new_leaders, future_leaders) = split_leaders(&updated_leaders, &leaders_fanout); - for new_leader in new_leaders { - if !workers.contains(new_leader) { - debug!("No existing workers for {new_leader:?}, starting a new one."); - let stats = send_stats_per_addr.entry(new_leader.ip()).or_default(); + let (fanout_leaders, connect_leaders) = + split_leaders(&updated_leaders, &leaders_fanout); + // add future leaders to the cache to hide the latency of opening + // the connection. + for peer in connect_leaders { + if !workers.contains(peer) { + let stats = send_stats_per_addr.entry(peer.ip()).or_default(); let worker = Self::spawn_worker( &endpoint, - new_leader, + peer, worker_channel_size, skip_check_transaction_age, max_reconnect_attempts, stats.clone(), ); - maybe_shutdown_worker(workers.push(*new_leader, worker)); + maybe_shutdown_worker(workers.push(*peer, worker)); + } + } + + for new_leader in fanout_leaders { + if !workers.contains(new_leader) { + warn!("No existing worker for {new_leader:?}, skip sending to this leader."); + continue; } let send_res = @@ -165,23 +187,6 @@ impl ConnectionWorkersScheduler { } } } - - // add future leaders to the cache to hide the latency of opening the - // connection. 
- for peer in future_leaders { - if !workers.contains(peer) { - let stats = send_stats_per_addr.entry(peer.ip()).or_default(); - let worker = Self::spawn_worker( - &endpoint, - peer, - worker_channel_size, - skip_check_transaction_age, - max_reconnect_attempts, - stats.clone(), - ); - maybe_shutdown_worker(workers.push(*peer, worker)); - } - } } workers.shutdown().await; @@ -235,16 +240,28 @@ impl ConnectionWorkersScheduler { } } -/// Splits the input vector of leaders into two parts based on the `fanout` configuration: -/// * the first vector contains the leaders to which transactions will be sent. -/// * the second vector contains the remaining leaders, used to warm up connections. +/// Splits `leaders` into two slices based on the `fanout` configuration: +/// * the first slice contains the leaders to which transactions will be sent, +/// * the second vector contains the leaders, used to warm up connections. This +/// slice includes the the first set. fn split_leaders<'leaders>( leaders: &'leaders [SocketAddr], fanout: &LeadersFanout, ) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) { - let count = match fanout { - LeadersFanout::All => leaders.len(), - LeadersFanout::Next(count) => (*count).min(leaders.len()), - }; - leaders.split_at(count) + match fanout { + LeadersFanout::All => (leaders, leaders), + LeadersFanout::Next(Fanout { + send_next, + connect_next, + }) => { + assert!(send_next <= connect_next); + let send_count = (*send_next).min(leaders.len()); + let connect_count = (*connect_next).min(leaders.len()); + + let send_slice = &leaders[..send_count]; + let connect_slice = &leaders[..connect_count]; + + (send_slice, connect_slice) + } + } } diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 829c4e40f247cc..13bb06ee54921b 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -16,7 +16,7 @@ use { streamer::StakedNodes, }, solana_tpu_client_next::{ - connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout}, + connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout, LeadersFanout}, leader_updater::create_leader_updater, send_transaction_stats::SendTransactionStatsNonAtomic, transaction_batch::TransactionBatch, @@ -55,7 +55,10 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule worker_channel_size: 100, max_reconnect_attempts: 4, lookahead_slots: 1, - leaders_fanout: LeadersFanout::Next(1), + leaders_fanout: LeadersFanout::Next(Fanout { + send_next: 1, + connect_next: 1, + }), } } From 5fbfdddb9a0a4fb4e0068a2d04b13ac7809cc27f Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 7 Nov 2024 16:39:25 +0100 Subject: [PATCH 10/10] removed lookahead_slots --- .../src/connection_workers_scheduler.rs | 60 +++++++------------ tpu-client-next/src/leader_updater.rs | 13 ++-- .../connection_workers_scheduler_test.rs | 11 ++-- 3 files changed, 36 insertions(+), 48 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index abb0f4275d1df3..e42040316c139a 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -43,24 +43,20 @@ pub enum ConnectionWorkersSchedulerError { /// [`Fanout`] is a configuration struct that specifies how many leaders should /// be targeted when sending transactions and connecting. 
 ///
-/// Assumption is that `send_next` <= `connect_next`. The idea is to hide latency
-/// of creating connections by doing this in advance.
+/// Note that the unit is the number of leaders per
+/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
+/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
+/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
+///
+/// The idea of having a separate `connect` parameter is to create a set of
+/// nodes to connect to in advance in order to hide the latency of opening new
+/// connections. Hence, `connect` must be greater than or equal to `send`.
 pub struct Fanout {
     /// The number of leaders to target for sending transactions.
-    pub send_next: usize,
+    pub send: usize,
 
     /// The number of leaders to target for establishing connections.
-    pub connect_next: usize,
-}
-
-/// This enum defines to how many discovered leaders we will send transactions.
-pub enum LeadersFanout {
-    /// Send transactions to all the leaders discovered by the `next_leaders`
-    /// call.
-    All,
-    /// Send transactions to the first selected number of leaders discovered by
-    /// the `next_leaders` call.
-    Next(Fanout),
+    pub connect: usize,
 }
 
 /// Configuration for the [`ConnectionWorkersScheduler`].
@@ -90,13 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
     /// connection failure.
     pub max_reconnect_attempts: usize,
 
-    /// The number of slots to look ahead during the leader estimation
-    /// procedure. Determines how far into the future leaders are estimated,
-    /// allowing connections to be established with those leaders in advance.
-    pub lookahead_slots: u64,
-
-    /// The number of leaders to send transactions to.
-    pub leaders_fanout: LeadersFanout,
+    /// Configures the number of leaders to connect to and send transactions to.
+    pub leaders_fanout: Fanout,
 }
 
 impl ConnectionWorkersScheduler {
@@ -117,7 +108,6 @@ impl ConnectionWorkersScheduler {
             skip_check_transaction_age,
             worker_channel_size,
             max_reconnect_attempts,
-            lookahead_slots,
             leaders_fanout,
         }: ConnectionWorkersSchedulerConfig,
         mut leader_updater: Box,
@@ -143,7 +133,8 @@ impl ConnectionWorkersScheduler {
                     break;
                 }
             };
-            let updated_leaders = leader_updater.next_leaders(lookahead_slots);
+
+            let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);
             let (fanout_leaders, connect_leaders) =
                 split_leaders(&updated_leaders, &leaders_fanout);
 
@@ -246,22 +237,15 @@ impl ConnectionWorkersScheduler {
 ///   slice includes the first set.
fn split_leaders<'leaders>( leaders: &'leaders [SocketAddr], - fanout: &LeadersFanout, + fanout: &Fanout, ) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) { - match fanout { - LeadersFanout::All => (leaders, leaders), - LeadersFanout::Next(Fanout { - send_next, - connect_next, - }) => { - assert!(send_next <= connect_next); - let send_count = (*send_next).min(leaders.len()); - let connect_count = (*connect_next).min(leaders.len()); + let Fanout { send, connect } = fanout; + assert!(send <= connect); + let send_count = (*send).min(leaders.len()); + let connect_count = (*connect).min(leaders.len()); - let send_slice = &leaders[..send_count]; - let connect_slice = &leaders[..connect_count]; + let send_slice = &leaders[..send_count]; + let connect_slice = &leaders[..connect_count]; - (send_slice, connect_slice) - } - } + (send_slice, connect_slice) } diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs index 0405dd18dc357d..1c7d16cd2acb2d 100644 --- a/tpu-client-next/src/leader_updater.rs +++ b/tpu-client-next/src/leader_updater.rs @@ -13,6 +13,7 @@ use { log::*, solana_connection_cache::connection_cache::Protocol, solana_rpc_client::nonblocking::rpc_client::RpcClient, + solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS, solana_tpu_client::nonblocking::tpu_client::LeaderTpuService, std::{ fmt, @@ -30,13 +31,15 @@ use { /// identify next leaders to send transactions to. #[async_trait] pub trait LeaderUpdater: Send { - /// Returns next unique leaders for the next `lookahead_slots` starting from + /// Returns next leaders for the next `lookahead_leaders` starting from /// current estimated slot. /// + /// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition. + /// /// If the current leader estimation is incorrect and transactions are sent to /// only one estimated leader, there is a risk of losing all the transactions, /// depending on the forwarding policy. - fn next_leaders(&mut self, lookahead_slots: u64) -> Vec; + fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec; /// Stop [`LeaderUpdater`] and releases all associated resources. 
async fn stop(&mut self); @@ -100,7 +103,9 @@ struct LeaderUpdaterService { #[async_trait] impl LeaderUpdater for LeaderUpdaterService { - fn next_leaders(&mut self, lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec { + let lookahead_slots = + (lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS); self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) } @@ -118,7 +123,7 @@ struct PinnedLeaderUpdater { #[async_trait] impl LeaderUpdater for PinnedLeaderUpdater { - fn next_leaders(&mut self, _lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec { self.address.clone() } diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 13bb06ee54921b..8a8d623ab8da3f 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -16,7 +16,7 @@ use { streamer::StakedNodes, }, solana_tpu_client_next::{ - connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout, LeadersFanout}, + connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout}, leader_updater::create_leader_updater, send_transaction_stats::SendTransactionStatsNonAtomic, transaction_batch::TransactionBatch, @@ -54,11 +54,10 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule // the speed of fastest leader. worker_channel_size: 100, max_reconnect_attempts: 4, - lookahead_slots: 1, - leaders_fanout: LeadersFanout::Next(Fanout { - send_next: 1, - connect_next: 1, - }), + leaders_fanout: Fanout { + send: 1, + connect: 1, + }, } }
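
A note on the final fanout semantics for anyone picking this series up: after patch 10, the `send` slice returned by `split_leaders` is always a prefix of the `connect` slice, so every leader that receives transactions already has a worker spawned for it one iteration earlier. The stand-alone sketch below mirrors that behaviour; the `Fanout` and `split_leaders` definitions are condensed copies made only for illustration, and the addresses are made up.

use std::net::SocketAddr;

// Condensed restatement of the final `Fanout`/`split_leaders` shape, for illustration only.
pub struct Fanout {
    pub send: usize,
    pub connect: usize,
}

fn split_leaders<'a>(
    leaders: &'a [SocketAddr],
    fanout: &Fanout,
) -> (&'a [SocketAddr], &'a [SocketAddr]) {
    assert!(fanout.send <= fanout.connect);
    let send_count = fanout.send.min(leaders.len());
    let connect_count = fanout.connect.min(leaders.len());
    // The send slice is a prefix of the connect slice, so every leader that
    // receives transactions also gets (or already has) a warmed-up connection.
    (&leaders[..send_count], &leaders[..connect_count])
}

fn main() {
    // Hypothetical leader TPU addresses, one per consecutive-leader window.
    let leaders: Vec<SocketAddr> = vec![
        "127.0.0.1:8001".parse().unwrap(),
        "127.0.0.1:8002".parse().unwrap(),
        "127.0.0.1:8003".parse().unwrap(),
    ];
    let fanout = Fanout { send: 1, connect: 2 };
    let (send_to, connect_to) = split_leaders(&leaders, &fanout);
    assert_eq!(send_to, &leaders[..1]); // transactions go to the current leader only
    assert_eq!(connect_to, &leaders[..2]); // the next leader gets a connection in advance
}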
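
On the `lookahead_slots` removal: `LeaderUpdater::next_leaders` now takes a number of upcoming leaders (one per consecutive-leader window) and `LeaderUpdaterService` converts that back into a slot count before calling `leader_tpu_sockets`. A minimal sketch of that conversion follows; the hard-coded `NUM_CONSECUTIVE_LEADER_SLOTS = 4` matches the solana-sdk constant at the time of writing and is inlined here only to keep the sketch self-contained.

// The scheduler asks for `leaders_fanout.connect` leader windows; the service
// widens that into slots, mirroring the patch's saturating_mul conversion.
const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

fn lookahead_slots(lookahead_leaders: usize) -> u64 {
    (lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS)
}

fn main() {
    // With `connect: 2`, the scheduler requests two leader windows, which
    // covers the next 8 slots of the schedule ([L1, L1, L1, L1, L2, L2, L2, L2]).
    assert_eq!(lookahead_slots(2), 8);
}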