From 2a952d6b64356937c55327721f2406cc403f3acf Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 2 Dec 2024 15:57:35 +0100 Subject: [PATCH 01/14] fix(serial-reopening): resolved a bug that prevents sessions to be reopened over a serial link Signed-off-by: Gabriele Baldoni --- .../zenoh-link-serial/src/unicast.rs | 65 +++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 3b54dcfc3a..8a275bd4c7 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -113,6 +113,13 @@ impl LinkUnicastSerial { } false } + + fn clear_buffers(&self) -> ZResult<()> { + Ok(self + .get_port_mut() + .clear() + .map_err(|e| zerror!("Cannot clear serial buffers: {e:?}"))?) + } } #[async_trait] @@ -120,11 +127,7 @@ impl LinkUnicastTrait for LinkUnicastSerial { async fn close(&self) -> ZResult<()> { tracing::trace!("Closing Serial link: {}", self); let _guard = zasynclock!(self.write_lock); - self.get_port_mut().clear().map_err(|e| { - let e = zerror!("Unable to close Serial link {}: {}", self, e); - tracing::error!("{}", e); - e - })?; + self.clear_buffers()?; self.is_connected.store(false, Ordering::Release); Ok(()) } @@ -402,6 +405,9 @@ async fn accept_read_task( src_path: String, is_connected: Arc, ) -> ZResult> { + // Cleaning RX buffer before listening + link.clear_buffers()?; + while !is_connected.load(Ordering::Acquire) && !link.is_ready() { // Waiting to be ready, if not sleep some time. tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; @@ -415,32 +421,37 @@ async fn accept_read_task( tracing::trace!("Ready to accept Serial connections on: {:?}", src_path); loop { - tokio::select! { - res = receive( - link.clone(), - src_path.clone(), - is_connected.clone(), - ) => { - match res { - Ok(link) => { - // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { - tracing::error!("{}-{}: {}", file!(), line!(), e) + if !is_connected.load(Ordering::Acquire) { + tokio::select! { + res = receive( + link.clone(), + src_path.clone(), + is_connected.clone(), + ) => { + match res { + Ok(link) => { + // Communicate the new link to the initial transport manager + if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { + tracing::error!("{}-{}: {}", file!(), line!(), e) + } + + // Ensure the creation of this link is only once + continue; } + Err(e) => { + tracing::warn!("{}. Hint: Is the serial cable connected?", e); + tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; + continue; - // Ensure the creation of this link is only once - break; - } - Err(e) => { - tracing::warn!("{}. Hint: Is the serial cable connected?", e); - tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; - continue; - + } } - } - }, + }, - _ = token.cancelled() => break, + _ = token.cancelled() => break, + } + } else { + // In this case its already connected, so we do nothing + tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; } } Ok(()) From 3b1e368b9333589d847c1c16b2a33a86767c5ea5 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 09:22:58 +0100 Subject: [PATCH 02/14] fix: serial reconnections Signed-off-by: Gabriele Baldoni --- Cargo.lock | 5 +- Cargo.toml | 3 + .../zenoh-link-serial/src/unicast.rs | 66 ++++++++----------- zenoh/Cargo.toml | 1 + 4 files changed, 35 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0129941564..a2501916ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,7 +4381,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] @@ -5043,8 +5043,7 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f113597c6b880587004169f14bc010e4b440981ab2ad669779d3654f9b1c4af1" +source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#2262076d24689965cd0422386c1cf491b4281605" dependencies = [ "cobs", "futures", diff --git a/Cargo.toml b/Cargo.toml index de23d2eea4..57d5707314 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,6 +223,9 @@ zenoh = { version = "1.0.0-dev", path = "zenoh", default-features = false } zenoh-runtime = { version = "1.0.0-dev", path = "commons/zenoh-runtime" } zenoh-task = { version = "1.0.0-dev", path = "commons/zenoh-task" } +[patch.crates-io] +z-serial = { git = "https://github.com/ZettaScaleLabs/z-serial/", branch = "feat/state-machine" } + [profile.dev] debug = true opt-level = 0 diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 8a275bd4c7..1b048fb9f8 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -30,7 +30,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use z_serial::ZSerial; -use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; +use zenoh_core::{bail, zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, @@ -96,25 +96,8 @@ impl LinkUnicastSerial { unsafe { &mut *self.port.get() } } - fn is_ready(&self) -> bool { - let res = match self.get_port_mut().bytes_to_read() { - Ok(b) => b, - Err(e) => { - tracing::warn!( - "Unable to check if there are bytes to read in serial {}: {}", - self.src_locator, - e - ); - 0 - } - }; - if res > 0 { - return true; - } - false - } - fn clear_buffers(&self) -> ZResult<()> { + tracing::trace!("I'm cleaning the buffers"); Ok(self .get_port_mut() .clear() @@ -127,8 +110,8 @@ impl LinkUnicastTrait for LinkUnicastSerial { async fn close(&self) -> ZResult<()> { tracing::trace!("Closing Serial link: {}", self); let _guard = zasynclock!(self.write_lock); - self.clear_buffers()?; self.is_connected.store(false, Ordering::Release); + self.get_port_mut().close(); Ok(()) } @@ -151,17 +134,14 @@ impl LinkUnicastTrait for LinkUnicastSerial { } async fn read(&self, buffer: &mut [u8]) -> ZResult { - loop { - let _guard = zasynclock!(self.read_lock); - match self.get_port_mut().read_msg(buffer).await { - Ok(read) => return Ok(read), - Err(e) => { - let e = zerror!("Read error on Serial link {}: {}", self, e); - tracing::error!("{}", e); - drop(_guard); - tokio::time::sleep(std::time::Duration::from_millis(1)).await; - continue; - } + let _guard = zasynclock!(self.read_lock); + match self.get_port_mut().read_msg(buffer).await { + Ok(read) => return Ok(read), + Err(e) => { + let e = zerror!("Read error on Serial link {}: {}", self, e); + tracing::error!("{}", e); + drop(_guard); + bail!("Read error on Serial link {}: {}", self, e); } } } @@ -287,7 +267,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let baud_rate = get_baud_rate(&endpoint); let exclusive = get_exclusive(&endpoint); tracing::trace!("Opening Serial Link on device {path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); - let port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { + let mut port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { let e = zerror!( "Can not create a new Serial link bound to {:?}: {}", path, @@ -297,6 +277,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { e })?; + // Clear buffers + port.clear()?; + port.connect().await?; + // Create Serial link let link = Arc::new(LinkUnicastSerial::new( UnsafeCell::new(port), @@ -405,11 +389,17 @@ async fn accept_read_task( src_path: String, is_connected: Arc, ) -> ZResult> { - // Cleaning RX buffer before listening - link.clear_buffers()?; - - while !is_connected.load(Ordering::Acquire) && !link.is_ready() { - // Waiting to be ready, if not sleep some time. + // while !is_connected.load(Ordering::Acquire) && !link.is_ready() { + // // Waiting to be ready, if not sleep some time. + // tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; + // } + // while !is_connected.load(Ordering::Acquire) { + // // Waiting to be ready, if not sleep some time. + // tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; + // } + + while link.get_port_mut().accept().await.is_err() { + //Waiting to be ready, if not sleep some time. tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; } @@ -418,6 +408,8 @@ async fn accept_read_task( Ok(link.clone()) } + // Cleaning RX buffer before listening + link.clear_buffers()?; tracing::trace!("Ready to accept Serial connections on: {:?}", src_path); loop { diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb48..02709b3424 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -42,6 +42,7 @@ default = [ "transport_udp", "transport_unixsock-stream", "transport_ws", + "transport_serial" ] internal = ["zenoh-keyexpr/internal", "zenoh-config/internal"] plugins = [] From dbddaf4b916ac5abeeff26354929c3fb77f64c0c Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 11:23:42 +0100 Subject: [PATCH 03/14] feat: configurable timeout Signed-off-by: Gabriele Baldoni --- Cargo.lock | 2 +- io/zenoh-links/zenoh-link-serial/src/lib.rs | 11 +++++++++++ io/zenoh-links/zenoh-link-serial/src/unicast.rs | 7 ++++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2501916ff..bfcb1bd5c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5043,7 +5043,7 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" version = "0.2.3" -source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#2262076d24689965cd0422386c1cf491b4281605" +source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#43b978efa780b5b8150eaac96949f8545166f918" dependencies = [ "cobs", "futures", diff --git a/io/zenoh-links/zenoh-link-serial/src/lib.rs b/io/zenoh-links/zenoh-link-serial/src/lib.rs index 04cefebd11..ccf4f2c627 100644 --- a/io/zenoh-links/zenoh-link-serial/src/lib.rs +++ b/io/zenoh-links/zenoh-link-serial/src/lib.rs @@ -38,6 +38,8 @@ const DEFAULT_BAUDRATE: u32 = 9_600; const DEFAULT_EXCLUSIVE: bool = true; +const DEFAULT_TIMEOUT: u64 = 50_000; + pub const SERIAL_LOCATOR_PREFIX: &str = "serial"; const SERIAL_MTU_LIMIT: BatchSize = SERIAL_MAX_MTU; @@ -94,6 +96,14 @@ pub fn get_exclusive(endpoint: &EndPoint) -> bool { } } +pub fn get_timeout(endpoint: &EndPoint) -> u64 { + if let Some(tout) = endpoint.config().get(config::TIMEOUT_RAW) { + u64::from_str(tout).unwrap_or(DEFAULT_TIMEOUT) + } else { + DEFAULT_TIMEOUT + } +} + pub fn get_unix_path_as_string(address: Address<'_>) -> String { address.as_str().to_owned() } @@ -101,4 +111,5 @@ pub fn get_unix_path_as_string(address: Address<'_>) -> String { pub mod config { pub const PORT_BAUD_RATE_RAW: &str = "baudrate"; pub const PORT_EXCLUSIVE_RAW: &str = "exclusive"; + pub const TIMEOUT_RAW: &str = "tout"; } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 1b048fb9f8..26fa083b4d 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -45,7 +45,7 @@ use super::{ get_baud_rate, get_unix_path_as_string, SERIAL_ACCEPT_THROTTLE_TIME, SERIAL_DEFAULT_MTU, SERIAL_LOCATOR_PREFIX, }; -use crate::get_exclusive; +use crate::{get_exclusive, get_timeout}; struct LinkUnicastSerial { // The underlying serial port as returned by ZSerial (tokio-serial) @@ -266,7 +266,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let path = get_unix_path_as_string(endpoint.address()); let baud_rate = get_baud_rate(&endpoint); let exclusive = get_exclusive(&endpoint); - tracing::trace!("Opening Serial Link on device {path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); + let tout = get_timeout(&endpoint); + tracing::trace!("Opening Serial Link on device {path:?}, with baudrate {baud_rate}, exclusive set as {exclusive} and timeout (us) {tout}"); let mut port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { let e = zerror!( "Can not create a new Serial link bound to {:?}: {}", @@ -279,7 +280,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Clear buffers port.clear()?; - port.connect().await?; + port.connect(Some(Duration::from_micros(tout))).await?; // Create Serial link let link = Arc::new(LinkUnicastSerial::new( From 0c02db8b81317844cb03689123a97999db6887f5 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 9 Dec 2024 08:35:07 +0100 Subject: [PATCH 04/14] deps: bump z-serial dependency Signed-off-by: Gabriele Baldoni --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index bfcb1bd5c2..0ce1ec692b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5043,7 +5043,7 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" version = "0.2.3" -source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#43b978efa780b5b8150eaac96949f8545166f918" +source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#bb00ed07424987d3fa983732741e0a1aaba24411" dependencies = [ "cobs", "futures", From f67f713d0e15f0482d98fb1a69e1073c0c58182b Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 9 Dec 2024 12:49:31 +0100 Subject: [PATCH 05/14] fix: release file on close Signed-off-by: Gabriele Baldoni --- io/zenoh-links/zenoh-link-serial/src/lib.rs | 11 ++ .../zenoh-link-serial/src/unicast.rs | 106 ++++++++++++++---- 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/lib.rs b/io/zenoh-links/zenoh-link-serial/src/lib.rs index ccf4f2c627..9d4b9012a4 100644 --- a/io/zenoh-links/zenoh-link-serial/src/lib.rs +++ b/io/zenoh-links/zenoh-link-serial/src/lib.rs @@ -40,6 +40,8 @@ const DEFAULT_EXCLUSIVE: bool = true; const DEFAULT_TIMEOUT: u64 = 50_000; +const DEFAULT_RELEASE_ON_CLOSE: bool = true; + pub const SERIAL_LOCATOR_PREFIX: &str = "serial"; const SERIAL_MTU_LIMIT: BatchSize = SERIAL_MAX_MTU; @@ -104,6 +106,14 @@ pub fn get_timeout(endpoint: &EndPoint) -> u64 { } } +pub fn get_release_on_close(endpoint: &EndPoint) -> bool { + if let Some(release_on_close) = endpoint.config().get(config::RELEASE_ON_CLOSE) { + bool::from_str(release_on_close).unwrap_or(DEFAULT_RELEASE_ON_CLOSE) + } else { + DEFAULT_RELEASE_ON_CLOSE + } +} + pub fn get_unix_path_as_string(address: Address<'_>) -> String { address.as_str().to_owned() } @@ -112,4 +122,5 @@ pub mod config { pub const PORT_BAUD_RATE_RAW: &str = "baudrate"; pub const PORT_EXCLUSIVE_RAW: &str = "exclusive"; pub const TIMEOUT_RAW: &str = "tout"; + pub const RELEASE_ON_CLOSE: &str = "release_on_close"; } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 26fa083b4d..58646c050b 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -15,6 +15,7 @@ use std::{ cell::UnsafeCell, collections::HashMap, + f32::consts::E, fmt, sync::{ atomic::{AtomicBool, Ordering}, @@ -45,7 +46,7 @@ use super::{ get_baud_rate, get_unix_path_as_string, SERIAL_ACCEPT_THROTTLE_TIME, SERIAL_DEFAULT_MTU, SERIAL_LOCATOR_PREFIX, }; -use crate::{get_exclusive, get_timeout}; +use crate::{get_exclusive, get_release_on_close, get_timeout}; struct LinkUnicastSerial { // The underlying serial port as returned by ZSerial (tokio-serial) @@ -56,13 +57,15 @@ struct LinkUnicastSerial { // already ensures that no concurrent reads or writes can happen on // the same stream: there is only one task at the time that writes on // the stream and only one task at the time that reads from the stream. - port: UnsafeCell, + port: UnsafeCell>, // The serial port path src_locator: Locator, // The serial destination path (random UUIDv4) dst_locator: Locator, // A flag that tells if the link is connected or not is_connected: Arc, + // A flag that tells if we must release the file on close. + release_on_close: bool, // Locks for reading and writing ends of the serial. write_lock: AsyncMutex<()>, read_lock: AsyncMutex<()>, @@ -73,16 +76,18 @@ unsafe impl Sync for LinkUnicastSerial {} impl LinkUnicastSerial { fn new( - port: UnsafeCell, + port: UnsafeCell>, src_path: &str, dst_path: &str, is_connected: Arc, + release_on_close: bool, ) -> Self { Self { port, src_locator: Locator::new(SERIAL_LOCATOR_PREFIX, src_path, "").unwrap(), dst_locator: Locator::new(SERIAL_LOCATOR_PREFIX, dst_path, "").unwrap(), is_connected, + release_on_close, write_lock: AsyncMutex::new(()), read_lock: AsyncMutex::new(()), } @@ -92,17 +97,32 @@ impl LinkUnicastSerial { // or concurrent writes will ever happen. The write_lock and read_lock // are respectively acquired in any read and write operation. #[allow(clippy::mut_from_ref)] - fn get_port_mut(&self) -> &mut ZSerial { - unsafe { &mut *self.port.get() } + fn get_port_mut(&self) -> ZResult<&mut ZSerial> { + unsafe { + let opt = &mut *self.port.get(); + + if let Some(port) = opt { + return Ok(port); + } + bail!("Serial is not opened") + } } fn clear_buffers(&self) -> ZResult<()> { tracing::trace!("I'm cleaning the buffers"); Ok(self - .get_port_mut() + .get_port_mut()? .clear() .map_err(|e| zerror!("Cannot clear serial buffers: {e:?}"))?) } + + fn set_port(&self, port: ZSerial) { + unsafe { *self.port.get() = Some(port) } + } + + fn unset_port(&self) { + unsafe { *self.port.get() = None } + } } #[async_trait] @@ -111,13 +131,17 @@ impl LinkUnicastTrait for LinkUnicastSerial { tracing::trace!("Closing Serial link: {}", self); let _guard = zasynclock!(self.write_lock); self.is_connected.store(false, Ordering::Release); - self.get_port_mut().close(); + self.get_port_mut()?.close(); + if self.release_on_close { + self.unset_port(); + } + Ok(()) } async fn write(&self, buffer: &[u8]) -> ZResult { let _guard = zasynclock!(self.write_lock); - self.get_port_mut().write(buffer).await.map_err(|e| { + self.get_port_mut()?.write(buffer).await.map_err(|e| { let e = zerror!("Unable to write on Serial link {}: {}", self, e); tracing::error!("{}", e); e @@ -135,7 +159,7 @@ impl LinkUnicastTrait for LinkUnicastSerial { async fn read(&self, buffer: &mut [u8]) -> ZResult { let _guard = zasynclock!(self.read_lock); - match self.get_port_mut().read_msg(buffer).await { + match self.get_port_mut()?.read_msg(buffer).await { Ok(read) => return Ok(read), Err(e) => { let e = zerror!("Read error on Serial link {}: {}", self, e); @@ -267,6 +291,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let baud_rate = get_baud_rate(&endpoint); let exclusive = get_exclusive(&endpoint); let tout = get_timeout(&endpoint); + let release_on_close = get_release_on_close(&endpoint); tracing::trace!("Opening Serial Link on device {path:?}, with baudrate {baud_rate}, exclusive set as {exclusive} and timeout (us) {tout}"); let mut port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { let e = zerror!( @@ -284,10 +309,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Create Serial link let link = Arc::new(LinkUnicastSerial::new( - UnsafeCell::new(port), + UnsafeCell::new(Some(port)), &path, &path, Arc::new(AtomicBool::new(true)), + release_on_close, )); Ok(LinkUnicast(link)) @@ -297,7 +323,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let path = get_unix_path_as_string(endpoint.address()); let baud_rate = get_baud_rate(&endpoint); let exclusive = get_exclusive(&endpoint); + let release_on_close = get_release_on_close(&endpoint); tracing::trace!("Creating Serial listener on device {path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); + + // Opening to check if the port exists. let port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { let e = zerror!( "Can not create a new Serial link bound to {:?}: {}", @@ -307,15 +336,18 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { tracing::warn!("{}", e); e })?; + // closing it. + drop(port); // Creating the link let is_connected = Arc::new(AtomicBool::new(false)); let dst_path = format!("{}", uuid::Uuid::new_v4()); let link = Arc::new(LinkUnicastSerial::new( - UnsafeCell::new(port), + UnsafeCell::new(None), &path, &dst_path, is_connected.clone(), + release_on_close, )); // Spawn the accept loop for the listener @@ -330,7 +362,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { async move { // Wait for the accept loop to terminate - let res = accept_read_task(link, token, manager, path.clone(), is_connected).await; + let res = accept_read_task( + link, + token, + manager, + path.clone(), + is_connected, + baud_rate, + exclusive, + release_on_close, + ) + .await; zasyncwrite!(listeners).remove(&path); res } @@ -384,22 +426,39 @@ async fn accept_read_task( manager: NewLinkChannelSender, src_path: String, is_connected: Arc, + baud_rate: u32, + exclusive: bool, + release_on_close: bool, ) -> ZResult<()> { async fn receive( link: Arc, src_path: String, is_connected: Arc, + baud_rate: u32, + exclusive: bool, + release_on_close: bool, ) -> ZResult> { - // while !is_connected.load(Ordering::Acquire) && !link.is_ready() { - // // Waiting to be ready, if not sleep some time. - // tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; - // } - // while !is_connected.load(Ordering::Acquire) { - // // Waiting to be ready, if not sleep some time. - // tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; - // } - - while link.get_port_mut().accept().await.is_err() { + while !is_connected.load(Ordering::Acquire) { + // The serial is already connected to nothing. + tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; + } + + tracing::trace!("Creating Serial listener on device {src_path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); + if release_on_close { + let port = ZSerial::new(src_path.clone(), baud_rate, exclusive).map_err(|e| { + let e = zerror!( + "Can not create a new Serial link bound to {:?}: {}", + src_path, + e + ); + tracing::warn!("{}", e); + e + })?; + + link.set_port(port); + } + + while link.get_port_mut()?.accept().await.is_err() { //Waiting to be ready, if not sleep some time. tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; } @@ -420,6 +479,9 @@ async fn accept_read_task( link.clone(), src_path.clone(), is_connected.clone(), + baud_rate, + exclusive, + release_on_close, ) => { match res { Ok(link) => { From 61524af85a2075f2f6a8db81b869e03fed660d89 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 08:25:41 +0100 Subject: [PATCH 06/14] fix: closing file on error Signed-off-by: Gabriele Baldoni --- .../zenoh-link-serial/src/unicast.rs | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 58646c050b..88266566d6 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -15,7 +15,6 @@ use std::{ cell::UnsafeCell, collections::HashMap, - f32::consts::E, fmt, sync::{ atomic::{AtomicBool, Ordering}, @@ -324,20 +323,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let baud_rate = get_baud_rate(&endpoint); let exclusive = get_exclusive(&endpoint); let release_on_close = get_release_on_close(&endpoint); - tracing::trace!("Creating Serial listener on device {path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); - - // Opening to check if the port exists. - let port = ZSerial::new(path.clone(), baud_rate, exclusive).map_err(|e| { - let e = zerror!( - "Can not create a new Serial link bound to {:?}: {}", - path, - e - ); - tracing::warn!("{}", e); - e - })?; - // closing it. - drop(port); // Creating the link let is_connected = Arc::new(AtomicBool::new(false)); @@ -438,7 +423,7 @@ async fn accept_read_task( exclusive: bool, release_on_close: bool, ) -> ZResult> { - while !is_connected.load(Ordering::Acquire) { + while is_connected.load(Ordering::Acquire) { // The serial is already connected to nothing. tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; } @@ -456,6 +441,8 @@ async fn accept_read_task( })?; link.set_port(port); + // Cleaning RX buffer before listening + link.clear_buffers()?; } while link.get_port_mut()?.accept().await.is_err() { @@ -468,8 +455,6 @@ async fn accept_read_task( Ok(link.clone()) } - // Cleaning RX buffer before listening - link.clear_buffers()?; tracing::trace!("Ready to accept Serial connections on: {:?}", src_path); loop { From c4992aed61e64799931105bcf6ebaca4d38a3beb Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 09:09:58 +0100 Subject: [PATCH 07/14] fix: opening and reconnections Signed-off-by: Gabriele Baldoni --- Cargo.lock | 4 ++-- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ce1ec692b..41980a624e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,7 +4381,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] @@ -5043,7 +5043,7 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" version = "0.2.3" -source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#bb00ed07424987d3fa983732741e0a1aaba24411" +source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#d7496e671215ee04316bcb1f4b31b437d1ee87af" dependencies = [ "cobs", "futures", diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 88266566d6..4a9dd767e8 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -423,6 +423,8 @@ async fn accept_read_task( exclusive: bool, release_on_close: bool, ) -> ZResult> { + tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; + while is_connected.load(Ordering::Acquire) { // The serial is already connected to nothing. tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; From ffac83d9ecc7812344f7ea9585e4abdb6580949e Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 12:19:55 +0100 Subject: [PATCH 08/14] chore: updated log levels Signed-off-by: Gabriele Baldoni --- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 4a9dd767e8..4221823064 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -433,13 +433,11 @@ async fn accept_read_task( tracing::trace!("Creating Serial listener on device {src_path:?}, with baudrate {baud_rate} and exclusive set as {exclusive}"); if release_on_close { let port = ZSerial::new(src_path.clone(), baud_rate, exclusive).map_err(|e| { - let e = zerror!( + zerror!( "Can not create a new Serial link bound to {:?}: {}", src_path, e ); - tracing::warn!("{}", e); - e })?; link.set_port(port); @@ -474,7 +472,7 @@ async fn accept_read_task( Ok(link) => { // Communicate the new link to the initial transport manager if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { - tracing::error!("{}-{}: {}", file!(), line!(), e) + tracing::debug!("{}-{}: {}", file!(), line!(), e) } // Ensure the creation of this link is only once From 895b07ee630d5cf382be6ba7ecb9b844422f068a Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 12:24:53 +0100 Subject: [PATCH 09/14] fix: return on map_err Signed-off-by: Gabriele Baldoni --- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 4221823064..2d679ae5a2 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -437,7 +437,7 @@ async fn accept_read_task( "Can not create a new Serial link bound to {:?}: {}", src_path, e - ); + ) })?; link.set_port(port); From 713d1294a0b46e53c06382599c4ccb169f26a402 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 12:34:26 +0100 Subject: [PATCH 10/14] chore: changed warn log to debug Signed-off-by: Gabriele Baldoni --- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 2d679ae5a2..390922311a 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -479,7 +479,7 @@ async fn accept_read_task( continue; } Err(e) => { - tracing::warn!("{}. Hint: Is the serial cable connected?", e); + tracing::debug!("{}. Hint: Is the serial cable connected?", e); tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await; continue; From e8abf3cbb31388187c65a9f8c01e03d1ff85de74 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 12:46:51 +0100 Subject: [PATCH 11/14] linter: make clippy happy again Signed-off-by: Gabriele Baldoni --- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 390922311a..cb68b9893c 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -405,6 +405,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { } } +#[allow(clippy::too_many_arguments)] async fn accept_read_task( link: Arc, token: CancellationToken, @@ -415,6 +416,7 @@ async fn accept_read_task( exclusive: bool, release_on_close: bool, ) -> ZResult<()> { + #[allow(clippy::too_many_arguments)] async fn receive( link: Arc, src_path: String, From ada218463b449401d55fe88dc4d59e67fff5bb2c Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 17:10:09 +0100 Subject: [PATCH 12/14] deps: bump z-serial Signed-off-by: Gabriele Baldoni --- Cargo.lock | 7 ++++--- Cargo.toml | 5 +---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41980a624e..bbdee961a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,7 +4381,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] @@ -5042,8 +5042,9 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" -version = "0.2.3" -source = "git+https://github.com/ZettaScaleLabs/z-serial/?branch=feat/state-machine#d7496e671215ee04316bcb1f4b31b437d1ee87af" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1f288ec253cd9add72c2cf8c24d6525a3d81481b0407a4d53c23c16c871c38c" dependencies = [ "cobs", "futures", diff --git a/Cargo.toml b/Cargo.toml index 57d5707314..4e5f70f933 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -187,7 +187,7 @@ vec_map = "0.8.2" webpki-roots = "0.26.5" winapi = { version = "0.3.9", features = ["iphlpapi", "winerror"] } x509-parser = "0.16.0" -z-serial = "0.2.3" +z-serial = "0.3.0" either = "1.13.0" prost = "0.13.2" tls-listener = { version = "0.10.2", features = ["rustls-ring"] } @@ -223,9 +223,6 @@ zenoh = { version = "1.0.0-dev", path = "zenoh", default-features = false } zenoh-runtime = { version = "1.0.0-dev", path = "commons/zenoh-runtime" } zenoh-task = { version = "1.0.0-dev", path = "commons/zenoh-task" } -[patch.crates-io] -z-serial = { git = "https://github.com/ZettaScaleLabs/z-serial/", branch = "feat/state-machine" } - [profile.dev] debug = true opt-level = 0 From a4506efbe103827a8cb9668f05864dea11747d79 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 10 Dec 2024 19:55:49 +0100 Subject: [PATCH 13/14] chore: serial not as default feature Signed-off-by: Gabriele Baldoni --- zenoh/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 02709b3424..6a998a50a4 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -41,8 +41,7 @@ default = [ "transport_tls", "transport_udp", "transport_unixsock-stream", - "transport_ws", - "transport_serial" + "transport_ws" ] internal = ["zenoh-keyexpr/internal", "zenoh-config/internal"] plugins = [] From 2dc5243bb234641481e104df5afa1800e3ec6bbd Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 11 Dec 2024 11:10:37 +0100 Subject: [PATCH 14/14] deps: bump z-serial Signed-off-by: Gabriele Baldoni --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbdee961a3..314c10f5da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,7 +4381,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] @@ -5042,9 +5042,9 @@ checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "z-serial" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1f288ec253cd9add72c2cf8c24d6525a3d81481b0407a4d53c23c16c871c38c" +checksum = "1660dfc9f90480610f94c285a9a967b49cd2f57b3b1267d9bd7fd5d4f57c36c8" dependencies = [ "cobs", "futures", diff --git a/Cargo.toml b/Cargo.toml index 4e5f70f933..4b7b5f1f69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -187,7 +187,7 @@ vec_map = "0.8.2" webpki-roots = "0.26.5" winapi = { version = "0.3.9", features = ["iphlpapi", "winerror"] } x509-parser = "0.16.0" -z-serial = "0.3.0" +z-serial = "0.3.1" either = "1.13.0" prost = "0.13.2" tls-listener = { version = "0.10.2", features = ["rustls-ring"] }