diff --git a/nekoton-proto/src/protos/rpc.proto b/nekoton-proto/src/protos/rpc.proto index ee75dfeb2..4b64ef298 100644 --- a/nekoton-proto/src/protos/rpc.proto +++ b/nekoton-proto/src/protos/rpc.proto @@ -83,7 +83,8 @@ message Response { message GetBlockchainConfig { int32 global_id = 1; - bytes config = 2; + bytes config = 2; + uint32 seqno = 3; } message GetAccountsByCodeHash { diff --git a/nekoton-proto/src/protos/rpc.rs b/nekoton-proto/src/protos/rpc.rs index 1851b1e0d..bcf308c06 100644 --- a/nekoton-proto/src/protos/rpc.rs +++ b/nekoton-proto/src/protos/rpc.rs @@ -141,6 +141,8 @@ pub mod response { pub global_id: i32, #[prost(bytes = "bytes", tag = "2")] pub config: ::prost::bytes::Bytes, + #[prost(uint32, tag = "3")] + pub seqno: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/nekoton-transport/src/jrpc.rs b/nekoton-transport/src/jrpc.rs index 5d089a445..ea3b853de 100644 --- a/nekoton-transport/src/jrpc.rs +++ b/nekoton-transport/src/jrpc.rs @@ -20,6 +20,7 @@ impl JrpcClient { ); let client = reqwest::ClientBuilder::new() + .http2_prior_knowledge() .default_headers(headers) .build() .context("failed to build http client")?; diff --git a/src/transport/gql/mod.rs b/src/transport/gql/mod.rs index 17790133f..d849d802e 100644 --- a/src/transport/gql/mod.rs +++ b/src/transport/gql/mod.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Deserialize; use ton_block::{Account, Deserializable, Message, MsgAddressInt, Serializable}; @@ -14,7 +14,7 @@ use crate::external::{GqlConnection, GqlRequest}; use self::queries::*; use super::models::*; -use super::utils::ConfigCache; +use super::utils::{ConfigCache, ConfigResponse}; use super::{Transport, TransportInfo}; mod queries; @@ -173,6 +173,20 @@ impl GqlTransport { Ok(block_id) } + + async fn fetch_config(&self) -> Result { + let block = self.get_latest_key_block().await?; + let seqno = block.info.read_struct()?.seq_no(); + let extra = block.read_extra()?; + let master = extra.read_custom()?.context("invalid key block")?; + let config = master.config().context("invalid key block")?.clone(); + + Ok(ConfigResponse { + global_id: block.global_id, + seqno, + config, + }) + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -356,7 +370,7 @@ impl Transport for GqlTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -368,7 +382,7 @@ impl Transport for GqlTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/jrpc/mod.rs b/src/transport/jrpc/mod.rs index 1e86bce5a..c152d82cd 100644 --- a/src/transport/jrpc/mod.rs +++ b/src/transport/jrpc/mod.rs @@ -31,6 +31,22 @@ impl JrpcTransport { accounts_cache: AccountsCache::new(), } } + + async fn fetch_config(&self) -> Result { + let req = external::JrpcRequest { + data: make_jrpc_request("getBlockchainConfig", &()), + requires_db: true, + }; + self.connection + .post(req) + .await + .map(|data| tiny_jsonrpc::parse_response(&data))? + .map(|block: GetBlockchainConfigResponse| ConfigResponse { + global_id: block.global_id, + seqno: block.seqno, + config: block.config, + }) + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -198,7 +214,7 @@ impl Transport for JrpcTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -210,7 +226,7 @@ impl Transport for JrpcTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/jrpc/models.rs b/src/transport/jrpc/models.rs index a6b05d758..0d176880b 100644 --- a/src/transport/jrpc/models.rs +++ b/src/transport/jrpc/models.rs @@ -60,3 +60,13 @@ pub struct GetBlockResponse { #[serde(with = "serde_ton_block")] pub block: ton_block::Block, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetBlockchainConfigResponse { + pub global_id: i32, + #[serde(default)] + pub seqno: u32, + #[serde(with = "serde_ton_block")] + pub config: ton_block::ConfigParams, +} diff --git a/src/transport/proto/mod.rs b/src/transport/proto/mod.rs index c38d3ca86..a4494292c 100644 --- a/src/transport/proto/mod.rs +++ b/src/transport/proto/mod.rs @@ -31,6 +31,32 @@ impl ProtoTransport { accounts_cache: AccountsCache::new(), } } + + async fn fetch_config(&self) -> Result { + let data = rpc::Request { + call: Some(rpc::request::Call::GetBlockchainConfig(())), + }; + + let req = external::ProtoRequest { + data: data.encode_to_vec(), + requires_db: true, + }; + + let data = self.connection.post(req).await?; + let response = rpc::Response::decode(Bytes::from(data))?; + + match response.result { + Some(rpc::response::Result::GetBlockchainConfig(res)) => { + let params = ton_block::ConfigParams::construct_from_bytes(res.config.as_ref())?; + Ok(ConfigResponse { + global_id: res.global_id, + seqno: res.seqno, + config: params, + }) + } + _ => Err(ProtoClientError::InvalidResponse.into()), + } + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -289,7 +315,7 @@ impl Transport for ProtoTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -301,7 +327,7 @@ impl Transport for ProtoTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/utils.rs b/src/transport/utils.rs index f6f3fd1f4..1d0762fdb 100644 --- a/src/transport/utils.rs +++ b/src/transport/utils.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use anyhow::Result; +use futures_util::Future; use quick_cache::sync::Cache as QuickCache; use tokio::sync::Mutex; use nekoton_utils::*; use super::models::RawContractState; -use super::Transport; use crate::core::models::NetworkCapabilities; #[allow(unused)] @@ -56,13 +56,18 @@ impl AccountsCache { pub struct ConfigCache { use_default_config: bool, + min_cache_for: Option, state: Mutex>, } impl ConfigCache { pub fn new(use_default_config: bool) -> Self { + // TODO: Move to params or connection config + const MIN_CACHE_FOR: u32 = 60; + Self { use_default_config, + min_cache_for: Some(MIN_CACHE_FOR), state: Mutex::new(if use_default_config { Some(ConfigCacheState { capabilities: NetworkCapabilities { @@ -71,6 +76,7 @@ impl ConfigCache { }, config: ton_executor::BlockchainConfig::default(), last_key_block_seqno: 0, + updated_at: 0, phase: ConfigCachePhase::WainingNextValidatorsSet { deadline: u32::MAX }, }) } else { @@ -79,30 +85,35 @@ impl ConfigCache { } } - pub async fn get_blockchain_config( + pub async fn get_blockchain_config( &self, - transport: &dyn Transport, clock: &dyn Clock, force: bool, - ) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> { + f: F, + ) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> + where + F: FnOnce() -> Fut, + Fut: Future>, + { let mut cache = self.state.lock().await; let now = clock.now_sec_u64() as u32; Ok(match &*cache { None => { - let (capabilities, config, key_block_seqno) = fetch_config(transport).await?; + let (capabilities, config, key_block_seqno) = fetch_config(f).await?; let phase = compute_next_phase(now, &config, None, key_block_seqno)?; *cache = Some(ConfigCacheState { capabilities, config: config.clone(), last_key_block_seqno: key_block_seqno, + updated_at: now, phase, }); (capabilities, config) } - Some(a) if force && !self.use_default_config || cache_expired(now, a.phase) => { - let (capabilities, config, key_block_seqno) = fetch_config(transport).await?; + Some(a) if force && !self.use_default_config || self.cache_expired(now, a) => { + let (capabilities, config, key_block_seqno) = fetch_config(f).await?; let phase = compute_next_phase( now, &config, @@ -113,6 +124,7 @@ impl ConfigCache { capabilities, config: config.clone(), last_key_block_seqno: key_block_seqno, + updated_at: now, phase, }); (capabilities, config) @@ -120,38 +132,46 @@ impl ConfigCache { Some(a) => (a.capabilities, a.config.clone()), }) } -} - -async fn fetch_config( - transport: &dyn Transport, -) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> { - let block = transport.get_latest_key_block().await?; - - let info = block.info.read_struct()?; - let extra = block - .read_extra() - .map_err(|_| QueryConfigError::InvalidBlock)?; + fn cache_expired(&self, now: u32, state: &ConfigCacheState) -> bool { + if let Some(min_cache_for) = self.min_cache_for { + if now <= state.updated_at.saturating_add(min_cache_for) { + return false; + } + } - let master = extra - .read_custom() - .map_err(|_| QueryConfigError::InvalidBlock)? - .ok_or(QueryConfigError::InvalidBlock)?; + match state.phase { + ConfigCachePhase::WaitingKeyBlock => true, + ConfigCachePhase::WaitingElectionsEnd { deadline } + | ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline, + } + } +} - let params = master - .config() - .ok_or(QueryConfigError::InvalidBlock)? - .clone(); +async fn fetch_config( + f: F, +) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> +where + F: FnOnce() -> Fut, + Fut: Future>, +{ + let res = f().await?; - let config = ton_executor::BlockchainConfig::with_config(params, block.global_id) + let config = ton_executor::BlockchainConfig::with_config(res.config, res.global_id) .map_err(|_| QueryConfigError::InvalidConfig)?; let capabilities = NetworkCapabilities { - global_id: block.global_id, + global_id: res.global_id, raw: config.capabilites(), }; - Ok((capabilities, config, info.seq_no())) + Ok((capabilities, config, res.seqno)) +} + +pub struct ConfigResponse { + pub global_id: i32, + pub seqno: u32, + pub config: ton_block::ConfigParams, } fn compute_next_phase( @@ -179,18 +199,11 @@ fn compute_next_phase( } } -fn cache_expired(now: u32, phase: ConfigCachePhase) -> bool { - match phase { - ConfigCachePhase::WaitingKeyBlock => true, - ConfigCachePhase::WaitingElectionsEnd { deadline } - | ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline, - } -} - struct ConfigCacheState { capabilities: NetworkCapabilities, config: ton_executor::BlockchainConfig, last_key_block_seqno: u32, + updated_at: u32, phase: ConfigCachePhase, } @@ -203,8 +216,6 @@ enum ConfigCachePhase { #[derive(thiserror::Error, Debug)] enum QueryConfigError { - #[error("Invalid block")] - InvalidBlock, #[error("Invalid config")] InvalidConfig, }