Skip to content

Commit

Permalink
Add timeout for context queue pop. (#382)
Browse files Browse the repository at this point in the history
Co-authored-by: Earle Lowe <[email protected]>
  • Loading branch information
fchirica and emlowe authored Aug 16, 2023
1 parent 94f6a59 commit 10c8583
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ inline void InitDecompressorQueueDefault(bool no_cuda = false)
if (initialized) {
return;
}
decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false);
decompressor_context_queue.init(1, (uint32_t)std::thread::hardware_concurrency(), false, 9, !no_cuda, 0, false, 30);
initialized = true;
}

Expand Down
28 changes: 24 additions & 4 deletions src/prover_disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>

#include "../lib/include/picosha2.hpp"
#include "calculate_bucket.hpp"
Expand Down Expand Up @@ -67,7 +68,8 @@ class ContextQueue {
const uint32_t max_compression_level,
bool use_gpu_harvesting,
uint32_t gpu_index,
bool enforce_gpu_index
bool enforce_gpu_index,
uint16_t context_queue_timeout
) {
assert(!_dcompressor_queue_initialized);
_dcompressor_queue_initialized = true;
Expand Down Expand Up @@ -167,6 +169,7 @@ class ContextQueue {
}
}
}
this->context_queue_timeout = context_queue_timeout;
return false;
}

Expand All @@ -179,9 +182,24 @@ class ContextQueue {

GreenReaperContext* pop() {
std::unique_lock<std::mutex> lock(mutex);
while (queue.empty()) {
condition.wait(lock);

std::chrono::duration<double> wait_time = std::chrono::seconds(context_queue_timeout);

while (queue.empty() && wait_time.count() > 0) {
auto before_wait = std::chrono::steady_clock::now();

if (condition.wait_for(lock, wait_time) == std::cv_status::timeout) {
break;
}

auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - before_wait);
wait_time -= elapsed;
}

if (queue.empty()) {
throw std::runtime_error("Timeout waiting for context queue.");
}

GreenReaperContext* gr = queue.front();
queue.pop();
return gr;
Expand All @@ -191,6 +209,7 @@ class ContextQueue {
std::queue<GreenReaperContext*> queue;
std::mutex mutex;
std::condition_variable condition;
uint16_t context_queue_timeout;
};

class ProofCache {
Expand Down Expand Up @@ -266,7 +285,8 @@ class ContextQueue {
const uint32_t max_compression_level,
bool use_gpu_harvesting,
uint32_t gpu_index,
bool enforce_gpu_index
bool enforce_gpu_index,
uint16_t context_queue_timeout
)
{
return false;
Expand Down

0 comments on commit 10c8583

Please sign in to comment.