Skip to content

Commit

Permalink
move GetOptions back under Session:: scope
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Nov 21, 2024
1 parent 982e469 commit 95d6da0
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 162 deletions.
1 change: 1 addition & 0 deletions include/zenoh/api.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
#include "api/ext/serialization.hxx"
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "api/ext/publication_cache.hxx"
#include "api/ext/querying_subscriber.hxx"
#endif
94 changes: 89 additions & 5 deletions include/zenoh/api/ext/querying_subscriber.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "../base.hxx"
#include "../get.hxx"
#include "../interop.hxx"
#include "../keyexpr.hxx"
#include "../session.hxx"

namespace zenoh {
namespace ext {
Expand All @@ -31,16 +31,14 @@ class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> {
public:
/// @name Methods

///@copydoc zenoh::GetOptions
using GetOptions = zenoh::GetOptions;

/// @brief Make querying subscriber perform an additional query on a specified selector.
/// The queried samples will be merged with the received publications and made available in the subscriber callback.
/// @param key_expr the key expression matching resources to query.
/// @param options query options.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
void get(const KeyExpr& key_expr, zenoh::GetOptions&& options = zenoh::GetOptions::create_default(),
void get(const KeyExpr& key_expr,
zenoh::Session::GetOptions&& options = zenoh::Session::GetOptions::create_default(),
ZResult* err = nullptr) const {
::z_get_options_t opts;
z_get_options_default(&opts);
Expand Down Expand Up @@ -192,5 +190,91 @@ auto move_to_c_obj(zenoh::ext::QueryingSubscriber<Handler>&& s) {
}
} // namespace interop

template <class C, class D>
[[nodiscard]] ext::QueryingSubscriber<void> Session::declare_querying_subscriber(const KeyExpr& key_expr, C&& on_sample,
D&& on_drop,
QueryingSubscriberOptions&& options,
ZResult* err) const {
static_assert(std::is_invocable_r<void, C, const Sample&>::value,
"on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)");
static_assert(std::is_invocable_r<void, D>::value,
"on_drop should be callable with the following signature: void on_drop()");
::z_owned_closure_sample_t c_closure;
using Cval = std::remove_reference_t<C>;
using Dval = std::remove_reference_t<D>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, const Sample&>;
auto closure = ClosureType::into_context(std::forward<C>(on_sample), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure);
::ze_querying_subscriber_options_t opts;
ze_querying_subscriber_options_default(&opts);
opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr);
#if defined(Z_FEATURE_UNSTABLE_API)
opts.allowed_origin = options.allowed_origin;
opts.query_accept_replies = options.query_accept_replies;
#endif
opts.query_target = options.query_target;
opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation);
opts.query_timeout_ms = options.query_timeout_ms;
ext::QueryingSubscriber<void> qs = interop::detail::null<ext::QueryingSubscriber<void>>();
ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs),
interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts);
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber");
return qs;
}

template <class C, class D>
void Session::declare_background_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, D&& on_drop,
QueryingSubscriberOptions&& options, ZResult* err) const {
static_assert(std::is_invocable_r<void, C, const Sample&>::value,
"on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)");
static_assert(std::is_invocable_r<void, D>::value,
"on_drop should be callable with the following signature: void on_drop()");
::z_owned_closure_sample_t c_closure;
using Cval = std::remove_reference_t<C>;
using Dval = std::remove_reference_t<D>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, const Sample&>;
auto closure = ClosureType::into_context(std::forward<C>(on_sample), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure);
::ze_querying_subscriber_options_t opts;
ze_querying_subscriber_options_default(&opts);
opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr);
#if defined(Z_FEATURE_UNSTABLE_API)
opts.allowed_origin = options.allowed_origin;
opts.query_accept_replies = options.query_accept_replies;
#endif
opts.query_target = options.query_target;
opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation);
;
opts.query_timeout_ms = options.query_timeout_ms;
ZResult res = ::ze_declare_background_querying_subscriber(
interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts);
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber");
}

template <class Channel>
[[nodiscard]] ext::QueryingSubscriber<typename Channel::template HandlerType<Sample>>
Session::declare_querying_subscriber(const KeyExpr& key_expr, Channel channel, QueryingSubscriberOptions&& options,
ZResult* err) const {
auto cb_handler_pair = channel.template into_cb_handler_pair<Sample>();
::ze_querying_subscriber_options_t opts;
ze_querying_subscriber_options_default(&opts);
opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr);
#if defined(Z_FEATURE_UNSTABLE_API)
opts.allowed_origin = options.allowed_origin;
opts.query_accept_replies = options.query_accept_replies;
#endif
opts.query_target = options.query_target;
opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation);
opts.query_timeout_ms = options.query_timeout_ms;
ext::QueryingSubscriber<void> qs = interop::detail::null<ext::QueryingSubscriber<void>>();
ZResult res =
::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs),
interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts);
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber");
if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second));
return ext::QueryingSubscriber<typename Channel::template HandlerType<Sample>>(std::move(qs),
std::move(cb_handler_pair.second));
}

} // namespace zenoh
#endif
73 changes: 0 additions & 73 deletions include/zenoh/api/get.hxx

This file was deleted.

Loading

0 comments on commit 95d6da0

Please sign in to comment.