Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated wire.h to be the same as tateyama #141

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions src/tateyama/transport/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -1012,30 +1012,41 @@ class connection_queue
}
void push(std::size_t sid, std::size_t admin_slots = 0) {
boost::interprocess::scoped_lock lock(mutex_);
queue_.at(index(pushed_.load() + admin_slots)) = sid;
pushed_.fetch_add(1);
if (admin_slots > 0 && is_admin(sid)) {
queue_.at(index(pushed_.load() + admin_slots)) = reset_admin(sid);
admin_slots_in_use_.fetch_sub(1, std::memory_order_release);
pushed_.fetch_add(1);
} else {
queue_.at(index(pushed_.load() + admin_slots)) = sid;
pushed_.fetch_add(1, std::memory_order_release);
}
std::atomic_thread_fence(std::memory_order_acq_rel);
condition_.notify_one();
}
[[nodiscard]] std::size_t try_pop() {
boost::interprocess::scoped_lock lock(mutex_); // trade off
auto current = poped_.load();
while (true) {
if (pushed_.load() <= current) {
throw std::runtime_error("no request available");
auto ps = pushed_.load(std::memory_order_acquire);
if ((ps + admin_slots_in_use_.load()) <= current) {
throw std::runtime_error("no request slot is available for normal request");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
}
}
}
[[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) {
boost::interprocess::scoped_lock lock(mutex_);
auto current = poped_.load();
while (true) {
if ((pushed_.load() + admin_slots) <= current) {
throw std::runtime_error("no request available");
auto ps = pushed_.load(std::memory_order_acquire);
if ((ps + (admin_slots - admin_slots_in_use_.load())) <= current) {
throw std::runtime_error("no request slot is available for admin request");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
admin_slots_in_use_.fetch_add(1);
return set_admin(queue_.at(index(current)));
}
}
}
Expand Down Expand Up @@ -1064,7 +1075,8 @@ class connection_queue
}
private:
boost::interprocess::vector<std::size_t, long_allocator> queue_;
std::size_t capacity_;
std::uint32_t capacity_;
std::atomic_uint8_t admin_slots_in_use_{0};
boost::interprocess::interprocess_mutex mutex_{};
boost::interprocess::interprocess_condition condition_{};

Expand Down Expand Up @@ -1136,6 +1148,11 @@ class connection_queue

using element_allocator = boost::interprocess::allocator<element, boost::interprocess::managed_shared_memory::segment_manager>;
constexpr static std::size_t session_id_indicating_error = UINT64_MAX;
constexpr static std::size_t admin_bit = 1ULL << 63UL;

static std::size_t set_admin(std::size_t slot) { return slot | admin_bit; }
static std::size_t reset_admin(std::size_t slot) { return slot & ~admin_bit; }
static bool is_admin(std::size_t slot) { return (slot & admin_bit) != 0; }

/**
* @brief Construct a new object.
Expand Down Expand Up @@ -1165,7 +1182,7 @@ class connection_queue
return sid;
}
std::size_t wait(std::size_t sid, std::int64_t timeout = 0) {
auto& entry = v_requested_.at(sid);
auto& entry = v_requested_.at(reset_admin(sid));
try {
auto rtnv = entry.wait(timeout);
entry.reuse();
Expand All @@ -1176,7 +1193,7 @@ class connection_queue
}
}
bool check(std::size_t sid) {
return v_requested_.at(sid).check();
return v_requested_.at(reset_admin(sid)).check();
}
std::size_t listen() {
if (q_requested_.wait(terminate_)) {
Expand All @@ -1190,12 +1207,12 @@ class connection_queue
// either accept() or reject() must be called
void accept(std::size_t sid, std::size_t session_id) {
q_requested_.pop();
v_requested_.at(sid).accept(session_id);
v_requested_.at(reset_admin(sid)).accept(session_id);
}
// either accept() or reject() must be called
void reject(std::size_t sid) {
q_requested_.pop();
v_requested_.at(sid).reject();
v_requested_.at(reset_admin(sid)).reject();
q_free_.push(sid, admin_slots_);
}
void disconnect(std::size_t sid) {
Expand Down