Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock in the lazy join. #1649

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 57 additions & 35 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,53 +435,75 @@
Result::IdTableVocabPair,
std::function<void(IdTable&, LocalVocab&)>> auto action,
OptionalPermutation permutation) const {
std::atomic_flag write = true;
// TODO<joka921, RobinTF> This heavily mixes a synchronization algorithm with
// the actual logic. This should be refactored.
std::mutex mutex;
std::condition_variable cv;
enum struct State { Inner, Outer, Finished };
State state = State::Inner;
struct CancelException : public std::exception {};
std::variant<std::monostate, Result::IdTableVocabPair, std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &action, &permutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto writeValueAndWait = [&permutation, &write,
&writeValue](Result::IdTableVocabPair value) {
AD_CORRECTNESS_CHECK(write.test());
applyPermutation(value.idTable_, permutation);
writeValue(std::move(value));
// Wait until we are allowed to write again.
write.wait(false);
};
auto addValue = [&writeValueAndWait](IdTable& idTable,
LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
writeValueAndWait({std::move(idTable), std::move(localVocab)});
};
try {
auto finalValue = action(addValue);
if (!finalValue.idTable_.empty()) {
writeValueAndWait(std::move(finalValue));
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
ad_utility::JThread thread{
[&mutex, &cv, &state, &storage, &action, &permutation]() {
std::unique_lock lock(mutex);
auto wait = [&]() {
cv.wait(lock, [&]() { return state != State::Outer; });
if (state == State::Finished) {
throw CancelException{};
}

Check warning on line 454 in src/engine/Join.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Join.cpp#L453-L454

Added lines #L453 - L454 were not covered by tests
};

auto writeValue = [&cv, &state, &storage](auto value) noexcept {
storage = std::move(value);
state = State::Outer;
cv.notify_one();
};
auto writeValueAndWait = [&state, &permutation, &writeValue,
&wait](Result::IdTableVocabPair value) {
AD_CORRECTNESS_CHECK(state == State::Inner);
applyPermutation(value.idTable_, permutation);
writeValue(std::move(value));
wait();
};
auto addValue = [&writeValueAndWait](IdTable& idTable,
LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
writeValueAndWait({std::move(idTable), std::move(localVocab)});
};
try {
auto finalValue = action(addValue);
if (!finalValue.idTable_.empty()) {
writeValueAndWait(std::move(finalValue));
}
writeValue(std::monostate{});
} catch (...) {
writeValue(std::current_exception());
}
}};
std::unique_lock lock{mutex};
cv.wait(lock, [&state]() { return state == State::Outer; });
auto cleanup = absl::Cleanup{[&cv, &state, &lock]() {
state = State::Finished;
lock.unlock();
cv.notify_one();
}};
while (true) {
// Wait for read phase.
write.wait(true);
cv.wait(lock, [&state] { return state == State::Outer; });
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
state = State::Inner;
lock.unlock();
cv.notify_one();
lock.lock();
}
}

Expand Down
Loading