diff --git a/core/parachain/pvf/pvf_impl.cpp b/core/parachain/pvf/pvf_impl.cpp index 882d11aad7..cdec26202a 100644 --- a/core/parachain/pvf/pvf_impl.cpp +++ b/core/parachain/pvf/pvf_impl.cpp @@ -391,6 +391,7 @@ namespace kagome::parachain { } cb(scale::decode(r.value())); }, + .kind = timeout_kind, .timeout = std::chrono::milliseconds{ timeout_kind == runtime::PvfExecTimeoutKind::Backing diff --git a/core/parachain/pvf/workers.cpp b/core/parachain/pvf/workers.cpp index 3842cc0759..5551676a6d 100644 --- a/core/parachain/pvf/workers.cpp +++ b/core/parachain/pvf/workers.cpp @@ -20,6 +20,8 @@ #include "utils/weak_macro.hpp" namespace kagome::parachain { + constexpr auto kMetricQueueSize = "kagome_pvf_queue_size"; + struct AsyncPipe : boost::process::async_pipe { using async_pipe::async_pipe; using lowest_layer_type = AsyncPipe; @@ -133,13 +135,26 @@ namespace kagome::parachain { .cache_dir = app_config.runtimeCacheDirPath(), .log_params = app_config.log(), .force_disable_secure_mode = app_config.disableSecureMode(), - } {} + } { + metrics_registry_->registerGaugeFamily(kMetricQueueSize, "pvf queue size"); + std::unordered_map kind_name{ + {PvfExecTimeoutKind::Approval, "Approval"}, + {PvfExecTimeoutKind::Backing, "Backing"}, + }; + for (auto &[kind, name] : kind_name) { + metric_queue_size_.emplace(kind, + metrics_registry_->registerGaugeMetric( + kMetricQueueSize, {{"kind", name}})); + } + } void PvfWorkers::execute(Job &&job) { REINVOKE(*main_pool_handler_, execute, std::move(job)); if (free_.empty()) { if (used_ >= max_) { - queue_.emplace(std::move(job)); + auto &queue = queues_[job.kind]; + queue.emplace_back(std::move(job)); + metric_queue_size_.at(job.kind)->set(queue.size()); return; } auto used = std::make_shared(*this); @@ -157,10 +172,9 @@ namespace kagome::parachain { if (not r) { return job.cb(r.error()); } - self->writeCode( - std::move(job), - {.process = std::move(process)}, - std::move(used)); + self->writeCode(std::move(job), + {.process = std::move(process)}, + std::move(used)); }); return; } @@ -244,11 +258,16 @@ namespace kagome::parachain { } void PvfWorkers::dequeue() { - if (queue_.empty()) { - return; + for (auto &kind : + {PvfExecTimeoutKind::Approval, PvfExecTimeoutKind::Backing}) { + auto &queue = queues_[kind]; + if (queue.empty()) { + continue; + } + auto job = std::move(queue.front()); + queue.pop_front(); + metric_queue_size_.at(kind)->set(queue.size()); + findFree(std::move(job)); } - auto job = std::move(queue_.front()); - queue_.pop(); - findFree(std::move(job)); } } // namespace kagome::parachain diff --git a/core/parachain/pvf/workers.hpp b/core/parachain/pvf/workers.hpp index b267e73cd9..b9dcbde5ad 100644 --- a/core/parachain/pvf/workers.hpp +++ b/core/parachain/pvf/workers.hpp @@ -6,11 +6,13 @@ #pragma once +#include #include #include -#include +#include "metrics/metrics.hpp" #include "parachain/pvf/pvf_worker_types.hpp" +#include "runtime/runtime_api/parachain_host_types.hpp" namespace boost::asio { class io_context; @@ -33,6 +35,8 @@ namespace kagome::common { } // namespace kagome::common namespace kagome::parachain { + using runtime::PvfExecTimeoutKind; + struct ProcessAndPipes; class PvfWorkers : public std::enable_shared_from_this { @@ -46,6 +50,7 @@ namespace kagome::parachain { PvfWorkerInputCodeParams code_params; Buffer args; Cb cb; + PvfExecTimeoutKind kind; std::chrono::milliseconds timeout{0}; }; void execute(Job &&job); @@ -77,6 +82,9 @@ namespace kagome::parachain { PvfWorkerInputConfig worker_config_; std::list free_; size_t used_ = 0; - std::queue queue_; + std::unordered_map> queues_; + + metrics::RegistryPtr metrics_registry_ = metrics::createRegistry(); + std::unordered_map metric_queue_size_; }; } // namespace kagome::parachain diff --git a/core/storage/rocksdb/rocksdb.cpp b/core/storage/rocksdb/rocksdb.cpp index 8e1623026c..104050e18f 100644 --- a/core/storage/rocksdb/rocksdb.cpp +++ b/core/storage/rocksdb/rocksdb.cpp @@ -93,7 +93,7 @@ namespace kagome::storage { if (not enable_migration) { SL_ERROR(log, "Database migration is disabled, use older kagome version or " - "run with migration enabling flag"); + "run with --enable-db-migration flag"); return DatabaseError::IO_ERROR; }