Skip to content

Commit

Permalink
Merge pull request #537 from elfenpiff/iox2-533-add-timeout-to-waitset
Browse files Browse the repository at this point in the history
[#533] add timeout to waitset
  • Loading branch information
elfenpiff authored Dec 1, 2024
2 parents b20c0a8 + f3cbe8c commit 900c5f4
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 174 deletions.
24 changes: 22 additions & 2 deletions iceoryx2-ffi/cxx/include/iox2/waitset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ class WaitSet {
auto wait_and_process(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Waits until an event arrives on the [`WaitSet`], then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`WaitSetAttachmentId`] and then returns. This makes it ideal to be called in some kind of
/// event-loop.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
///
/// When no signal was received and all events were handled, it will return
/// [`WaitSetRunResult::AllEventsHandled`].
auto wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Waits until an event arrives on the [`WaitSet`] or the provided timeout has passed, then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`WaitSetAttachmentId`] and then returns. This makes it ideal to be called in some kind of
Expand All @@ -154,8 +173,9 @@ class WaitSet {
///
/// When no signal was received and all events were handled, it will return
/// [`WaitSetRunResult::AllEventsHandled`].
auto wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call,
iox::units::Duration timeout) -> iox::expected<WaitSetRunResult, WaitSetRunError>;
auto wait_and_process_once_with_timeout(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call,
iox::units::Duration timeout)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Returns the capacity of the [`WaitSet`]
auto capacity() const -> uint64_t;
Expand Down
20 changes: 17 additions & 3 deletions iceoryx2-ffi/cxx/src/waitset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,28 @@ auto WaitSet<S>::wait_and_process(const iox::function<CallbackProgression(WaitSe
}

template <ServiceType S>
auto WaitSet<S>::wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call,
const iox::units::Duration timeout)
auto WaitSet<S>::wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
auto result = iox2_waitset_wait_and_process_once(&m_handle, run_callback<S>, static_cast<void*>(&ctx), &run_result);

if (result == IOX2_OK) {
return iox::ok(iox::into<WaitSetRunResult>(static_cast<int>(run_result)));
}

return iox::err(iox::into<WaitSetRunError>(result));
}

template <ServiceType S>
auto WaitSet<S>::wait_and_process_once_with_timeout(
const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call,
const iox::units::Duration timeout) -> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
auto timeout_secs = timeout.toSeconds();
auto timeout_nsecs = timeout.toNanoseconds() - timeout.toSeconds() * iox::units::Duration::NANOSECS_PER_SEC;
auto result = iox2_waitset_wait_and_process_once(
auto result = iox2_waitset_wait_and_process_once_with_timeout(
&m_handle, run_callback<S>, static_cast<void*>(&ctx), timeout_secs, timeout_nsecs, &run_result);

if (result == IOX2_OK) {
Expand Down
33 changes: 14 additions & 19 deletions iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ TYPED_TEST(WaitSetTest, empty_waitset_returns_error_on_run) {

TYPED_TEST(WaitSetTest, empty_waitset_returns_error_on_run_once) {
auto sut = this->create_sut();
auto result =
sut.wait_and_process_once([](auto) { return CallbackProgression::Continue; }, iox::units::Duration::max());
auto result = sut.wait_and_process_once([](auto) { return CallbackProgression::Continue; });

ASSERT_THAT(result.has_error(), Eq(true));
ASSERT_THAT(result.get_error(), Eq(WaitSetRunError::NoAttachments));
Expand Down Expand Up @@ -197,7 +196,7 @@ TYPED_TEST(WaitSetTest, does_not_block_longer_than_provided_timeout) {
auto guard = sut.attach_interval(Duration::max()).expect("");

auto callback_called = false;
auto result = sut.wait_and_process_once(
auto result = sut.wait_and_process_once_with_timeout(
[&](auto attachment_id) -> CallbackProgression {
callback_called = true;
return CallbackProgression::Stop;
Expand All @@ -218,12 +217,10 @@ TYPED_TEST(WaitSetTest, blocks_until_interval_when_user_timeout_is_larger) {
auto guard = sut.attach_interval(TIMEOUT).expect("");

auto callback_called = false;
auto result = sut.wait_and_process_once(
[&](auto attachment_id) -> CallbackProgression {
callback_called = true;
return CallbackProgression::Stop;
},
Duration::max());
auto result = sut.wait_and_process_once([&](auto attachment_id) -> CallbackProgression {
callback_called = true;
return CallbackProgression::Stop;
});

auto end = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
Expand Down Expand Up @@ -310,18 +307,16 @@ TYPED_TEST(WaitSetTest, triggering_everything_works) {
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT.toMilliseconds()));
std::vector<bool> was_triggered(guards.size(), false);

auto result = sut.wait_and_process_once(
[&](auto attachment_id) -> CallbackProgression {
for (uint64_t idx = 0; idx < guards.size(); ++idx) {
if (attachment_id.has_event_from(guards[idx])) {
was_triggered[idx] = true;
break;
}
auto result = sut.wait_and_process_once([&](auto attachment_id) -> CallbackProgression {
for (uint64_t idx = 0; idx < guards.size(); ++idx) {
if (attachment_id.has_event_from(guards[idx])) {
was_triggered[idx] = true;
break;
}
}

return CallbackProgression::Continue;
},
iox::units::Duration::max());
return CallbackProgression::Continue;
});

ASSERT_THAT(result.has_error(), Eq(false));

Expand Down
144 changes: 117 additions & 27 deletions iceoryx2-ffi/ffi/src/api/waitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ pub unsafe extern "C" fn iox2_waitset_attach_interval(
IOX2_OK
}

/// Waits until an event arrives on the [`iox2_waitset_h`] or the provided timeout has passed, then
/// Waits until an event arrives on the [`iox2_waitset_h`], then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`iox2_waitset_attachment_id_h`] and then returns. This makes it ideal to be called in some kind
/// of event-loop.
Expand Down Expand Up @@ -640,6 +640,88 @@ pub unsafe extern "C" fn iox2_waitset_attach_interval(
/// [`iox2_waitset_attachment_id_drop()`](crate::iox2_waitset_attachment_id_drop())
#[no_mangle]
pub unsafe extern "C" fn iox2_waitset_wait_and_process_once(
handle: iox2_waitset_h_ref,
callback: iox2_waitset_run_callback,
callback_ctx: iox2_callback_context,
result: *mut iox2_waitset_run_result_e,
) -> c_int {
handle.assert_non_null();
debug_assert!(!result.is_null());

let waitset = &mut *handle.as_type();

let run_once_result = match waitset.service_type {
iox2_service_type_e::IPC => {
waitset
.value
.as_ref()
.ipc
.wait_and_process_once(|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_ipc(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
})
}
iox2_service_type_e::LOCAL => {
waitset
.value
.as_ref()
.local
.wait_and_process_once(|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_local(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
})
}
};

match run_once_result {
Ok(v) => {
*result = v.into();
IOX2_OK
}
Err(e) => e.into_c_int(),
}
}

/// Waits until an event arrives on the [`iox2_waitset_h`] or the provided timeout has passed, then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`iox2_waitset_attachment_id_h`] and then returns. This makes it ideal to be called in some kind
/// of event-loop.
///
/// The provided callback must return [`iox2_callback_progression_e::CONTINUE`] to continue the event
/// processing and handle the next event or [`iox2_callback_progression_e::STOP`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`iox2_waitset_run_result_e::STOP_REQUEST`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user with [`iox2_waitset_run_result_e::INTERRUPT`] or
/// [`iox2_waitset_run_result_e::TERMINATION_REQUEST`].
///
/// When no signal was received and all events were handled, it will return
/// [`iox2_waitset_run_result_e::ALL_EVENTS_HANDLED`].
/// # Return
///
/// `IOX2_OK` on success, otherwise [`iox2_waitset_run_error_e`].
///
/// # Safety
///
/// * `handle` must be valid and acquired with
/// [`iox2_waitset_builder_create()`](crate::iox2_waitset_builder_create())
/// * the provided [`iox2_waitset_attachment_id_h`] in the callback must be released via
/// [`iox2_waitset_attachment_id_drop()`](crate::iox2_waitset_attachment_id_drop())
#[no_mangle]
pub unsafe extern "C" fn iox2_waitset_wait_and_process_once_with_timeout(
handle: iox2_waitset_h_ref,
callback: iox2_waitset_run_callback,
callback_ctx: iox2_callback_context,
Expand All @@ -654,32 +736,40 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process_once(
let timeout = Duration::from_secs(seconds) + Duration::from_nanos(nanoseconds as u64);

let run_once_result = match waitset.service_type {
iox2_service_type_e::IPC => waitset.value.as_ref().ipc.wait_and_process_once(
|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_ipc(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
},
timeout,
),
iox2_service_type_e::LOCAL => waitset.value.as_ref().local.wait_and_process_once(
|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_local(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
},
timeout,
),
iox2_service_type_e::IPC => waitset
.value
.as_ref()
.ipc
.wait_and_process_once_with_timeout(
|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_ipc(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
},
timeout,
),
iox2_service_type_e::LOCAL => waitset
.value
.as_ref()
.local
.wait_and_process_once_with_timeout(
|attachment_id| {
let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc();
(*attachment_id_ptr).init(
waitset.service_type,
AttachmentIdUnion::new_local(attachment_id),
iox2_waitset_attachment_id_t::dealloc,
);
let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle();
callback(attachment_id_handle_ptr, callback_ctx).into()
},
timeout,
),
};

match run_once_result {
Expand Down
62 changes: 59 additions & 3 deletions iceoryx2/src/port/waitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ impl<Service: crate::service::Service> WaitSet<Service> {
mut fn_call: F,
) -> Result<WaitSetRunResult, WaitSetRunError> {
loop {
match self.wait_and_process_once(&mut fn_call, Duration::MAX) {
match self.wait_and_process_once(&mut fn_call) {
Ok(WaitSetRunResult::AllEventsHandled) => (),
Ok(v) => return Ok(v),
Err(e) => {
Expand All @@ -758,6 +758,60 @@ impl<Service: crate::service::Service> WaitSet<Service> {
}
}

/// Waits until an event arrives on the [`WaitSet`], then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`WaitSetAttachmentId`] and then returns. This makes it ideal to be called in some kind of
/// event-loop.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
///
/// When no signal was received and all events were handled, it will return
/// [`WaitSetRunResult::AllEventsHandled`].
///
/// # Example
///
/// ```no_run
/// use iceoryx2::prelude::*;
/// # use core::time::Duration;
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let node = NodeBuilder::new().create::<ipc::Service>()?;
/// # let event = node.service_builder(&"MyEventName_1".try_into()?)
/// # .event()
/// # .open_or_create()?;
///
/// let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
///
/// let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
/// // do some event processing
/// CallbackProgression::Continue
/// };
///
/// // main event loop
/// loop {
/// // blocks until an event arrives, handles all arrived events
/// // and then returns.
/// waitset.wait_and_process_once(on_event)?;
/// // do some event post processing
/// println!("handled events");
/// }
///
/// # Ok(())
/// # }
/// ```
pub fn wait_and_process_once<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
fn_call: F,
) -> Result<WaitSetRunResult, WaitSetRunError> {
self.wait_and_process_once_with_timeout(fn_call, Duration::MAX)
}

/// Waits until an event arrives on the [`WaitSet`] or the provided timeout has passed, then
/// collects all events by calling the provided `fn_call` callback with the corresponding
/// [`WaitSetAttachmentId`] and then returns. This makes it ideal to be called in some kind of
Expand Down Expand Up @@ -799,15 +853,17 @@ impl<Service: crate::service::Service> WaitSet<Service> {
/// loop {
/// // blocks until an event arrives or TIMEOUT was reached, handles all arrived events
/// // and then returns.
/// waitset.wait_and_process_once(on_event, TIMEOUT)?;
/// waitset.wait_and_process_once_with_timeout(on_event, TIMEOUT)?;
/// // do some event post processing
/// println!("handled events");
/// }
///
/// # Ok(())
/// # }
/// ```
pub fn wait_and_process_once<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
pub fn wait_and_process_once_with_timeout<
F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression,
>(
&self,
mut fn_call: F,
timeout: Duration,
Expand Down
Loading

0 comments on commit 900c5f4

Please sign in to comment.