From 9a831dc5d77570fc70601f3ea72c76e144c6cf13 Mon Sep 17 00:00:00 2001 From: Weijia Song Date: Sun, 28 Jul 2024 22:27:10 -0400 Subject: [PATCH 1/2] Add ServiceClient::get_my_shard() API. --- include/cascade/detail/service_impl.hpp | 48 +++++++++++++++++++++++++ include/cascade/service.hpp | 28 +++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/include/cascade/detail/service_impl.hpp b/include/cascade/detail/service_impl.hpp index 98e32f81..e4f1cae5 100644 --- a/include/cascade/detail/service_impl.hpp +++ b/include/cascade/detail/service_impl.hpp @@ -335,6 +335,54 @@ uint32_t ServiceClient::get_number_of_shards ( return get_number_of_shards(opm.subgroup_type_index,opm.subgroup_index); } +template +template +int32_t ServiceClient::get_my_shard(uint32_t subgroup_index) const { + if (!is_external_client()) { + return group_ptr->template get_my_shard(subgroup_index); + } else { + return -1; + } +} + +template +template +int32_t ServiceClient::type_recursive_get_my_shard ( + uint32_t type_index,uint32_t subgroup_index) const { + if (type_index == 0) { + return this->template get_my_shard(subgroup_index); + } else { + return this->template type_recursive_get_number_of_shards(type_index-1,subgroup_index); + } +} + +template +template +int32_t ServiceClient::type_recursive_get_my_shard ( + uint32_t type_index, uint32_t subgroup_index) const { + if (type_index == 0) { + return this->template get_my_shard(subgroup_index); + } else { + throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + " type index is out of boundary"); + } +} + +template +int32_t ServiceClient::get_my_shard ( + uint32_t subgroup_type_index, uint32_t subgroup_index) const { + return type_recursive_get_my_shard(subgroup_type_index,subgroup_index); +} + +template +int32_t ServiceClient::get_my_shard ( + const std::string& object_pool_pathname) { + auto opm = find_object_pool(object_pool_pathname); + if (!opm.is_valid() || opm.is_null() || opm.deleted) { + throw derecho::derecho_exception("Failed to find object_pool:" + object_pool_pathname); + } + return get_my_shard(opm.subgroup_type_index,opm.subgroup_index); +} + template template void ServiceClient::set_member_selection_policy(uint32_t subgroup_index,uint32_t shard_index, diff --git a/include/cascade/service.hpp b/include/cascade/service.hpp index dbc51f29..63190fd6 100644 --- a/include/cascade/service.hpp +++ b/include/cascade/service.hpp @@ -603,6 +603,9 @@ namespace cascade { * shard index. * - get_number_of_subgroups returns the number of subgroups of a given type * - get_number_of_shards returns the number of shards of a given subgroup + * - get_my_shard returns the shard number that this node is a member of in the specific + * subgroup (by subgroup type and index), or -1 if this node is not a member + * of any shard in the specified subgroup. * During view change, the Client might experience failure if the member is gone. In such a case, the client needs * refresh its local member cache by calling get_shard_members. */ @@ -657,6 +660,31 @@ namespace cascade { * @param[in] object_pool_pathname - the object pool name */ uint32_t get_number_of_shards(const std::string& object_pool_pathname); + + template + int32_t get_my_shard(uint32_t subgroup_index) const; + protected: + template + int32_t type_recursive_get_my_shard(uint32_t type_index, uint32_t subgroup_index) const; + template + int32_t type_recursive_get_my_shard(uint32_t type_index, uint32_t subgroup_index) const; + public: + /** + * @fn int32_t get_my_shard(uint32_t subgroup_type_index, uint32_t subgroup_index) const + * @brief find the shard I belong to, given the subgroup specified by type and index. + * @param[in] subgroup_type_index - the type index of the subgroup type. + * @param[in] subgroup_index - the subgroup index in the given type. + * @return The number of the shard, or -1 if current node is not in the specified subgroup. + */ + int32_t get_my_shard(uint32_t subgroup_type_index, uint32_t subgroup_index) const; + + /** + * @fn int32_t get_my_shard(const std::string& object_pool_pathname) + * @brief find the shard I belong to, given the object pool specified by object pool path name. + * @param[in] object_pool_pathname - the object pool path name. + * @return The number of the shard, or -1 if current node is not in the specified subgroup. + */ + int32_t get_my_shard(const std::string& object_pool_pathname); /** * Member selection policy control API. From edec60f70a9749ca4358bb5169fc26b3f99d1309 Mon Sep 17 00:00:00 2001 From: Weijia Song Date: Mon, 29 Jul 2024 15:05:56 -0400 Subject: [PATCH 2/2] bugfix, smoke tested. --- include/cascade/detail/service_impl.hpp | 2 +- .../tests/user_defined_logic/console_printer_udl.cpp | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/include/cascade/detail/service_impl.hpp b/include/cascade/detail/service_impl.hpp index e4f1cae5..82624c62 100644 --- a/include/cascade/detail/service_impl.hpp +++ b/include/cascade/detail/service_impl.hpp @@ -370,7 +370,7 @@ int32_t ServiceClient::type_recursive_get_my_shard ( template int32_t ServiceClient::get_my_shard ( uint32_t subgroup_type_index, uint32_t subgroup_index) const { - return type_recursive_get_my_shard(subgroup_type_index,subgroup_index); + return this->template type_recursive_get_my_shard(subgroup_type_index,subgroup_index); } template diff --git a/src/applications/tests/user_defined_logic/console_printer_udl.cpp b/src/applications/tests/user_defined_logic/console_printer_udl.cpp index ffc1dafe..c6fb6af5 100644 --- a/src/applications/tests/user_defined_logic/console_printer_udl.cpp +++ b/src/applications/tests/user_defined_logic/console_printer_udl.cpp @@ -26,6 +26,13 @@ class ConsolePrinterOCDPO: public OffCriticalDataPathObserver { uint32_t worker_id) override { std::cout << "[console printer ocdpo]: I(" << worker_id << ") received an object with key=" << key_string << ", matching prefix=" << key_string.substr(0,prefix_length) << std::endl; + auto* typed_ctxt = dynamic_cast(ctxt); + std::string prefix = key_string.substr(0,prefix_length); + std::cout << "[console printer ocdpo]: my shard @" + << prefix + << " is:" + << typed_ctxt->get_service_client_ref().get_my_shard(prefix) + << std::endl; } static std::shared_ptr ocdpo_ptr;