From 931ca4903281196547ca9f8323511c46a4e7118d Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 22 Apr 2024 19:44:04 +0200 Subject: [PATCH] all builds except shm --- Cargo.lock | 9 +- Cargo.toml | 1 + Cargo.toml.in | 1 + build-resources/opaque-types/Cargo.lock | 1 + build-resources/opaque-types/Cargo.toml | 1 + build-resources/opaque-types/src/lib.rs | 135 ++--- include/zenoh-gen.h | 635 +++++++++++++++++++++++- src/closures/sample_closure.rs | 10 +- src/commons.rs | 21 +- src/errors.rs | 5 + src/get.rs | 2 +- src/keyexpr.rs | 4 +- src/lib.rs | 25 +- src/liveliness.rs | 210 +++----- src/payload.rs | 7 +- src/platform/synchronization.rs | 329 +++++------- src/publication_cache.rs | 126 ++--- src/publisher.rs | 2 +- src/put.rs | 141 +++--- src/querying_subscriber.rs | 220 ++++---- src/subscriber.rs | 208 ++++---- src/transmute.rs | 37 +- 22 files changed, 1266 insertions(+), 864 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c10ee7ae4..3ebd15726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,18 +433,18 @@ checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "const_format" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c990efc7a285731f9a4378d81aff2f0e85a2c8781a05ef0f8baa8dac54d0ff48" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" dependencies = [ "const_format_proc_macros", ] [[package]] name = "const_format_proc_macros" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e026b6ce194a874cb9cf32cd5772d1ef9767cc8fcb5765948d74f37a9d8b2bf6" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" dependencies = [ "proc-macro2", "quote", @@ -3152,6 +3152,7 @@ dependencies = [ "async-trait", "cbindgen", "chrono", + "const_format", "env_logger", "fs2", "fs_extra", diff --git a/Cargo.toml b/Cargo.toml index 767d9a3a3..4e62df310 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ log = "0.4.17" rand = "0.8.5" spin = "0.9.5" unwrap-infallible = "0.1.5" +const_format = "0.2.32" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` #zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "explicit_api2", features = ["shared-memory", "unstable"], default-features = false } #zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "explicit_api2", features = ["shared-memory"] } diff --git a/Cargo.toml.in b/Cargo.toml.in index b68560103..af0cd4fa0 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -51,6 +51,7 @@ log = "0.4.17" rand = "0.8.5" spin = "0.9.5" unwrap-infallible = "0.1.5" +const_format = "0.2.32" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` #zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "explicit_api2", features = ["shared-memory", "unstable"], default-features = false } #zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "explicit_api2", features = ["shared-memory"] } diff --git a/build-resources/opaque-types/Cargo.lock b/build-resources/opaque-types/Cargo.lock index b3cd2f5b5..87d494976 100644 --- a/build-resources/opaque-types/Cargo.lock +++ b/build-resources/opaque-types/Cargo.lock @@ -1147,6 +1147,7 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" name = "opaque-types" version = "0.1.0" dependencies = [ + "const_format", "zenoh", "zenoh-ext", ] diff --git a/build-resources/opaque-types/Cargo.toml b/build-resources/opaque-types/Cargo.toml index 93f88d05d..a1c020ed4 100644 --- a/build-resources/opaque-types/Cargo.toml +++ b/build-resources/opaque-types/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` # zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } +const_format = "0.2.32" zenoh = { path = "../../../zenoh/zenoh", features = [ "shared-memory", "unstable", diff --git a/build-resources/opaque-types/src/lib.rs b/build-resources/opaque-types/src/lib.rs index dd5443d73..846e887a3 100644 --- a/build-resources/opaque-types/src/lib.rs +++ b/build-resources/opaque-types/src/lib.rs @@ -1,12 +1,17 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; +use std::sync::Condvar; +use std::sync::Mutex; +use std::sync::MutexGuard; +use std::thread::JoinHandle; use zenoh::config::Config; use zenoh::config::ZenohId; use zenoh::encoding::Encoding; use zenoh::handlers::DefaultHandler; use zenoh::key_expr::KeyExpr; use zenoh::bytes::{ZBytes, ZBytesReader}; +use zenoh::liveliness::LivelinessToken; use zenoh::publication::MatchingListener; use zenoh::publication::Publisher; use zenoh::query::Reply; @@ -14,75 +19,22 @@ use zenoh::queryable::Query; use zenoh::queryable::Queryable; use zenoh::sample::Sample; use zenoh::session::Session; +use zenoh::subscriber::Subscriber; use zenoh::time::Timestamp; use zenoh::value::Value; -// Disabled due to dependency on z_session_t. To be reworked as for autogeneration this dependency is cicrular. -// pub struct FetchingSubscriberWrapper { -// fetching_subscriber: zenoh_ext::FetchingSubscriber<'static, ()>, -// session: z_session_t, -// } - #[macro_export] macro_rules! get_opaque_type_data { ($src_type:ty, $name:ident) => { const _: () = { - const fn get_num_digits(n: usize) -> usize { - let mut out = 0; - let mut res = n; - while res > 0 { - out += 1; - res = res / 10; - } - if out == 0 { - out = 1; - } - out - } - - const fn write_str(src: &[u8], mut dst: [u8; MSG_LEN], offset: usize) -> [u8; MSG_LEN] { - let mut i = 0; - while i < src.len() { - dst[i + offset] = src[i]; - i += 1; - } - dst - } - - const fn write_num(src: usize, mut dst: [u8; MSG_LEN], offset: usize) -> [u8; MSG_LEN] { - let mut i = 0; - let num_digits = get_num_digits(src) as u32; - while i < num_digits { - dst[i as usize + offset] = b'0' + ((src / 10u32.pow(num_digits - i - 1) as usize) % 10) as u8; - i += 1; - } - dst - } - + use const_format::concatcp; const DST_NAME: &str = stringify!($name); const ALIGN: usize = std::mem::align_of::<$src_type>(); const SIZE: usize = std::mem::size_of::<$src_type>(); - const TYPE_TOKEN: [u8; 6] = *b"type: "; - const ALIGN_TOKEN: [u8; 9] = *b", align: "; - const SIZE_TOKEN: [u8; 8] = *b", size: "; - const SIZE_NUM_DIGITS: usize = get_num_digits(SIZE); - const ALIGN_NUM_DIGITS: usize = get_num_digits(ALIGN); - const MSG_LEN: usize = TYPE_TOKEN.len() + ALIGN_TOKEN.len() + SIZE_TOKEN.len() + SIZE_NUM_DIGITS + ALIGN_NUM_DIGITS + DST_NAME.len(); - const TYPE_OFFSET: usize = TYPE_TOKEN.len(); - const ALIGN_OFFSET: usize = TYPE_OFFSET + DST_NAME.len() + ALIGN_TOKEN.len(); - const SIZE_OFFSET: usize = ALIGN_OFFSET + ALIGN_NUM_DIGITS + SIZE_TOKEN.len(); - let mut msg: [u8; MSG_LEN] = [b' '; MSG_LEN]; - - msg = write_str(&TYPE_TOKEN, msg, 0); - msg = write_str(&DST_NAME.as_bytes(), msg, TYPE_OFFSET); - msg = write_str(&ALIGN_TOKEN, msg, ALIGN_OFFSET - ALIGN_TOKEN.len()); - msg = write_num(ALIGN, msg, ALIGN_OFFSET); - msg = write_str(&SIZE_TOKEN, msg, SIZE_OFFSET - SIZE_TOKEN.len()); - msg = write_num(SIZE, msg, SIZE_OFFSET); - - panic!("{}", unsafe { - std::str::from_utf8_unchecked(msg.as_slice()) - }); + const INFO_MESSAGE: &str = concatcp!( + "type: ", DST_NAME, ", align: ", ALIGN, ", size: ", SIZE + ); + panic!("{}", INFO_MESSAGE); }; } } @@ -151,14 +103,18 @@ get_opaque_type_data!(&'static Query, z_query_t); get_opaque_type_data!(Option>, z_owned_queryable_t); get_opaque_type_data!(&'static Queryable<'static, ()>, z_queryable_t); -// get_opaque_type_data!( -// Option>, -// ze_owned_querying_subscriber_t -// ); -// get_opaque_type_data!( -// &'static FetchingSubscriberWrapper, -// ze_querying_subscriber_t -// ); +/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. +/// +/// Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +get_opaque_type_data!(Option<(zenoh_ext::FetchingSubscriber<'static, ()>, &'static Session)>, ze_owned_querying_subscriber_t); +get_opaque_type_data!(&'static (zenoh_ext::FetchingSubscriber<'static, ()>, &'static Session), ze_querying_subscriber_t); /// A zenoh-allocated key expression. /// @@ -255,3 +211,48 @@ get_opaque_type_data!(&'static Publisher<'static>, z_publisher_t); /// /// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. get_opaque_type_data!(Option>, zcu_owned_matching_listener_t); + + +/// An owned zenoh subscriber. Destroying the subscriber cancels the subscription. +/// +/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +get_opaque_type_data!(Option>, z_owned_subscriber_t); +get_opaque_type_data!(&'static Subscriber<'static, ()>, z_subscriber_t); + +/// A liveliness token that can be used to provide the network with information about connectivity to its +/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key +/// expressions. +/// +/// A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. +get_opaque_type_data!(Option>, zc_owned_liveliness_token_t); +get_opaque_type_data!(&'static LivelinessToken<'static>, zc_liveliness_token_t); + + +/// An owned zenoh publication_cache. +/// +/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +get_opaque_type_data!(Option>, ze_owned_publication_cache_t); +get_opaque_type_data!(&'static zenoh_ext::PublicationCache<'static>, ze_publication_cache_t); + + +get_opaque_type_data!(Option<(Mutex<()>, Option>)>, z_owned_mutex_t); +get_opaque_type_data!(&'static (Mutex<()>, Option>), z_mutex_t); + +get_opaque_type_data!(Option, z_owned_condvar_t); +get_opaque_type_data!(&'static Condvar, z_condvar_t); + +get_opaque_type_data!(Option>, z_owned_task_t); \ No newline at end of file diff --git a/include/zenoh-gen.h b/include/zenoh-gen.h index 1761f69b2..19b3fc6c9 100644 --- a/include/zenoh-gen.h +++ b/include/zenoh-gen.h @@ -90,6 +90,17 @@ typedef enum z_query_target_t { Z_QUERY_TARGET_T_ALL_COMPLETE, } z_query_target_t; +/** + * The subscription reliability. + * + * - **Z_RELIABILITY_BEST_EFFORT** + * - **Z_RELIABILITY_RELIABLE** + */ +typedef enum z_reliability_t { + Z_RELIABILITY_T_BEST_EFFORT, + Z_RELIABILITY_T_RELIABLE, +} z_reliability_t; + typedef enum z_sample_kind_t { Z_SAMPLE_KIND_T_PUT = 0, Z_SAMPLE_KIND_T_DELETE = 1, @@ -173,6 +184,15 @@ typedef struct ALIGN(8) z_bytes_reader_t { uint8_t _0[8]; } z_bytes_reader_t; +/** + * Clock + * Uses monotonic clock + */ +typedef struct z_clock_t { + uint64_t t; + const void *t_base; +} z_clock_t; + /** * An owned zenoh session. * @@ -346,7 +366,7 @@ typedef struct ALIGN(8) z_sample_t { * * Members: * void *context: a pointer to an arbitrary state. - * void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. + * void *call(struct z_sample_t, const void *context): the typical callback function. `context` will be passed as its last argument. * void *drop(void*): allows the callback's state to be freed. * * Closures are not guaranteed not to be called concurrently. @@ -359,7 +379,7 @@ typedef struct ALIGN(8) z_sample_t { */ typedef struct z_owned_closure_sample_t { void *context; - void (*call)(const struct z_sample_t*, void *context); + void (*call)(struct z_sample_t, void *context); void (*drop)(void*); } z_owned_closure_sample_t; @@ -385,6 +405,18 @@ typedef struct z_owned_closure_zid_t { void (*drop)(void*); } z_owned_closure_zid_t; +typedef struct ALIGN(8) z_owned_condvar_t { + uint8_t _0[24]; +} z_owned_condvar_t; + +typedef struct ALIGN(8) z_condvar_t { + uint8_t _0[8]; +} z_condvar_t; + +typedef struct ALIGN(8) z_mutex_t { + uint8_t _0[8]; +} z_mutex_t; + /** * An owned zenoh configuration. * @@ -512,6 +544,40 @@ typedef struct ALIGN(8) z_owned_queryable_t { uint8_t _0[32]; } z_owned_queryable_t; +/** + * An owned zenoh subscriber. Destroying the subscriber cancels the subscription. + * + * Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ALIGN(8) z_owned_subscriber_t { + uint8_t _0[32]; +} z_owned_subscriber_t; + +/** + * Options passed to the :c:func:`z_declare_subscriber` or :c:func:`z_declare_pull_subscriber` function. + * + * Members: + * z_reliability_t reliability: The subscription reliability. + */ +typedef struct z_subscriber_options_t { + enum z_reliability_t reliability; +} z_subscriber_options_t; + +/** + * Options passed to the :c:func:`z_delete` function. + */ +typedef struct z_delete_options_t { + enum z_congestion_control_t congestion_control; + enum z_priority_t priority; +} z_delete_options_t; + typedef struct ALIGN(8) z_owned_encoding_t { uint8_t _0[48]; } z_owned_encoding_t; @@ -579,6 +645,10 @@ typedef struct z_hello_t { struct z_str_array_t locators; } z_hello_t; +typedef struct ALIGN(8) z_owned_mutex_t { + uint8_t _0[32]; +} z_owned_mutex_t; + typedef struct ALIGN(8) z_publisher_t { uint8_t _0[8]; } z_publisher_t; @@ -603,6 +673,22 @@ typedef struct z_publisher_put_options_t { struct z_owned_bytes_t *attachment; } z_publisher_put_options_t; +/** + * Options passed to the :c:func:`z_put` function. + * + * Members: + * z_encoding_t encoding: The encoding of the payload. + * z_congestion_control_t congestion_control: The congestion control to apply when routing this message. + * z_priority_t priority: The priority of this message. + * z_bytes_t attachment: The attachment to this message. + */ +typedef struct z_put_options_t { + struct z_owned_encoding_t *encoding; + enum z_congestion_control_t congestion_control; + enum z_priority_t priority; + struct z_owned_bytes_t *attachment; +} z_put_options_t; + /** * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: * - `this` is a pointer to an arbitrary state. @@ -707,6 +793,58 @@ typedef struct z_owned_scouting_config_t { */ typedef bool (*z_slice_map_iter_body_t)(struct z_slice_t key, struct z_slice_t value, void *context); +typedef struct ALIGN(8) z_subscriber_t { + uint8_t _0[8]; +} z_subscriber_t; + +typedef struct ALIGN(8) z_owned_task_t { + uint8_t _0[24]; +} z_owned_task_t; + +typedef struct z_task_attr_t { + size_t _0; +} z_task_attr_t; + +/** + * Time + * Uses system clock + */ +typedef struct z_time_t { + uint64_t t; +} z_time_t; + +/** + * The options for `zc_liveliness_declare_token` + */ +typedef struct zc_liveliness_declaration_options_t { + uint8_t _dummy; +} zc_liveliness_declaration_options_t; + +/** + * The options for :c:func:`zc_liveliness_declare_subscriber` + */ +typedef struct zc_liveliness_declare_subscriber_options_t { + uint8_t _dummy; +} zc_liveliness_declare_subscriber_options_t; + +/** + * A liveliness token that can be used to provide the network with information about connectivity to its + * declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key + * expressions. + * + * A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. + */ +typedef struct ALIGN(8) zc_owned_liveliness_token_t { + uint8_t _0[32]; +} zc_owned_liveliness_token_t; + +/** + * The options for :c:func:`zc_liveliness_declare_subscriber` + */ +typedef struct zc_liveliness_get_options_t { + uint32_t timeout_ms; +} zc_liveliness_get_options_t; + /** * An owned sample. * @@ -765,6 +903,85 @@ typedef struct ALIGN(8) zcu_owned_matching_listener_t { uint8_t _0[40]; } zcu_owned_matching_listener_t; +/** + * An owned zenoh publication_cache. + * + * Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ALIGN(8) ze_owned_publication_cache_t { + uint8_t _0[96]; +} ze_owned_publication_cache_t; + +/** + * Options passed to the :c:func:`ze_declare_publication_cache` function. + * + * Members: + * z_keyexpr_t queryable_prefix: The prefix used for queryable + * zcu_locality_t queryable_origin: The restriction for the matching queries that will be receive by this + * publication cache + * bool queryable_complete: the `complete` option for the queryable + * size_t history: The the history size + * size_t resources_limit: The limit number of cached resources + */ +typedef struct ze_publication_cache_options_t { + const struct z_keyexpr_t *queryable_prefix; + enum zcu_locality_t queryable_origin; + bool queryable_complete; + size_t history; + size_t resources_limit; +} ze_publication_cache_options_t; + +/** + * An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. + * + * Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ALIGN(8) ze_owned_querying_subscriber_t { + uint8_t _0[64]; +} ze_owned_querying_subscriber_t; + +/** + * Represents the set of options that can be applied to a querying subscriber, + * upon its declaration via :c:func:`ze_declare_querying_subscriber`. + * + * Members: + * z_reliability_t reliability: The subscription reliability. + * zcu_locality_t allowed_origin: The restriction for the matching publications that will be + * receive by this subscriber. + * z_keyexpr_t query_selector: The selector to be used for queries. + * z_query_target_t query_target: The target to be used for queries. + * z_query_consolidation_t query_consolidation: The consolidation mode to be used for queries. + * zcu_reply_keyexpr_t query_accept_replies: The accepted replies for queries. + * uint64_t query_timeout_ms: The timeout to be used for queries. + */ +typedef struct ze_querying_subscriber_options_t { + enum z_reliability_t reliability; + enum zcu_locality_t allowed_origin; + const struct z_keyexpr_t *query_selector; + enum z_query_target_t query_target; + struct z_query_consolidation_t query_consolidation; + enum zcu_reply_keyexpr_t query_accept_replies; + uint64_t query_timeout_ms; +} ze_querying_subscriber_options_t; + +typedef struct ALIGN(8) ze_querying_subscriber_t { + uint8_t _0[8]; +} ze_querying_subscriber_t; + #define Z_OK 0 #define Z_EINVAL -1 @@ -775,6 +992,14 @@ typedef struct ALIGN(8) zcu_owned_matching_listener_t { #define Z_ENETWORK -4 +#define Z_EBUSY_MUTEX -16 + +#define Z_EINVAL_MUTEX -22 + +#define Z_EAGAIN_MUTEX -11 + +#define Z_EPOISON_MUTEX -22 + #define Z_EGENERIC INT8_MIN extern const unsigned int Z_ROUTER; @@ -917,6 +1142,14 @@ ZCError z_bytes_reader_seek(struct z_bytes_reader_t reader, */ ZENOHC_API int64_t z_bytes_reader_tell(struct z_bytes_reader_t reader); +ZENOHC_API uint64_t z_clock_elapsed_ms(const struct z_clock_t *time); + +ZENOHC_API uint64_t z_clock_elapsed_s(const struct z_clock_t *time); + +ZENOHC_API uint64_t z_clock_elapsed_us(const struct z_clock_t *time); + +ZENOHC_API struct z_clock_t z_clock_now(void); + /** * Closes a zenoh session. This drops and invalidates `session` for double-drop safety. * @@ -998,7 +1231,7 @@ ZENOHC_API struct z_owned_closure_reply_t z_closure_reply_null(void); */ ZENOHC_API void z_closure_sample_call(const struct z_owned_closure_sample_t *closure, - const struct z_sample_t *sample); + struct z_sample_t sample); /** * Drops the closure. Droping an uninitialized closure is a no-op. @@ -1027,6 +1260,20 @@ ZENOHC_API void z_closure_zid_drop(struct z_owned_closure_zid_t *closure); */ ZENOHC_API struct z_owned_closure_zid_t z_closure_zid_null(void); +ZENOHC_API bool z_condvar_check(const struct z_owned_condvar_t *this_); + +ZENOHC_API void z_condvar_drop(struct z_owned_condvar_t *this_); + +ZENOHC_API void z_condvar_init(struct z_owned_condvar_t *this_); + +ZENOHC_API struct z_condvar_t z_condvar_loan(const struct z_owned_condvar_t *this_); + +ZENOHC_API void z_condvar_null(struct z_owned_condvar_t *this_); + +ZENOHC_API ZCError z_condvar_signal(struct z_condvar_t this_); + +ZENOHC_API ZCError z_condvar_wait(struct z_condvar_t this_, struct z_mutex_t m); + /** * Returns ``true`` if `config` is valid. */ @@ -1152,6 +1399,66 @@ ZCError z_declare_queryable(struct z_session_t session, const struct z_queryable_options_t *options, struct z_owned_queryable_t *this_); +/** + * Declare a subscriber for a given key expression. + * + * Parameters: + * session: The zenoh session. + * key_expr: The key expression to subscribe. + * callback: The callback function that will be called each time a data matching the subscribed expression is received. + * opts: The options to be passed to describe the options to be passed to the subscriber declaration. + * + * Returns: + * A :c:type:`z_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the subscriber is still valid, + * you may use `z_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * Example: + * Declaring a subscriber passing `NULL` for the options: + * + * .. code-block:: C + * + * z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); + * + * is equivalent to initializing and passing the default subscriber options: + * + * .. code-block:: C + * + * z_subscriber_options_t opts = z_subscriber_options_default(); + * z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); + */ +ZENOHC_API +ZCError z_declare_subscriber(struct z_owned_subscriber_t *this_, + struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_owned_closure_sample_t *callback, + struct z_subscriber_options_t options); + +/** + * Delete data. + * + * Parameters: + * session: The zenoh session. + * key_expr: The key expression to delete. + * options: The put options. + * Returns: + * ``0`` in case of success, negative values in case of failure. + */ +ZENOHC_API +ZCError z_delete(struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_delete_options_t opts); + +/** + * Constructs the default value for :c:type:`z_put_options_t`. + */ +ZENOHC_API struct z_delete_options_t z_delete_options_default(void); + /** * Returns ``true`` if `encoding` is valid. */ @@ -1421,6 +1728,20 @@ ZENOHC_API void z_keyexpr_unchecked(struct z_owned_keyexpr_t *this_, const char *name); +ZENOHC_API bool z_mutex_check(const struct z_owned_mutex_t *this_); + +ZENOHC_API void z_mutex_drop(struct z_owned_mutex_t *this_); + +ZENOHC_API ZCError z_mutex_init(struct z_owned_mutex_t *this_); + +ZENOHC_API struct z_mutex_t z_mutex_loan(const struct z_owned_mutex_t *this_); + +ZENOHC_API ZCError z_mutex_lock(struct z_mutex_t this_); + +ZENOHC_API ZCError z_mutex_try_lock(struct z_mutex_t this_); + +ZENOHC_API ZCError z_mutex_unlock(struct z_mutex_t this_); + /** * Opens a zenoh session. Should the session opening fail, `z_check` ing the returned value will return `false`. * Config value is always consumed upon function return. @@ -1498,6 +1819,32 @@ ZCError z_publisher_put(struct z_publisher_t publisher, */ ZENOHC_API struct z_publisher_put_options_t z_publisher_put_options_default(void); +/** + * Put data, transfering its ownership. + * + * + * The payload's encoding and attachment can be sepcified through the options. These values are consumed upon function + * return. + * + * Parameters: + * session: The zenoh session. + * key_expr: The key expression to put. + * payload: The value to put (consumed upon function return). + * options: The put options. + * Returns: + * ``0`` in case of success, negative error values in case of failure. + */ +ZENOHC_API +ZCError z_put(struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_owned_bytes_t *payload, + struct z_put_options_t options); + +/** + * Constructs the default value for :c:type:`z_put_options_t`. + */ +ZENOHC_API struct z_put_options_t z_put_options_default(void); + /** * Gets the attachment to the query by aliasing. * @@ -1666,6 +2013,16 @@ ZENOHC_API void z_queryable_null(struct z_owned_queryable_t *this_); */ ZENOHC_API struct z_queryable_options_t z_queryable_options_default(void); +ZENOHC_API void z_random_fill(void *buf, size_t len); + +ZENOHC_API uint16_t z_random_u16(void); + +ZENOHC_API uint32_t z_random_u32(void); + +ZENOHC_API uint64_t z_random_u64(void); + +ZENOHC_API uint8_t z_random_u8(void); + /** * Calls the closure. Calling an uninitialized closure is a no-op. */ @@ -1832,6 +2189,12 @@ struct z_session_t z_session_loan(const struct z_owned_session_t *s); */ ZENOHC_API void z_session_null(struct z_owned_session_t *s); +ZENOHC_API int8_t z_sleep_ms(size_t time); + +ZENOHC_API int8_t z_sleep_s(size_t time); + +ZENOHC_API int8_t z_sleep_us(size_t time); + /** * Returns ``true`` if `b` is initialized. */ @@ -1976,6 +2339,61 @@ ZENOHC_API const char *z_str_loan(const struct z_owned_str_t *s); */ ZENOHC_API struct z_owned_str_t z_str_null(void); +/** + * Returns ``true`` if `sub` is valid. + */ +ZENOHC_API bool z_subscriber_check(const struct z_owned_subscriber_t *subscriber); + +/** + * Returns the key expression of the subscriber. + */ +ZENOHC_API struct z_keyexpr_t z_subscriber_keyexpr(struct z_subscriber_t subscriber); + +/** + * Returns a :c:type:`z_subscriber_t` loaned from `this`. + */ +ZENOHC_API struct z_subscriber_t z_subscriber_loan(const struct z_owned_subscriber_t *this_); + +/** + * Constructs a null safe-to-drop value of 'z_owned_subscriber_t' type + */ +ZENOHC_API void z_subscriber_null(struct z_owned_subscriber_t *this_); + +/** + * Constructs the default value for :c:type:`z_subscriber_options_t`. + */ +ZENOHC_API struct z_subscriber_options_t z_subscriber_options_default(void); + +ZENOHC_API bool z_task_check(const struct z_owned_task_t *this_); + +/** + * Detaches the task and releases all allocated resources. + */ +ZENOHC_API void z_task_detach(struct z_owned_task_t *this_); + +ZENOHC_API +ZCError z_task_init(struct z_owned_task_t *this_, + const struct z_task_attr_t *_attr, + void (*fun)(void *arg), + void *arg); + +/** + * Joins the task and releases all allocated resources + */ +ZENOHC_API ZCError z_task_join(struct z_owned_task_t *this_); + +ZENOHC_API void z_task_null(struct z_owned_task_t *this_); + +ZENOHC_API uint64_t z_time_elapsed_ms(const struct z_time_t *time); + +ZENOHC_API uint64_t z_time_elapsed_s(const struct z_time_t *time); + +ZENOHC_API uint64_t z_time_elapsed_us(const struct z_time_t *time); + +ZENOHC_API struct z_time_t z_time_now(void); + +ZENOHC_API const char *z_time_now_as_str(const char *buf, size_t len); + ZENOHC_API struct z_id_t z_timestamp_get_id(const struct z_timestamp_t *timestamp); ZENOHC_API uint64_t z_timestamp_npt64_time(const struct z_timestamp_t *timestamp); @@ -2000,6 +2418,12 @@ ZCError z_undeclare_publisher(struct z_owned_publisher_t *publisher); */ ZENOHC_API ZCError z_undeclare_queryable(struct z_owned_queryable_t *qable); +/** + * Undeclares the given :c:type:`z_owned_subscriber_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +ZCError z_undeclare_subscriber(struct z_owned_subscriber_t *subscriber); + /** * Converts the kind of zenoh entity into a string. * @@ -2094,6 +2518,82 @@ void zc_keyexpr_from_slice_unchecked(struct z_owned_keyexpr_t *this_, const char *start, size_t len); +ZENOHC_API +struct zc_liveliness_declaration_options_t zc_liveliness_declaration_options_default(void); + +/** + * Declares a subscriber on liveliness tokens that intersect `key`. + * + * Parameters: + * z_session_t session: The zenoh session. + * z_keyexpr_t key_expr: The key expression to subscribe. + * z_owned_closure_sample_t callback: The callback function that will be called each time a + * liveliness token status changed. + * zc_owned_liveliness_declare_subscriber_options_t _options: The options to be passed to describe the options to be passed to the liveliness subscriber declaration. + * + * Returns: + * A :c:type:`z_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the subscriber is still valid, + * you may use `z_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +ZENOHC_API +ZCError zc_liveliness_declare_subscriber(struct z_owned_subscriber_t *this_, + struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_owned_closure_sample_t *callback, + struct zc_liveliness_declare_subscriber_options_t _options); + +/** + * Constructs and declares a liveliness token on the network. + * + * Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity + * is achieved, and a DELETE sample if it's lost. + * + * Passing `NULL` as options is valid and equivalent to a pointer to the default options. + */ +ZENOHC_API +ZCError zc_liveliness_declare_token(struct zc_owned_liveliness_token_t *this_, + struct z_session_t session, + struct z_keyexpr_t key_expr, + struct zc_liveliness_declaration_options_t _options); + +/** + * Queries liveliness tokens currently on the network with a key expression intersecting with `key`. + * + * Note that the same "value stealing" tricks apply as with a normal :c:func:`z_get` + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +ZCError zc_liveliness_get(struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_owned_closure_reply_t *callback, + struct zc_liveliness_get_options_t options); + +/** + * The gravestone value for `zc_liveliness_get_options_t` + */ +ZENOHC_API struct zc_liveliness_get_options_t zc_liveliness_get_options_default(void); + +ZENOHC_API +struct zc_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_default(void); + +/** + * Returns `true` unless the token is at its gravestone value. + */ +ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *this_); + +/** + * The gravestone value for liveliness tokens. + */ +ZENOHC_API void zc_liveliness_token_null(struct zc_owned_liveliness_token_t *this_); + +/** + * Destroys a liveliness token, notifying subscribers of its destruction. + */ +ZENOHC_API ZCError zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *this_); + /** * Creates a new blocking fifo channel, returned as a pair of closures. * @@ -2214,3 +2714,132 @@ ZCError zcu_publisher_matching_listener_callback(struct z_publisher_t publisher, struct zcu_owned_matching_listener_t *this_); ZENOHC_API enum zcu_reply_keyexpr_t zcu_reply_keyexpr_default(void); + +/** + * Declares a Publication Cache. + * + * Parameters: + * z_session_t session: The zenoh session. + * z_keyexpr_t key_expr: The key expression to publish. + * ze_publication_cache_options_t options: Additional options for the publication_cache. + * + * Returns: + * :c:type:`ze_owned_publication_cache_t`. + * + * + * Example: + * Declaring a publication cache `NULL` for the options: + * + * .. code-block:: C + * + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); + * + * is equivalent to initializing and passing the default publication cache options: + * + * .. code-block:: C + * + * ze_publication_cache_options_t opts = ze_publication_cache_options_default(); + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); + */ +ZENOHC_API +ZCError ze_declare_publication_cache(struct ze_owned_publication_cache_t *this_, + struct z_session_t session, + struct z_keyexpr_t key_expr, + struct ze_publication_cache_options_t options); + +/** + * Declares a Querying Subscriber for a given key expression. + * + * Parameters: + * z_session_t session: The zenoh session. + * z_keyexpr_t keyexpr: The key expression to subscribe. + * z_owned_closure_sample_t callback: The callback function that will be called each time a data matching the subscribed expression is received. + * ze_querying_subscriber_options_t options: Additional options for the querying subscriber. + * + * Returns: + * :c:type:`ze_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the querying subscriber is still valid, + * you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * Example: + * Declaring a subscriber passing ``NULL`` for the options: + * + * .. code-block:: C + * + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); + * + * is equivalent to initializing and passing the default subscriber options: + * + * .. code-block:: C + * + * z_subscriber_options_t opts = z_subscriber_options_default(); + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); + */ +ZENOHC_API +ZCError ze_declare_querying_subscriber(struct ze_owned_querying_subscriber_t *this_, + struct z_session_t session, + struct z_keyexpr_t key_expr, + struct z_owned_closure_sample_t *callback, + struct ze_querying_subscriber_options_t options); + +/** + * Returns ``true`` if `pub_cache` is valid. + */ +ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *this_); + +/** + * Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type + */ +ZENOHC_API void ze_publication_cache_null(struct ze_owned_publication_cache_t *this_); + +/** + * Constructs the default value for :c:type:`ze_publication_cache_options_t`. + */ +ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void); + +/** + * Returns ``true`` if `this` is valid. + */ +ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *this_); + +/** + * Make a :c:type:`ze_owned_querying_subscriber_t` to perform an additional query on a specified selector. + * The queried samples will be merged with the received publications and made available in the subscriber callback. + */ +ZENOHC_API +ZCError ze_querying_subscriber_get(struct ze_querying_subscriber_t sub, + struct z_keyexpr_t selector, + const struct z_get_options_t *options); + +/** + * Returns a :c:type:`ze_querying_subscriber_loan` loaned from `this`. + */ +ZENOHC_API +struct ze_querying_subscriber_t ze_querying_subscriber_loan(const struct ze_owned_querying_subscriber_t *this_); + +/** + * Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type + */ +ZENOHC_API void ze_querying_subscriber_null(struct ze_owned_querying_subscriber_t *this_); + +/** + * Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. + */ +ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void); + +/** + * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +ZCError ze_undeclare_publication_cache(struct ze_owned_publication_cache_t *this_); + +/** + * Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +ZCError ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *this_); diff --git a/src/closures/sample_closure.rs b/src/closures/sample_closure.rs index 6f0e55707..4d95cf177 100644 --- a/src/closures/sample_closure.rs +++ b/src/closures/sample_closure.rs @@ -4,7 +4,7 @@ use libc::c_void; /// /// Members: /// void *context: a pointer to an arbitrary state. -/// void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. +/// void *call(struct z_sample_t, const void *context): the typical callback function. `context` will be passed as its last argument. /// void *drop(void*): allows the callback's state to be freed. /// /// Closures are not guaranteed not to be called concurrently. @@ -17,7 +17,7 @@ use libc::c_void; #[repr(C)] pub struct z_owned_closure_sample_t { context: *mut c_void, - call: Option, + call: Option, drop: Option, } impl z_owned_closure_sample_t { @@ -47,7 +47,7 @@ pub extern "C" fn z_closure_sample_null() -> z_owned_closure_sample_t { /// Calls the closure. Calling an uninitialized closure is a no-op. #[no_mangle] -pub extern "C" fn z_closure_sample_call(closure: &z_owned_closure_sample_t, sample: &z_sample_t) { +pub extern "C" fn z_closure_sample_call(closure: &z_owned_closure_sample_t, sample: z_sample_t) { match closure.call { Some(call) => call(sample, closure.context), None => log::error!("Attempted to call an uninitialized closure!"), @@ -60,10 +60,10 @@ pub extern "C" fn z_closure_sample_drop(closure: &mut z_owned_closure_sample_t) let mut empty_closure = z_owned_closure_sample_t::empty(); std::mem::swap(&mut empty_closure, closure); } -impl From for z_owned_closure_sample_t { +impl From for z_owned_closure_sample_t { fn from(f: F) -> Self { let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call(sample: &z_sample_t, this: *mut c_void) { + extern "C" fn call(sample: z_sample_t, this: *mut c_void) { let this = unsafe { &*(this as *const F) }; this(sample) } diff --git a/src/commons.rs b/src/commons.rs index ba29657be..fb245e40b 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use crate::transmute::unwrap_ref_unchecked; use crate::transmute::Inplace; use crate::transmute::TransmuteCopy; +use crate::transmute::TransmuteFromHandle; use crate::transmute::TransmuteIntoHandle; use crate::transmute::TransmuteRef; use crate::transmute::TransmuteUninitPtr; @@ -87,34 +88,34 @@ pub unsafe extern "C" fn z_timestamp_get_id(timestamp: &z_timestamp_t) -> z_id_t /// /// A sample is the value associated to a given resource at a given point in time. use crate::opaque_types::z_sample_t; -decl_transmute_copy!(&'static Sample, z_sample_t); +decl_transmute_handle!(Sample, z_sample_t); /// The Key Expression of the sample. /// /// `sample` is aliased by its return value. #[no_mangle] pub extern "C" fn z_sample_keyexpr(sample: &z_sample_t) -> z_keyexpr_t { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.key_expr().into() } /// The encoding of the payload. #[no_mangle] pub extern "C" fn z_sample_encoding(sample: z_sample_t) -> z_encoding_t { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.encoding().transmute_copy() } /// The sample's data, the return value aliases the sample. /// #[no_mangle] pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> z_bytes_t { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.payload().transmute_handle() } /// The sample's kind (put or delete). #[no_mangle] pub extern "C" fn z_sample_kind(sample: &z_sample_t) -> z_sample_kind_t { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.kind().into() } /// The samples timestamp @@ -122,7 +123,7 @@ pub extern "C" fn z_sample_kind(sample: &z_sample_t) -> z_sample_kind_t { /// Returns true if Sample contains timestamp, false otherwise. In the latter case the timestamp_out value is not altered. #[no_mangle] pub extern "C" fn z_sample_timestamp(sample: &z_sample_t, timestamp_out: &mut z_timestamp_t) -> bool { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); if let Some(t) = sample.timestamp() { *timestamp_out = t.transmute_copy(); true @@ -136,7 +137,7 @@ pub extern "C" fn z_sample_timestamp(sample: &z_sample_t, timestamp_out: &mut z_ /// Checks if sample contains an attachment. #[no_mangle] pub extern "C" fn z_sample_has_attachment(sample: z_sample_t) -> bool { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.attachment().is_some() } @@ -145,7 +146,7 @@ pub extern "C" fn z_sample_has_attachment(sample: z_sample_t) -> bool { /// Before calling this function, ensure that `zc_sample_has_attachment` returns true #[no_mangle] pub extern "C" fn z_sample_attachment(sample: z_sample_t) -> z_bytes_t { - let sample = sample.transmute_copy(); + let sample = sample.transmute_ref(); sample.attachment().expect("Sample does not have an attachment").transmute_handle() } @@ -155,7 +156,7 @@ decl_transmute_owned!(Option, zc_owned_sample_t); /// Clone a sample in the cheapest way available. #[no_mangle] pub extern "C" fn zc_sample_clone(src: &z_sample_t, dst: *mut MaybeUninit) { - let src = src.transmute_copy(); + let src = src.transmute_ref(); let src = src.clone(); let dst = dst.transmute_uninit_ptr(); Inplace::init(dst, Some(src)); @@ -176,7 +177,7 @@ pub extern "C" fn zc_sample_check(sample: &zc_owned_sample_t) -> bool { /// Calling this function using a dropped sample is undefined behaviour. #[no_mangle] pub extern "C" fn zc_sample_loan(sample: &'static zc_owned_sample_t) -> z_sample_t { - unwrap_ref_unchecked(sample.transmute_ref()).transmute_copy() + unwrap_ref_unchecked(sample.transmute_ref()).transmute_handle() } /// Destroy the sample. diff --git a/src/errors.rs b/src/errors.rs index 90e8508de..298752442 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,4 +4,9 @@ pub const Z_EINVAL: ZCError = -1; pub const Z_EPARSE: ZCError = -2; pub const Z_EIO: ZCError = -3; pub const Z_ENETWORK: ZCError = -4; +// negativ pthread error codes (due to convention to return negative values on error) +pub const Z_EBUSY_MUTEX: ZCError = -16; +pub const Z_EINVAL_MUTEX: ZCError = -22; +pub const Z_EAGAIN_MUTEX: ZCError = -11; +pub const Z_EPOISON_MUTEX: ZCError = -22; // same as Z_EINVAL_MUTEX pub const Z_EGENERIC: ZCError = i8::MIN; \ No newline at end of file diff --git a/src/get.rs b/src/get.rs index de8e9d183..9a6855559 100644 --- a/src/get.rs +++ b/src/get.rs @@ -62,7 +62,7 @@ pub unsafe extern "C" fn z_reply_is_ok(reply: z_reply_t) -> bool { #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn z_reply_ok(reply: z_reply_t) -> z_sample_t { let reply = reply.transmute_ref(); - reply.result().expect("Reply does not contain a sample").transmute_copy() + reply.result().expect("Reply does not contain a sample").transmute_handle() } /// Yields the contents of the reply by asserting it indicates a failure. diff --git a/src/keyexpr.rs b/src/keyexpr.rs index cf19224fa..9d4cc194b 100644 --- a/src/keyexpr.rs +++ b/src/keyexpr.rs @@ -44,7 +44,7 @@ pub extern "C" fn z_keyexpr_null(this: *mut MaybeUninit) { Inplace::empty(this.transmute_uninit_ptr()); } -fn keyexpr_create_inner(name: &mut str, should_auto_canonize: bool, should_copy: bool) -> Result, Box> { +fn keyexpr_create_inner(name: &'static mut str, should_auto_canonize: bool, should_copy: bool) -> Result, Box> { if should_copy { let s = name.to_owned(); match should_auto_canonize { @@ -69,7 +69,7 @@ unsafe fn keyexpr_create(name: &'static mut [u8], should_auto_canonize: bool, sh Ok(v) } Err(e) => { - log::error!("Couldn't construct a keyexpr from {:02x?}: {}", name, e); + log::error!("Couldn't construct a keyexpr: {}", e); Err(errors::Z_EINVAL) } } diff --git a/src/lib.rs b/src/lib.rs index ad25e1e28..6e6f5843e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,29 +40,28 @@ mod get; pub use crate::get::*; mod queryable; pub use crate::queryable::*; -// mod put; -// pub use crate::put::*; +mod put; +pub use crate::put::*; mod scouting; pub use crate::scouting::*; mod session; pub use crate::session::*; -// mod subscriber; -// pub use crate::subscriber::*; +mod subscriber; +pub use crate::subscriber::*; // // mod pull_subscriber; // // pub use crate::pull_subscriber::*; mod publisher; pub use crate::publisher::*; mod closures; pub use closures::*; -// mod liveliness; -// pub use liveliness::*; -// mod publication_cache; -// pub use publication_cache::*; -// // Disabled due to dependency on z_session_t. To be reworked as for autogeneration this dependency is cicrular. -// // mod querying_subscriber; -// // pub use querying_subscriber::*; -// pub use platform::*; -// pub mod platform; +mod liveliness; +pub use liveliness::*; +mod publication_cache; +pub use publication_cache::*; +mod querying_subscriber; +pub use querying_subscriber::*; +pub use platform::*; +pub mod platform; // #[cfg(feature = "shared-memory")] // mod shm; diff --git a/src/liveliness.rs b/src/liveliness.rs index 46ec7c8a5..ec50a7798 100644 --- a/src/liveliness.rs +++ b/src/liveliness.rs @@ -12,76 +12,46 @@ // ZettaScale Zenoh Team, // +use std::mem::MaybeUninit; +use zenoh::prelude::SyncResolve; use zenoh::{ - liveliness::{Liveliness, LivelinessToken}, - prelude::SessionDeclarations, + liveliness::{Liveliness, LivelinessToken}, prelude::SessionDeclarations }; +use crate::transmute::TransmuteIntoHandle; use crate::{ - opaque_types::z_sample_t, z_closure_reply_call, z_closure_sample_call, z_keyexpr_t, - z_owned_closure_reply_t, z_owned_closure_sample_t, z_owned_subscriber_t, z_session_t, + errors, transmute::{Inplace, TransmuteCopy, TransmuteFromHandle, TransmuteRef, TransmuteUninitPtr}, z_closure_reply_call, z_closure_sample_call, z_keyexpr_t, z_owned_closure_reply_t, z_owned_closure_sample_t, z_owned_subscriber_t, z_session_t }; -/// A liveliness token that can be used to provide the network with information about connectivity to its -/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key -/// expressions. -/// -/// A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. -#[repr(C)] -pub struct zc_owned_liveliness_token_t { - _inner: [usize; 4], -} +use crate::opaque_types::zc_owned_liveliness_token_t; +use crate::opaque_types::zc_liveliness_token_t; +decl_transmute_owned!(Option>, zc_owned_liveliness_token_t); +decl_transmute_handle!(LivelinessToken<'static>, zc_liveliness_token_t); /// The gravestone value for liveliness tokens. #[no_mangle] -pub extern "C" fn zc_liveliness_token_null() -> zc_owned_liveliness_token_t { - zc_owned_liveliness_token_t { _inner: [0; 4] } +pub extern "C" fn zc_liveliness_token_null(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::empty(this); } /// Returns `true` unless the token is at its gravestone value. #[no_mangle] -pub extern "C" fn zc_liveliness_token_check(token: &zc_owned_liveliness_token_t) -> bool { - token._inner.iter().any(|v| *v != 0) +pub extern "C" fn zc_liveliness_token_check(this: &zc_owned_liveliness_token_t) -> bool { + this.transmute_ref().is_some() } /// The options for `zc_liveliness_declare_token` #[repr(C)] -pub struct zc_owned_liveliness_declaration_options_t { - _inner: u8, -} -/// The gravestone value for `zc_owned_liveliness_declaration_options_t` -#[no_mangle] -pub extern "C" fn zc_liveliness_declaration_options_null( -) -> zc_owned_liveliness_declaration_options_t { - zc_owned_liveliness_declaration_options_t { _inner: 0 } -} -/// Returns `true` if the options are valid. -#[no_mangle] -pub extern "C" fn zc_liveliness_declaration_options_check( - _opts: &zc_owned_liveliness_declaration_options_t, -) -> bool { - true +pub struct zc_liveliness_declaration_options_t { + _dummy: u8, } -/// Destroys the options. + #[no_mangle] -pub extern "C" fn zc_liveliness_declaration_options_drop( - opts: &mut zc_owned_liveliness_declaration_options_t, -) { - *opts = zc_liveliness_declaration_options_null() -} -impl From> for zc_owned_liveliness_token_t { - fn from(value: LivelinessToken<'static>) -> Self { - unsafe { core::mem::transmute(value) } - } -} -impl From for Option> { - fn from(value: zc_owned_liveliness_token_t) -> Self { - if value._inner.iter().all(|v| *v == 0) { - None - } else { - Some(unsafe { core::mem::transmute(value) }) - } - } +pub extern "C" fn zc_liveliness_declaration_options_default( +) -> zc_liveliness_declaration_options_t { + zc_liveliness_declaration_options_t { _dummy: 0 } } + /// Constructs and declares a liveliness token on the network. /// /// Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity @@ -90,67 +60,57 @@ impl From for Option> { /// Passing `NULL` as options is valid and equivalent to a pointer to the default options. #[no_mangle] pub extern "C" fn zc_liveliness_declare_token( + this: *mut MaybeUninit, session: z_session_t, - key: z_keyexpr_t, - _options: Option<&zc_owned_liveliness_declaration_options_t>, -) -> zc_owned_liveliness_token_t { - let Some(session) = session.upgrade() else { - log::error!("Failed to declare liveliness token: provided session was invalid"); - return zc_liveliness_token_null(); - }; - match session.liveliness().declare_token(key).res() { - Ok(token) => unsafe { core::mem::transmute(token) }, + key_expr: z_keyexpr_t, + _options: zc_liveliness_declaration_options_t, +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); + match session.liveliness().declare_token(key_expr).res() { + Ok(token) => { + Inplace::init(this, Some(token)); + errors::Z_OK + }, Err(e) => { - log::error!("Failed to declare liveliness token: {e}"); - zc_liveliness_token_null() + log::error!("Failed to undeclare token: {e}"); + Inplace::empty(this); + errors::Z_EGENERIC } } } /// Destroys a liveliness token, notifying subscribers of its destruction. #[no_mangle] -pub extern "C" fn zc_liveliness_undeclare_token(token: &mut zc_owned_liveliness_token_t) { - let Some(token): Option = - core::mem::replace(token, zc_liveliness_token_null()).into() - else { - return; - }; - if let Err(e) = token.undeclare().res() { - log::error!("Failed to undeclare token: {e}"); +pub extern "C" fn zc_liveliness_undeclare_token(this: &mut zc_owned_liveliness_token_t) -> errors::ZCError { + let this = this.transmute_mut(); + if let Some(token) = this.extract().take() { + if let Err(e) = token.undeclare().res() { + log::error!("Failed to undeclare token: {e}"); + return errors::Z_EGENERIC; + } } + errors::Z_OK } /// The options for :c:func:`zc_liveliness_declare_subscriber` #[repr(C)] -pub struct zc_owned_liveliness_declare_subscriber_options_t { - _inner: u8, +pub struct zc_liveliness_declare_subscriber_options_t { + _dummy: u8, } -/// The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` -#[no_mangle] -pub extern "C" fn zc_liveliness_subscriber_options_null( -) -> zc_owned_liveliness_declare_subscriber_options_t { - zc_owned_liveliness_declare_subscriber_options_t { _inner: 0 } -} -/// Returns `true` if the options are valid. -#[no_mangle] -pub extern "C" fn zc_liveliness_subscriber_options_check( - _opts: &zc_owned_liveliness_declare_subscriber_options_t, -) -> bool { - true -} -/// Destroys the options. + #[no_mangle] -pub extern "C" fn zc_liveliness_subscriber_options_drop( - opts: &mut zc_owned_liveliness_declare_subscriber_options_t, -) { - *opts = zc_liveliness_subscriber_options_null() +pub extern "C" fn zc_liveliness_subscriber_options_default( +) -> zc_liveliness_declare_subscriber_options_t { + zc_liveliness_declare_subscriber_options_t { _dummy: 0 } } /// Declares a subscriber on liveliness tokens that intersect `key`. /// /// Parameters: /// z_session_t session: The zenoh session. -/// z_keyexpr_t keyexpr: The key expression to subscribe. +/// z_keyexpr_t key_expr: The key expression to subscribe. /// z_owned_closure_sample_t callback: The callback function that will be called each time a /// liveliness token status changed. /// zc_owned_liveliness_declare_subscriber_options_t _options: The options to be passed to describe the options to be passed to the liveliness subscriber declaration. @@ -162,29 +122,33 @@ pub extern "C" fn zc_liveliness_subscriber_options_drop( /// you may use `z_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. #[no_mangle] pub extern "C" fn zc_liveliness_declare_subscriber( + this: *mut MaybeUninit, session: z_session_t, - key: z_keyexpr_t, + key_expr: z_keyexpr_t, callback: &mut z_owned_closure_sample_t, - _options: Option<&zc_owned_liveliness_declare_subscriber_options_t>, -) -> z_owned_subscriber_t { - let Some(session) = session.upgrade() else { - log::error!("Failed to declare liveliness token: provided session was invalid"); - return z_owned_subscriber_t::null(); - }; + _options: zc_liveliness_declare_subscriber_options_t, +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); + let session = session.transmute_copy(); let callback = core::mem::replace(callback, z_owned_closure_sample_t::empty()); + let key_expr = key_expr.transmute_ref(); match session .liveliness() - .declare_subscriber(key) + .declare_subscriber(key_expr) .callback(move |sample| { - let sample = z_sample_t::new(&sample); - z_closure_sample_call(&callback, &sample) + let sample = sample.transmute_handle(); + z_closure_sample_call(&callback, sample) }) .res() { - Ok(token) => z_owned_subscriber_t::new(token), + Ok(subscriber) => { + Inplace::init(this, Some(subscriber)); + errors::Z_OK + } Err(e) => { log::error!("Failed to subscribe to liveliness: {e}"); - z_owned_subscriber_t::null() + Inplace::empty(this); + errors::Z_EGENERIC } } } @@ -194,26 +158,12 @@ pub extern "C" fn zc_liveliness_declare_subscriber( pub struct zc_liveliness_get_options_t { timeout_ms: u32, } -/// The gravestone value for `zc_liveliness_get_options_t` -#[no_mangle] -pub extern "C" fn zc_liveliness_get_options_null() -> zc_liveliness_get_options_t { - zc_liveliness_get_options_t { timeout_ms: 0 } -} + /// The gravestone value for `zc_liveliness_get_options_t` #[no_mangle] pub extern "C" fn zc_liveliness_get_options_default() -> zc_liveliness_get_options_t { zc_liveliness_get_options_t { timeout_ms: 10000 } } -/// Returns `true` if the options are valid. -#[no_mangle] -pub extern "C" fn zc_liveliness_get_options_check(_opts: &zc_liveliness_get_options_t) -> bool { - true -} -/// Destroys the options. -#[no_mangle] -pub extern "C" fn zc_liveliness_get_options_drop(opts: &mut zc_liveliness_get_options_t) { - *opts = zc_liveliness_get_options_null() -} /// Queries liveliness tokens currently on the network with a key expression intersecting with `key`. /// @@ -223,27 +173,23 @@ pub extern "C" fn zc_liveliness_get_options_drop(opts: &mut zc_liveliness_get_op #[no_mangle] pub extern "C" fn zc_liveliness_get( session: z_session_t, - key: z_keyexpr_t, + key_expr: z_keyexpr_t, callback: &mut z_owned_closure_reply_t, - options: Option<&zc_liveliness_get_options_t>, -) -> i8 { - let Some(session) = session.upgrade() else { - log::error!("Failed to declare liveliness token: provided session was invalid"); - return i8::MIN; - }; + options: zc_liveliness_get_options_t, +) -> errors::ZCError { + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); let callback = core::mem::replace(callback, z_owned_closure_reply_t::empty()); let liveliness: Liveliness<'static> = session.liveliness(); let mut builder = liveliness - .get(key) - .callback(move |response| z_closure_reply_call(&callback, &mut response.into())); - if let Some(options) = options { - builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64)) - } + .get(key_expr) + .callback(move |response| z_closure_reply_call(&callback, response.transmute_handle())); + builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64)); match builder.res() { - Ok(()) => 0, + Ok(()) => errors::Z_OK, Err(e) => { log::error!("Failed to subscribe to liveliness: {e}"); - e.errno().get() + errors::Z_EGENERIC } } } diff --git a/src/payload.rs b/src/payload.rs index 4ed3feb3e..2dd56c7d9 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -5,7 +5,6 @@ use crate::transmute::{ use crate::{z_owned_slice_map_t, z_owned_slice_t, z_owned_str_t, z_slice_map_t, z_slice_t, ZHashMap}; use core::slice; use std::any::Any; -use std::borrow::Cow; use std::io::{Read, Seek, SeekFrom}; use std::mem::MaybeUninit; use std::slice::from_raw_parts_mut; @@ -87,7 +86,11 @@ pub unsafe extern "C" fn z_bytes_decode_into_bytes_map( ) -> ZCError { let dst = dst.transmute_uninit_ptr(); let payload = payload.transmute_ref(); - let hm = ZHashMap::from_iter(payload.iter::<(Cow<'static, [u8]>, Cow<'static, [u8]>)>()); + let iter = payload.iter::<(Vec, Vec)>(); + let mut hm = ZHashMap::new(); + for (k, v) in iter { + hm.insert(k.into(), v.into()); + } Inplace::init(dst, Some(hm)); errors::Z_OK } diff --git a/src/platform/synchronization.rs b/src/platform/synchronization.rs index a16664352..fa6523128 100644 --- a/src/platform/synchronization.rs +++ b/src/platform/synchronization.rs @@ -1,240 +1,186 @@ use std::{ - sync::{Condvar, Mutex, MutexGuard}, - thread::{self, JoinHandle}, + mem::MaybeUninit, sync::{Condvar, Mutex, MutexGuard}, thread::{self, JoinHandle} }; use libc::c_void; -use crate::{impl_guarded_transmute, GuardedTransmute}; +use crate::{errors, transmute::{unwrap_ref_unchecked, Inplace, TransmuteFromHandle, TransmuteIntoHandle, TransmuteRef, TransmuteUninitPtr}}; +pub use crate::opaque_types::z_owned_mutex_t; +pub use crate::opaque_types::z_mutex_t; -pub struct ZMutex<'a> { - mutex: Mutex<()>, - lock: Option>, -} - -pub struct ZMutexPtr { - data: Option>>, -} - -/// Mutex -/// -#[repr(C)] -#[derive(Clone, Copy)] -pub struct z_mutex_t(usize); +decl_transmute_owned!(Option<(Mutex<()>, Option>)>, z_owned_mutex_t); +decl_transmute_handle!((Mutex<()>, Option>), z_mutex_t); -impl_guarded_transmute!(noderefs z_mutex_t, ZMutexPtr); -impl_guarded_transmute!(noderefs ZMutexPtr, z_mutex_t); -// using the same error codes as in GNU pthreads, but with negative sign -// due to convention to return negative values on error -const EBUSY: i8 = -16; -const EINVAL: i8 = -22; -const EAGAIN: i8 = -11; -const EPOISON: i8 = -22; // same as EINVAL +#[no_mangle] +pub extern "C" fn z_mutex_init(this: *mut MaybeUninit) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); + let m = (Mutex::<()>::new(()), None::>); + Inplace::init(this, Some(m)); + errors::Z_OK +} #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_mutex_init(m: *mut z_mutex_t) -> i8 { - if m.is_null() { - return EINVAL; - } - let t = ZMutexPtr { - data: Some(Box::new(ZMutex { - mutex: Mutex::new(()), - lock: None, - })), - }; - *m = t.transmute(); - 0 +pub extern "C" fn z_mutex_drop(this: &mut z_owned_mutex_t) { + let _ = this.transmute_mut().extract().take(); } #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_mutex_free(m: *mut z_mutex_t) -> i8 { - if m.is_null() { - return EINVAL; - } - let mut t = (*m).transmute(); +pub extern "C" fn z_mutex_check(this: &z_owned_mutex_t) -> bool { + this.transmute_ref().is_some() +} - t.data.take(); - *m = t.transmute(); - 0 +#[no_mangle] +pub extern "C" fn z_mutex_loan(this: &z_owned_mutex_t) -> z_mutex_t { + let this = this.transmute_ref(); + let this = unwrap_ref_unchecked(this); + this.transmute_handle() } + #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_mutex_lock(m: *mut z_mutex_t) -> i8 { - if m.is_null() { - return EINVAL; - } - let mut t = (*m).transmute(); - if t.data.is_none() { - return EINVAL; - } - let mut_data = t.data.as_mut().unwrap(); - match mut_data.mutex.lock() { +pub extern "C" fn z_mutex_lock(this: z_mutex_t) -> errors::ZCError { + let this = this.transmute_mut(); + + match this.0.lock() { Ok(new_lock) => { - let old_lock = mut_data.lock.replace(std::mem::transmute(new_lock)); + let old_lock = this.1.replace(new_lock); std::mem::forget(old_lock); } Err(_) => { - return EPOISON; + return errors::Z_EPOISON_MUTEX; } } - - *m = t.transmute(); - 0 + errors::Z_OK } #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_mutex_unlock(m: *mut z_mutex_t) -> i8 { - if m.is_null() { - return EINVAL; - } - let mut t = (*m).transmute(); - if t.data.is_none() { - return EINVAL; - } - let mut_data = t.data.as_mut().unwrap(); - if mut_data.lock.is_none() { - return EINVAL; +pub extern "C" fn z_mutex_unlock(this: z_mutex_t) -> errors::ZCError { + let this = this.transmute_mut(); + if this.1.is_none() { + return errors::Z_EINVAL_MUTEX; } else { - mut_data.lock.take(); + this.1.take(); } - *m = t.transmute(); - 0 + errors::Z_OK } #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_mutex_try_lock(m: *mut z_mutex_t) -> i8 { - if m.is_null() { - return EINVAL; - } - let mut t = (*m).transmute(); - if t.data.is_none() { - return EINVAL; - } - let mut_data = t.data.as_mut().unwrap(); - let mut ret: i8 = 0; - match mut_data.mutex.try_lock() { +pub unsafe extern "C" fn z_mutex_try_lock(this: z_mutex_t) -> errors::ZCError { + let this = this.transmute_mut(); + match this.0.try_lock() { Ok(new_lock) => { - let old_lock = mut_data.lock.replace(std::mem::transmute(new_lock)); + let old_lock = this.1.replace(new_lock); std::mem::forget(old_lock); } Err(_) => { - ret = EBUSY; + return errors::Z_EBUSY_MUTEX; } } - *m = t.transmute(); - ret + errors::Z_OK } -struct ZCondvarPtr { - data: Option>, + +pub use crate::opaque_types::z_owned_condvar_t; +pub use crate::opaque_types::z_condvar_t; + +decl_transmute_owned!(Option, z_owned_condvar_t); +decl_transmute_handle!(Condvar, z_condvar_t); + +#[no_mangle] +pub extern "C" fn z_condvar_init(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::init(this, Some(Condvar::new())); } -/// Condvar -/// -#[repr(C)] -#[derive(Clone, Copy)] -pub struct z_condvar_t(usize); +#[no_mangle] +pub extern "C" fn z_condvar_null(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::empty(this); +} -impl_guarded_transmute!(noderefs z_condvar_t, ZCondvarPtr); -impl_guarded_transmute!(noderefs ZCondvarPtr, z_condvar_t); +#[no_mangle] +pub extern "C" fn z_condvar_drop(this: &mut z_owned_condvar_t) { + let _ = this.transmute_mut().extract().take(); +} #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_condvar_init(cv: *mut z_condvar_t) -> i8 { - if cv.is_null() { - return EINVAL; - } - let t: ZCondvarPtr = ZCondvarPtr { - data: Some(Box::new(Condvar::new())), - }; - *cv = t.transmute(); - 0 +pub extern "C" fn z_condvar_check(this: &z_owned_condvar_t) -> bool { + this.transmute_ref().is_some() } #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_condvar_free(cv: *mut z_condvar_t) -> i8 { - if cv.is_null() { - return EINVAL; - } - let mut t = (*cv).transmute(); - if t.data.is_none() { - return EINVAL; - } - t.data.take(); - *cv = t.transmute(); - 0 +pub extern "C" fn z_condvar_loan(this: &z_owned_condvar_t) -> z_condvar_t { + let this = this.transmute_ref(); + let this = unwrap_ref_unchecked(this); + this.transmute_handle() } #[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_condvar_signal(cv: *mut z_condvar_t) -> i8 { - if cv.is_null() { - return EINVAL; - } - let t = (*cv).transmute(); - if t.data.is_none() { - return EINVAL; - } - t.data.as_ref().unwrap().notify_one(); - *cv = t.transmute(); - 0 +pub extern "C" fn z_condvar_signal(this: z_condvar_t) -> errors::ZCError { + let this = this.transmute_mut(); + this.notify_one(); + errors::Z_OK } #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_condvar_wait(cv: *mut z_condvar_t, m: *mut z_mutex_t) -> i8 { - if cv.is_null() { - return EINVAL; +pub unsafe extern "C" fn z_condvar_wait(this: z_condvar_t, m: z_mutex_t) -> errors::ZCError { + let this = this.transmute_mut(); + let m = m.transmute_mut(); + if m.1.is_none() { + return errors::Z_EINVAL_MUTEX; // lock was not aquired prior to wait call } - let tcv = (*cv).transmute(); - if tcv.data.is_none() { - return EINVAL; - } - if m.is_null() { - return EINVAL; - } - let mut tm = (*m).transmute(); - if tm.data.is_none() || tm.data.as_ref().unwrap().lock.is_none() { - return EINVAL; - } - let mut_data = tm.data.as_mut().unwrap(); - let lock = mut_data.lock.take().unwrap(); - match tcv.data.as_ref().unwrap().wait(lock) { - Ok(new_lock) => mut_data.lock = Some(std::mem::transmute(new_lock)), - Err(_) => return EPOISON, + + let lock = m.1.take().unwrap(); + match this.wait(lock) { + Ok(new_lock) => m.1 = Some(new_lock), + Err(_) => return errors::Z_EPOISON_MUTEX, } - *cv = tcv.transmute(); - *m = tm.transmute(); - 0 -} -struct ZTask { - join_handle: JoinHandle<()>, + errors::Z_OK } -struct ZTaskPtr { - data: Option>, -} +pub use crate::opaque_types::z_owned_task_t; -/// Task -/// -#[repr(C)] -#[derive(Clone, Copy)] -pub struct z_task_t(usize); +decl_transmute_owned!(Option>, z_owned_task_t); #[repr(C)] #[derive(Clone, Copy)] pub struct z_task_attr_t(usize); -impl_guarded_transmute!(noderefs z_task_t, ZTaskPtr); -impl_guarded_transmute!(noderefs ZTaskPtr, z_task_t); +#[no_mangle] +pub extern "C" fn z_task_null(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::empty(this); +} + +/// Detaches the task and releases all allocated resources. +#[no_mangle] +pub extern "C" fn z_task_detach(this: &mut z_owned_task_t) { + let _ = this.transmute_mut().extract().take(); +} + +/// Joins the task and releases all allocated resources +#[no_mangle] +pub extern "C" fn z_task_join(this: &mut z_owned_task_t) -> errors::ZCError { + let this = this.transmute_mut().extract().take(); + if let Some(task) = this { + match task.join() { + Ok(_) => errors::Z_OK, + Err(_) => errors::Z_EINVAL_MUTEX, + } + } else { + errors::Z_OK + } +} + +#[no_mangle] +pub extern "C" fn z_task_check(this: &z_owned_task_t) -> bool { + this.transmute_ref().is_some() +} + struct FunArgPair { fun: unsafe extern "C" fn(arg: *mut c_void), @@ -252,42 +198,19 @@ unsafe impl Send for FunArgPair {} #[no_mangle] #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn z_task_init( - task: *mut z_task_t, + this: *mut MaybeUninit, _attr: *const z_task_attr_t, fun: unsafe extern "C" fn(arg: *mut c_void), arg: *mut c_void, -) -> i8 { - if task.is_null() { - return EINVAL; - } - - let mut ttask = ZTaskPtr { data: None }; +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); let fun_arg_pair = FunArgPair { fun, arg }; - let mut ret = 0; match thread::Builder::new().spawn(move || fun_arg_pair.call()) { - Ok(join_handle) => ttask.data = Some(Box::new(ZTask { join_handle })), - Err(_) => ret = EAGAIN, + Ok(join_handle) => { + Inplace::init(this, Some(join_handle)); + }, + Err(_) => return errors::Z_EAGAIN_MUTEX, } - *task = ttask.transmute(); - ret -} - -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn z_task_join(task: *mut z_task_t) -> i8 { - if task.is_null() { - return EINVAL; - } - let mut ttask = (*task).transmute(); - if ttask.data.is_none() { - return EINVAL; - } - let data = ttask.data.take(); - let ret = match data.unwrap().join_handle.join() { - Ok(_) => 0, - Err(_) => EINVAL, - }; - *task = ttask.transmute(); - ret -} + errors::Z_OK +} \ No newline at end of file diff --git a/src/publication_cache.rs b/src/publication_cache.rs index 50e1dad47..dadc91cc3 100644 --- a/src/publication_cache.rs +++ b/src/publication_cache.rs @@ -12,13 +12,15 @@ // ZettaScale Zenoh team, // -use std::ops::Deref; +use std::mem::MaybeUninit; +use zenoh::prelude::SyncResolve; +use std::ptr::null; use zenoh_ext::SessionExt; +use crate::transmute::{Inplace, TransmuteCopy, TransmuteFromHandle, TransmuteRef, TransmuteUninitPtr}; use crate::{ - impl_guarded_transmute, z_keyexpr_t, z_session_t, zcu_locality_default, zcu_locality_t, - UninitializedKeyExprError, + errors, z_keyexpr_t, z_session_t, zcu_locality_default, zcu_locality_t }; /// Options passed to the :c:func:`ze_declare_publication_cache` function. @@ -32,7 +34,7 @@ use crate::{ /// size_t resources_limit: The limit number of cached resources #[repr(C)] pub struct ze_publication_cache_options_t { - pub queryable_prefix: z_keyexpr_t, + pub queryable_prefix: *const z_keyexpr_t, pub queryable_origin: zcu_locality_t, pub queryable_complete: bool, pub history: usize, @@ -43,7 +45,7 @@ pub struct ze_publication_cache_options_t { #[no_mangle] pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache_options_t { ze_publication_cache_options_t { - queryable_prefix: z_keyexpr_t::null(), + queryable_prefix: null(), queryable_origin: zcu_locality_default(), queryable_complete: false, history: 1, @@ -51,36 +53,16 @@ pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache } } -/// An owned zenoh publication_cache. -/// -/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. -/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. -/// -/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. -/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. -/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. -/// -/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. -#[repr(C)] -pub struct ze_owned_publication_cache_t([usize; 1]); - -type PublicationCache = Option>>; -impl_guarded_transmute!(PublicationCache, ze_owned_publication_cache_t); - -impl ze_owned_publication_cache_t { - pub fn new(pub_cache: zenoh_ext::PublicationCache<'static>) -> Self { - Some(Box::new(pub_cache)).into() - } - pub fn null() -> Self { - None.into() - } -} +pub use crate::opaque_types::ze_owned_publication_cache_t; +pub use crate::opaque_types::ze_publication_cache_t; +decl_transmute_owned!(Option>, ze_owned_publication_cache_t); +decl_transmute_handle!(zenoh_ext::PublicationCache<'static>, ze_publication_cache_t); /// Declares a Publication Cache. /// /// Parameters: /// z_session_t session: The zenoh session. -/// z_keyexpr_t keyexpr: The key expression to publish. +/// z_keyexpr_t key_expr: The key expression to publish. /// ze_publication_cache_options_t options: Additional options for the publication_cache. /// /// Returns: @@ -103,74 +85,64 @@ impl ze_owned_publication_cache_t { #[no_mangle] #[allow(clippy::missing_safety_doc)] pub extern "C" fn ze_declare_publication_cache( + this: *mut MaybeUninit, session: z_session_t, - keyexpr: z_keyexpr_t, - options: Option<&ze_publication_cache_options_t>, -) -> ze_owned_publication_cache_t { - match session.upgrade() { - Some(s) => { - let keyexpr = keyexpr.deref().as_ref().map(|s| s.clone().into_owned()); - if let Some(key_expr) = keyexpr { - let mut p = s.declare_publication_cache(key_expr); - if let Some(options) = options { - p = p.history(options.history); - p = p.queryable_allowed_origin(options.queryable_origin.into()); - p = p.queryable_complete(options.queryable_complete); - if options.resources_limit != 0 { - p = p.resources_limit(options.resources_limit) - } - if options.queryable_prefix.deref().is_some() { - let queryable_prefix = options - .queryable_prefix - .deref() - .as_ref() - .map(|s| s.clone().into_owned()); - if let Some(queryable_prefix) = queryable_prefix { - p = p.queryable_prefix(queryable_prefix) - } - } - } - match p.res_sync() { - Ok(publication_cache) => ze_owned_publication_cache_t::new(publication_cache), - Err(e) => { - log::error!("{}", e); - ze_owned_publication_cache_t::null() - } - } - } else { - log::error!("{}", UninitializedKeyExprError); - ze_owned_publication_cache_t::null() - } + key_expr: z_keyexpr_t, + options: ze_publication_cache_options_t, +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); + let mut p = session.declare_publication_cache(key_expr); + p = p.history(options.history); + p = p.queryable_allowed_origin(options.queryable_origin.into()); + p = p.queryable_complete(options.queryable_complete); + if options.resources_limit != 0 { + p = p.resources_limit(options.resources_limit) + } + if !options.queryable_prefix.is_null() { + let queryable_prefix = unsafe { *options.queryable_prefix }.transmute_ref(); + p = p.queryable_prefix(queryable_prefix.clone()); + } + match p.res_sync() { + Ok(publication_cache) => { + Inplace::init(this, Some(publication_cache)); + errors::Z_OK + } + Err(e) => { + log::error!("{}", e); + Inplace::empty(this); + errors::Z_EGENERIC } - None => ze_owned_publication_cache_t::null(), } } /// Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub extern "C" fn ze_publication_cache_null() -> ze_owned_publication_cache_t { - ze_owned_publication_cache_t::null() +pub extern "C" fn ze_publication_cache_null(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::empty(this); } /// Returns ``true`` if `pub_cache` is valid. #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub extern "C" fn ze_publication_cache_check(pub_cache: &ze_owned_publication_cache_t) -> bool { - pub_cache.as_ref().is_some() +pub extern "C" fn ze_publication_cache_check(this: &ze_owned_publication_cache_t) -> bool { + this.transmute_ref().is_some() } /// Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. #[no_mangle] #[allow(clippy::missing_safety_doc)] pub extern "C" fn ze_undeclare_publication_cache( - pub_cache: &mut ze_owned_publication_cache_t, -) -> i8 { - if let Some(p) = pub_cache.take() { + this: &mut ze_owned_publication_cache_t, +) -> errors::ZCError { + if let Some(p) = this.transmute_mut().extract().take() { if let Err(e) = p.close().res_sync() { log::error!("{}", e); - return e.errno().get(); + return errors::Z_EGENERIC; } } - 0 + errors::Z_OK } diff --git a/src/publisher.rs b/src/publisher.rs index 833798959..2ebf4c410 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -304,7 +304,7 @@ pub extern "C" fn zcu_publisher_matching_listener_callback( }) .res(); match listener { - Ok(l) => { + Ok(_) => { Inplace::empty(this); errors::Z_OK }, diff --git a/src/put.rs b/src/put.rs index c2d1bddd2..5457dd522 100644 --- a/src/put.rs +++ b/src/put.rs @@ -1,3 +1,5 @@ +use std::ptr::null_mut; + // // Copyright (c) 2017, 2022 ZettaScale Technology. // @@ -12,24 +14,25 @@ // ZettaScale Zenoh team, // use crate::commons::*; +use crate::errors; use crate::keyexpr::*; use crate::session::*; +use crate::transmute::Inplace; +use crate::transmute::TransmuteCopy; +use crate::transmute::TransmuteFromHandle; +use crate::transmute::TransmuteRef; use crate::z_owned_bytes_t; +use crate::z_session_t; use crate::LOG_INVALID_SESSION; use libc::c_void; use zenoh::encoding; +use zenoh::key_expr; use zenoh::prelude::{sync::SyncResolve, Priority, SampleKind}; use zenoh::publication::CongestionControl; -use zenoh::sample::AttachmentBuilder; use zenoh::sample::QoSBuilderTrait; use zenoh::sample::SampleBuilderTrait; use zenoh::sample::ValueBuilderTrait; -use crate::attachment::{ - insert_in_attachment_builder, z_attachment_check, z_attachment_iterate, z_attachment_null, - z_bytes_t, -}; - /// Options passed to the :c:func:`z_put` function. /// /// Members: @@ -40,10 +43,10 @@ use crate::attachment::{ #[repr(C)] #[allow(non_camel_case_types)] pub struct z_put_options_t { - pub encoding: z_encoding_t, + pub encoding: *mut z_owned_encoding_t, pub congestion_control: z_congestion_control_t, pub priority: z_priority_t, - pub attachment:z_bytes_t, + pub attachment: *mut z_owned_bytes_t, } /// Constructs the default value for :c:type:`z_put_options_t`. @@ -51,71 +54,60 @@ pub struct z_put_options_t { #[allow(clippy::missing_safety_doc)] pub extern "C" fn z_put_options_default() -> z_put_options_t { z_put_options_t { - encoding: z_encoding_default(), + encoding: null_mut(), congestion_control: CongestionControl::default().into(), priority: Priority::default().into(), - attachment: z_attachment_null(), + attachment: null_mut(), } } -/// Put data, transfering the buffer ownership. +/// Put data, transfering its ownership. /// -/// This is avoids copies when transfering data that was either: -/// - `zc_sample_payload_rcinc`'d from a sample, when forwarding samples from a subscriber/query to a publisher -/// - constructed from a `zc_owned_shmbuf_t` /// -/// The payload's encoding can be sepcified through the options. +/// The payload's encoding and attachment can be sepcified through the options. These values are consumed upon function +/// return. /// /// Parameters: /// session: The zenoh session. -/// keyexpr: The key expression to put. -/// payload: The value to put. +/// key_expr: The key expression to put. +/// payload: The value to put (consumed upon function return). /// options: The put options. /// Returns: -/// ``0`` in case of success, negative values in case of failure. +/// ``0`` in case of success, negative error values in case of failure. #[no_mangle] #[allow(clippy::missing_safety_doc)] pub extern "C" fn z_put( session: z_session_t, - keyexpr: z_keyexpr_t, - payload: Option<&mut z_owned_bytes_t>, - opts: Option<&z_put_options_t>, -) -> i8 { - match session.upgrade() { - Some(s) => { - if let Some(payload) = payload.and_then(|p| p.take()) { - let mut res = s.put(keyexpr, payload); - if let Some(opts) = opts { - res = res - .encoding(**opts.encoding) - .congestion_control(opts.congestion_control.into()) - .priority(opts.priority.into()); - if z_attachment_check(&opts.attachment) { - let mut attachment_builder = AttachmentBuilder::new(); - z_attachment_iterate( - opts.attachment, - insert_in_attachment_builder, - &mut attachment_builder as *mut AttachmentBuilder as *mut c_void, - ); - res = res.attachment(attachment_builder.build()); - }; - } - match res.res_sync() { - Err(e) => { - log::error!("{}", e); - e.errno().get() - } - Ok(()) => 0, - } - } else { - log::debug!("z_bytes_null was provided as payload for put"); - i8::MIN - } - } + key_expr: z_keyexpr_t, + payload: &mut z_owned_bytes_t, + options: z_put_options_t, +) -> errors::ZCError { + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); + let payload = match payload.transmute_mut().extract() { + Some(p) => p, None => { - log::debug!("{}", LOG_INVALID_SESSION); - i8::MIN + log::debug!("Attempted to put with a null payload"); + return errors::Z_EINVAL; } + }; + + let mut put = session.put(key_expr, payload); + + if !options.encoding.is_null() { + let encoding = unsafe{ *options.encoding }.transmute_mut().extract(); + put = put.encoding(encoding); + }; + if !options.attachment.is_null() { + let attachment = unsafe { *options.attachment }.transmute_mut().extract(); + put = put.attachment(attachment); + } + + if let Err(e) = put.res_sync() { + log::error!("{}", e); + errors::Z_EGENERIC + } else { + errors::Z_OK } } @@ -141,7 +133,7 @@ pub unsafe extern "C" fn z_delete_options_default() -> z_delete_options_t { /// /// Parameters: /// session: The zenoh session. -/// keyexpr: The key expression to delete. +/// key_expr: The key expression to delete. /// options: The put options. /// Returns: /// ``0`` in case of success, negative values in case of failure. @@ -149,28 +141,21 @@ pub unsafe extern "C" fn z_delete_options_default() -> z_delete_options_t { #[allow(clippy::missing_safety_doc)] pub extern "C" fn z_delete( session: z_session_t, - keyexpr: z_keyexpr_t, - opts: Option<&z_delete_options_t>, -) -> i8 { - match session.upgrade() { - Some(s) => { - let mut res = s.delete(keyexpr); - if let Some(opts) = opts { - res = res - .congestion_control(opts.congestion_control.into()) - .priority(opts.priority.into()); - } - match res.res_sync() { - Err(e) => { - log::error!("{}", e); - e.errno().get() - } - Ok(()) => 0, - } - } - None => { - log::debug!("{}", LOG_INVALID_SESSION); - i8::MIN + key_expr: z_keyexpr_t, + opts: z_delete_options_t, +) -> errors::ZCError { + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); + let del + = session.delete(key_expr) + .congestion_control(opts.congestion_control.into()) + .priority(opts.priority.into()); + + match del.res_sync() { + Err(e) => { + log::error!("{}", e); + errors::Z_EGENERIC } + Ok(()) => errors::Z_OK, } } diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 92e797e2e..4ab406708 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -13,45 +13,37 @@ // use std::mem::MaybeUninit; +use std::ptr::null; use zenoh::prelude::sync::SyncResolve; use zenoh::prelude::KeyExpr; use zenoh::prelude::SessionDeclarations; +use zenoh::session; +use zenoh::session::Session; +use zenoh::subscriber::Reliability; use zenoh_ext::*; -use crate::z_closure_owned_query_call; -use crate::ze_owned_publication_cache_t; -use crate::{ - impl_guarded_transmute, opaque_types::z_sample_t, z_closure_sample_call, z_get_options_t, - z_keyexpr_t, z_owned_closure_sample_t, z_query_consolidation_none, z_query_consolidation_t, +use crate::errors; +use crate::transmute::unwrap_ref_unchecked; +use crate::transmute::Inplace; +use crate::transmute::TransmuteCopy; +use crate::transmute::TransmuteFromHandle; +use crate::transmute::TransmuteIntoHandle; +use crate::transmute::TransmuteRef; +use crate::transmute::TransmuteUninitPtr; +use crate::z_keyexpr_t; +use crate::z_owned_closure_sample_t; +use crate::z_reliability_t; +use crate::{z_closure_sample_call, z_get_options_t, + z_query_consolidation_none, z_query_consolidation_t, z_query_target_default, z_query_target_t, z_session_t, zcu_locality_default, zcu_locality_t, - zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, LOG_INVALID_SESSION, + zcu_reply_keyexpr_default, zcu_reply_keyexpr_t }; -pub struct FetchingSubscriberWrapper { - fetching_subscriber: zenoh_ext::FetchingSubscriber<'static, ()>, - session: z_session_t, -} -//type FetchingSubscriber = Option>>; - -/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. -/// -/// Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. -/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. -/// -/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. -/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. -/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. -/// -/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. use crate::opaque_types::ze_owned_querying_subscriber_t; -decl_transmute_owned!(default_inplace_init Option>, z_owned_querying_subscriber_t); - use crate::opaque_types::ze_querying_subscriber_t; -decl_transmute_copy!( - &'static Option>, - z_querying_subscriber_t -); +decl_transmute_owned!(Option<(zenoh_ext::FetchingSubscriber<'static, ()>, &'static Session)>, ze_owned_querying_subscriber_t); +decl_transmute_handle!((zenoh_ext::FetchingSubscriber<'static, ()>, &'static Session), ze_querying_subscriber_t); /// Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type #[no_mangle] @@ -59,7 +51,7 @@ decl_transmute_copy!( pub extern "C" fn ze_querying_subscriber_null( this: *mut MaybeUninit, ) { - let this = ze_owned_querying_subscriber_t::transmute_uninit_ptr(this); + let this = this.transmute_uninit_ptr(); Inplace::empty(this); } @@ -78,9 +70,9 @@ pub extern "C" fn ze_querying_subscriber_null( #[repr(C)] #[allow(non_camel_case_types)] pub struct ze_querying_subscriber_options_t { - // reliability: z_reliability_t, + reliability: z_reliability_t, allowed_origin: zcu_locality_t, - query_selector: z_keyexpr_t, + query_selector: *const z_keyexpr_t, query_target: z_query_target_t, query_consolidation: z_query_consolidation_t, query_accept_replies: zcu_reply_keyexpr_t, @@ -91,9 +83,9 @@ pub struct ze_querying_subscriber_options_t { #[no_mangle] pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscriber_options_t { ze_querying_subscriber_options_t { - // reliability: SubInfo::default().reliability.into(), + reliability: Reliability::BestEffort.into(), allowed_origin: zcu_locality_default(), - query_selector: z_keyexpr_t::null(), + query_selector: null(), query_target: z_query_target_default(), query_consolidation: z_query_consolidation_none(), query_accept_replies: zcu_reply_keyexpr_default(), @@ -135,55 +127,46 @@ pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscr #[no_mangle] #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ze_declare_querying_subscriber( + this: *mut MaybeUninit, session: z_session_t, - keyexpr: z_keyexpr_t, + key_expr: z_keyexpr_t, callback: &mut z_owned_closure_sample_t, - options: Option<&ze_querying_subscriber_options_t>, -) -> ze_owned_querying_subscriber_t { + options: ze_querying_subscriber_options_t, +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); let mut closure = z_owned_closure_sample_t::empty(); std::mem::swap(callback, &mut closure); - - match session.upgrade() { - Some(s) => { - let mut sub = s.declare_subscriber(keyexpr).querying(); - if let Some(options) = options { - sub = sub - // .reliability(options.reliability.into()) - .allowed_origin(options.allowed_origin.into()) - .query_target(options.query_target.into()) - .query_consolidation(options.query_consolidation) - .query_accept_replies(options.query_accept_replies.into()); - if options.query_selector.is_some() { - let query_selector = options - .query_selector - .as_ref() - .map(|s| s.clone().into_owned()); - if let Some(query_selector) = query_selector { - sub = sub.query_selector(query_selector) - } - } - if options.query_timeout_ms != 0 { - sub = sub - .query_timeout(std::time::Duration::from_millis(options.query_timeout_ms)); - } - } - match sub - .callback(move |sample| { - let sample = z_sample_t::new(&sample); - z_closure_sample_call(&closure, &sample) - }) - .res() - { - Ok(sub) => ze_owned_querying_subscriber_t::new(sub, session), - Err(e) => { - log::debug!("{}", e); - ze_owned_querying_subscriber_t::null() - } - } + let session = session.transmute_copy(); + let mut sub = session.declare_subscriber(key_expr.transmute_ref()).querying(); + sub = sub + .reliability(options.reliability.into()) + .allowed_origin(options.allowed_origin.into()) + .query_target(options.query_target.into()) + .query_consolidation(options.query_consolidation) + .query_accept_replies(options.query_accept_replies.into()); + if !options.query_selector.is_null() { + let query_selector = unsafe { *options.query_selector }.transmute_ref().clone(); + sub = sub.query_selector(query_selector) + } + if options.query_timeout_ms != 0 { + sub = sub + .query_timeout(std::time::Duration::from_millis(options.query_timeout_ms)); + } + let sub = sub + .callback(move |sample| { + let sample = sample.transmute_handle(); + z_closure_sample_call(&closure, sample); + }); + match sub.res() + { + Ok(sub) => { + Inplace::init(this, Some((sub, session))); + errors::Z_OK } - None => { - log::debug!("{}", LOG_INVALID_SESSION); - ze_owned_querying_subscriber_t::null() + Err(e) => { + log::debug!("{}", e); + Inplace::empty(this); + errors::Z_EGENERIC } } } @@ -196,68 +179,57 @@ pub unsafe extern "C" fn ze_querying_subscriber_get( sub: ze_querying_subscriber_t, selector: z_keyexpr_t, options: Option<&z_get_options_t>, -) -> i8 { +) -> errors::ZCError { unsafe impl Sync for z_get_options_t {} - - if let Some(sub) = sub.as_ref() { - match sub.session.upgrade() { - Some(s) => { - if let Err(e) = sub - .fetching_subscriber - .fetch({ - let selector = KeyExpr::try_from(selector).unwrap(); - move |cb| match options { - Some(options) => s - .get(selector) - .target(options.target.into()) - .consolidation(options.consolidation) - .timeout(std::time::Duration::from_millis(options.timeout_ms)) - .callback(cb) - .res_sync(), - None => s.get(selector).callback(cb).res_sync(), - } - }) - .res() - { - log::debug!("{}", e); - return -1; - } - } - None => { - log::debug!("{}", LOG_INVALID_SESSION); - return -1; - } + let sub = sub.transmute_ref(); + let session = sub.1; + let selector = selector.transmute_ref().clone(); + if let Err(e) = sub.0 + .fetch({ + let selector = KeyExpr::try_from(selector).unwrap(); + move |cb| match options { + Some(options) => session + .get(selector) + .target(options.target.into()) + .consolidation(options.consolidation) + .timeout(std::time::Duration::from_millis(options.timeout_ms)) + .callback(cb) + .res_sync(), + None => session.get(selector).callback(cb).res_sync(), } + }) + .res() + { + log::debug!("{}", e); + return errors::Z_EGENERIC; } - 0 + errors::Z_OK } /// Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 { - let sub = sub.transmute_mut(); - - if let Some(s) = sub.take() { - if let Err(e) = s.fetching_subscriber.close().res_sync() { - log::warn!("{}", e); - return e.errno().get(); +pub extern "C" fn ze_undeclare_querying_subscriber(this: &mut ze_owned_querying_subscriber_t) -> errors::ZCError { + if let Some(s) = this.transmute_mut().extract().take() { + if let Err(e) = s.0.close().res_sync() { + log::error!("{}", e); + return errors::Z_EGENERIC } } - 0 + errors::Z_OK } -/// Returns ``true`` if `sub` is valid. +/// Returns ``true`` if `this` is valid. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub extern "C" fn ze_querying_subscriber_check(sub: &ze_owned_querying_subscriber_t) -> bool { - sub.as_ref().is_some() +pub extern "C" fn ze_querying_subscriber_check(this: &ze_owned_querying_subscriber_t) -> bool { + this.transmute_ref().is_some() } -/// Returns a :c:type:`ze_querying_subscriber_loan` loaned from `p`. +/// Returns a :c:type:`ze_querying_subscriber_loan` loaned from `this`. #[no_mangle] -pub extern "C" fn ze_querying_subscriber_loan( - p: &ze_owned_querying_subscriber_t, -) -> ze_querying_subscriber_t { - ze_querying_subscriber_t(p) +pub extern "C" fn ze_querying_subscriber_loan(this: &ze_owned_querying_subscriber_t,) -> ze_querying_subscriber_t { + let this = this.transmute_ref(); + let this = unwrap_ref_unchecked(this); + this.transmute_handle() } diff --git a/src/subscriber.rs b/src/subscriber.rs index acdae4f60..ceab769cf 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -12,107 +12,76 @@ // ZettaScale Zenoh team, // -use crate::commons::*; -use crate::impl_guarded_transmute; +use std::mem::MaybeUninit; + +use crate::errors; use crate::keyexpr::*; -use crate::session::*; +use crate::transmute::unwrap_ref_unchecked; +use crate::transmute::Inplace; +use crate::transmute::TransmuteCopy; +use crate::transmute::TransmuteFromHandle; +use crate::transmute::TransmuteIntoHandle; +use crate::transmute::TransmuteRef; +use crate::transmute::TransmuteUninitPtr; use crate::z_closure_sample_call; use crate::z_owned_closure_sample_t; -use crate::LOG_INVALID_SESSION; +use crate::z_session_t; use zenoh::prelude::sync::SyncResolve; use zenoh::prelude::SessionDeclarations; -// use zenoh::subscriber::Reliability; -// use zenoh_protocol::core::SubInfo; -use zenoh_util::core::zresult::ErrNo; +use zenoh::subscriber::Reliability; +use zenoh::subscriber::Subscriber; /// The subscription reliability. /// /// - **Z_RELIABILITY_BEST_EFFORT** /// - **Z_RELIABILITY_RELIABLE** -// #[allow(non_camel_case_types, clippy::upper_case_acronyms)] -// #[repr(C)] -// #[derive(Clone, Copy)] -// pub enum z_reliability_t { -// BEST_EFFORT, -// RELIABLE, -// } - -// impl From for z_reliability_t { -// #[inline] -// fn from(r: Reliability) -> Self { -// match r { -// Reliability::BestEffort => z_reliability_t::BEST_EFFORT, -// Reliability::Reliable => z_reliability_t::RELIABLE, -// } -// } -// } - -// impl From for Reliability { -// #[inline] -// fn from(val: z_reliability_t) -> Self { -// match val { -// z_reliability_t::BEST_EFFORT => Reliability::BestEffort, -// z_reliability_t::RELIABLE => Reliability::Reliable, -// } -// } -// } - -/**************************************/ -/* DECLARATION */ -/**************************************/ -type Subscriber = Option>>; - -/// An owned zenoh subscriber. Destroying the subscriber cancels the subscription. -/// -/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. -/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. -/// -/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. -/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. -/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. -/// -/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. -#[cfg(not(target_arch = "arm"))] -#[repr(C, align(8))] -pub struct z_owned_subscriber_t([u64; 1]); - -#[cfg(target_arch = "arm")] -#[repr(C, align(4))] -pub struct z_owned_subscriber_t([u32; 1]); - -impl_guarded_transmute!(Subscriber, z_owned_subscriber_t); +#[allow(non_camel_case_types, clippy::upper_case_acronyms)] +#[repr(C)] +#[derive(Clone, Copy)] +pub enum z_reliability_t { + BEST_EFFORT, + RELIABLE, +} -impl z_owned_subscriber_t { - pub fn new(sub: zenoh::subscriber::Subscriber<'static, ()>) -> Self { - Some(Box::new(sub)).into() - } - pub fn null() -> Self { - None.into() +impl From for z_reliability_t { +#[inline] + fn from(r: Reliability) -> Self { + match r { + Reliability::BestEffort => z_reliability_t::BEST_EFFORT, + Reliability::Reliable => z_reliability_t::RELIABLE, + } } } -/// Constructs a null safe-to-drop value of 'z_owned_subscriber_t' type -#[no_mangle] -pub extern "C" fn z_subscriber_null() -> z_owned_subscriber_t { - z_owned_subscriber_t::null() +impl From for Reliability { +#[inline] + fn from(val: z_reliability_t) -> Self { + match val { + z_reliability_t::BEST_EFFORT => Reliability::BestEffort, + z_reliability_t::RELIABLE => Reliability::Reliable, + } + } } -/// A loaned zenoh subscriber. -#[allow(non_camel_case_types)] -#[derive(Clone, Copy)] -#[repr(C)] -pub struct z_subscriber_t(*const z_owned_subscriber_t); +pub use crate::opaque_types::z_owned_subscriber_t; +pub use crate::opaque_types::z_subscriber_t; -impl AsRef for z_subscriber_t { - fn as_ref(&self) -> &Subscriber { - unsafe { &(*self.0) } - } +decl_transmute_owned!(Option>, z_owned_subscriber_t); +decl_transmute_handle!(Subscriber<'static, ()>, z_subscriber_t); + +/// Constructs a null safe-to-drop value of 'z_owned_subscriber_t' type +#[no_mangle] +pub extern "C" fn z_subscriber_null(this: *mut MaybeUninit) { + let this = this.transmute_uninit_ptr(); + Inplace::empty(this); } -/// Returns a :c:type:`z_subscriber_t` loaned from `p`. +/// Returns a :c:type:`z_subscriber_t` loaned from `this`. #[no_mangle] -pub extern "C" fn z_subscriber_loan(p: &z_owned_subscriber_t) -> z_subscriber_t { - z_subscriber_t(p) +pub extern "C" fn z_subscriber_loan(this: &z_owned_subscriber_t) -> z_subscriber_t { + let this = this.transmute_ref(); + let this = unwrap_ref_unchecked(this); + this.transmute_handle() } /// Options passed to the :c:func:`z_declare_subscriber` or :c:func:`z_declare_pull_subscriber` function. @@ -122,15 +91,14 @@ pub extern "C" fn z_subscriber_loan(p: &z_owned_subscriber_t) -> z_subscriber_t #[allow(non_camel_case_types)] #[repr(C)] pub struct z_subscriber_options_t { - // pub reliability: z_reliability_t, + pub reliability: z_reliability_t, } /// Constructs the default value for :c:type:`z_subscriber_options_t`. #[no_mangle] pub extern "C" fn z_subscriber_options_default() -> z_subscriber_options_t { - // let info = SubInfo::default(); z_subscriber_options_t { - // reliability: info.reliability.into(), + reliability: Reliability::BestEffort.into() } } @@ -138,7 +106,7 @@ pub extern "C" fn z_subscriber_options_default() -> z_subscriber_options_t { /// /// Parameters: /// session: The zenoh session. -/// keyexpr: The key expression to subscribe. +/// key_expr: The key expression to subscribe. /// callback: The callback function that will be called each time a data matching the subscribed expression is received. /// opts: The options to be passed to describe the options to be passed to the subscriber declaration. /// @@ -168,34 +136,33 @@ pub extern "C" fn z_subscriber_options_default() -> z_subscriber_options_t { #[no_mangle] #[allow(clippy::missing_safety_doc)] pub extern "C" fn z_declare_subscriber( + this: *mut MaybeUninit, session: z_session_t, - keyexpr: z_keyexpr_t, + key_expr: z_keyexpr_t, callback: &mut z_owned_closure_sample_t, - opts: Option<&z_subscriber_options_t>, -) -> z_owned_subscriber_t { + options: z_subscriber_options_t, +) -> errors::ZCError { + let this = this.transmute_uninit_ptr(); let mut closure = z_owned_closure_sample_t::empty(); std::mem::swap(callback, &mut closure); - - match session.upgrade() { - Some(s) => { - let mut res = s.declare_subscriber(keyexpr).callback(move |sample| { - let sample = z_sample_t::new(&sample); - z_closure_sample_call(&closure, &sample) - }); - if let Some(opts) = opts { - // res = res.reliability(opts.reliability.into()) - } - match res.res() { - Ok(sub) => z_owned_subscriber_t::new(sub), - Err(e) => { - log::debug!("{}", e); - z_owned_subscriber_t::null() - } - } - } - None => { - log::debug!("{}", LOG_INVALID_SESSION); - z_owned_subscriber_t::null() + let session = session.transmute_copy(); + let key_expr = key_expr.transmute_ref(); + let subscriber + = session.declare_subscriber(key_expr).callback(move |sample| { + let sample = sample.transmute_handle(); + z_closure_sample_call(&closure, sample) + }); + + let subscriber = subscriber.reliability(options.reliability.into()); + match subscriber.res() { + Ok(sub) => { + Inplace::init(this, Some(sub)); + errors::Z_OK + }, + Err(e) => { + log::error!("{}", e); + Inplace::empty(this); + errors::Z_EGENERIC } } } @@ -203,30 +170,27 @@ pub extern "C" fn z_declare_subscriber( /// Returns the key expression of the subscriber. #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub extern "C" fn z_subscriber_keyexpr(subscriber: z_subscriber_t) -> z_owned_keyexpr_t { - if let Some(p) = subscriber.as_ref() { - p.key_expr().clone().into() - } else { - z_keyexpr_t::null().into() - } +pub extern "C" fn z_subscriber_keyexpr(subscriber: z_subscriber_t) -> z_keyexpr_t { + let subscriber = subscriber.transmute_ref(); + subscriber.key_expr().transmute_handle() } /// Undeclares the given :c:type:`z_owned_subscriber_t`, droping it and invalidating it for double-drop safety. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub extern "C" fn z_undeclare_subscriber(sub: &mut z_owned_subscriber_t) -> i8 { - if let Some(s) = sub.take() { +pub extern "C" fn z_undeclare_subscriber(subscriber: &mut z_owned_subscriber_t) -> errors::ZCError { + if let Some(s) = subscriber.transmute_mut().extract().take() { if let Err(e) = s.undeclare().res_sync() { - log::warn!("{}", e); - return e.errno().get(); + log::error!("{}", e); + return errors::Z_EGENERIC } } - 0 + errors::Z_OK } /// Returns ``true`` if `sub` is valid. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub extern "C" fn z_subscriber_check(sub: &z_owned_subscriber_t) -> bool { - sub.as_ref().is_some() +pub extern "C" fn z_subscriber_check(subscriber: &z_owned_subscriber_t) -> bool { + subscriber.transmute_ref().is_some() } diff --git a/src/transmute.rs b/src/transmute.rs index 4aaa3d157..cafe930ea 100644 --- a/src/transmute.rs +++ b/src/transmute.rs @@ -92,27 +92,24 @@ impl Inplace for T { macro_rules! validate_equivalence { ($type_a:ty, $type_b:ty) => { const _: () = { - let align_a = std::mem::align_of::<$type_a>(); - let align_b = std::mem::align_of::<$type_b>(); - if align_a != align_b { - panic!( - "Alingment mismatch: type `{}` has align {}, type `{}` has align {}", - stringify!($type_a), - align_a, - stringify!($type_b), - align_b - ); + use const_format::concatcp; + const TYPE_NAME_A: &str = stringify!($type_a); + const TYPE_NAME_B: &str = stringify!($type_b); + const ALIGN_A: usize = std::mem::align_of::<$type_a>(); + const ALIGN_B: usize = std::mem::align_of::<$type_b>(); + if ALIGN_A != ALIGN_B { + const ERR_MESSAGE: &str = concatcp!( + "Alingment mismatch: type ", TYPE_NAME_A, " has alignment ", ALIGN_A, + " while type ", TYPE_NAME_B, " has alignment ", ALIGN_B); + panic!("{}", ERR_MESSAGE); } - let size_a = std::mem::size_of::<$type_a>(); - let size_b = std::mem::size_of::<$type_b>(); - if size_a != size_b { - panic!( - "Size mismatch: type `{}` has size {}, type `{}` has size {}", - stringify!($type_a), - size_a, - stringify!($type_b), - size_b - ); + const SIZE_A: usize = std::mem::size_of::<$type_a>(); + const SIZE_B: usize = std::mem::size_of::<$type_b>(); + if SIZE_A != SIZE_B { + const ERR_MESSAGE: &str = concatcp!( + "Size mismatch: type ", TYPE_NAME_A, " has size ", SIZE_A, + " while type ", TYPE_NAME_B, " has size ", SIZE_B); + panic!("{}", ERR_MESSAGE); } }; };