Skip to content

Commit

Permalink
Vote using QUIC (#3304)
Browse files Browse the repository at this point in the history
* Vote using QUIC
  • Loading branch information
lijunwangs authored Nov 8, 2024
1 parent f621667 commit 2025d85
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
34 changes: 33 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct TpuSockets {
pub broadcast: Vec<UdpSocket>,
pub transactions_quic: Vec<UdpSocket>,
pub transactions_forwards_quic: Vec<UdpSocket>,
pub vote_quic: Vec<UdpSocket>,
}

pub struct Tpu {
Expand All @@ -78,6 +79,7 @@ pub struct Tpu {
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
tpu_vote_quic_t: thread::JoinHandle<()>,
}

impl Tpu {
Expand Down Expand Up @@ -126,6 +128,7 @@ impl Tpu {
broadcast: broadcast_sockets,
transactions_quic: transactions_quic_sockets,
transactions_forwards_quic: transactions_forwards_quic_sockets,
vote_quic: tpu_vote_quic_sockets,
} = sockets;

let (packet_sender, packet_receiver) = unbounded();
Expand Down Expand Up @@ -155,6 +158,32 @@ impl Tpu {

let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();

// Streamer for Votes:
let SpawnServerResult {
endpoints: _,
thread: tpu_vote_quic_t,
key_updater: vote_streamer_key_updater,
} = spawn_server_multi(
"solQuicTVo",
"quic_streamer_tpu_vote",
tpu_vote_quic_sockets,
keypair,
vote_packet_sender.clone(),
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: 1,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0,
..QuicServerParams::default()
},
)
.unwrap();

// Streamer for TPU
let SpawnServerResult {
endpoints: _,
thread: tpu_quic_t,
Expand All @@ -176,6 +205,7 @@ impl Tpu {
)
.unwrap();

// Streamer for TPU forward
let SpawnServerResult {
endpoints: _,
thread: tpu_forwards_quic_t,
Expand Down Expand Up @@ -289,8 +319,9 @@ impl Tpu {
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
tpu_vote_quic_t,
},
vec![key_updater, forwards_key_updater],
vec![key_updater, forwards_key_updater, vote_streamer_key_updater],
)
}

Expand All @@ -304,6 +335,7 @@ impl Tpu {
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
self.tpu_vote_quic_t.join(),
];
let broadcast_result = self.broadcast_stage.join();
for result in results {
Expand Down
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,7 @@ impl Validator {
broadcast: node.sockets.broadcast,
transactions_quic: node.sockets.tpu_quic,
transactions_forwards_quic: node.sockets.tpu_forwards_quic,
vote_quic: node.sockets.tpu_vote_quic,
},
&rpc_subscriptions,
transaction_status_sender,
Expand Down
43 changes: 38 additions & 5 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2585,6 +2585,7 @@ pub struct Sockets {
pub ancestor_hashes_requests_quic: UdpSocket,
pub tpu_quic: Vec<UdpSocket>,
pub tpu_forwards_quic: Vec<UdpSocket>,
pub tpu_vote_quic: Vec<UdpSocket>,
}

pub struct NodeConfig {
Expand Down Expand Up @@ -2647,13 +2648,19 @@ impl Node {
localhost_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config,
udp_config.clone(),
quic_config.clone(),
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap();
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone())
.unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();

let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap();

let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -2690,6 +2697,11 @@ impl Node {
"TPU-forwards"
);
set_socket!(set_tpu_vote, tpu_vote.local_addr().unwrap(), "TPU-vote");
set_socket!(
set_tpu_vote_quic,
tpu_vote_quic[0].local_addr().unwrap(),
"TPU-vote QUIC"
);
set_socket!(set_rpc, rpc_addr, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_addr, "RPC-pubsub");
set_socket!(
Expand Down Expand Up @@ -2722,6 +2734,7 @@ impl Node {
ancestor_hashes_requests_quic,
tpu_quic,
tpu_forwards_quic,
tpu_vote_quic,
},
}
}
Expand Down Expand Up @@ -2776,7 +2789,7 @@ impl Node {
quic_config.clone(),
)
.unwrap();
let tpu_quic =
let tpu_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap();
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
Expand All @@ -2787,9 +2800,17 @@ impl Node {
quic_config.clone(),
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
DEFAULT_QUIC_ENDPOINTS,
quic_config.clone(),
)
.unwrap();
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);
let tpu_vote_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();

let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (_, repair_quic) = Self::bind(bind_ip_addr, port_range);
Expand Down Expand Up @@ -2830,6 +2851,8 @@ impl Node {
serve_repair_quic_port,
"serve-repair QUIC"
);
set_socket!(set_tpu_vote_quic, tpu_vote_quic_port, "TPU-vote QUIC");

trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -2852,6 +2875,7 @@ impl Node {
ancestor_hashes_requests_quic,
tpu_quic,
tpu_forwards_quic,
tpu_vote_quic,
},
}
}
Expand Down Expand Up @@ -2907,6 +2931,12 @@ impl Node {
let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");

let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);

let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone())
.unwrap();

let (_, retransmit_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");

Expand Down Expand Up @@ -2940,6 +2970,8 @@ impl Node {
info.set_serve_repair((addr, serve_repair_port)).unwrap();
info.set_serve_repair_quic((addr, serve_repair_quic_port))
.unwrap();
info.set_tpu_vote_quic((addr, tpu_vote_quic_port)).unwrap();

trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -2962,6 +2994,7 @@ impl Node {
ancestor_hashes_requests_quic,
tpu_quic,
tpu_forwards_quic,
tpu_vote_quic,
},
}
}
Expand Down

0 comments on commit 2025d85

Please sign in to comment.