Skip to content

Commit

Permalink
MessageQueue: made it possible to use python iterator and length meth…
Browse files Browse the repository at this point in the history
…od on the queue
  • Loading branch information
ThomasDebrunner committed Jun 13, 2024
1 parent 38d3352 commit 42261e4
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions src/bind_Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ struct _ExpectationWrapper {
};

// Class to queue incoming messages from a Connection to be accessed asynchronously by python
// (in order to avoid deadlocks)
class MessageQueue {
private:
std::queue<Message> _messages;
std::mutex _lock;
std::weak_ptr<Connection> _connection;
CallbackHandle _cb_handle;
public:
MessageQueue(std::shared_ptr<Connection>& connection) : _connection(connection) {
_cb_handle = connection->addMessageCallback([this](const Message& message) {
MessageQueue(std::shared_ptr<Connection> &connection) : _connection(connection) {
_cb_handle = connection->addMessageCallback([this](const Message &message) {
std::lock_guard lg{_lock};
_messages.push(message);
});
}

~MessageQueue() {
auto connection = _connection.lock();
if (connection) {
Expand All @@ -78,6 +78,11 @@ class MessageQueue {
_messages.pop();
return ret;
}

std::size_t size() {
std::lock_guard lg{_lock};
return _messages.size();
}
};


Expand All @@ -86,8 +91,18 @@ void bind_Connection(py::module m) {
.def(py::init<>());

py::class_<MessageQueue>(m, "MessageQueue")
.def(py::init<std::shared_ptr<Connection>&>())
.def("next", &MessageQueue::next, py::call_guard<py::gil_scoped_release>());
.def(py::init<std::shared_ptr<Connection> &>())
.def("next", &MessageQueue::next, py::call_guard<py::gil_scoped_release>())
.def("__iter__", [](MessageQueue &self) -> MessageQueue & { return self; })
.def("__next__", [](MessageQueue &self) {
py::gil_scoped_release release;
auto msg = self.next();
if (!msg) {
throw py::stop_iteration();
}
return *msg;
})
.def("__len__", &MessageQueue::size, py::call_guard<py::gil_scoped_release>());

py::class_<Connection, std::shared_ptr<Connection>>(m, "Connection")
.def("alive", &Connection::alive)
Expand Down Expand Up @@ -134,4 +149,3 @@ void bind_Connection(py::module m) {
py::call_guard<py::gil_scoped_release>(),
py::arg("message_id"), py::arg("timeout_ms") = -1);
}

0 comments on commit 42261e4

Please sign in to comment.