diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index baeb5b70a4ef..bc781e793408 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -3,14 +3,8 @@ use std::{collections::HashMap, ops::Deref};
 
 use anyhow::{Context, Ok};
 use reqwest::Method;
 use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport};
-use zksync_utils::http_with_retries::send_request_with_retries;
 
-use crate::{
-    config::QueueReportFields,
-    metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
-};
-
-const MAX_RETRIES: usize = 5;
+use crate::{config::QueueReportFields, http_client::HttpClient};
 
 pub struct Queue(HashMap<(String, QueueReportFields), u64>);
@@ -23,6 +17,7 @@ impl Deref for Queue {
 
 #[derive(Default)]
 pub struct Queuer {
+    http_client: HttpClient,
     pub prover_job_monitor_url: String,
 }
 
@@ -40,8 +35,9 @@ fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 {
 }
 
 impl Queuer {
-    pub fn new(pjm_url: String) -> Self {
+    pub fn new(http_client: HttpClient, pjm_url: String) -> Self {
         Self {
+            http_client,
             prover_job_monitor_url: pjm_url,
         }
     }
@@ -50,13 +46,13 @@ impl Queuer {
     /// list of jobs.
     pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result<Queue> {
         let url = &self.prover_job_monitor_url;
-        let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
-        let response = response.map_err(|err| {
-            AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
-            anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}")
-        })?;
+        let response = self
+            .http_client
+            .send_request_with_retries(url, Method::GET, None, None)
+            .await;
+        let response = response
+            .map_err(|err| anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}"))?;
 
-        AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
         let response = response
             .json::<Vec<VersionedQueueReport>>()
             .await
diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
index 95b9e32cac5b..9a56471b72d5 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -8,17 +8,14 @@ use reqwest::{
 };
 use tokio::sync::Mutex;
 use url::Url;
-use zksync_utils::http_with_retries::send_request_with_retries;
 
 use crate::{
     agent::{ScaleRequest, ScaleResponse},
     cluster_types::{Cluster, Clusters},
-    metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
+    http_client::HttpClient,
     task_wiring::Task,
 };
 
-const MAX_RETRIES: usize = 5;
-
 #[derive(Default)]
 pub struct WatchedData {
     pub clusters: Clusters,
@@ -36,6 +33,7 @@ pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
 
 #[derive(Default, Clone)]
 pub struct Watcher {
+    http_client: HttpClient,
     /// List of base URLs of all agents.
     pub cluster_agents: Vec<Arc<Url>>,
     pub dry_run: bool,
@@ -43,9 +41,10 @@ pub struct Watcher {
 }
 
 impl Watcher {
-    pub fn new(agent_urls: Vec<String>, dry_run: bool) -> Self {
+    pub fn new(http_client: HttpClient, agent_urls: Vec<String>, dry_run: bool) -> Self {
         let size = agent_urls.len();
         Self {
+            http_client,
             cluster_agents: agent_urls
                 .into_iter()
                 .map(|u| {
@@ -92,6 +91,7 @@ impl Watcher {
                 .unwrap()
                 .to_string();
             tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr);
+            let http_client = self.http_client.clone();
             tokio::spawn(async move {
                 let mut headers = HeaderMap::new();
                 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
@@ -99,19 +99,17 @@ impl Watcher {
                     tracing::info!("Dry-run mode, not sending the request.");
                     return Ok((id, Ok(ScaleResponse::default())));
                 }
-                let response = send_request_with_retries(
-                    &url,
-                    MAX_RETRIES,
-                    Method::POST,
-                    Some(headers),
-                    Some(serde_json::to_vec(&sr)?),
-                )
-                .await;
+                let response = http_client
+                    .send_request_with_retries(
+                        &url,
+                        Method::POST,
+                        Some(headers),
+                        Some(serde_json::to_vec(&sr)?),
+                    )
+                    .await;
                 let response = response.map_err(|err| {
-                    AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
                     anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                 })?;
-                AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
                 let response = response
                     .json::<ScaleResponse>()
                     .await
@@ -164,21 +162,20 @@ impl Task for Watcher {
             .enumerate()
             .map(|(i, a)| {
                 tracing::debug!("Getting cluster data from agent {}.", a);
+                let http_client = self.http_client.clone();
                 tokio::spawn(async move {
                     let url: String = a
                         .clone()
                         .join("/cluster")
                         .context("Failed to join URL with /cluster")?
                         .to_string();
-                    let response =
-                        send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await;
+                    let response = http_client
+                        .send_request_with_retries(&url, Method::GET, None, None)
+                        .await;
                     let response = response.map_err(|err| {
-                        // TODO: refactor send_request_with_retries to return status.
-                        AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
                         anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                     })?;
-                    AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
                     let response = response
                         .json::<Cluster>()
                         .await
diff --git a/prover/crates/bin/prover_autoscaler/src/http_client.rs b/prover/crates/bin/prover_autoscaler/src/http_client.rs
new file mode 100644
index 000000000000..6710ea53a26d
--- /dev/null
+++ b/prover/crates/bin/prover_autoscaler/src/http_client.rs
@@ -0,0 +1,94 @@
+use reqwest::{header::HeaderMap, Client, Error, Method, Response, StatusCode};
+use tokio::time::{sleep, Duration};
+
+use crate::metrics::AUTOSCALER_METRICS;
+
+#[derive(Clone)]
+pub struct HttpClient {
+    client: Client,
+    max_retries: usize,
+}
+
+impl Default for HttpClient {
+    fn default() -> Self {
+        Self {
+            client: Client::new(),
+            max_retries: 5,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum HttpError {
+    ReqwestError(Error),
+    RetryExhausted(String),
+}
+
+impl HttpClient {
+    /// Sends an HTTP request with a fixed number of retries and exponential back-off.
+    pub async fn send_request_with_retries(
+        &self,
+        url: &str,
+        method: Method,
+        headers: Option<HeaderMap>,
+        body: Option<Vec<u8>>,
+    ) -> Result<Response, HttpError> {
+        let mut retries = 0usize;
+        let mut delay = Duration::from_secs(1);
+        loop {
+            let result = self
+                .send_request(url, method.clone(), headers.clone(), body.clone())
+                .await;
+            AUTOSCALER_METRICS.calls[&(
+                url.into(),
+                match result {
+                    Ok(ref response) => response.status().as_u16(),
+                    Err(ref err) => err
+                        .status()
+                        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
+                        .as_u16(),
+                },
+            )]
+            .inc();
+            match result {
+                Ok(response) if response.status().is_success() => return Ok(response),
+                Ok(response) => {
+                    tracing::error!("Received non OK http response {:?}", response.status())
+                }
+                Err(err) => tracing::error!("Error while sending http request {:?}", err),
+            }
+
+            if retries >= self.max_retries {
+                return Err(HttpError::RetryExhausted(format!(
+                    "All {} http retries failed",
+                    self.max_retries
+                )));
+            }
+            retries += 1;
+            sleep(delay).await;
+            delay = delay.checked_mul(2).unwrap_or(Duration::MAX);
+        }
+    }
+
+    async fn send_request(
+        &self,
+        url: &str,
+        method: Method,
+        headers: Option<HeaderMap>,
+        body: Option<Vec<u8>>,
+    ) -> Result<Response, Error> {
+        let mut request = self.client.request(method, url);
+
+        if let Some(headers) = headers {
+            request = request.headers(headers);
+        }
+
+        if let Some(body) = body {
+            request = request.body(body);
+        }
+
+        let request = request.build()?;
+        let response = self.client.execute(request).await?;
+        Ok(response)
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
index b8476ab475ab..4730a0259e4c 100644
--- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
@@ -13,9 +13,11 @@ use reqwest::{
     Method,
 };
 use tokio::sync::Mutex;
-use zksync_utils::http_with_retries::send_request_with_retries;
 
-use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent};
+use crate::{
+    cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent},
+    http_client::HttpClient,
+};
 
 #[derive(Clone)]
 pub struct Watcher {
@@ -23,11 +25,13 @@ pub struct Watcher {
     pub cluster: Arc<Mutex<Cluster>>,
 }
 
-async fn get_cluster_name() -> anyhow::Result<String> {
+async fn get_cluster_name(http_client: HttpClient) -> anyhow::Result<String> {
     let mut headers = HeaderMap::new();
     headers.insert("Metadata-Flavor", HeaderValue::from_static("Google"));
     let url = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name";
-    let response = send_request_with_retries(url, 5, Method::GET, Some(headers), None).await;
+    let response = http_client
+        .send_request_with_retries(url, Method::GET, Some(headers), None)
+        .await;
     response
         .map_err(|err| anyhow::anyhow!("Failed fetching response from url: {url}: {err:?}"))?
         .text()
@@ -37,6 +41,7 @@ async fn get_cluster_name() -> anyhow::Result<String> {
 impl Watcher {
     pub async fn new(
+        http_client: HttpClient,
         client: kube::Client,
         cluster_name: Option<String>,
         namespaces: Vec<String>,
@@ -48,7 +53,7 @@ impl Watcher {
 
         let cluster_name = match cluster_name {
             Some(c) => c,
-            None => get_cluster_name()
+            None => get_cluster_name(http_client)
                 .await
                 .expect("Load cluster_name from GCP"),
         };
diff --git a/prover/crates/bin/prover_autoscaler/src/lib.rs b/prover/crates/bin/prover_autoscaler/src/lib.rs
index 019fe2b7fb4d..1861f3af10da 100644
--- a/prover/crates/bin/prover_autoscaler/src/lib.rs
+++ b/prover/crates/bin/prover_autoscaler/src/lib.rs
@@ -2,6 +2,7 @@ pub mod agent;
 pub(crate) mod cluster_types;
 pub mod config;
 pub mod global;
+pub mod http_client;
 pub mod k8s;
 pub(crate) mod metrics;
 pub mod task_wiring;
diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs
index 98ffdb49d824..3baf3d13b2d6 100644
--- a/prover/crates/bin/prover_autoscaler/src/main.rs
+++ b/prover/crates/bin/prover_autoscaler/src/main.rs
@@ -10,6 +10,7 @@ use zksync_prover_autoscaler::{
     agent,
     config::{config_from_yaml, ProverAutoscalerConfig},
     global::{self},
+    http_client::HttpClient,
     k8s::{Scaler, Watcher},
     task_wiring::TaskRunner,
 };
@@ -74,6 +75,8 @@ async fn main() -> anyhow::Result<()> {
 
     let mut tasks = vec![];
 
+    let http_client = HttpClient::default();
+
     match opt.job {
         AutoscalerType::Agent => {
             tracing::info!("Starting ProverAutoscaler Agent");
@@ -84,8 +87,13 @@ async fn main() -> anyhow::Result<()> {
             let _ = rustls::crypto::ring::default_provider().install_default();
             let client = kube::Client::try_default().await?;
 
-            let watcher =
-                Watcher::new(client.clone(), opt.cluster_name, agent_config.namespaces).await;
+            let watcher = Watcher::new(
+                http_client,
+                client.clone(),
+                opt.cluster_name,
+                agent_config.namespaces,
+            )
+            .await;
             let scaler = Scaler::new(client, agent_config.dry_run);
             tasks.push(tokio::spawn(watcher.clone().run()));
             tasks.push(tokio::spawn(agent::run_server(
@@ -101,9 +109,15 @@ async fn main() -> anyhow::Result<()> {
             let interval = scaler_config.scaler_run_interval;
             let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
             tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
-            let watcher =
-                global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run);
-            let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
+            let watcher = global::watcher::Watcher::new(
+                http_client.clone(),
+                scaler_config.agents.clone(),
+                scaler_config.dry_run,
+            );
+            let queuer = global::queuer::Queuer::new(
+                http_client,
+                scaler_config.prover_job_monitor_url.clone(),
+            );
             let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
             tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
         }
diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs
index 115ae3b74259..775f7ec22abd 100644
--- a/prover/crates/bin/prover_autoscaler/src/metrics.rs
+++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs
@@ -2,8 +2,6 @@ use vise::{Counter, Gauge, LabeledFamily, Metrics};
 
 use crate::config::Gpu;
 
-pub const DEFAULT_ERROR_CODE: u16 = 500;
-
 #[derive(Debug, Metrics)]
 #[metrics(prefix = "autoscaler")]
 pub(crate) struct AutoscalerMetrics {
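
Usage note (not part of the diff): the new `HttpClient` replaces the free-standing `send_request_with_retries` helper from `zksync_utils`, folding the retry limit into the client and recording every attempt in `AUTOSCALER_METRICS.calls` under the observed status code (falling back to 500 when no response was received) instead of the removed `DEFAULT_ERROR_CODE`. A minimal sketch of how a caller might use it, assuming the crate layout above and a hypothetical local endpoint:

```rust
use reqwest::Method;
use zksync_prover_autoscaler::http_client::HttpClient;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // main() builds one client and clones it into each task; Default gives
    // reqwest::Client::new() with max_retries = 5.
    let http_client = HttpClient::default();

    // Hypothetical endpoint standing in for an agent's /cluster URL or the
    // prover job monitor URL.
    let url = "http://127.0.0.1:8080/cluster";

    // Sends a GET, retrying with exponential back-off (1s, 2s, 4s, ...);
    // every attempt increments the calls counter keyed by (url, status).
    let response = http_client
        .send_request_with_retries(url, Method::GET, None, None)
        .await
        .map_err(|err| anyhow::anyhow!("request to {url} failed: {err:?}"))?;

    println!("status: {}", response.status());
    Ok(())
}
```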
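
Review note: as written, `send_request_with_retries` only ever returns `HttpError::RetryExhausted`; the `ReqwestError` variant is declared but never constructed in this diff, and the last underlying `reqwest::Error` is only logged. An illustrative (hypothetical) helper a caller could write today to branch on the error:

```rust
use zksync_prover_autoscaler::http_client::HttpError;

// Illustrative only: maps the error enum to a log-friendly string.
fn describe(err: &HttpError) -> String {
    match err {
        // Not produced by send_request_with_retries in this change.
        HttpError::ReqwestError(e) => format!("transport error: {e}"),
        // Returned once max_retries attempts have all failed.
        HttpError::RetryExhausted(msg) => format!("retries exhausted: {msg}"),
    }
}
```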