From ef4fd6f2d25b244ae990eb29aa37880decd0512b Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 3 Aug 2021 11:55:22 -0700 Subject: [PATCH] Merge remote-tracking branch 'origin/issue/196' * origin/issue/196: Fix publisher deadlock when passing large batches (cherry picked from commit bef5e0b485c63e3da4bde9cf01285634a0587cc6) --- CHANGES | 4 ++++ src/detail/flare.cc | 2 +- tests/cpp/publisher.cc | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/CHANGES b/CHANGES index 1948c387..65aee777 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,9 @@ 2.1.0 | 2021-07-16 19:02:24 -0700 + * Fix publisher deadlock when passing large batches (Dominik Charousset, Corelight) + + (cherry-picked from bef5e0b485c63e3da4bde9cf01285634a0587cc6) + * Raise CAF dependency to 0.18.5 and update embedded version (Tim Wojtulewicz, Corelight) * Release 2.1.0. diff --git a/src/detail/flare.cc b/src/detail/flare.cc index 92d81b65..9aab1a6f 100644 --- a/src/detail/flare.cc +++ b/src/detail/flare.cc @@ -80,7 +80,7 @@ flare::flare() { BROKER_ERROR("failed to set flare fd 0 CLOEXEC: " << res.error()); if (auto res = child_process_inherit(second, false); !res) BROKER_ERROR("failed to set flare fd 1 CLOEXEC: " << res.error()); - if (auto res = nonblocking(first, false); !res) { + if (auto res = nonblocking(first, true); !res) { BROKER_ERROR("failed to set flare fd 0 NONBLOCK: " << res.error()); std::terminate(); } diff --git a/tests/cpp/publisher.cc b/tests/cpp/publisher.cc index ded7d4b4..706918f4 100644 --- a/tests/cpp/publisher.cc +++ b/tests/cpp/publisher.cc @@ -189,3 +189,37 @@ CAF_TEST(nonblocking_publishers) { } CAF_TEST_FIXTURE_SCOPE_END() + +TEST(regression GH196) { + endpoint ep1; + endpoint ep2; + auto port = ep1.listen("127.0.0.1", 0); + auto sub1 = ep1.make_subscriber({topic{"/test"}}); + auto sub2 = ep1.make_subscriber({topic{"/test"}}); + ep2.peer("127.0.0.1", port); + auto pub = ep2.make_publisher({topic{"/test"}}); + auto cap = pub.capacity(); + std::vector batch1; + for (size_t i = 0; i < cap; ++i) + batch1.emplace_back(i); + auto batch2 = batch1; + pub.publish(std::move(batch1)); + pub.publish(std::move(batch2)); + for (size_t n = 0; n < 2; ++n) { + for (size_t i = 0; i < cap; ++i) { + auto msg = sub1.get(); + CHECK_EQUAL(get_topic(msg).string(), "/test"); + CHECK_EQUAL(get_data(msg), data(i)); + } + } + CHECK(sub1.poll().empty()); + auto res = sub2.get(cap * 2); + for (size_t n = 0; n < 2; ++n) { + for (size_t i = 0; i < cap; ++i) { + auto& msg = res[(n * cap) + i]; + CHECK_EQUAL(get_topic(msg).string(), "/test"); + CHECK_EQUAL(get_data(msg), data(i)); + } + } + CHECK(sub2.poll().empty()); +}