Skip to content

Commit

Permalink
Rewrote typed_subgroup_test to use DeserializationContext
Browse files Browse the repository at this point in the history
Now that I know how to use DeserializationContexts to give deserialized
objects pointers to local objects, this should be the proper way to make
TestObject share state with the main method.
  • Loading branch information
etremel committed Sep 28, 2023
1 parent 6ec5406 commit eb7501e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
72 changes: 48 additions & 24 deletions src/applications/tests/performance_tests/typed_subgroup_bw_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ using std::endl;
using test::Bytes;
using namespace std::chrono;

/**
* State shared between the (replicated) TestObjects and the main thread
*/
struct TestState : public derecho::DeserializationContext {
std::atomic<bool> experiment_done;
steady_clock::time_point send_complete_time;
};

/**
* RPC Object with a single function that accepts a byte array and counts
* the number of updates it has received, comparing it to a total provided
Expand All @@ -31,33 +39,49 @@ using namespace std::chrono;
class TestObject : public mutils::ByteRepresentable {
uint64_t messages_received;
const uint64_t total_num_messages;
steady_clock::time_point send_complete_time;
// Shared with the main thread
std::shared_ptr<std::atomic<bool>> experiment_done;
// Pointer to a TestState object held by the main thread
TestState* main_test_state;

public:
void bytes_fun(const Bytes& bytes) {
++messages_received;
if(messages_received == total_num_messages) {
send_complete_time = std::chrono::steady_clock::now();
experiment_done->store(true);
main_test_state->send_complete_time = std::chrono::steady_clock::now();
main_test_state->experiment_done = true;
}
}
// Called by the main thread to retrieve send_complete_time after the experiment is done
const steady_clock::time_point& get_complete_time() const {
return send_complete_time;
}

REGISTER_RPC_FUNCTIONS(TestObject, ORDERED_TARGETS(bytes_fun));
DEFAULT_SERIALIZATION_SUPPORT(TestObject, messages_received, total_num_messages);
// Deserialization constructor. This will break experiment_done's link to the main thread, so we hope it isn't called.
TestObject(uint64_t messages_received, uint64_t total_num_messages)
DEFAULT_SERIALIZE(messages_received, total_num_messages);
// Custom deserialization so we can use the DeserializationManager
static std::unique_ptr<TestObject> from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer);
DEFAULT_DESERIALIZE_NOALLOC(TestObject);
// Deserialization constructor. The TestState pointer should be supplied by the deserialization context.
TestObject(uint64_t messages_received, uint64_t total_num_messages, TestState* main_test_state)
: messages_received(messages_received),
total_num_messages(total_num_messages),
experiment_done(std::make_shared<std::atomic_bool>(false)) {}
main_test_state(main_test_state) {}
// Constructor called by factory function
TestObject(uint64_t total_num_messages, std::shared_ptr<std::atomic<bool>> experiment_done)
: messages_received(0), total_num_messages(total_num_messages), experiment_done(experiment_done) {}
TestObject(uint64_t total_num_messages, TestState* test_state)
: messages_received(0), total_num_messages(total_num_messages), main_test_state(test_state) {}
};

std::unique_ptr<TestObject> TestObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) {
// Default deserialize the first 2 fields
auto messages_received_ptr = mutils::from_bytes<uint64_t>(dsm, buffer);
auto total_num_ptr = mutils::from_bytes<uint64_t>(dsm, buffer + mutils::bytes_size(*messages_received_ptr));
// Retrieve a pointer to TestState from DSM
TestState* test_state_ptr = nullptr;
assert(dsm);
assert(dsm->registered<TestState>());
if(dsm && dsm->registered<TestState>()) {
test_state_ptr = &(dsm->mgr<TestState>());
} else {
// Asserts are disabled in release mode, so try to provide some information if there's a bug
std::cerr << "ERROR: Unable to get TestState pointer in TestObject deserialization. TestObject will crash when it attempts to dereference it." << std::endl;
}
assert(test_state_ptr);
return std::make_unique<TestObject>(*messages_received_ptr, *total_num_ptr, test_state_ptr);
};

struct exp_result {
Expand Down Expand Up @@ -131,8 +155,9 @@ int main(int argc, char* argv[]) {
total_num_messages = count;
break;
}
// variable 'done' tracks the end of the test
std::shared_ptr<std::atomic<bool>> done = std::make_shared<std::atomic<bool>>(false);
// The replicated TestObject will get a pointer to this object and use it to signal that the test is done
TestState shared_test_state;
shared_test_state.experiment_done = false;

if(dashdash_pos + 4 < argc) {
pthread_setname_np(pthread_self(), argv[dashdash_pos + 3]);
Expand All @@ -144,10 +169,12 @@ int main(int argc, char* argv[]) {
derecho::SubgroupInfo subgroup_info(membership_function);

auto test_factory = [&](persistent::PersistentRegistry*, derecho::subgroup_id_t) {
return std::make_unique<TestObject>(total_num_messages, done);
return std::make_unique<TestObject>(total_num_messages, &shared_test_state);
};

derecho::Group<TestObject> group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, {},
std::vector<derecho::DeserializationContext*> context_vector{&shared_test_state};

derecho::Group<TestObject> group(derecho::UserMessageCallbacks{nullptr}, subgroup_info, context_vector,
std::vector<derecho::view_upcall_t>{}, test_factory);
std::cout << "Finished constructing/joining Group" << std::endl;

Expand Down Expand Up @@ -180,15 +207,12 @@ int main(int argc, char* argv[]) {
}
}

while(!(*done)) {
while(!shared_test_state.experiment_done) {
}

delete[] bbuf;
// Retrieve the completion time from the subgroup object
// Replicated::get_ref is unsafe, but the group should be idle by the time *done is true
steady_clock::time_point send_complete_time = group.get_subgroup<TestObject>().get_ref().get_complete_time();

int64_t nsec = duration_cast<nanoseconds>(send_complete_time - begin_time).count();
int64_t nsec = duration_cast<nanoseconds>(shared_test_state.send_complete_time - begin_time).count();

double thp_gbps = (static_cast<double>(total_num_messages) * max_msg_size) / nsec;
double thp_ops = (static_cast<double>(total_num_messages) * 1000000000) / nsec;
Expand Down
4 changes: 2 additions & 2 deletions src/core/git_version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace derecho {
const int MAJOR_VERSION = 2;
const int MINOR_VERSION = 3;
const int PATCH_VERSION = 0;
const int COMMITS_AHEAD_OF_VERSION = 159;
const int COMMITS_AHEAD_OF_VERSION = 161;
const char* VERSION_STRING = "2.3.0";
const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+159";
const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+161";

}

0 comments on commit eb7501e

Please sign in to comment.