Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/issue/196'
Browse files Browse the repository at this point in the history
* origin/issue/196:
  Fix publisher deadlock when passing large batches

(cherry picked from commit bef5e0b)
  • Loading branch information
timwoj committed Aug 3, 2021
1 parent 01591c2 commit ef4fd6f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
2.1.0 | 2021-07-16 19:02:24 -0700

* Fix publisher deadlock when passing large batches (Dominik Charousset, Corelight)

(cherry-picked from bef5e0b485c63e3da4bde9cf01285634a0587cc6)

* Raise CAF dependency to 0.18.5 and update embedded version (Tim Wojtulewicz, Corelight)

* Release 2.1.0.
Expand Down
2 changes: 1 addition & 1 deletion src/detail/flare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ flare::flare() {
BROKER_ERROR("failed to set flare fd 0 CLOEXEC: " << res.error());
if (auto res = child_process_inherit(second, false); !res)
BROKER_ERROR("failed to set flare fd 1 CLOEXEC: " << res.error());
if (auto res = nonblocking(first, false); !res) {
if (auto res = nonblocking(first, true); !res) {
BROKER_ERROR("failed to set flare fd 0 NONBLOCK: " << res.error());
std::terminate();
}
Expand Down
34 changes: 34 additions & 0 deletions tests/cpp/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,37 @@ CAF_TEST(nonblocking_publishers) {
}

CAF_TEST_FIXTURE_SCOPE_END()

TEST(regression GH196) {
endpoint ep1;
endpoint ep2;
auto port = ep1.listen("127.0.0.1", 0);
auto sub1 = ep1.make_subscriber({topic{"/test"}});
auto sub2 = ep1.make_subscriber({topic{"/test"}});
ep2.peer("127.0.0.1", port);
auto pub = ep2.make_publisher({topic{"/test"}});
auto cap = pub.capacity();
std::vector<data> batch1;
for (size_t i = 0; i < cap; ++i)
batch1.emplace_back(i);
auto batch2 = batch1;
pub.publish(std::move(batch1));
pub.publish(std::move(batch2));
for (size_t n = 0; n < 2; ++n) {
for (size_t i = 0; i < cap; ++i) {
auto msg = sub1.get();
CHECK_EQUAL(get_topic(msg).string(), "/test");
CHECK_EQUAL(get_data(msg), data(i));
}
}
CHECK(sub1.poll().empty());
auto res = sub2.get(cap * 2);
for (size_t n = 0; n < 2; ++n) {
for (size_t i = 0; i < cap; ++i) {
auto& msg = res[(n * cap) + i];
CHECK_EQUAL(get_topic(msg).string(), "/test");
CHECK_EQUAL(get_data(msg), data(i));
}
}
CHECK(sub2.poll().empty());
}

0 comments on commit ef4fd6f

Please sign in to comment.