diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 552107328..a2731b952 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -337,6 +337,8 @@ transport: { unicast: { /// Timeout in milliseconds when opening a link + open_timeout: 10000, + /// Timeout in milliseconds when accepting a link accept_timeout: 10000, /// Maximum number of zenoh session in pending state while accepting accept_pending: 100, diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index da10d7b79..195a08a07 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -171,6 +171,7 @@ impl Default for ConnectConfig { impl Default for TransportUnicastConf { fn default() -> Self { Self { + open_timeout: 10_000, accept_timeout: 10_000, accept_pending: 100, max_sessions: 1_000, diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 8ab4d5d3a..30bd9a7c8 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -374,6 +374,8 @@ validated_struct::validator! { TransportConf { pub unicast: TransportUnicastConf { /// Timeout in milliseconds when opening a link (default: 10000). + open_timeout: u64, + /// Timeout in milliseconds when accepting a link (default: 10000). accept_timeout: u64, /// Number of links that may stay pending during accept phase (default: 100). accept_pending: usize, diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 749760475..c2cf85832 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -60,6 +60,7 @@ use crate::{ pub struct TransportManagerConfigUnicast { pub lease: Duration, pub keep_alive: usize, + pub open_timeout: Duration, pub accept_timeout: Duration, pub accept_pending: usize, pub max_sessions: usize, @@ -105,6 +106,7 @@ pub struct TransportManagerBuilderUnicast { // target interval. pub(super) lease: Duration, pub(super) keep_alive: usize, + pub(super) open_timeout: Duration, pub(super) accept_timeout: Duration, pub(super) accept_pending: usize, pub(super) max_sessions: usize, @@ -131,6 +133,11 @@ impl TransportManagerBuilderUnicast { self } + pub fn open_timeout(mut self, open_timeout: Duration) -> Self { + self.open_timeout = open_timeout; + self + } + pub fn accept_timeout(mut self, accept_timeout: Duration) -> Self { self.accept_timeout = accept_timeout; self @@ -225,6 +232,7 @@ impl TransportManagerBuilderUnicast { let config = TransportManagerConfigUnicast { lease: self.lease, keep_alive: self.keep_alive, + open_timeout: self.open_timeout, accept_timeout: self.accept_timeout, accept_pending: self.accept_pending, max_sessions: self.max_sessions, @@ -274,6 +282,7 @@ impl Default for TransportManagerBuilderUnicast { Self { lease: Duration::from_millis(*link_tx.lease()), keep_alive: *link_tx.keep_alive(), + open_timeout: Duration::from_millis(*transport.open_timeout()), accept_timeout: Duration::from_millis(*transport.accept_timeout()), accept_pending: *transport.accept_pending(), max_sessions: *transport.max_sessions(), @@ -725,7 +734,12 @@ impl TransportManager { // Create a new link associated by calling the Link Manager let link = manager.new_link(endpoint.clone()).await?; // Open the link - super::establishment::open::open_link(endpoint, link, self).await + tokio::time::timeout( + self.config.unicast.open_timeout, + super::establishment::open::open_link(endpoint, link, self), + ) + .await + .map_err(|e| zerror!("{e}"))? } pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index bd3cf0a15..15f7ca52a 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -349,11 +349,7 @@ impl Runtime { ); if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { // try to connect and exit immediately without retry - if self - .peer_connector(endpoint, retry_config.timeout()) - .await - .is_ok() - { + if self.peer_connector(endpoint).await.is_ok() { return Ok(()); } } else { @@ -379,7 +375,7 @@ impl Runtime { ); if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { // try to connect and exit immediately without retry - if let Err(e) = self.peer_connector(endpoint, retry_config.timeout()).await { + if let Err(e) = self.peer_connector(endpoint).await { if retry_config.exit_on_failure { return Err(e); } @@ -398,18 +394,12 @@ impl Runtime { Ok(()) } - async fn peer_connector(&self, peer: EndPoint, timeout: std::time::Duration) -> ZResult<()> { - match tokio::time::timeout(timeout, self.manager().open_transport_unicast(peer.clone())) - .await - { - Ok(Ok(_)) => Ok(()), - Ok(Err(e)) => { - tracing::warn!("Unable to connect to {}! {}", peer, e); - Err(e) - } + async fn peer_connector(&self, peer: EndPoint) -> ZResult<()> { + match self.manager().open_transport_unicast(peer.clone()).await { + Ok(_) => Ok(()), Err(e) => { tracing::warn!("Unable to connect to {}! {}", peer, e); - Err(e.into()) + Err(e) } } } @@ -795,9 +785,9 @@ impl Runtime { tracing::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); tokio::select! { - res = tokio::time::timeout(retry_config.timeout(), self.manager().open_transport_unicast(endpoint)) => { + res = self.manager().open_transport_unicast(endpoint) => { match res { - Ok(Ok(transport)) => { + Ok(transport) => { tracing::debug!("Successfully connected to configured peer {}", peer); if let Ok(Some(orch_transport)) = transport.get_callback() { if let Some(orch_transport) = orch_transport @@ -809,14 +799,6 @@ impl Runtime { } return transport.get_zid(); } - Ok(Err(e)) => { - tracing::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - period.duration() - ); - } Err(e) => { tracing::debug!( "Unable to connect to configured peer {}! {}. Retry in {:?}.", @@ -977,7 +959,6 @@ impl Runtime { }; let endpoint = locator.to_owned().into(); - let retry_config = self.get_connect_retry_config(&endpoint); let priorities = locator .metadata() .get(Metadata::PRIORITIES) @@ -997,35 +978,23 @@ impl Runtime { }) { if is_multicast { - match tokio::time::timeout( - retry_config.timeout(), - manager.open_transport_multicast(endpoint), - ) - .await - { - Ok(Ok(transport)) => { + match manager.open_transport_multicast(endpoint).await { + Ok(transport) => { tracing::debug!( "Successfully connected to newly scouted peer: {:?}", transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), } } else { - match tokio::time::timeout( - retry_config.timeout(), - manager.open_transport_unicast(endpoint), - ) - .await - { - Ok(Ok(transport)) => { + match manager.open_transport_unicast(endpoint).await { + Ok(transport) => { tracing::debug!( "Successfully connected to newly scouted peer: {:?}", transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), } }