-
Notifications
You must be signed in to change notification settings - Fork 265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add fanout to tpu-client-next #3478
Changes from all commits
92cfd0c
20a9c52
bb3b34f
42755bc
85e8ae4
244c4c2
612b611
ecc721d
afbc3da
5fbfddd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,7 +9,8 @@ use { | |||||||||
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError, | ||||||||||
}, | ||||||||||
transaction_batch::TransactionBatch, | ||||||||||
workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError}, | ||||||||||
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError}, | ||||||||||
SendTransactionStats, | ||||||||||
}, | ||||||||||
log::*, | ||||||||||
quinn::Endpoint, | ||||||||||
|
@@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError { | |||||||||
LeaderReceiverDropped, | ||||||||||
} | ||||||||||
|
||||||||||
/// [`Fanout`] is a configuration struct that specifies how many leaders should | ||||||||||
/// be targeted when sending transactions and connecting. | ||||||||||
/// | ||||||||||
/// Note, that the unit is number of leaders per | ||||||||||
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is | ||||||||||
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per | ||||||||||
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them. | ||||||||||
/// | ||||||||||
/// The idea of having a separate `connect` parameter is to create a set of | ||||||||||
/// nodes to connect to in advance in order to hide the latency of opening new | ||||||||||
/// connection. Hence, `connect` must be greater or equal to `send` | ||||||||||
pub struct Fanout { | ||||||||||
/// The number of leaders to target for sending transactions. | ||||||||||
pub send: usize, | ||||||||||
|
||||||||||
/// The number of leaders to target for establishing connections. | ||||||||||
pub connect: usize, | ||||||||||
} | ||||||||||
|
||||||||||
/// Configuration for the [`ConnectionWorkersScheduler`]. | ||||||||||
/// | ||||||||||
/// This struct holds the necessary settings to initialize and manage connection | ||||||||||
|
@@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig { | |||||||||
/// connection failure. | ||||||||||
pub max_reconnect_attempts: usize, | ||||||||||
|
||||||||||
/// The number of slots to look ahead during the leader estimation | ||||||||||
/// procedure. Determines how far into the future leaders are estimated, | ||||||||||
/// allowing connections to be established with those leaders in advance. | ||||||||||
pub lookahead_slots: u64, | ||||||||||
/// Configures the number of leaders to connect to and send transactions to. | ||||||||||
pub leaders_fanout: Fanout, | ||||||||||
} | ||||||||||
|
||||||||||
impl ConnectionWorkersScheduler { | ||||||||||
|
@@ -90,7 +108,7 @@ impl ConnectionWorkersScheduler { | |||||||||
skip_check_transaction_age, | ||||||||||
worker_channel_size, | ||||||||||
max_reconnect_attempts, | ||||||||||
lookahead_slots, | ||||||||||
leaders_fanout, | ||||||||||
}: ConnectionWorkersSchedulerConfig, | ||||||||||
mut leader_updater: Box<dyn LeaderUpdater>, | ||||||||||
mut transaction_receiver: mpsc::Receiver<TransactionBatch>, | ||||||||||
|
@@ -99,6 +117,7 @@ impl ConnectionWorkersScheduler { | |||||||||
let endpoint = Self::setup_endpoint(bind, validator_identity)?; | ||||||||||
debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); | ||||||||||
let mut workers = WorkersCache::new(num_connections, cancel.clone()); | ||||||||||
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new(); | ||||||||||
|
||||||||||
loop { | ||||||||||
let transaction_batch = tokio::select! { | ||||||||||
|
@@ -114,50 +133,49 @@ impl ConnectionWorkersScheduler { | |||||||||
break; | ||||||||||
} | ||||||||||
}; | ||||||||||
let updated_leaders = leader_updater.next_leaders(lookahead_slots); | ||||||||||
let new_leader = &updated_leaders[0]; | ||||||||||
let future_leaders = &updated_leaders[1..]; | ||||||||||
if !workers.contains(new_leader) { | ||||||||||
debug!("No existing workers for {new_leader:?}, starting a new one."); | ||||||||||
let worker = Self::spawn_worker( | ||||||||||
&endpoint, | ||||||||||
new_leader, | ||||||||||
worker_channel_size, | ||||||||||
skip_check_transaction_age, | ||||||||||
max_reconnect_attempts, | ||||||||||
); | ||||||||||
workers.push(*new_leader, worker).await; | ||||||||||
} | ||||||||||
|
||||||||||
tokio::select! { | ||||||||||
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res { | ||||||||||
Ok(()) => (), | ||||||||||
Err(WorkersCacheError::ShutdownError) => { | ||||||||||
debug!("Connection to {new_leader} was closed, worker cache shutdown"); | ||||||||||
} | ||||||||||
Err(err) => { | ||||||||||
warn!("Connection to {new_leader} was closed, worker error: {err}"); | ||||||||||
// If we has failed to send batch, it will be dropped. | ||||||||||
} | ||||||||||
}, | ||||||||||
() = cancel.cancelled() => { | ||||||||||
debug!("Cancelled: Shutting down"); | ||||||||||
break; | ||||||||||
} | ||||||||||
}; | ||||||||||
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect); | ||||||||||
|
||||||||||
// Regardless of who is leader, add future leaders to the cache to | ||||||||||
// hide the latency of opening the connection. | ||||||||||
for peer in future_leaders { | ||||||||||
let (fanout_leaders, connect_leaders) = | ||||||||||
split_leaders(&updated_leaders, &leaders_fanout); | ||||||||||
// add future leaders to the cache to hide the latency of opening | ||||||||||
// the connection. | ||||||||||
for peer in connect_leaders { | ||||||||||
if !workers.contains(peer) { | ||||||||||
let stats = send_stats_per_addr.entry(peer.ip()).or_default(); | ||||||||||
let worker = Self::spawn_worker( | ||||||||||
&endpoint, | ||||||||||
peer, | ||||||||||
worker_channel_size, | ||||||||||
skip_check_transaction_age, | ||||||||||
max_reconnect_attempts, | ||||||||||
stats.clone(), | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor This
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. weird that it passed clippy |
||||||||||
); | ||||||||||
workers.push(*peer, worker).await; | ||||||||||
maybe_shutdown_worker(workers.push(*peer, worker)); | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
for new_leader in fanout_leaders { | ||||||||||
if !workers.contains(new_leader) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I don't think that this can ever happen? I'd remove the code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can happen if a connection to the leader is dropped and the worker is stopped. Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
} It is possible for the
Comment on lines
+158
to
+159
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the But we actually start workers in the block above. Maybe call it
Suggested change
Footnotes
|
||||||||||
warn!("No existing worker for {new_leader:?}, skip sending to this leader."); | ||||||||||
continue; | ||||||||||
} | ||||||||||
|
||||||||||
let send_res = | ||||||||||
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone()); | ||||||||||
match send_res { | ||||||||||
Ok(()) => (), | ||||||||||
Err(WorkersCacheError::ShutdownError) => { | ||||||||||
debug!("Connection to {new_leader} was closed, worker cache shutdown"); | ||||||||||
} | ||||||||||
Err(WorkersCacheError::ReceiverDropped) => { | ||||||||||
// Remove the worker from the cache, if the peer has disconnected. | ||||||||||
maybe_shutdown_worker(workers.pop(*new_leader)); | ||||||||||
} | ||||||||||
Err(err) => { | ||||||||||
warn!("Connection to {new_leader} was closed, worker error: {err}"); | ||||||||||
// If we has failed to send batch, it will be dropped. | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
@@ -166,7 +184,7 @@ impl ConnectionWorkersScheduler { | |||||||||
|
||||||||||
endpoint.close(0u32.into(), b"Closing connection"); | ||||||||||
leader_updater.stop().await; | ||||||||||
Ok(workers.transaction_stats().clone()) | ||||||||||
Ok(send_stats_per_addr) | ||||||||||
} | ||||||||||
|
||||||||||
/// Sets up the QUIC endpoint for the scheduler to handle connections. | ||||||||||
|
@@ -191,6 +209,7 @@ impl ConnectionWorkersScheduler { | |||||||||
worker_channel_size: usize, | ||||||||||
skip_check_transaction_age: bool, | ||||||||||
max_reconnect_attempts: usize, | ||||||||||
stats: Arc<SendTransactionStats>, | ||||||||||
) -> WorkerInfo { | ||||||||||
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size); | ||||||||||
let endpoint = endpoint.clone(); | ||||||||||
|
@@ -202,12 +221,31 @@ impl ConnectionWorkersScheduler { | |||||||||
txs_receiver, | ||||||||||
skip_check_transaction_age, | ||||||||||
max_reconnect_attempts, | ||||||||||
stats, | ||||||||||
); | ||||||||||
let handle = tokio::spawn(async move { | ||||||||||
worker.run().await; | ||||||||||
worker.transaction_stats().clone() | ||||||||||
}); | ||||||||||
|
||||||||||
WorkerInfo::new(txs_sender, handle, cancel) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Splits `leaders` into two slices based on the `fanout` configuration: | ||||||||||
/// * the first slice contains the leaders to which transactions will be sent, | ||||||||||
/// * the second vector contains the leaders, used to warm up connections. This | ||||||||||
/// slice includes the the first set. | ||||||||||
Comment on lines
+236
to
+237
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor
Suggested change
|
||||||||||
fn split_leaders<'leaders>( | ||||||||||
leaders: &'leaders [SocketAddr], | ||||||||||
fanout: &Fanout, | ||||||||||
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) { | ||||||||||
let Fanout { send, connect } = fanout; | ||||||||||
assert!(send <= connect); | ||||||||||
let send_count = (*send).min(leaders.len()); | ||||||||||
let connect_count = (*connect).min(leaders.len()); | ||||||||||
|
||||||||||
let send_slice = &leaders[..send_count]; | ||||||||||
let connect_slice = &leaders[..connect_count]; | ||||||||||
|
||||||||||
(send_slice, connect_slice) | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a bit confusing that we call it
send
andconnect
portions elsewhere, but here they are calledfanout
andconnection
portions.Maybe it would be more consistent to call it
send_leaders
andconnect_leaders
here as well?