From a79163fc0a7467e6796df5dcbfce2740c9c19ac1 Mon Sep 17 00:00:00 2001 From: Weijia Song Date: Sat, 15 Jun 2024 12:37:35 -0400 Subject: [PATCH 1/2] Adding 'as_trigger' flag to put/put_and_forget If 'as_trigger' flag is specified, the object will only be used to trigger UDLs and will NOT apply to the K/V store. --- include/cascade/cascade_interface.hpp | 20 ++++--- include/cascade/detail/delta_store_core.hpp | 2 +- .../cascade/detail/delta_store_core_impl.hpp | 14 ++--- .../cascade/detail/persistent_store_impl.hpp | 22 ++++---- include/cascade/detail/service_impl.hpp | 54 ++++++++++--------- include/cascade/detail/trigger_store_impl.hpp | 8 +-- .../cascade/detail/volatile_store_impl.hpp | 40 +++++++------- include/cascade/persistent_store.hpp | 10 ++-- include/cascade/service.hpp | 36 +++++++++---- include/cascade/trigger_store.hpp | 8 +-- include/cascade/volatile_store.hpp | 10 ++-- .../include/cascade_dds/detail/dds_impl.hpp | 2 +- src/applications/standalone/dds/src/dds.cpp | 2 +- .../standalone/kvs_client/kvs_client.cpp | 2 +- .../cli_example.cpp | 4 +- .../cascade_as_subgroup_classes/perf.cpp | 4 +- src/service/client.cpp | 4 +- src/service/cs/cascade_client_cs.cpp | 10 ++-- src/service/perftest.cpp | 6 +-- src/service/python/cascade_client_py.cpp | 23 ++++---- 20 files changed, 159 insertions(+), 122 deletions(-) diff --git a/include/cascade/cascade_interface.hpp b/include/cascade/cascade_interface.hpp index 6cd571b4..4b1df133 100644 --- a/include/cascade/cascade_interface.hpp +++ b/include/cascade/cascade_interface.hpp @@ -91,16 +91,17 @@ class ICascadeStore { KT* InvKeyPtr = IK; VT* InvValPtr = IV; /** - * @brief put(const VT&) + * @brief put(const VT&, bool ) * * Put a value. VT must implement ICascadeObject interface. The key is given in value and retrieved by * ICascadeObject::get_key_ref() * - * @param[in] value The K/V pair value + * @param[in] value The K/V pair value + * @param[in] as_trigger The object will NOT be used to update the K/V state. * * @return a tuple including version number (version_t) and a timestamp in microseconds. */ - virtual version_tuple put(const VT& value) const = 0; + virtual version_tuple put(const VT& value, bool as_trigger) const = 0; /** * @brief put_and_forget(const VT&) @@ -109,10 +110,11 @@ class ICascadeStore { * ICascadeObject::get_key_ref(). This function ignores any return value. * * @param[in] value The K/V pair value + * @param[in] as_trigger The object will NOT be used to update the K/V state. * * @return void */ - virtual void put_and_forget(const VT& value) const = 0; + virtual void put_and_forget(const VT& value, bool as_trigger) const = 0; #ifdef ENABLE_EVALUATION /** @@ -336,18 +338,20 @@ class ICascadeStore { /** * @brief ordered_put * - * @param[in] value The K/V pair object. + * @param[in] value The K/V pair object. + * @param[in] as_trigger If true, the value will NOT apply to the K/V state. * * @return A tuple including version number (version_t) and a timestamp in microseconds. */ - virtual version_tuple ordered_put(const VT& value) = 0; + virtual version_tuple ordered_put(const VT& value, bool as_trigger) = 0; /** * @brief ordered_put_and_forget * - * @param[in] value The K/V pair object. + * @param[in] value The K/V pair object. + * @param[in] as_trigger If true, the value will NOT apply to the K/V state. */ - virtual void ordered_put_and_forget(const VT& value) = 0; + virtual void ordered_put_and_forget(const VT& value, bool as_trigger) = 0; /** * @brief ordered_remove diff --git a/include/cascade/detail/delta_store_core.hpp b/include/cascade/detail/delta_store_core.hpp index 3c3a9a6a..807379c7 100644 --- a/include/cascade/detail/delta_store_core.hpp +++ b/include/cascade/detail/delta_store_core.hpp @@ -78,7 +78,7 @@ class DeltaCascadeStoreCore : public mutils::ByteRepresentable, /** * Ordered put, and generate a delta. */ - virtual bool ordered_put(const VT& value, persistent::version_t prever); + virtual bool ordered_put(const VT& value, persistent::version_t prever, bool as_trigger); /** * Ordered remove, and generate a delta. */ diff --git a/include/cascade/detail/delta_store_core_impl.hpp b/include/cascade/detail/delta_store_core_impl.hpp index 91cc93fe..08f0646a 100644 --- a/include/cascade/detail/delta_store_core_impl.hpp +++ b/include/cascade/detail/delta_store_core_impl.hpp @@ -179,7 +179,7 @@ std::unique_ptr> DeltaCascadeStoreCore -bool DeltaCascadeStoreCore::ordered_put(const VT& value, persistent::version_t prev_ver) { +bool DeltaCascadeStoreCore::ordered_put(const VT& value, persistent::version_t prev_ver, bool as_trigger) { // call validator if constexpr(std::is_base_of, VT>::value) { if(!value.validate(this->kv_map)) { @@ -207,11 +207,13 @@ bool DeltaCascadeStoreCore::ordered_put(const VT& value, persist } value.set_previous_version(prev_ver, prev_ver_by_key); } - // create delta. - assert(this->delta.empty()); - this->delta.push_back(value.get_key_ref()); - // apply_ordered_put - apply_ordered_put(value); + if (!as_trigger) { + // create delta. + assert(this->delta.empty()); + this->delta.push_back(value.get_key_ref()); + // apply_ordered_put + apply_ordered_put(value); + } return true; } diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 88d0465b..10355312 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -24,12 +24,12 @@ namespace derecho { namespace cascade { template -version_tuple PersistentCascadeStore::put(const VT& value) const { +version_tuple PersistentCascadeStore::put(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref()); LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); - auto results = subgroup_handle.template ordered_send(value); + auto results = subgroup_handle.template ordered_send(value, as_trigger); auto& replies = results.get(); version_tuple ret{CURRENT_VERSION, 0}; // TODO: verfiy consistency ? @@ -43,12 +43,12 @@ version_tuple PersistentCascadeStore::put(const VT& value) c } template -void PersistentCascadeStore::put_and_forget(const VT& value) const { +void PersistentCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref()); LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_AND_FORGET_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); - subgroup_handle.template ordered_send(value); + subgroup_handle.template ordered_send(value, as_trigger); LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_AND_FORGET_END, group, value); debug_leave_func(); @@ -497,7 +497,7 @@ std::vector PersistentCascadeStore::list_keys_by_time(co } template -version_tuple PersistentCascadeStore::ordered_put(const VT& value) { +version_tuple PersistentCascadeStore::ordered_put(const VT& value,bool as_trigger) { debug_enter_func_with_args("key={}", value.get_key_ref()); auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); @@ -507,7 +507,7 @@ version_tuple PersistentCascadeStore::ordered_put(const VT& LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_ORDERED_PUT_START, group, value, std::get<0>(version_and_hlc)); #endif version_tuple version_and_timestamp{persistent::INVALID_VERSION,0}; - if(this->internal_ordered_put(value) == true) { + if(this->internal_ordered_put(value,as_trigger) == true) { version_and_timestamp = {std::get<0>(version_and_hlc),std::get<1>(version_and_hlc).m_rtc_us}; } @@ -524,7 +524,7 @@ version_tuple PersistentCascadeStore::ordered_put(const VT& } template -void PersistentCascadeStore::ordered_put_and_forget(const VT& value) { +void PersistentCascadeStore::ordered_put_and_forget(const VT& value,bool as_trigger) { debug_enter_func_with_args("key={}", value.get_key_ref()); #ifdef ENABLE_EVALUATION auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); @@ -536,7 +536,7 @@ void PersistentCascadeStore::ordered_put_and_forget(const VT LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_ORDERED_PUT_AND_FORGET_START, group, value, std::get<0>(version_and_hlc)); #endif - this->internal_ordered_put(value); + this->internal_ordered_put(value,as_trigger); #if __cplusplus > 201703L LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_ORDERED_PUT_AND_FORGET_END, group, value, std::get<0>(version_and_hlc)); @@ -554,7 +554,7 @@ void PersistentCascadeStore::ordered_put_and_forget(const VT } template -bool PersistentCascadeStore::internal_ordered_put(const VT& value) { +bool PersistentCascadeStore::internal_ordered_put(const VT& value, bool as_trigger) { auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); if constexpr(std::is_base_of::value) { value.set_version(std::get<0>(version_and_hlc)); @@ -562,13 +562,15 @@ bool PersistentCascadeStore::internal_ordered_put(const VT& if constexpr(std::is_base_of::value) { value.set_timestamp(std::get<1>(version_and_hlc).m_rtc_us); } - if(this->persistent_core->ordered_put(value, this->persistent_core.getLatestVersion()) == false) { + + if(this->persistent_core->ordered_put(value, this->persistent_core.getLatestVersion(), as_trigger) == false) { // verification failed. S we return invalid versions. debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(version_and_hlc), std::get<1>(version_and_hlc).m_rtc_us); return false; } + if(cascade_watcher_ptr) { (*cascade_watcher_ptr)( // group->template get_subgroup(this->subgroup_index).get_subgroup_id(), // this is subgroup id diff --git a/include/cascade/detail/service_impl.hpp b/include/cascade/detail/service_impl.hpp index 2d57455a..98e32f81 100644 --- a/include/cascade/detail/service_impl.hpp +++ b/include/cascade/detail/service_impl.hpp @@ -536,7 +536,8 @@ template derecho::rpc::QueryResults ServiceClient::put( const typename SubgroupType::ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_START, (std::is_base_of::value?value.get_message_id():0)); if (!is_external_client()) { @@ -544,18 +545,18 @@ derecho::rpc::QueryResults ServiceClient::put( if (static_cast(group_ptr->template get_my_shard(subgroup_index)) == shard_index) { // ordered put as a shard member auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); - return subgroup_handle.template ordered_send(value); + return subgroup_handle.template ordered_send(value,as_trigger); } else { // p2p put node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); try { // as a subgroup member auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); - return subgroup_handle.template p2p_send(node_id,value); + return subgroup_handle.template p2p_send(node_id,value,as_trigger); } catch (derecho::invalid_subgroup_exception& ex) { // as an external caller auto& subgroup_handle = group_ptr->template get_nonmember_subgroup(subgroup_index); - return subgroup_handle.template p2p_send(node_id,value); + return subgroup_handle.template p2p_send(node_id,value,as_trigger); } } } else { @@ -563,7 +564,7 @@ derecho::rpc::QueryResults ServiceClient::put( // call as an external client (ExternalClientCaller). auto& caller = external_group_ptr->template get_subgroup_caller(subgroup_index); node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); - return caller.template p2p_send(node_id,value); + return caller.template p2p_send(node_id,value,as_trigger); } } @@ -573,11 +574,12 @@ derecho::rpc::QueryResults ServiceClient::type_r uint32_t type_index, const ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { if (type_index == 0) { - return this->template put(value,subgroup_index,shard_index); + return this->template put(value,subgroup_index,shard_index,as_trigger); } else { - return this->template type_recursive_put(type_index-1,value,subgroup_index,shard_index); + return this->template type_recursive_put(type_index-1,value,subgroup_index,shard_index,as_trigger); } } @@ -587,9 +589,10 @@ derecho::rpc::QueryResults ServiceClient::type_r uint32_t type_index, const ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { if (type_index == 0) { - return this->template put(value,subgroup_index,shard_index); + return this->template put(value,subgroup_index,shard_index,as_trigger); } else { throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary."); } @@ -598,7 +601,7 @@ derecho::rpc::QueryResults ServiceClient::type_r template template derecho::rpc::QueryResults ServiceClient::put( - const ObjectType& value) { + const ObjectType& value, bool as_trigger) { // STEP 1 - get key if constexpr (!std::is_base_of_v,ObjectType>) { @@ -610,7 +613,7 @@ derecho::rpc::QueryResults ServiceClient::put( std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref()); // STEP 3 - call recursive put - return this->template type_recursive_put(subgroup_type_index,value,subgroup_index,shard_index); + return this->template type_recursive_put(subgroup_type_index,value,subgroup_index,shard_index,as_trigger); } template @@ -618,7 +621,8 @@ template void ServiceClient::put_and_forget( const typename SubgroupType::ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_AND_FORGET_START, (std::is_base_of::value?value.get_message_id():0)); if (!is_external_client()) { @@ -626,18 +630,18 @@ void ServiceClient::put_and_forget( if (static_cast(group_ptr->template get_my_shard(subgroup_index)) == shard_index) { // do ordered put as a shard member (Replicated). auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); - subgroup_handle.template ordered_send(value); + subgroup_handle.template ordered_send(value,as_trigger); } else { node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); // do p2p put try{ // as a subgroup member auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); - subgroup_handle.template p2p_send(node_id,value); + subgroup_handle.template p2p_send(node_id,value,as_trigger); } catch (derecho::invalid_subgroup_exception& ex) { // as an external caller auto& subgroup_handle = group_ptr->template get_nonmember_subgroup(subgroup_index); - subgroup_handle.template p2p_send(node_id,value); + subgroup_handle.template p2p_send(node_id,value,as_trigger); } } } else { @@ -645,7 +649,7 @@ void ServiceClient::put_and_forget( // call as an external client (ExternalClientCaller). auto& caller = external_group_ptr->template get_subgroup_caller(subgroup_index); node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); - caller.template p2p_send(node_id,value); + caller.template p2p_send(node_id,value,as_trigger); } } @@ -655,11 +659,12 @@ void ServiceClient::type_recursive_put_and_forget( uint32_t type_index, const ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { if (type_index == 0) { - put_and_forget(value,subgroup_index,shard_index); + put_and_forget(value,subgroup_index,shard_index,as_trigger); } else { - type_recursive_put_and_forget(type_index-1,value,subgroup_index,shard_index); + type_recursive_put_and_forget(type_index-1,value,subgroup_index,shard_index,as_trigger); } } @@ -669,9 +674,10 @@ void ServiceClient::type_recursive_put_and_forget( uint32_t type_index, const ObjectType& value, uint32_t subgroup_index, - uint32_t shard_index) { + uint32_t shard_index, + bool as_trigger) { if (type_index == 0) { - put_and_forget(value,subgroup_index,shard_index); + put_and_forget(value,subgroup_index,shard_index,as_trigger); } else { throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary."); } @@ -679,7 +685,7 @@ void ServiceClient::type_recursive_put_and_forget( template template -void ServiceClient::put_and_forget(const ObjectType& value) { +void ServiceClient::put_and_forget(const ObjectType& value,bool as_trigger) { // STEP 1 - get key if constexpr (!std::is_base_of_v,ObjectType>) { throw derecho::derecho_exception(__PRETTY_FUNCTION__ + std::string(" only supports object of type ICascadeObject,but we get ") + typeid(ObjectType).name()); @@ -690,7 +696,7 @@ void ServiceClient::put_and_forget(const ObjectType& value) { std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref()); // STEP 3 - call recursive put_and_forget - this->template type_recursive_put_and_forget(subgroup_type_index,value,subgroup_index,shard_index); + this->template type_recursive_put_and_forget(subgroup_type_index,value,subgroup_index,shard_index,as_trigger); } template diff --git a/include/cascade/detail/trigger_store_impl.hpp b/include/cascade/detail/trigger_store_impl.hpp index 86bc399d..0f85aaf0 100644 --- a/include/cascade/detail/trigger_store_impl.hpp +++ b/include/cascade/detail/trigger_store_impl.hpp @@ -18,13 +18,13 @@ namespace derecho { namespace cascade { template -version_tuple TriggerCascadeNoStore::put(const VT& value) const { +version_tuple TriggerCascadeNoStore::put(const VT& value, bool as_trigger) const { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); return {persistent::INVALID_VERSION, 0}; } template -void TriggerCascadeNoStore::put_and_forget(const VT& value) const { +void TriggerCascadeNoStore::put_and_forget(const VT& value, bool as_trigger) const { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); } @@ -103,13 +103,13 @@ std::vector TriggerCascadeNoStore::ordered_list_keys(const s } template -version_tuple TriggerCascadeNoStore::ordered_put(const VT& value) { +version_tuple TriggerCascadeNoStore::ordered_put(const VT& value, bool as_trigger) { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); return {persistent::INVALID_VERSION, 0}; } template -void TriggerCascadeNoStore::ordered_put_and_forget(const VT& value) { +void TriggerCascadeNoStore::ordered_put_and_forget(const VT& value, bool as_trigger) { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); } diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index 01b2ae15..69e90c8a 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -17,12 +17,12 @@ namespace derecho { namespace cascade { template -version_tuple VolatileCascadeStore::put(const VT& value) const { +version_tuple VolatileCascadeStore::put(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}", value.get_key_ref()); LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); - auto results = subgroup_handle.template ordered_send(value); + auto results = subgroup_handle.template ordered_send(value,as_trigger); auto& replies = results.get(); version_tuple ret{CURRENT_VERSION, 0}; // TODO: verfiy consistency ? @@ -36,12 +36,12 @@ version_tuple VolatileCascadeStore::put(const VT& value) const { } template -void VolatileCascadeStore::put_and_forget(const VT& value) const { +void VolatileCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}", value.get_key_ref()); LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_AND_FORGET_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); - subgroup_handle.template ordered_send(value); + subgroup_handle.template ordered_send(value,as_trigger); LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_AND_FORGET_END, group, value); debug_leave_func(); @@ -67,12 +67,12 @@ double internal_perf_put(derecho::Replicated& subgroup_handle, cons uint64_t start_ns = now_ns; uint64_t end_ns = now_ns + duration_sec * INT64_1E9; while(end_ns > now_ns) { - subgroup_handle.template ordered_send(objects.at(now_ns % num_distinct_objects)); + subgroup_handle.template ordered_send(objects.at(now_ns % num_distinct_objects),false); now_ns = get_walltime(); num_messages_sent++; } // send a normal put - auto results = subgroup_handle.template ordered_send(objects.at(now_ns % num_distinct_objects)); + auto results = subgroup_handle.template ordered_send(objects.at(now_ns % num_distinct_objects),false); auto& replies = results.get(); version_tuple ret(CURRENT_VERSION, 0); // TODO: verfiy consistency ? @@ -367,7 +367,7 @@ std::vector VolatileCascadeStore::ordered_list_keys(const st } template -version_tuple VolatileCascadeStore::ordered_put(const VT& value) { +version_tuple VolatileCascadeStore::ordered_put(const VT& value, bool as_trigger) { debug_enter_func_with_args("key={}", value.get_key_ref()); auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); @@ -380,7 +380,7 @@ version_tuple VolatileCascadeStore::ordered_put(const VT& value) version_tuple version_and_timestamp{persistent::INVALID_VERSION, 0}; - if(this->internal_ordered_put(value) == true) { + if(this->internal_ordered_put(value,as_trigger) == true) { version_and_timestamp = {std::get<0>(version_and_hlc),std::get<1>(version_and_hlc).m_rtc_us}; } @@ -398,7 +398,7 @@ version_tuple VolatileCascadeStore::ordered_put(const VT& value) } template -void VolatileCascadeStore::ordered_put_and_forget(const VT& value) { +void VolatileCascadeStore::ordered_put_and_forget(const VT& value, bool as_trigger) { debug_enter_func_with_args("key={}", value.get_key_ref()); #ifdef ENABLE_EVALUATION auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); @@ -408,7 +408,7 @@ void VolatileCascadeStore::ordered_put_and_forget(const VT& valu #else LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_VOLATILE_ORDERED_PUT_AND_FORGET_START,group,value,std::get<0>(version_and_hlc)); #endif - internal_ordered_put(value); + internal_ordered_put(value,as_trigger); #if __cplusplus > 201703L LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_ORDERED_PUT_AND_FORGET_END,group,value,std::get<0>(version_and_hlc)); #else @@ -418,7 +418,7 @@ void VolatileCascadeStore::ordered_put_and_forget(const VT& valu } template -bool VolatileCascadeStore::internal_ordered_put(const VT& value) { +bool VolatileCascadeStore::internal_ordered_put(const VT& value, bool as_trigger) { auto version_and_hlc = group->template get_subgroup(this->subgroup_index).get_current_version(); if constexpr(std::is_base_of::value) { @@ -456,29 +456,31 @@ bool VolatileCascadeStore::internal_ordered_put(const VT& value) } } + if (!as_trigger) { // for lockless check this->lockless_v1.store(std::get<0>(version_and_hlc), std::memory_order_relaxed); // compiler reordering barrier #ifdef __GNUC__ - asm volatile("" :: + asm volatile("" :: : "memory"); #else #error Lockless support is currently for GCC only #endif - this->kv_map.erase(value.get_key_ref()); // remove - this->kv_map.emplace(value.get_key_ref(), value); // copy constructor - this->update_version = std::get<0>(version_and_hlc); + this->kv_map.erase(value.get_key_ref()); // remove + this->kv_map.emplace(value.get_key_ref(), value); // copy constructor + this->update_version = std::get<0>(version_and_hlc); - // for lockless check - // compiler reordering barrier + // for lockless check + // compiler reordering barrier #ifdef __GNUC__ - asm volatile("" :: + asm volatile("" :: : "memory"); #else #error Lockless support is currently for GCC only #endif - this->lockless_v2.store(std::get<0>(version_and_hlc), std::memory_order_relaxed); + this->lockless_v2.store(std::get<0>(version_and_hlc), std::memory_order_relaxed); + } if(cascade_watcher_ptr) { (*cascade_watcher_ptr)( diff --git a/include/cascade/persistent_store.hpp b/include/cascade/persistent_store.hpp index d7bd8406..a32704c6 100644 --- a/include/cascade/persistent_store.hpp +++ b/include/cascade/persistent_store.hpp @@ -30,7 +30,7 @@ class PersistentCascadeStore : public ICascadeStore, public derecho::GroupReference, public derecho::NotificationSupport { private: - bool internal_ordered_put(const VT& value); + bool internal_ordered_put(const VT& value, bool as_trigger); public: using derecho::GroupReference::group; @@ -85,8 +85,8 @@ class PersistentCascadeStore : public ICascadeStore, #endif #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; - virtual version_tuple put(const VT& value) const override; - virtual void put_and_forget(const VT& value) const override; + virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION @@ -100,8 +100,8 @@ class PersistentCascadeStore : public ICascadeStore, virtual uint64_t multi_get_size(const KT& key) const override; virtual uint64_t get_size(const KT& key, const persistent::version_t& ver, const bool stable, bool exact = false) const override; virtual uint64_t get_size_by_time(const KT& key, const uint64_t& ts_us, const bool stable) const override; - virtual version_tuple ordered_put(const VT& value) override; - virtual void ordered_put_and_forget(const VT& value) override; + virtual version_tuple ordered_put(const VT& value, bool as_trigger) override; + virtual void ordered_put_and_forget(const VT& value, bool as_trigger) override; virtual version_tuple ordered_remove(const KT& key) override; virtual const VT ordered_get(const KT& key) override; virtual std::vector ordered_list_keys(const std::string& prefix) override; diff --git a/include/cascade/service.hpp b/include/cascade/service.hpp index 0072c21e..dbc51f29 100644 --- a/include/cascade/service.hpp +++ b/include/cascade/service.hpp @@ -689,15 +689,17 @@ namespace cascade { * of the version and timestamp meaning what is the latest version/timestamp the caller * has seen. Cascade will reject the write if the corresponding key has been updated * already. TODO: should we make it an optional feature? - * @param[in] subgroup_index the subgroup index of CascadeType + * @param[in] subgroup_index the subgroup index of CascadeType * @param[in] shard_index the shard index. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. * * @return a future to the version and timestamp of the put operation. * TODO: check if the user application is responsible for reclaim the future by reading it sometime. */ template derecho::rpc::QueryResults put(const typename SubgroupType::ObjectType& object, - uint32_t subgroup_index, uint32_t shard_index); + uint32_t subgroup_index, uint32_t shard_index, bool as_trigger = false); /** * "type_recursive_put" is a helper function for internal use only. * @param[in] type_index the index of the subgroup type in the CascadeTypes... list. And the FirstType, @@ -706,6 +708,8 @@ namespace cascade { * @param[in] subgroup_index * the subgroup index in the subgroup type designated by type_index * @param[in] shard_index the shard index + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. * * @return a future to the version and timestamp of the put operation. */ @@ -715,23 +719,27 @@ namespace cascade { uint32_t type_index, const ObjectType& object, uint32_t subgroup_index, - uint32_t shard_index); + uint32_t shard_index, + bool as_trigger = false); template derecho::rpc::QueryResults type_recursive_put( uint32_t type_index, const ObjectType& object, uint32_t subgroup_index, - uint32_t shard_index); + uint32_t shard_index, + bool as_trigger = false); public: /** * object pool version * @param[in] object the object to write, the object pool is extracted from the object key. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. * * @return a future to the version and timestamp of the put operation. */ template - derecho::rpc::QueryResults put(const ObjectType& object); + derecho::rpc::QueryResults put(const ObjectType& object, bool as_trigger = false); /** * "put_and_forget" writes an object to a given subgroup/shard, but no return value. @@ -748,10 +756,12 @@ namespace cascade { * already. TODO: should we make it an optional feature? * @param[in] subgroup_index the subgroup index of CascadeType * @param[in] shard_index the shard index. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. */ template void put_and_forget(const typename SubgroupType::ObjectType& object, - uint32_t subgroup_index, uint32_t shard_index); + uint32_t subgroup_index, uint32_t shard_index, bool as_trigger = false); /** * "type_recursive_put_and_forget" is a helper function for internal use only. @@ -761,6 +771,8 @@ namespace cascade { * @param[in] subgroup_index * the subgroup index in the subgroup type designated by type_index * @param[in] shard_index the shard index + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. */ protected: template @@ -768,21 +780,25 @@ namespace cascade { uint32_t type_index, const ObjectType& object, uint32_t subgroup_index, - uint32_t shard_index); + uint32_t shard_index, + bool as_trigger = false); template void type_recursive_put_and_forget( uint32_t type_index, const ObjectType& object, uint32_t subgroup_index, - uint32_t shard_index); + uint32_t shard_index, + bool as_trigger = false); public: /** * object pool version - * @param[in] object the object to write, the object pool is extracted from the object key. + * @param[in] object the object to write, the object pool is extracted from the object key. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. */ template - void put_and_forget(const ObjectType& object); + void put_and_forget(const ObjectType& object, bool as_trigger = false); /** * "trigger_put" writes an object to a given subgroup/shard. diff --git a/include/cascade/trigger_store.hpp b/include/cascade/trigger_store.hpp index 461e6392..95fdb921 100644 --- a/include/cascade/trigger_store.hpp +++ b/include/cascade/trigger_store.hpp @@ -80,8 +80,8 @@ class TriggerCascadeNoStore : public ICascadeStore, #endif #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; - virtual version_tuple put(const VT& value) const override; - virtual void put_and_forget(const VT& value) const override; + virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION @@ -95,8 +95,8 @@ class TriggerCascadeNoStore : public ICascadeStore, virtual uint64_t multi_get_size(const KT& key) const override; virtual uint64_t get_size(const KT& key, const persistent::version_t& ver, const bool stable, bool exact = false) const override; virtual uint64_t get_size_by_time(const KT& key, const uint64_t& ts_us, const bool stable) const override; - virtual version_tuple ordered_put(const VT& value) override; - virtual void ordered_put_and_forget(const VT& value) override; + virtual version_tuple ordered_put(const VT& value, bool as_trigger) override; + virtual void ordered_put_and_forget(const VT& value, bool as_trigger) override; virtual version_tuple ordered_remove(const KT& key) override; virtual const VT ordered_get(const KT& key) override; virtual std::vector ordered_list_keys(const std::string& prefix) override; diff --git a/include/cascade/volatile_store.hpp b/include/cascade/volatile_store.hpp index f645cce5..6f2d166f 100644 --- a/include/cascade/volatile_store.hpp +++ b/include/cascade/volatile_store.hpp @@ -25,7 +25,7 @@ class VolatileCascadeStore : public ICascadeStore, public derecho::GroupReference, public derecho::NotificationSupport { private: - bool internal_ordered_put(const VT& value); + bool internal_ordered_put(const VT& value, bool as_trigger); #if defined(__i386__) || defined(__x86_64__) || defined(_M_AMD64) || defined(_M_IX86) mutable std::atomic lockless_v1; mutable std::atomic lockless_v2; @@ -90,11 +90,11 @@ class VolatileCascadeStore : public ICascadeStore, #endif #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; - virtual version_tuple put(const VT& value) const override; + virtual version_tuple put(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION - virtual void put_and_forget(const VT& value) const override; + virtual void put_and_forget(const VT& value, bool as_trigger) const override; virtual version_tuple remove(const KT& key) const override; virtual const VT get(const KT& key, const persistent::version_t& ver, const bool stable, bool exact = false) const override; virtual const VT multi_get(const KT& key) const override; @@ -105,8 +105,8 @@ class VolatileCascadeStore : public ICascadeStore, virtual uint64_t multi_get_size(const KT& key) const override; virtual uint64_t get_size(const KT& key, const persistent::version_t& ver, const bool stable, bool exact = false) const override; virtual uint64_t get_size_by_time(const KT& key, const uint64_t& ts_us, const bool stable) const override; - virtual version_tuple ordered_put(const VT& value) override; - virtual void ordered_put_and_forget(const VT& value) override; + virtual version_tuple ordered_put(const VT& value, bool as_trigger) override; + virtual void ordered_put_and_forget(const VT& value, bool as_trigger) override; virtual version_tuple ordered_remove(const KT& key) override; virtual const VT ordered_get(const KT& key) override; virtual std::vector ordered_list_keys(const std::string& prefix) override; diff --git a/src/applications/standalone/dds/include/cascade_dds/detail/dds_impl.hpp b/src/applications/standalone/dds/include/cascade_dds/detail/dds_impl.hpp index 21e4046c..88d3f7b4 100644 --- a/src/applications/standalone/dds/include/cascade_dds/detail/dds_impl.hpp +++ b/src/applications/standalone/dds/include/cascade_dds/detail/dds_impl.hpp @@ -119,7 +119,7 @@ class DDSPublisherImpl: public DDSPublisher { // send message dbg_default_trace("in {}: put object with key:{}", __PRETTY_FUNCTION__, cascade_key); - capi.put_and_forget(object); + capi.put_and_forget(object,false); #if !defined(USE_DDS_TIMESTAMP_LOG) TimestampLogger::log(TLT_DDS_PUBLISHER_SEND_END,capi.get_my_id(),message_id,get_time_ns()); #endif diff --git a/src/applications/standalone/dds/src/dds.cpp b/src/applications/standalone/dds/src/dds.cpp index c0d1310e..dd8fa031 100644 --- a/src/applications/standalone/dds/src/dds.cpp +++ b/src/applications/standalone/dds/src/dds.cpp @@ -159,7 +159,7 @@ void DDSMetadataClient::create_topic(const Topic& topic) { Blob blob(stack_buffer,size,true); ObjectWithStringKey topic_object(metadata_pathname+PATH_SEPARATOR+topic.name,blob); dbg_default_trace("create topic:{}", topic.name); - auto result = capi.put(topic_object); + auto result = capi.put(topic_object,false); for (auto& reply_future: result.get() ) { auto reply = reply_future.second.get(); dbg_default_trace("Node {} replied with (v:0x{:x},t:{}us)", reply_future.first, diff --git a/src/applications/standalone/kvs_client/kvs_client.cpp b/src/applications/standalone/kvs_client/kvs_client.cpp index 0e5a882d..36374fe3 100644 --- a/src/applications/standalone/kvs_client/kvs_client.cpp +++ b/src/applications/standalone/kvs_client/kvs_client.cpp @@ -40,7 +40,7 @@ int main(int argc, char** argv) { obj.previous_version = INVALID_VERSION; obj.previous_version_by_key = INVALID_VERSION; obj.blob = Blob(reinterpret_cast(OBJECT_VALUE),std::strlen(OBJECT_VALUE)); - auto result_4 = capi.put(obj); + auto result_4 = capi.put(obj,false); for (auto& reply_future:result_4.get()) { auto reply = reply_future.second.get(); std::cout << "node(" << reply_future.first << ") replied with version:" << std::get<0>(reply) diff --git a/src/applications/tests/cascade_as_subgroup_classes/cli_example.cpp b/src/applications/tests/cascade_as_subgroup_classes/cli_example.cpp index 57043e3e..df89e258 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/cli_example.cpp +++ b/src/applications/tests/cascade_as_subgroup_classes/cli_example.cpp @@ -73,13 +73,13 @@ static void client_put(derecho::ExternalGroupClient& group, if (is_persistent) { ExternalClientCaller::type>& pcs_ec = group.get_subgroup_caller(); - auto result = pcs_ec.p2p_send(member,o); + auto result = pcs_ec.p2p_send(member,o,false); auto reply = result.get().get(member); std::cout << "put finished with timestamp=" << std::get<1>(reply) << ",version=" << std::get<0>(reply) << std::endl; } else { ExternalClientCaller::type>& vcs_ec = group.get_subgroup_caller(); - auto result = vcs_ec.p2p_send(member,o); + auto result = vcs_ec.p2p_send(member,o,false); auto reply = result.get().get(member); std::cout << "put finished with timestamp=" << std::get<1>(reply) << ",version=" << std::get<0>(reply) << std::endl; diff --git a/src/applications/tests/cascade_as_subgroup_classes/perf.cpp b/src/applications/tests/cascade_as_subgroup_classes/perf.cpp index 4b31e69c..05fff96d 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/perf.cpp +++ b/src/applications/tests/cascade_as_subgroup_classes/perf.cpp @@ -347,7 +347,7 @@ int do_client(int argc,char** args) { for(uint64_t i = 0; i < num_messages; i++) { ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size)); - cs.do_send(i,[&o,&pcs_ec,&server_id](){return std::move(pcs_ec.p2p_send(server_id,o));}); + cs.do_send(i,[&o,&pcs_ec,&server_id](){return std::move(pcs_ec.p2p_send(server_id,o,false));}); } free(bbuf); @@ -367,7 +367,7 @@ int do_client(int argc,char** args) { for(uint64_t i = 0; i < num_messages; i++) { ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size)); - cs.do_send(i,[&o,&vcs_ec,&server_id](){return std::move(vcs_ec.p2p_send(server_id,o));}); + cs.do_send(i,[&o,&vcs_ec,&server_id](){return std::move(vcs_ec.p2p_send(server_id,o,false));}); } free(bbuf); diff --git a/src/service/client.cpp b/src/service/client.cpp index be512155..992d2721 100644 --- a/src/service/client.cpp +++ b/src/service/client.cpp @@ -229,7 +229,7 @@ void op_put(ServiceClientAPI& capi, const std::string& key, const std::string& v obj.previous_version = pver; obj.previous_version_by_key = pver_bk; obj.blob = Blob(reinterpret_cast(value.c_str()),value.length()); - derecho::rpc::QueryResults result = capi.put(obj); + derecho::rpc::QueryResults result = capi.put(obj,false); check_put_and_remove_result(result); } @@ -251,7 +251,7 @@ void op_put_file(ServiceClientAPI& capi, const std::string& key, const std::stri ObjectWithStringKey obj(key,message_generator,file_size); obj.previous_version = pver; obj.previous_version_by_key = pver_bk; - derecho::rpc::QueryResults result = capi.put(obj); + derecho::rpc::QueryResults result = capi.put(obj,false); value_file.close(); check_put_and_remove_result(result); } diff --git a/src/service/cs/cascade_client_cs.cpp b/src/service/cs/cascade_client_cs.cpp index fff811fa..67c1fed9 100644 --- a/src/service/cs/cascade_client_cs.cpp +++ b/src/service/cs/cascade_client_cs.cpp @@ -238,7 +238,7 @@ auto get_by_time(ServiceClientAPI& capi, const std::string& key, uint64_t ts_us, */ template auto put_internal(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0) { - derecho::rpc::QueryResults result = (subgroup_index == UINT32_MAX) ? capi.put(obj) : capi.template put(obj, subgroup_index, shard_index); + derecho::rpc::QueryResults result = (subgroup_index == UINT32_MAX) ? capi.put(obj,false) : capi.template put(obj, subgroup_index, shard_index, false); QueryResultsStore* s = new QueryResultsStore(std::move(result), bundle_f); return s; } @@ -255,9 +255,9 @@ auto put_internal(ServiceClientAPI& capi, const typename SubgroupType::ObjectTyp template void put_and_forget(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0) { if(subgroup_index == UINT32_MAX) { - capi.put_and_forget(obj); + capi.put_and_forget(obj,false); } else { - capi.template put_and_forget(obj, subgroup_index, shard_index); + capi.template put_and_forget(obj, subgroup_index, shard_index, false); } } @@ -747,11 +747,11 @@ EXPORT auto EXPORT_put(ServiceClientAPI& capi, char* key, uint8_t* bytes, std::s if (trigger) { capi.trigger_put(obj); } else if (blocking) { - auto result = capi.put(obj); + auto result = capi.put(obj,false); auto s = new QueryResultsStore(std::move(result), bundle_f); return s; } else { - capi.put_and_forget(obj); + capi.put_and_forget(obj,false); } } else { if (trigger) { diff --git a/src/service/perftest.cpp b/src/service/perftest.cpp index 2a1684ba..e7222399 100644 --- a/src/service/perftest.cpp +++ b/src/service/perftest.cpp @@ -135,7 +135,7 @@ bool PerfTestServer::eval_put(uint64_t max_operation_per_second, TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id); if (subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) { - future_appender(this->capi.put(objects.at(now_ns%num_distinct_objects))); + future_appender(this->capi.put(objects.at(now_ns%num_distinct_objects),false)); } else { on_subgroup_type_index_with_return( std::decay_t::subgroup_type_order.at(subgroup_type_index), @@ -309,7 +309,7 @@ bool PerfTestServer::eval_get(int32_t log_depth, std::unique_ptr> put_result_future; if(subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) { // put returns the QueryResults by value, so we have to move-construct it into a unique_ptr - put_result_future = std::make_unique>(std::move(this->capi.put(object))); + put_result_future = std::make_unique>(std::move(this->capi.put(object,false))); } else { // Manual copy of on_subgroup_type_index macro so I can use make_unique std::type_index tindex = std::decay_t::subgroup_type_order.at(subgroup_type_index); @@ -517,7 +517,7 @@ bool PerfTestServer::eval_get_by_time(uint64_t ms_in_past, put_futures_queue.emplace(current_object, std::move(query_results)); }; if(subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) { - future_appender(this->capi.put(objects.at(current_object))); + future_appender(this->capi.put(objects.at(current_object),false)); } else { on_subgroup_type_index_with_return( std::decay_t::subgroup_type_order.at(subgroup_type_index), diff --git a/src/service/python/cascade_client_py.cpp b/src/service/python/cascade_client_py.cpp index fa60eb38..4cfa5f52 100644 --- a/src/service/python/cascade_client_py.cpp +++ b/src/service/python/cascade_client_py.cpp @@ -165,8 +165,8 @@ static void print_red(std::string msg) { @return QueryResultsStore that handles the tuple of version and ts_us. */ template -auto put(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0) { - derecho::rpc::QueryResults result = (subgroup_index == UINT32_MAX) ? capi.put(obj) : capi.template put(obj, subgroup_index, shard_index); +auto put(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0, bool as_trigger = false) { + derecho::rpc::QueryResults result = (subgroup_index == UINT32_MAX) ? capi.put(obj, as_trigger) : capi.template put(obj, subgroup_index, shard_index, as_trigger); QueryResultsStore>* s = new QueryResultsStore>(std::move(result), bundle_f); return py::cast(s); @@ -183,11 +183,11 @@ auto put(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, u @return QueryResultsStore that handles the tuple of version and ts_us. */ template -void put_and_forget(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0) { +void put_and_forget(ServiceClientAPI& capi, const typename SubgroupType::ObjectType& obj, uint32_t subgroup_index = UINT32_MAX, uint32_t shard_index = 0, bool as_trigger = false) { if(subgroup_index == UINT32_MAX) { - capi.put_and_forget(obj); + capi.put_and_forget(obj, as_trigger); } else { - capi.template put_and_forget(obj, subgroup_index, shard_index); + capi.template put_and_forget(obj, subgroup_index, shard_index, as_trigger); } } @@ -611,6 +611,7 @@ PYBIND11_MODULE(member_client, m) { persistent::version_t previous_version_by_key = CURRENT_VERSION; bool blocking = true; bool trigger = false; + bool as_trigger = false; #ifdef ENABLE_EVALUATION uint64_t message_id = 0; #endif @@ -635,6 +636,9 @@ PYBIND11_MODULE(member_client, m) { if (kwargs.contains("trigger")) { trigger = kwargs["trigger"].cast(); } + if (kwargs.contains("as_trigger")) { + as_trigger = kwargs["as_trigger"].cast(); + } #ifdef ENABLE_EVALUATION if (kwargs.contains("message_id")) { message_id = kwargs["message_id"].cast(); @@ -652,19 +656,19 @@ PYBIND11_MODULE(member_client, m) { if (trigger) { capi.ref.trigger_put(obj); } else if (blocking) { - auto result = capi.ref.put(obj); + auto result = capi.ref.put(obj,as_trigger); auto s = new QueryResultsStore>(std::move(result), bundle_f); return py::cast(s); } else { - capi.ref.put_and_forget(obj); + capi.ref.put_and_forget(obj,as_trigger); } } else { if (trigger) { on_all_subgroup_type(subgroup_type, trigger_put, capi.ref, obj, subgroup_index, shard_index); } else if (blocking) { - on_all_subgroup_type(subgroup_type, return put, capi.ref, obj, subgroup_index, shard_index); + on_all_subgroup_type(subgroup_type, return put, capi.ref, obj, subgroup_index, shard_index, as_trigger); } else { - on_all_subgroup_type(subgroup_type, put_and_forget, capi.ref, obj, subgroup_index, shard_index); + on_all_subgroup_type(subgroup_type, put_and_forget, capi.ref, obj, subgroup_index, shard_index, as_trigger); } } @@ -684,6 +688,7 @@ PYBIND11_MODULE(member_client, m) { "\t@argX pervious_version_by_key \n" "\t@argX blocking \n" "\t@argX trigger Using trigger put, always non-blocking regardless of blocking argument.\n" + "\t@argX as_trigger Enable 'trigger' flag for normal put. If true, the value will ONLY trigger the UDL and NOT apply to the K/V. Defaulted to false\n" #ifdef ENABLE_EVALUATION "\t@argX message_id \n" #endif From 464fc17f2d6ceb0e4344569a117d8f21757cbc60 Mon Sep 17 00:00:00 2001 From: Weijia Song Date: Sat, 15 Jun 2024 13:04:54 -0400 Subject: [PATCH 2/2] cascade_client.py can use as_trigger flag now. TODO: add as_trigger flag support to C++/Java/C# clients. --- src/service/python/cascade_client.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/service/python/cascade_client.py b/src/service/python/cascade_client.py index 438d60c2..b1055129 100755 --- a/src/service/python/cascade_client.py +++ b/src/service/python/cascade_client.py @@ -189,8 +189,12 @@ def do_put(self, arg): the key. The value will get rejected if the latest version of the key grows beyond previous_version. message_id: The message_id for the object. - blocking: optional blocking flag. Default to True. - trigger: optional trigger flag, Default to False. + blocking: optional blocking flag. Defaulted to True. + trigger: optional trigger flag, Defaulted to False. + as_trigger: optional as_trigger flag, Defaulted to False. This flag only applies when 'trigger' + flag is False. If 'trigger' == False and 'as_trigger' == True, the object will NOT + apply to the K/V store, which is similar to 'trigger' flag, but it will multicast + to all replicas and trigger the UDLs registerd on ordered data path. ''' self.check_capi() args = arg.split() @@ -205,6 +209,7 @@ def do_put(self, arg): message_id = 0 blocking = True trigger = False + as_trigger = False argpos = 2 while argpos < len(args): extra_option = args[argpos].split('=') @@ -227,8 +232,10 @@ def do_put(self, arg): blocking = False elif extra_option[0] == 'trigger' and ( extra_option[1].lower() == 'yes' or extra_option[1].lower() == 'true' or extra_option[1].lower() == 'on' or extra_option[1].lower() == '1' ): trigger = True + elif extra_option[0] == 'as_trigger' and ( extra_option[1].lower() == 'yes' or extra_option[1].lower() == 'true' or extra_option[1].lower() == 'on' or extra_option[1].lower() == '1' ): + as_trigger = True argpos = argpos + 1 - res = self.capi.put(args[0],bytes(args[1],'utf-8'),subgroup_type=subgroup_type,subgroup_index=subgroup_index,shard_index=shard_index,previous_version=previous_version,previous_version_by_key=previous_version_by_key,message_id=message_id,blocking=blocking,trigger=trigger) + res = self.capi.put(args[0],bytes(args[1],'utf-8'),subgroup_type=subgroup_type,subgroup_index=subgroup_index,shard_index=shard_index,previous_version=previous_version,previous_version_by_key=previous_version_by_key,message_id=message_id,blocking=blocking,trigger=trigger,as_trigger=as_trigger) if blocking and not trigger and res: print(bcolors.OK + f"{res.get_result()}" + bcolors.RESET) elif trigger and not res: