Skip to content

Commit

Permalink
Close builder (#1576)
Browse files Browse the repository at this point in the history
* atexit-safe close builder draft

* - get rid of nolocal crate
- add timeout for async task controller finalization

* Close builder trait with proper timeout support for Runtime and Session

* fix close behavior

* fix clippy
fix ephemeral ports in tests

* - brush-up timeout implementation in the TaskController
- remove backoff from CloseBuilder
- simplify some code around CloseBuilder

* Revert ephemeral port fix

* Update close.rs

* merge fix

* Make Close builder functionality unstable && internal

* fix docs

* Session close tests
  • Loading branch information
yellowhatter authored Dec 3, 2024
1 parent 549bc7b commit c6f52c2
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 86 deletions.
42 changes: 25 additions & 17 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,23 @@ impl TaskController {
// NOTE(review): this span is taken from a rendered diff, so two bodies appear back to back: the
// single-line `ResolveFuture::new(...)` call forwards `timeout` to `terminate_all_async`, while
// the multi-line call applies the timeout itself. Presumably the former is the superseded
// version — confirm against the repository.
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
ResolveFuture::new(async move {
// An `Err` from `tokio::time::timeout` means the deadline passed before the wait finished.
if tokio::time::timeout(timeout, self.terminate_all_async())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
}
// Whatever is still tracked at this point is what gets reported to the caller.
self.tracker.len()
})
.wait()
}

// NOTE(review): rendered-diff span — two signatures and two bodies are interleaved below. One
// variant takes `timeout` and returns the number of unterminated tasks; the other takes no
// timeout and simply awaits the tracker. Presumably the latter is the current one — confirm.
/// Async version of [`TaskController::terminate_all()`].
pub async fn terminate_all_async(&self, timeout: Duration) -> usize {
pub async fn terminate_all_async(&self) {
// Close the tracker and cancel the token before waiting on the tracked tasks.
self.tracker.close();
self.token.cancel();
if tokio::time::timeout(timeout, self.tracker.wait())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
return self.tracker.len();
}
0
// Variant without a timeout: the wait is unbounded here, so any deadline must be applied
// by the caller.
self.tracker.wait().await
}
}

Expand Down Expand Up @@ -181,18 +183,24 @@ impl TerminatableTask {
// NOTE(review): rendered-diff span — the single-line `ResolveFuture::new(...)` call (which
// forwards `timeout` to `terminate_async`) and the multi-line call (which applies the timeout
// itself) are two versions of the same body. Presumably the former is superseded — confirm.
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
ResolveFuture::new(async move {
// An `Err` from `tokio::time::timeout` means the task did not finish before the deadline.
if tokio::time::timeout(timeout, self.terminate_async())
.await
.is_err()
{
tracing::error!("Failed to terminate the task");
return false;
};
true
})
.wait()
}

// NOTE(review): rendered-diff span — two signatures and two bodies are interleaved below. One
// variant takes `timeout` and returns `bool`; the other takes no timeout and just awaits the
// join handle. Presumably the latter is the current one — confirm against the repository.
/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self) {
// Signal cancellation first, then wait on the handle if it has not been taken already.
self.token.cancel();
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
// Variant without a timeout: the join result is deliberately discarded.
let _ = handle.await;
}
true
}
}
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,7 @@ impl TransportManager {

/// Closes the unicast side via `close_unicast()` and then terminates the tasks owned by
/// `task_controller`.
// NOTE(review): rendered-diff span — the three-line call passing `Duration::from_secs(10)` and
// the one-line call without arguments are two versions of the same statement. Presumably the
// argument-less call is the current one — confirm against the repository.
pub async fn close(&self) {
self.close_unicast().await;
self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;
}

/*************************************/
Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ impl TransportMulticastInner {
cb.closed();
}

self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;

Ok(())
}
Expand Down
152 changes: 152 additions & 0 deletions zenoh/src/api/builders/close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
future::{Future, IntoFuture},
pin::Pin,
time::Duration,
};

use async_trait::async_trait;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use zenoh_runtime::ZRuntime;

/// A builder for close operations.
///
/// Resolving the builder (through `Wait` or `IntoFuture`) runs the closee's `close_inner()`
/// under the configured timeout.
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
pub struct CloseBuilder<TCloseable: Closeable> {
/// Object returned by `Closeable::get_closee()`; its `close_inner()` performs the close.
closee: TCloseable::TClosee,
/// Upper bound for the close operation (`new()` initializes it to 10 seconds).
timeout: Duration,
}

// NOTE: `Closeable` is a zenoh-internal, pub(crate) trait; appearing as a bound on this
// public impl is intentional, hence the `private_bounds` lint is silenced here.
#[allow(private_bounds)]
impl<TCloseable: Closeable> CloseBuilder<TCloseable> {
    /// Creates a builder for `closeable`, capturing its closee and a default timeout of
    /// 10 seconds.
    pub(crate) fn new(closeable: &TCloseable) -> Self {
        let closee = closeable.get_closee();
        let timeout = Duration::from_secs(10);
        Self { closee, timeout }
    }

    #[cfg(all(feature = "unstable", feature = "internal"))]
    /// Overrides the timeout applied to the close operation.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum duration of the close operation (10s by default)
    ///
    #[doc(hidden)]
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    #[cfg(all(feature = "unstable", feature = "internal"))]
    /// Wraps the close operation into a [`BackgroundCloseBuilder`], so that it runs as a
    /// spawned task instead of inline.
    #[doc(hidden)]
    pub fn in_background(self) -> BackgroundCloseBuilder<<Self as Resolvable>::To> {
        let close_future = self.into_future();
        BackgroundCloseBuilder::new(close_future)
    }
}

// Resolving a `CloseBuilder` yields `ZResult<()>`; the `IntoFuture` impl returns an error when
// the timeout elapses.
impl<TCloseable: Closeable> Resolvable for CloseBuilder<TCloseable> {
type To = ZResult<()>;
}

// Synchronous resolution: drives the same future as the `IntoFuture` impl to completion.
impl<TCloseable: Closeable> Wait for CloseBuilder<TCloseable> {
fn wait(self) -> Self::To {
// Hands the close future to `ZRuntime::Application.block_in_place` and returns its result.
ZRuntime::Application.block_in_place(self.into_future())
}
}

impl<TCloseable: Closeable> IntoFuture for CloseBuilder<TCloseable> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

    /// Builds the future that runs `close_inner()` on the closee, bounded by the configured
    /// timeout.
    ///
    /// The future resolves to `Ok(())` when the close finishes in time and to an error when
    /// the timeout elapses first.
    fn into_future(self) -> Self::IntoFuture {
        let Self { closee, timeout } = self;
        // An async block already is a `Future`, so it can be pinned and boxed directly.
        Box::pin(async move {
            match tokio::time::timeout(timeout, closee.close_inner()).await {
                Ok(()) => Ok(()),
                Err(_elapsed) => {
                    bail!("close operation timed out!")
                }
            }
        })
    }
}

#[cfg(all(feature = "unstable", feature = "internal"))]
/// A builder for close operations running in background
///
/// Resolving it spawns the wrapped future on `ZRuntime::Application` and yields the resulting
/// `tokio::task::JoinHandle`.
// NOTE(review): no `Closeable` bound is visible on this type, so the `private_bounds` allowance
// below looks like a leftover from `CloseBuilder` — confirm before removing it.
#[doc(hidden)]
#[allow(private_bounds)]
pub struct BackgroundCloseBuilder<TOutput: Send + 'static> {
/// The close future that will be spawned when this builder is resolved.
inner: Pin<Box<dyn Future<Output = TOutput> + Send>>,
}

#[cfg(all(feature = "unstable", feature = "internal"))]
#[doc(hidden)]
// NOTE(review): no `Closeable` bound is visible on this impl, so the `private_bounds` allowance
// below looks like a leftover from `CloseBuilder` — confirm before removing it.
#[allow(private_bounds)]
impl<TOutput: Send + 'static> BackgroundCloseBuilder<TOutput> {
/// Wraps an already boxed close future; nothing is spawned until the builder is resolved.
fn new(inner: Pin<Box<dyn Future<Output = TOutput> + Send>>) -> Self {
Self { inner }
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
// Resolving the background builder yields the join handle of the spawned close task, not the
// close result itself.
impl<TOutput: Send + 'static> Resolvable for BackgroundCloseBuilder<TOutput> {
type To = tokio::task::JoinHandle<TOutput>;
}

#[cfg(all(feature = "unstable", feature = "internal"))]
// Synchronous resolution: drives the spawning future and returns the `JoinHandle` it produces.
impl<TOutput: Send + 'static> Wait for BackgroundCloseBuilder<TOutput> {
fn wait(self) -> Self::To {
// Only the (short) spawning future is driven here; the close task itself is not awaited.
ZRuntime::Application.block_in_place(self.into_future())
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
impl<TOutput: Send + 'static> IntoFuture for BackgroundCloseBuilder<TOutput> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

    /// Builds a future that, once polled, spawns the wrapped close future on
    /// `ZRuntime::Application` and resolves to the spawned task's `JoinHandle`.
    // NOTE: resolving to a `JoinHandle` (itself a future) is intentional: the caller decides
    // whether and when to await the background close.
    #[allow(clippy::async_yields_async)]
    fn into_future(self) -> Self::IntoFuture {
        let Self { inner } = self;
        // The spawn is deferred into the async block so that nothing starts before the first poll.
        Box::pin(async move { ZRuntime::Application.spawn(inner) })
    }
}

/// The object that actually performs a close operation on behalf of a `CloseBuilder`.
///
/// `CloseBuilder` stores a value of this type and awaits `close_inner()` under its timeout.
#[async_trait]
pub(crate) trait Closee: Send + Sync + 'static {
/// Performs the close. It returns nothing; `CloseBuilder` reports an error only when its
/// timeout elapses.
async fn close_inner(&self);
}

/// Implemented by types that can be closed through a `CloseBuilder`.
pub(crate) trait Closeable {
/// The `Closee` type that carries out the close for this type.
type TClosee: Closee;
/// Returns the closee that `CloseBuilder::new()` stores and later closes.
fn get_closee(&self) -> Self::TClosee;
}
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) mod close;
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
Expand Down
94 changes: 53 additions & 41 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use tracing::{error, info, trace, warn};
use uhlc::Timestamp;
#[cfg(feature = "internal")]
Expand Down Expand Up @@ -67,6 +68,7 @@ use zenoh_result::ZResult;
use zenoh_shm::api::client_storage::ShmClientStorage;
use zenoh_task::TaskController;

use super::builders::close::{CloseBuilder, Closeable, Closee};
#[cfg(feature = "unstable")]
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -733,8 +735,8 @@ impl Session {
/// subscriber_task.await.unwrap();
/// # }
/// ```
// NOTE(review): rendered-diff span — two versions of `close` are interleaved below: one
// delegating to `self.0.close()` and returning `impl Resolve<ZResult<()>>`, the other returning
// a `CloseBuilder<Self>`. Presumably the `CloseBuilder` variant is the current one — confirm.
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
pub fn close(&self) -> CloseBuilder<Self> {
CloseBuilder::new(self)
}

/// Check if the session has been closed.
Expand Down Expand Up @@ -1236,50 +1238,12 @@ impl Session {
})
}
}

impl SessionInner {
/// Returns the `ZenohId` reported by the underlying runtime.
pub fn zid(&self) -> ZenohId {
self.runtime.zid()
}

// NOTE(review): this method appears in a rendered diff whose summary for this file reports more
// deletions than this hunk adds; presumably this is the removed, pre-`CloseBuilder`
// implementation — confirm against the repository.
/// Closes the session: takes the primitives out of the state (returning `Ok(())` right away
/// when they are already gone), terminates the session tasks, closes the runtime or sends a
/// close through the primitives, and finally empties the session's internal collections.
fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
// `primitives` being `None` means there is nothing left to close.
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
if self.owns_runtime {
info!(zid = %self.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
// Either close the runtime this session owns, or only send a close via the primitives.
if self.owns_runtime {
self.runtime.close().await?;
} else {
primitives.send_close();
}
// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
Ok(())
})
}

pub(crate) fn declare_prefix<'a>(
&'a self,
prefix: &'a str,
Expand Down Expand Up @@ -3146,3 +3110,51 @@ where
{
OpenBuilder::new(config)
}

#[async_trait]
impl Closee for Arc<SessionInner> {
    /// Closes the session.
    ///
    /// The primitives are taken out of the session state first; when they are already gone
    /// the call returns immediately. Otherwise the session tasks are terminated, then either
    /// the owned runtime is closed or a close is sent through the primitives, and finally the
    /// session's internal collections are emptied.
    async fn close_inner(&self) {
        let Some(primitives) = zwrite!(self.state).primitives.take() else {
            return;
        };

        let owns_runtime = self.owns_runtime;
        if owns_runtime {
            info!(zid = %self.zid(), "close session");
        }

        // Session tasks are terminated in both cases, before the runtime / primitives step.
        self.task_controller.terminate_all_async().await;

        if owns_runtime {
            self.runtime.get_closee().close_inner().await;
        } else {
            primitives.send_close();
        }

        // The collections are moved out of the state and only dropped once the lock has been
        // released: callbacks may hold entities that need to acquire this same lock in order
        // to be dropped, so they must not be dropped while the lock is held.
        let mut locked = zwrite!(self.state);
        let _queryables = std::mem::take(&mut locked.queryables);
        let _subscribers = std::mem::take(&mut locked.subscribers);
        let _liveliness_subscribers = std::mem::take(&mut locked.liveliness_subscribers);
        let _local_resources = std::mem::take(&mut locked.local_resources);
        let _remote_resources = std::mem::take(&mut locked.remote_resources);
        drop(locked);

        #[cfg(feature = "unstable")]
        {
            // The lock is re-acquired inside this block on purpose: reusing the outer guard
            // would make the taken value go out of scope (and be dropped) with the lock
            // still held. This is expected to be cleaned up once the APIs are stabilized.
            let mut locked = zwrite!(self.state);
            let _matching_listeners = std::mem::take(&mut locked.matching_listeners);
            drop(locked);
        }
    }
}

impl Closeable for Session {
    type TClosee = Arc<SessionInner>;

    /// Hands out another shared handle to the session's inner state; the close operation is
    /// implemented on that handle.
    fn get_closee(&self) -> Self::TClosee {
        Arc::clone(&self.0)
    }
}
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub mod session {
pub use crate::api::builders::session::{init, InitBuilder};
pub use crate::api::{
builders::{
close::CloseBuilder,
info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder},
publisher::{SessionDeleteBuilder, SessionPutBuilder},
query::SessionGetBuilder,
Expand Down
Loading

0 comments on commit c6f52c2

Please sign in to comment.