diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index abe53b236d2e75..f52ee17b3888d2 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -115,7 +115,7 @@ pub struct Config { pub batch_send_rate_ms: u64, /// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt pub retry_pool_max_size: usize, - pub tpu_peers: Option>, + pub tpu_peers: Arc>>, } impl Default for Config { @@ -128,7 +128,7 @@ impl Default for Config { batch_size: DEFAULT_TRANSACTION_BATCH_SIZE, batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS, retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE, - tpu_peers: None, + tpu_peers: Default::default(), } } } @@ -568,11 +568,18 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let mut addresses = config + let addresses = config .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) - .unwrap_or_default(); + .read() + .unwrap() + .iter() + .cloned() + .map(|addr| (addr, 0)) + .collect::>(); + let mut addresses = addresses + .iter() + .map(|(addr, slot)| (addr, *slot)) + .collect::>(); let leader_addresses = Self::get_tpu_addresses_with_slots( tpu_address, leader_info, @@ -710,11 +717,7 @@ impl SendTransactionService { let iter = wire_transactions.chunks(config.batch_size); for chunk in iter { - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().collect::>()) - .unwrap_or_default(); + let mut addresses = config.tpu_peers.read().unwrap().clone(); let mut leader_info_provider = leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); let leader_addresses = Self::get_tpu_addresses( diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index b6d65e3ec4a4df..6a9bed57e4d109 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -46,6 +46,7 @@ pub struct AdminRpcRequestMetadata { pub authorized_voter_keypairs: Arc>>>, pub tower_storage: Arc, pub staked_nodes_overrides: Arc>>, + pub tpu_peers: Arc>>, pub post_init: Arc>>, pub rpc_to_plugin_manager_sender: Option>, } @@ -205,6 +206,13 @@ pub trait AdminRpc { #[rpc(meta, name = "setStakedNodesOverrides")] fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>; + #[rpc(meta, name = "setRpcSendTransactionTpuPeer")] + fn set_rpc_send_transaction_tpu_peers( + &self, + meta: Self::Metadata, + tpu_peers: Vec, + ) -> Result<()>; + #[rpc(meta, name = "contactInfo")] fn contact_info(&self, meta: Self::Metadata) -> Result; @@ -493,6 +501,18 @@ impl AdminRpc for AdminRpcImpl { Ok(()) } + fn set_rpc_send_transaction_tpu_peers( + &self, + meta: Self::Metadata, + tpu_peers: Vec, + ) -> Result<()> { + info!("Setting rpc send transaction tpu peers to {:?}", tpu_peers); + let mut meta_tpu_peers = meta.tpu_peers.write().unwrap(); + meta_tpu_peers.clear(); + meta_tpu_peers.extend(tpu_peers); + Ok(()) + } + fn contact_info(&self, meta: Self::Metadata) -> Result { meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into())) } diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 42f5a0634c0cfa..d0374f8caa8754 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -405,6 +405,7 @@ fn main() { validator_exit: genesis.validator_exit.clone(), authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(), staked_nodes_overrides: genesis.staked_nodes_overrides.clone(), + tpu_peers: Default::default(), post_init: admin_service_post_init, tower_storage: tower_storage.clone(), rpc_to_plugin_manager_sender, diff --git a/validator/src/cli.rs b/validator/src/cli.rs index f127273c8da2f3..acb7cc57e8e1ae 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1826,6 +1826,26 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { validator instance", ), ) + .subcommand( + SubCommand::with_name("rpc-send-transaction-tpu-peer") + .about("Sets peer(s) to broadcast transactions to instead of the current leader.") + .arg( + Arg::with_name("tpu-peers") + .takes_value(true) + .number_of_values(1) + .multiple(true) + .value_name("HOST:PORT") + .validator(solana_net_utils::is_host_port) + .help( + "Provide a whitespace-separated list of tpu peers in the form HOST:PORT \ + to which transactions will be broadcast", + ), + ) + .after_help( + "Note: the new tpu peers overrides only applies to the currently running \ + validator instance", + ), + ) .subcommand( SubCommand::with_name("wait-for-restart-window") .about("Monitor the validator for a good time to restart") diff --git a/validator/src/main.rs b/validator/src/main.rs index 545ecfda481d35..ce40f166fe1bef 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -708,6 +708,35 @@ pub fn main() { }); return; } + ("rpc-send-transaction-tpu-peer", Some(subcommand_matches)) => { + if !subcommand_matches.is_present("tpu-peers") { + println!("rpc-send-transaction-tpu-peer requires at least one HOST:PORT"); + exit(1); + } + + let tpu_peers = subcommand_matches.values_of("tpu-peers").unwrap(); + + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { + admin_client + .await? + .set_rpc_send_transaction_tpu_peers(tpu_peers + .into_iter() + .map(solana_net_utils::parse_host_port) + .collect::, String>>() + .unwrap_or_else(|e| { + eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}"); + exit(1); + })) + .await + }) + .unwrap_or_else(|err| { + println!("setStakedNodesOverrides request failed: {err}"); + exit(1); + }); + return; + } ("set-identity", Some(subcommand_matches)) => { let require_tower = subcommand_matches.is_present("require_tower"); @@ -1331,6 +1360,10 @@ pub fn main() { let full_api = matches.is_present("full_rpc_api"); + let tpu_peers = Arc::new(RwLock::new( + rpc_send_transaction_tpu_peers.unwrap_or_default(), + )); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -1439,7 +1472,7 @@ pub fn main() { "rpc_send_transaction_retry_pool_max_size", usize ), - tpu_peers: rpc_send_transaction_tpu_peers, + tpu_peers: tpu_peers.clone(), }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"), @@ -1764,6 +1797,7 @@ pub fn main() { post_init: admin_service_post_init.clone(), tower_storage: validator_config.tower_storage.clone(), staked_nodes_overrides, + tpu_peers, rpc_to_plugin_manager_sender, }, );