Skip to content

Commit

Permalink
adds ContactInfo socket for TPU votes over QUIC (#3316)
Browse files Browse the repository at this point in the history
Working towards implementing TPU path for votes over QUIC.
  • Loading branch information
behzadnouri authored Oct 25, 2024
1 parent ed5e30d commit 74d2eb9
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 14 deletions.
11 changes: 8 additions & 3 deletions core/src/next_leader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
crate::banking_stage::LikeClusterInfo,
itertools::Itertools,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_gossip::{
cluster_info::ClusterInfo,
contact_info::{ContactInfo, Protocol},
},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey},
std::{net::SocketAddr, sync::RwLock},
Expand All @@ -26,7 +29,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
.dedup()
.filter_map(|leader_pubkey| {
cluster_info
.lookup_contact_info(&leader_pubkey, ContactInfo::tpu_vote)?
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(Protocol::UDP))?
.ok()
})
// dedup again since leaders could potentially share the same tpu vote socket
Expand All @@ -38,7 +41,9 @@ pub(crate) fn next_leader_tpu_vote(
cluster_info: &impl LikeClusterInfo,
poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, SocketAddr)> {
next_leader(cluster_info, poh_recorder, ContactInfo::tpu_vote)
next_leader(cluster_info, poh_recorder, |node| {
node.tpu_vote(Protocol::UDP)
})
}

pub(crate) fn next_leader<F, E>(
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ impl ClusterInfo {
"-".to_string()
},
self.addr_to_string(&ip_addr, &node.gossip().ok()),
self.addr_to_string(&ip_addr, &node.tpu_vote().ok()),
self.addr_to_string(&ip_addr, &node.tpu_vote(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
Expand Down
22 changes: 16 additions & 6 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ const SOCKET_TAG_TPU_FORWARDS: u8 = 6;
const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7;
const SOCKET_TAG_TPU_QUIC: u8 = 8;
const SOCKET_TAG_TPU_VOTE: u8 = 9;
const SOCKET_TAG_TPU_VOTE_QUIC: u8 = 12;
const SOCKET_TAG_TVU: u8 = 10;
const SOCKET_TAG_TVU_QUIC: u8 = 11;
const_assert_eq!(SOCKET_CACHE_SIZE, 12);
const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TVU_QUIC as usize + 1usize;
const_assert_eq!(SOCKET_CACHE_SIZE, 13);
const SOCKET_CACHE_SIZE: usize = SOCKET_TAG_TPU_VOTE_QUIC as usize + 1usize;

#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -240,7 +241,7 @@ impl ContactInfo {
SOCKET_TAG_TPU_FORWARDS,
SOCKET_TAG_TPU_FORWARDS_QUIC
);
get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE);
get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE, SOCKET_TAG_TPU_VOTE_QUIC);
get_socket!(tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC);

set_socket!(set_gossip, SOCKET_TAG_GOSSIP);
Expand All @@ -255,6 +256,7 @@ impl ContactInfo {
SOCKET_TAG_TPU_FORWARDS_QUIC
);
set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE);
set_socket!(set_tpu_vote_quic, SOCKET_TAG_TPU_VOTE_QUIC);
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);

Expand Down Expand Up @@ -700,7 +702,8 @@ mod tests {
assert_matches!(ci.tpu(Protocol::UDP), Err(Error::InvalidPort(0)));
assert_matches!(ci.tpu_forwards(Protocol::QUIC), Err(Error::InvalidPort(0)));
assert_matches!(ci.tpu_forwards(Protocol::UDP), Err(Error::InvalidPort(0)));
assert_matches!(ci.tpu_vote(), Err(Error::InvalidPort(0)));
assert_matches!(ci.tpu_vote(Protocol::UDP), Err(Error::InvalidPort(0)));
assert_matches!(ci.tpu_vote(Protocol::QUIC), Err(Error::InvalidPort(0)));
assert_matches!(ci.tvu(Protocol::QUIC), Err(Error::InvalidPort(0)));
assert_matches!(ci.tvu(Protocol::UDP), Err(Error::InvalidPort(0)));
}
Expand Down Expand Up @@ -856,9 +859,13 @@ mod tests {
sockets.get(&SOCKET_TAG_TPU_FORWARDS_QUIC)
);
assert_eq!(
node.tpu_vote().ok().as_ref(),
node.tpu_vote(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_VOTE)
);
assert_eq!(
node.tpu_vote(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_VOTE_QUIC)
);
assert_eq!(
node.tvu(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TVU)
Expand Down Expand Up @@ -954,7 +961,10 @@ mod tests {
old.tpu(Protocol::UDP).unwrap().port() + QUIC_PORT_OFFSET
)
);
assert_eq!(old.tpu_vote().unwrap(), node.tpu_vote().unwrap());
assert_eq!(
old.tpu_vote().unwrap(),
node.tpu_vote(Protocol::UDP).unwrap()
);
assert_eq!(
old.tvu(Protocol::QUIC).unwrap(),
node.tvu(Protocol::QUIC).unwrap()
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3528,7 +3528,7 @@ pub mod rpc_full {
.ok()
.filter(|addr| socket_addr_space.check(addr)),
tpu_vote: contact_info
.tpu_vote()
.tpu_vote(Protocol::UDP)
.ok()
.filter(|addr| socket_addr_space.check(addr)),
serve_repair: contact_info
Expand Down
2 changes: 1 addition & 1 deletion validator/src/admin_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl From<ContactInfo> for AdminRpcContactInfo {
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
Expand Down
2 changes: 1 addition & 1 deletion validator/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn verify_reachable_ports(
udp_sockets.extend(node.sockets.tpu_forwards.iter());
udp_sockets.extend(&node.sockets.tpu_forwards_quic);
}
if verify_address(&node.info.tpu_vote().ok()) {
if verify_address(&node.info.tpu_vote(Protocol::UDP).ok()) {
udp_sockets.extend(node.sockets.tpu_vote.iter());
}
if verify_address(&node.info.tvu(Protocol::UDP).ok()) {
Expand Down

0 comments on commit 74d2eb9

Please sign in to comment.