Skip to content

Commit

Permalink
feat(mempool_infra): add configuration to set number of retries of se…
Browse files Browse the repository at this point in the history
…nding requests
  • Loading branch information
uriel-starkware committed Aug 6, 2024
1 parent dcb4ce3 commit bfc88ee
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ where
{
uri: Uri,
client: Client<hyper::client::HttpConnector>,
max_retries: usize,
_req: PhantomData<Request>,
_res: PhantomData<Response>,
}
Expand All @@ -27,7 +28,7 @@ where
Request: Serialize,
Response: for<'a> Deserialize<'a>,
{
pub fn new(ip_address: IpAddr, port: u16) -> Self {
pub fn new(ip_address: IpAddr, port: u16, max_retries: usize) -> Self {
let uri = match ip_address {
IpAddr::V4(ip_address) => format!("http://{}:{}/", ip_address, port).parse().unwrap(),
IpAddr::V6(ip_address) => format!("http://[{}]:{}/", ip_address, port).parse().unwrap(),
Expand All @@ -36,18 +37,35 @@ where
// TODO(Tsabary): Add a configuration for "keep-alive" time of idle connections.
let client =
Client::builder().http2_only(true).pool_max_idle_per_host(usize::MAX).build_http();
Self { uri, client, _req: PhantomData, _res: PhantomData }
Self { uri, client, max_retries, _req: PhantomData, _res: PhantomData }
}

pub async fn send(&self, component_request: Request) -> ClientResult<Response> {
let http_request = HyperRequest::post(self.uri.clone())
// Construct and request, and send it up to 'max_retries' times. Return if received a
// successful response.
for _ in 0..self.max_retries {
let http_request = self.construct_http_request(&component_request);
let res = self.try_send(http_request).await;
if res.is_ok() {
return res;
}
}
// Construct and send the request, return the received respone regardless whether it
// successful or not.
let http_request = self.construct_http_request(&component_request);
self.try_send(http_request).await
}

fn construct_http_request(&self, component_request: &Request) -> HyperRequest<Body> {
HyperRequest::post(self.uri.clone())
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from(
serialize(&component_request).expect("Request serialization should succeed"),
serialize(component_request).expect("Request serialization should succeed"),
))
.expect("Request building should succeed");
.expect("Request building should succeed")
}

// Todo(uriel): Add configuration for controlling the number of retries.
async fn try_send(&self, http_request: HyperRequest<Body>) -> ClientResult<Response> {
let http_response = self
.client
.request(http_request)
Expand Down Expand Up @@ -85,6 +103,7 @@ where
Self {
uri: self.uri.clone(),
client: self.client.clone(),
max_retries: self.max_retries,
_req: PhantomData,
_res: PhantomData,
}
Expand Down
71 changes: 65 additions & 6 deletions crates/mempool_infra/tests/component_server_client_http_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod common;

use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;

use async_trait::async_trait;
use bincode::{deserialize, serialize};
Expand All @@ -13,6 +14,7 @@ use common::{
ComponentBResponse,
ResultA,
ResultB,
ValueA,
};
use hyper::body::to_bytes;
use hyper::header::CONTENT_TYPE;
Expand All @@ -27,6 +29,7 @@ use starknet_mempool_infra::component_definitions::{
APPLICATION_OCTET_STREAM,
};
use starknet_mempool_infra::component_server::{ComponentServerStarter, RemoteComponentServer};
use tokio::sync::Mutex;
use tokio::task;

type ComponentAClient = RemoteComponentClient<ComponentARequest, ComponentAResponse>;
Expand All @@ -35,19 +38,22 @@ type ComponentBClient = RemoteComponentClient<ComponentBRequest, ComponentBRespo
use crate::common::{test_a_b_functionality, ComponentA, ComponentB, ValueB};

const LOCAL_IP: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
const MAX_RETRIES: usize = 0;
const A_PORT_TEST_SETUP: u16 = 10000;
const B_PORT_TEST_SETUP: u16 = 10001;
const A_PORT_FAULTY_CLIENT: u16 = 10010;
const B_PORT_FAULTY_CLIENT: u16 = 10011;
const UNCONNECTED_SERVER_PORT: u16 = 10002;
const FAULTY_SERVER_REQ_DESER_PORT: u16 = 10003;
const FAULTY_SERVER_RES_DESER_PORT: u16 = 10004;
const RETRY_REQ_PORT: u16 = 10005;
const MOCK_SERVER_ERROR: &str = "mock server error";
const ARBITRARY_DATA: &str = "arbitrary data";
// ServerError::RequestDeserializationFailure error message.
const DESERIALIZE_REQ_ERROR_MESSAGE: &str = "Could not deserialize client request";
// ClientError::ResponseDeserializationFailure error message.
const DESERIALIZE_RES_ERROR_MESSAGE: &str = "Could not deserialize server response";
const VALID_VALUE_A: ValueA = 1;

#[async_trait]
impl ComponentAClientTrait for RemoteComponentClient<ComponentARequest, ComponentAResponse> {
Expand Down Expand Up @@ -146,12 +152,12 @@ where
// Ensure the server starts running.
task::yield_now().await;

ComponentAClient::new(LOCAL_IP, port)
ComponentAClient::new(LOCAL_IP, port, MAX_RETRIES)
}

async fn setup_for_tests(setup_value: ValueB, a_port: u16, b_port: u16) {
let a_client = ComponentAClient::new(LOCAL_IP, a_port);
let b_client = ComponentBClient::new(LOCAL_IP, b_port);
let a_client = ComponentAClient::new(LOCAL_IP, a_port, MAX_RETRIES);
let b_client = ComponentBClient::new(LOCAL_IP, b_port, MAX_RETRIES);

let component_a = ComponentA::new(Box::new(b_client));
let component_b = ComponentB::new(setup_value, Box::new(a_client.clone()));
Expand Down Expand Up @@ -183,8 +189,8 @@ async fn setup_for_tests(setup_value: ValueB, a_port: u16, b_port: u16) {
async fn test_proper_setup() {
let setup_value: ValueB = 90;
setup_for_tests(setup_value, A_PORT_TEST_SETUP, B_PORT_TEST_SETUP).await;
let a_client = ComponentAClient::new(LOCAL_IP, A_PORT_TEST_SETUP);
let b_client = ComponentBClient::new(LOCAL_IP, B_PORT_TEST_SETUP);
let a_client = ComponentAClient::new(LOCAL_IP, A_PORT_TEST_SETUP, MAX_RETRIES);
let b_client = ComponentBClient::new(LOCAL_IP, B_PORT_TEST_SETUP, MAX_RETRIES);
test_a_b_functionality(a_client, b_client, setup_value.into()).await;
}

Expand Down Expand Up @@ -222,7 +228,7 @@ async fn test_faulty_client_setup() {

#[tokio::test]
async fn test_unconnected_server() {
let client = ComponentAClient::new(LOCAL_IP, UNCONNECTED_SERVER_PORT);
let client = ComponentAClient::new(LOCAL_IP, UNCONNECTED_SERVER_PORT, MAX_RETRIES);

let expected_error_contained_keywords = ["Connection refused"];
verify_error(client, &expected_error_contained_keywords).await;
Expand All @@ -247,3 +253,56 @@ async fn test_faulty_server(
) {
verify_error(client, expected_error_contained_keywords).await;
}

#[tokio::test]
async fn test_retry_request() {
// Spawn a server that responses with OK every other request.
task::spawn(async move {
let should_send_ok = Arc::new(Mutex::new(false));
async fn handler(
_http_request: Request<Body>,
should_send_ok: Arc<Mutex<bool>>,
) -> Result<Response<Body>, hyper::Error> {
let mut should_send_ok = should_send_ok.lock().await;
let body = ComponentAResponse::AGetValue(VALID_VALUE_A);
let ret = if *should_send_ok {
Response::builder()
.status(StatusCode::OK)
.body(Body::from(serialize(&body).unwrap()))
.unwrap()
} else {
Response::builder()
.status(StatusCode::IM_A_TEAPOT)
.body(Body::from(serialize(&body).unwrap()))
.unwrap()
};
*should_send_ok = !*should_send_ok;

Ok(ret)
}

let socket = SocketAddr::new(LOCAL_IP, RETRY_REQ_PORT);
let make_svc = make_service_fn(|_conn| {
let is_response_ok = should_send_ok.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| handler(req, is_response_ok.clone())))
}
});

Server::bind(&socket).serve(make_svc).await.unwrap();
});
// Todo(uriel): Get rid of this
// Ensure the server starts running.
task::yield_now().await;

// The initial server state is 'false', hence the first attempt returns an error and
// sets the server state to 'true'. The second attempt (first retry) therefore returns a
// 'success', while setting the server state to 'false' yet again.
let a_client_retry = ComponentAClient::new(LOCAL_IP, RETRY_REQ_PORT, 1);
assert_eq!(a_client_retry.a_get_value().await.unwrap(), VALID_VALUE_A);

// The current server state is 'false', hence the first and only attempt returns an error.
let a_client_no_retry = ComponentAClient::new(LOCAL_IP, RETRY_REQ_PORT, 0);
let expected_error_contained_keywords = [DESERIALIZE_RES_ERROR_MESSAGE];
verify_error(a_client_no_retry.clone(), &expected_error_contained_keywords).await;
}

0 comments on commit bfc88ee

Please sign in to comment.