Skip to content

Commit

Permalink
Merge pull request #157 from spacemeshos/configurable-concurrency-limit
Browse files Browse the repository at this point in the history
Configurable concurrency limit
  • Loading branch information
poszu authored Nov 28, 2023
2 parents e5d891f + aa2be08 commit 1e96cea
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions certifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rand = "0.8.5"
serde_json = "1.0.108"
base64 = "0.21.5"
axum-prometheus = "0.4.0"
tower = { version = "0.4.13", features = ["limit"] }

[dev-dependencies]
reqwest = { version = "0.11.22", features = ["json"] }
Expand Down
13 changes: 13 additions & 0 deletions certifier/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,23 @@ init_cfg:
p: 1

metrics: "127.0.0.1:9090"
randomx_mode: Fast
```
Each field can also be provided as env variable prefixed with CERTIFIER. For example, `CERTIFIER_SIGNING_KEY`.

##### Concurrency limit
It's important to configure the maximum number of requests that will be processed in parallel.
The POST verification is heavy on CPU and hence a value higher than the number of CPU cores might lead to drop in performance and increase latency.
It will use the number of available CPU cores if not set.

##### RandomX mode
Randomx is used for K2 PoW verification. There are two modes:
- `Fast`: uses about 2080MiB memory, runs fast
- `Light` (default): uses about 256MiB memory, but runs significantly slower (about x10 slower)

The modes give the same results, they differ in speed and memory consumption only.

#### Docker
There is a docker image created to simplify deployment: `spacemeshos/certifier-service`.

Expand Down
13 changes: 10 additions & 3 deletions certifier/src/certifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use axum::{extract::State, Json};
use axum::{routing::post, Router};
use ed25519_dalek::{Signer, SigningKey};
use post::config::{InitConfig, ProofConfig};
use post::pow::randomx::{PoW, RandomXFlag};
use post::pow::randomx::PoW;
use post::verification::Verifier;
use serde::{Deserialize, Serialize};
use serde_with::{base64::Base64, serde_as};
use tracing::instrument;

use crate::configuration::RandomXMode;

#[derive(Debug, Deserialize, Serialize)]
pub struct CertifyRequest {
pub proof: post::prove::Proof<'static>,
Expand Down Expand Up @@ -66,10 +68,15 @@ struct AppState {
signer: SigningKey,
}

pub fn new(cfg: ProofConfig, init_cfg: InitConfig, signer: SigningKey) -> Router {
pub fn new(
cfg: ProofConfig,
init_cfg: InitConfig,
signer: SigningKey,
randomx_mode: RandomXMode,
) -> Router {
let state = AppState {
verifier: Verifier::new(Box::new(
PoW::new(RandomXFlag::get_recommended_flags()).expect("creating RandomX PoW verifier"),
PoW::new(randomx_mode.into()).expect("creating RandomX PoW verifier"),
)),
cfg,
init_cfg,
Expand Down
39 changes: 39 additions & 0 deletions certifier/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,59 @@
use std::path::Path;

use ed25519_dalek::SecretKey;
use post::pow::randomx::RandomXFlag;
use serde_with::{base64::Base64, serde_as};
use tracing::info;

/// RandomX modes of operation
///
/// They are interchangeable as they give the same results but have different
/// purpose and memory requirements.
#[derive(Debug, Default, Copy, Clone, serde::Deserialize)]
pub enum RandomXMode {
/// Fast mode for proving. Requires 2080 MiB of memory.
Fast,
/// Light mode for verification. Requires only 256 MiB of memory, but runs significantly slower
#[default]
Light,
}

impl From<RandomXMode> for RandomXFlag {
fn from(val: RandomXMode) -> Self {
match val {
RandomXMode::Fast => RandomXFlag::get_recommended_flags() | RandomXFlag::FLAG_FULL_MEM,
RandomXMode::Light => RandomXFlag::get_recommended_flags(),
}
}
}

fn max_concurrency() -> usize {
std::thread::available_parallelism()
.expect("fetching number of cores")
.get()
}

#[serde_as]
#[derive(serde::Deserialize, Clone)]
pub struct Config {
/// The address to listen on for incoming requests.
pub listen: std::net::SocketAddr,

/// The maximum number of requests to process in parallel.
/// Typically set to the number of cores, which is the default (if not set).
#[serde(default = "max_concurrency")]
pub max_concurrent_requests: usize,

#[serde_as(as = "Base64")]
/// The base64-encoded secret key used to sign the proofs.
/// It's 256-bit key as defined in [RFC8032 § 5.1.5].
pub signing_key: SecretKey,
pub post_cfg: post::config::ProofConfig,
pub init_cfg: post::config::InitConfig,

#[serde(default)]
pub randomx_mode: RandomXMode,

/// Address to expose metrics on.
/// Metrics are disabled if not configured.
pub metrics: Option<std::net::SocketAddr>,
Expand Down
19 changes: 16 additions & 3 deletions certifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum_prometheus::PrometheusMetricLayerBuilder;
use base64::{engine::general_purpose, Engine as _};
use clap::{arg, Parser, Subcommand};
use ed25519_dalek::SigningKey;
use tower::limit::ConcurrencyLimitLayer;
use tracing::info;
use tracing_log::LogTracer;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
Expand Down Expand Up @@ -72,9 +73,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pubkey_b64 = general_purpose::STANDARD.encode(signer.verifying_key().as_bytes());

info!("listening on: {:?}, pubkey: {}", config.listen, pubkey_b64,);
info!("using POST configuration: {:?}", config.post_cfg);

let mut app = certifier::certifier::new(config.post_cfg, config.init_cfg, signer);
info!("POST proof configuration: {:?}", config.post_cfg);
info!("POST init configuration: {:?}", config.init_cfg);
info!("RandomX mode: {:?}", config.randomx_mode);
info!(
"max concurrent requests: {}",
config.max_concurrent_requests
);

let mut app = certifier::certifier::new(
config.post_cfg,
config.init_cfg,
signer,
config.randomx_mode,
)
.layer(ConcurrencyLimitLayer::new(config.max_concurrent_requests));

if let Some(addr) = config.metrics {
info!("metrics enabled on: http://{addr:?}/metrics");
Expand Down
4 changes: 2 additions & 2 deletions certifier/tests/test_certify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::atomic::AtomicBool;

use certifier::certifier::CertifyRequest;
use certifier::{certifier::CertifyRequest, configuration::RandomXMode};
use ed25519_dalek::SigningKey;
use post::{
config::{InitConfig, ProofConfig, ScryptParams},
Expand Down Expand Up @@ -50,7 +50,7 @@ async fn test_certificate_post_proof() {

// Spawn the certifier service
let signer = SigningKey::generate(&mut rand::rngs::OsRng);
let app = certifier::certifier::new(cfg, init_cfg, signer);
let app = certifier::certifier::new(cfg, init_cfg, signer, RandomXMode::Light);
let server = axum::Server::bind(&([127, 0, 0, 1], 0).into()).serve(app.into_make_service());
let addr = server.local_addr();
tokio::spawn(server);
Expand Down
1 change: 1 addition & 0 deletions src/pow/randomx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl PoW {
} else {
(Some(cache), None)
};
log::debug!("RandomX initialized");

Ok(Self {
cache,
Expand Down

0 comments on commit 1e96cea

Please sign in to comment.