diff --git a/client/src/jrpc.rs b/client/src/jrpc.rs index 1184d03..420753c 100644 --- a/client/src/jrpc.rs +++ b/client/src/jrpc.rs @@ -359,6 +359,7 @@ pub struct JrpcConnection { client: reqwest::Client, was_dead: Arc, stats: Arc>>, + params: Arc, } impl PartialEq for JrpcConnection { @@ -401,12 +402,17 @@ impl std::fmt::Display for JrpcConnection { #[async_trait::async_trait] impl Connection for JrpcConnection { - fn new(endpoint: String, client: reqwest::Client) -> Self { + fn new( + endpoint: String, + client: reqwest::Client, + reliability_params: ReliabilityParams, + ) -> Self { JrpcConnection { endpoint: Arc::new(endpoint), client, was_dead: Arc::new(AtomicBool::new(false)), stats: Arc::new(Default::default()), + params: Arc::new(reliability_params), } } @@ -430,6 +436,10 @@ impl Connection for JrpcConnection { &self.client } + fn get_reliability_params(&self) -> &ReliabilityParams { + &self.params + } + async fn is_alive_inner(&self) -> LiveCheckResult { let request: RpcRequest<_> = RpcRequest::JRPC(JrpcRequest { method: "getTimings", @@ -461,10 +471,14 @@ impl Connection for JrpcConnection { #[cfg(feature = "simd")] let timings: Result = simd_json::serde::from_owned_value(v); - + let params = self.get_reliability_params(); if let Ok(t) = timings { let t = Timings::from(t); - let is_reliable = t.is_reliable(); + let is_reliable = t.is_reliable( + params.mc_acceptable_time_diff_sec, + params.sc_acceptable_time_diff_sec, + params.acceptable_blocks_diff_sec, + ); if !is_reliable { let Timings { last_mc_block_seqno, diff --git a/client/src/lib.rs b/client/src/lib.rs index 57df57b..df2e23c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -313,7 +313,7 @@ where let state = Arc::new(State { endpoints: endpoints .into_iter() - .map(|e| T::new(e.to_string(), client.clone())) + .map(|e| T::new(e.to_string(), client.clone(), options.reliability_params)) .collect(), live_endpoints: Default::default(), options: options.clone(), @@ -560,13 +560,17 @@ where #[async_trait::async_trait] pub trait Connection: Send + Sync { - fn new(endpoint: String, client: reqwest::Client) -> Self; + fn new( + endpoint: String, + client: reqwest::Client, + reliability_params: ReliabilityParams, + ) -> Self; fn update_was_dead(&self, is_dead: bool); async fn is_alive(&self) -> bool { let check_result = self.is_alive_inner().await; - let is_alive = check_result.as_bool(); + let is_alive = check_result.as_bool(self.get_reliability_params()); self.update_was_dead(!is_alive); match check_result { @@ -586,6 +590,8 @@ pub trait Connection: Send + Sync { fn get_client(&self) -> &reqwest::Client; + fn get_reliability_params(&self) -> &ReliabilityParams; + async fn is_alive_inner(&self) -> LiveCheckResult; async fn method_is_supported(&self, method: &str) -> Result; @@ -612,6 +618,13 @@ pub trait Connection: Send + Sync { } } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ReliabilityParams { + pub mc_acceptable_time_diff_sec: u64, + pub sc_acceptable_time_diff_sec: u64, + pub acceptable_blocks_diff_sec: u32, +} + pub enum RpcRequest<'a, T> { JRPC(JrpcRequest<'a, T>), PROTO(rpc::Request), @@ -710,6 +723,8 @@ pub struct ClientOptions { pub aggressive_poll_interval: Duration, pub choose_strategy: ChooseStrategy, + + pub reliability_params: ReliabilityParams, } impl Default for ClientOptions { @@ -719,6 +734,11 @@ impl Default for ClientOptions { request_timeout: Duration::from_secs(3), aggressive_poll_interval: Duration::from_secs(1), choose_strategy: ChooseStrategy::Random, + reliability_params: ReliabilityParams { + mc_acceptable_time_diff_sec: 120, + sc_acceptable_time_diff_sec: 120, + acceptable_blocks_diff_sec: 500, + }, } } } @@ -762,9 +782,13 @@ pub enum LiveCheckResult { } impl LiveCheckResult { - fn as_bool(&self) -> bool { + fn as_bool(&self, reliability_params: &ReliabilityParams) -> bool { match self { - LiveCheckResult::Live(metrics) => metrics.is_reliable(), + LiveCheckResult::Live(metrics) => metrics.is_reliable( + reliability_params.mc_acceptable_time_diff_sec, + reliability_params.sc_acceptable_time_diff_sec, + reliability_params.acceptable_blocks_diff_sec, + ), LiveCheckResult::Dummy => true, LiveCheckResult::Dead => false, } diff --git a/client/src/proto.rs b/client/src/proto.rs index 4b6fc20..102b52b 100644 --- a/client/src/proto.rs +++ b/client/src/proto.rs @@ -419,6 +419,7 @@ pub struct ProtoConnection { client: reqwest::Client, was_dead: Arc, stats: Arc>>, + reliability_params: Arc, } impl PartialEq for ProtoConnection { @@ -461,12 +462,17 @@ impl std::fmt::Display for ProtoConnection { #[async_trait::async_trait] impl Connection for ProtoConnection { - fn new(endpoint: String, client: reqwest::Client) -> Self { + fn new( + endpoint: String, + client: reqwest::Client, + reliability_params: ReliabilityParams, + ) -> Self { ProtoConnection { endpoint: Arc::new(endpoint), client, was_dead: Arc::new(AtomicBool::new(false)), stats: Arc::new(Default::default()), + reliability_params: Arc::new(reliability_params), } } @@ -490,6 +496,10 @@ impl Connection for ProtoConnection { &self.client } + fn get_reliability_params(&self) -> &ReliabilityParams { + &self.reliability_params + } + async fn is_alive_inner(&self) -> LiveCheckResult { let request: RpcRequest<()> = RpcRequest::PROTO(rpc::Request { call: Some(rpc::request::Call::GetTimings(())), @@ -516,7 +526,12 @@ impl Connection for ProtoConnection { ProtoAnswer::Result(result) => { if let Some(rpc::response::Result::GetTimings(t)) = result.result { let t = Timings::from(t); - let is_reliable = t.is_reliable(); + let params = self.get_reliability_params(); + let is_reliable = t.is_reliable( + params.mc_acceptable_time_diff_sec, + params.sc_acceptable_time_diff_sec, + params.acceptable_blocks_diff_sec, + ); if !is_reliable { let Timings { last_mc_block_seqno, diff --git a/models/src/lib.rs b/models/src/lib.rs index e24bfd5..a1cdd53 100644 --- a/models/src/lib.rs +++ b/models/src/lib.rs @@ -1,16 +1,8 @@ use std::time::SystemTime; -use nekoton_utils::*; - pub mod jrpc; pub mod proto; -pub const MC_ACCEPTABLE_TIME_DIFF: u64 = 120; -pub const SC_ACCEPTABLE_TIME_DIFF: u64 = 120; -const ACCEPTABLE_BLOCKS_DIFF: u32 = 500; - -const ACCEPTABLE_NODE_BLOCK_INSERT_TIME: u64 = 240; - pub fn now() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -29,19 +21,21 @@ pub struct Timings { } impl Timings { - pub fn is_reliable(&self) -> bool { + pub fn is_reliable( + &self, + mc_acceptable_time_diff: u64, + sc_acceptable_time_diff: u64, + acceptable_blocks_diff: u32, + ) -> bool { // just booted up if self == &Self::default() { return false; } - let acceptable_time = (now_sec_u64() - ACCEPTABLE_NODE_BLOCK_INSERT_TIME) as u32; - - self.mc_time_diff.unsigned_abs() < MC_ACCEPTABLE_TIME_DIFF - && self.shard_client_time_diff.unsigned_abs() < SC_ACCEPTABLE_TIME_DIFF + self.mc_time_diff.unsigned_abs() < mc_acceptable_time_diff + && self.shard_client_time_diff.unsigned_abs() < sc_acceptable_time_diff && self.last_mc_block_seqno - self.last_shard_client_mc_block_seqno - < ACCEPTABLE_BLOCKS_DIFF - && self.last_mc_utime > acceptable_time + < acceptable_blocks_diff } pub fn has_state_for(&self, time: u32) -> bool {