diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d3f93e72..24b70723 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/identifier.cpp src/detail/graph_cache.cpp src/detail/guard_condition.cpp + src/detail/liveliness_utils.cpp src/detail/message_type_support.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 025b0047..b47c14e5 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -32,491 +33,341 @@ #include "graph_cache.hpp" ///============================================================================= -std::string GenerateToken::liveliness(size_t domain_id) -{ - std::string token = "@ros2_lv/" + std::to_string(domain_id) + "/**"; - return token; -} +using Entity = liveliness::Entity; +using EntityType = liveliness::EntityType; ///============================================================================= -/** - * Generate a liveliness token for the particular entity. - * - * The liveliness tokens are in the form: - * - * @ros2_lv//// - * - * Where: - * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. - * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. - * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. - * - The ROS node name for this entity. - */ -static std::string generate_base_token( - const std::string & entity, - size_t domain_id, - const std::string & namespace_, - const std::string & name) +TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +: pub_count_(pub_count), + sub_count_(sub_count) { - std::stringstream token_ss; - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; - // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". - // Hence we add an "_" to denote an empty namespace such that splitting the key - // will always result in 5 parts. - if (namespace_ == "/") { - token_ss << "_/"; - } else { - token_ss << "/"; - } - // Finally append node name. - token_ss << name; - return token_ss.str(); + // Do nothing. } ///============================================================================= -std::string GenerateToken::node( - size_t domain_id, - const std::string & namespace_, - const std::string & name) -{ - return generate_base_token("NN", domain_id, namespace_, name); -} +TopicData::TopicData( + liveliness::TopicInfo info, + TopicStats stats) +: info_(std::move(info)), + stats_(std::move(stats)) +{} ///============================================================================= -std::string GenerateToken::publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) +void GraphCache::parse_put(const std::string & keyexpr) { - std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} + std::optional valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { + // Error message has already been logged. + return; + } + const liveliness::Entity & entity = *valid_entity; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update graph_topics_ + auto add_topic_data = + [this](const Entity & entity, GraphNode & graph_node) -> void + { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + return; + } -///============================================================================= -std::string GenerateToken::subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) -{ - std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "add_topic_data() called without valid TopicInfo. Report this."); + return; + } -///============================================================================= -bool PublishToken::put( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + GraphNode::TopicMap & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( + topic_info, + TopicStats{pub_count, sub_count}); + + GraphNode::TopicDataMap topic_data_map = { + {graph_topic_data->info_.type_, graph_topic_data}}; + std::pair insertion = + topic_map.insert(std::make_pair(topic_info.name_, topic_data_map)); + if (!insertion.second) { + // A topic with the same name already exists in the node so we append the type. + std::pair type_insertion = + insertion.first->second.insert( + std::make_pair( + graph_topic_data->info_.type_, + graph_topic_data)); + if (!type_insertion.second) { + // We have another instance of a pub/sub over the same topic and type so we increment + // the counters. + TopicDataPtr & existing_graph_topic = type_insertion.first->second; + existing_graph_topic->stats_.pub_count_ += pub_count; + existing_graph_topic->stats_.sub_count_ += sub_count; + } + } - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); - z_put_options_t options = z_put_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { - RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); - return false; - } + // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph + GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // First time this topic name is added to the graph. + graph_topics_[topic_info.name_] = std::move(topic_data_map); + } else { + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts + // or else create an new TopicData. + std::pair topic_data_insertion = + cache_topic_it->second.insert(std::make_pair(topic_info.type_, nullptr)); + if (topic_data_insertion.second) { + // A TopicData for the topic_type does not exist. + topic_data_insertion.first->second = std::make_shared( + topic_info, + TopicStats{pub_count, sub_count}); + } else { + // Update the existing counters. + topic_data_insertion.first->second->stats_.pub_count_ += pub_count; + topic_data_insertion.first->second->stats_.sub_count_ += sub_count; + } + } - return true; -} + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Added %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), + graph_node.name_.c_str()); + }; -///============================================================================= -bool PublishToken::del( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } + // Helper lambda to convert an Entity into a GraphNode. + auto make_graph_node = + [&](const Entity & entity) -> std::shared_ptr + { + auto graph_node = std::make_shared(); + graph_node->ns_ = entity.node_namespace(); + graph_node->name_ = entity.node_name(); + graph_node->enclave_ = entity.node_enclave(); + + if (!entity.topic_info().has_value()) { + // Token was for a node. + return graph_node; + } + // Add pub/sub entries. + add_topic_data(entity, *graph_node); - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); - const z_delete_options_t options = z_delete_options_default(); - if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { - RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); - return false; - } + return graph_node; + }; - return true; -} + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); -///============================================================================= -namespace -{ -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const auto & prev_idx = delim_idx.at(i - 1); - const auto & idx = delim_idx.at(i); - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} -///============================================================================= -// Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) -{ - std::vector parts = split_keyexpr(keyexpr); - // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). - // Basic validation. - if (parts.size() < 5) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - for (const std::string & p : parts) { - if (p.empty()) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } + // If the namespace did not exist, create it and add the node to the graph and return. + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); + if (ns_it == graph_.end()) { + NodeMap node_map = { + {entity.node_name(), make_graph_node(entity)}}; + graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; } - // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity = parts[2]; - - GraphNode node; - // Nodes with empty namespaces will contain a "_". - node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; - node.name = std::move(parts[4]); + // Add the node to the namespace if it did not exist and return. + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); + if (node_it == ns_it->second.end()) { + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; + } - if (entity != "NN") { - if (parts.size() < 8) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - GraphNode::TopicData data; - data.topic = "/" + std::move(parts[5]); - data.type = std::move(parts[6]); - data.qos = std::move(parts[7]); - - if (entity == "MP") { - node.pubs.push_back(std::move(data)); - } else if (entity == "MS") { - node.subs.push_back(std::move(data)); - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid entity [%s] in liveliness token", entity.c_str()); - return std::nullopt; - } + // Handles additions to an existing node in the graph. + if (entity.type() == EntityType::Node) { + // The NN entity is implicitly handled above where we add the node. + // If control reaches here, then we received a duplicate entry for the same node. + // This could happen when we get() all the liveliness tokens when the node spins up and + // receive a MP token before an NN one. + return; } - return std::make_pair(std::move(entity), std::move(node)); -} -} // namespace + if (!entity.topic_info().has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.", + keyexpr.c_str()); + return; + } -///============================================================================= -GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) -: pub_count_(pub_count), - sub_count_(sub_count) -{ - // Do nothing. + // Update the graph based on the entity. + add_topic_data(entity, *(node_it->second)); } ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +void GraphCache::parse_del(const std::string & keyexpr) { - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { + std::optional valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); + const liveliness::Entity & entity = *valid_entity; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto remove_topic_data = + [&](const Entity & entity, GraphNode & graph_node, + bool update_cache = false) -> void + { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + return; + } - if (entity == "NN") { - // Node - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // New namespace. - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second) { + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", - node->name.c_str()); + "rmw_zenoh_cpp", + "remove_topic_data() called without valid TopicInfo. Report this."); + return; } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to the graph.", - node->name.c_str()); - return; - } else if (entity == "MP") { - // Publisher - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->pubs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(node->pubs[0]); - } else { + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + GraphNode::TopicMap & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + const std::size_t sub_count = !pub_count; + + GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); + if (topic_it == topic_map.end()) { + // Pub/sub not found. return; } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->pub_count_; - } else { - insertion.first->second = std::make_unique(1, 0); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str()); - return; - } else if (entity == "MS") { - // Subscription - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->subs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->subs.push_back(node->subs[0]); - } else { + + GraphNode::TopicDataMap & topic_data_map = topic_it->second; + // Search the unordered_set for the TopicData for this topic. + GraphNode::TopicDataMap::iterator topic_data_it = + topic_data_map.find(topic_info.type_); + if (topic_data_it == topic_data_map.end()) { + // Something is wrong. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", + topic_info.name_.c_str()); return; } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->sub_count_; - } else { - insertion.first->second = std::make_unique(0, 1); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str()); - return; - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunud): Service Client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); + + // Decrement the relevant counters. If both counters are 0 remove from graph_node. + TopicDataPtr & existing_topic_data = topic_data_it->second; + existing_topic_data->stats_.pub_count_ -= pub_count; + existing_topic_data->stats_.sub_count_ -= sub_count; + if (existing_topic_data->stats_.pub_count_ == 0 && + existing_topic_data->stats_.sub_count_ == 0) + { + topic_data_map.erase(topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (topic_data_map.empty()) { + topic_map.erase(topic_info.name_); + } + + // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + GraphNode::TopicMap::iterator cache_topic_it = + graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_info.name_.c_str()); + } else { + GraphNode::TopicDataMap::iterator cache_topic_data_it = + cache_topic_it->second.find(topic_info.type_); + if (cache_topic_data_it != cache_topic_it->second.end()) { + // Decrement the relevant counters. If both counters are 0 remove from cache. + cache_topic_data_it->second->stats_.pub_count_ -= pub_count; + cache_topic_data_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_data_it->second->stats_.pub_count_ == 0 && + cache_topic_data_it->second->stats_.sub_count_ == 0) + { + cache_topic_it->second.erase(cache_topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + graph_topics_.erase(cache_topic_it); + } + } + } + } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Removed %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); + + // If namespace does not exist, ignore the request. + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); + if (ns_it == graph_.end()) { return; } -} -///============================================================================= -void GraphCache::parse_del(const std::string & keyexpr) -{ - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { - // Error message has already been logged. + // If the node does not exist, ignore the request. + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); + if (node_it == ns_it->second.end()) { return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); - if (entity == "NN") { + + if (entity.type() == EntityType::Node) { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - ns_it->second.erase(node->name); + // The liveliness tokens to remove pub/subs should be received before the one to remove a node + // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in + // the node below, we should update the count in graph_topics_. + const GraphNodePtr graph_node = node_it->second; + if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove node /%s from the graph before all pub/subs for this " + "node have been removed. Report this issue.", + entity.node_name().c_str() + ); + // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in + // graph_topics_. } + ns_it->second.erase(entity.node_name()); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - node->name.c_str() + entity.node_name().c_str() ); - } else if (entity == "MP") { - // Publisher - if (node->pubs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of publishers and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->pubs.begin(); - for (; erase_it != found_node->pubs.end(); ++erase_it) { - const auto & pub = *erase_it; - if (pub.topic == node->pubs.at(0).topic && - pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->pubs.end()) { - found_node->pubs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { - // The last publisher was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->pub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed publisher %s from node /%s in the graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "MS") { - // Subscription - if (node->subs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of subscriptions and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->subs.begin(); - for (; erase_it != found_node->subs.end(); ++erase_it) { - const auto & sub = *erase_it; - if (sub.topic == node->subs.at(0).topic && - sub.type == node->subs.at(0).type && - sub.qos == node->subs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->subs.end()) { - found_node->subs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { - // The last subscription was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->sub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed subscription %s from node /%s in the graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); return; } + + if (!entity.topic_info().has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + keyexpr.c_str()); + return; + } + + // Update the graph based on the entity. + remove_topic_data(entity, *(node_it->second), true); } ///============================================================================= @@ -543,8 +394,8 @@ rmw_ret_t GraphCache::get_node_names( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); size_t nodes_number = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - nodes_number += it->second.size(); + for (const std::pair & it : graph_) { + nodes_number += it.second.size(); } rcutils_ret_t rcutils_ret = @@ -600,11 +451,13 @@ rmw_ret_t GraphCache::get_node_names( // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + for (NamespaceMap::const_iterator ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { const std::string & ns = ns_it->first; - for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { - const auto node = node_it->second; - node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + for (NodeMap::const_iterator node_it = ns_it->second.begin(); node_it != ns_it->second.end(); + ++node_it) + { + const GraphNodePtr node = node_it->second; + node_names->data[j] = rcutils_strdup(node->name_.c_str(), *allocator); if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -615,7 +468,7 @@ rmw_ret_t GraphCache::get_node_names( } if (enclaves) { enclaves->data[j] = rcutils_strdup( - node->enclave.c_str(), *allocator); + node->enclave_.c_str(), *allocator); if (!enclaves->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -664,7 +517,7 @@ _demangle_if_ros_type(const std::string & dds_type_string) rmw_ret_t GraphCache::get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) + rmw_names_and_types_t * topic_names_and_types) const { static_cast(no_demangle); RCUTILS_CHECK_ALLOCATOR_WITH_MSG( @@ -686,43 +539,70 @@ rmw_ret_t GraphCache::get_topic_names_and_types( }); // Fill topic names and types. - std::size_t j = 0; - for (const auto & it : graph_topics_) { - // Split based on "?". - // TODO(Yadunund): Be more systematic about this. - // TODO(clalancette): Rather than doing the splitting here, should we store - // it in graph_topics_ already split? - std::vector parts = split_keyexpr(it.first, '?'); - if (parts.size() < 2) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid topic_key %s", it.first.c_str()); - return RMW_RET_INVALID_ARGUMENT; - } - - topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); - if (!topic_names_and_types->names.data[j]) { + std::size_t index = 0; + for (const std::pair & item : graph_topics_) { + topic_names_and_types->names.data[index] = rcutils_strdup(item.first.c_str(), *allocator); + if (!topic_names_and_types->names.data[index]) { return RMW_RET_BAD_ALLOC; } - - // TODO(clalancette): This won't work if there are multiple types on the same topic - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - &topic_names_and_types->types[j], 1, allocator); - if (RCUTILS_RET_OK != rcutils_ret) { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - return RMW_RET_BAD_ALLOC; + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[index], + item.second.size(), + allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + return RMW_RET_BAD_ALLOC; + } } - - topic_names_and_types->types[j].data[0] = rcutils_strdup( - _demangle_if_ros_type(parts[1]).c_str(), *allocator); - if (!topic_names_and_types->types[j].data[0]) { - return RMW_RET_BAD_ALLOC; + size_t type_index = 0; + for (const std::pair & type : item.second) { + char * type_name = rcutils_strdup(_demangle_if_ros_type(type.first).c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for type name"); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types[index].data[type_index] = type_name; + ++type_index; } - - ++j; + ++index; } cleanup_names_and_types.cancel(); return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::count_publishers( + const char * topic_name, + size_t * count) const +{ + *count = 0; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::count_subscriptions( + const char * topic_name, + size_t * count) const +{ + *count = 0; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2de0018..de78dd96 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -15,15 +15,16 @@ #ifndef DETAIL__GRAPH_CACHE_HPP_ #define DETAIL__GRAPH_CACHE_HPP_ -#include - #include #include #include #include #include +#include #include +#include "liveliness_utils.hpp" + #include "rcutils/allocator.h" #include "rcutils/types.h" @@ -32,66 +33,42 @@ ///============================================================================= -class GenerateToken +struct TopicStats { -public: - static std::string liveliness(size_t domain_id); - - /// Returns a string with key-expression @ros2_lv/domain_id/N/namespace/name - static std::string node( - size_t domain_id, - const std::string & namespace_, - const std::string & name); - - static std::string publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); - - static std::string subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initializes counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); }; ///============================================================================= -/// Helper utilities to put/delete tokens until liveliness is supported in the -/// zenoh-c bindings. -class PublishToken +struct TopicData { -public: - static bool put( - z_owned_session_t * session, - const std::string & token); + liveliness::TopicInfo info_; + TopicStats stats_; - static bool del( - z_owned_session_t * session, - const std::string & token); + TopicData( + liveliness::TopicInfo info, + TopicStats stats); }; +using TopicDataPtr = std::shared_ptr; ///============================================================================= // TODO(Yadunund): Expand to services and clients. struct GraphNode { - struct TopicData - { - std::string topic; - std::string type; - std::string qos; - }; - - std::string ns; - std::string name; + std::string ns_; + std::string name_; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? - std::string enclave; - std::vector pubs; - std::vector subs; + std::string enclave_; + + // Map topic type to TopicData + using TopicDataMap = std::unordered_map; + // Map topic name to TopicDataMap + using TopicMap = std::unordered_map; + TopicMap pubs_ = {}; + TopicMap subs_ = {}; }; using GraphNodePtr = std::shared_ptr; @@ -113,7 +90,15 @@ class GraphCache final rmw_ret_t get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types); + rmw_names_and_types_t * topic_names_and_types) const; + + rmw_ret_t count_publishers( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_subscriptions( + const char * topic_name, + size_t * count) const; private: /* @@ -137,20 +122,14 @@ class GraphCache final namespace_2: node_n: */ + + using NodeMap = std::unordered_map; + using NamespaceMap = std::unordered_map; // Map namespace to a map of . - std::unordered_map> graph_ = {}; - - // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. - struct TopicStats - { - std::size_t pub_count_; - std::size_t sub_count_; - - // Constructor which initialized counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); - }; - using TopicStatsPtr = std::unique_ptr; - std::unordered_map graph_topics_ = {}; + NamespaceMap graph_ = {}; + + // Optimize topic lookups across the graph. + GraphNode::TopicMap graph_topics_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp new file mode 100644 index 00000000..204f515d --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -0,0 +1,362 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "liveliness_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" +#include "rcutils/logging_macros.h" + + +namespace liveliness +{ + +///============================================================================= +NodeInfo::NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave) +: domain_id_(std::move(domain_id)), + ns_(std::move(ns)), + name_(std::move(name)), + enclave_(std::move(enclave)) +{ + // Do nothing. +} + +///============================================================================= +TopicInfo::TopicInfo( + std::string name, + std::string type, + std::string qos) +: name_(std::move(name)), + type_(std::move(type)), + qos_(std::move(qos)) +{ + // Do nothing. +} + +///============================================================================= +namespace +{ + +/// The admin space used to prefix the liveliness tokens. +static const char ADMIN_SPACE[] = "@ros2_lv"; +static const char NODE_STR[] = "NN"; +static const char PUB_STR[] = "MP"; +static const char SUB_STR[] = "MS"; +static const char SRV_STR[] = "SS"; +static const char CLI_STR[] = "SC"; + +static const std::unordered_map entity_to_str = { + {EntityType::Node, NODE_STR}, + {EntityType::Publisher, PUB_STR}, + {EntityType::Subscription, SUB_STR}, + {EntityType::Service, SRV_STR}, + {EntityType::Client, CLI_STR} +}; + +static const std::unordered_map str_to_entity = { + {NODE_STR, EntityType::Node}, + {PUB_STR, EntityType::Publisher}, + {SUB_STR, EntityType::Subscription}, + {SRV_STR, EntityType::Service}, + {CLI_STR, EntityType::Client} +}; + +} // namespace + +///============================================================================= +std::string subscription_token(size_t domain_id) +{ + std::string token = std::string(ADMIN_SPACE) + "/" + std::to_string(domain_id) + "/**"; + return token; +} + +///============================================================================= +Entity::Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +: type_(std::move(type)), + node_info_(std::move(node_info)), + topic_info_(std::move(topic_info)) +{ + /** + * Set the liveliness token for the particular entity. + * + * The liveliness token keyexprs are in the form: + * + * //// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + * + * For entities with topic infomation, the liveliness token keyexpr have additional fields: + * + * /////// + */ + std::stringstream token_ss; + const std::string & ns = node_info_.ns_; + token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << entity_to_str.at(type_) << ns; + // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". + // Hence we add an "_" to denote an empty namespace such that splitting the key + // will always result in 5 parts. + if (ns == "/") { + token_ss << "_/"; + } else { + token_ss << "/"; + } + // Finally append node name. + token_ss << node_info_.name_; + // If this entity has a topic info, append it to the token. + if (topic_info_.has_value()) { + const auto & topic_info = this->topic_info_.value(); + // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". + token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_; + } + + this->keyexpr_ = token_ss.str(); +} + +///============================================================================= +std::optional Entity::make( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +{ + if (entity_to_str.find(type) == entity_to_str.end()) { + RCUTILS_SET_ERROR_MSG("Invalid entity type."); + return std::nullopt; + } + if (node_info.ns_.empty() || node_info.name_.empty()) { + RCUTILS_SET_ERROR_MSG("Invalid node_info for entity."); + return std::nullopt; + } + if (type != EntityType::Node && !topic_info.has_value()) { + RCUTILS_SET_ERROR_MSG("Invalid topic_info for entity."); + return std::nullopt; + } + + Entity entity{std::move(type), std::move(node_info), std::move(topic_info)}; + return entity; +} + +namespace +{ +///============================================================================= +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const size_t prev_idx = delim_idx[i - 1]; + const size_t idx = delim_idx[i]; + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +} // namespace + +///============================================================================= +std::optional Entity::make(const std::string & keyexpr) +{ + std::vector parts = split_keyexpr(keyexpr); + // A token will contain at least 5 parts: + // (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). + // Basic validation. + if (parts.size() < 5) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + } + + if (parts[0] != ADMIN_SPACE) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid admin space."); + return std::nullopt; + } + + // Get the entity, ie NN, MP, MS, SS, SC. + std::string & entity_str = parts[2]; + std::unordered_map::const_iterator entity_it = + str_to_entity.find(entity_str); + if (entity_it == str_to_entity.end()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid entity."); + return std::nullopt; + } + + EntityType entity_type = entity_it->second; + std::size_t domain_id = std::stoul(parts[1]); + std::string ns = parts[3] == "_" ? "/" : "/" + std::move(parts[3]); + std::string node_name = std::move(parts[4]); + std::optional topic_info = std::nullopt; + + // Populate topic_info if we have a token for an entity other than a node. + if (entity_type != EntityType::Node) { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token for non-node entity without required parameters."); + return std::nullopt; + } + topic_info = TopicInfo{ + "/" + std::move(parts[5]), + std::move(parts[6]), + std::move(parts[7])}; + } + + return Entity{ + std::move(entity_type), + NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""}, + std::move(topic_info)}; +} + +///============================================================================= +EntityType Entity::type() const +{ + return this->type_; +} + +std::string Entity::node_namespace() const +{ + return this->node_info_.ns_; +} + +std::string Entity::node_name() const +{ + return this->node_info_.name_; +} + +std::string Entity::node_enclave() const +{ + return this->node_info_.enclave_; +} + +///============================================================================= +std::optional Entity::topic_info() const +{ + return this->topic_info_; +} + +///============================================================================= +std::string Entity::keyexpr() const +{ + return this->keyexpr_; +} + +///============================================================================= +bool PublishToken::put( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { + RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); + return false; + } + + return true; +} + +///============================================================================= +bool PublishToken::del( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); + if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { + RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); + return false; + } + + return true; +} + +} // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp new file mode 100644 index 00000000..90f76e89 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -0,0 +1,133 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__LIVELINESS_UTILS_HPP_ +#define DETAIL__LIVELINESS_UTILS_HPP_ + +#include + +#include +#include +#include + + +namespace liveliness +{ + +///============================================================================= +struct NodeInfo +{ + std::size_t domain_id_; + std::string ns_; + std::string name_; + std::string enclave_; + + NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave); +}; + +///============================================================================= +struct TopicInfo +{ + std::string name_; + std::string type_; + std::string qos_; + + TopicInfo( + std::string name, + std::string type, + std::string qos); +}; + +///============================================================================= +/// Retuns the keyexpr for liveliness subscription. +std::string subscription_token(size_t domain_id); + +///============================================================================= +enum class EntityType : uint8_t +{ + Invalid = 0, + Node, + Publisher, + Subscription, + Service, + Client +}; + +///============================================================================= +// An struct to bundle results of parsing a token. +// TODO(Yadunund): Consider using variadic templates to pass args instead of +// relying on optional fields. +class Entity +{ +public: + /// Make an Entity from datatypes. This will return nullopt if the required + /// fields are not present for the EntityType. + // TODO(Yadunund): Find a way to better bundle the type and the associated data. + static std::optional make( + EntityType type, + NodeInfo node_info, + std::optional topic_info = std::nullopt); + + /// Make an Entity from a liveliness keyexpr. + static std::optional make(const std::string & keyexpr); + + /// Get the entity type. + EntityType type() const; + + std::string node_namespace() const; + + std::string node_name() const; + + std::string node_enclave() const; + + /// Get the topic_info. + std::optional topic_info() const; + + /// Get the liveliness keyexpr for this entity. + std::string keyexpr() const; + +private: + Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info); + + EntityType type_; + NodeInfo node_info_; + std::optional topic_info_; + std::string keyexpr_; +}; + +///============================================================================= +/// Helper utilities to put/delete tokens until liveliness is supported in the +/// zenoh-c bindings. +class PublishToken +{ +public: + static bool put( + z_owned_session_t * session, + const std::string & token); + + static bool del( + z_owned_session_t * session, + const std::string & token); +}; + +} // namespace liveliness + +#endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 51f22e3c..1e6da94c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -18,6 +18,7 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "detail/zenoh_router_check.hpp" @@ -246,7 +247,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) }); // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); + const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4de21163..4fa0ebbb 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -25,6 +25,7 @@ #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/message_type_support.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" @@ -187,16 +188,25 @@ rmw_create_node( // Publish to the graph that a new node is in town // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::node(context->actual_domain_id, namespace_, name) + // liveliness::GenerateToken::node(context->actual_domain_id, namespace_, name) // ); // if (!pub_result) { // return nullptr; // } // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Node, + liveliness::NodeInfo{context->actual_domain_id, namespace_, name, ""}); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the node."); + return nullptr; + } node_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -241,7 +251,8 @@ rmw_destroy_node(rmw_node_t * node) // Publish to the graph that a node has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, + // node->name) // ); // if (!del_result) { // return RMW_RET_ERROR; @@ -555,7 +566,7 @@ rmw_create_publisher( // TODO(Yadunund): Publish liveliness for the new publisher. // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -566,16 +577,21 @@ rmw_create_publisher( // if (!pub_result) { // return nullptr; // } + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Publisher, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_publisher->topic_name, + publisher_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher."); + return nullptr; + } publisher_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr( - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -632,7 +648,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // Publish to the graph that a publisher has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -1253,16 +1269,21 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Subscription, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_subscription->topic_name, + sub_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription."); + return nullptr; + } sub_data->token = zc_liveliness_declare_token( z_loan(context_impl->session), - z_keyexpr( - GenerateToken::subscription( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2133,10 +2154,26 @@ rmw_count_publishers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_publishers(topic_name, count); } //============================================================================== @@ -2147,10 +2184,26 @@ rmw_count_subscribers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_subscriptions(topic_name, count); } //==============================================================================