diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 1db552dd108e7b..091a5901c2311e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -64,6 +64,7 @@ pub struct TpuSockets { pub broadcast: Vec, pub transactions_quic: Vec, pub transactions_forwards_quic: Vec, + pub vote_quic: Vec, } pub struct Tpu { @@ -78,6 +79,7 @@ pub struct Tpu { tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, + tpu_vote_quic_t: thread::JoinHandle<()>, } impl Tpu { @@ -126,6 +128,7 @@ impl Tpu { broadcast: broadcast_sockets, transactions_quic: transactions_quic_sockets, transactions_forwards_quic: transactions_forwards_quic_sockets, + vote_quic: tpu_vote_quic_sockets, } = sockets; let (packet_sender, packet_receiver) = unbounded(); @@ -155,6 +158,32 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + // Streamer for Votes: + let SpawnServerResult { + endpoints: _, + thread: tpu_vote_quic_t, + key_updater: vote_streamer_key_updater, + } = spawn_server_multi( + "solQuicTVo", + "quic_streamer_tpu_vote", + tpu_vote_quic_sockets, + keypair, + vote_packet_sender.clone(), + exit.clone(), + staked_nodes.clone(), + QuicServerParams { + max_connections_per_peer: 1, + max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, + coalesce: tpu_coalesce, + max_staked_connections: MAX_STAKED_CONNECTIONS + .saturating_add(MAX_UNSTAKED_CONNECTIONS), + max_unstaked_connections: 0, + ..QuicServerParams::default() + }, + ) + .unwrap(); + + // Streamer for TPU let SpawnServerResult { endpoints: _, thread: tpu_quic_t, @@ -176,6 +205,7 @@ impl Tpu { ) .unwrap(); + // Streamer for TPU forward let SpawnServerResult { endpoints: _, thread: tpu_forwards_quic_t, @@ -289,8 +319,9 @@ impl Tpu { tpu_entry_notifier, staked_nodes_updater_service, tracer_thread_hdl, + tpu_vote_quic_t, }, - vec![key_updater, forwards_key_updater], + vec![key_updater, forwards_key_updater, vote_streamer_key_updater], ) } @@ -304,6 +335,7 @@ impl Tpu { self.staked_nodes_updater_service.join(), self.tpu_quic_t.join(), self.tpu_forwards_quic_t.join(), + self.tpu_vote_quic_t.join(), ]; let broadcast_result = self.broadcast_stage.join(); for result in results { diff --git a/core/src/validator.rs b/core/src/validator.rs index 612bf68b0bc0c8..8986fea5f005d2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1460,6 +1460,7 @@ impl Validator { broadcast: node.sockets.broadcast, transactions_quic: node.sockets.tpu_quic, transactions_forwards_quic: node.sockets.tpu_forwards_quic, + vote_quic: node.sockets.tpu_vote_quic, }, &rpc_subscriptions, transaction_status_sender, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index abed4f23bcd27b..23288a8ae11253 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2585,6 +2585,7 @@ pub struct Sockets { pub ancestor_hashes_requests_quic: UdpSocket, pub tpu_quic: Vec, pub tpu_forwards_quic: Vec, + pub tpu_vote_quic: Vec, } pub struct NodeConfig { @@ -2647,13 +2648,19 @@ impl Node { localhost_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config, + udp_config.clone(), quic_config.clone(), ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap(); + bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone()) + .unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); + + let tpu_vote_quic = + bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap(); + let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2690,6 +2697,11 @@ impl Node { "TPU-forwards" ); set_socket!(set_tpu_vote, tpu_vote.local_addr().unwrap(), "TPU-vote"); + set_socket!( + set_tpu_vote_quic, + tpu_vote_quic[0].local_addr().unwrap(), + "TPU-vote QUIC" + ); set_socket!(set_rpc, rpc_addr, "RPC"); set_socket!(set_rpc_pubsub, rpc_pubsub_addr, "RPC-pubsub"); set_socket!( @@ -2722,6 +2734,7 @@ impl Node { ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, + tpu_vote_quic, }, } } @@ -2776,7 +2789,7 @@ impl Node { quic_config.clone(), ) .unwrap(); - let tpu_quic = + let tpu_quic: Vec = bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( @@ -2787,9 +2800,17 @@ impl Node { quic_config.clone(), ) .unwrap(); - let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap(); + let tpu_forwards_quic = bind_more_with_config( + tpu_forwards_quic, + DEFAULT_QUIC_ENDPOINTS, + quic_config.clone(), + ) + .unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); + let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + let tpu_vote_quic: Vec = + bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap(); + let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); @@ -2830,6 +2851,8 @@ impl Node { serve_repair_quic_port, "serve-repair QUIC" ); + set_socket!(set_tpu_vote_quic, tpu_vote_quic_port, "TPU-vote QUIC"); + trace!("new ContactInfo: {:?}", info); Node { @@ -2852,6 +2875,7 @@ impl Node { ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, + tpu_vote_quic, }, } } @@ -2907,6 +2931,12 @@ impl Node { let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + + let tpu_vote_quic = + bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone()) + .unwrap(); + let (_, retransmit_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); @@ -2940,6 +2970,8 @@ impl Node { info.set_serve_repair((addr, serve_repair_port)).unwrap(); info.set_serve_repair_quic((addr, serve_repair_quic_port)) .unwrap(); + info.set_tpu_vote_quic((addr, tpu_vote_quic_port)).unwrap(); + trace!("new ContactInfo: {:?}", info); Node { @@ -2962,6 +2994,7 @@ impl Node { ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, + tpu_vote_quic, }, } }