diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index b06ab0c43fc6a0..2006e3ac4bd5a4 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -8,11 +8,14 @@ use { #[cfg(target_os = "linux")] use { itertools::izip, - libc::{iovec, mmsghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE}, + libc::{ + iovec, mmsghdr, msghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE, + }, std::{ - mem, + mem::{self, MaybeUninit}, net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, os::unix::io::AsRawFd, + ptr, }, }; @@ -78,16 +81,19 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option io::Result { + // Should never hit this, but bail if the caller didn't provide any Packets + // to receive into + if packets.is_empty() { + return Ok(0); + } // Assert that there are no leftovers in packets. debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default())); const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); - let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; - let iovs = mem::MaybeUninit::<[iovec; NUM_RCVMMSGS]>::uninit(); - let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { iovs.assume_init() }; - let mut addrs: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; + let mut iovs = [MaybeUninit::uninit(); NUM_RCVMMSGS]; + let mut addrs = [MaybeUninit::zeroed(); NUM_RCVMMSGS]; + let mut hdrs = [MaybeUninit::uninit(); NUM_RCVMMSGS]; let sock_fd = sock.as_raw_fd(); let count = cmp::min(iovs.len(), packets.len()); @@ -96,15 +102,25 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result() as *mut _, + msg_controllen: 0, + msg_flags: 0, + }, + }); } + let mut ts = libc::timespec { tv_sec: 1, tv_nsec: 0, @@ -114,7 +130,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result io::Result] to [T] and letting `Drop` do the work + // for us when these items go out of scope at the end of the function + unsafe { + iov.assume_init_drop(); + addr.assume_init_drop(); + hdr.assume_init_drop(); + } + } + Ok(nrecv) }