Skip to content

Commit

Permalink
connection: switch to a weak_ptr for keeping track of the promises in…
Browse files Browse the repository at this point in the history
… the connection. This makes sure that promises get cleared, if they are no longer being held onto by the caller
  • Loading branch information
ThomasDebrunner committed Feb 29, 2024
1 parent 3423d2e commit a2e5a3f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
23 changes: 17 additions & 6 deletions include/mav/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ namespace mav {
using Expectation = std::shared_ptr<std::promise<Message>>;

private:
using ExpectationWeakRef = std::weak_ptr<std::promise<Message>>;

struct FunctionCallback {
std::function<void(const Message &message)> callback;
std::function<void(const std::exception_ptr& exception)> error_callback;
};

struct PromiseCallback {
Expectation promise;
ExpectationWeakRef promise;
std::function<bool(const Message &message)> selector;
};

Expand Down Expand Up @@ -126,11 +127,16 @@ namespace mav {
}
it++;
} else if constexpr (std::is_same_v<T, PromiseCallback>) {
if (arg.selector(message)) {
arg.promise->set_value(message);
auto promise = arg.promise.lock();
if (!promise) {
it = _message_callbacks.erase(it);
} else {
it++;
if (arg.selector(message)) {
promise->set_value(message);
it = _message_callbacks.erase(it);
} else {
it++;
}
}
}
}, callback);
Expand All @@ -152,8 +158,13 @@ namespace mav {
}
it++;
} else if constexpr (std::is_same_v<T, PromiseCallback>) {
arg.promise->set_exception(exception);
it = _message_callbacks.erase(it);
auto promise = arg.promise.lock();
if (!promise) {
it = _message_callbacks.erase(it);
} else {
promise->set_exception(exception);
it = _message_callbacks.erase(it);
}
}
}, callback);
}
Expand Down
4 changes: 4 additions & 0 deletions tests/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ TEST_CASE("Create network runtime") {
auto expectation = connection->expect("TEST_MESSAGE");
CHECK_THROWS_AS(auto message = connection->receive(expectation, 100), TimeoutException);
}
// send a heartbeat. Any message will clear expired expectations
interface.addToReceiveQueue("\xfd\x09\x00\x00\x00\xfd\x01\x00\x00\x00\x04\x00\x00\x00\x01\x02\x03\x05\x06\x77\x53"s, interface_partner);
// wait for the heartbeat to be received, to make sure timing is correct in test
connection->receive("HEARTBEAT");
CHECK_EQ(connection->callbackCount(), 0);
}
}

0 comments on commit a2e5a3f

Please sign in to comment.