Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rpc-send-transaction-tpu-peer subcommand. #35518

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub struct Config {
pub batch_send_rate_ms: u64,
/// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt
pub retry_pool_max_size: usize,
pub tpu_peers: Option<Vec<SocketAddr>>,
pub tpu_peers: Arc<RwLock<Vec<SocketAddr>>>,
}

impl Default for Config {
Expand All @@ -128,7 +128,7 @@ impl Default for Config {
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE,
tpu_peers: None,
tpu_peers: Default::default(),
}
}
}
Expand Down Expand Up @@ -568,11 +568,18 @@ impl SendTransactionService {
stats: &SendTransactionServiceStats,
) {
// Processing the transactions in batch
let mut addresses = config
let addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
.unwrap_or_default();
.read()
.unwrap()
.iter()
.cloned()
.map(|addr| (addr, 0))
.collect::<Vec<_>>();
let mut addresses = addresses
.iter()
.map(|(addr, slot)| (addr, *slot))
.collect::<Vec<(&SocketAddr, Slot)>>();
let leader_addresses = Self::get_tpu_addresses_with_slots(
tpu_address,
leader_info,
Expand Down Expand Up @@ -710,11 +717,7 @@ impl SendTransactionService {

let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().collect::<Vec<_>>())
.unwrap_or_default();
let mut addresses = config.tpu_peers.read().unwrap().clone();
let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info();
let leader_addresses = Self::get_tpu_addresses(
Expand Down
20 changes: 20 additions & 0 deletions validator/src/admin_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct AdminRpcRequestMetadata {
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub tower_storage: Arc<dyn TowerStorage>,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub tpu_peers: Arc<RwLock<Vec<SocketAddr>>>,
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
}
Expand Down Expand Up @@ -205,6 +206,13 @@ pub trait AdminRpc {
#[rpc(meta, name = "setStakedNodesOverrides")]
fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;

#[rpc(meta, name = "setRpcSendTransactionTpuPeer")]
fn set_rpc_send_transaction_tpu_peers(
&self,
meta: Self::Metadata,
tpu_peers: Vec<SocketAddr>,
) -> Result<()>;

#[rpc(meta, name = "contactInfo")]
fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

Expand Down Expand Up @@ -493,6 +501,18 @@ impl AdminRpc for AdminRpcImpl {
Ok(())
}

fn set_rpc_send_transaction_tpu_peers(
&self,
meta: Self::Metadata,
tpu_peers: Vec<SocketAddr>,
) -> Result<()> {
info!("Setting rpc send transaction tpu peers to {:?}", tpu_peers);
let mut meta_tpu_peers = meta.tpu_peers.write().unwrap();
meta_tpu_peers.clear();
meta_tpu_peers.extend(tpu_peers);
Ok(())
}

fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
}
Expand Down
1 change: 1 addition & 0 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ fn main() {
validator_exit: genesis.validator_exit.clone(),
authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(),
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
tpu_peers: Default::default(),
post_init: admin_service_post_init,
tower_storage: tower_storage.clone(),
rpc_to_plugin_manager_sender,
Expand Down
20 changes: 20 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,26 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
validator instance",
),
)
.subcommand(
SubCommand::with_name("rpc-send-transaction-tpu-peer")
.about("Sets peer(s) to broadcast transactions to instead of the current leader.")
.arg(
Arg::with_name("tpu-peers")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.value_name("HOST:PORT")
.validator(solana_net_utils::is_host_port)
.help(
"Provide a whitespace-separated list of tpu peers in the form HOST:PORT \
to which transactions will be broadcast",
),
)
.after_help(
"Note: the new tpu peers overrides only applies to the currently running \
validator instance",
),
)
.subcommand(
SubCommand::with_name("wait-for-restart-window")
.about("Monitor the validator for a good time to restart")
Expand Down
36 changes: 35 additions & 1 deletion validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,35 @@ pub fn main() {
});
return;
}
("rpc-send-transaction-tpu-peer", Some(subcommand_matches)) => {
if !subcommand_matches.is_present("tpu-peers") {
println!("rpc-send-transaction-tpu-peer requires at least one HOST:PORT");
exit(1);
}

let tpu_peers = subcommand_matches.values_of("tpu-peers").unwrap();

let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async move {
admin_client
.await?
.set_rpc_send_transaction_tpu_peers(tpu_peers
.into_iter()
.map(solana_net_utils::parse_host_port)
.collect::<Result<Vec<SocketAddr>, String>>()
.unwrap_or_else(|e| {
eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
exit(1);
}))
.await
})
.unwrap_or_else(|err| {
println!("setStakedNodesOverrides request failed: {err}");
exit(1);
});
return;
}
("set-identity", Some(subcommand_matches)) => {
let require_tower = subcommand_matches.is_present("require_tower");

Expand Down Expand Up @@ -1331,6 +1360,10 @@ pub fn main() {

let full_api = matches.is_present("full_rpc_api");

let tpu_peers = Arc::new(RwLock::new(
rpc_send_transaction_tpu_peers.unwrap_or_default(),
));

let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
tower_storage,
Expand Down Expand Up @@ -1439,7 +1472,7 @@ pub fn main() {
"rpc_send_transaction_retry_pool_max_size",
usize
),
tpu_peers: rpc_send_transaction_tpu_peers,
tpu_peers: tpu_peers.clone(),
},
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
Expand Down Expand Up @@ -1764,6 +1797,7 @@ pub fn main() {
post_init: admin_service_post_init.clone(),
tower_storage: validator_config.tower_storage.clone(),
staked_nodes_overrides,
tpu_peers,
rpc_to_plugin_manager_sender,
},
);
Expand Down
Loading