diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 6f3e8889423bf4..434d580732e8f5 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -14,7 +14,7 @@ use { clock::{Slot, DEFAULT_MS_PER_SLOT}, epoch_schedule::EpochSchedule, genesis_config::ClusterType, - packet::{Meta, PACKET_DATA_SIZE}, + packet::{Packet, PACKET_DATA_SIZE}, pubkey::Pubkey, }, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, @@ -350,9 +350,6 @@ pub(crate) fn receive_quic_datagrams( }; let mut packet_batch = PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams"); - unsafe { - packet_batch.set_len(PACKETS_PER_BATCH); - }; let deadline = Instant::now() + PACKET_COALESCE_DURATION; let entries = std::iter::once(entry).chain( std::iter::repeat_with(|| quic_datagrams_receiver.recv_deadline(deadline).ok()) @@ -360,19 +357,16 @@ pub(crate) fn receive_quic_datagrams( ); let size = entries .filter(|(_, _, bytes)| bytes.len() <= PACKET_DATA_SIZE) - .zip(packet_batch.iter_mut()) + .zip(packet_batch.spare_capacity_mut().iter_mut()) .map(|((_pubkey, addr, bytes), packet)| { - *packet.meta_mut() = Meta { - size: bytes.len(), - addr: addr.ip(), - port: addr.port(), - flags, - }; - packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); + Packet::init_packet_from_bytes(packet, &bytes, Some(&addr)).unwrap(); + // SAFETY: Packet::init_packet_from_bytes() just initialized the packet + unsafe { packet.assume_init_mut().meta_mut().set_flags(flags) }; }) .count(); if size > 0 { - packet_batch.truncate(size); + // SAFETY: By now, size packets have been initialized + unsafe { packet_batch.set_len(size) }; if sender.send(packet_batch).is_err() { return; // The receiver end of the channel is disconnected. } diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index 7188f809c61694..0f2a0f98d8ab38 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -362,6 +362,11 @@ impl Meta { .set(PacketFlags::FROM_STAKED_NODE, from_staked_node); } + #[inline] + pub fn set_flags(&mut self, flags: PacketFlags) { + self.flags = flags; + } + #[inline] pub fn discard(&self) -> bool { self.flags.contains(PacketFlags::DISCARD)