Skip to content

Commit

Permalink
Merge pull request #75 from Derecho-Project/trigger_multicast
Browse files Browse the repository at this point in the history
Trigger multicast
  • Loading branch information
songweijia authored Jul 3, 2024
2 parents 172e07a + 464fc17 commit 38654c1
Show file tree
Hide file tree
Showing 21 changed files with 169 additions and 125 deletions.
20 changes: 12 additions & 8 deletions include/cascade/cascade_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,17 @@ class ICascadeStore {
KT* InvKeyPtr = IK;
VT* InvValPtr = IV;
/**
* @brief put(const VT&)
* @brief put(const VT&, bool )
*
* Put a value. VT must implement ICascadeObject interface. The key is given in value and retrieved by
* ICascadeObject::get_key_ref()
*
* @param[in] value The K/V pair value
* @param[in] value The K/V pair value
* @param[in] as_trigger The object will NOT be used to update the K/V state.
*
* @return a tuple including version number (version_t) and a timestamp in microseconds.
*/
virtual version_tuple put(const VT& value) const = 0;
virtual version_tuple put(const VT& value, bool as_trigger) const = 0;

/**
* @brief put_and_forget(const VT&)
Expand All @@ -109,10 +110,11 @@ class ICascadeStore {
* ICascadeObject::get_key_ref(). This function ignores any return value.
*
* @param[in] value The K/V pair value
* @param[in] as_trigger The object will NOT be used to update the K/V state.
*
* @return void
*/
virtual void put_and_forget(const VT& value) const = 0;
virtual void put_and_forget(const VT& value, bool as_trigger) const = 0;

#ifdef ENABLE_EVALUATION
/**
Expand Down Expand Up @@ -336,18 +338,20 @@ class ICascadeStore {
/**
* @brief ordered_put
*
* @param[in] value The K/V pair object.
* @param[in] value The K/V pair object.
* @param[in] as_trigger If true, the value will NOT apply to the K/V state.
*
* @return A tuple including version number (version_t) and a timestamp in microseconds.
*/
virtual version_tuple ordered_put(const VT& value) = 0;
virtual version_tuple ordered_put(const VT& value, bool as_trigger) = 0;

/**
* @brief ordered_put_and_forget
*
* @param[in] value The K/V pair object.
* @param[in] value The K/V pair object.
* @param[in] as_trigger If true, the value will NOT apply to the K/V state.
*/
virtual void ordered_put_and_forget(const VT& value) = 0;
virtual void ordered_put_and_forget(const VT& value, bool as_trigger) = 0;

/**
* @brief ordered_remove
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/detail/delta_store_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DeltaCascadeStoreCore : public mutils::ByteRepresentable,
/**
* Ordered put, and generate a delta.
*/
virtual bool ordered_put(const VT& value, persistent::version_t prever);
virtual bool ordered_put(const VT& value, persistent::version_t prever, bool as_trigger);
/**
* Ordered remove, and generate a delta.
*/
Expand Down
14 changes: 8 additions & 6 deletions include/cascade/detail/delta_store_core_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ std::unique_ptr<DeltaCascadeStoreCore<KT, VT, IK, IV>> DeltaCascadeStoreCore<KT,
}

template <typename KT, typename VT, KT* IK, VT* IV>
bool DeltaCascadeStoreCore<KT, VT, IK, IV>::ordered_put(const VT& value, persistent::version_t prev_ver) {
bool DeltaCascadeStoreCore<KT, VT, IK, IV>::ordered_put(const VT& value, persistent::version_t prev_ver, bool as_trigger) {
// call validator
if constexpr(std::is_base_of<IValidator<KT, VT>, VT>::value) {
if(!value.validate(this->kv_map)) {
Expand Down Expand Up @@ -207,11 +207,13 @@ bool DeltaCascadeStoreCore<KT, VT, IK, IV>::ordered_put(const VT& value, persist
}
value.set_previous_version(prev_ver, prev_ver_by_key);
}
// create delta.
assert(this->delta.empty());
this->delta.push_back(value.get_key_ref());
// apply_ordered_put
apply_ordered_put(value);
if (!as_trigger) {
// create delta.
assert(this->delta.empty());
this->delta.push_back(value.get_key_ref());
// apply_ordered_put
apply_ordered_put(value);
}
return true;
}

Expand Down
22 changes: 12 additions & 10 deletions include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ namespace derecho {
namespace cascade {

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::put(const VT& value) const {
version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::put(const VT& value, bool as_trigger) const {
debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref());
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value);

derecho::Replicated<PersistentCascadeStore>& subgroup_handle = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index);
auto results = subgroup_handle.template ordered_send<RPC_NAME(ordered_put)>(value);
auto results = subgroup_handle.template ordered_send<RPC_NAME(ordered_put)>(value, as_trigger);
auto& replies = results.get();
version_tuple ret{CURRENT_VERSION, 0};
// TODO: verfiy consistency ?
Expand All @@ -43,12 +43,12 @@ version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::put(const VT& value) c
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
void PersistentCascadeStore<KT, VT, IK, IV, ST>::put_and_forget(const VT& value) const {
void PersistentCascadeStore<KT, VT, IK, IV, ST>::put_and_forget(const VT& value, bool as_trigger) const {
debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref());
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_AND_FORGET_START, group, value);

derecho::Replicated<PersistentCascadeStore>& subgroup_handle = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index);
subgroup_handle.template ordered_send<RPC_NAME(ordered_put_and_forget)>(value);
subgroup_handle.template ordered_send<RPC_NAME(ordered_put_and_forget)>(value, as_trigger);

LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_AND_FORGET_END, group, value);
debug_leave_func();
Expand Down Expand Up @@ -497,7 +497,7 @@ std::vector<KT> PersistentCascadeStore<KT, VT, IK, IV, ST>::list_keys_by_time(co
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put(const VT& value) {
version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put(const VT& value,bool as_trigger) {
debug_enter_func_with_args("key={}", value.get_key_ref());

auto version_and_hlc = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index).get_current_version();
Expand All @@ -507,7 +507,7 @@ version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put(const VT&
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_ORDERED_PUT_START, group, value, std::get<0>(version_and_hlc));
#endif
version_tuple version_and_timestamp{persistent::INVALID_VERSION,0};
if(this->internal_ordered_put(value) == true) {
if(this->internal_ordered_put(value,as_trigger) == true) {
version_and_timestamp = {std::get<0>(version_and_hlc),std::get<1>(version_and_hlc).m_rtc_us};
}

Expand All @@ -524,7 +524,7 @@ version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put(const VT&
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
void PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put_and_forget(const VT& value) {
void PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put_and_forget(const VT& value,bool as_trigger) {
debug_enter_func_with_args("key={}", value.get_key_ref());
#ifdef ENABLE_EVALUATION
auto version_and_hlc = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index).get_current_version();
Expand All @@ -536,7 +536,7 @@ void PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put_and_forget(const VT
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_ORDERED_PUT_AND_FORGET_START, group, value, std::get<0>(version_and_hlc));
#endif

this->internal_ordered_put(value);
this->internal_ordered_put(value,as_trigger);

#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_ORDERED_PUT_AND_FORGET_END, group, value, std::get<0>(version_and_hlc));
Expand All @@ -554,21 +554,23 @@ void PersistentCascadeStore<KT, VT, IK, IV, ST>::ordered_put_and_forget(const VT
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
bool PersistentCascadeStore<KT, VT, IK, IV, ST>::internal_ordered_put(const VT& value) {
bool PersistentCascadeStore<KT, VT, IK, IV, ST>::internal_ordered_put(const VT& value, bool as_trigger) {
auto version_and_hlc = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index).get_current_version();
if constexpr(std::is_base_of<IKeepVersion, VT>::value) {
value.set_version(std::get<0>(version_and_hlc));
}
if constexpr(std::is_base_of<IKeepTimestamp, VT>::value) {
value.set_timestamp(std::get<1>(version_and_hlc).m_rtc_us);
}
if(this->persistent_core->ordered_put(value, this->persistent_core.getLatestVersion()) == false) {

if(this->persistent_core->ordered_put(value, this->persistent_core.getLatestVersion(), as_trigger) == false) {
// verification failed. S we return invalid versions.
debug_leave_func_with_value("version=0x{:x},timestamp={}us",
std::get<0>(version_and_hlc),
std::get<1>(version_and_hlc).m_rtc_us);
return false;
}

if(cascade_watcher_ptr) {
(*cascade_watcher_ptr)(
// group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index).get_subgroup_id(), // this is subgroup id
Expand Down
54 changes: 30 additions & 24 deletions include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,34 +536,35 @@ template <typename SubgroupType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put(
const typename SubgroupType::ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_START,
(std::is_base_of<IHasMessageID,typename SubgroupType::ObjectType>::value?value.get_message_id():0));
if (!is_external_client()) {
std::lock_guard<std::mutex> lck(this->group_ptr_mutex);
if (static_cast<uint32_t>(group_ptr->template get_my_shard<SubgroupType>(subgroup_index)) == shard_index) {
// ordered put as a shard member
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template ordered_send<RPC_NAME(ordered_put)>(value);
return subgroup_handle.template ordered_send<RPC_NAME(ordered_put)>(value,as_trigger);
} else {
// p2p put
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
try {
// as a subgroup member
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template p2p_send<RPC_NAME(put)>(node_id,value);
return subgroup_handle.template p2p_send<RPC_NAME(put)>(node_id,value,as_trigger);
} catch (derecho::invalid_subgroup_exception& ex) {
// as an external caller
auto& subgroup_handle = group_ptr->template get_nonmember_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template p2p_send<RPC_NAME(put)>(node_id,value);
return subgroup_handle.template p2p_send<RPC_NAME(put)>(node_id,value,as_trigger);
}
}
} else {
std::lock_guard<std::mutex> lck(this->external_group_ptr_mutex);
// call as an external client (ExternalClientCaller).
auto& caller = external_group_ptr->template get_subgroup_caller<SubgroupType>(subgroup_index);
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
return caller.template p2p_send<RPC_NAME(put)>(node_id,value);
return caller.template p2p_send<RPC_NAME(put)>(node_id,value,as_trigger);
}
}

Expand All @@ -573,11 +574,12 @@ derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::type_r
uint32_t type_index,
const ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
return this->template put<FirstType>(value,subgroup_index,shard_index);
return this->template put<FirstType>(value,subgroup_index,shard_index,as_trigger);
} else {
return this->template type_recursive_put<ObjectType, SecondType, RestTypes...>(type_index-1,value,subgroup_index,shard_index);
return this->template type_recursive_put<ObjectType, SecondType, RestTypes...>(type_index-1,value,subgroup_index,shard_index,as_trigger);
}
}

Expand All @@ -587,9 +589,10 @@ derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::type_r
uint32_t type_index,
const ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
return this->template put<LastType>(value,subgroup_index,shard_index);
return this->template put<LastType>(value,subgroup_index,shard_index,as_trigger);
} else {
throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary.");
}
Expand All @@ -598,7 +601,7 @@ derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::type_r
template <typename... CascadeTypes>
template <typename ObjectType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put(
const ObjectType& value) {
const ObjectType& value, bool as_trigger) {

// STEP 1 - get key
if constexpr (!std::is_base_of_v<ICascadeObject<std::string,ObjectType>,ObjectType>) {
Expand All @@ -610,42 +613,43 @@ derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put(
std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref());

// STEP 3 - call recursive put
return this->template type_recursive_put<ObjectType,CascadeTypes...>(subgroup_type_index,value,subgroup_index,shard_index);
return this->template type_recursive_put<ObjectType,CascadeTypes...>(subgroup_type_index,value,subgroup_index,shard_index,as_trigger);
}

template <typename... CascadeTypes>
template <typename SubgroupType>
void ServiceClient<CascadeTypes...>::put_and_forget(
const typename SubgroupType::ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_AND_FORGET_START,
(std::is_base_of<IHasMessageID,typename SubgroupType::ObjectType>::value?value.get_message_id():0));
if (!is_external_client()) {
std::lock_guard<std::mutex> lck(this->group_ptr_mutex);
if (static_cast<uint32_t>(group_ptr->template get_my_shard<SubgroupType>(subgroup_index)) == shard_index) {
// do ordered put as a shard member (Replicated).
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
subgroup_handle.template ordered_send<RPC_NAME(ordered_put_and_forget)>(value);
subgroup_handle.template ordered_send<RPC_NAME(ordered_put_and_forget)>(value,as_trigger);
} else {
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
// do p2p put
try{
// as a subgroup member
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
subgroup_handle.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value);
subgroup_handle.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value,as_trigger);
} catch (derecho::invalid_subgroup_exception& ex) {
// as an external caller
auto& subgroup_handle = group_ptr->template get_nonmember_subgroup<SubgroupType>(subgroup_index);
subgroup_handle.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value);
subgroup_handle.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value,as_trigger);
}
}
} else {
std::lock_guard<std::mutex> lck(this->external_group_ptr_mutex);
// call as an external client (ExternalClientCaller).
auto& caller = external_group_ptr->template get_subgroup_caller<SubgroupType>(subgroup_index);
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
caller.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value);
caller.template p2p_send<RPC_NAME(put_and_forget)>(node_id,value,as_trigger);
}
}

Expand All @@ -655,11 +659,12 @@ void ServiceClient<CascadeTypes...>::type_recursive_put_and_forget(
uint32_t type_index,
const ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
put_and_forget<FirstType>(value,subgroup_index,shard_index);
put_and_forget<FirstType>(value,subgroup_index,shard_index,as_trigger);
} else {
type_recursive_put_and_forget<ObjectType,SecondType,RestTypes...>(type_index-1,value,subgroup_index,shard_index);
type_recursive_put_and_forget<ObjectType,SecondType,RestTypes...>(type_index-1,value,subgroup_index,shard_index,as_trigger);
}
}

Expand All @@ -669,17 +674,18 @@ void ServiceClient<CascadeTypes...>::type_recursive_put_and_forget(
uint32_t type_index,
const ObjectType& value,
uint32_t subgroup_index,
uint32_t shard_index) {
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
put_and_forget<LastType>(value,subgroup_index,shard_index);
put_and_forget<LastType>(value,subgroup_index,shard_index,as_trigger);
} else {
throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary.");
}
}

template <typename... CascadeTypes>
template <typename ObjectType>
void ServiceClient<CascadeTypes...>::put_and_forget(const ObjectType& value) {
void ServiceClient<CascadeTypes...>::put_and_forget(const ObjectType& value,bool as_trigger) {
// STEP 1 - get key
if constexpr (!std::is_base_of_v<ICascadeObject<std::string,ObjectType>,ObjectType>) {
throw derecho::derecho_exception(__PRETTY_FUNCTION__ + std::string(" only supports object of type ICascadeObject<std::string,ObjectType>,but we get ") + typeid(ObjectType).name());
Expand All @@ -690,7 +696,7 @@ void ServiceClient<CascadeTypes...>::put_and_forget(const ObjectType& value) {
std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref());

// STEP 3 - call recursive put_and_forget
this->template type_recursive_put_and_forget<ObjectType,CascadeTypes...>(subgroup_type_index,value,subgroup_index,shard_index);
this->template type_recursive_put_and_forget<ObjectType,CascadeTypes...>(subgroup_type_index,value,subgroup_index,shard_index,as_trigger);
}

template <typename... CascadeTypes>
Expand Down
Loading

0 comments on commit 38654c1

Please sign in to comment.