From 6c9133226496c7d75f258f3a86e43d6d3a2b4039 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 11:35:08 -0400 Subject: [PATCH 01/12] Replaced volatile bool with atomic in test programs Volatile variables are not meant to be used for synchronization between threads and are not guaranteed to work correctly in busy-waiting loops like we have in these tests. The correct way to ensure a variable in a busy-waiting loop is not cached, reordered, or optimized out is to use std::atomic, which we mostly do already in our newer test programs. --- .../performance_tests/bandwidth_test.cpp | 15 +++---- .../deliver_predicate_delay.cpp | 12 +++--- .../tests/performance_tests/latency_test.cpp | 3 +- .../multiple_active_subgroups_test.cpp | 11 ++--- .../performance_tests/persistent_bw_test.cpp | 14 +++---- .../performance_tests/sender_delay_test.cpp | 12 +++--- .../performance_tests/signed_bw_test.cpp | 16 +++---- .../single_active_subgroup_test.cpp | 3 +- .../typed_subgroup_bw_test.cpp | 42 ++++--------------- .../tests/unit_tests/cooked_send_test.cpp | 5 ++- .../tests/unit_tests/delivery_order_test.cpp | 3 +- src/core/git_version.cpp | 4 +- 12 files changed, 61 insertions(+), 79 deletions(-) diff --git a/src/applications/tests/performance_tests/bandwidth_test.cpp b/src/applications/tests/performance_tests/bandwidth_test.cpp index 09dc2c6f..59a80c6b 100644 --- a/src/applications/tests/performance_tests/bandwidth_test.cpp +++ b/src/applications/tests/performance_tests/bandwidth_test.cpp @@ -7,6 +7,13 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_derecho_bw on the leader */ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + +#include #include #include #include @@ -15,12 +22,6 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; using std::map; @@ -98,7 +99,7 @@ int main(int argc, char* argv[]) { } // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&done, total_num_messages, diff --git a/src/applications/tests/performance_tests/deliver_predicate_delay.cpp b/src/applications/tests/performance_tests/deliver_predicate_delay.cpp index 92881b08..b4e52fab 100644 --- a/src/applications/tests/performance_tests/deliver_predicate_delay.cpp +++ b/src/applications/tests/performance_tests/deliver_predicate_delay.cpp @@ -6,6 +6,11 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_deliver_predicate_delay on the leader */ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + +#include #include #include #include @@ -14,11 +19,6 @@ #include #include -#include "aggregate_bandwidth.hpp" -#include - -#include "log_results.hpp" - using std::cout; using std::endl; using std::map; @@ -60,7 +60,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/latency_test.cpp b/src/applications/tests/performance_tests/latency_test.cpp index 70cb32f8..d738bb08 100644 --- a/src/applications/tests/performance_tests/latency_test.cpp +++ b/src/applications/tests/performance_tests/latency_test.cpp @@ -10,6 +10,7 @@ * in the only subgroup that consists of all the nodes * Upon completion, the results are appended to file data_latency on the leader */ +#include #include #include #include @@ -88,7 +89,7 @@ int main(int argc, char* argv[]) { vector start_times(num_messages), end_times(num_messages); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; uint32_t my_id; // callback into the application code at each message delivery auto stability_callback = [&, num_delivered = 0u, time_index = 0u]( diff --git a/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp b/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp index 9722d559..bed1848d 100644 --- a/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp +++ b/src/applications/tests/performance_tests/multiple_active_subgroups_test.cpp @@ -1,7 +1,4 @@ -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include - +#include #include #include #include @@ -11,6 +8,10 @@ #include #include +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + using std::cout; using std::endl; using std::map; @@ -54,7 +55,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/persistent_bw_test.cpp b/src/applications/tests/performance_tests/persistent_bw_test.cpp index 45785fe0..52ff3acb 100644 --- a/src/applications/tests/performance_tests/persistent_bw_test.cpp +++ b/src/applications/tests/performance_tests/persistent_bw_test.cpp @@ -1,3 +1,10 @@ +#include "aggregate_bandwidth.hpp" +#include "bytes_object.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + #include #include #include @@ -6,13 +13,6 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "bytes_object.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; using namespace persistent; diff --git a/src/applications/tests/performance_tests/sender_delay_test.cpp b/src/applications/tests/performance_tests/sender_delay_test.cpp index da14edde..469b793f 100644 --- a/src/applications/tests/performance_tests/sender_delay_test.cpp +++ b/src/applications/tests/performance_tests/sender_delay_test.cpp @@ -1,3 +1,8 @@ +#include "aggregate_bandwidth.hpp" +#include "log_results.hpp" +#include + +#include #include #include #include @@ -6,9 +11,6 @@ #include #include -#include "aggregate_bandwidth.hpp" -#include "log_results.hpp" -#include using std::cout; using std::endl; @@ -35,7 +37,7 @@ struct exp_result { } }; -void busy_wait(uint32_t wait_time, volatile bool& done) { +void busy_wait(uint32_t wait_time, std::atomic& done) { struct timespec start_time, end_time; clock_gettime(CLOCK_REALTIME, &start_time); @@ -80,7 +82,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // start and end time to compute bw struct timespec start_time; long long int nanoseconds_elapsed; diff --git a/src/applications/tests/performance_tests/signed_bw_test.cpp b/src/applications/tests/performance_tests/signed_bw_test.cpp index 13751255..52d19fc8 100644 --- a/src/applications/tests/performance_tests/signed_bw_test.cpp +++ b/src/applications/tests/performance_tests/signed_bw_test.cpp @@ -1,3 +1,10 @@ +#include "aggregate_bandwidth.hpp" +#include "bytes_object.hpp" +#include "log_results.hpp" +#include "partial_senders_allocator.hpp" + +#include + #include #include #include @@ -6,13 +13,6 @@ #include #include -#include - -#include "aggregate_bandwidth.hpp" -#include "bytes_object.hpp" -#include "log_results.hpp" -#include "partial_senders_allocator.hpp" - using std::cout; using std::endl; using namespace persistent; @@ -110,7 +110,7 @@ int main(int argc, char* argv[]) { break; } // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // last_version and its flag is shared between the stability callback and persistence callback. // This is a clumsy hack to figure out what version number is assigned to the last delivered message. diff --git a/src/applications/tests/performance_tests/single_active_subgroup_test.cpp b/src/applications/tests/performance_tests/single_active_subgroup_test.cpp index c2fca0aa..4b35e2ef 100644 --- a/src/applications/tests/performance_tests/single_active_subgroup_test.cpp +++ b/src/applications/tests/performance_tests/single_active_subgroup_test.cpp @@ -2,6 +2,7 @@ #include "log_results.hpp" #include +#include #include #include #include @@ -53,7 +54,7 @@ int main(int argc, char* argv[]) { Conf::initialize(argc, argv); // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&num_messages, &done, diff --git a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp index a1772c42..ba6ceda8 100644 --- a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp +++ b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -21,13 +22,10 @@ using test::Bytes; using namespace std::chrono; /** - * RPC Object with a single function that accepts a string + * RPC Object with a single function that accepts a byte array */ class TestObject { public: - void fun(const std::string& words) { - } - void bytes_fun(const Bytes& bytes) { } @@ -35,7 +33,7 @@ class TestObject { return true; } - REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(fun, bytes_fun, finishing_call)); + REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun, finishing_call)); }; struct exp_result { @@ -110,7 +108,7 @@ int main(int argc, char* argv[]) { break; } // variable 'done' tracks the end of the test - volatile bool done = false; + std::atomic done = false; // callback into the application code at each message delivery auto stability_callback = [&done, &send_complete_time, @@ -135,39 +133,15 @@ int main(int argc, char* argv[]) { pthread_setname_np(pthread_self(), DEFAULT_PROC_NAME); } - /******************* - derecho::SubgroupInfo subgroup_info{[num_nodes]( - const std::vector& subgroup_type_order, - const std::unique_ptr& prev_view, derecho::View& curr_view) { - if(curr_view.num_members < num_nodes) { - std::cout << "not enough members yet:" << curr_view.num_members << " < " << num_nodes << std::endl; - throw derecho::subgroup_provisioning_exception(); - } - derecho::subgroup_shard_layout_t subgroup_layout(1); - - std::vector members(num_nodes); - for(int i = 0; i < num_nodes; i++) { - members[i] = i; - } - - subgroup_layout[0].emplace_back(curr_view.make_subview(members)); - curr_view.next_unassigned_rank = std::max(curr_view.next_unassigned_rank, num_nodes); - derecho::subgroup_allocation_map_t subgroup_allocation; - subgroup_allocation.emplace(std::type_index(typeid(TestObject)), std::move(subgroup_layout)); - return subgroup_allocation; - }}; - *****************/ - auto membership_function = PartialSendersAllocator(num_nodes, senders_mode, derecho::Mode::ORDERED); derecho::SubgroupInfo subgroup_info(membership_function); - auto ba_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; + auto test_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; - derecho::Group group(derecho::UserMessageCallbacks{stability_callback}, subgroup_info, {}, std::vector{}, ba_factory); + derecho::Group group(derecho::UserMessageCallbacks{stability_callback}, subgroup_info, {}, std::vector{}, test_factory); std::cout << "Finished constructing/joining Group" << std::endl; - //std::string str_1k(max_msg_size, 'x'); - uint8_t* bbuf = (uint8_t*)malloc(max_msg_size); + uint8_t* bbuf = new uint8_t[max_msg_size]; memset(bbuf, 0, max_msg_size); Bytes bytes(bbuf, max_msg_size); @@ -208,7 +182,7 @@ int main(int argc, char* argv[]) { while(!done) { } - free(bbuf); + delete[] bbuf; int64_t nsec = duration_cast(send_complete_time - begin_time).count(); diff --git a/src/applications/tests/unit_tests/cooked_send_test.cpp b/src/applications/tests/unit_tests/cooked_send_test.cpp index b2af64b5..49a35d37 100644 --- a/src/applications/tests/unit_tests/cooked_send_test.cpp +++ b/src/applications/tests/unit_tests/cooked_send_test.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -72,10 +73,10 @@ int main(int argc, char* argv[]) { return subgroup_allocation; }; - auto cooked_subgroup_factory = [](persistent::PersistentRegistry*,derecho::subgroup_id_t) { return std::make_unique(); }; + auto cooked_subgroup_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; SubgroupInfo subgroup_info(subgroup_membership_function); - volatile bool done = false; + std::atomic done = false; uint32_t num_msgs = 500; auto stability_callback = [num_msgs, num_nodes, &done, counter = (uint)0](subgroup_id_t, node_id_t sender_id, message_id_t index, std::optional>, persistent::version_t) mutable { counter++; diff --git a/src/applications/tests/unit_tests/delivery_order_test.cpp b/src/applications/tests/unit_tests/delivery_order_test.cpp index 2ddd4249..cc9d8f03 100644 --- a/src/applications/tests/unit_tests/delivery_order_test.cpp +++ b/src/applications/tests/unit_tests/delivery_order_test.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -56,7 +57,7 @@ int main(int argc, char* argv[]) { std::unique_ptr> group; uint32_t my_rank; uint64_t max_msg_size = getConfUInt64(Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE); - volatile bool done = false; + std::atomic done = false; auto delivery_callback = [&, num_received_msgs_map = std::map(), received_msgs_index_map = std::map(), received_msgs = std::vector(), diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index fd51b1f6..24dd05a8 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 99; +const int COMMITS_AHEAD_OF_VERSION = 156; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+99"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+156"; } From f26b5cf8f35f6f0165b6925fd4d8c573e5f8cabd Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 11:39:55 -0400 Subject: [PATCH 02/12] Removed the unused "scalability_tests" directory This has always been empty and we have never used it. The tests of scalability are in performance_tests anyway: running a test like bandwidth_test with larger and larger group sizes is how we test scalability. --- src/applications/tests/CMakeLists.txt | 3 +-- src/applications/tests/scalability_tests/CMakeLists.txt | 6 ------ src/core/git_version.cpp | 4 ++-- 3 files changed, 3 insertions(+), 10 deletions(-) delete mode 100644 src/applications/tests/scalability_tests/CMakeLists.txt diff --git a/src/applications/tests/CMakeLists.txt b/src/applications/tests/CMakeLists.txt index 03bc9bac..e9264ffc 100644 --- a/src/applications/tests/CMakeLists.txt +++ b/src/applications/tests/CMakeLists.txt @@ -3,5 +3,4 @@ set(CMAKE_DISABLE_SOURCE_CHANGES ON) set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) add_subdirectory(unit_tests) -add_subdirectory(performance_tests) -add_subdirectory(scalability_tests) +add_subdirectory(performance_tests) \ No newline at end of file diff --git a/src/applications/tests/scalability_tests/CMakeLists.txt b/src/applications/tests/scalability_tests/CMakeLists.txt deleted file mode 100644 index ce16ea7b..00000000 --- a/src/applications/tests/scalability_tests/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -cmake_minimum_required(VERSION 3.15.4) -set(CMAKE_DISABLE_SOURCE_CHANGES ON) -set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${CMAKE_SOURCE_DIR}/include) diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 24dd05a8..07288df5 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 156; +const int COMMITS_AHEAD_OF_VERSION = 157; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+156"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+157"; } From 7650ff8548a3a77bffda3e5ffb4b19d11c4b90c5 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 13:59:54 -0400 Subject: [PATCH 03/12] Refactored typed_subgroup_test to not use global stability callback Apparently, as of commit eb5b281a31277faea6dd61649d057fcb094ec941 the global stability callback is disabled for "cooked" (RPC) messages, which means this test can't use a stability callback to detect when the last message has been received. I'm going to try this workaround, based on the shared-atomic-bool code I wrote for signed_store_test. I'm not sure if it will work but I have to commit this in order to test it on Fractus. --- .../typed_subgroup_bw_test.cpp | 78 ++++++++++--------- src/core/git_version.cpp | 4 +- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp index ba6ceda8..7edbdd64 100644 --- a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp +++ b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp @@ -22,18 +22,42 @@ using test::Bytes; using namespace std::chrono; /** - * RPC Object with a single function that accepts a byte array + * RPC Object with a single function that accepts a byte array and counts + * the number of updates it has received, comparing it to a total provided + * in the constructor. When the total number of messages received by the + * RPC function equals the expected total, it records the time and signals + * a shared atomic flag to indicate the experiment is complete. */ -class TestObject { +class TestObject : public mutils::ByteRepresentable { + uint64_t messages_received; + const uint64_t total_num_messages; + steady_clock::time_point send_complete_time; + // Shared with the main thread + std::shared_ptr> experiment_done; + public: void bytes_fun(const Bytes& bytes) { + ++messages_received; + if(messages_received == total_num_messages) { + send_complete_time = std::chrono::steady_clock::now(); + experiment_done->store(true); + } } - - bool finishing_call(int x) { - return true; + // Called by the main thread to retrieve send_complete_time after the experiment is done + const steady_clock::time_point& get_complete_time() const { + return send_complete_time; } - REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun, finishing_call)); + REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun)); + DEFAULT_SERIALIZATION_SUPPORT(TestObject, messages_received, total_num_messages); + // Deserialization constructor. This will break experiment_done's link to the main thread, so we hope it isn't called. + TestObject(uint64_t messages_received, uint64_t total_num_messages) + : messages_received(messages_received), + total_num_messages(total_num_messages), + experiment_done(std::make_shared(false)) {} + // Constructor called by factory function + TestObject(uint64_t total_num_messages, std::shared_ptr> experiment_done) + : messages_received(0), total_num_messages(total_num_messages), experiment_done(experiment_done) {} }; struct exp_result { @@ -85,7 +109,7 @@ int main(int argc, char* argv[]) { const uint32_t count = std::stoi(argv[dashdash_pos + 2]); const uint32_t num_senders_selector = std::stoi(argv[dashdash_pos + 3]); - steady_clock::time_point begin_time, send_complete_time; + steady_clock::time_point begin_time; // Convert this integer to a more readable enum value const PartialSendMode senders_mode = num_senders_selector == 0 @@ -108,24 +132,7 @@ int main(int argc, char* argv[]) { break; } // variable 'done' tracks the end of the test - std::atomic done = false; - // callback into the application code at each message delivery - auto stability_callback = [&done, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - // Count the total number of messages delivered - ++num_delivered; - // Check for completion - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - done = true; - } - }; + std::shared_ptr> done = std::make_shared>(false); if(dashdash_pos + 4 < argc) { pthread_setname_np(pthread_self(), argv[dashdash_pos + 3]); @@ -136,9 +143,12 @@ int main(int argc, char* argv[]) { auto membership_function = PartialSendersAllocator(num_nodes, senders_mode, derecho::Mode::ORDERED); derecho::SubgroupInfo subgroup_info(membership_function); - auto test_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; + auto test_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) { + return std::make_unique(total_num_messages, done); + }; - derecho::Group group(derecho::UserMessageCallbacks{stability_callback}, subgroup_info, {}, std::vector{}, test_factory); + derecho::Group group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, {}, + std::vector{}, test_factory); std::cout << "Finished constructing/joining Group" << std::endl; uint8_t* bbuf = new uint8_t[max_msg_size]; @@ -169,20 +179,14 @@ int main(int argc, char* argv[]) { send_all(); } } - /* - if(node_rank == 0) { - derecho::rpc::QueryResults results = handle.ordered_send(0); - std::cout << "waiting for response..." << std::endl; -#pragma GCC diagnostic ignored "-Wunused-variable" - decltype(results)::ReplyMap& replies = results.get(); -#pragma GCC diagnostic pop - } - */ - while(!done) { + while(!(*done)) { } delete[] bbuf; + // Retrieve the completion time from the subgroup object + // Replicated::get_ref is unsafe, but the group should be idle by the time *done is true + steady_clock::time_point send_complete_time = group.get_subgroup().get_ref().get_complete_time(); int64_t nsec = duration_cast(send_complete_time - begin_time).count(); diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 07288df5..f2f1ce6d 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 157; +const int COMMITS_AHEAD_OF_VERSION = 158; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+157"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+158"; } From 6ec54064f728f84586d37640949fd31a90497436 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 17:25:40 -0400 Subject: [PATCH 04/12] Improved documentation of DeserializationContext and stability callback After talking with Weijia I think I understand how the vector of DeserializationContext pointers can be used to link replicated objects with state in the main thread. This is actually an important component of making Replicated Objects work in more complicated settings, so I added some more comments documenting what the DeserializationContexts are used for, and replaced the confusing name "rdv" with something more meaningful. I also updated the comments on global_stability_callback to note that it is not called unless you use raw messages, and removed the commented-out old code, since this design decision is not going to be changed. --- .../derecho/core/detail/derecho_internal.hpp | 7 ++++- .../core/detail/external_group_impl.hpp | 8 +++--- .../derecho/core/detail/remote_invocable.hpp | 18 ++++++------ .../derecho/core/detail/replicated_impl.hpp | 4 +-- include/derecho/core/detail/rpc_manager.hpp | 17 ++++++++--- include/derecho/core/detail/rpc_utils.hpp | 6 ++-- include/derecho/core/external_group.hpp | 2 +- include/derecho/core/group.hpp | 28 +++++++++++-------- src/core/git_version.cpp | 4 +-- src/core/multicast_group.cpp | 13 --------- src/core/rpc_manager.cpp | 12 ++++---- 11 files changed, 61 insertions(+), 58 deletions(-) diff --git a/include/derecho/core/detail/derecho_internal.hpp b/include/derecho/core/detail/derecho_internal.hpp index 60af5dcb..354a58c7 100644 --- a/include/derecho/core/detail/derecho_internal.hpp +++ b/include/derecho/core/detail/derecho_internal.hpp @@ -71,7 +71,12 @@ using rpc_handler_t = std::function::ExternalGroupClient( busy_wait_before_sleep_ms(getConfUInt64(Conf::DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS)) { RpcLoggerPtr::initialize(); for(auto dc : deserialization_contexts) { - rdv.push_back(dc); + deserialization_contexts.push_back(dc); } #ifdef USE_VERBS_API sst::verbs_initialize({}, @@ -383,7 +383,7 @@ std::exception_ptr ExternalGroupClient::receive_message( } std::size_t reply_header_size = header_space(); recv_ret reply_return = receiver_function_entry->second( - &rdv, received_from, buf, + &deserialization_contexts, received_from, buf, [&out_alloc, &reply_header_size](std::size_t size) { return out_alloc(size + reply_header_size) + reply_header_size; }); @@ -405,7 +405,7 @@ void ExternalGroupClient::p2p_message_handler(node_id_t send Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags); + retrieve_header(msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply) { // REPLYs can be handled here because they do not block. receive_message(indx, received_from, msg_buf + header_size, payload_size, @@ -447,7 +447,7 @@ void ExternalGroupClient::p2p_request_worker() { request = p2p_request_queue.front(); p2p_request_queue.pop(); } - retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags); + retrieve_header(request.msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) { dbg_error(rpc_logger, "Invalid rpc message in fifo queue: is_reply={}, is_cascading={}", indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE)); diff --git a/include/derecho/core/detail/remote_invocable.hpp b/include/derecho/core/detail/remote_invocable.hpp index c52594a5..103fe250 100644 --- a/include/derecho/core/detail/remote_invocable.hpp +++ b/include/derecho/core/detail/remote_invocable.hpp @@ -199,18 +199,18 @@ struct RemoteInvoker> { /** * Entry point for responses; called when a message is received that * contains a response to this RemoteInvocable function's RPC call. - * @param rdv Deserialization vector + * @param contexts Deserialization context vector * @param nid The ID of the node that sent the response * @param response The byte buffer containing the response message - * @param f + * @param f An allocation function (unused) * @return A recv_ret containing nothing of value. */ - inline recv_ret receive_response( //mutils::DeserializationManager* dsm, - mutils::RemoteDeserialization_v* rdv, + inline recv_ret receive_response( + mutils::RemoteDeserialization_v* contexts, const node_id_t& nid, const uint8_t* response, const std::function& f) { constexpr std::is_same* choice{nullptr}; - mutils::DeserializationManager dsm{*rdv}; + mutils::DeserializationManager dsm{*contexts}; return receive_response(choice, &dsm, nid, response, f); } @@ -347,18 +347,18 @@ struct RemoteInvocable> { * Entry point for handling an RPC function call to this RemoteInvocable * function. Called when a message is received that contains a request to * call this function. - * @param rdv Deserialization vector + * @param contexts Deserialization context vector * @param who The node that sent the message * @param recv_buf The buffer containing the received message * @param out_alloc A function that can allocate a buffer for the response message * @return */ - inline recv_ret receive_call( //mutils::DeserializationManager* dsm, - mutils::RemoteDeserialization_v* rdv, + inline recv_ret receive_call( + mutils::RemoteDeserialization_v* contexts, const node_id_t& who, const uint8_t* recv_buf, const std::function& out_alloc) { constexpr std::is_same* choice{nullptr}; - mutils::DeserializationManager dsm{*rdv}; + mutils::DeserializationManager dsm{*contexts}; return this->receive_call(choice, &dsm, who, recv_buf, out_alloc); } diff --git a/include/derecho/core/detail/replicated_impl.hpp b/include/derecho/core/detail/replicated_impl.hpp index 9ef52734..b14a8b53 100644 --- a/include/derecho/core/detail/replicated_impl.hpp +++ b/include/derecho/core/detail/replicated_impl.hpp @@ -197,8 +197,8 @@ void Replicated::send_object_raw(tcp::socket& receiver_socket) const { template std::size_t Replicated::receive_object(uint8_t* buffer) { - // *user_object_ptr = std::move(mutils::from_bytes(&group_rpc_manager.dsm, buffer)); - mutils::RemoteDeserialization_v rdv{group_rpc_manager.rdv}; + // Add this object's persistent registry to the list of deserialization contexts + mutils::RemoteDeserialization_v rdv{group_rpc_manager.deserialization_contexts}; rdv.insert(rdv.begin(), persistent_registry.get()); mutils::DeserializationManager dsm{rdv}; *user_object_ptr = std::move(mutils::from_bytes(&dsm, buffer)); diff --git a/include/derecho/core/detail/rpc_manager.hpp b/include/derecho/core/detail/rpc_manager.hpp index b1a75ea9..d7a9dfd6 100644 --- a/include/derecho/core/detail/rpc_manager.hpp +++ b/include/derecho/core/detail/rpc_manager.hpp @@ -89,10 +89,12 @@ class RPCManager { * from the targets of an earlier remote call. * Note that a FunctionID is (class ID, subgroup ID, Function Tag). */ std::unique_ptr> receivers; - /** An emtpy DeserializationManager, in case we need it later. */ - // mutils::DeserializationManager dsm{{}}; - // Weijia: I prefer the deserialization context vector. - mutils::RemoteDeserialization_v rdv; + /** + * A copy of the user-provided deserialization context vector, which is + * also stored in Group. Provided to from_bytes when deserializing a user- + * defined Replicated Object. + */ + mutils::RemoteDeserialization_v deserialization_contexts; template friend class ::derecho::Replicated; // Give only Replicated access to view_manager @@ -256,6 +258,13 @@ class RPCManager { const std::function& out_alloc); public: + /** + * Constructor + * + * @param group_view_manager A reference to the ViewManager object + * @param deserialization_context A reference to the vector of user-provided + * deserialization contexts from Group, which will be copied in + */ RPCManager(ViewManager& group_view_manager, const std::vector& deserialization_context); diff --git a/include/derecho/core/detail/rpc_utils.hpp b/include/derecho/core/detail/rpc_utils.hpp index 5e2f1e42..ebe74e8e 100644 --- a/include/derecho/core/detail/rpc_utils.hpp +++ b/include/derecho/core/detail/rpc_utils.hpp @@ -234,7 +234,7 @@ struct recv_ret { * some RPC message is received. */ using receive_fun_t = std::function& out_alloc)>; //Forward declaration of PendingResults, to be used by QueryResults @@ -1145,9 +1145,7 @@ inline void populate_header(uint8_t* reply_buf, reinterpret_cast(reply_buf + offset)[0] = flags; // flags } -//inline void retrieve_header(mutils::DeserializationManager* dsm, -inline void retrieve_header(mutils::RemoteDeserialization_v* rdv, - const uint8_t* reply_buf, +inline void retrieve_header(const uint8_t* reply_buf, std::size_t& payload_size, Opcode& op, node_id_t& from, uint32_t& flags) { std::size_t offset = 0; diff --git a/include/derecho/core/external_group.hpp b/include/derecho/core/external_group.hpp index 91a8655e..f89ccd60 100644 --- a/include/derecho/core/external_group.hpp +++ b/include/derecho/core/external_group.hpp @@ -177,7 +177,7 @@ class ExternalGroupClient { std::queue p2p_request_queue; std::mutex request_queue_mutex; std::condition_variable request_queue_cv; - mutils::RemoteDeserialization_v rdv; + mutils::RemoteDeserialization_v deserialization_contexts; void p2p_receive_loop(); void p2p_request_worker(); void p2p_message_handler(node_id_t sender_id, uint8_t* msg_buf); diff --git a/include/derecho/core/group.hpp b/include/derecho/core/group.hpp index 7921ceba..9b7c9a15 100644 --- a/include/derecho/core/group.hpp +++ b/include/derecho/core/group.hpp @@ -188,14 +188,16 @@ class Group : public virtual _Group, public GroupProjection... const node_id_t my_id; /** - * The shared pointer holding deserialization context is obsolete. I (Weijia) - * removed it because it complicated things: the deserialization context is - * generally a big object containing the group handle; however, the group handle - * need to hold a shared pointer to the object, which causes a dependency loop - * and results in an object indirectly holding a shared pointer to its self. - * Another side effect is double free. So I change it back to the raw pointer. - * The user deserialization context for all objects serialized and deserialized. */ - // std::shared_ptr user_deserialization_context; + * A list (possibly empty) of user-provided deserialization contexts that are + * needed to help deserialize the Replicated Objects. These should be passed + * into the from_bytes method whenever from_bytes is called on a Replicated + * Object. They are stored as raw pointers, even though it is more dangerous, + * because Weijia found that trying to store a shared_ptr here complicated + * things: the deserialization context is generally a big object containing + * the group handle; however, the group handle need to hold a shared pointer + * to the object, which causes a dependency loop and results in an object + * indirectly holding a shared pointer to its self. + */ std::vector user_deserialization_context; /** Persist the objects. Once persisted, persistence_manager updates the SST @@ -330,9 +332,13 @@ class Group : public virtual _Group, public GroupProjection... * events in this group. * @param subgroup_info The set of functions that define how membership in * each subgroup and shard will be determined in this group. - * @param deserialization_context The context used for deserialization - * purpose. The application is responsible to keep it alive during Group - * object lifetime. + * @param deserialization_context A list of pointers to deserialization + * contexts, which are objects needed to help deserialize user-provided + * Replicated Object classes. These context pointers will be provided in + * the DeserializationManager argument to from_bytes any time a Replicated + * Object is deserialized, e.g. during state transfer. The calling + * application is responsible for keeping these objects alive during the + * lifetime of the Group, since Group does not own the pointers. * @param _view_upcalls A list of functions to be called when the group * experiences a View-Change event (optional). * @param factories A variable number of Factory functions, one for each diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index f2f1ce6d..e8004752 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 158; +const int COMMITS_AHEAD_OF_VERSION = 159; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+158"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+159"; } diff --git a/src/core/multicast_group.cpp b/src/core/multicast_group.cpp index f3b861ad..f593f8a6 100644 --- a/src/core/multicast_group.cpp +++ b/src/core/multicast_group.cpp @@ -508,12 +508,6 @@ void MulticastGroup::deliver_message(RDMCMessage& msg, const subgroup_id_t& subg auto payload_size = msg.size - h->header_size; internal_callbacks.post_next_version_callback(subgroup_num, version, msg_ts_us); internal_callbacks.rpc_callback(subgroup_num, msg.sender_id, version, msg_ts_us, buf, payload_size); - - // if(callbacks.global_stability_callback) { - // callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {}, - // version); - // } - } else if(callbacks.global_stability_callback) { callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {{buf + h->header_size, msg.size - h->header_size}}, @@ -536,13 +530,6 @@ void MulticastGroup::deliver_message(SSTMessage& msg, const subgroup_id_t& subgr auto payload_size = msg.size - h->header_size; internal_callbacks.post_next_version_callback(subgroup_num, version, msg_ts_us); internal_callbacks.rpc_callback(subgroup_num, msg.sender_id, version, msg_ts_us, buf, payload_size); - - // if(callbacks.global_stability_callback) { - // callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, - // {{buf + h->header_size, msg.size - h->header_size}}, - // version); - // } - } else if(callbacks.global_stability_callback) { callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {{buf + h->header_size, msg.size - h->header_size}}, diff --git a/src/core/rpc_manager.cpp b/src/core/rpc_manager.cpp index 36f8d6cf..45d183f9 100644 --- a/src/core/rpc_manager.cpp +++ b/src/core/rpc_manager.cpp @@ -28,12 +28,10 @@ RPCManager::RPCManager(ViewManager& group_view_manager, : nid(getConfUInt32(Conf::DERECHO_LOCAL_ID)), rpc_logger(LoggerFactory::createIfAbsent(LoggerFactory::RPC_LOGGER_NAME, getConfString(Conf::LOGGER_RPC_LOG_LEVEL))), receivers(new std::decay_t()), + deserialization_contexts(deserialization_context), view_manager(group_view_manager), busy_wait_before_sleep_ms(getConfUInt64(Conf::DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS)) { RpcLoggerPtr::initialize(); - for(const auto& deserialization_context_ptr : deserialization_context) { - rdv.push_back(deserialization_context_ptr); - } rpc_listener_thread = std::thread(&RPCManager::p2p_receive_loop, this); } @@ -121,7 +119,7 @@ std::exception_ptr RPCManager::receive_message( std::size_t reply_header_size = header_space(); //Pass through the provided out_alloc function, but add space for the reply header recv_ret reply_return = receiver_function_entry->second( - &rdv, received_from, buf, + &deserialization_contexts, received_from, buf, [&out_alloc, &reply_header_size](std::size_t size) { return out_alloc(size + reply_header_size) + reply_header_size; }); @@ -143,7 +141,7 @@ std::exception_ptr RPCManager::parse_and_receive(uint8_t* buf, std::size_t size, Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(&rdv, buf, payload_size, indx, received_from, flags); + retrieve_header(buf, payload_size, indx, received_from, flags); RPCManager::rpc_caller_id = received_from; return receive_message(indx, received_from, buf + header_space(), payload_size, out_alloc); @@ -233,7 +231,7 @@ void RPCManager::p2p_message_handler(node_id_t sender_id, uint8_t* msg_buf) { Opcode indx; node_id_t received_from; uint32_t flags; - retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags); + retrieve_header(msg_buf, payload_size, indx, received_from, flags); dbg_trace(rpc_logger, "Handling a P2P message: function_id = {}, is_reply = {}, received_from = {}, payload_size = {}, invocation_id = {}", indx.function_id, indx.is_reply, received_from, payload_size, ((long*)(msg_buf + header_size))[0]); if(indx.is_reply) { @@ -487,7 +485,7 @@ void RPCManager::p2p_request_worker() { request = p2p_request_queue.front(); p2p_request_queue.pop(); } - retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags); + retrieve_header(request.msg_buf, payload_size, indx, received_from, flags); if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) { dbg_error(rpc_logger, "Invalid rpc message in fifo queue: is_reply={}, is_cascading={}", indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE)); From eb7501e86616dc09f3f0e5af5bcc272f8bb0bd15 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 18:31:29 -0400 Subject: [PATCH 05/12] Rewrote typed_subgroup_test to use DeserializationContext Now that I know how to use DeserializationContexts to give deserialized objects pointers to local objects, this should be the proper way to make TestObject share state with the main method. --- .../typed_subgroup_bw_test.cpp | 72 ++++++++++++------- src/core/git_version.cpp | 4 +- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp index 7edbdd64..280b779f 100644 --- a/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp +++ b/src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp @@ -21,6 +21,14 @@ using std::endl; using test::Bytes; using namespace std::chrono; +/** + * State shared between the (replicated) TestObjects and the main thread + */ +struct TestState : public derecho::DeserializationContext { + std::atomic experiment_done; + steady_clock::time_point send_complete_time; +}; + /** * RPC Object with a single function that accepts a byte array and counts * the number of updates it has received, comparing it to a total provided @@ -31,33 +39,49 @@ using namespace std::chrono; class TestObject : public mutils::ByteRepresentable { uint64_t messages_received; const uint64_t total_num_messages; - steady_clock::time_point send_complete_time; - // Shared with the main thread - std::shared_ptr> experiment_done; + // Pointer to a TestState object held by the main thread + TestState* main_test_state; public: void bytes_fun(const Bytes& bytes) { ++messages_received; if(messages_received == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - experiment_done->store(true); + main_test_state->send_complete_time = std::chrono::steady_clock::now(); + main_test_state->experiment_done = true; } } - // Called by the main thread to retrieve send_complete_time after the experiment is done - const steady_clock::time_point& get_complete_time() const { - return send_complete_time; - } REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun)); - DEFAULT_SERIALIZATION_SUPPORT(TestObject, messages_received, total_num_messages); - // Deserialization constructor. This will break experiment_done's link to the main thread, so we hope it isn't called. - TestObject(uint64_t messages_received, uint64_t total_num_messages) + DEFAULT_SERIALIZE(messages_received, total_num_messages); + // Custom deserialization so we can use the DeserializationManager + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + DEFAULT_DESERIALIZE_NOALLOC(TestObject); + // Deserialization constructor. The TestState pointer should be supplied by the deserialization context. + TestObject(uint64_t messages_received, uint64_t total_num_messages, TestState* main_test_state) : messages_received(messages_received), total_num_messages(total_num_messages), - experiment_done(std::make_shared(false)) {} + main_test_state(main_test_state) {} // Constructor called by factory function - TestObject(uint64_t total_num_messages, std::shared_ptr> experiment_done) - : messages_received(0), total_num_messages(total_num_messages), experiment_done(experiment_done) {} + TestObject(uint64_t total_num_messages, TestState* test_state) + : messages_received(0), total_num_messages(total_num_messages), main_test_state(test_state) {} +}; + +std::unique_ptr TestObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + // Default deserialize the first 2 fields + auto messages_received_ptr = mutils::from_bytes(dsm, buffer); + auto total_num_ptr = mutils::from_bytes(dsm, buffer + mutils::bytes_size(*messages_received_ptr)); + // Retrieve a pointer to TestState from DSM + TestState* test_state_ptr = nullptr; + assert(dsm); + assert(dsm->registered()); + if(dsm && dsm->registered()) { + test_state_ptr = &(dsm->mgr()); + } else { + // Asserts are disabled in release mode, so try to provide some information if there's a bug + std::cerr << "ERROR: Unable to get TestState pointer in TestObject deserialization. TestObject will crash when it attempts to dereference it." << std::endl; + } + assert(test_state_ptr); + return std::make_unique(*messages_received_ptr, *total_num_ptr, test_state_ptr); }; struct exp_result { @@ -131,8 +155,9 @@ int main(int argc, char* argv[]) { total_num_messages = count; break; } - // variable 'done' tracks the end of the test - std::shared_ptr> done = std::make_shared>(false); + // The replicated TestObject will get a pointer to this object and use it to signal that the test is done + TestState shared_test_state; + shared_test_state.experiment_done = false; if(dashdash_pos + 4 < argc) { pthread_setname_np(pthread_self(), argv[dashdash_pos + 3]); @@ -144,10 +169,12 @@ int main(int argc, char* argv[]) { derecho::SubgroupInfo subgroup_info(membership_function); auto test_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) { - return std::make_unique(total_num_messages, done); + return std::make_unique(total_num_messages, &shared_test_state); }; - derecho::Group group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, {}, + std::vector context_vector{&shared_test_state}; + + derecho::Group group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, context_vector, std::vector{}, test_factory); std::cout << "Finished constructing/joining Group" << std::endl; @@ -180,15 +207,12 @@ int main(int argc, char* argv[]) { } } - while(!(*done)) { + while(!shared_test_state.experiment_done) { } delete[] bbuf; - // Retrieve the completion time from the subgroup object - // Replicated::get_ref is unsafe, but the group should be idle by the time *done is true - steady_clock::time_point send_complete_time = group.get_subgroup().get_ref().get_complete_time(); - int64_t nsec = duration_cast(send_complete_time - begin_time).count(); + int64_t nsec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double thp_gbps = (static_cast(total_num_messages) * max_msg_size) / nsec; double thp_ops = (static_cast(total_num_messages) * 1000000000) / nsec; diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index e8004752..e6704904 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 159; +const int COMMITS_AHEAD_OF_VERSION = 161; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+159"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+161"; } From 11e53d8e4f700418064bc6e5a49e50c06b26f612 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 20:07:35 -0400 Subject: [PATCH 06/12] Updated cooked_send_test to use DeserializationContext This test seems very similar to typed_subgroup_bw_test and was also relying on a global stability callback to determine when it was done. Tried a slight variation on the "TestState" object where the object has a "callback-like" method instead of just being a simple struct. --- .../tests/unit_tests/cooked_send_test.cpp | 84 ++++++++++++------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/src/applications/tests/unit_tests/cooked_send_test.cpp b/src/applications/tests/unit_tests/cooked_send_test.cpp index 49a35d37..31154496 100644 --- a/src/applications/tests/unit_tests/cooked_send_test.cpp +++ b/src/applications/tests/unit_tests/cooked_send_test.cpp @@ -12,16 +12,46 @@ using std::map; using std::pair; using std::vector; +/** + * This object contains state that is shared between the replicated test objects + * and the main thread, rather than stored inside the replicated objects. It's + * used to provide a way for the replicated objects to "call back" to the main + * thread. Each replicated object will get a pointer to this object when it is + * constructed or deserialized, set up by the deserialization manager. + */ +struct TestState : public derecho::DeserializationContext { + uint32_t num_messages; + uint32_t num_nodes; + uint32_t counter; + std::atomic done; + void check_test_done() { + if(counter == num_messages * num_nodes) { + done = true; + } + } + TestState(uint32_t num_messages, uint32_t num_nodes) + : num_messages(num_messages), + num_nodes(num_nodes), + counter(0), + done(false) {} +}; + class CookedMessages : public mutils::ByteRepresentable { vector> msgs; // vector of (nodeid, msg #) + TestState* test_state; public: - CookedMessages() = default; - CookedMessages(const vector>& msgs) : msgs(msgs) { - } + // Factory constructor + CookedMessages(TestState* test_state) : test_state(test_state) {} + // Deserialization constructor + CookedMessages(const vector>& msgs, TestState* test_state) + : msgs(msgs), test_state(test_state) {} void send(uint nodeid, uint msg) { msgs.push_back(std::make_pair(nodeid, msg)); + // Count the number of RPC messages received here + test_state->counter++; + test_state->check_test_done(); } vector> get_msgs(uint start_index, uint end_index) { @@ -32,13 +62,23 @@ class CookedMessages : public mutils::ByteRepresentable { return vector>(msgs.begin() + start_index, msgs.begin() + end_index); } - // default state - DEFAULT_SERIALIZATION_SUPPORT(CookedMessages, msgs); + DEFAULT_SERIALIZE(msgs); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + DEFAULT_DESERIALIZE_NOALLOC(CookedMessages); // what operations you want as part of the subgroup REGISTER_RPC_FUNCTIONS(CookedMessages, ORDERED_TARGETS(send, get_msgs)); }; +// Custom deserializer that retrieves the TestState pointer from the DeserializationManager +std::unique_ptr CookedMessages::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto msgs_ptr = mutils::from_bytes>>(dsm, buffer); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*msgs_ptr, test_state_ptr); +} + bool verify_local_order(vector> msgs) { map order; for(auto [nodeid, msg] : msgs) { @@ -58,33 +98,19 @@ int main(int argc, char* argv[]) { } const uint32_t num_nodes = atoi(argv[1]); Conf::initialize(argc, argv); - auto subgroup_membership_function = [num_nodes]( - const std::vector& subgroup_type_order, - const std::unique_ptr& prev_view, derecho::View& curr_view) { - auto& members = curr_view.members; - auto num_members = members.size(); - if(num_members < num_nodes) { - throw subgroup_provisioning_exception(); - } - subgroup_shard_layout_t layout(num_members); - layout[0].push_back(curr_view.make_subview(vector(members))); - derecho::subgroup_allocation_map_t subgroup_allocation; - subgroup_allocation.emplace(std::type_index(typeid(CookedMessages)), std::move(layout)); - return subgroup_allocation; - }; - auto cooked_subgroup_factory = [](persistent::PersistentRegistry*, derecho::subgroup_id_t) { return std::make_unique(); }; + // Configure the default subgroup allocator to put all the nodes in one fixed-size subgroup + SubgroupInfo subgroup_info(derecho::DefaultSubgroupAllocator( + {{std::type_index(typeid(CookedMessages)), + derecho::one_subgroup_policy(derecho::fixed_even_shards(1, num_nodes))}})); - SubgroupInfo subgroup_info(subgroup_membership_function); - std::atomic done = false; uint32_t num_msgs = 500; - auto stability_callback = [num_msgs, num_nodes, &done, counter = (uint)0](subgroup_id_t, node_id_t sender_id, message_id_t index, std::optional>, persistent::version_t) mutable { - counter++; - if(counter == num_msgs * num_nodes) { - done = true; - } + TestState test_state(num_msgs, num_nodes); + auto cooked_subgroup_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) { + return std::make_unique(&test_state); }; - Group group({stability_callback}, subgroup_info, {}, {}, cooked_subgroup_factory); + // Put a pointer to test_state in Group's vector of DeserializationContexts so it will be passed to CookedMessages + Group group({}, subgroup_info, {&test_state}, {}, cooked_subgroup_factory); cout << "Finished constructing/joining the group" << endl; auto group_members = group.get_members(); @@ -107,7 +133,7 @@ int main(int argc, char* argv[]) { for(uint i = 1; i < num_msgs + 1; ++i) { cookedMessagesHandle.ordered_send(my_rank, i); } - while(!done) { + while(!test_state.done) { } if(my_rank == 0) { uint32_t max_msg_size = getConfUInt64(Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE); From 65c1c16fa74f3c80782f9ef20b380a031049a0aa Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 28 Sep 2023 21:27:05 -0400 Subject: [PATCH 07/12] Added more trace-level logging inside lf.cpp Tests are sometimes getting stuck on startup, during the construction of SST resources objects. I think the problem is within connect_endpoint but I can't tell exactly which LibFabric operation is blocking the "client" end from connecting to the "server" end. --- src/core/git_version.cpp | 4 ++-- src/sst/lf.cpp | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index e6704904..c40a8a22 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 161; +const int COMMITS_AHEAD_OF_VERSION = 163; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+161"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+163"; } diff --git a/src/sst/lf.cpp b/src/sst/lf.cpp index 5d92d4a0..e411fd92 100644 --- a/src/sst/lf.cpp +++ b/src/sst/lf.cpp @@ -263,6 +263,7 @@ void _resources::connect_endpoint(bool is_lf_server) { fi_freeinfo(entry.info); crash_with_message("failed to initialize server endpoint.\n"); } + dbg_trace(sst_logger, "calling fi_accept()"); if(fi_accept(this->ep, NULL, 0)) { fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0); fi_freeinfo(entry.info); @@ -292,6 +293,7 @@ void _resources::connect_endpoint(bool is_lf_server) { malloc, remote_cm_data.pep_addr_len); memcpy((void*)client_hints->dest_addr, (void*)remote_cm_data.pep_addr, (size_t)remote_cm_data.pep_addr_len); client_hints->dest_addrlen = remote_cm_data.pep_addr_len; + dbg_trace(sst_logger, "calling fi_getinfo()"); fail_if_nonzero_retry_on_eagain("fi_getinfo() failed.", CRASH_ON_FAILURE, fi_getinfo, LF_VERSION, nullptr, nullptr, 0, client_hints, &client_info); if(init_endpoint(client_info)) { @@ -300,9 +302,10 @@ void _resources::connect_endpoint(bool is_lf_server) { crash_with_message("failed to initialize client endpoint.\n"); } + dbg_trace(sst_logger, "calling fi_connect()"); fail_if_nonzero_retry_on_eagain("fi_connect()", CRASH_ON_FAILURE, fi_connect, this->ep, remote_cm_data.pep_addr, nullptr, 0); - + dbg_trace(sst_logger, "fi_connect() succeeded, calling fi_eq_sread()"); nRead = fi_eq_sread(this->eq, &event, &entry, sizeof(entry), -1, 0); if(nRead != sizeof(entry)) { dbg_error(sst_logger, "failed to connect remote."); From 21e618746349c99f8f7b6bfc22295d90d042badf Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Wed, 4 Oct 2023 17:33:59 -0400 Subject: [PATCH 08/12] Updated signed_log_test to use DeserializationContext This test also relied on the global stability callback to signal the main thread, which won't work any more. Since it also used the persistence and verified callbacks, which still work for cooked messages, I'm creating a test-state object that receives those callbacks as well as the "message delivered" callback, instead of putting some logic in the main method and some within the test state object. This also eliminates the need to create 3 counters and 3 condition variables, since each node should only be in one subgroup, and the test object will only contain the state for the local node's subgroup. --- .../tests/unit_tests/signed_log_test.cpp | 283 ++++++++++++------ 1 file changed, 198 insertions(+), 85 deletions(-) diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index c16b03ce..2d4eae24 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -17,40 +17,141 @@ #include -class OneFieldObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { +/** + * This object contains state that is shared between the replicated test objects + * and the main thread, rather than stored inside the replicated objects. It's + * used to provide a way for the replicated objects to "call back" to the main + * thread. Each replicated object will get a pointer to this object when it is + * constructed or deserialized, set up by the deserialization manager. + */ +struct TestState : public derecho::DeserializationContext { + // Set by the main thread after it figures out which subgroup this node was assigned to + derecho::subgroup_id_t my_subgroup_id; + // Set by the main thread after it figures out which subgroup this node was assigned to + uint32_t subgroup_total_updates; + // Counts the number of updates delivered within this node's subgroup + uint32_t subgroup_updates_delivered; + persistent::version_t last_version; + std::atomic last_version_ready; + // Mutex for subgroup_finished + std::mutex finish_mutex; + // Condition variable used to indicate when this node's subgroup has finished persisting/verifying all updates + std::condition_variable subgroup_finished_condition; + // Boolean to set to true when signaling the condition variable + bool subgroup_finished; + // Called from replicated object update_state methods to notify the main thread that an update was delivered + void update_delivered(persistent::version_t version) { + subgroup_updates_delivered++; + if(subgroup_updates_delivered == subgroup_total_updates) { + last_version = version; + last_version_ready = true; + } + } + // Called by Derecho's global persistence callback + void global_persistence_callback(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); + //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" + //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) + if(subgroup_id == 2 && last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); + } + } + // Called by Derecho's global verified callback + void global_verified_callback(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); + dbg_default_flush(); + // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs + assert(subgroup_id == my_subgroup_id); + if(last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); + } + } +}; + +class OneFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { persistent::Persistent string_field; + TestState* test_state; public: - OneFieldObject(persistent::PersistentRegistry* registry) + /** Factory constructor */ + OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) : string_field(std::make_unique, - "OneFieldObjectStringField", registry, true) {} - OneFieldObject(persistent::Persistent& other_value) - : string_field(std::move(other_value)) {} + "OneFieldObjectStringField", registry, true), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ + OneFieldObject(persistent::Persistent& other_value, + TestState* test_state) + : string_field(std::move(other_value)), + test_state(test_state) { + assert(test_state); + } std::string get_state() const { return *string_field; } - void update_state(const std::string& new_value) { - *string_field = new_value; - } + void update_state(const std::string& new_value); - DEFAULT_SERIALIZATION_SUPPORT(OneFieldObject, string_field); REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + + DEFAULT_SERIALIZE(string_field); + DEFAULT_DESERIALIZE_NOALLOC(OneFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; -class TwoFieldObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { +// Can't be declared inline because it uses get_subgroup +void OneFieldObject::update_state(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + *string_field = new_value; + test_state->update_delivered(std::get<0>(version_and_timestamp)); + } + +// Custom deserializer that retrieves the TestState pointer from the DeserializationManager +std::unique_ptr OneFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto field_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*field_ptr, test_state_ptr); +} + +class TwoFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { persistent::Persistent foo; persistent::Persistent bar; + TestState* test_state; public: - TwoFieldObject(persistent::PersistentRegistry* registry) + /** Factory constructor */ + TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), - bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true) {} + bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ TwoFieldObject(persistent::Persistent& other_foo, - persistent::Persistent& other_bar) + persistent::Persistent& other_bar, + TestState* test_state) : foo(std::move(other_foo)), - bar(std::move(other_bar)) {} + bar(std::move(other_bar)), + test_state(test_state) { + assert(test_state); + } std::string get_foo() const { return *foo; @@ -60,35 +161,79 @@ class TwoFieldObject : public mutils::ByteRepresentable, public derecho::SignedP return *bar; } - void update(const std::string& new_foo, const std::string& new_bar) { + void update(const std::string& new_foo, const std::string& new_bar); + + REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); + DEFAULT_SERIALIZE(foo, bar); + DEFAULT_DESERIALIZE_NOALLOC(TwoFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + +void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); *foo = new_foo; *bar = new_bar; + test_state->update_delivered(std::get<0>(version_and_timestamp)); } - DEFAULT_SERIALIZATION_SUPPORT(TwoFieldObject, foo, bar); - REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); -}; +std::unique_ptr TwoFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto foo_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*foo_ptr); + auto bar_ptr = mutils::from_bytes>(dsm, buffer + bytes_read); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*foo_ptr, *bar_ptr, test_state_ptr); +} -class UnsignedObject : public mutils::ByteRepresentable, public derecho::PersistsFields { +class UnsignedObject : public mutils::ByteRepresentable, +public derecho::GroupReference, +public derecho::PersistsFields { persistent::Persistent string_field; + TestState* test_state; public: - UnsignedObject(persistent::PersistentRegistry* registry) - : string_field(std::make_unique, "UnsignedObjectField", registry, false) {} - UnsignedObject(persistent::Persistent& other_field) - : string_field(std::move(other_field)) {} + /** Factory constructor */ + UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, "UnsignedObjectField", registry, false), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ + UnsignedObject(persistent::Persistent& other_field, TestState* test_state) + : string_field(std::move(other_field)), + test_state(test_state) { + assert(test_state); + } std::string get_state() const { return *string_field; } - void update_state(const std::string& new_value) { - *string_field = new_value; - } + void update_state(const std::string& new_value); - DEFAULT_SERIALIZATION_SUPPORT(UnsignedObject, string_field); REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + DEFAULT_SERIALIZE(string_field); + DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; +void UnsignedObject::update_state(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + *string_field = new_value; + test_state->update_delivered(std::get<0>(version_and_timestamp)); +} + +std::unique_ptr UnsignedObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto field_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*field_ptr, test_state_ptr); +} + + /** * Command-line arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object @@ -101,12 +246,6 @@ int main(int argc, char** argv) { const std::string characters("abcdefghijklmnopqrstuvwxyz"); std::mt19937 random_generator(getpid()); std::uniform_int_distribution char_distribution(0, characters.size() - 1); - std::mutex finish_mutex; - //One condition variable per subgroup to represent when they are finished verifying all updates - std::vector subgroup_finished_condition(2); - //The actual boolean for the condition, since wakeups can be spurious - std::vector subgroup_finished = {false, false}; - const int num_args = 4; if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) { std::cout << "Invalid command line arguments." << std::endl; @@ -135,54 +274,30 @@ int main(int argc, char** argv) { std::array subgroup_total_messages = {subgroup_1_size * num_updates, subgroup_2_size * num_updates, subgroup_unsigned_size * num_updates}; - std::array subgroup_last_version; - std::array, 3> last_version_ready = {false, false, false}; - auto stability_callback = [&subgroup_total_messages, - &subgroup_last_version, - &last_version_ready, - num_delivered = std::vector{0u, 0u, 0u}](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - num_delivered[subgroup]++; - if(num_delivered[subgroup] == subgroup_total_messages[subgroup]) { - subgroup_last_version[subgroup] = ver; - last_version_ready[subgroup] = true; - } - }; + + TestState test_state; + test_state.subgroup_updates_delivered = 0; + test_state.subgroup_finished = false; + test_state.last_version_ready = false; auto global_persist_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); - //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" - //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) - if(subgroup_id == 2 && last_version_ready[subgroup_id] && version == subgroup_last_version[subgroup_id]) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished[subgroup_id] = true; - } - subgroup_finished_condition[subgroup_id].notify_all(); - } + test_state.global_persistence_callback(subgroup_id, version); }; auto global_verified_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); - dbg_default_flush(); - if(last_version_ready[subgroup_id] && version == subgroup_last_version[subgroup_id]) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished[subgroup_id] = true; - } - subgroup_finished_condition[subgroup_id].notify_all(); - } + test_state.global_verified_callback(subgroup_id, version); }; auto new_view_callback = [](const derecho::View& view) { dbg_default_info("Now on View {}", view.vid); }; + // Pass test_state to the Group constructor as a DeserializationContext derecho::Group group( - {stability_callback, nullptr, global_persist_callback, global_verified_callback}, - subgroup_info, {}, {new_view_callback}, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }, - [](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr); }); + {nullptr, nullptr, global_persist_callback, global_verified_callback}, + subgroup_info, {&test_state}, {new_view_callback}, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); }, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); }, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); }); //Figure out which subgroup this node got assigned to int32_t my_shard_subgroup_1 = group.get_my_shard(); int32_t my_shard_subgroup_2 = group.get_my_shard(); @@ -190,6 +305,8 @@ int main(int argc, char** argv) { if(my_shard_subgroup_1 != -1) { std::cout << "In the OneFieldObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[0]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); @@ -200,6 +317,8 @@ int main(int argc, char** argv) { } else if(my_shard_subgroup_2 != -1) { std::cout << "In the TwoFieldObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[1]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_foo('a', 32); @@ -213,6 +332,8 @@ int main(int argc, char** argv) { } else if(my_shard_unsigned_subgroup != -1) { std::cout << "In the UnsignedObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[2]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); @@ -222,18 +343,10 @@ int main(int argc, char** argv) { } } //Wait for all updates to finish being verified, using the condition variables updated by the callback - if(my_shard_subgroup_1 != -1) { - std::cout << "Waiting for final version to be verified" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[0].wait(lock, [&]() { return subgroup_finished[0]; }); - } else if(my_shard_subgroup_2 != -1) { - std::cout << "Waiting for final version to be verified" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[1].wait(lock, [&]() { return subgroup_finished[1]; }); - } else if(my_shard_unsigned_subgroup != -1) { - std::cout << "Waiting for final version to be persisted" << std::endl; - std::unique_lock lock(finish_mutex); - subgroup_finished_condition[2].wait(lock, [&]() { return subgroup_finished[2]; }); + std::cout << "Waiting for final version to be verified" << std::endl; + { + std::unique_lock lock(test_state.finish_mutex); + test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; }); } std::cout << "Done" << std::endl; group.barrier_sync(); From 5dbf73f8c44493cdfbc0db7348ba66568904447a Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 5 Oct 2023 16:37:30 -0400 Subject: [PATCH 09/12] Moved counters into replicated state in signed_log_test The TestState object mostly works, except that each node starts its copy of subgroup_updates_delivered at 0, even if some updates have already been sent in the group by the time the node joins. This can happen because the subgroup allocator is configured with flexible subgroups, so the Group will get an adequate configuration when the minimum number of nodes joins, and those nodes will start sending updates even if more are about to join. Other tests don't have this problem because they use fixed subgroup sizes, so no updates are sent until all the nodes join, but this test needs to include the possibility of View changes so it can make sure the signature chains work even when the version number jumps due to a View change. I really hope adding a non-persistent replicated variable to each of the test objects doesn't mess anything up, but it's the only way to allow late-joining nodes to initialize their counter to the correct value. Otherwise those nodes wait forever for the "last update" (even though they have received it). --- .../tests/unit_tests/signed_log_test.cpp | 113 +++++++++++------- src/core/git_version.cpp | 4 +- 2 files changed, 73 insertions(+), 44 deletions(-) diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 2d4eae24..02969335 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -29,9 +29,9 @@ struct TestState : public derecho::DeserializationContext { derecho::subgroup_id_t my_subgroup_id; // Set by the main thread after it figures out which subgroup this node was assigned to uint32_t subgroup_total_updates; - // Counts the number of updates delivered within this node's subgroup - uint32_t subgroup_updates_delivered; + // Set by each replicated object when the last update is delivered and its version is known persistent::version_t last_version; + // Used to alert other threads (i.e. global callbacks) that last_version has been set std::atomic last_version_ready; // Mutex for subgroup_finished std::mutex finish_mutex; @@ -40,15 +40,16 @@ struct TestState : public derecho::DeserializationContext { // Boolean to set to true when signaling the condition variable bool subgroup_finished; // Called from replicated object update_state methods to notify the main thread that an update was delivered - void update_delivered(persistent::version_t version) { - subgroup_updates_delivered++; - if(subgroup_updates_delivered == subgroup_total_updates) { + void notify_update_delivered(uint64_t update_counter, persistent::version_t version) { + dbg_default_debug("Update {}/{} delivered", update_counter, subgroup_total_updates); + if(update_counter == subgroup_total_updates) { + dbg_default_info("Final update (#{}) delivered, version is {}", update_counter, version); last_version = version; last_version_ready = true; } } // Called by Derecho's global persistence callback - void global_persistence_callback(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + void notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) @@ -61,7 +62,7 @@ struct TestState : public derecho::DeserializationContext { } } // Called by Derecho's global verified callback - void global_verified_callback(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + void notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); dbg_default_flush(); // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs @@ -80,6 +81,11 @@ class OneFieldObject : public mutils::ByteRepresentable, public derecho::GroupReference, public derecho::SignedPersistentFields { persistent::Persistent string_field; + // Counts the number of updates delivered within this subgroup. + // Not persisted, but needs to be replicated so that all replicas have a consistent count of + // the total number of messages, even across view changes + uint64_t updates_delivered; + // Pointer to an object held by the main thread, set from DeserializationManager TestState* test_state; public: @@ -87,13 +93,16 @@ class OneFieldObject : public mutils::ByteRepresentable, OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) : string_field(std::make_unique, "OneFieldObjectStringField", registry, true), + updates_delivered(0), test_state(test_state) { assert(test_state); } /** Deserialization constructor */ OneFieldObject(persistent::Persistent& other_value, + uint64_t other_updates_delivered, TestState* test_state) : string_field(std::move(other_value)), + updates_delivered(other_updates_delivered), test_state(test_state) { assert(test_state); } @@ -106,26 +115,29 @@ class OneFieldObject : public mutils::ByteRepresentable, REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); - DEFAULT_SERIALIZE(string_field); + DEFAULT_SERIALIZE(string_field, updates_delivered); DEFAULT_DESERIALIZE_NOALLOC(OneFieldObject); static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; // Can't be declared inline because it uses get_subgroup void OneFieldObject::update_state(const std::string& new_value) { - auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); - auto version_and_timestamp = this_subgroup_reference.get_current_version(); - *string_field = new_value; - test_state->update_delivered(std::get<0>(version_and_timestamp)); - } + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; + *string_field = new_value; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); +} // Custom deserializer that retrieves the TestState pointer from the DeserializationManager std::unique_ptr OneFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { auto field_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*field_ptr); + auto counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); assert(dsm); assert(dsm->registered()); TestState* test_state_ptr = &(dsm->mgr()); - return std::make_unique(*field_ptr, test_state_ptr); + return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); } class TwoFieldObject : public mutils::ByteRepresentable, @@ -133,6 +145,7 @@ class TwoFieldObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { persistent::Persistent foo; persistent::Persistent bar; + uint64_t updates_delivered; TestState* test_state; public: @@ -140,15 +153,18 @@ class TwoFieldObject : public mutils::ByteRepresentable, TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), + updates_delivered(0), test_state(test_state) { assert(test_state); } /** Deserialization constructor */ TwoFieldObject(persistent::Persistent& other_foo, persistent::Persistent& other_bar, + uint64_t other_updates_delivered, TestState* test_state) : foo(std::move(other_foo)), bar(std::move(other_bar)), + updates_delivered(other_updates_delivered), test_state(test_state) { assert(test_state); } @@ -164,48 +180,56 @@ class TwoFieldObject : public mutils::ByteRepresentable, void update(const std::string& new_foo, const std::string& new_bar); REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); - DEFAULT_SERIALIZE(foo, bar); + DEFAULT_SERIALIZE(foo, bar, updates_delivered); DEFAULT_DESERIALIZE_NOALLOC(TwoFieldObject); static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { - auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); - auto version_and_timestamp = this_subgroup_reference.get_current_version(); - *foo = new_foo; - *bar = new_bar; - test_state->update_delivered(std::get<0>(version_and_timestamp)); - } + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; + *foo = new_foo; + *bar = new_bar; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); +} std::unique_ptr TwoFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { auto foo_ptr = mutils::from_bytes>(dsm, buffer); std::size_t bytes_read = mutils::bytes_size(*foo_ptr); auto bar_ptr = mutils::from_bytes>(dsm, buffer + bytes_read); + bytes_read += mutils::bytes_size(*bar_ptr); + auto update_counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); assert(dsm); assert(dsm->registered()); TestState* test_state_ptr = &(dsm->mgr()); - return std::make_unique(*foo_ptr, *bar_ptr, test_state_ptr); + return std::make_unique(*foo_ptr, *bar_ptr, *update_counter_ptr, test_state_ptr); } class UnsignedObject : public mutils::ByteRepresentable, -public derecho::GroupReference, -public derecho::PersistsFields { + public derecho::GroupReference, + public derecho::PersistsFields { persistent::Persistent string_field; + uint64_t updates_delivered; TestState* test_state; public: /** Factory constructor */ UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) : string_field(std::make_unique, "UnsignedObjectField", registry, false), - test_state(test_state) { - assert(test_state); - } + updates_delivered(0), + test_state(test_state) { + assert(test_state); + } /** Deserialization constructor */ - UnsignedObject(persistent::Persistent& other_field, TestState* test_state) + UnsignedObject(persistent::Persistent& other_field, + uint64_t other_updates_delivered, + TestState* test_state) : string_field(std::move(other_field)), - test_state(test_state) { - assert(test_state); - } + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); + } std::string get_state() const { return *string_field; } @@ -213,7 +237,7 @@ public derecho::PersistsFields { void update_state(const std::string& new_value); REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); - DEFAULT_SERIALIZE(string_field); + DEFAULT_SERIALIZE(string_field, updates_delivered); DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; @@ -221,19 +245,21 @@ public derecho::PersistsFields { void UnsignedObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); auto version_and_timestamp = this_subgroup_reference.get_current_version(); + ++updates_delivered; *string_field = new_value; - test_state->update_delivered(std::get<0>(version_and_timestamp)); + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); } std::unique_ptr UnsignedObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { auto field_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*field_ptr); + auto counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); assert(dsm); assert(dsm->registered()); TestState* test_state_ptr = &(dsm->mgr()); - return std::make_unique(*field_ptr, test_state_ptr); + return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); } - /** * Command-line arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object @@ -270,20 +296,20 @@ int main(int argc, char** argv) { derecho::one_subgroup_policy(derecho::flexible_even_shards( 1, 1, subgroup_unsigned_size))}})); - //Count the total number of messages delivered in each subgroup to figure out what version is assigned to the last one + //Count the total number of messages to be delivered in each subgroup, so we can + //identify when the last message has been delivered and discover what version it got std::array subgroup_total_messages = {subgroup_1_size * num_updates, subgroup_2_size * num_updates, subgroup_unsigned_size * num_updates}; TestState test_state; - test_state.subgroup_updates_delivered = 0; test_state.subgroup_finished = false; test_state.last_version_ready = false; auto global_persist_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - test_state.global_persistence_callback(subgroup_id, version); + test_state.notify_global_persistence(subgroup_id, version); }; auto global_verified_callback = [&](derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - test_state.global_verified_callback(subgroup_id, version); + test_state.notify_global_verified(subgroup_id, version); }; auto new_view_callback = [](const derecho::View& view) { dbg_default_info("Now on View {}", view.vid); @@ -293,11 +319,14 @@ int main(int argc, char** argv) { {nullptr, nullptr, global_persist_callback, global_verified_callback}, subgroup_info, {&test_state}, {new_view_callback}, [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { - return std::make_unique(pr, &test_state); }, + return std::make_unique(pr, &test_state); + }, [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { - return std::make_unique(pr, &test_state); }, + return std::make_unique(pr, &test_state); + }, [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { - return std::make_unique(pr, &test_state); }); + return std::make_unique(pr, &test_state); + }); //Figure out which subgroup this node got assigned to int32_t my_shard_subgroup_1 = group.get_my_shard(); int32_t my_shard_subgroup_2 = group.get_my_shard(); diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index c40a8a22..50e5a24b 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 163; +const int COMMITS_AHEAD_OF_VERSION = 165; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+163"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+165"; } From 17257388138fc1fb270baebe62039904f8759c47 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 5 Oct 2023 16:59:58 -0400 Subject: [PATCH 10/12] Split up signed_log_test, fixed persistent_bw_test The signed_log_test.cpp file was getting too long and unwieldy, and most of the methods couldn't be written inline in the class definitions anyway, so I split it into a header and source file. Now that I know how to use a shared test state object to get the persistent version of the last message in the experiment, I can apply the same setup to persistent_bw_test, which also relied on the old global stability callback. This test is much simpler, so I'll try a TestState object with no methods, and leave the state-updating logic in the RPC method or persistence callback. --- .../performance_tests/persistent_bw_test.cpp | 134 ++++++----- .../tests/unit_tests/signed_log_test.cpp | 212 +++--------------- .../tests/unit_tests/signed_log_test.hpp | 169 ++++++++++++++ 3 files changed, 288 insertions(+), 227 deletions(-) create mode 100644 src/applications/tests/unit_tests/signed_log_test.hpp diff --git a/src/applications/tests/performance_tests/persistent_bw_test.cpp b/src/applications/tests/performance_tests/persistent_bw_test.cpp index 52ff3acb..6f8ba180 100644 --- a/src/applications/tests/performance_tests/persistent_bw_test.cpp +++ b/src/applications/tests/performance_tests/persistent_bw_test.cpp @@ -18,24 +18,74 @@ using std::endl; using namespace persistent; using namespace std::chrono; -class ByteArrayObject : public mutils::ByteRepresentable, public derecho::PersistsFields { +/** + * State shared between the replicated test objects and the main thread. The replicated + * objects get a pointer to this object using DeserializationManager. + */ +struct TestState : public derecho::DeserializationContext { + uint64_t total_num_messages; + // Used to signal the main thread that the experiment is done + std::atomic experiment_done; + // Set by ByteArrayObject when the last RPC message is delivered and its version is known + persistent::version_t last_version; + // Used to alert the global persistence callback that last_version is ready + std::atomic last_version_set; + // The time the last RPC message is delivered to ByteArrayObject + steady_clock::time_point send_complete_time; + // Set by the global persistence callback when version last_version is persisted + steady_clock::time_point persist_complete_time; +}; + +// Note: Inheriting GroupReference is necessary so the RPC method can get the object's current version +class ByteArrayObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::PersistsFields { public: Persistent pers_bytes; + uint64_t messages_received; + TestState* test_state; - void change_pers_bytes(const test::Bytes& bytes) { - *pers_bytes = bytes; - } + void change_pers_bytes(const test::Bytes& bytes); // deserialization constructor - ByteArrayObject(Persistent& _p_bytes) : pers_bytes(std::move(_p_bytes)) {} + ByteArrayObject(Persistent& _p_bytes, uint64_t messages_received, TestState* test_state) + : pers_bytes(std::move(_p_bytes)), messages_received(messages_received), test_state(test_state) {} // default constructor - ByteArrayObject(PersistentRegistry* pr) - : pers_bytes(pr) {} + ByteArrayObject(PersistentRegistry* pr, TestState* test_state) + : pers_bytes(pr), messages_received(0), test_state(test_state) {} REGISTER_RPC_FUNCTIONS(ByteArrayObject, ORDERED_TARGETS(change_pers_bytes)); - DEFAULT_SERIALIZATION_SUPPORT(ByteArrayObject, pers_bytes); + DEFAULT_SERIALIZE(pers_bytes, messages_received); + DEFAULT_DESERIALIZE_NOALLOC(ByteArrayObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer); }; +void ByteArrayObject::change_pers_bytes(const test::Bytes& bytes) { + *pers_bytes = bytes; + // This logic used to be in the global stability callback, but now it needs to be here + // because global stability callbacks were disabled for RPC messages. It also now requires + // a call to get_subgroup().get_current_version(), which means we can't write the method inline. + auto ver = this->group->template get_subgroup(this->subgroup_index).get_current_version(); + ++messages_received; + if(messages_received == test_state->total_num_messages) { + test_state->send_complete_time = std::chrono::steady_clock::now(); + test_state->last_version = std::get<0>(ver); + test_state->last_version_set = true; + } +} + +std::unique_ptr ByteArrayObject::from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer) { + // Default serialization will serialize each named field in order, so deserialize in the same order + auto pers_bytes_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*pers_bytes_ptr); + auto messages_received_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + // Get the TestState pointer from the DeserializationManager + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*pers_bytes_ptr, *messages_received_ptr, test_state_ptr); +} + struct persistent_bw_result { int num_nodes; int num_senders_selector; @@ -90,63 +140,43 @@ int main(int argc, char* argv[]) { derecho::Conf::initialize(argc, argv); - steady_clock::time_point begin_time, send_complete_time, persist_complete_time; + TestState shared_test_state; + shared_test_state.experiment_done = false; + shared_test_state.last_version_set = false; + steady_clock::time_point begin_time; bool is_sending = true; - long total_num_messages; switch(sender_selector) { case PartialSendMode::ALL_SENDERS: - total_num_messages = num_of_nodes * num_msgs; + shared_test_state.total_num_messages = num_of_nodes * num_msgs; break; case PartialSendMode::HALF_SENDERS: - total_num_messages = (num_of_nodes / 2) * num_msgs; + shared_test_state.total_num_messages = (num_of_nodes / 2) * num_msgs; break; case PartialSendMode::ONE_SENDER: - total_num_messages = num_msgs; + shared_test_state.total_num_messages = num_msgs; break; } - // variable 'done' tracks the end of the test - volatile bool done = false; - - // last_version and its flag is shared between the stability callback and persistence callback. - // This is a clumsy hack to figure out what version number is assigned to the last delivered message. - persistent::version_t last_version; - std::atomic last_version_set = false; - - auto stability_callback = [&last_version, - &last_version_set, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - //Count the total number of messages delivered - ++num_delivered; - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - last_version = ver; - last_version_set = true; - } - }; auto persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - persist_complete_time = std::chrono::steady_clock::now(); - done = true; + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.persist_complete_time = std::chrono::steady_clock::now(); + shared_test_state.experiment_done = true; } }; derecho::UserMessageCallbacks callback_set{ - stability_callback, + nullptr, nullptr, persistence_callback}; derecho::SubgroupInfo subgroup_info(PartialSendersAllocator(num_of_nodes, sender_selector)); - auto ba_factory = [](PersistentRegistry* pr, derecho::subgroup_id_t) { return std::make_unique(pr); }; + auto ba_factory = [&shared_test_state](PersistentRegistry* pr, derecho::subgroup_id_t) { + return std::make_unique(pr, &shared_test_state); + }; - derecho::Group group{callback_set, subgroup_info, {}, std::vector{}, ba_factory}; + derecho::Group group(callback_set, subgroup_info, {&shared_test_state}, + std::vector{}, ba_factory); std::cout << "Finished constructing/joining Group" << std::endl; @@ -176,24 +206,26 @@ int main(int argc, char* argv[]) { #endif //_PERFORMANCE_DEBUG } - while(!done) { + while(!shared_test_state.experiment_done) { } - int64_t send_nanosec = duration_cast(send_complete_time - begin_time).count(); + delete[] bbuf; + + int64_t send_nanosec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double send_millisec = static_cast(send_nanosec) / 1000000; - int64_t persist_nanosec = duration_cast(persist_complete_time - begin_time).count(); + int64_t persist_nanosec = duration_cast(shared_test_state.persist_complete_time - begin_time).count(); double persist_millisec = static_cast(persist_nanosec) / 1000000; //Calculate bandwidth //Bytes / nanosecond just happens to be equivalent to GigaBytes / second (in "decimal" GB) //Note that total_num_messages already incorporates multiplying by the number of senders - double send_thp_gbps = (static_cast(total_num_messages) * msg_size) / send_nanosec; - double send_thp_ops = (static_cast(total_num_messages) * 1000000000) / send_nanosec; + double send_thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / send_nanosec; + double send_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / send_nanosec; std::cout << "(send)timespan: " << send_millisec << " milliseconds." << std::endl; std::cout << "(send)throughput: " << send_thp_gbps << "GB/s." << std::endl; std::cout << "(send)throughput: " << send_thp_ops << "ops." << std::endl; - double thp_gbps = (static_cast(total_num_messages) * msg_size) / persist_nanosec; - double thp_ops = (static_cast(total_num_messages) * 1000000000) / persist_nanosec; + double thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / persist_nanosec; + double thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / persist_nanosec; std::cout << "(pers)timespan: " << persist_millisec << " millisecond." << std::endl; std::cout << "(pers)throughput: " << thp_gbps << "GB/s." << std::endl; std::cout << "(pers)throughput: " << thp_ops << "ops." << std::endl; diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 02969335..adbab770 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -17,110 +17,47 @@ #include -/** - * This object contains state that is shared between the replicated test objects - * and the main thread, rather than stored inside the replicated objects. It's - * used to provide a way for the replicated objects to "call back" to the main - * thread. Each replicated object will get a pointer to this object when it is - * constructed or deserialized, set up by the deserialization manager. - */ -struct TestState : public derecho::DeserializationContext { - // Set by the main thread after it figures out which subgroup this node was assigned to - derecho::subgroup_id_t my_subgroup_id; - // Set by the main thread after it figures out which subgroup this node was assigned to - uint32_t subgroup_total_updates; - // Set by each replicated object when the last update is delivered and its version is known - persistent::version_t last_version; - // Used to alert other threads (i.e. global callbacks) that last_version has been set - std::atomic last_version_ready; - // Mutex for subgroup_finished - std::mutex finish_mutex; - // Condition variable used to indicate when this node's subgroup has finished persisting/verifying all updates - std::condition_variable subgroup_finished_condition; - // Boolean to set to true when signaling the condition variable - bool subgroup_finished; - // Called from replicated object update_state methods to notify the main thread that an update was delivered - void notify_update_delivered(uint64_t update_counter, persistent::version_t version) { - dbg_default_debug("Update {}/{} delivered", update_counter, subgroup_total_updates); - if(update_counter == subgroup_total_updates) { - dbg_default_info("Final update (#{}) delivered, version is {}", update_counter, version); - last_version = version; - last_version_ready = true; - } - } - // Called by Derecho's global persistence callback - void notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); - //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" - //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) - if(subgroup_id == 2 && last_version_ready && version == last_version) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished = true; - } - subgroup_finished_condition.notify_all(); - } - } - // Called by Derecho's global verified callback - void notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { - dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); - dbg_default_flush(); - // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs - assert(subgroup_id == my_subgroup_id); - if(last_version_ready && version == last_version) { - { - std::unique_lock finish_lock(finish_mutex); - subgroup_finished = true; - } - subgroup_finished_condition.notify_all(); - } - } -}; +#include "signed_log_test.hpp" -class OneFieldObject : public mutils::ByteRepresentable, - public derecho::GroupReference, - public derecho::SignedPersistentFields { - persistent::Persistent string_field; - // Counts the number of updates delivered within this subgroup. - // Not persisted, but needs to be replicated so that all replicas have a consistent count of - // the total number of messages, even across view changes - uint64_t updates_delivered; - // Pointer to an object held by the main thread, set from DeserializationManager - TestState* test_state; +/* --- TestState implementation --- */ -public: - /** Factory constructor */ - OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) - : string_field(std::make_unique, - "OneFieldObjectStringField", registry, true), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } - /** Deserialization constructor */ - OneFieldObject(persistent::Persistent& other_value, - uint64_t other_updates_delivered, - TestState* test_state) - : string_field(std::move(other_value)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); +void TestState::notify_update_delivered(uint64_t update_counter, persistent::version_t version) { + dbg_default_debug("Update {}/{} delivered", update_counter, subgroup_total_updates); + if(update_counter == subgroup_total_updates) { + dbg_default_info("Final update (#{}) delivered, version is {}", update_counter, version); + last_version = version; + last_version_ready = true; } +} - std::string get_state() const { - return *string_field; +void TestState::notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); + //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" + //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) + if(subgroup_id == 2 && last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); } +} +void TestState::notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { + dbg_default_info("Verified: Subgroup {}, version {}.", subgroup_id, version); + dbg_default_flush(); + // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs + assert(subgroup_id == my_subgroup_id); + if(last_version_ready && version == last_version) { + { + std::unique_lock finish_lock(finish_mutex); + subgroup_finished = true; + } + subgroup_finished_condition.notify_all(); + } +} - void update_state(const std::string& new_value); - - REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); - - DEFAULT_SERIALIZE(string_field, updates_delivered); - DEFAULT_DESERIALIZE_NOALLOC(OneFieldObject); - static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); -}; +/* --- OneFieldObject implementation --- */ -// Can't be declared inline because it uses get_subgroup void OneFieldObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); auto version_and_timestamp = this_subgroup_reference.get_current_version(); @@ -140,50 +77,7 @@ std::unique_ptr OneFieldObject::from_bytes(mutils::Deserializati return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); } -class TwoFieldObject : public mutils::ByteRepresentable, - public derecho::GroupReference, - public derecho::SignedPersistentFields { - persistent::Persistent foo; - persistent::Persistent bar; - uint64_t updates_delivered; - TestState* test_state; - -public: - /** Factory constructor */ - TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) - : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), - bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } - /** Deserialization constructor */ - TwoFieldObject(persistent::Persistent& other_foo, - persistent::Persistent& other_bar, - uint64_t other_updates_delivered, - TestState* test_state) - : foo(std::move(other_foo)), - bar(std::move(other_bar)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); - } - - std::string get_foo() const { - return *foo; - } - - std::string get_bar() const { - return *bar; - } - - void update(const std::string& new_foo, const std::string& new_bar); - - REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); - DEFAULT_SERIALIZE(foo, bar, updates_delivered); - DEFAULT_DESERIALIZE_NOALLOC(TwoFieldObject); - static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); -}; +/* --- TwoFieldObject implementation --- */ void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); @@ -206,41 +100,7 @@ std::unique_ptr TwoFieldObject::from_bytes(mutils::Deserializati return std::make_unique(*foo_ptr, *bar_ptr, *update_counter_ptr, test_state_ptr); } -class UnsignedObject : public mutils::ByteRepresentable, - public derecho::GroupReference, - public derecho::PersistsFields { - persistent::Persistent string_field; - uint64_t updates_delivered; - TestState* test_state; - -public: - /** Factory constructor */ - UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) - : string_field(std::make_unique, "UnsignedObjectField", registry, false), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } - /** Deserialization constructor */ - UnsignedObject(persistent::Persistent& other_field, - uint64_t other_updates_delivered, - TestState* test_state) - : string_field(std::move(other_field)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); - } - std::string get_state() const { - return *string_field; - } - - void update_state(const std::string& new_value); - - REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); - DEFAULT_SERIALIZE(string_field, updates_delivered); - DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); - static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); -}; +/* --- UnsignedObject implementation --- */ void UnsignedObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); diff --git a/src/applications/tests/unit_tests/signed_log_test.hpp b/src/applications/tests/unit_tests/signed_log_test.hpp new file mode 100644 index 00000000..fb8f5b51 --- /dev/null +++ b/src/applications/tests/unit_tests/signed_log_test.hpp @@ -0,0 +1,169 @@ +#include +#include +#include +#include + +#include + +/** + * This object contains state that is shared between the replicated test objects + * and the main thread, rather than stored inside the replicated objects. It's + * used to provide a way for the replicated objects to "call back" to the main + * thread. Each replicated object will get a pointer to this object when it is + * constructed or deserialized, set up by the deserialization manager. + */ +struct TestState : public derecho::DeserializationContext { + // Set by the main thread after it figures out which subgroup this node was assigned to + derecho::subgroup_id_t my_subgroup_id; + // Set by the main thread after it figures out which subgroup this node was assigned to + uint32_t subgroup_total_updates; + // Set by each replicated object when the last update is delivered and its version is known + persistent::version_t last_version; + // Used to alert other threads (i.e. global callbacks) that last_version has been set + std::atomic last_version_ready; + // Mutex for subgroup_finished + std::mutex finish_mutex; + // Condition variable used to indicate when this node's subgroup has finished persisting/verifying all updates + std::condition_variable subgroup_finished_condition; + // Boolean to set to true when signaling the condition variable + bool subgroup_finished; + // Called from replicated object update_state methods to notify the main thread that an update was delivered + void notify_update_delivered(uint64_t update_counter, persistent::version_t version); + // Called by Derecho's global persistence callback + void notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version); + // Called by Derecho's global verified callback + void notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version); +}; + +/** + * Test object with one signed persistent field + */ +class OneFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { + persistent::Persistent string_field; + // Counts the number of updates delivered within this subgroup. + // Not persisted, but needs to be replicated so that all replicas have a consistent count of + // the total number of messages, even across view changes + uint64_t updates_delivered; + // Pointer to an object held by the main thread, set from DeserializationManager + TestState* test_state; + +public: + /** Factory constructor */ + OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, + "OneFieldObjectStringField", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ + OneFieldObject(persistent::Persistent& other_value, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_value)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); + } + + std::string get_state() const { + return *string_field; + } + + void update_state(const std::string& new_value); + + REGISTER_RPC_FUNCTIONS(OneFieldObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + + DEFAULT_SERIALIZE(string_field, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(OneFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + +/** + * Test object with two signed persistent fields + */ +class TwoFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { + persistent::Persistent foo; + persistent::Persistent bar; + uint64_t updates_delivered; + TestState* test_state; + +public: + /** Factory constructor */ + TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), + bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ + TwoFieldObject(persistent::Persistent& other_foo, + persistent::Persistent& other_bar, + uint64_t other_updates_delivered, + TestState* test_state) + : foo(std::move(other_foo)), + bar(std::move(other_bar)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); + } + + std::string get_foo() const { + return *foo; + } + + std::string get_bar() const { + return *bar; + } + + void update(const std::string& new_foo, const std::string& new_bar); + + REGISTER_RPC_FUNCTIONS(TwoFieldObject, P2P_TARGETS(get_foo, get_bar), ORDERED_TARGETS(update)); + DEFAULT_SERIALIZE(foo, bar, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(TwoFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + +/** + * Test object with one un-signed persistent field + */ +class UnsignedObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::PersistsFields { + persistent::Persistent string_field; + uint64_t updates_delivered; + TestState* test_state; + +public: + /** Factory constructor */ + UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, "UnsignedObjectField", registry, false), + updates_delivered(0), + test_state(test_state) { + assert(test_state); + } + /** Deserialization constructor */ + UnsignedObject(persistent::Persistent& other_field, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_field)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); + } + std::string get_state() const { + return *string_field; + } + + void update_state(const std::string& new_value); + + REGISTER_RPC_FUNCTIONS(UnsignedObject, P2P_TARGETS(get_state), ORDERED_TARGETS(update_state)); + DEFAULT_SERIALIZE(string_field, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; \ No newline at end of file From 6c8b2cc91a592d5f2ea62681c71be6d22a8127cb Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 5 Oct 2023 17:40:17 -0400 Subject: [PATCH 11/12] Further cleanup of signed_log_test; fixed signed_bw_test Constructor bodies should go in the cpp file too. We can use a simple flag in the TestState object to determine whether notify_global_persistence needs to signal the subgroup is finished, instead of hard-coding subgroup ID 2 = UnsignedObject. The revisions to persistent_bw_test can be easily copied to signed_bw_test, which is almost the same except for using a signed persistent field instead of a regular persistent field. --- .../performance_tests/signed_bw_test.cpp | 149 +++++++++++------- .../tests/unit_tests/signed_log_test.cpp | 58 ++++++- .../tests/unit_tests/signed_log_test.hpp | 49 +----- src/core/git_version.cpp | 4 +- 4 files changed, 158 insertions(+), 102 deletions(-) diff --git a/src/applications/tests/performance_tests/signed_bw_test.cpp b/src/applications/tests/performance_tests/signed_bw_test.cpp index 52d19fc8..bba222b5 100644 --- a/src/applications/tests/performance_tests/signed_bw_test.cpp +++ b/src/applications/tests/performance_tests/signed_bw_test.cpp @@ -15,27 +15,83 @@ using std::cout; using std::endl; +using std::chrono::steady_clock; using namespace persistent; -class ByteArrayObject : public mutils::ByteRepresentable, public derecho::SignedPersistentFields { +/** + * State shared between the replicated test objects and the main thread. The replicated + * objects get a pointer to this object using DeserializationManager. + */ +struct TestState : public derecho::DeserializationContext { + uint64_t total_num_messages; + // Used to signal the main thread that the experiment is done + std::atomic experiment_done; + // Set by ByteArrayObject when the last RPC message is delivered and its version is known + persistent::version_t last_version; + // Used to alert the global persistence callback that last_version is ready + std::atomic last_version_set; + // The time the last RPC message is delivered to ByteArrayObject + steady_clock::time_point send_complete_time; + // Set by the global persistence callback when version last_version is persisted + steady_clock::time_point persist_complete_time; + // Set by the global verification callback when version last_version is verified + steady_clock::time_point verify_complete_time; +}; + +class ByteArrayObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { public: Persistent pers_bytes; + uint64_t messages_received; + TestState* test_state; - void change_pers_bytes(const test::Bytes& bytes) { - *pers_bytes = bytes; - } + void change_pers_bytes(const test::Bytes& bytes); // default constructor - ByteArrayObject(PersistentRegistry* pr) - : pers_bytes(pr, true) {} + ByteArrayObject(PersistentRegistry* pr, TestState* test_state) + : pers_bytes(pr, true), + messages_received(0), + test_state(test_state) {} // deserialization constructor - ByteArrayObject(Persistent& _p_bytes) : pers_bytes(std::move(_p_bytes)) {} + ByteArrayObject(Persistent& _p_bytes, uint64_t _messages_received, TestState* _test_state) + : pers_bytes(std::move(_p_bytes)), + messages_received(_messages_received), + test_state(_test_state) {} REGISTER_RPC_FUNCTIONS(ByteArrayObject, ORDERED_TARGETS(change_pers_bytes)); - DEFAULT_SERIALIZATION_SUPPORT(ByteArrayObject, pers_bytes); + DEFAULT_SERIALIZE(pers_bytes, messages_received); + DEFAULT_DESERIALIZE_NOALLOC(ByteArrayObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer); }; +void ByteArrayObject::change_pers_bytes(const test::Bytes& bytes) { + *pers_bytes = bytes; + // This logic used to be in the global stability callback, but now it needs to be here + // because global stability callbacks were disabled for RPC messages. It also now requires + // a call to get_subgroup().get_current_version(), which means we can't write the method inline. + auto ver = this->group->template get_subgroup(this->subgroup_index).get_current_version(); + ++messages_received; + if(messages_received == test_state->total_num_messages) { + test_state->send_complete_time = std::chrono::steady_clock::now(); + test_state->last_version = std::get<0>(ver); + test_state->last_version_set = true; + } +} + +std::unique_ptr ByteArrayObject::from_bytes(mutils::DeserializationManager* dsm, const uint8_t* buffer) { + // Default serialization will serialize each named field in order, so deserialize in the same order + auto pers_bytes_ptr = mutils::from_bytes>(dsm, buffer); + std::size_t bytes_read = mutils::bytes_size(*pers_bytes_ptr); + auto messages_received_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + // Get the TestState pointer from the DeserializationManager + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*pers_bytes_ptr, *messages_received_ptr, test_state_ptr); +} + struct signed_bw_result { int num_nodes; int num_senders_selector; @@ -94,70 +150,49 @@ int main(int argc, char* argv[]) { pthread_setname_np(pthread_self(), DEFAULT_PROC_NAME); } - steady_clock::time_point begin_time, send_complete_time, persist_complete_time, verify_complete_time; + TestState shared_test_state; + shared_test_state.experiment_done = false; + shared_test_state.last_version_set = false; + steady_clock::time_point begin_time; bool is_sending = true; - long total_num_messages; switch(sender_selector) { case PartialSendMode::ALL_SENDERS: - total_num_messages = num_of_nodes * num_msgs; + shared_test_state.total_num_messages = num_of_nodes * num_msgs; break; case PartialSendMode::HALF_SENDERS: - total_num_messages = (num_of_nodes / 2) * num_msgs; + shared_test_state.total_num_messages = (num_of_nodes / 2) * num_msgs; break; case PartialSendMode::ONE_SENDER: - total_num_messages = num_msgs; + shared_test_state.total_num_messages = num_msgs; break; } - // variable 'done' tracks the end of the test - std::atomic done = false; - - // last_version and its flag is shared between the stability callback and persistence callback. - // This is a clumsy hack to figure out what version number is assigned to the last delivered message. - persistent::version_t last_version; - std::atomic last_version_set = false; - - auto stability_callback = [&last_version, - &last_version_set, - &send_complete_time, - total_num_messages, - num_delivered = 0u](uint32_t subgroup, - uint32_t sender_id, - long long int index, - std::optional> data, - persistent::version_t ver) mutable { - //Count the total number of messages delivered - ++num_delivered; - if(num_delivered == total_num_messages) { - send_complete_time = std::chrono::steady_clock::now(); - last_version = ver; - last_version_set = true; - } - }; auto persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - persist_complete_time = std::chrono::steady_clock::now(); + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.persist_complete_time = steady_clock::now(); } }; auto verified_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) { - if(last_version_set && ver == last_version) { - verify_complete_time = std::chrono::steady_clock::now(); - done = true; + if(shared_test_state.last_version_set && ver == shared_test_state.last_version) { + shared_test_state.verify_complete_time = steady_clock::now(); + shared_test_state.experiment_done = true; } }; derecho::UserMessageCallbacks callback_set{ - stability_callback, - persistence_callback, nullptr, + nullptr, + persistence_callback, verified_callback}; derecho::SubgroupInfo subgroup_info(PartialSendersAllocator(num_of_nodes, sender_selector)); - auto ba_factory = [](PersistentRegistry* pr, derecho::subgroup_id_t) { return std::make_unique(pr); }; + auto ba_factory = [&shared_test_state](PersistentRegistry* pr, derecho::subgroup_id_t) { + return std::make_unique(pr, &shared_test_state); + }; - derecho::Group group(callback_set, subgroup_info, {}, + derecho::Group group(callback_set, subgroup_info, {&shared_test_state}, std::vector{}, ba_factory); auto node_rank = group.get_my_rank(); @@ -186,32 +221,32 @@ int main(int argc, char* argv[]) { #endif //_PERFORMANCE_DEBUG } - while(!done) { + while(!shared_test_state.experiment_done) { } - int64_t send_nanosec = duration_cast(send_complete_time - begin_time).count(); + int64_t send_nanosec = duration_cast(shared_test_state.send_complete_time - begin_time).count(); double send_millisec = static_cast(send_nanosec) / 1000000; - int64_t persist_nanosec = duration_cast(persist_complete_time - begin_time).count(); + int64_t persist_nanosec = duration_cast(shared_test_state.persist_complete_time - begin_time).count(); double persist_millisec = static_cast(persist_nanosec) / 1000000; - int64_t verified_nanosec = duration_cast(verify_complete_time - begin_time).count(); + int64_t verified_nanosec = duration_cast(shared_test_state.verify_complete_time - begin_time).count(); double verified_millisec = static_cast(verified_nanosec) / 1000000; //Calculate bandwidth //Bytes / nanosecond just happens to be equivalent to GigaBytes / second (in "decimal" GB) //Note that total_num_messages already incorporates multiplying by the number of senders - double send_thp_gbps = (static_cast(total_num_messages) * msg_size) / send_nanosec; - double send_thp_ops = (static_cast(total_num_messages) * 1000000000) / send_nanosec; + double send_thp_gbps = (static_cast(shared_test_state.total_num_messages) * msg_size) / send_nanosec; + double send_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / send_nanosec; std::cout << "(send)timespan: " << send_millisec << " milliseconds." << std::endl; std::cout << "(send)throughput: " << send_thp_gbps << "GB/s." << std::endl; std::cout << "(send)throughput: " << send_thp_ops << "ops." << std::endl; - double persist_thp_gbs = (static_cast(total_num_messages) * msg_size) / persist_nanosec; - double persist_thp_ops = (static_cast(total_num_messages) * 1000000000) / persist_nanosec; + double persist_thp_gbs = (static_cast(shared_test_state.total_num_messages) * msg_size) / persist_nanosec; + double persist_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / persist_nanosec; std::cout << "(pers)timespan: " << persist_millisec << " millisecond." << std::endl; std::cout << "(pers)throughput: " << persist_thp_gbs << "GB/s." << std::endl; std::cout << "(pers)throughput: " << persist_thp_ops << "ops." << std::endl; - double verified_thp_gbs = (static_cast(total_num_messages) * msg_size) / verified_nanosec; - double verified_thp_ops = (static_cast(total_num_messages) * 1000000000) / verified_nanosec; + double verified_thp_gbs = (static_cast(shared_test_state.total_num_messages) * msg_size) / verified_nanosec; + double verified_thp_ops = (static_cast(shared_test_state.total_num_messages) * 1000000000) / verified_nanosec; std::cout << "(verify)timespan: " << verified_millisec << " millisecond." << std::endl; std::cout << "(verify)throughput: " << verified_thp_gbs << "GB/s." << std::endl; std::cout << "(verify)throughput: " << verified_thp_ops << "ops." << std::endl; diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index adbab770..61943daf 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -33,8 +33,7 @@ void TestState::notify_update_delivered(uint64_t update_counter, persistent::ver void TestState::notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version) { dbg_default_info("Persisted: Subgroup {}, version {}.", subgroup_id, version); //Mark the unsigned subgroup as finished when it has finished persisting, since it won't be "verified" - //NOTE: This relies on UnsignedObject always being the third subgroup (with ID 2) - if(subgroup_id == 2 && last_version_ready && version == last_version) { + if(my_subgroup_is_unsigned && last_version_ready && version == last_version) { { std::unique_lock finish_lock(finish_mutex); subgroup_finished = true; @@ -58,6 +57,23 @@ void TestState::notify_global_verified(derecho::subgroup_id_t subgroup_id, persi /* --- OneFieldObject implementation --- */ +OneFieldObject::OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, + "OneFieldObjectStringField", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +OneFieldObject::OneFieldObject(persistent::Persistent& other_value, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_value)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + void OneFieldObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); auto version_and_timestamp = this_subgroup_reference.get_current_version(); @@ -79,6 +95,25 @@ std::unique_ptr OneFieldObject::from_bytes(mutils::Deserializati /* --- TwoFieldObject implementation --- */ +TwoFieldObject::TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), + bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +TwoFieldObject::TwoFieldObject(persistent::Persistent& other_foo, + persistent::Persistent& other_bar, + uint64_t other_updates_delivered, + TestState* test_state) + : foo(std::move(other_foo)), + bar(std::move(other_bar)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); auto version_and_timestamp = this_subgroup_reference.get_current_version(); @@ -102,6 +137,22 @@ std::unique_ptr TwoFieldObject::from_bytes(mutils::Deserializati /* --- UnsignedObject implementation --- */ +UnsignedObject::UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) + : string_field(std::make_unique, "UnsignedObjectField", registry, false), + updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +UnsignedObject::UnsignedObject(persistent::Persistent& other_field, + uint64_t other_updates_delivered, + TestState* test_state) + : string_field(std::move(other_field)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + void UnsignedObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); auto version_and_timestamp = this_subgroup_reference.get_current_version(); @@ -196,6 +247,7 @@ int main(int argc, char** argv) { derecho::Replicated& object_handle = group.get_subgroup(); test_state.subgroup_total_updates = subgroup_total_messages[0]; test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); @@ -208,6 +260,7 @@ int main(int argc, char** argv) { derecho::Replicated& object_handle = group.get_subgroup(); test_state.subgroup_total_updates = subgroup_total_messages[1]; test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_foo('a', 32); @@ -223,6 +276,7 @@ int main(int argc, char** argv) { derecho::Replicated& object_handle = group.get_subgroup(); test_state.subgroup_total_updates = subgroup_total_messages[2]; test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = true; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { std::string new_string('a', 32); diff --git a/src/applications/tests/unit_tests/signed_log_test.hpp b/src/applications/tests/unit_tests/signed_log_test.hpp index fb8f5b51..9bdd5b39 100644 --- a/src/applications/tests/unit_tests/signed_log_test.hpp +++ b/src/applications/tests/unit_tests/signed_log_test.hpp @@ -13,10 +13,10 @@ * constructed or deserialized, set up by the deserialization manager. */ struct TestState : public derecho::DeserializationContext { - // Set by the main thread after it figures out which subgroup this node was assigned to + // The next 3 are set by the main thread after it figures out which subgroup this node was assigned to derecho::subgroup_id_t my_subgroup_id; - // Set by the main thread after it figures out which subgroup this node was assigned to uint32_t subgroup_total_updates; + bool my_subgroup_is_unsigned; // Set by each replicated object when the last update is delivered and its version is known persistent::version_t last_version; // Used to alert other threads (i.e. global callbacks) that last_version has been set @@ -51,22 +51,11 @@ class OneFieldObject : public mutils::ByteRepresentable, public: /** Factory constructor */ - OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) - : string_field(std::make_unique, - "OneFieldObjectStringField", registry, true), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } + OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state); /** Deserialization constructor */ OneFieldObject(persistent::Persistent& other_value, uint64_t other_updates_delivered, - TestState* test_state) - : string_field(std::move(other_value)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); - } + TestState* test_state); std::string get_state() const { return *string_field; @@ -94,24 +83,12 @@ class TwoFieldObject : public mutils::ByteRepresentable, public: /** Factory constructor */ - TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) - : foo(std::make_unique, "TwoFieldObjectStringOne", registry, true), - bar(std::make_unique, "TwoFieldObjectStringTwo", registry, true), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } + TwoFieldObject(persistent::PersistentRegistry* registry, TestState* test_state); /** Deserialization constructor */ TwoFieldObject(persistent::Persistent& other_foo, persistent::Persistent& other_bar, uint64_t other_updates_delivered, - TestState* test_state) - : foo(std::move(other_foo)), - bar(std::move(other_bar)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); - } + TestState* test_state); std::string get_foo() const { return *foo; @@ -141,21 +118,11 @@ class UnsignedObject : public mutils::ByteRepresentable, public: /** Factory constructor */ - UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) - : string_field(std::make_unique, "UnsignedObjectField", registry, false), - updates_delivered(0), - test_state(test_state) { - assert(test_state); - } + UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state); /** Deserialization constructor */ UnsignedObject(persistent::Persistent& other_field, uint64_t other_updates_delivered, - TestState* test_state) - : string_field(std::move(other_field)), - updates_delivered(other_updates_delivered), - test_state(test_state) { - assert(test_state); - } + TestState* test_state); std::string get_state() const { return *string_field; } diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 50e5a24b..ed419fb8 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 165; +const int COMMITS_AHEAD_OF_VERSION = 167; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+165"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+167"; } From cc8d665b1dbd051125374b2a5e26716f457d110c Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 9 Nov 2023 16:41:52 -0500 Subject: [PATCH 12/12] Updated signed_store_test to use DeserializationContext This test was not relying on global stability callbacks, but it was using a fragile workaround to notify the main thread when the experiment was finished, by passing a pointer to an atomic bool into the ObjectStore/SignatureStore factory and then hoping the subgroups were only constructed once. Now that I know how to use the DeserializationContext to give replicated objects pointers to the main thread, I should do it here too, so the test can support view changes without breaking the link between the ObjectStore objects and the main thread. --- .../performance_tests/signed_store_test.cpp | 45 ++++++++++++------- .../performance_tests/signed_store_test.hpp | 41 ++++++++++++----- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/src/applications/tests/performance_tests/signed_store_test.cpp b/src/applications/tests/performance_tests/signed_store_test.cpp index 25a4fc23..55bcf333 100644 --- a/src/applications/tests/performance_tests/signed_store_test.cpp +++ b/src/applications/tests/performance_tests/signed_store_test.cpp @@ -223,9 +223,9 @@ bool ClientTier::update_batch_test(const int& num_updates) const { /* ---------------- SignatureStore implementation --------------------- */ SignatureStore::SignatureStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done) + ExperimentState* experiment_state) : hashes(std::make_unique, "SignedHashLog", pr, true), - experiment_done(experiment_done) {} + experiment_state(experiment_state) {} std::vector SignatureStore::add_hash(const SHA256Hash& hash) const { derecho::Replicated& this_subgroup = group->get_subgroup(this->subgroup_index); @@ -253,15 +253,22 @@ void SignatureStore::ordered_add_hash(const SHA256Hash& hash) { void SignatureStore::end_test() const { std::cout << "Received the end_test message, shutting down" << std::endl; - *experiment_done = true; + experiment_state->done = true; +} + +std::unique_ptr SignatureStore::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto hash_log_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm->registered()); + ExperimentState* experiment_state_ptr = &(dsm->mgr()); + return std::make_unique(*hash_log_ptr, experiment_state_ptr); } /* ----------------- ObjectStore implementation ------------------------ */ ObjectStore::ObjectStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done) + ExperimentState* experiment_state) : object_log(std::make_unique, "BlobLog", pr, false), - experiment_done(experiment_done) {} + experiment_state(experiment_state) {} std::pair ObjectStore::update(const Blob& new_data) const { derecho::Replicated& this_subgroup = group->get_subgroup(this->subgroup_index); @@ -299,8 +306,15 @@ Blob ObjectStore::get_latest() const { } void ObjectStore::end_test() const { - std::cout << "Recieved the end_test message, shutting down" << std::endl; - *experiment_done = true; + std::cout << "Received the end_test message, shutting down" << std::endl; + experiment_state->done = true; +} + +std::unique_ptr ObjectStore::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + auto object_log_ptr = mutils::from_bytes>(dsm, buffer); + assert(dsm->registered()); + ExperimentState* experiment_state_ptr = &(dsm->mgr()); + return std::make_unique(*object_log_ptr, experiment_state_ptr); } /* -------------------------------------------------------------------- */ @@ -352,16 +366,17 @@ int main(int argc, char** argv) { + derecho::remote_invocation_utilities::header_space(); const std::size_t update_size = derecho::getConfUInt64(derecho::Conf::SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE) - rpc_header_size; steady_clock::time_point begin_time, batch_complete_time; - //This atomic flag will be shared with the SignatureStore or ObjectStore subgroup, - //if this node ends up in that group - std::shared_ptr> experiment_done = std::make_shared>(false); + //This DeserializationContext, containing a flag to indicate when the experiment is done, + //will be shared with the SignatureStore or ObjectStore subgroup if this node ends up in that group + ExperimentState experiment_state; + experiment_state.done = false; auto object_subgroup_factory = [&](persistent::PersistentRegistry* registry, derecho::subgroup_id_t subgroup_id) { - return std::make_unique(registry, experiment_done); + return std::make_unique(registry, &experiment_state); }; auto signature_subgroup_factory = [&](persistent::PersistentRegistry* registry, derecho::subgroup_id_t subgroup_id) { - return std::make_unique(registry, experiment_done); + return std::make_unique(registry, &experiment_state); }; //Subgroup and shard layout @@ -380,7 +395,7 @@ int main(int argc, char** argv) { derecho::Group group( {nullptr, nullptr, nullptr, nullptr}, subgroup_layout, - {}, {}, + {&experiment_state}, {}, client_node_factory, object_subgroup_factory, signature_subgroup_factory); @@ -431,12 +446,12 @@ int main(int argc, char** argv) { std::cout << "Assigned the SignatureStore role." << std::endl; std::cout << "Waiting for the end_test message." << std::endl; //Wait for this flag to be flipped by the end_test message handler - while(!(*experiment_done)) { + while(!(experiment_state.done)) { } } else if(my_storage_shard != -1) { std::cout << "Assigned the ObjectStore role." << std::endl; std::cout << "Waiting for the end_test message." << std::endl; - while(!(*experiment_done)) { + while(!(experiment_state.done)) { } } else { std::cout << "Not assigned to any role (?!)" << std::endl; diff --git a/src/applications/tests/performance_tests/signed_store_test.hpp b/src/applications/tests/performance_tests/signed_store_test.hpp index 9ea0e110..c0e77295 100644 --- a/src/applications/tests/performance_tests/signed_store_test.hpp +++ b/src/applications/tests/performance_tests/signed_store_test.hpp @@ -71,6 +71,14 @@ class Blob : public mutils::ByteRepresentable { Blob(uint8_t* buffer, std::size_t size, bool temporary); }; +/** + * A DeserializationContext object used to share state between the replicated + * objects (e.g. ObjectStore, SignatureStore) and the main thread. + */ +struct ExperimentState : public derecho::DeserializationContext { + std::atomic done; +}; + class ObjectStore : public mutils::ByteRepresentable, public derecho::PersistsFields, public derecho::GroupReference { @@ -83,17 +91,20 @@ class ObjectStore : public mutils::ByteRepresentable, * Shared with the main thread to tell it when the experiment is done and it should * call group.leave() (which can't be done from inside this subgroup object). */ - std::shared_ptr> experiment_done; + ExperimentState* experiment_state; public: ObjectStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done); + ExperimentState* experiment_state); /** Deserialization constructor */ - ObjectStore(persistent::Persistent& other_log) : object_log(std::move(other_log)) {} + ObjectStore(persistent::Persistent& other_log, + ExperimentState* experiment_state) + : object_log(std::move(other_log)), + experiment_state(experiment_state) {} /** * P2P-callable function that creates a new log entry with the provided data. * @return The version assigned to the new log entry, and the timestamp assigned to the new log entry - */ + */ std::pair update(const Blob& new_data) const; /** Actual implementation of update, only callable from within the subgroup as an ordered send. */ @@ -123,7 +134,10 @@ class ObjectStore : public mutils::ByteRepresentable, */ void end_test() const; - DEFAULT_SERIALIZATION_SUPPORT(ObjectStore, object_log); + DEFAULT_SERIALIZE(object_log); + DEFAULT_DESERIALIZE_NOALLOC(ObjectStore); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + REGISTER_RPC_FUNCTIONS(ObjectStore, ORDERED_TARGETS(ordered_update), P2P_TARGETS(update, await_persistence, get, get_latest, end_test)); }; @@ -138,16 +152,18 @@ class SignatureStore : public mutils::ByteRepresentable, persistent::Persistent hashes; /** - * Shared with the main thread to tell it when the experiment is done and it should - * call group.leave() (which can't be done from inside this subgroup object). + * Shared with the main thread to tell it when the experiment is done. */ - std::shared_ptr> experiment_done; + ExperimentState* experiment_state; public: SignatureStore(persistent::PersistentRegistry* pr, - std::shared_ptr> experiment_done); + ExperimentState* experiment_state); /** Deserialization constructor */ - SignatureStore(persistent::Persistent& other_hashes) : hashes(std::move(other_hashes)) {} + SignatureStore(persistent::Persistent& other_hashes, + ExperimentState* experiment_state) + : hashes(std::move(other_hashes)), + experiment_state(experiment_state) {} /** * P2P-callable function that appends a new object-update hash to the signed log. @@ -168,7 +184,10 @@ class SignatureStore : public mutils::ByteRepresentable, */ void end_test() const; - DEFAULT_SERIALIZATION_SUPPORT(SignatureStore, hashes); + DEFAULT_SERIALIZE(hashes); + DEFAULT_DESERIALIZE_NOALLOC(SignatureStore); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); + REGISTER_RPC_FUNCTIONS(SignatureStore, P2P_TARGETS(add_hash, end_test), ORDERED_TARGETS(ordered_add_hash)); };