Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Nov 7, 2024
1 parent d356039 commit 448d476
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
4 changes: 2 additions & 2 deletions tpu-client-next/src/send_transaction_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl fmt::Display for SendTransactionStats {
/// For tests it is useful to be have PartialEq but we cannot have it on top of
/// atomics. This macro creates a structure with the same attributes but of type
/// u64.
macro_rules! define_non_atomic_struct {
macro_rules! define_non_atomic_struct_for {
($name:ident, $atomic_name:ident, {$($field:ident),* $(,)?}) => {
#[derive(Debug, Default, PartialEq)]
pub struct $name {
Expand All @@ -186,7 +186,7 @@ macro_rules! define_non_atomic_struct {
}

// Define the non-atomic struct and the `to_non_atomic` conversion method
define_non_atomic_struct!(
define_non_atomic_struct_for!(
SendTransactionStatsNonAtomic,
SendTransactionStats,
{
Expand Down
28 changes: 16 additions & 12 deletions tpu-client-next/src/workers_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use {
lru::LruCache,
std::net::SocketAddr,
thiserror::Error,
tokio::{sync::mpsc, task::JoinHandle},
tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinHandle,
},
tokio_util::sync::CancellationToken,
};

Expand All @@ -35,8 +38,8 @@ impl WorkerInfo {

fn try_send_transactions(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> {
self.sender.try_send(txs_batch).map_err(|err| match err {
mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel,
mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
TrySendError::Full(_) => WorkersCacheError::FullChannel,
TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
})?;
Ok(())
}
Expand Down Expand Up @@ -180,13 +183,14 @@ impl ShutdownWorker {
}

pub(crate) fn maybe_shutdown_worker(worker: Option<ShutdownWorker>) {
if let Some(worker) = worker {
tokio::spawn(async move {
let leader = worker.leader();
let res = worker.shutdown().await;
if let Err(err) = res {
debug!("Error while shutting down worker for {leader}: {err}");
}
});
}
let Some(worker) = worker else {
return;
};
tokio::spawn(async move {
let leader = worker.leader();
let res = worker.shutdown().await;
if let Err(err) = res {
debug!("Error while shutting down worker for {leader}: {err}");
}
});
}
26 changes: 15 additions & 11 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use {
leader_updater::create_leader_updater,
send_transaction_stats::SendTransactionStatsNonAtomic,
transaction_batch::TransactionBatch,
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats,
SendTransactionStatsPerAddr,
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
},
std::{
collections::HashMap,
Expand All @@ -48,6 +47,11 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
stake_identity: validator_identity,
num_connections: 1,
skip_check_transaction_age: false,
// At the moment we have only one strategy to send transactions: we try
// to put to worker channel transaction batch and in case of failure
// just drop it. This requires to use large channels here. In the
// future, we are planning to add an option to send with backpressure at
// the speed of fastest leader.
worker_channel_size: 100,
max_reconnect_attempts: 4,
lookahead_slots: 1,
Expand Down Expand Up @@ -92,15 +96,15 @@ async fn join_scheduler(
scheduler_handle: JoinHandle<
Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
>,
) -> Arc<SendTransactionStats> {
) -> SendTransactionStatsNonAtomic {
let stats_per_ip = scheduler_handle
.await
.unwrap()
.expect("Scheduler should stop successfully.");
stats_per_ip
.get(&IpAddr::from_str("127.0.0.1").unwrap())
.expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1")
.clone()
.to_non_atomic()
}

// Specify the pessimistic time to finish generation and result checks.
Expand Down Expand Up @@ -239,7 +243,7 @@ async fn test_basic_transactions_sending() {

// Stop sending
tx_sender_shutdown.await;
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;
assert_eq!(
localhost_stats,
SendTransactionStatsNonAtomic {
Expand Down Expand Up @@ -316,7 +320,7 @@ async fn test_connection_denied_until_allowed() {

// Wait for the exchange to finish.
tx_sender_shutdown.await;
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;
// in case of pruning, server closes the connection with code 1 and error
// message b"dropped". This might lead to connection error
// (ApplicationClosed::ApplicationClose) or to stream error
Expand Down Expand Up @@ -375,7 +379,7 @@ async fn test_connection_pruned_and_reopened() {

// Wait for the exchange to finish.
tx_sender_shutdown.await;
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;
// in case of pruning, server closes the connection with code 1 and error
// message b"dropped". This might lead to connection error
// (ApplicationClosed::ApplicationClose) or to stream error
Expand Down Expand Up @@ -436,7 +440,7 @@ async fn test_staked_connection() {

// Wait for the exchange to finish.
tx_sender_shutdown.await;
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;
assert_eq!(
localhost_stats,
SendTransactionStatsNonAtomic {
Expand Down Expand Up @@ -483,7 +487,7 @@ async fn test_connection_throttling() {

// Stop sending
tx_sender_shutdown.await;
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;
assert_eq!(
localhost_stats,
SendTransactionStatsNonAtomic {
Expand Down Expand Up @@ -584,7 +588,7 @@ async fn test_rate_limiting() {

// And the scheduler.
scheduler_cancel.cancel();
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let localhost_stats = join_scheduler(scheduler_handle).await;

// We do not expect to see any errors, as the connection is in the pending state still, when we
// do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would
Expand Down Expand Up @@ -648,7 +652,7 @@ async fn test_rate_limiting_establish_connection() {

// And the scheduler.
scheduler_cancel.cancel();
let mut localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
let mut localhost_stats = join_scheduler(scheduler_handle).await;
assert!(
localhost_stats.connection_error_timed_out > 0,
"As the quinn timeout is below 1 minute, a few connections will fail to connect during \
Expand Down

0 comments on commit 448d476

Please sign in to comment.