Skip to content

Commit

Permalink
Merge pull request #264 from Derecho-Project/test_fixes
Browse files Browse the repository at this point in the history
Fix unit tests that relied on global stability callbacks
  • Loading branch information
songweijia authored Nov 29, 2023
2 parents 112310b + cc8d665 commit 1601460
Show file tree
Hide file tree
Showing 29 changed files with 785 additions and 457 deletions.
7 changes: 6 additions & 1 deletion include/derecho/core/detail/derecho_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ using rpc_handler_t = std::function<void(subgroup_id_t, node_id_t, persistent::v
* not an internal data structure).
*/
struct UserMessageCallbacks {
/** A function to be called each time a message reaches global stability in the group. */
/**
* A function to be called each time a raw (non-RPC) message reaches global stability in the group.
* Note that this will not be called when RPC messages reach global stability, since the RPC function
* itself is already a user-provided callback, and a serialized RPC message will not be readable as
* a plain byte array (the "message body" argument provided to this callback).
*/
message_callback_t global_stability_callback;
/** A function to be called when a new version of a subgroup's state finishes persisting locally */
persistence_callback_t local_persistence_callback = nullptr;
Expand Down
8 changes: 4 additions & 4 deletions include/derecho/core/detail/external_group_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ ExternalGroupClient<ReplicatedTypes...>::ExternalGroupClient(
busy_wait_before_sleep_ms(getConfUInt64(Conf::DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS)) {
RpcLoggerPtr::initialize();
for(auto dc : deserialization_contexts) {
rdv.push_back(dc);
deserialization_contexts.push_back(dc);
}
#ifdef USE_VERBS_API
sst::verbs_initialize({},
Expand Down Expand Up @@ -383,7 +383,7 @@ std::exception_ptr ExternalGroupClient<ReplicatedTypes...>::receive_message(
}
std::size_t reply_header_size = header_space();
recv_ret reply_return = receiver_function_entry->second(
&rdv, received_from, buf,
&deserialization_contexts, received_from, buf,
[&out_alloc, &reply_header_size](std::size_t size) {
return out_alloc(size + reply_header_size) + reply_header_size;
});
Expand All @@ -405,7 +405,7 @@ void ExternalGroupClient<ReplicatedTypes...>::p2p_message_handler(node_id_t send
Opcode indx;
node_id_t received_from;
uint32_t flags;
retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags);
retrieve_header(msg_buf, payload_size, indx, received_from, flags);
if(indx.is_reply) {
// REPLYs can be handled here because they do not block.
receive_message(indx, received_from, msg_buf + header_size, payload_size,
Expand Down Expand Up @@ -447,7 +447,7 @@ void ExternalGroupClient<ReplicatedTypes...>::p2p_request_worker() {
request = p2p_request_queue.front();
p2p_request_queue.pop();
}
retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags);
retrieve_header(request.msg_buf, payload_size, indx, received_from, flags);
if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) {
dbg_error(rpc_logger, "Invalid rpc message in fifo queue: is_reply={}, is_cascading={}",
indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE));
Expand Down
18 changes: 9 additions & 9 deletions include/derecho/core/detail/remote_invocable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,18 @@ struct RemoteInvoker<Tag, std::function<Ret(Args...)>> {
/**
* Entry point for responses; called when a message is received that
* contains a response to this RemoteInvocable function's RPC call.
* @param rdv Deserialization vector
* @param contexts Deserialization context vector
* @param nid The ID of the node that sent the response
* @param response The byte buffer containing the response message
* @param f
* @param f An allocation function (unused)
* @return A recv_ret containing nothing of value.
*/
inline recv_ret receive_response( //mutils::DeserializationManager* dsm,
mutils::RemoteDeserialization_v* rdv,
inline recv_ret receive_response(
mutils::RemoteDeserialization_v* contexts,
const node_id_t& nid, const uint8_t* response,
const std::function<uint8_t*(int)>& f) {
constexpr std::is_same<void, Ret>* choice{nullptr};
mutils::DeserializationManager dsm{*rdv};
mutils::DeserializationManager dsm{*contexts};
return receive_response(choice, &dsm, nid, response, f);
}

Expand Down Expand Up @@ -347,18 +347,18 @@ struct RemoteInvocable<Tag, std::function<Ret(Args...)>> {
* Entry point for handling an RPC function call to this RemoteInvocable
* function. Called when a message is received that contains a request to
* call this function.
* @param rdv Deserialization vector
* @param contexts Deserialization context vector
* @param who The node that sent the message
* @param recv_buf The buffer containing the received message
* @param out_alloc A function that can allocate a buffer for the response message
* @return
*/
inline recv_ret receive_call( //mutils::DeserializationManager* dsm,
mutils::RemoteDeserialization_v* rdv,
inline recv_ret receive_call(
mutils::RemoteDeserialization_v* contexts,
const node_id_t& who, const uint8_t* recv_buf,
const std::function<uint8_t*(std::size_t)>& out_alloc) {
constexpr std::is_same<Ret, void>* choice{nullptr};
mutils::DeserializationManager dsm{*rdv};
mutils::DeserializationManager dsm{*contexts};
return this->receive_call(choice, &dsm, who, recv_buf, out_alloc);
}

Expand Down
4 changes: 2 additions & 2 deletions include/derecho/core/detail/replicated_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ void Replicated<T>::send_object_raw(tcp::socket& receiver_socket) const {

template <typename T>
std::size_t Replicated<T>::receive_object(uint8_t* buffer) {
// *user_object_ptr = std::move(mutils::from_bytes<T>(&group_rpc_manager.dsm, buffer));
mutils::RemoteDeserialization_v rdv{group_rpc_manager.rdv};
// Add this object's persistent registry to the list of deserialization contexts
mutils::RemoteDeserialization_v rdv{group_rpc_manager.deserialization_contexts};
rdv.insert(rdv.begin(), persistent_registry.get());
mutils::DeserializationManager dsm{rdv};
*user_object_ptr = std::move(mutils::from_bytes<T>(&dsm, buffer));
Expand Down
17 changes: 13 additions & 4 deletions include/derecho/core/detail/rpc_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ class RPCManager {
* from the targets of an earlier remote call.
* Note that a FunctionID is (class ID, subgroup ID, Function Tag). */
std::unique_ptr<std::map<Opcode, receive_fun_t>> receivers;
/** An emtpy DeserializationManager, in case we need it later. */
// mutils::DeserializationManager dsm{{}};
// Weijia: I prefer the deserialization context vector.
mutils::RemoteDeserialization_v rdv;
/**
* A copy of the user-provided deserialization context vector, which is
* also stored in Group. Provided to from_bytes when deserializing a user-
* defined Replicated Object.
*/
mutils::RemoteDeserialization_v deserialization_contexts;

template <typename T>
friend class ::derecho::Replicated; // Give only Replicated access to view_manager
Expand Down Expand Up @@ -256,6 +258,13 @@ class RPCManager {
const std::function<uint8_t*(int)>& out_alloc);

public:
/**
* Constructor
*
* @param group_view_manager A reference to the ViewManager object
* @param deserialization_context A reference to the vector of user-provided
* deserialization contexts from Group, which will be copied in
*/
RPCManager(ViewManager& group_view_manager,
const std::vector<DeserializationContext*>& deserialization_context);

Expand Down
6 changes: 2 additions & 4 deletions include/derecho/core/detail/rpc_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ struct recv_ret {
* some RPC message is received.
*/
using receive_fun_t = std::function<recv_ret(
mutils::RemoteDeserialization_v* rdv, const node_id_t&, const uint8_t* recv_buf,
mutils::RemoteDeserialization_v*, const node_id_t&, const uint8_t* recv_buf,
const std::function<uint8_t*(int)>& out_alloc)>;

//Forward declaration of PendingResults, to be used by QueryResults
Expand Down Expand Up @@ -1145,9 +1145,7 @@ inline void populate_header(uint8_t* reply_buf,
reinterpret_cast<uint32_t*>(reply_buf + offset)[0] = flags; // flags
}

//inline void retrieve_header(mutils::DeserializationManager* dsm,
inline void retrieve_header(mutils::RemoteDeserialization_v* rdv,
const uint8_t* reply_buf,
inline void retrieve_header(const uint8_t* reply_buf,
std::size_t& payload_size, Opcode& op,
node_id_t& from, uint32_t& flags) {
std::size_t offset = 0;
Expand Down
2 changes: 1 addition & 1 deletion include/derecho/core/external_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class ExternalGroupClient {
std::queue<p2p_req> p2p_request_queue;
std::mutex request_queue_mutex;
std::condition_variable request_queue_cv;
mutils::RemoteDeserialization_v rdv;
mutils::RemoteDeserialization_v deserialization_contexts;
void p2p_receive_loop();
void p2p_request_worker();
void p2p_message_handler(node_id_t sender_id, uint8_t* msg_buf);
Expand Down
28 changes: 17 additions & 11 deletions include/derecho/core/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,16 @@ class Group : public virtual _Group, public GroupProjection<ReplicatedTypes>...

const node_id_t my_id;
/**
* The shared pointer holding deserialization context is obsolete. I (Weijia)
* removed it because it complicated things: the deserialization context is
* generally a big object containing the group handle; however, the group handle
* need to hold a shared pointer to the object, which causes a dependency loop
* and results in an object indirectly holding a shared pointer to its self.
* Another side effect is double free. So I change it back to the raw pointer.
* The user deserialization context for all objects serialized and deserialized. */
// std::shared_ptr<DeserializationContext> user_deserialization_context;
* A list (possibly empty) of user-provided deserialization contexts that are
* needed to help deserialize the Replicated Objects. These should be passed
* into the from_bytes method whenever from_bytes is called on a Replicated
* Object. They are stored as raw pointers, even though it is more dangerous,
* because Weijia found that trying to store a shared_ptr here complicated
* things: the deserialization context is generally a big object containing
* the group handle; however, the group handle need to hold a shared pointer
* to the object, which causes a dependency loop and results in an object
* indirectly holding a shared pointer to its self.
*/
std::vector<DeserializationContext*> user_deserialization_context;

/** Persist the objects. Once persisted, persistence_manager updates the SST
Expand Down Expand Up @@ -330,9 +332,13 @@ class Group : public virtual _Group, public GroupProjection<ReplicatedTypes>...
* events in this group.
* @param subgroup_info The set of functions that define how membership in
* each subgroup and shard will be determined in this group.
* @param deserialization_context The context used for deserialization
* purpose. The application is responsible to keep it alive during Group
* object lifetime.
* @param deserialization_context A list of pointers to deserialization
* contexts, which are objects needed to help deserialize user-provided
* Replicated Object classes. These context pointers will be provided in
* the DeserializationManager argument to from_bytes any time a Replicated
* Object is deserialized, e.g. during state transfer. The calling
* application is responsible for keeping these objects alive during the
* lifetime of the Group, since Group does not own the pointers.
* @param _view_upcalls A list of functions to be called when the group
* experiences a View-Change event (optional).
* @param factories A variable number of Factory functions, one for each
Expand Down
3 changes: 1 addition & 2 deletions src/applications/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ set(CMAKE_DISABLE_SOURCE_CHANGES ON)
set(CMAKE_DISABLE_IN_SOURCE_BUILD ON)

add_subdirectory(unit_tests)
add_subdirectory(performance_tests)
add_subdirectory(scalability_tests)
add_subdirectory(performance_tests)
15 changes: 8 additions & 7 deletions src/applications/tests/performance_tests/bandwidth_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
* in the only subgroup that consists of all the nodes
* Upon completion, the results are appended to file data_derecho_bw on the leader
*/
#include "aggregate_bandwidth.hpp"
#include "log_results.hpp"
#include "partial_senders_allocator.hpp"

#include <derecho/core/derecho.hpp>

#include <atomic>
#include <chrono>
#include <fstream>
#include <iostream>
Expand All @@ -15,12 +22,6 @@
#include <optional>
#include <vector>

#include <derecho/core/derecho.hpp>

#include "aggregate_bandwidth.hpp"
#include "log_results.hpp"
#include "partial_senders_allocator.hpp"

using std::cout;
using std::endl;
using std::map;
Expand Down Expand Up @@ -98,7 +99,7 @@ int main(int argc, char* argv[]) {
}

// variable 'done' tracks the end of the test
volatile bool done = false;
std::atomic<bool> done = false;
// callback into the application code at each message delivery
auto stability_callback = [&done,
total_num_messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
* in the only subgroup that consists of all the nodes
* Upon completion, the results are appended to file data_deliver_predicate_delay on the leader
*/
#include "aggregate_bandwidth.hpp"
#include "log_results.hpp"
#include <derecho/core/derecho.hpp>

#include <atomic>
#include <fstream>
#include <iostream>
#include <map>
Expand All @@ -14,11 +19,6 @@
#include <time.h>
#include <vector>

#include "aggregate_bandwidth.hpp"
#include <derecho/core/derecho.hpp>

#include "log_results.hpp"

using std::cout;
using std::endl;
using std::map;
Expand Down Expand Up @@ -60,7 +60,7 @@ int main(int argc, char* argv[]) {
Conf::initialize(argc, argv);

// variable 'done' tracks the end of the test
volatile bool done = false;
std::atomic<bool> done = false;
// callback into the application code at each message delivery
auto stability_callback = [&num_messages,
&done,
Expand Down
3 changes: 2 additions & 1 deletion src/applications/tests/performance_tests/latency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* in the only subgroup that consists of all the nodes
* Upon completion, the results are appended to file data_latency on the leader
*/
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
Expand Down Expand Up @@ -88,7 +89,7 @@ int main(int argc, char* argv[]) {
vector<struct timespec> start_times(num_messages), end_times(num_messages);

// variable 'done' tracks the end of the test
volatile bool done = false;
std::atomic<bool> done = false;
uint32_t my_id;
// callback into the application code at each message delivery
auto stability_callback = [&, num_delivered = 0u, time_index = 0u](
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#include "aggregate_bandwidth.hpp"
#include "log_results.hpp"
#include <derecho/core/derecho.hpp>

#include <atomic>
#include <fstream>
#include <functional>
#include <iostream>
Expand All @@ -11,6 +8,10 @@
#include <time.h>
#include <vector>

#include "aggregate_bandwidth.hpp"
#include "log_results.hpp"
#include <derecho/core/derecho.hpp>

using std::cout;
using std::endl;
using std::map;
Expand Down Expand Up @@ -54,7 +55,7 @@ int main(int argc, char* argv[]) {
Conf::initialize(argc, argv);

// variable 'done' tracks the end of the test
volatile bool done = false;
std::atomic<bool> done = false;
// callback into the application code at each message delivery
auto stability_callback = [&num_messages,
&done,
Expand Down
Loading

0 comments on commit 1601460

Please sign in to comment.