Skip to content

Commit

Permalink
pvf priority (#2305)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
Co-authored-by: kamilsa <[email protected]>
  • Loading branch information
turuslan and kamilsa authored Dec 16, 2024
1 parent 1d5d755 commit d056f23
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
1 change: 1 addition & 0 deletions core/parachain/pvf/pvf_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ namespace kagome::parachain {
}
cb(scale::decode<ValidationResult>(r.value()));
},
.kind = timeout_kind,
.timeout =
std::chrono::milliseconds{
timeout_kind == runtime::PvfExecTimeoutKind::Backing
Expand Down
41 changes: 30 additions & 11 deletions core/parachain/pvf/workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "utils/weak_macro.hpp"

namespace kagome::parachain {
constexpr auto kMetricQueueSize = "kagome_pvf_queue_size";

struct AsyncPipe : boost::process::async_pipe {
using async_pipe::async_pipe;
using lowest_layer_type = AsyncPipe;
Expand Down Expand Up @@ -133,13 +135,26 @@ namespace kagome::parachain {
.cache_dir = app_config.runtimeCacheDirPath(),
.log_params = app_config.log(),
.force_disable_secure_mode = app_config.disableSecureMode(),
} {}
} {
metrics_registry_->registerGaugeFamily(kMetricQueueSize, "pvf queue size");
std::unordered_map<PvfExecTimeoutKind, std::string> kind_name{
{PvfExecTimeoutKind::Approval, "Approval"},
{PvfExecTimeoutKind::Backing, "Backing"},
};
for (auto &[kind, name] : kind_name) {
metric_queue_size_.emplace(kind,
metrics_registry_->registerGaugeMetric(
kMetricQueueSize, {{"kind", name}}));
}
}

void PvfWorkers::execute(Job &&job) {
REINVOKE(*main_pool_handler_, execute, std::move(job));
if (free_.empty()) {
if (used_ >= max_) {
queue_.emplace(std::move(job));
auto &queue = queues_[job.kind];
queue.emplace_back(std::move(job));
metric_queue_size_.at(job.kind)->set(queue.size());
return;
}
auto used = std::make_shared<Used>(*this);
Expand All @@ -157,10 +172,9 @@ namespace kagome::parachain {
if (not r) {
return job.cb(r.error());
}
self->writeCode(
std::move(job),
{.process = std::move(process)},
std::move(used));
self->writeCode(std::move(job),
{.process = std::move(process)},
std::move(used));
});
return;
}
Expand Down Expand Up @@ -244,11 +258,16 @@ namespace kagome::parachain {
}

void PvfWorkers::dequeue() {
if (queue_.empty()) {
return;
for (auto &kind :
{PvfExecTimeoutKind::Approval, PvfExecTimeoutKind::Backing}) {
auto &queue = queues_[kind];
if (queue.empty()) {
continue;
}
auto job = std::move(queue.front());
queue.pop_front();
metric_queue_size_.at(kind)->set(queue.size());
findFree(std::move(job));
}
auto job = std::move(queue_.front());
queue_.pop();
findFree(std::move(job));
}
} // namespace kagome::parachain
12 changes: 10 additions & 2 deletions core/parachain/pvf/workers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

#pragma once

#include <deque>
#include <filesystem>
#include <list>
#include <queue>

#include "metrics/metrics.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
#include "runtime/runtime_api/parachain_host_types.hpp"

namespace boost::asio {
class io_context;
Expand All @@ -33,6 +35,8 @@ namespace kagome::common {
} // namespace kagome::common

namespace kagome::parachain {
using runtime::PvfExecTimeoutKind;

struct ProcessAndPipes;

class PvfWorkers : public std::enable_shared_from_this<PvfWorkers> {
Expand All @@ -46,6 +50,7 @@ namespace kagome::parachain {
PvfWorkerInputCodeParams code_params;
Buffer args;
Cb cb;
PvfExecTimeoutKind kind;
std::chrono::milliseconds timeout{0};
};
void execute(Job &&job);
Expand Down Expand Up @@ -77,6 +82,9 @@ namespace kagome::parachain {
PvfWorkerInputConfig worker_config_;
std::list<Worker> free_;
size_t used_ = 0;
std::queue<Job> queue_;
std::unordered_map<PvfExecTimeoutKind, std::deque<Job>> queues_;

metrics::RegistryPtr metrics_registry_ = metrics::createRegistry();
std::unordered_map<PvfExecTimeoutKind, metrics::Gauge *> metric_queue_size_;
};
} // namespace kagome::parachain
2 changes: 1 addition & 1 deletion core/storage/rocksdb/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace kagome::storage {
if (not enable_migration) {
SL_ERROR(log,
"Database migration is disabled, use older kagome version or "
"run with migration enabling flag");
"run with --enable-db-migration flag");
return DatabaseError::IO_ERROR;
}

Expand Down

0 comments on commit d056f23

Please sign in to comment.