Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(serial-reopening): bug that prevented the serial link to be reused after closing a session #1624

Merged
merged 14 commits into from
Dec 11, 2024
65 changes: 38 additions & 27 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,21 @@ impl LinkUnicastSerial {
}
false
}

fn clear_buffers(&self) -> ZResult<()> {
Ok(self
.get_port_mut()
.clear()
.map_err(|e| zerror!("Cannot clear serial buffers: {e:?}"))?)
}
}

#[async_trait]
impl LinkUnicastTrait for LinkUnicastSerial {
async fn close(&self) -> ZResult<()> {
tracing::trace!("Closing Serial link: {}", self);
let _guard = zasynclock!(self.write_lock);
self.get_port_mut().clear().map_err(|e| {
let e = zerror!("Unable to close Serial link {}: {}", self, e);
tracing::error!("{}", e);
e
})?;
self.clear_buffers()?;
self.is_connected.store(false, Ordering::Release);
Ok(())
}
Expand Down Expand Up @@ -402,6 +405,9 @@ async fn accept_read_task(
src_path: String,
is_connected: Arc<AtomicBool>,
) -> ZResult<Arc<LinkUnicastSerial>> {
// Cleaning RX buffer before listening
link.clear_buffers()?;

while !is_connected.load(Ordering::Acquire) && !link.is_ready() {
// Waiting to be ready, if not sleep some time.
tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await;
Expand All @@ -415,32 +421,37 @@ async fn accept_read_task(
tracing::trace!("Ready to accept Serial connections on: {:?}", src_path);

loop {
tokio::select! {
res = receive(
link.clone(),
src_path.clone(),
is_connected.clone(),
) => {
match res {
Ok(link) => {
// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
if !is_connected.load(Ordering::Acquire) {
tokio::select! {
res = receive(
link.clone(),
src_path.clone(),
is_connected.clone(),
) => {
match res {
Ok(link) => {
// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}

// Ensure the creation of this link is only once
continue;
}
Err(e) => {
tracing::warn!("{}. Hint: Is the serial cable connected?", e);
tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await;
continue;

// Ensure the creation of this link is only once
break;
}
Err(e) => {
tracing::warn!("{}. Hint: Is the serial cable connected?", e);
tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await;
continue;

}
}
}
},
},

_ = token.cancelled() => break,
_ = token.cancelled() => break,
}
} else {
// In this case its already connected, so we do nothing
tokio::time::sleep(Duration::from_micros(*SERIAL_ACCEPT_THROTTLE_TIME)).await;
}
}
Ok(())
Expand Down