Skip to content

Commit

Permalink
Merge branch 'persistent_latency_fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
songweijia committed Jul 1, 2021
2 parents 3271531 + 66d4032 commit 503d097
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 50 deletions.
4 changes: 4 additions & 0 deletions src/applications/tests/performance_tests/bytes_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ mutils::context_ptr<const Bytes> Bytes::from_bytes_noalloc_const(mutils::Deseria
((std::size_t*)(buffer))[0],
true)};
}

/**
 * Returns the raw pointer held in the `bytes` member.
 *
 * The method is const-qualified but returns a non-const `char*`, so callers
 * are able to write through the returned pointer.
 * NOTE(review): the size and lifetime of the pointed-to buffer are not visible
 * here -- presumably it remains owned by this object; confirm against the
 * class definition before retaining the pointer.
 */
char* Bytes::get() const {
return bytes;
}
} // namespace test
1 change: 1 addition & 0 deletions src/applications/tests/performance_tests/bytes_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Bytes : public mutils::ByteRepresentable {
Bytes(char* buffer, std::size_t size, bool is_temporary);

public:
char* get() const;
/**
* Constructs a byte array by copying the contents of a char array into the
* internal buffer.
Expand Down
140 changes: 91 additions & 49 deletions src/applications/tests/performance_tests/persistent_latency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,26 @@
#include <vector>

#include <derecho/core/derecho.hpp>
#include <derecho/utils/time.h>

#include "bytes_object.hpp"
#include "partial_senders_allocator.hpp"


/**
* This latency test timestamps the following events to measure the latency breakdown:
* ts1 - when a message is sent
* ts2 - message stabilized
* ts3 - locally persisted
* ts4 - globally persisted
* (ts4-ts1): end-to-end latency
* (ts2-ts1): stabilizing latency
* (ts3-ts2): local persistence latency
* (ts4-ts3): global persistence latency
*
* We assume that the clocks on all nodes are precisely synchronized, for example, by PTP.
*/

using std::cout;
using std::endl;
using test::Bytes;
Expand All @@ -20,10 +36,7 @@ using namespace persistent;

//the payload is used to identify the user timestamp
struct PayLoad {
uint32_t node_rank; // rank of the sender
uint32_t msg_seqno; // sequence of the message sent by the same sender
uint64_t tv_sec; // second
uint64_t tv_nsec; // nano second
uint64_t send_timestamp_us; // message sending time stamp
};

class ByteArrayObject : public mutils::ByteRepresentable, public derecho::PersistsFields {
Expand Down Expand Up @@ -81,17 +94,8 @@ int main(int argc, char* argv[]) {

bool is_sending = true;
uint32_t node_rank = -1;
// message_pers_ts_us[] is the time when a message with version 'ver' is persisted.
uint64_t* message_pers_ts_us = (uint64_t*)malloc(sizeof(uint64_t) * num_msgs * num_of_nodes);
if(message_pers_ts_us == NULL) {
std::cerr << "allocate memory error!" << std::endl;
return -1;
}
// the total span:
struct timespec t_begin;
// is only for local
uint64_t* local_message_ts_us = (uint64_t*)malloc(sizeof(uint64_t) * num_msgs);
long total_num_messages;


uint32_t num_sender = 0;
switch(sender_selector) {
case PartialSendMode::ALL_SENDERS:
Expand All @@ -104,61 +108,92 @@ int main(int argc, char* argv[]) {
num_sender = 1;
break;
}
total_num_messages = num_sender * num_msgs;

// variable 'done' tracks the end of the test
volatile bool done = false;
size_t total_number_of_messages = num_sender * num_msgs;

// Timing instruments
uint64_t* t1_us = (uint64_t*)malloc(sizeof(uint64_t)*total_number_of_messages);
uint64_t* t2_us = (uint64_t*)malloc(sizeof(uint64_t)*total_number_of_messages);
uint64_t* t3_us = (uint64_t*)malloc(sizeof(uint64_t)*total_number_of_messages);
uint64_t* t4_us = (uint64_t*)malloc(sizeof(uint64_t)*total_number_of_messages);
if (t1_us == nullptr || t2_us == nullptr || t3_us == nullptr || t4_us == nullptr) {
std::cerr << "allocate memory error!" << std::endl;
return -1;
}
std::memset(t1_us,0,sizeof(uint64_t)*total_number_of_messages);
std::memset(t2_us,0,sizeof(uint64_t)*total_number_of_messages);
std::memset(t3_us,0,sizeof(uint64_t)*total_number_of_messages);
std::memset(t4_us,0,sizeof(uint64_t)*total_number_of_messages);
std::map<persistent::version_t,size_t> version_to_index;
std::mutex version_to_index_mutex;
size_t number_of_stable_messages = 0;

// last_version and its flag is shared between the stability callback and persistence callback.
// This is a clumsy hack to figure out what version number is assigned to the last delivered message.
persistent::version_t last_version;
std::atomic<bool> last_version_set = false;
std::atomic<bool> local_persistence_done = false;
std::atomic<bool> global_persistence_done = false;

auto stability_callback = [&last_version,
&last_version_set,
total_num_messages,
num_delivered = 0u](uint32_t subgroup,
auto stability_callback = [&](uint32_t subgroup,
uint32_t sender_id,
long long int index,
std::optional<std::pair<char*, long long int>> data,
persistent::version_t ver) mutable {
t2_us[number_of_stable_messages] = get_walltime()/1000;
{
std::lock_guard<std::mutex> lck(version_to_index_mutex);
version_to_index[ver] = number_of_stable_messages;
}
if (!data) {
throw derecho::derecho_exception("Critical: stability_callback got no data.");
}
if (data->second < static_cast<long long int>(sizeof (PayLoad))) {
throw derecho::derecho_exception("Critical: stability_callback got invalid data size.");
}
// 35 is the size of cooked header -- TODO: find a better way to index the parameters.
t1_us[number_of_stable_messages] = reinterpret_cast<PayLoad*>(data->first + 35)->send_timestamp_us;
//Count the total number of messages delivered
++num_delivered;
if(num_delivered == total_num_messages) {
++number_of_stable_messages;
if(number_of_stable_messages == total_number_of_messages) {
last_version = ver;
last_version_set = true;
}
};

auto global_persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) {
struct timespec ts;
static persistent::version_t pers_ver = 0;
if(pers_ver > ver) return;
auto local_persistence_callback = [&](derecho::subgroup_id_t, persistent::version_t ver) {
size_t index = 0;
uint64_t now_us = get_walltime()/1000;
{
std::lock_guard<std::mutex> lck(version_to_index_mutex);
index = version_to_index.at(ver);
}
while (index >= 0 && t3_us[index] == 0) {
t3_us[index--] = now_us;
}

clock_gettime(CLOCK_REALTIME, &ts);
uint64_t tsus = ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
if (last_version_set && ver == last_version) {
local_persistence_done = true;
}
};

while(pers_ver <= ver) {
message_pers_ts_us[pers_ver++] = tsus;
auto global_persistence_callback = [&](derecho::subgroup_id_t subgroup, persistent::version_t ver) {
size_t index = 0;
uint64_t now_us = get_walltime()/1000;
{
std::lock_guard<std::mutex> lck(version_to_index_mutex);
index = version_to_index.at(ver);
}
while (index >= 0 && t4_us[index] == 0) {
t4_us[index--] = now_us;
}

if(last_version_set && ver == last_version) {
if(is_sending) {
for(uint32_t i = 0; i < num_msgs; i++) {
std::cout << "[" << i << "]" << local_message_ts_us[i] << " "
<< message_pers_ts_us[num_sender * i + node_rank] << " "
<< (message_pers_ts_us[num_sender * i + node_rank] - local_message_ts_us[i]) << " us" << std::endl;
}
}
double thp_mbps = (double)total_num_messages * msg_size / DELTA_T_US(t_begin, ts);
std::cout << "throughput(pers): " << thp_mbps << " MBps" << std::endl;
std::cout << std::flush;
done = true;
global_persistence_done = true;
}
};
derecho::UserMessageCallbacks callback_set{
stability_callback,
nullptr,
local_persistence_callback,
global_persistence_callback};

derecho::SubgroupInfo subgroup_info{PartialSendersAllocator(num_of_nodes, sender_selector)};
Expand All @@ -182,8 +217,6 @@ int main(int argc, char* argv[]) {

std::cout << "my rank is:" << node_rank << ", and I'm sending: " << std::boolalpha << is_sending << std::endl;

clock_gettime(CLOCK_REALTIME, &t_begin);

derecho::Replicated<ByteArrayObject>& handle = group.get_subgroup<ByteArrayObject>();

if(is_sending) {
Expand All @@ -201,7 +234,9 @@ int main(int argc, char* argv[]) {
clock_gettime(CLOCK_REALTIME, &cur);
} while(DELTA_T_US(start, cur) < i * (double)si_us);
{
local_message_ts_us[i] = cur.tv_sec * 1e6 + cur.tv_nsec / 1e3;

memset(bs.get(),0xcc,msg_size);
reinterpret_cast<PayLoad*>(bs.get())->send_timestamp_us = get_walltime()/1000;
handle.ordered_send<RPC_NAME(change_pers_bytes)>(bs);
}
}
Expand All @@ -212,7 +247,14 @@ int main(int argc, char* argv[]) {
}
}

while(!done) {
while(!local_persistence_done || !global_persistence_done);
std::cout << "#index\t#end-to-end(us)\t#t2-t1(us)\t#t3-t2(us)\t#t4-t3(us)" << std::endl;
for (size_t i=0; i < total_number_of_messages; i++) {
std::cout << i << "\t"
<< (t4_us[i] - t1_us[i]) << "\t"
<< (t2_us[i] - t1_us[i]) << "\t"
<< (t3_us[i] - t2_us[i]) << "\t"
<< (t4_us[i] - t3_us[i]) << std::endl;
}
group.barrier_sync();
group.leave();
Expand Down
3 changes: 2 additions & 1 deletion src/core/multicast_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ void MulticastGroup::deliver_message(SSTMessage& msg, const subgroup_id_t& subgr
internal_callbacks.post_next_version_callback(subgroup_num, version, msg_ts_us);
internal_callbacks.rpc_callback(subgroup_num, msg.sender_id, version, msg_ts_us, buf, payload_size);
if(callbacks.global_stability_callback) {
callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {},
callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index,
{{buf + h->header_size, msg.size - h->header_size}},
version);
}
} else if(callbacks.global_stability_callback) {
Expand Down

0 comments on commit 503d097

Please sign in to comment.