diff --git a/include/derecho/core/detail/derecho_internal.hpp b/include/derecho/core/detail/derecho_internal.hpp index 60af5dcb..354a58c7 100644 --- a/include/derecho/core/detail/derecho_internal.hpp +++ b/include/derecho/core/detail/derecho_internal.hpp @@ -71,7 +71,12 @@ using rpc_handler_t = std::function::ExternalGroupClient( busy_wait_before_sleep_ms(getConfUInt64(Conf::DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS)) { RpcLoggerPtr::initialize(); for(auto dc : deserialization_contexts) { - rdv.push_back(dc); + deserialization_contexts.push_back(dc); } #ifdef USE_VERBS_API sst::verbs_initialize({}, @@ -383,7 +383,7 @@ std::exception_ptr ExternalGroupClient::receive_message( } std::size_t reply_header_size = header_space(); recv_ret reply_return = receiver_function_entry->second( - &rdv, received_from, buf, + &deserialization_contexts, received_from, buf, [&out_alloc, &reply_header_size](std::size_t size) { return out_alloc(size + reply_header_size) + reply_header_size; }); @@ -405,7 +405,7 @@ void ExternalGroupClient::p2p_message_handler(node_id_t send Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags); + retrieve_header(msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply) { // REPLYs can be handled here because they do not block. receive_message(indx, received_from, msg_buf + header_size, payload_size, @@ -447,7 +447,7 @@ void ExternalGroupClient::p2p_request_worker() { request = p2p_request_queue.front(); p2p_request_queue.pop(); } - retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags); + retrieve_header(request.msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) { dbg_error(rpc_logger, "Invalid rpc message in fifo queue: is_reply={}, is_cascading={}", indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE)); diff --git a/include/derecho/core/detail/remote_invocable.hpp b/include/derecho/core/detail/remote_invocable.hpp index c52594a5..103fe250 100644 --- a/include/derecho/core/detail/remote_invocable.hpp +++ b/include/derecho/core/detail/remote_invocable.hpp @@ -199,18 +199,18 @@ struct RemoteInvoker> { /** * Entry point for responses; called when a message is received that * contains a response to this RemoteInvocable function's RPC call. - * @param rdv Deserialization vector + * @param contexts Deserialization context vector * @param nid The ID of the node that sent the response * @param response The byte buffer containing the response message - * @param f + * @param f An allocation function (unused) * @return A recv_ret containing nothing of value. */ - inline recv_ret receive_response( //mutils::DeserializationManager* dsm, - mutils::RemoteDeserialization_v* rdv, + inline recv_ret receive_response( + mutils::RemoteDeserialization_v* contexts, const node_id_t& nid, const uint8_t* response, const std::function& f) { constexpr std::is_same* choice{nullptr}; - mutils::DeserializationManager dsm{*rdv}; + mutils::DeserializationManager dsm{*contexts}; return receive_response(choice, &dsm, nid, response, f); } @@ -347,18 +347,18 @@ struct RemoteInvocable> { * Entry point for handling an RPC function call to this RemoteInvocable * function. Called when a message is received that contains a request to * call this function. - * @param rdv Deserialization vector + * @param contexts Deserialization context vector * @param who The node that sent the message * @param recv_buf The buffer containing the received message * @param out_alloc A function that can allocate a buffer for the response message * @return */ - inline recv_ret receive_call( //mutils::DeserializationManager* dsm, - mutils::RemoteDeserialization_v* rdv, + inline recv_ret receive_call( + mutils::RemoteDeserialization_v* contexts, const node_id_t& who, const uint8_t* recv_buf, const std::function& out_alloc) { constexpr std::is_same* choice{nullptr}; - mutils::DeserializationManager dsm{*rdv}; + mutils::DeserializationManager dsm{*contexts}; return this->receive_call(choice, &dsm, who, recv_buf, out_alloc); } diff --git a/include/derecho/core/detail/replicated_impl.hpp b/include/derecho/core/detail/replicated_impl.hpp index 9ef52734..b14a8b53 100644 --- a/include/derecho/core/detail/replicated_impl.hpp +++ b/include/derecho/core/detail/replicated_impl.hpp @@ -197,8 +197,8 @@ void Replicated::send_object_raw(tcp::socket& receiver_socket) const { template std::size_t Replicated::receive_object(uint8_t* buffer) { - // *user_object_ptr = std::move(mutils::from_bytes(&group_rpc_manager.dsm, buffer)); - mutils::RemoteDeserialization_v rdv{group_rpc_manager.rdv}; + // Add this object's persistent registry to the list of deserialization contexts + mutils::RemoteDeserialization_v rdv{group_rpc_manager.deserialization_contexts}; rdv.insert(rdv.begin(), persistent_registry.get()); mutils::DeserializationManager dsm{rdv}; *user_object_ptr = std::move(mutils::from_bytes(&dsm, buffer)); diff --git a/include/derecho/core/detail/rpc_manager.hpp b/include/derecho/core/detail/rpc_manager.hpp index b1a75ea9..d7a9dfd6 100644 --- a/include/derecho/core/detail/rpc_manager.hpp +++ b/include/derecho/core/detail/rpc_manager.hpp @@ -89,10 +89,12 @@ class RPCManager { * from the targets of an earlier remote call. * Note that a FunctionID is (class ID, subgroup ID, Function Tag). */ std::unique_ptr> receivers; - /** An emtpy DeserializationManager, in case we need it later. */ - // mutils::DeserializationManager dsm{{}}; - // Weijia: I prefer the deserialization context vector. - mutils::RemoteDeserialization_v rdv; + /** + * A copy of the user-provided deserialization context vector, which is + * also stored in Group. Provided to from_bytes when deserializing a user- + * defined Replicated Object. + */ + mutils::RemoteDeserialization_v deserialization_contexts; template friend class ::derecho::Replicated; // Give only Replicated access to view_manager @@ -256,6 +258,13 @@ class RPCManager { const std::function& out_alloc); public: + /** + * Constructor + * + * @param group_view_manager A reference to the ViewManager object + * @param deserialization_context A reference to the vector of user-provided + * deserialization contexts from Group, which will be copied in + */ RPCManager(ViewManager& group_view_manager, const std::vector& deserialization_context); diff --git a/include/derecho/core/detail/rpc_utils.hpp b/include/derecho/core/detail/rpc_utils.hpp index 5e2f1e42..ebe74e8e 100644 --- a/include/derecho/core/detail/rpc_utils.hpp +++ b/include/derecho/core/detail/rpc_utils.hpp @@ -234,7 +234,7 @@ struct recv_ret { * some RPC message is received. */ using receive_fun_t = std::function& out_alloc)>; //Forward declaration of PendingResults, to be used by QueryResults @@ -1145,9 +1145,7 @@ inline void populate_header(uint8_t* reply_buf, reinterpret_cast(reply_buf + offset)[0] = flags; // flags } -//inline void retrieve_header(mutils::DeserializationManager* dsm, -inline void retrieve_header(mutils::RemoteDeserialization_v* rdv, - const uint8_t* reply_buf, +inline void retrieve_header(const uint8_t* reply_buf, std::size_t& payload_size, Opcode& op, node_id_t& from, uint32_t& flags) { std::size_t offset = 0; diff --git a/include/derecho/core/external_group.hpp b/include/derecho/core/external_group.hpp index 91a8655e..f89ccd60 100644 --- a/include/derecho/core/external_group.hpp +++ b/include/derecho/core/external_group.hpp @@ -177,7 +177,7 @@ class ExternalGroupClient { std::queue p2p_request_queue; std::mutex request_queue_mutex; std::condition_variable request_queue_cv; - mutils::RemoteDeserialization_v rdv; + mutils::RemoteDeserialization_v deserialization_contexts; void p2p_receive_loop(); void p2p_request_worker(); void p2p_message_handler(node_id_t sender_id, uint8_t* msg_buf); diff --git a/include/derecho/core/group.hpp b/include/derecho/core/group.hpp index 7921ceba..9b7c9a15 100644 --- a/include/derecho/core/group.hpp +++ b/include/derecho/core/group.hpp @@ -188,14 +188,16 @@ class Group : public virtual _Group, public GroupProjection... const node_id_t my_id; /** - * The shared pointer holding deserialization context is obsolete. I (Weijia) - * removed it because it complicated things: the deserialization context is - * generally a big object containing the group handle; however, the group handle - * need to hold a shared pointer to the object, which causes a dependency loop - * and results in an object indirectly holding a shared pointer to its self. - * Another side effect is double free. So I change it back to the raw pointer. - * The user deserialization context for all objects serialized and deserialized. */ - // std::shared_ptr user_deserialization_context; + * A list (possibly empty) of user-provided deserialization contexts that are + * needed to help deserialize the Replicated Objects. These should be passed + * into the from_bytes method whenever from_bytes is called on a Replicated + * Object. They are stored as raw pointers, even though it is more dangerous, + * because Weijia found that trying to store a shared_ptr here complicated + * things: the deserialization context is generally a big object containing + * the group handle; however, the group handle need to hold a shared pointer + * to the object, which causes a dependency loop and results in an object + * indirectly holding a shared pointer to its self. + */ std::vector user_deserialization_context; /** Persist the objects. Once persisted, persistence_manager updates the SST @@ -330,9 +332,13 @@ class Group : public virtual _Group, public GroupProjection... * events in this group. * @param subgroup_info The set of functions that define how membership in * each subgroup and shard will be determined in this group. - * @param deserialization_context The context used for deserialization - * purpose. The application is responsible to keep it alive during Group - * object lifetime. + * @param deserialization_context A list of pointers to deserialization + * contexts, which are objects needed to help deserialize user-provided + * Replicated Object classes. These context pointers will be provided in + * the DeserializationManager argument to from_bytes any time a Replicated + * Object is deserialized, e.g. during state transfer. The calling + * application is responsible for keeping these objects alive during the + * lifetime of the Group, since Group does not own the pointers. * @param _view_upcalls A list of functions to be called when the group * experiences a View-Change event (optional). * @param factories A variable number of Factory functions, one for each diff --git a/src/applications/tests/CMakeLists.txt b/src/applications/tests/CMakeLists.txt index 03bc9bac..e9264ffc 100644 --- a/src/applications/tests/CMakeLists.txt +++ b/src/applications/tests/CMakeLists.txt @@ -3,5 +3,4 @@ set(CMAKE_DISABLE_SOURCE_CHANGES ON) set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) add_subdirectory(unit_tests) -add_subdirectory(performance_tests) -add_subdirectory(scalability_tests) +add_subdirectory(performance_tests) \ No newline at end of file diff --git a/src/applications/tests/performance_tests/bandwidth_test.cpp b/src/applications/tests/performance_tests/bandwidth_test.cpp index 09dc2c6f..59a80c6b 100644 --- a/src/applications/tests/performance_tests/bandwidth_test.cpp +++ b/src/applications/tests/performance_tests/bandwidth_test.cpp @@ -7,6 +7,13 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_derecho_bw on the leader */ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + +#include #include #include #include @@ -15,12 +22,6 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; using std::map; @@ -98,7 +99,7 @@ int main(int argc, char* argv[]) { } // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&done, total_num_messages, diff --git a/src/applications/tests/performance_tests/deliver_predicate_delay.cpp b/src/applications/tests/performance_tests/deliver_predicate_delay.cpp index 92881b08..b4e52fab 100644 --- a/src/applications/tests/performance_tests/deliver_predicate_delay.cpp +++ b/src/applications/tests/performance_tests/deliver_predicate_delay.cpp @@ -6,6 +6,11 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_deliver_predicate_delay on the leader */ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + +#include #include #include #include @@ -14,11 +19,6 @@ #include #include -#include "aggregate_bandwidth.hpp" -#include - -#include "log_results.hpp" - using std::cout; using std::endl; using std::map; @@ -60,7 +60,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/latency_test.cpp b/src/applications/tests/performance_tests/latency_test.cpp index 70cb32f8..d738bb08 100644 --- a/src/applications/tests/performance_tests/latency_test.cpp +++ b/src/applications/tests/performance_tests/latency_test.cpp @@ -10,6 +10,7 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_latency on the leader */ +#include #include #include #include @@ -88,7 +89,7 @@ int main(int argc, char* argv[]) { vector start_times(num_messages), end_times(num_messages); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; uint32_t my_id; // callback into the application code at each message delivery auto stability_callback = [&, num_delivered = 0u, time_index = 0u]( diff --git a/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp b/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp index 9722d559..bed1848d 100644 --- a/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp +++ b/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp @@ -1,7 +1,4 @@ -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include - +#include #include #include #include @@ -11,6 +8,10 @@ #include #include +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + using std::cout; using std::endl; using std::map; @@ -54,7 +55,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/persistent_bw_test.cpp b/src/applications/tests/performance_tests/persistent_bw_test.cpp index 45785fe0..6f8ba180 100644 --- a/src/applications/tests/performance_tests/persistent_bw_test.cpp +++ b/src/applications/tests/performance_tests/persistent_bw_test.cpp @@ -1,3 +1,10 @@ +#include "aggregate_bandwidth.hpp" +#include "bytes_object.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + #include #include #include @@ -6,36 +13,79 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "bytes_object.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; using namespace persistent; using namespace std::chrono; -class ByteArrayObject : public mutils::ByteRepresentable, public derecho::PersistsFields { +/** + * State shared between the replicated test objects and the main thread. The replicated + * objects get a pointer to this object using DeserializationManager. + */ +struct TestState : public derecho::DeserializationContext { + uint64_t total_num_messages; + // Used to signal the main thread that the experiment is done + std::atomic experiment_done; + // Set by ByteArrayObject when the last RPC message is delivered and its version is known + persistent::version_t last_version; + // Used to alert the global persistence callback that last_version is ready + std::atomic last_version_set; + // The time the last RPC message is delivered to ByteArrayObject + steady_clock::time_point send_complete_time; + // Set by the global persistence callback when version last_version is persisted + steady_clock::time_point persist_complete_time; +}; + +// Note: Inheriting GroupReference is necessary so the RPC method can get the object's current version +class ByteArrayObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::PersistsFields { public: Persistent pers_bytes; + uint64_t messages_received; + TestState* test_state; - void change_pers_bytes(const test::Bytes& bytes) { - *pers_bytes = bytes; - } + void change_pers_bytes(const test::Bytes& bytes); // deserialization constructor - ByteArrayObject(Persistent& _p_bytes) : pers_bytes(std::move(_p_bytes)) {} + ByteArrayObject(Persistent& _p_bytes, uint64_t messages_received, TestState* test_state) + : pers_bytes(std::move(_p_bytes)), messages_received(messages_received), test_state(test_state) {} // default constructor - ByteArrayObject(PersistentRegistry* pr) - : pers_bytes(pr) {} + ByteArrayObject(PersistentRegistry* pr, TestState* test_state) + : pers_bytes(pr), messages_received(0), test_state(test_state) {} REGISTER_RPC_FUNCTIONS(ByteArrayObject, ORDERED_TARGETS(change_pers_bytes)); - DEFAULT_SERIALIZATION_SUPPORT(ByteArrayObject, pers_bytes); + DEFAULT_SERIALIZE(pers_bytes, messages_received); + DEFAULT_DESERIALIZE_NOALLOC(ByteArrayObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer); }; +void ByteArrayObject::change_pers_bytes(const test::Bytes& bytes) { + *pers_bytes = bytes; + // This logic used to be in the global stability callback, but now it needs to be here + // because global stability callbacks were disabled for RPC messages. It also now requires + // a call to get_subgroup().get_current_version(), which means we can't write the method inline. + auto ver = this->group->template get_subgroup(this->subgroup_index).get_current_version(); + ++messages_received; + if(messages_received == test_state->total_num_messages) { + test_state->send_complete_time = std::chrono::steady_clock::now(); + test_state->last_version = std::get<0>(ver); + test_state->last_version_set = true; + } +} + +std::unique_ptr ByteArrayObject::from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer) { + // Default serialization will serialize each named field in order, so deserialize in the same order + auto pers_bytes_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*pers_bytes_ptr); + auto messages_received_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + // Get the TestState pointer from the DeserializationManager + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*pers_bytes_ptr, *messages_received_ptr, test_state_ptr); +} + struct persistent_bw_result { int num_nodes; int num_senders_selector; @@ -90,63 +140,43 @@ int main(int argc, char* argv[]) { derecho::Conf::initialize(argc, argv); - steady_clock::time_point begin_time, send_complete_time, persist_complete_time; + TestState shared_test_state; + shared_test_state.experiment_done = false; + shared_test_state.last_version_set = false; + steady_clock::time_point begin_time; bool is_sending = true; - long total_num_messages; switch(sender_selector) { case PartialSendMode::ALL_SENDERS: - total_num_messages = num_of_nodes * num_msgs; + shared_test_state.total_num_messages = num_of_nodes * num_msgs; break; case PartialSendMode::HALF_SENDERS: - total_num_messages = (num_of_nodes / 2) * num_msgs; + shared_test_state.total_num_messages = (num_of_nodes / 2) * num_msgs; break; case PartialSendMode::ONE_SENDER: - total_num_messages = num_msgs; + shared_test_state.total_num_messages = num_msgs; break; } - // variable 'done' tracks the end of the test - volatile bool done = false; - - // last_version and its flag is shared between the stability callback and persistence callback. - // This is a clumsy hack to figure out what version number is assigned to the last delivered message. - persistent::version_t last_version; - std::atomic last_version_set = false; - - auto stability_callback = [&last_version, - &last_version_set, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - //Count the total number of messages delivered - ++num_delivered; - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - last_version = ver; - last_version_set = true; - } - }; auto persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - persist_complete_time = std::chrono::steady_clock::now(); - done = true; + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.persist_complete_time = std::chrono::steady_clock::now(); + shared_test_state.experiment_done = true; } }; derecho::UserMessageCallbacks callback_set{ - stability_callback, + nullptr, nullptr, persistence_callback}; derecho::SubgroupInfo subgroup_info(PartialSendersAllocator(num_of_nodes, sender_selector)); - auto ba_factory = [](PersistentRegistry* pr, derecho::subgroup_id_t) { return std::make_unique(pr); }; + auto ba_factory = [&shared_test_state](PersistentRegistry* pr, derecho::subgroup_id_t) { + return std::make_unique(pr, &shared_test_state); + }; - derecho::Group group{callback_set, subgroup_info, {}, std::vector{}, ba_factory}; + derecho::Group group(callback_set, subgroup_info, {&shared_test_state}, + std::vector{}, ba_factory); std::cout << "Finished constructing/joining Group" << std::endl; @@ -176,24 +206,26 @@ int main(int argc, char* argv[]) { #endif //_PERFORMANCE_DEBUG } - while(!done) { + while(!shared_test_state.experiment_done) { } - int64_t send_nanosec = duration_cast(send_complete_time - begin_time).count(); + delete[] bbuf; + + int64_t send_nanosec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double send_millisec = static_cast(send_nanosec) / 1000000; - int64_t persist_nanosec = duration_cast(persist_complete_time - begin_time).count(); + int64_t persist_nanosec = duration_cast(shared_test_state.persist_complete_time - begin_time).count(); double persist_millisec = static_cast(persist_nanosec) / 1000000; //Calculate bandwidth //Bytes / nanosecond just happens to be equivalent to GigaBytes / second (in "decimal" GB) //Note that total_num_messages already incorporates multiplying by the number of senders - double send_thp_gbps = (static_cast(total_num_messages) * msg_size) / send_nanosec; - double send_thp_ops = (static_cast(total_num_messages) * 1000000000) / send_nanosec; + double send_thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / send_nanosec; + double send_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / send_nanosec; std::cout << "(send)timespan: " << send_millisec << " milliseconds." << std::endl; std::cout << "(send)throughput: " << send_thp_gbps << "GB/s." << std::endl; std::cout << "(send)throughput: " << send_thp_ops << "ops." << std::endl; - double thp_gbps = (static_cast(total_num_messages) * msg_size) / persist_nanosec; - double thp_ops = (static_cast(total_num_messages) * 1000000000) / persist_nanosec; + double thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / persist_nanosec; + double thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / persist_nanosec; std::cout << "(pers)timespan: " << persist_millisec << " millisecond." << std::endl; std::cout << "(pers)throughput: " << thp_gbps << "GB/s." << std::endl; std::cout << "(pers)throughput: " << thp_ops << "ops." << std::endl; diff --git a/src/applications/tests/performance_tests/sender_delay_test.cpp b/src/applications/tests/performance_tests/sender_delay_test.cpp index da14edde..469b793f 100644 --- a/src/applications/tests/performance_tests/sender_delay_test.cpp +++ b/src/applications/tests/performance_tests/sender_delay_test.cpp @@ -1,3 +1,8 @@ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + +#include #include #include #include @@ -6,9 +11,6 @@ #include #include -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include using std::cout; using std::endl; @@ -35,7 +37,7 @@ struct exp_result { } }; -void busy_wait(uint32_t wait_time, volatile bool& done) { +void busy_wait(uint32_t wait_time, std::atomic& done) { struct timespec start_time, end_time; clock_gettime(CLOCK_REALTIME, &start_time); @@ -80,7 +82,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // start and end time to compute bw struct timespec start_time; long long int nanoseconds_elapsed; diff --git a/src/applications/tests/performance_tests/signed_bw_test.cpp b/src/applications/tests/performance_tests/signed_bw_test.cpp index 13751255..bba222b5 100644 --- a/src/applications/tests/performance_tests/signed_bw_test.cpp +++ b/src/applications/tests/performance_tests/signed_bw_test.cpp @@ -1,3 +1,10 @@ +#include "aggregate_bandwidth.hpp" +#include "bytes_object.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + #include #include #include @@ -6,36 +13,85 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "bytes_object.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; +using std::chrono::steady_clock; using namespace persistent; -class ByteArrayObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { +/** + * State shared between the replicated test objects and the main thread. The replicated + * objects get a pointer to this object using DeserializationManager. + */ +struct TestState : public derecho::DeserializationContext { + uint64_t total_num_messages; + // Used to signal the main thread that the experiment is done + std::atomic experiment_done; + // Set by ByteArrayObject when the last RPC message is delivered and its version is known + persistent::version_t last_version; + // Used to alert the global persistence callback that last_version is ready + std::atomic last_version_set; + // The time the last RPC message is delivered to ByteArrayObject + steady_clock::time_point send_complete_time; + // Set by the global persistence callback when version last_version is persisted + steady_clock::time_point persist_complete_time; + // Set by the global verification callback when version last_version is verified + steady_clock::time_point verify_complete_time; +}; + +class ByteArrayObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { public: Persistent pers_bytes; + uint64_t messages_received; + TestState* test_state; - void change_pers_bytes(const test::Bytes& bytes) { - *pers_bytes = bytes; - } + void change_pers_bytes(const test::Bytes& bytes); // default constructor - ByteArrayObject(PersistentRegistry* pr) - : pers_bytes(pr, true) {} + ByteArrayObject(PersistentRegistry* pr, TestState* test_state) + : pers_bytes(pr, true), + messages_received(0), + test_state(test_state) {} // deserialization constructor - ByteArrayObject(Persistent& _p_bytes) : pers_bytes(std::move(_p_bytes)) {} + ByteArrayObject(Persistent& _p_bytes, uint64_t _messages_received, TestState* _test_state) + : pers_bytes(std::move(_p_bytes)), + messages_received(_messages_received), + test_state(_test_state) {} REGISTER_RPC_FUNCTIONS(ByteArrayObject, ORDERED_TARGETS(change_pers_bytes)); - DEFAULT_SERIALIZATION_SUPPORT(ByteArrayObject, pers_bytes); + DEFAULT_SERIALIZE(pers_bytes, messages_received); + DEFAULT_DESERIALIZE_NOALLOC(ByteArrayObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer); }; +void ByteArrayObject::change_pers_bytes(const test::Bytes& bytes) { + *pers_bytes = bytes; + // This logic used to be in the global stability callback, but now it needs to be here + // because global stability callbacks were disabled for RPC messages. It also now requires + // a call to get_subgroup().get_current_version(), which means we can't write the method inline. + auto ver = this->group->template get_subgroup(this->subgroup_index).get_current_version(); + ++messages_received; + if(messages_received == test_state->total_num_messages) { + test_state->send_complete_time = std::chrono::steady_clock::now(); + test_state->last_version = std::get<0>(ver); + test_state->last_version_set = true; + } +} + +std::unique_ptr ByteArrayObject::from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer) { + // Default serialization will serialize each named field in order, so deserialize in the same order + auto pers_bytes_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*pers_bytes_ptr); + auto messages_received_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + // Get the TestState pointer from the DeserializationManager + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*pers_bytes_ptr, *messages_received_ptr, test_state_ptr); +} + struct signed_bw_result { int num_nodes; int num_senders_selector; @@ -94,70 +150,49 @@ int main(int argc, char* argv[]) { pthread_setname_np(pthread_self(), DEFAULT_PROC_NAME); } - steady_clock::time_point begin_time, send_complete_time, persist_complete_time, verify_complete_time; + TestState shared_test_state; + shared_test_state.experiment_done = false; + shared_test_state.last_version_set = false; + steady_clock::time_point begin_time; bool is_sending = true; - long total_num_messages; switch(sender_selector) { case PartialSendMode::ALL_SENDERS: - total_num_messages = num_of_nodes * num_msgs; + shared_test_state.total_num_messages = num_of_nodes * num_msgs; break; case PartialSendMode::HALF_SENDERS: - total_num_messages = (num_of_nodes / 2) * num_msgs; + shared_test_state.total_num_messages = (num_of_nodes / 2) * num_msgs; break; case PartialSendMode::ONE_SENDER: - total_num_messages = num_msgs; + shared_test_state.total_num_messages = num_msgs; break; } - // variable 'done' tracks the end of the test - volatile bool done = false; - - // last_version and its flag is shared between the stability callback and persistence callback. - // This is a clumsy hack to figure out what version number is assigned to the last delivered message. - persistent::version_t last_version; - std::atomic last_version_set = false; - - auto stability_callback = [&last_version, - &last_version_set, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - //Count the total number of messages delivered - ++num_delivered; - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - last_version = ver; - last_version_set = true; - } - }; auto persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - persist_complete_time = std::chrono::steady_clock::now(); + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.persist_complete_time = steady_clock::now(); } }; auto verified_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - verify_complete_time = std::chrono::steady_clock::now(); - done = true; + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.verify_complete_time = steady_clock::now(); + shared_test_state.experiment_done = true; } }; derecho::UserMessageCallbacks callback_set{ - stability_callback, - persistence_callback, nullptr, + nullptr, + persistence_callback, verified_callback}; derecho::SubgroupInfo subgroup_info(PartialSendersAllocator(num_of_nodes, sender_selector)); - auto ba_factory = [](PersistentRegistry* pr, derecho::subgroup_id_t) { return std::make_unique(pr); }; + auto ba_factory = [&shared_test_state](PersistentRegistry* pr, derecho::subgroup_id_t) { + return std::make_unique(pr, &shared_test_state); + }; - derecho::Group group(callback_set, subgroup_info, {}, + derecho::Group group(callback_set, subgroup_info, {&shared_test_state}, std::vector{}, ba_factory); auto node_rank = group.get_my_rank(); @@ -186,32 +221,32 @@ int main(int argc, char* argv[]) { #endif //_PERFORMANCE_DEBUG } - while(!done) { + while(!shared_test_state.experiment_done) { } - int64_t send_nanosec = duration_cast(send_complete_time - begin_time).count(); + int64_t send_nanosec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double send_millisec = static_cast(send_nanosec) / 1000000; - int64_t persist_nanosec = duration_cast(persist_complete_time - begin_time).count(); + int64_t persist_nanosec = duration_cast(shared_test_state.persist_complete_time - begin_time).count(); double persist_millisec = static_cast(persist_nanosec) / 1000000; - int64_t verified_nanosec = duration_cast(verify_complete_time - begin_time).count(); + int64_t verified_nanosec = duration_cast(shared_test_state.verify_complete_time - begin_time).count(); double verified_millisec = static_cast(verified_nanosec) / 1000000; //Calculate bandwidth //Bytes / nanosecond just happens to be equivalent to GigaBytes / second (in "decimal" GB) //Note that total_num_messages already incorporates multiplying by the number of senders - double send_thp_gbps = (static_cast(total_num_messages) * msg_size) / send_nanosec; - double send_thp_ops = (static_cast(total_num_messages) * 1000000000) / send_nanosec; + double send_thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / send_nanosec; + double send_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / send_nanosec; std::cout << "(send)timespan: " << send_millisec << " milliseconds." << std::endl; std::cout << "(send)throughput: " << send_thp_gbps << "GB/s." << std::endl; std::cout << "(send)throughput: " << send_thp_ops << "ops." << std::endl; - double persist_thp_gbs = (static_cast(total_num_messages) * msg_size) / persist_nanosec; - double persist_thp_ops = (static_cast(total_num_messages) * 1000000000) / persist_nanosec; + double persist_thp_gbs = (static_cast(shared_test_state.total_num_messages) * msg_size) / persist_nanosec; + double persist_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / persist_nanosec; std::cout << "(pers)timespan: " << persist_millisec << " millisecond." << std::endl; std::cout << "(pers)throughput: " << persist_thp_gbs << "GB/s." << std::endl; std::cout << "(pers)throughput: " << persist_thp_ops << "ops." << std::endl; - double verified_thp_gbs = (static_cast(total_num_messages) * msg_size) / verified_nanosec; - double verified_thp_ops = (static_cast(total_num_messages) * 1000000000) / verified_nanosec; + double verified_thp_gbs = (static_cast(shared_test_state.total_num_messages) * msg_size) / verified_nanosec; + double verified_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / verified_nanosec; std::cout << "(verify)timespan: " << verified_millisec << " millisecond." << std::endl; std::cout << "(verify)throughput: " << verified_thp_gbs << "GB/s." << std::endl; std::cout << "(verify)throughput: " << verified_thp_ops << "ops." << std::endl; diff --git a/src/applications/tests/performance_tests/signed_store_test.cpp b/src/applications/tests/performance_tests/signed_store_test.cpp index 25a4fc23..55bcf333 100644 --- a/src/applications/tests/performance_tests/signed_store_test.cpp +++ b/src/applications/tests/performance_tests/signed_store_test.cpp @@ -223,9 +223,9 @@ bool ClientTier::update_batch_test(const int& num_updates) const { /* ---------------- SignatureStore implementation --------------------- */ SignatureStore::SignatureStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done) + ExperimentState* experiment_state) : hashes(std::make_unique, "SignedHashLog", pr, true), - experiment_done(experiment_done) {} + experiment_state(experiment_state) {} std::vector SignatureStore::add_hash(const SHA256Hash& hash) const { derecho::Replicated& this_subgroup = group->get_subgroup(this->subgroup_index); @@ -253,15 +253,22 @@ void SignatureStore::ordered_add_hash(const SHA256Hash& hash) { void SignatureStore::end_test() const { std::cout << "Received the end_test message, shutting down" << std::endl; - *experiment_done = true; + experiment_state->done = true; +} + +std::unique_ptr SignatureStore::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto hash_log_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm->registered()); + ExperimentState* experiment_state_ptr = &(dsm->mgr()); + return std::make_unique(*hash_log_ptr, experiment_state_ptr); } /* ----------------- ObjectStore implementation ------------------------ */ ObjectStore::ObjectStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done) + ExperimentState* experiment_state) : object_log(std::make_unique, "BlobLog", pr, false), - experiment_done(experiment_done) {} + experiment_state(experiment_state) {} std::pair ObjectStore::update(const Blob& new_data) const { derecho::Replicated& this_subgroup = group->get_subgroup(this->subgroup_index); @@ -299,8 +306,15 @@ Blob ObjectStore::get_latest() const { } void ObjectStore::end_test() const { - std::cout << "Recieved the end_test message, shutting down" << std::endl; - *experiment_done = true; + std::cout << "Received the end_test message, shutting down" << std::endl; + experiment_state->done = true; +} + +std::unique_ptr ObjectStore::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto object_log_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm->registered()); + ExperimentState* experiment_state_ptr = &(dsm->mgr()); + return std::make_unique(*object_log_ptr, experiment_state_ptr); } /* -------------------------------------------------------------------- */ @@ -352,16 +366,17 @@ int main(int argc, char** argv) { + derecho::remote_invocation_utilities::header_space(); const std::size_t update_size = derecho::getConfUInt64(derecho::Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE) - rpc_header_size; steady_clock::time_point begin_time, batch_complete_time; - //This atomic flag will be shared with the SignatureStore or ObjectStore subgroup, - //if this node ends up in that group - std::shared_ptr> experiment_done = std::make_shared>(false); + //This DeserializationContext, containing a flag to indicate when the experiment is done, + //will be shared with the SignatureStore or ObjectStore subgroup if this node ends up in that group + ExperimentState experiment_state; + experiment_state.done = false; auto object_subgroup_factory = [&](persistent::PersistentRegistry* registry, derecho::subgroup_id_t subgroup_id) { - return std::make_unique(registry, experiment_done); + return std::make_unique(registry, &experiment_state); }; auto signature_subgroup_factory = [&](persistent::PersistentRegistry* registry, derecho::subgroup_id_t subgroup_id) { - return std::make_unique(registry, experiment_done); + return std::make_unique(registry, &experiment_state); }; //Subgroup and shard layout @@ -380,7 +395,7 @@ int main(int argc, char** argv) { derecho::Group group( {nullptr, nullptr, nullptr, nullptr}, subgroup_layout, - {}, {}, + {&experiment_state}, {}, client_node_factory, object_subgroup_factory, signature_subgroup_factory); @@ -431,12 +446,12 @@ int main(int argc, char** argv) { std::cout << "Assigned the SignatureStore role." << std::endl; std::cout << "Waiting for the end_test message." << std::endl; //Wait for this flag to be flipped by the end_test message handler - while(!(*experiment_done)) { + while(!(experiment_state.done)) { } } else if(my_storage_shard != -1) { std::cout << "Assigned the ObjectStore role." << std::endl; std::cout << "Waiting for the end_test message." << std::endl; - while(!(*experiment_done)) { + while(!(experiment_state.done)) { } } else { std::cout << "Not assigned to any role (?!)" << std::endl; diff --git a/src/applications/tests/performance_tests/signed_store_test.hpp b/src/applications/tests/performance_tests/signed_store_test.hpp index 9ea0e110..c0e77295 100644 --- a/src/applications/tests/performance_tests/signed_store_test.hpp +++ b/src/applications/tests/performance_tests/signed_store_test.hpp @@ -71,6 +71,14 @@ class Blob : public mutils::ByteRepresentable { Blob(uint8_t* buffer, std::size_t size, bool temporary); }; +/** + * A DeserializationContext object used to share state between the replicated + * objects (e.g. ObjectStore, SignatureStore) and the main thread. + */ +struct ExperimentState : public derecho::DeserializationContext { + std::atomic done; +}; + class ObjectStore : public mutils::ByteRepresentable, public derecho::PersistsFields, public derecho::GroupReference { @@ -83,17 +91,20 @@ class ObjectStore : public mutils::ByteRepresentable, * Shared with the main thread to tell it when the experiment is done and it should * call group.leave() (which can't be done from inside this subgroup object). */ - std::shared_ptr> experiment_done; + ExperimentState* experiment_state; public: ObjectStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done); + ExperimentState* experiment_state); /** Deserialization constructor */ - ObjectStore(persistent::Persistent& other_log) : object_log(std::move(other_log)) {} + ObjectStore(persistent::Persistent& other_log, + ExperimentState* experiment_state) + : object_log(std::move(other_log)), + experiment_state(experiment_state) {} /** * P2P-callable function that creates a new log entry with the provided data. * @return The version assigned to the new log entry, and the timestamp assigned to the new log entry - */ + */ std::pair update(const Blob& new_data) const; /** Actual implementation of update, only callable from within the subgroup as an ordered send. */ @@ -123,7 +134,10 @@ class ObjectStore : public mutils::ByteRepresentable, */ void end_test() const; - DEFAULT_SERIALIZATION_SUPPORT(ObjectStore, object_log); + DEFAULT_SERIALIZE(object_log); + DEFAULT_DESERIALIZE_NOALLOC(ObjectStore); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + REGISTER_RPC_FUNCTIONS(ObjectStore, ORDERED_TARGETS(ordered_update), P2P_TARGETS(update, await_persistence, get, get_latest, end_test)); }; @@ -138,16 +152,18 @@ class SignatureStore : public mutils::ByteRepresentable, persistent::Persistent hashes; /** - * Shared with the main thread to tell it when the experiment is done and it should - * call group.leave() (which can't be done from inside this subgroup object). + * Shared with the main thread to tell it when the experiment is done. */ - std::shared_ptr> experiment_done; + ExperimentState* experiment_state; public: SignatureStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done); + ExperimentState* experiment_state); /** Deserialization constructor */ - SignatureStore(persistent::Persistent& other_hashes) : hashes(std::move(other_hashes)) {} + SignatureStore(persistent::Persistent& other_hashes, + ExperimentState* experiment_state) + : hashes(std::move(other_hashes)), + experiment_state(experiment_state) {} /** * P2P-callable function that appends a new object-update hash to the signed log. @@ -168,7 +184,10 @@ class SignatureStore : public mutils::ByteRepresentable, */ void end_test() const; - DEFAULT_SERIALIZATION_SUPPORT(SignatureStore, hashes); + DEFAULT_SERIALIZE(hashes); + DEFAULT_DESERIALIZE_NOALLOC(SignatureStore); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + REGISTER_RPC_FUNCTIONS(SignatureStore, P2P_TARGETS(add_hash, end_test), ORDERED_TARGETS(ordered_add_hash)); }; diff --git a/src/applications/tests/performance_tests/single_active_subgroup_test.cpp b/src/applications/tests/performance_tests/single_active_subgroup_test.cpp index c2fca0aa..4b35e2ef 100644 --- a/src/applications/tests/performance_tests/single_active_subgroup_test.cpp +++ b/src/applications/tests/performance_tests/single_active_subgroup_test.cpp @@ -2,6 +2,7 @@ #include "log_results.hpp" #include +#include #include #include #include @@ -53,7 +54,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp index a1772c42..280b779f 100644 --- a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp +++ b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -21,21 +22,66 @@ using test::Bytes; using namespace std::chrono; /** - * RPC Object with a single function that accepts a string + * State shared between the (replicated) TestObjects and the main thread */ -class TestObject { -public: - void fun(const std::string& words) { - } +struct TestState : public derecho::DeserializationContext { + std::atomic experiment_done; + steady_clock::time_point send_complete_time; +}; + +/** + * RPC Object with a single function that accepts a byte array and counts + * the number of updates it has received, comparing it to a total provided + * in the constructor. When the total number of messages received by the + * RPC function equals the expected total, it records the time and signals + * a shared atomic flag to indicate the experiment is complete. + */ +class TestObject : public mutils::ByteRepresentable { + uint64_t messages_received; + const uint64_t total_num_messages; + // Pointer to a TestState object held by the main thread + TestState* main_test_state; +public: void bytes_fun(const Bytes& bytes) { + ++messages_received; + if(messages_received == total_num_messages) { + main_test_state->send_complete_time = std::chrono::steady_clock::now(); + main_test_state->experiment_done = true; + } } - bool finishing_call(int x) { - return true; - } + REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun)); + DEFAULT_SERIALIZE(messages_received, total_num_messages); + // Custom deserialization so we can use the DeserializationManager + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + DEFAULT_DESERIALIZE_NOALLOC(TestObject); + // Deserialization constructor. The TestState pointer should be supplied by the deserialization context. + TestObject(uint64_t messages_received, uint64_t total_num_messages, TestState* main_test_state) + : messages_received(messages_received), + total_num_messages(total_num_messages), + main_test_state(main_test_state) {} + // Constructor called by factory function + TestObject(uint64_t total_num_messages, TestState* test_state) + : messages_received(0), total_num_messages(total_num_messages), main_test_state(test_state) {} +}; - REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(fun, bytes_fun, finishing_call)); +std::unique_ptr TestObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + // Default deserialize the first 2 fields + auto messages_received_ptr = mutils::from_bytes(dsm, buffer); + auto total_num_ptr = mutils::from_bytes(dsm, buffer + mutils::bytes_size(*messages_received_ptr)); + // Retrieve a pointer to TestState from DSM + TestState* test_state_ptr = nullptr; + assert(dsm); + assert(dsm->registered()); + if(dsm && dsm->registered()) { + test_state_ptr = &(dsm->mgr()); + } else { + // Asserts are disabled in release mode, so try to provide some information if there's a bug + std::cerr << "ERROR: Unable to get TestState pointer in TestObject deserialization. TestObject will crash when it attempts to dereference it." << std::endl; + } + assert(test_state_ptr); + return std::make_unique(*messages_received_ptr, *total_num_ptr, test_state_ptr); }; struct exp_result { @@ -87,7 +133,7 @@ int main(int argc, char* argv[]) { const uint32_t count = std::stoi(argv[dashdash_pos + 2]); const uint32_t num_senders_selector = std::stoi(argv[dashdash_pos + 3]); - steady_clock::time_point begin_time, send_complete_time; + steady_clock::time_point begin_time; // Convert this integer to a more readable enum value const PartialSendMode senders_mode = num_senders_selector == 0 @@ -109,25 +155,9 @@ int main(int argc, char* argv[]) { total_num_messages = count; break; } - // variable 'done' tracks the end of the test - volatile bool done = false; - // callback into the application code at each message delivery - auto stability_callback = [&done, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - // Count the total number of messages delivered - ++num_delivered; - // Check for completion - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - done = true; - } - }; + // The replicated TestObject will get a pointer to this object and use it to signal that the test is done + TestState shared_test_state; + shared_test_state.experiment_done = false; if(dashdash_pos + 4 < argc) { pthread_setname_np(pthread_self(), argv[dashdash_pos + 3]); @@ -135,39 +165,20 @@ int main(int argc, char* argv[]) { pthread_setname_np(pthread_self(), DEFAULT_PROC_NAME); } - /******************* - derecho::SubgroupInfo subgroup_info{[num_nodes]( - const std::vector& subgroup_type_order, - const std::unique_ptr& prev_view, derecho::View& curr_view) { - if(curr_view.num_members < num_nodes) { - std::cout << "not enough members yet:" << curr_view.num_members << " < " << num_nodes << std::endl; - throw derecho::subgroup_provisioning_exception(); - } - derecho::subgroup_shard_layout_t subgroup_layout(1); - - std::vector members(num_nodes); - for(int i = 0; i < num_nodes; i++) { - members[i] = i; - } - - subgroup_layout[0].emplace_back(curr_view.make_subview(members)); - curr_view.next_unassigned_rank = std::max(curr_view.next_unassigned_rank, num_nodes); - derecho::subgroup_allocation_map_t subgroup_allocation; - subgroup_allocation.emplace(std::type_index(typeid(TestObject)), std::move(subgroup_layout)); - return subgroup_allocation; - }}; - *****************/ - auto membership_function = PartialSendersAllocator(num_nodes, senders_mode, derecho::Mode::ORDERED); derecho::SubgroupInfo subgroup_info(membership_function); - auto ba_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; + auto test_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) { + return std::make_unique(total_num_messages, &shared_test_state); + }; + + std::vector context_vector{&shared_test_state}; - derecho::Group group(derecho::UserMessageCallbacks{stability_callback}, subgroup_info, {}, std::vector{}, ba_factory); + derecho::Group group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, context_vector, + std::vector{}, test_factory); std::cout << "Finished constructing/joining Group" << std::endl; - //std::string str_1k(max_msg_size, 'x'); - uint8_t* bbuf = (uint8_t*)malloc(max_msg_size); + uint8_t* bbuf = new uint8_t[max_msg_size]; memset(bbuf, 0, max_msg_size); Bytes bytes(bbuf, max_msg_size); @@ -195,22 +206,13 @@ int main(int argc, char* argv[]) { send_all(); } } - /* - if(node_rank == 0) { - derecho::rpc::QueryResults results = handle.ordered_send(0); - std::cout << "waiting for response..." << std::endl; -#pragma GCC diagnostic ignored "-Wunused-variable" - decltype(results)::ReplyMap& replies = results.get(); -#pragma GCC diagnostic pop - } - */ - while(!done) { + while(!shared_test_state.experiment_done) { } - free(bbuf); + delete[] bbuf; - int64_t nsec = duration_cast(send_complete_time - begin_time).count(); + int64_t nsec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double thp_gbps = (static_cast(total_num_messages) * max_msg_size) / nsec; double thp_ops = (static_cast(total_num_messages) * 1000000000) / nsec; diff --git a/src/applications/tests/scalability_tests/CMakeLists.txt b/src/applications/tests/scalability_tests/CMakeLists.txt deleted file mode 100644 index ce16ea7b..00000000 --- a/src/applications/tests/scalability_tests/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -cmake_minimum_required(VERSION 3.15.4) -set(CMAKE_DISABLE_SOURCE_CHANGES ON) -set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${CMAKE_SOURCE_DIR}/include) diff --git a/src/applications/tests/unit_tests/cooked_send_test.cpp b/src/applications/tests/unit_tests/cooked_send_test.cpp index b2af64b5..31154496 100644 --- a/src/applications/tests/unit_tests/cooked_send_test.cpp +++ b/src/applications/tests/unit_tests/cooked_send_test.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -11,16 +12,46 @@ using std::map; using std::pair; using std::vector; +/** + * This object contains state that is shared between the replicated test objects + * and the main thread, rather than stored inside the replicated objects. It's + * used to provide a way for the replicated objects to "call back" to the main + * thread. Each replicated object will get a pointer to this object when it is + * constructed or deserialized, set up by the deserialization manager. + */ +struct TestState : public derecho::DeserializationContext { + uint32_t num_messages; + uint32_t num_nodes; + uint32_t counter; + std::atomic done; + void check_test_done() { + if(counter == num_messages * num_nodes) { + done = true; + } + } + TestState(uint32_t num_messages, uint32_t num_nodes) + : num_messages(num_messages), + num_nodes(num_nodes), + counter(0), + done(false) {} +}; + class CookedMessages : public mutils::ByteRepresentable { vector> msgs; // vector of (nodeid, msg #) + TestState* test_state; public: - CookedMessages() = default; - CookedMessages(const vector>& msgs) : msgs(msgs) { - } + // Factory constructor + CookedMessages(TestState* test_state) : test_state(test_state) {} + // Deserialization constructor + CookedMessages(const vector>& msgs, TestState* test_state) + : msgs(msgs), test_state(test_state) {} void send(uint nodeid, uint msg) { msgs.push_back(std::make_pair(nodeid, msg)); + // Count the number of RPC messages received here + test_state->counter++; + test_state->check_test_done(); } vector> get_msgs(uint start_index, uint end_index) { @@ -31,13 +62,23 @@ class CookedMessages : public mutils::ByteRepresentable { return vector>(msgs.begin() + start_index, msgs.begin() + end_index); } - // default state - DEFAULT_SERIALIZATION_SUPPORT(CookedMessages, msgs); + DEFAULT_SERIALIZE(msgs); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + DEFAULT_DESERIALIZE_NOALLOC(CookedMessages); // what operations you want as part of the subgroup REGISTER_RPC_FUNCTIONS(CookedMessages, ORDERED_TARGETS(send, get_msgs)); }; +// Custom deserializer that retrieves the TestState pointer from the DeserializationManager +std::unique_ptr CookedMessages::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto msgs_ptr = mutils::from_bytes>>(dsm, buffer); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*msgs_ptr, test_state_ptr); +} + bool verify_local_order(vector> msgs) { map order; for(auto [nodeid, msg] : msgs) { @@ -57,33 +98,19 @@ int main(int argc, char* argv[]) { } const uint32_t num_nodes = atoi(argv[1]); Conf::initialize(argc, argv); - auto subgroup_membership_function = [num_nodes]( - const std::vector& subgroup_type_order, - const std::unique_ptr& prev_view, derecho::View& curr_view) { - auto& members = curr_view.members; - auto num_members = members.size(); - if(num_members < num_nodes) { - throw subgroup_provisioning_exception(); - } - subgroup_shard_layout_t layout(num_members); - layout[0].push_back(curr_view.make_subview(vector(members))); - derecho::subgroup_allocation_map_t subgroup_allocation; - subgroup_allocation.emplace(std::type_index(typeid(CookedMessages)), std::move(layout)); - return subgroup_allocation; - }; - auto cooked_subgroup_factory = [](persistent::PersistentRegistry*,derecho::subgroup_id_t) { return std::make_unique(); }; + // Configure the default subgroup allocator to put all the nodes in one fixed-size subgroup + SubgroupInfo subgroup_info(derecho::DefaultSubgroupAllocator( + {{std::type_index(typeid(CookedMessages)), + derecho::one_subgroup_policy(derecho::fixed_even_shards(1, num_nodes))}})); - SubgroupInfo subgroup_info(subgroup_membership_function); - volatile bool done = false; uint32_t num_msgs = 500; - auto stability_callback = [num_msgs, num_nodes, &done, counter = (uint)0](subgroup_id_t, node_id_t sender_id, message_id_t index, std::optional>, persistent::version_t) mutable { - counter++; - if(counter == num_msgs * num_nodes) { - done = true; - } + TestState test_state(num_msgs, num_nodes); + auto cooked_subgroup_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) { + return std::make_unique(&test_state); }; - Group group({stability_callback}, subgroup_info, {}, {}, cooked_subgroup_factory); + // Put a pointer to test_state in Group's vector of DeserializationContexts so it will be passed to CookedMessages + Group group({}, subgroup_info, {&test_state}, {}, cooked_subgroup_factory); cout << "Finished constructing/joining the group" << endl; auto group_members = group.get_members(); @@ -106,7 +133,7 @@ int main(int argc, char* argv[]) { for(uint i = 1; i < num_msgs + 1; ++i) { cookedMessagesHandle.ordered_send(my_rank, i); } - while(!done) { + while(!test_state.done) { } if(my_rank == 0) { uint32_t max_msg_size = getConfUInt64(Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE); diff --git a/src/applications/tests/unit_tests/delivery_order_test.cpp b/src/applications/tests/unit_tests/delivery_order_test.cpp index 2ddd4249..cc9d8f03 100644 --- a/src/applications/tests/unit_tests/delivery_order_test.cpp +++ b/src/applications/tests/unit_tests/delivery_order_test.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -56,7 +57,7 @@ int main(int argc, char* argv[]) { std::unique_ptr> group; uint32_t my_rank; uint64_t max_msg_size = getConfUInt64(Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE); - volatile bool done = false; + std::atomic done = false; auto delivery_callback = [&, num_received_msgs_map = std::map(), received_msgs_index_map = std::map(), received_msgs = std::vector(), diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index c16b03ce..61943daf 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -17,77 +17,159 @@ #include -class OneFieldObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { - persistent::Persistent string_field; - -public: - OneFieldObject(persistent::PersistentRegistry* registry) - : string_field(std::make_unique, - "OneFieldObjectStringField", registry, true) {} - OneFieldObject(persistent::Persistent& other_value) - : string_field(std::move(other_value)) {} - - std::string get_state() const { - return *string_field; - } +#include "signed_log_test.hpp" - void update_state(const std::string& new_value) { - *string_field = new_value; - } +/* --- TestState implementation --- */ - DEFAULT_SERIALIZATION_SUPPORT(OneFieldObject, string_field); - REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); -}; - -class TwoFieldObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { - persistent::Persistent foo; - persistent::Persistent bar; - -public: - TwoFieldObject(persistent::PersistentRegistry* registry) - : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), - bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true) {} - TwoFieldObject(persistent::Persistent& other_foo, - persistent::Persistent& other_bar) - : foo(std::move(other_foo)), - bar(std::move(other_bar)) {} - - std::string get_foo() const { - return *foo; +void TestState::notify_update_delivered(uint64_t update_counter, persistent::version_t version) { + dbg_default_debug("Update {}/{} delivered", update_counter, subgroup_total_updates); + if(update_counter == subgroup_total_updates) { + dbg_default_info("Final update (#{}) delivered, version is {}", update_counter, version); + last_version = version; + last_version_ready = true; } +} - std::string get_bar() const { - return *bar; +void TestState::notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); + //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" + if(my_subgroup_is_unsigned && last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); } - - void update(const std::string& new_foo, const std::string& new_bar) { - *foo = new_foo; - *bar = new_bar; +} +void TestState::notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); + dbg_default_flush(); + // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs + assert(subgroup_id == my_subgroup_id); + if(last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); } +} - DEFAULT_SERIALIZATION_SUPPORT(TwoFieldObject, foo, bar); - REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); -}; +/* --- OneFieldObject implementation --- */ -class UnsignedObject : public mutils::ByteRepresentable, public derecho::PersistsFields { - persistent::Persistent string_field; +OneFieldObject::OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, + "OneFieldObjectStringField", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} -public: - UnsignedObject(persistent::PersistentRegistry* registry) - : string_field(std::make_unique, "UnsignedObjectField", registry, false) {} - UnsignedObject(persistent::Persistent& other_field) - : string_field(std::move(other_field)) {} - std::string get_state() const { - return *string_field; - } +OneFieldObject::OneFieldObject(persistent::Persistent& other_value, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_value)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} - void update_state(const std::string& new_value) { - *string_field = new_value; - } +void OneFieldObject::update_state(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; + *string_field = new_value; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); +} + +// Custom deserializer that retrieves the TestState pointer from the DeserializationManager +std::unique_ptr OneFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto field_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*field_ptr); + auto counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); +} + +/* --- TwoFieldObject implementation --- */ - DEFAULT_SERIALIZATION_SUPPORT(UnsignedObject, string_field); - REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); -}; +TwoFieldObject::TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), + bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +TwoFieldObject::TwoFieldObject(persistent::Persistent& other_foo, + persistent::Persistent& other_bar, + uint64_t other_updates_delivered, + TestState* test_state) + : foo(std::move(other_foo)), + bar(std::move(other_bar)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + +void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; + *foo = new_foo; + *bar = new_bar; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); +} + +std::unique_ptr TwoFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto foo_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*foo_ptr); + auto bar_ptr = mutils::from_bytes>(dsm, buffer + bytes_read); + bytes_read += mutils::bytes_size(*bar_ptr); + auto update_counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*foo_ptr, *bar_ptr, *update_counter_ptr, test_state_ptr); +} + +/* --- UnsignedObject implementation --- */ + +UnsignedObject::UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, "UnsignedObjectField", registry, false), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +UnsignedObject::UnsignedObject(persistent::Persistent& other_field, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_field)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + +void UnsignedObject::update_state(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; + *string_field = new_value; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); +} + +std::unique_ptr UnsignedObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto field_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*field_ptr); + auto counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); +} /** * Command-line arguments: @@ -101,12 +183,6 @@ int main(int argc, char** argv) { const std::string characters("abcdefghijklmnopqrstuvwxyz"); std::mt19937 random_generator(getpid()); std::uniform_int_distribution char_distribution(0, characters.size() - 1); - std::mutex finish_mutex; - //One condition variable per subgroup to represent when they are finished verifying all updates - std::vector subgroup_finished_condition(2); - //The actual boolean for the condition, since wakeups can be spurious - std::vector subgroup_finished = {false, false}; - const int num_args = 4; if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) { std::cout << "Invalid command line arguments." << std::endl; @@ -131,58 +207,37 @@ int main(int argc, char** argv) { derecho::one_subgroup_policy(derecho::flexible_even_shards( 1, 1, subgroup_unsigned_size))}})); - //Count the total number of messages delivered in each subgroup to figure out what version is assigned to the last one + //Count the total number of messages to be delivered in each subgroup, so we can + //identify when the last message has been delivered and discover what version it got std::array subgroup_total_messages = {subgroup_1_size * num_updates, subgroup_2_size * num_updates, subgroup_unsigned_size * num_updates}; - std::array subgroup_last_version; - std::array, 3> last_version_ready = {false, false, false}; - auto stability_callback = [&subgroup_total_messages, - &subgroup_last_version, - &last_version_ready, - num_delivered = std::vector{0u, 0u, 0u}](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - num_delivered[subgroup]++; - if(num_delivered[subgroup] == subgroup_total_messages[subgroup]) { - subgroup_last_version[subgroup] = ver; - last_version_ready[subgroup] = true; - } - }; + + TestState test_state; + test_state.subgroup_finished = false; + test_state.last_version_ready = false; auto global_persist_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); - //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" - //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) - if(subgroup_id == 2 && last_version_ready[subgroup_id] && version == subgroup_last_version[subgroup_id]) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished[subgroup_id] = true; - } - subgroup_finished_condition[subgroup_id].notify_all(); - } + test_state.notify_global_persistence(subgroup_id, version); }; auto global_verified_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); - dbg_default_flush(); - if(last_version_ready[subgroup_id] && version == subgroup_last_version[subgroup_id]) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished[subgroup_id] = true; - } - subgroup_finished_condition[subgroup_id].notify_all(); - } + test_state.notify_global_verified(subgroup_id, version); }; auto new_view_callback = [](const derecho::View& view) { dbg_default_info("Now on View {}", view.vid); }; + // Pass test_state to the Group constructor as a DeserializationContext derecho::Group group( - {stability_callback, nullptr, global_persist_callback, global_verified_callback}, - subgroup_info, {}, {new_view_callback}, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }); + {nullptr, nullptr, global_persist_callback, global_verified_callback}, + subgroup_info, {&test_state}, {new_view_callback}, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); + }, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); + }, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); + }); //Figure out which subgroup this node got assigned to int32_t my_shard_subgroup_1 = group.get_my_shard(); int32_t my_shard_subgroup_2 = group.get_my_shard(); @@ -190,6 +245,9 @@ int main(int argc, char** argv) { if(my_shard_subgroup_1 != -1) { std::cout << "In the OneFieldObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[0]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); @@ -200,6 +258,9 @@ int main(int argc, char** argv) { } else if(my_shard_subgroup_2 != -1) { std::cout << "In the TwoFieldObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[1]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_foo('a', 32); @@ -213,6 +274,9 @@ int main(int argc, char** argv) { } else if(my_shard_unsigned_subgroup != -1) { std::cout << "In the UnsignedObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[2]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = true; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); @@ -222,18 +286,10 @@ int main(int argc, char** argv) { } } //Wait for all updates to finish being verified, using the condition variables updated by the callback - if(my_shard_subgroup_1 != -1) { - std::cout << "Waiting for final version to be verified" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[0].wait(lock, [&]() { return subgroup_finished[0]; }); - } else if(my_shard_subgroup_2 != -1) { - std::cout << "Waiting for final version to be verified" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[1].wait(lock, [&]() { return subgroup_finished[1]; }); - } else if(my_shard_unsigned_subgroup != -1) { - std::cout << "Waiting for final version to be persisted" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[2].wait(lock, [&]() { return subgroup_finished[2]; }); + std::cout << "Waiting for final version to be verified" << std::endl; + { + std::unique_lock lock(test_state.finish_mutex); + test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; }); } std::cout << "Done" << std::endl; group.barrier_sync(); diff --git a/src/applications/tests/unit_tests/signed_log_test.hpp b/src/applications/tests/unit_tests/signed_log_test.hpp new file mode 100644 index 00000000..9bdd5b39 --- /dev/null +++ b/src/applications/tests/unit_tests/signed_log_test.hpp @@ -0,0 +1,136 @@ +#include +#include +#include +#include + +#include + +/** + * This object contains state that is shared between the replicated test objects + * and the main thread, rather than stored inside the replicated objects. It's + * used to provide a way for the replicated objects to "call back" to the main + * thread. Each replicated object will get a pointer to this object when it is + * constructed or deserialized, set up by the deserialization manager. + */ +struct TestState : public derecho::DeserializationContext { + // The next 3 are set by the main thread after it figures out which subgroup this node was assigned to + derecho::subgroup_id_t my_subgroup_id; + uint32_t subgroup_total_updates; + bool my_subgroup_is_unsigned; + // Set by each replicated object when the last update is delivered and its version is known + persistent::version_t last_version; + // Used to alert other threads (i.e. global callbacks) that last_version has been set + std::atomic last_version_ready; + // Mutex for subgroup_finished + std::mutex finish_mutex; + // Condition variable used to indicate when this node's subgroup has finished persisting/verifying all updates + std::condition_variable subgroup_finished_condition; + // Boolean to set to true when signaling the condition variable + bool subgroup_finished; + // Called from replicated object update_state methods to notify the main thread that an update was delivered + void notify_update_delivered(uint64_t update_counter, persistent::version_t version); + // Called by Derecho's global persistence callback + void notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version); + // Called by Derecho's global verified callback + void notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version); +}; + +/** + * Test object with one signed persistent field + */ +class OneFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { + persistent::Persistent string_field; + // Counts the number of updates delivered within this subgroup. + // Not persisted, but needs to be replicated so that all replicas have a consistent count of + // the total number of messages, even across view changes + uint64_t updates_delivered; + // Pointer to an object held by the main thread, set from DeserializationManager + TestState* test_state; + +public: + /** Factory constructor */ + OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state); + /** Deserialization constructor */ + OneFieldObject(persistent::Persistent& other_value, + uint64_t other_updates_delivered, + TestState* test_state); + + std::string get_state() const { + return *string_field; + } + + void update_state(const std::string& new_value); + + REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + + DEFAULT_SERIALIZE(string_field, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(OneFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + +/** + * Test object with two signed persistent fields + */ +class TwoFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { + persistent::Persistent foo; + persistent::Persistent bar; + uint64_t updates_delivered; + TestState* test_state; + +public: + /** Factory constructor */ + TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state); + /** Deserialization constructor */ + TwoFieldObject(persistent::Persistent& other_foo, + persistent::Persistent& other_bar, + uint64_t other_updates_delivered, + TestState* test_state); + + std::string get_foo() const { + return *foo; + } + + std::string get_bar() const { + return *bar; + } + + void update(const std::string& new_foo, const std::string& new_bar); + + REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); + DEFAULT_SERIALIZE(foo, bar, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(TwoFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + +/** + * Test object with one un-signed persistent field + */ +class UnsignedObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::PersistsFields { + persistent::Persistent string_field; + uint64_t updates_delivered; + TestState* test_state; + +public: + /** Factory constructor */ + UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state); + /** Deserialization constructor */ + UnsignedObject(persistent::Persistent& other_field, + uint64_t other_updates_delivered, + TestState* test_state); + std::string get_state() const { + return *string_field; + } + + void update_state(const std::string& new_value); + + REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + DEFAULT_SERIALIZE(string_field, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; \ No newline at end of file diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index fd51b1f6..ed419fb8 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 99; +const int COMMITS_AHEAD_OF_VERSION = 167; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+99"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+167"; } diff --git a/src/core/multicast_group.cpp b/src/core/multicast_group.cpp index f3b861ad..f593f8a6 100644 --- a/src/core/multicast_group.cpp +++ b/src/core/multicast_group.cpp @@ -508,12 +508,6 @@ void MulticastGroup::deliver_message(RDMCMessage& msg, const subgroup_id_t& subg auto payload_size = msg.size - h->header_size; internal_callbacks.post_next_version_callback(subgroup_num, version, msg_ts_us); internal_callbacks.rpc_callback(subgroup_num, msg.sender_id, version, msg_ts_us, buf, payload_size); - - // if(callbacks.global_stability_callback) { - // callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {}, - // version); - // } - } else if(callbacks.global_stability_callback) { callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {{buf + h->header_size, msg.size - h->header_size}}, @@ -536,13 +530,6 @@ void MulticastGroup::deliver_message(SSTMessage& msg, const subgroup_id_t& subgr auto payload_size = msg.size - h->header_size; internal_callbacks.post_next_version_callback(subgroup_num, version, msg_ts_us); internal_callbacks.rpc_callback(subgroup_num, msg.sender_id, version, msg_ts_us, buf, payload_size); - - // if(callbacks.global_stability_callback) { - // callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, - // {{buf + h->header_size, msg.size - h->header_size}}, - // version); - // } - } else if(callbacks.global_stability_callback) { callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {{buf + h->header_size, msg.size - h->header_size}}, diff --git a/src/core/rpc_manager.cpp b/src/core/rpc_manager.cpp index 36f8d6cf..45d183f9 100644 --- a/src/core/rpc_manager.cpp +++ b/src/core/rpc_manager.cpp @@ -28,12 +28,10 @@ RPCManager::RPCManager(ViewManager& group_view_manager, : nid(getConfUInt32(Conf::DERECHO_LOCAL_ID)), rpc_logger(LoggerFactory::createIfAbsent(LoggerFactory::RPC_LOGGER_NAME, getConfString(Conf::LOGGER_RPC_LOG_LEVEL))), receivers(new std::decay_t()), + deserialization_contexts(deserialization_context), view_manager(group_view_manager), busy_wait_before_sleep_ms(getConfUInt64(Conf::DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS)) { RpcLoggerPtr::initialize(); - for(const auto& deserialization_context_ptr : deserialization_context) { - rdv.push_back(deserialization_context_ptr); - } rpc_listener_thread = std::thread(&RPCManager::p2p_receive_loop, this); } @@ -121,7 +119,7 @@ std::exception_ptr RPCManager::receive_message( std::size_t reply_header_size = header_space(); //Pass through the provided out_alloc function, but add space for the reply header recv_ret reply_return = receiver_function_entry->second( - &rdv, received_from, buf, + &deserialization_contexts, received_from, buf, [&out_alloc, &reply_header_size](std::size_t size) { return out_alloc(size + reply_header_size) + reply_header_size; }); @@ -143,7 +141,7 @@ std::exception_ptr RPCManager::parse_and_receive(uint8_t* buf, std::size_t size, Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(&rdv, buf, payload_size, indx, received_from, flags); + retrieve_header(buf, payload_size, indx, received_from, flags); RPCManager::rpc_caller_id = received_from; return receive_message(indx, received_from, buf + header_space(), payload_size, out_alloc); @@ -233,7 +231,7 @@ void RPCManager::p2p_message_handler(node_id_t sender_id, uint8_t* msg_buf) { Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags); + retrieve_header(msg_buf, payload_size, indx, received_from, flags); dbg_trace(rpc_logger, "Handling a P2P message: function_id = {}, is_reply = {}, received_from = {}, payload_size = {}, invocation_id = {}", indx.function_id, indx.is_reply, received_from, payload_size, ((long*)(msg_buf + header_size))[0]); if(indx.is_reply) { @@ -487,7 +485,7 @@ void RPCManager::p2p_request_worker() { request = p2p_request_queue.front(); p2p_request_queue.pop(); } - retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags); + retrieve_header(request.msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) { dbg_error(rpc_logger, "Invalid rpc message in fifo queue: is_reply={}, is_cascading={}", indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE)); diff --git a/src/sst/lf.cpp b/src/sst/lf.cpp index 5d92d4a0..e411fd92 100644 --- a/src/sst/lf.cpp +++ b/src/sst/lf.cpp @@ -263,6 +263,7 @@ void _resources::connect_endpoint(bool is_lf_server) { fi_freeinfo(entry.info); crash_with_message("failed to initialize server endpoint.\n"); } + dbg_trace(sst_logger, "calling fi_accept()"); if(fi_accept(this->ep, NULL, 0)) { fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0); fi_freeinfo(entry.info); @@ -292,6 +293,7 @@ void _resources::connect_endpoint(bool is_lf_server) { malloc, remote_cm_data.pep_addr_len); memcpy((void*)client_hints->dest_addr, (void*)remote_cm_data.pep_addr, (size_t)remote_cm_data.pep_addr_len); client_hints->dest_addrlen = remote_cm_data.pep_addr_len; + dbg_trace(sst_logger, "calling fi_getinfo()"); fail_if_nonzero_retry_on_eagain("fi_getinfo() failed.", CRASH_ON_FAILURE, fi_getinfo, LF_VERSION, nullptr, nullptr, 0, client_hints, &client_info); if(init_endpoint(client_info)) { @@ -300,9 +302,10 @@ void _resources::connect_endpoint(bool is_lf_server) { crash_with_message("failed to initialize client endpoint.\n"); } + dbg_trace(sst_logger, "calling fi_connect()"); fail_if_nonzero_retry_on_eagain("fi_connect()", CRASH_ON_FAILURE, fi_connect, this->ep, remote_cm_data.pep_addr, nullptr, 0); - + dbg_trace(sst_logger, "fi_connect() succeeded, calling fi_eq_sread()"); nRead = fi_eq_sread(this->eq, &event, &entry, sizeof(entry), -1, 0); if(nRead != sizeof(entry)) { dbg_error(sst_logger, "failed to connect remote.");