From 0898ff2efa2f46e50834edb7b2f2e9b323d359a9 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 17 Dec 2024 16:22:44 +0100 Subject: [PATCH] refactor advanced publisher/subscriber options --- examples/z_advanced_pub.c | 6 ++-- examples/z_advanced_sub.c | 14 +++----- include/zenoh_commons.h | 32 +++++++++++------ src/advanced_publisher.rs | 19 ++++++---- src/advanced_subscriber.rs | 54 +++++++++++++++++++++-------- tests/z_int_advanced_pub_sub_test.c | 18 ++++------ 6 files changed, 84 insertions(+), 59 deletions(-) diff --git a/examples/z_advanced_pub.c b/examples/z_advanced_pub.c index 025245710..07c4d203e 100644 --- a/examples/z_advanced_pub.c +++ b/examples/z_advanced_pub.c @@ -53,10 +53,8 @@ int main(int argc, char** argv) { ze_advanced_publisher_options_t pub_opts; ze_advanced_publisher_options_default(&pub_opts); - ze_advanced_publisher_cache_options_t cache_options; - ze_advanced_publisher_cache_options_default(&cache_options); - cache_options.max_samples = args.history; - pub_opts.cache = &cache_options; + ze_advanced_publisher_cache_options_default(&pub_opts.cache); // or pub_opts.cache.is_enabled = true; + pub_opts.cache.max_samples = args.history; pub_opts.publisher_detection = true; pub_opts.sample_miss_detection = true; diff --git a/examples/z_advanced_sub.c b/examples/z_advanced_sub.c index 00d2c1bac..9d6bbaf1b 100644 --- a/examples/z_advanced_sub.c +++ b/examples/z_advanced_sub.c @@ -64,16 +64,10 @@ int main(int argc, char** argv) { ze_advanced_subscriber_options_t sub_opts; ze_advanced_subscriber_options_default(&sub_opts); - - ze_advanced_subscriber_history_options_t sub_history_options; - ze_advanced_subscriber_history_options_default(&sub_history_options); - sub_history_options.detect_late_publishers = true; - - ze_advanced_subscriber_recovery_options_t sub_recovery_options; - ze_advanced_subscriber_recovery_options_default(&sub_recovery_options); - sub_recovery_options.periodic_queries_period_ms = 1000; - sub_opts.history = &sub_history_options; - sub_opts.recovery = &sub_recovery_options; + ze_advanced_subscriber_history_options_default(&sub_opts.history); // or sub_opts.history.is_enabled = true; + sub_opts.history.detect_late_publishers = true; + ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery); // or sub_opts.recovery.is_enabled = true; + sub_opts.recovery.periodic_queries_period_ms = 1000; sub_opts.subscriber_detection = true; z_owned_closure_sample_t callback; diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index fdb80aeba..8eec752cf 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -1009,7 +1009,11 @@ typedef struct zc_moved_shm_client_list_t { #if defined(Z_FEATURE_UNSTABLE_API) typedef struct ze_advanced_publisher_cache_options_t { /** - * Number of samples to keep for each resource + * Must be set to ``true``, to enable the cache. + */ + bool is_enabled; + /** + * Number of samples to keep for each resource. */ size_t max_samples; /** @@ -1053,9 +1057,9 @@ typedef struct ze_advanced_publisher_options_t { */ struct z_publisher_options_t publisher_options; /** - * Optional settings for publisher cache. + * Publisher cache settings. */ - struct ze_advanced_publisher_cache_options_t *cache; + struct ze_advanced_publisher_cache_options_t cache; /** * Allow matching Subscribers to detect lost samples and optionally ask for retransimission. * @@ -1070,7 +1074,7 @@ typedef struct ze_advanced_publisher_options_t { * An optional key expression to be added to the liveliness token key expression. * It can be used to convey meta data. */ - struct z_loaned_keyexpr_t *publisher_detection_metadata; + const struct z_loaned_keyexpr_t *publisher_detection_metadata; } ze_advanced_publisher_options_t; #endif /** @@ -1142,6 +1146,10 @@ typedef struct ze_moved_advanced_subscriber_t { */ #if defined(Z_FEATURE_UNSTABLE_API) typedef struct ze_advanced_subscriber_history_options_t { + /** + * Must be set to ``true``, to enable the history data recovery. + */ + bool is_enabled; /** * Enable detection of late joiner publishers and query for their historical data. * Late joiner detection can only be achieved for Publishers that enable publisher_detection. @@ -1164,6 +1172,10 @@ typedef struct ze_advanced_subscriber_history_options_t { */ #if defined(Z_FEATURE_UNSTABLE_API) typedef struct ze_advanced_subscriber_recovery_options_t { + /** + * Must be set to ``true``, to enable the lost sample recovery. + */ + bool is_enabled; /** * Period for queries for not yet received Samples. * @@ -1186,16 +1198,14 @@ typedef struct ze_advanced_subscriber_options_t { */ struct z_subscriber_options_t subscriber_options; /** - * Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching. - * Querying historical data is disabled if the value is ``NULL``. + * Settings for querying historical data. History can only be retransmitted by Publishers that enable caching. */ - struct ze_advanced_subscriber_history_options_t *history; + struct ze_advanced_subscriber_history_options_t history; /** - * Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable + * Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable * caching and sample_miss_detection. - * Retransmission is disabled if the value is ``NULL``. */ - struct ze_advanced_subscriber_recovery_options_t *recovery; + struct ze_advanced_subscriber_recovery_options_t recovery; /** * Timeout to be used for history and recovery queries. * Default value will be used if set to ``0``. @@ -1209,7 +1219,7 @@ typedef struct ze_advanced_subscriber_options_t { * An optional key expression to be added to the liveliness token key expression. * It can be used to convey meta data. */ - struct z_loaned_keyexpr_t *subscriber_detection_metadata; + const struct z_loaned_keyexpr_t *subscriber_detection_metadata; } ze_advanced_subscriber_options_t; #endif /** diff --git a/src/advanced_publisher.rs b/src/advanced_publisher.rs index 7309cc76d..02d133769 100644 --- a/src/advanced_publisher.rs +++ b/src/advanced_publisher.rs @@ -37,7 +37,9 @@ use crate::{ /// @brief Setting for advanced publisher's cache. The cache allows advanced subscribers to recover history and/or lost samples. #[repr(C)] pub struct ze_advanced_publisher_cache_options_t { - /// Number of samples to keep for each resource + /// Must be set to ``true``, to enable the cache. + pub is_enabled: bool, + /// Number of samples to keep for each resource. pub max_samples: usize, /// The congestion control to apply to replies. pub congestion_control: z_congestion_control_t, @@ -50,6 +52,7 @@ pub struct ze_advanced_publisher_cache_options_t { impl Default for ze_advanced_publisher_cache_options_t { fn default() -> Self { Self { + is_enabled: true, max_samples: 1, congestion_control: CongestionControl::default().into(), priority: Priority::default().into(), @@ -86,8 +89,8 @@ impl From<&ze_advanced_publisher_cache_options_t> for CacheConfig { pub struct ze_advanced_publisher_options_t { /// Base publisher options. pub publisher_options: z_publisher_options_t, - /// Optional settings for publisher cache. - pub cache: Option<&'static mut ze_advanced_publisher_cache_options_t>, + /// Publisher cache settings. + pub cache: ze_advanced_publisher_cache_options_t, /// Allow matching Subscribers to detect lost samples and optionally ask for retransimission. /// /// Retransmission can only be done if history is enabled on subscriber side. @@ -96,7 +99,7 @@ pub struct ze_advanced_publisher_options_t { pub publisher_detection: bool, /// An optional key expression to be added to the liveliness token key expression. /// It can be used to convey meta data. - pub publisher_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>, + pub publisher_detection_metadata: Option<&'static z_loaned_keyexpr_t>, } /// Constructs the default value for `z_publisher_options_t`. @@ -104,9 +107,11 @@ pub struct ze_advanced_publisher_options_t { pub extern "C" fn ze_advanced_publisher_options_default( this_: &mut MaybeUninit, ) { + let mut cache = ze_advanced_publisher_cache_options_t::default(); + cache.is_enabled = false; this_.write(ze_advanced_publisher_options_t { publisher_options: z_publisher_options_t::default(), - cache: None, + cache, sample_miss_detection: false, publisher_detection: false, publisher_detection_metadata: None, @@ -158,8 +163,8 @@ pub extern "C" fn ze_declare_advanced_publisher( if let Some(pub_detection_metadata) = &options.publisher_detection_metadata { p = p.publisher_detection_metadata(pub_detection_metadata.as_rust_type_ref()); } - if let Some(cache) = &options.cache { - p = p.cache((&**cache).into()); + if options.cache.is_enabled { + p = p.cache((&options.cache).into()); } } match p.wait() { diff --git a/src/advanced_subscriber.rs b/src/advanced_subscriber.rs index 6312e229b..58faafc11 100644 --- a/src/advanced_subscriber.rs +++ b/src/advanced_subscriber.rs @@ -31,8 +31,9 @@ use crate::{ /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief Settings for retrievieng historical data for Advanced Subscriber. #[repr(C)] -#[derive(Default)] pub struct ze_advanced_subscriber_history_options_t { + /// Must be set to ``true``, to enable the history data recovery. + pub is_enabled: bool, /// Enable detection of late joiner publishers and query for their historical data. /// Late joiner detection can only be achieved for Publishers that enable publisher_detection. /// History can only be retransmitted by Publishers that enable caching. @@ -43,6 +44,17 @@ pub struct ze_advanced_subscriber_history_options_t { pub max_age_ms: u64, } +impl Default for ze_advanced_subscriber_history_options_t { + fn default() -> Self { + Self { + is_enabled: true, + detect_late_publishers: false, + max_samples: 0, + max_age_ms: 0, + } + } +} + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief Constructs the default value for `ze_advanced_subscriber_history_options_t`. #[no_mangle] @@ -71,8 +83,9 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig { /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief Settings for recovering lost messages for Advanced Subscriber. #[repr(C)] -#[derive(Default)] pub struct ze_advanced_subscriber_recovery_options_t { + /// Must be set to ``true``, to enable the lost sample recovery. + pub is_enabled: bool, /// Period for queries for not yet received Samples. /// /// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost. @@ -82,6 +95,15 @@ pub struct ze_advanced_subscriber_recovery_options_t { pub periodic_queries_period_ms: u64, } +impl Default for ze_advanced_subscriber_recovery_options_t { + fn default() -> Self { + Self { + is_enabled: true, + periodic_queries_period_ms: 0, + } + } +} + impl From<&ze_advanced_subscriber_recovery_options_t> for RecoveryConfig { fn from(val: &ze_advanced_subscriber_recovery_options_t) -> RecoveryConfig { let mut r = RecoveryConfig::default(); @@ -107,13 +129,11 @@ pub extern "C" fn ze_advanced_subscriber_recovery_options_default( pub struct ze_advanced_subscriber_options_t { /// Base subscriber options. pub subscriber_options: z_subscriber_options_t, - /// Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching. - /// Querying historical data is disabled if the value is ``NULL``. - pub history: Option<&'static mut ze_advanced_subscriber_history_options_t>, - /// Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable + /// Settings for querying historical data. History can only be retransmitted by Publishers that enable caching. + pub history: ze_advanced_subscriber_history_options_t, + /// Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable /// caching and sample_miss_detection. - /// Retransmission is disabled if the value is ``NULL``. - pub recovery: Option<&'static mut ze_advanced_subscriber_recovery_options_t>, + pub recovery: ze_advanced_subscriber_recovery_options_t, /// Timeout to be used for history and recovery queries. /// Default value will be used if set to ``0``. pub query_timeout_ms: u64, @@ -121,7 +141,7 @@ pub struct ze_advanced_subscriber_options_t { pub subscriber_detection: bool, /// An optional key expression to be added to the liveliness token key expression. /// It can be used to convey meta data. - pub subscriber_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>, + pub subscriber_detection_metadata: Option<&'static z_loaned_keyexpr_t>, } /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. @@ -130,10 +150,14 @@ pub struct ze_advanced_subscriber_options_t { pub extern "C" fn ze_advanced_subscriber_options_default( this: &mut MaybeUninit, ) { + let mut history = ze_advanced_subscriber_history_options_t::default(); + history.is_enabled = false; + let mut recovery = ze_advanced_subscriber_recovery_options_t::default(); + recovery.is_enabled = false; this.write(ze_advanced_subscriber_options_t { subscriber_options: z_subscriber_options_t::default(), - history: None, - recovery: None, + history, + recovery, query_timeout_ms: 0, subscriber_detection: false, subscriber_detection_metadata: None, @@ -163,11 +187,11 @@ fn _declare_advanced_subscriber_inner( if let Some(sub_detection_metadata) = &options.subscriber_detection_metadata { sub = sub.subscriber_detection_metadata(sub_detection_metadata.as_rust_type_ref()); } - if let Some(history) = &options.history { - sub = sub.history((&**history).into()); + if options.history.is_enabled { + sub = sub.history((&options.history).into()); } - if let Some(recovery) = &options.recovery { - sub = sub.recovery((&**recovery).into()); + if options.recovery.is_enabled { + sub = sub.recovery((&options.recovery).into()); } } sub diff --git a/tests/z_int_advanced_pub_sub_test.c b/tests/z_int_advanced_pub_sub_test.c index bbafea630..e3c73e943 100644 --- a/tests/z_int_advanced_pub_sub_test.c +++ b/tests/z_int_advanced_pub_sub_test.c @@ -51,10 +51,8 @@ int run_publisher() { ze_advanced_publisher_options_t pub_opts; ze_advanced_publisher_options_default(&pub_opts); - ze_advanced_publisher_cache_options_t cache_options; - ze_advanced_publisher_cache_options_default(&cache_options); - cache_options.max_samples = values_count; - pub_opts.cache = &cache_options; + ze_advanced_publisher_cache_options_default(&pub_opts.cache); + pub_opts.cache.max_samples = values_count; pub_opts.publisher_detection = true; pub_opts.sample_miss_detection = true; @@ -141,15 +139,11 @@ int run_subscriber() { ze_advanced_subscriber_options_t sub_opts; ze_advanced_subscriber_options_default(&sub_opts); - ze_advanced_subscriber_history_options_t sub_history_options; - ze_advanced_subscriber_history_options_default(&sub_history_options); - sub_history_options.detect_late_publishers = true; + ze_advanced_subscriber_history_options_default(&sub_opts.history); + sub_opts.history.detect_late_publishers = true; - ze_advanced_subscriber_recovery_options_t sub_recovery_options; - ze_advanced_subscriber_recovery_options_default(&sub_recovery_options); - sub_recovery_options.periodic_queries_period_ms = 1000; - sub_opts.history = &sub_history_options; - sub_opts.recovery = &sub_recovery_options; + ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery); + sub_opts.recovery.periodic_queries_period_ms = 1000; sub_opts.subscriber_detection = true; z_owned_closure_sample_t callback;