From bfc88ee310eeb2fcabb464cbb084e86472454512 Mon Sep 17 00:00:00 2001 From: Uriel Korach Date: Thu, 1 Aug 2024 10:10:31 +0300 Subject: [PATCH] feat(mempool_infra): add configuration to set number of retries of sending requests --- .../remote_component_client.rs | 31 ++++++-- .../component_server_client_http_test.rs | 71 +++++++++++++++++-- 2 files changed, 90 insertions(+), 12 deletions(-) diff --git a/crates/mempool_infra/src/component_client/remote_component_client.rs b/crates/mempool_infra/src/component_client/remote_component_client.rs index fee8b03a20..d901fd6d99 100644 --- a/crates/mempool_infra/src/component_client/remote_component_client.rs +++ b/crates/mempool_infra/src/component_client/remote_component_client.rs @@ -18,6 +18,7 @@ where { uri: Uri, client: Client, + max_retries: usize, _req: PhantomData, _res: PhantomData, } @@ -27,7 +28,7 @@ where Request: Serialize, Response: for<'a> Deserialize<'a>, { - pub fn new(ip_address: IpAddr, port: u16) -> Self { + pub fn new(ip_address: IpAddr, port: u16, max_retries: usize) -> Self { let uri = match ip_address { IpAddr::V4(ip_address) => format!("http://{}:{}/", ip_address, port).parse().unwrap(), IpAddr::V6(ip_address) => format!("http://[{}]:{}/", ip_address, port).parse().unwrap(), @@ -36,18 +37,35 @@ where // TODO(Tsabary): Add a configuration for "keep-alive" time of idle connections. let client = Client::builder().http2_only(true).pool_max_idle_per_host(usize::MAX).build_http(); - Self { uri, client, _req: PhantomData, _res: PhantomData } + Self { uri, client, max_retries, _req: PhantomData, _res: PhantomData } } pub async fn send(&self, component_request: Request) -> ClientResult { - let http_request = HyperRequest::post(self.uri.clone()) + // Construct and request, and send it up to 'max_retries' times. Return if received a + // successful response. + for _ in 0..self.max_retries { + let http_request = self.construct_http_request(&component_request); + let res = self.try_send(http_request).await; + if res.is_ok() { + return res; + } + } + // Construct and send the request, return the received respone regardless whether it + // successful or not. + let http_request = self.construct_http_request(&component_request); + self.try_send(http_request).await + } + + fn construct_http_request(&self, component_request: &Request) -> HyperRequest { + HyperRequest::post(self.uri.clone()) .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM) .body(Body::from( - serialize(&component_request).expect("Request serialization should succeed"), + serialize(component_request).expect("Request serialization should succeed"), )) - .expect("Request building should succeed"); + .expect("Request building should succeed") + } - // Todo(uriel): Add configuration for controlling the number of retries. + async fn try_send(&self, http_request: HyperRequest) -> ClientResult { let http_response = self .client .request(http_request) @@ -85,6 +103,7 @@ where Self { uri: self.uri.clone(), client: self.client.clone(), + max_retries: self.max_retries, _req: PhantomData, _res: PhantomData, } diff --git a/crates/mempool_infra/tests/component_server_client_http_test.rs b/crates/mempool_infra/tests/component_server_client_http_test.rs index bb2f322a19..d94a0a74bf 100644 --- a/crates/mempool_infra/tests/component_server_client_http_test.rs +++ b/crates/mempool_infra/tests/component_server_client_http_test.rs @@ -1,6 +1,7 @@ mod common; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; use async_trait::async_trait; use bincode::{deserialize, serialize}; @@ -13,6 +14,7 @@ use common::{ ComponentBResponse, ResultA, ResultB, + ValueA, }; use hyper::body::to_bytes; use hyper::header::CONTENT_TYPE; @@ -27,6 +29,7 @@ use starknet_mempool_infra::component_definitions::{ APPLICATION_OCTET_STREAM, }; use starknet_mempool_infra::component_server::{ComponentServerStarter, RemoteComponentServer}; +use tokio::sync::Mutex; use tokio::task; type ComponentAClient = RemoteComponentClient; @@ -35,6 +38,7 @@ type ComponentBClient = RemoteComponentClient { @@ -146,12 +152,12 @@ where // Ensure the server starts running. task::yield_now().await; - ComponentAClient::new(LOCAL_IP, port) + ComponentAClient::new(LOCAL_IP, port, MAX_RETRIES) } async fn setup_for_tests(setup_value: ValueB, a_port: u16, b_port: u16) { - let a_client = ComponentAClient::new(LOCAL_IP, a_port); - let b_client = ComponentBClient::new(LOCAL_IP, b_port); + let a_client = ComponentAClient::new(LOCAL_IP, a_port, MAX_RETRIES); + let b_client = ComponentBClient::new(LOCAL_IP, b_port, MAX_RETRIES); let component_a = ComponentA::new(Box::new(b_client)); let component_b = ComponentB::new(setup_value, Box::new(a_client.clone())); @@ -183,8 +189,8 @@ async fn setup_for_tests(setup_value: ValueB, a_port: u16, b_port: u16) { async fn test_proper_setup() { let setup_value: ValueB = 90; setup_for_tests(setup_value, A_PORT_TEST_SETUP, B_PORT_TEST_SETUP).await; - let a_client = ComponentAClient::new(LOCAL_IP, A_PORT_TEST_SETUP); - let b_client = ComponentBClient::new(LOCAL_IP, B_PORT_TEST_SETUP); + let a_client = ComponentAClient::new(LOCAL_IP, A_PORT_TEST_SETUP, MAX_RETRIES); + let b_client = ComponentBClient::new(LOCAL_IP, B_PORT_TEST_SETUP, MAX_RETRIES); test_a_b_functionality(a_client, b_client, setup_value.into()).await; } @@ -222,7 +228,7 @@ async fn test_faulty_client_setup() { #[tokio::test] async fn test_unconnected_server() { - let client = ComponentAClient::new(LOCAL_IP, UNCONNECTED_SERVER_PORT); + let client = ComponentAClient::new(LOCAL_IP, UNCONNECTED_SERVER_PORT, MAX_RETRIES); let expected_error_contained_keywords = ["Connection refused"]; verify_error(client, &expected_error_contained_keywords).await; @@ -247,3 +253,56 @@ async fn test_faulty_server( ) { verify_error(client, expected_error_contained_keywords).await; } + +#[tokio::test] +async fn test_retry_request() { + // Spawn a server that responses with OK every other request. + task::spawn(async move { + let should_send_ok = Arc::new(Mutex::new(false)); + async fn handler( + _http_request: Request, + should_send_ok: Arc>, + ) -> Result, hyper::Error> { + let mut should_send_ok = should_send_ok.lock().await; + let body = ComponentAResponse::AGetValue(VALID_VALUE_A); + let ret = if *should_send_ok { + Response::builder() + .status(StatusCode::OK) + .body(Body::from(serialize(&body).unwrap())) + .unwrap() + } else { + Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(Body::from(serialize(&body).unwrap())) + .unwrap() + }; + *should_send_ok = !*should_send_ok; + + Ok(ret) + } + + let socket = SocketAddr::new(LOCAL_IP, RETRY_REQ_PORT); + let make_svc = make_service_fn(|_conn| { + let is_response_ok = should_send_ok.clone(); + async move { + Ok::<_, hyper::Error>(service_fn(move |req| handler(req, is_response_ok.clone()))) + } + }); + + Server::bind(&socket).serve(make_svc).await.unwrap(); + }); + // Todo(uriel): Get rid of this + // Ensure the server starts running. + task::yield_now().await; + + // The initial server state is 'false', hence the first attempt returns an error and + // sets the server state to 'true'. The second attempt (first retry) therefore returns a + // 'success', while setting the server state to 'false' yet again. + let a_client_retry = ComponentAClient::new(LOCAL_IP, RETRY_REQ_PORT, 1); + assert_eq!(a_client_retry.a_get_value().await.unwrap(), VALID_VALUE_A); + + // The current server state is 'false', hence the first and only attempt returns an error. + let a_client_no_retry = ComponentAClient::new(LOCAL_IP, RETRY_REQ_PORT, 0); + let expected_error_contained_keywords = [DESERIALIZE_RES_ERROR_MESSAGE]; + verify_error(a_client_no_retry.clone(), &expected_error_contained_keywords).await; +}