Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prover): Create reqwest client only once #3324

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions prover/crates/bin/prover_autoscaler/src/global/queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@ use std::{collections::HashMap, ops::Deref};
use anyhow::{Context, Ok};
use reqwest::Method;
use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport};
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::{
config::QueueReportFields,
metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
};

const MAX_RETRIES: usize = 5;
use crate::{config::QueueReportFields, http_client::HttpClient};

pub struct Queue(HashMap<(String, QueueReportFields), u64>);

Expand All @@ -23,6 +17,7 @@ impl Deref for Queue {

#[derive(Default)]
pub struct Queuer {
http_client: HttpClient,
pub prover_job_monitor_url: String,
}

Expand All @@ -40,8 +35,9 @@ fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 {
}

impl Queuer {
pub fn new(pjm_url: String) -> Self {
pub fn new(http_client: HttpClient, pjm_url: String) -> Self {
Self {
http_client,
prover_job_monitor_url: pjm_url,
}
}
Expand All @@ -50,13 +46,13 @@ impl Queuer {
/// list of jobs.
pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result<Queue> {
let url = &self.prover_job_monitor_url;
let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}")
})?;
let response = self
.http_client
.send_request_with_retries(url, Method::GET, None, None)
.await;
let response = response
.map_err(|err| anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}"))?;

AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
let response = response
.json::<Vec<VersionedQueueReport>>()
.await
Expand Down
37 changes: 17 additions & 20 deletions prover/crates/bin/prover_autoscaler/src/global/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ use reqwest::{
};
use tokio::sync::Mutex;
use url::Url;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::{
agent::{ScaleRequest, ScaleResponse},
cluster_types::{Cluster, Clusters},
metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
http_client::HttpClient,
task_wiring::Task,
};

const MAX_RETRIES: usize = 5;

#[derive(Default)]
pub struct WatchedData {
pub clusters: Clusters,
Expand All @@ -36,16 +33,18 @@ pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {

#[derive(Default, Clone)]
pub struct Watcher {
http_client: HttpClient,
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub dry_run: bool,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>, dry_run: bool) -> Self {
pub fn new(http_client: HttpClient, agent_urls: Vec<String>, dry_run: bool) -> Self {
let size = agent_urls.len();
Self {
http_client,
cluster_agents: agent_urls
.into_iter()
.map(|u| {
Expand Down Expand Up @@ -92,26 +91,25 @@ impl Watcher {
.unwrap()
.to_string();
tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr);
let http_client = self.http_client.clone();
tokio::spawn(async move {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
if dry_run {
tracing::info!("Dry-run mode, not sending the request.");
return Ok((id, Ok(ScaleResponse::default())));
}
let response = send_request_with_retries(
&url,
MAX_RETRIES,
Method::POST,
Some(headers),
Some(serde_json::to_vec(&sr)?),
)
.await;
let response = http_client
.send_request_with_retries(
&url,
Method::POST,
Some(headers),
Some(serde_json::to_vec(&sr)?),
)
.await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?;
AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
let response = response
.json::<ScaleResponse>()
.await
Expand Down Expand Up @@ -164,21 +162,20 @@ impl Task for Watcher {
.enumerate()
.map(|(i, a)| {
tracing::debug!("Getting cluster data from agent {}.", a);
let http_client = self.http_client.clone();
tokio::spawn(async move {
let url: String = a
.clone()
.join("/cluster")
.context("Failed to join URL with /cluster")?
.to_string();
let response =
send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await;
let response = http_client
.send_request_with_retries(&url, Method::GET, None, None)
.await;

let response = response.map_err(|err| {
// TODO: refactor send_request_with_retries to return status.
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?;
AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
let response = response
.json::<Cluster>()
.await
Expand Down
94 changes: 94 additions & 0 deletions prover/crates/bin/prover_autoscaler/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use reqwest::{header::HeaderMap, Client, Error, Method, Response, StatusCode};
use tokio::time::{sleep, Duration};

use crate::metrics::AUTOSCALER_METRICS;

/// Shared HTTP client with built-in retry support.
///
/// Intended to be created once and passed (cloned) into every component that
/// makes outbound HTTP calls, so a single underlying `reqwest::Client` —
/// and its connection pool — is reused across the whole process.
/// NOTE(review): cheap cloning relies on `reqwest::Client` being internally
/// reference-counted, per the reqwest documentation.
#[derive(Clone)]
pub struct HttpClient {
    // Underlying reqwest client; shared by all clones of this struct.
    client: Client,
    // Maximum number of retries performed by `send_request_with_retries`
    // after the initial attempt fails.
    max_retries: usize,
}

impl Default for HttpClient {
fn default() -> Self {
Self {
client: Client::new(),
max_retries: 5,
}
}
}

/// Errors surfaced by [`HttpClient::send_request_with_retries`].
#[derive(Debug)]
pub enum HttpError {
    /// Transport-level failure reported by `reqwest`.
    ReqwestError(Error),
    /// No attempt produced a successful response within the configured
    /// number of retries; the string carries a human-readable summary.
    RetryExhausted(String),
}

impl HttpClient {
    /// Sends an HTTP request, retrying up to `max_retries` times with
    /// exponential back-off (1s, 2s, 4s, ... doubling, saturating at
    /// `Duration::MAX`) between attempts.
    ///
    /// Every attempt — success or failure — increments the per-URL
    /// `AUTOSCALER_METRICS.calls` counter, labelled with the response status
    /// code, or 500 when the transport error carries no status.
    ///
    /// # Errors
    ///
    /// Returns [`HttpError::RetryExhausted`] once the initial attempt and all
    /// `max_retries` retries have failed (non-2xx status or transport error).
    pub async fn send_request_with_retries(
        &self,
        url: &str,
        method: Method,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, HttpError> {
        let mut retries = 0usize;
        let mut delay = Duration::from_secs(1);
        loop {
            let result = self
                .send_request(url, method.clone(), headers.clone(), body.clone())
                .await;
            // Record the outcome of this attempt; transport errors without a
            // status code are counted as 500.
            AUTOSCALER_METRICS.calls[&(
                url.into(),
                match result {
                    Ok(ref response) => response.status().as_u16(),
                    Err(ref err) => err
                        .status()
                        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
                        .as_u16(),
                },
            )]
            .inc();
            match result {
                // Only a 2xx response terminates the loop successfully.
                Ok(response) if response.status().is_success() => return Ok(response),
                Ok(response) => {
                    tracing::error!("Received non OK http response {:?}", response.status())
                }
                Err(err) => tracing::error!("Error while sending http request {:?}", err),
            }

            if retries >= self.max_retries {
                return Err(HttpError::RetryExhausted(format!(
                    "All {} http retries failed",
                    self.max_retries
                )));
            }
            retries += 1;
            sleep(delay).await;
            // Double the delay, saturating instead of panicking on overflow.
            delay = delay.checked_mul(2).unwrap_or(Duration::MAX);
        }
    }

    /// Builds and executes a single HTTP request using the shared client.
    ///
    /// `headers` and `body` are optional; when absent the request is sent
    /// without them.
    async fn send_request(
        &self,
        url: &str,
        method: Method,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, Error> {
        let mut request = self.client.request(method, url);

        if let Some(headers) = headers {
            request = request.headers(headers);
        }

        if let Some(body) = body {
            request = request.body(body);
        }

        let request = request.build()?;
        let response = self.client.execute(request).await?;
        Ok(response)
    }
}
15 changes: 10 additions & 5 deletions prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@ use reqwest::{
Method,
};
use tokio::sync::Mutex;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent};
use crate::{
cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent},
http_client::HttpClient,
};

#[derive(Clone)]
pub struct Watcher {
pub client: kube::Client,
pub cluster: Arc<Mutex<Cluster>>,
}

async fn get_cluster_name() -> anyhow::Result<String> {
async fn get_cluster_name(http_client: HttpClient) -> anyhow::Result<String> {
let mut headers = HeaderMap::new();
headers.insert("Metadata-Flavor", HeaderValue::from_static("Google"));
let url = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name";
let response = send_request_with_retries(url, 5, Method::GET, Some(headers), None).await;
let response = http_client
.send_request_with_retries(url, Method::GET, Some(headers), None)
.await;
response
.map_err(|err| anyhow::anyhow!("Failed fetching response from url: {url}: {err:?}"))?
.text()
Expand All @@ -37,6 +41,7 @@ async fn get_cluster_name() -> anyhow::Result<String> {

impl Watcher {
pub async fn new(
http_client: HttpClient,
client: kube::Client,
cluster_name: Option<String>,
namespaces: Vec<String>,
Expand All @@ -48,7 +53,7 @@ impl Watcher {

let cluster_name = match cluster_name {
Some(c) => c,
None => get_cluster_name()
None => get_cluster_name(http_client)
.await
.expect("Load cluster_name from GCP"),
};
Expand Down
1 change: 1 addition & 0 deletions prover/crates/bin/prover_autoscaler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod agent;
pub(crate) mod cluster_types;
pub mod config;
pub mod global;
pub mod http_client;
yorik marked this conversation as resolved.
Show resolved Hide resolved
pub mod k8s;
pub(crate) mod metrics;
pub mod task_wiring;
24 changes: 19 additions & 5 deletions prover/crates/bin/prover_autoscaler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use zksync_prover_autoscaler::{
agent,
config::{config_from_yaml, ProverAutoscalerConfig},
global::{self},
http_client::HttpClient,
k8s::{Scaler, Watcher},
task_wiring::TaskRunner,
};
Expand Down Expand Up @@ -74,6 +75,8 @@ async fn main() -> anyhow::Result<()> {

let mut tasks = vec![];

let http_client = HttpClient::default();

match opt.job {
AutoscalerType::Agent => {
tracing::info!("Starting ProverAutoscaler Agent");
Expand All @@ -84,8 +87,13 @@ async fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::ring::default_provider().install_default();
let client = kube::Client::try_default().await?;

let watcher =
Watcher::new(client.clone(), opt.cluster_name, agent_config.namespaces).await;
let watcher = Watcher::new(
http_client,
client.clone(),
opt.cluster_name,
agent_config.namespaces,
)
.await;
let scaler = Scaler::new(client, agent_config.dry_run);
tasks.push(tokio::spawn(watcher.clone().run()));
tasks.push(tokio::spawn(agent::run_server(
Expand All @@ -101,9 +109,15 @@ async fn main() -> anyhow::Result<()> {
let interval = scaler_config.scaler_run_interval;
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
let watcher =
global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run);
let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
let watcher = global::watcher::Watcher::new(
http_client.clone(),
scaler_config.agents.clone(),
scaler_config.dry_run,
);
let queuer = global::queuer::Queuer::new(
http_client,
scaler_config.prover_job_monitor_url.clone(),
);
let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
}
Expand Down
2 changes: 0 additions & 2 deletions prover/crates/bin/prover_autoscaler/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use vise::{Counter, Gauge, LabeledFamily, Metrics};

use crate::config::Gpu;

pub const DEFAULT_ERROR_CODE: u16 = 500;

#[derive(Debug, Metrics)]
#[metrics(prefix = "autoscaler")]
pub(crate) struct AutoscalerMetrics {
Expand Down
Loading