Skip to content

Commit

Permalink
Refactored p2p_connections again. Much simpler structure.
Browse files Browse the repository at this point in the history
  • Loading branch information
sagarjha committed Apr 13, 2019
1 parent a743407 commit a84b957
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 114 deletions.
11 changes: 6 additions & 5 deletions include/derecho/core/detail/p2p_connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ struct P2PParams {

enum REQUEST_TYPE {
P2P_REPLY,
P2P_SEND,
P2P_REQUEST,
RPC_REPLY
};
static const REQUEST_TYPE p2p_request_types[] = {P2P_REPLY,
P2P_REQUEST,
RPC_REPLY};

class P2PConnections {
const std::vector<uint32_t> members;
Expand All @@ -41,14 +44,12 @@ class P2PConnections {
std::vector<std::unique_ptr<volatile char[]>> outgoing_p2p_buffers;
std::vector<std::unique_ptr<resources>> res_vec;
uint64_t p2p_buf_size;
std::vector<uint64_t> incoming_send_seq_nums, incoming_rpc_reply_seq_nums, incoming_p2p_reply_seq_nums,
outgoing_send_seq_nums, outgoing_rpc_reply_seq_nums, outgoing_p2p_reply_seq_nums;
std::map<REQUEST_TYPE, std::vector<uint64_t>> incoming_seq_nums_map, outgoing_seq_nums_map;
std::vector<REQUEST_TYPE> prev_mode;
std::atomic<bool> thread_shutdown{false};
std::thread timeout_thread;
uint64_t getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num);
uint64_t getOffsetBuf(REQUEST_TYPE type, uint64_t& seq_num);
uint64_t getOffsetBufNoIncrement(REQUEST_TYPE type, uint64_t seq_num);
uint64_t getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num);
char* probe(uint32_t rank);
uint32_t num_rdma_writes = 0;
void check_failures_loop();
Expand Down
4 changes: 2 additions & 2 deletions include/derecho/core/detail/replicated_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ auto Replicated<T>::p2p_send(node_id_t dest_node, Args&&... args) {
size = _size;
if(size <= max_payload_size) {
return (char*)group_rpc_manager.get_sendbuffer_ptr(dest_node,
sst::REQUEST_TYPE::P2P_SEND);
sst::REQUEST_TYPE::P2P_REQUEST);
} else {
return nullptr;
}
Expand Down Expand Up @@ -234,7 +234,7 @@ auto ExternalCaller<T>::p2p_send(node_id_t dest_node, Args&&... args) {
size = _size;
if(size <= max_payload_size) {
return (char*)group_rpc_manager.get_sendbuffer_ptr(dest_node,
sst::REQUEST_TYPE::P2P_SEND);
sst::REQUEST_TYPE::P2P_REQUEST);
} else {
return nullptr;
}
Expand Down
149 changes: 42 additions & 107 deletions src/core/p2p_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@ P2PConnections::P2PConnections(const P2PParams params)
incoming_p2p_buffers(num_members),
outgoing_p2p_buffers(num_members),
res_vec(num_members),
p2p_buf_size(4 * max_msg_size * window_size + sizeof(bool)),
incoming_send_seq_nums(num_members),
incoming_rpc_reply_seq_nums(num_members),
incoming_p2p_reply_seq_nums(num_members),
outgoing_send_seq_nums(num_members),
outgoing_rpc_reply_seq_nums(num_members),
outgoing_p2p_reply_seq_nums(num_members),
p2p_buf_size(3 * max_msg_size * window_size + sizeof(bool)),
prev_mode(num_members) {
//Figure out my SST index
my_index = (uint32_t)-1;
Expand All @@ -33,7 +27,11 @@ P2PConnections::P2PConnections(const P2PParams params)
}
node_id_to_rank[members[i]] = i;
}
assert(my_index != (uint32_t)-1);

for(auto type: p2p_request_types) {
incoming_seq_nums_map[type].resize(num_members);
outgoing_seq_nums_map[type].resize(num_members);
}

for(uint i = 0; i < num_members; ++i) {
incoming_p2p_buffers[i] = std::make_unique<volatile char[]>(p2p_buf_size);
Expand Down Expand Up @@ -63,13 +61,7 @@ P2PConnections::P2PConnections(P2PConnections&& old_connections, const std::vect
incoming_p2p_buffers(num_members),
outgoing_p2p_buffers(num_members),
res_vec(num_members),
p2p_buf_size(4 * max_msg_size * window_size + sizeof(bool)),
incoming_send_seq_nums(num_members),
incoming_rpc_reply_seq_nums(num_members),
incoming_p2p_reply_seq_nums(num_members),
outgoing_send_seq_nums(num_members),
outgoing_rpc_reply_seq_nums(num_members),
outgoing_p2p_reply_seq_nums(num_members),
p2p_buf_size(3 * max_msg_size * window_size + sizeof(bool)),
prev_mode(num_members) {
old_connections.shutdown_failures_thread();
//Figure out my SST index
Expand All @@ -80,7 +72,11 @@ P2PConnections::P2PConnections(P2PConnections&& old_connections, const std::vect
}
node_id_to_rank[members[i]] = i;
}
assert(my_index != (uint32_t)-1);

for(auto type : p2p_request_types) {
incoming_seq_nums_map[type].resize(num_members);
outgoing_seq_nums_map[type].resize(num_members);
}

for(uint i = 0; i < num_members; ++i) {
if(old_connections.node_id_to_rank.find(members[i]) == old_connections.node_id_to_rank.end()) {
Expand All @@ -95,12 +91,10 @@ P2PConnections::P2PConnections(P2PConnections&& old_connections, const std::vect
auto old_rank = old_connections.node_id_to_rank[members[i]];
incoming_p2p_buffers[i] = std::move(old_connections.incoming_p2p_buffers[old_rank]);
outgoing_p2p_buffers[i] = std::move(old_connections.outgoing_p2p_buffers[old_rank]);
incoming_send_seq_nums[i] = old_connections.incoming_send_seq_nums[old_rank];
incoming_rpc_reply_seq_nums[i] = old_connections.incoming_rpc_reply_seq_nums[old_rank];
incoming_p2p_reply_seq_nums[i] = old_connections.incoming_p2p_reply_seq_nums[old_rank];
outgoing_send_seq_nums[i] = old_connections.outgoing_send_seq_nums[old_rank];
outgoing_rpc_reply_seq_nums[i] = old_connections.outgoing_rpc_reply_seq_nums[old_rank];
outgoing_p2p_reply_seq_nums[i] = old_connections.outgoing_p2p_reply_seq_nums[old_rank];
for(auto type : p2p_request_types) {
incoming_seq_nums_map[type][i] = old_connections.incoming_seq_nums_map[type][i];
outgoing_seq_nums_map[type][i] = old_connections.outgoing_seq_nums_map[type][i];
}
if(i != my_index) {
res_vec[i] = std::move(old_connections.res_vec[old_rank]);
}
Expand Down Expand Up @@ -135,51 +129,30 @@ uint64_t P2PConnections::getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num) {
return max_msg_size * (2 * window_size + (seq_num % window_size) + 1) - sizeof(uint64_t);
case REQUEST_TYPE::P2P_REPLY:
return max_msg_size * (window_size + (seq_num % window_size) + 1) - sizeof(uint64_t);
case REQUEST_TYPE::P2P_SEND:
case REQUEST_TYPE::P2P_REQUEST:
return max_msg_size * (seq_num % window_size + 1) - sizeof(uint64_t);
}
return 0;
}

// note that it takes seq_num by reference and increments it!
uint64_t P2PConnections::getOffsetBuf(REQUEST_TYPE type, uint64_t& seq_num) {
switch(type) {
case REQUEST_TYPE::RPC_REPLY:
return max_msg_size * (2 * window_size + (seq_num++ % window_size));
case REQUEST_TYPE::P2P_REPLY:
return max_msg_size * (window_size + (seq_num++ % window_size));
case REQUEST_TYPE::P2P_SEND:
return max_msg_size * (seq_num++ % window_size);
}
return 0;
}

uint64_t P2PConnections::getOffsetBufNoIncrement(REQUEST_TYPE type, uint64_t seq_num) {
uint64_t P2PConnections::getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num) {
switch(type) {
case REQUEST_TYPE::RPC_REPLY:
return max_msg_size * (2 * window_size + (seq_num % window_size));
case REQUEST_TYPE::P2P_REPLY:
return max_msg_size * (window_size + (seq_num % window_size));
case REQUEST_TYPE::P2P_SEND:
return max_msg_size * (seq_num % window_size);
case REQUEST_TYPE::P2P_REQUEST:
return max_msg_size * (seq_num++ % window_size);
}
return 0;
}

// check if there's a new request from some node
char* P2PConnections::probe(uint32_t rank) {
assert(incoming_p2p_buffers[rank]);
// first check for RPC replies
if((uint64_t&)incoming_p2p_buffers[rank][getOffsetSeqNum(REQUEST_TYPE::RPC_REPLY, incoming_rpc_reply_seq_nums[rank])] == incoming_rpc_reply_seq_nums[rank] + 1) {
return const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(REQUEST_TYPE::RPC_REPLY, incoming_rpc_reply_seq_nums[rank]);
}
// then check for P2P replies
if((uint64_t&)incoming_p2p_buffers[rank][getOffsetSeqNum(REQUEST_TYPE::P2P_REPLY, incoming_p2p_reply_seq_nums[rank])] == incoming_p2p_reply_seq_nums[rank] + 1) {
return const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(REQUEST_TYPE::P2P_REPLY, incoming_p2p_reply_seq_nums[rank]);
}
// finally check for any new requests
if((uint64_t&)incoming_p2p_buffers[rank][getOffsetSeqNum(REQUEST_TYPE::P2P_SEND, incoming_send_seq_nums[rank])] == incoming_send_seq_nums[rank] + 1) {
return const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(REQUEST_TYPE::P2P_SEND, incoming_send_seq_nums[rank]);
for(auto type : p2p_request_types) {
if((uint64_t&)incoming_p2p_buffers[rank][getOffsetSeqNum(type, incoming_seq_nums_map[type][rank])] == incoming_seq_nums_map[type][rank] + 1) {
return const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(type, incoming_seq_nums_map[type][rank]++);
}
}
return nullptr;
}
Expand All @@ -197,67 +170,29 @@ std::optional<std::pair<uint32_t, char*>> P2PConnections::probe_all() {

char* P2PConnections::get_sendbuffer_ptr(uint32_t rank, REQUEST_TYPE type) {
prev_mode[rank] = type;
if(type == REQUEST_TYPE::RPC_REPLY) {
(uint64_t&)outgoing_p2p_buffers[rank][getOffsetSeqNum(type, outgoing_rpc_reply_seq_nums[rank])] = outgoing_rpc_reply_seq_nums[rank] + 1;
return const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(type, outgoing_rpc_reply_seq_nums[rank]);
} else if(type == REQUEST_TYPE::P2P_REPLY) {
(uint64_t&)outgoing_p2p_buffers[rank][getOffsetSeqNum(type, outgoing_p2p_reply_seq_nums[rank])] = outgoing_p2p_reply_seq_nums[rank] + 1;
return const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(type, outgoing_p2p_reply_seq_nums[rank]);
} else {
if((int32_t)incoming_p2p_reply_seq_nums[rank] > (int32_t)(outgoing_send_seq_nums[rank] - window_size)) {
(uint64_t&)outgoing_p2p_buffers[rank][getOffsetSeqNum(type, outgoing_send_seq_nums[rank])] = outgoing_send_seq_nums[rank] + 1;
return const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(type, outgoing_send_seq_nums[rank]);
} else {
return nullptr;
}
if(type != REQUEST_TYPE::P2P_REQUEST || (int32_t)incoming_seq_nums_map[REQUEST_TYPE::P2P_REPLY][rank] > (int32_t)(outgoing_seq_nums_map[REQUEST_TYPE::P2P_REQUEST][rank] - window_size)) {
(uint64_t&)outgoing_p2p_buffers[rank][getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank])] = outgoing_seq_nums_map[type][rank] + 1;
return const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]);
}
return nullptr;
}

void P2PConnections::send(uint32_t rank) {
if(prev_mode[rank] == REQUEST_TYPE::RPC_REPLY) {
if(rank == my_index) {
// there's no reason why memcpy shouldn't also copy guard and data separately
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]),
max_msg_size - sizeof(uint64_t));
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]),
sizeof(uint64_t));
} else {
res_vec[rank]->post_remote_write(getOffsetBufNoIncrement(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]), max_msg_size - sizeof(uint64_t));
res_vec[rank]->post_remote_write(getOffsetSeqNum(prev_mode[rank], outgoing_rpc_reply_seq_nums[rank]), sizeof(uint64_t));
num_rdma_writes++;
}
outgoing_rpc_reply_seq_nums[rank]++;
} else if(prev_mode[rank] == REQUEST_TYPE::P2P_REPLY) {
if(rank == my_index) {
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]),
max_msg_size - sizeof(uint64_t));
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]),
sizeof(uint64_t));
} else {
res_vec[rank]->post_remote_write(getOffsetBufNoIncrement(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]), max_msg_size - sizeof(uint64_t));
res_vec[rank]->post_remote_write(getOffsetSeqNum(prev_mode[rank], outgoing_p2p_reply_seq_nums[rank]), sizeof(uint64_t));
num_rdma_writes++;
}
outgoing_p2p_reply_seq_nums[rank]++;
} else if(prev_mode[rank] == REQUEST_TYPE::P2P_SEND) {
if(rank == my_index) {
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_send_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBufNoIncrement(prev_mode[rank], outgoing_send_seq_nums[rank]),
max_msg_size - sizeof(uint64_t));
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_send_seq_nums[rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetSeqNum(prev_mode[rank], outgoing_send_seq_nums[rank]),
sizeof(uint64_t));
} else {
res_vec[rank]->post_remote_write(getOffsetBufNoIncrement(prev_mode[rank], outgoing_send_seq_nums[rank]), max_msg_size - sizeof(uint64_t));
res_vec[rank]->post_remote_write(getOffsetSeqNum(prev_mode[rank], outgoing_send_seq_nums[rank]), sizeof(uint64_t));
num_rdma_writes++;
}
outgoing_send_seq_nums[rank]++;
auto type = prev_mode[rank];
if(rank == my_index) {
// there's no reason why memcpy shouldn't also copy guard and data separately
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]),
max_msg_size - sizeof(uint64_t));
std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]),
const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]),
sizeof(uint64_t));
} else {
res_vec[rank]->post_remote_write(getOffsetBuf(type, outgoing_seq_nums_map[type][rank]), max_msg_size - sizeof(uint64_t));
res_vec[rank]->post_remote_write(getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]), sizeof(uint64_t));
num_rdma_writes++;
}
outgoing_seq_nums_map[type][rank]++;
}

void P2PConnections::check_failures_loop() {
Expand Down

0 comments on commit a84b957

Please sign in to comment.