diff --git a/Cargo.toml b/Cargo.toml index 0cb16194d5..dcbe5e7eac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ libc = "0.2.139" libloading = "0.8" log = "0.4.17" lz4_flex = "0.11" -nix = { version = "0.27", features = ["fs"] } +nix = { version = "0.27", features = ["fs", "net", "socket"] } num_cpus = "1.15.0" ordered-float = "4.1.1" panic-message = "0.3.0" diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index d44686ff50..1b888d584e 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -18,7 +18,7 @@ use core::{ hash::{Hash, Hasher}, ops::Deref, }; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::core::{EndPoint, Locator, Priority}; use zenoh_result::ZResult; pub type LinkManagerUnicast = Arc; @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; + fn set_priority(&self, priority: Priority) -> ZResult<()>; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2b1c59ad23..256acd8d6d 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -32,6 +32,7 @@ use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; +use zenoh_config::Priority; use zenoh_core::{zasynclock, zread, zwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, @@ -152,6 +153,11 @@ impl LinkUnicastTrait for LinkUnicastQuic { fn is_streamed(&self) -> bool { true } + + fn set_priority(&self, _priority: Priority) -> ZResult<()> { + // no-op + Ok(()) + } } impl Drop for LinkUnicastQuic { diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 6ec8f8f279..f8a47d7880 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -23,6 +23,7 @@ use std::fmt; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; +use zenoh_config::Priority; use zenoh_core::{zasynclock, zread, zwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, @@ -190,6 +191,11 @@ impl LinkUnicastTrait for LinkUnicastSerial { fn is_streamed(&self) -> bool { false } + + fn set_priority(&self, _priority: Priority) -> ZResult<()> { + // no-op + Ok(()) + } } impl fmt::Display for LinkUnicastSerial { diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 9c4725ff03..742c24a804 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -28,6 +28,7 @@ description = "Internal crate for zenoh." async-std = { workspace = true } async-trait = { workspace = true } log = { workspace = true } +nix.workspace = true zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3960b91228..a8adce2ee1 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fmt; use std::net::{IpAddr, Shutdown}; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -27,7 +28,7 @@ use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::core::{EndPoint, Locator, Priority}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Signal; @@ -153,6 +154,14 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn is_streamed(&self) -> bool { true } + + fn set_priority(&self, priority: Priority) -> ZResult<()> { + use nix::sys::socket::sockopt::Priority as O_PRIORITY; + let fd = unsafe { OwnedFd::from_raw_fd(self.socket.as_raw_fd()) }; + let priority = priority as u8 as i32; + nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?; + Ok(()) + } } impl Drop for LinkUnicastTcp { diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7761195e4b..112bf1af74 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -46,6 +46,7 @@ use webpki::{ anchor_from_trusted_cert, types::{CertificateDer, TrustAnchor}, }; +use zenoh_config::Priority; use zenoh_core::{zasynclock, zread, zwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, @@ -204,6 +205,11 @@ impl LinkUnicastTrait for LinkUnicastTls { fn is_streamed(&self) -> bool { true } + + fn set_priority(&self, _priority: Priority) -> ZResult<()> { + // no-op + Ok(()) + } } impl Drop for LinkUnicastTls { diff --git a/io/zenoh-links/zenoh-link-udp/Cargo.toml b/io/zenoh-links/zenoh-link-udp/Cargo.toml index aae1b01f54..28cc7fb3e6 100644 --- a/io/zenoh-links/zenoh-link-udp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-udp/Cargo.toml @@ -28,6 +28,7 @@ description = "Internal crate for zenoh." async-std = { workspace = true } async-trait = { workspace = true } log = { workspace = true } +nix.workspace = true socket2 = { workspace = true } zenoh-buffers = { workspace = true } zenoh-collections = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 585442ed71..5819f83a25 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use std::collections::HashMap; use std::fmt; use std::net::IpAddr; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::Duration; @@ -32,7 +33,7 @@ use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::core::{EndPoint, Locator, Priority}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Mvar; use zenoh_sync::Signal; @@ -63,6 +64,14 @@ impl LinkUnicastUdpConnected { async fn close(&self) -> ZResult<()> { Ok(()) } + + fn set_priority(&self, priority: Priority) -> ZResult<()> { + use nix::sys::socket::sockopt::Priority as O_PRIORITY; + let fd = unsafe { OwnedFd::from_raw_fd(self.socket.as_raw_fd()) }; + let priority = priority as u8 as i32; + nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?; + Ok(()) + } } struct LinkUnicastUdpUnconnected { @@ -116,6 +125,19 @@ impl LinkUnicastUdpUnconnected { zlock!(self.links).remove(&(src_addr, dst_addr)); Ok(()) } + + fn set_priority(&self, priority: Priority) -> ZResult<()> { + use nix::sys::socket::sockopt::Priority as O_PRIORITY; + + let socket = match self.socket.upgrade() { + Some(socket) => socket, + None => bail!("UDP listener has been dropped"), + }; + let fd = unsafe { OwnedFd::from_raw_fd(socket.as_raw_fd()) }; + let priority = priority as u8 as i32; + nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?; + Ok(()) + } } enum LinkUnicastUdpVariant { @@ -217,6 +239,13 @@ impl LinkUnicastTrait for LinkUnicastUdp { fn is_streamed(&self) -> bool { false } + + fn set_priority(&self, priority: Priority) -> ZResult<()> { + match &self.variant { + LinkUnicastUdpVariant::Connected(link) => link.set_priority(priority), + LinkUnicastUdpVariant::Unconnected(link) => link.set_priority(priority), + } + } } impl fmt::Display for LinkUnicastUdp { diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 156698d195..1d2d3ce085 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -29,6 +29,7 @@ use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; use std::os::unix::fs::OpenOptionsExt; use std::sync::Arc; +use zenoh_config::Priority; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_protocol::core::{EndPoint, Locator}; @@ -501,6 +502,11 @@ impl LinkUnicastTrait for UnicastPipe { fn is_streamed(&self) -> bool { true } + + fn set_priority(&self, _priority: Priority) -> ZResult<()> { + // TODO: no-op + Ok(()) + } } impl fmt::Display for UnicastPipe { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index e4d751344f..6f0bff5bf1 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -24,6 +24,7 @@ use std::collections::HashMap; use std::fmt; use std::fs::remove_file; use std::net::Shutdown; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::os::unix::io::RawFd; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -33,7 +34,7 @@ use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::core::{EndPoint, Locator, Priority}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; @@ -124,6 +125,14 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { fn is_streamed(&self) -> bool { true } + + fn set_priority(&self, priority: Priority) -> ZResult<()> { + use nix::sys::socket::sockopt::Priority as O_PRIORITY; + let fd = unsafe { OwnedFd::from_raw_fd(self.socket.as_raw_fd()) }; + let priority = priority as u8 as i32; + nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?; + Ok(()) + } } impl Drop for LinkUnicastUnixSocketStream { diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 2238dcb4a6..9f0a8c732d 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -35,6 +35,7 @@ use zenoh_core::{zasynclock, zread, zwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; +use zenoh_protocol::core::Priority; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, ZResult}; use zenoh_sync::Signal; @@ -216,6 +217,11 @@ impl LinkUnicastTrait for LinkUnicastWs { fn is_streamed(&self) -> bool { false } + + fn set_priority(&self, _priority: Priority) -> ZResult<()> { + // no-op + Ok(()) + } } impl Drop for LinkUnicastWs { diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index bd756d6396..ee50452c7e 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -15,6 +15,7 @@ use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch use std::fmt; use std::sync::Arc; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; +use zenoh_config::Priority; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; @@ -144,7 +145,11 @@ pub(crate) struct TransportLinkUnicastTx { } impl TransportLinkUnicastTx { - pub(crate) async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> { + pub(crate) async fn send_batch( + &mut self, + batch: &mut WBatch, + priority: Option, + ) -> ZResult<()> { const ERR: &str = "Write error on link: "; // log::trace!("WBatch: {:?}", batch); @@ -165,6 +170,10 @@ impl TransportLinkUnicastTx { // log::trace!("WBytes: {:02x?}", bytes); // Send the message on the link + + if let Some(priority) = priority { + self.inner.link.set_priority(priority)?; + } self.inner.link.write_all(bytes).await?; Ok(()) @@ -177,7 +186,7 @@ impl TransportLinkUnicastTx { let mut batch = WBatch::new(self.inner.config.batch); batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?; let len = batch.len() as usize; - self.send_batch(&mut batch).await?; + self.send_batch(&mut batch, None).await?; Ok(len) } } diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 513cefc0a6..e484f71c7f 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -34,6 +34,7 @@ use std::{ time::Duration, }; use zenoh_buffers::ZSliceBuffer; +use zenoh_config::Priority; use zenoh_core::zwrite; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; @@ -195,7 +196,11 @@ async fn tx_task( match pipeline.pull().timeout(keep_alive).await { Ok(res) => match res { Some((mut batch, priority)) => { - link.send_batch(&mut batch).await?; + link.send_batch( + &mut batch, + Some(Priority::try_from(priority as u8).unwrap()), + ) + .await?; #[cfg(feature = "stats")] { @@ -225,7 +230,7 @@ async fn tx_task( // Drain the transmission pipeline and write remaining bytes on the wire let mut batches = pipeline.drain(); for (mut b, _) in batches.drain(..) { - link.send_batch(&mut b) + link.send_batch(&mut b, None) .timeout(keep_alive) .await .map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??;