Skip to content

Commit

Permalink
feat(transport): use getBlockchainConfig for jrpc and proto
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed May 31, 2024
1 parent 9b4d401 commit 6e81724
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 40 deletions.
3 changes: 2 additions & 1 deletion nekoton-proto/src/protos/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ message Response {

message GetBlockchainConfig {
int32 global_id = 1;
bytes config = 2;
bytes config = 2;
uint32 seqno = 3;
}

message GetAccountsByCodeHash {
Expand Down
2 changes: 2 additions & 0 deletions nekoton-proto/src/protos/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ pub mod response {
pub global_id: i32,
#[prost(bytes = "bytes", tag = "2")]
pub config: ::prost::bytes::Bytes,
#[prost(uint32, tag = "3")]
pub seqno: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
22 changes: 18 additions & 4 deletions src/transport/gql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use anyhow::{Context, Result};
use serde::Deserialize;
use ton_block::{Account, Deserializable, Message, MsgAddressInt, Serializable};

Expand All @@ -14,7 +14,7 @@ use crate::external::{GqlConnection, GqlRequest};

use self::queries::*;
use super::models::*;
use super::utils::ConfigCache;
use super::utils::{ConfigCache, ConfigResponse};
use super::{Transport, TransportInfo};

mod queries;
Expand Down Expand Up @@ -173,6 +173,20 @@ impl GqlTransport {

Ok(block_id)
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let block = self.get_latest_key_block().await?;
let seqno = block.info.read_struct()?.seq_no();
let extra = block.read_extra()?;
let master = extra.read_custom()?.context("invalid key block")?;
let config = master.config().context("invalid key block")?.clone();

Ok(ConfigResponse {
global_id: block.global_id,
seqno,
config,
})
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -356,7 +370,7 @@ impl Transport for GqlTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -368,7 +382,7 @@ impl Transport for GqlTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
20 changes: 18 additions & 2 deletions src/transport/jrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ impl JrpcTransport {
accounts_cache: AccountsCache::new(),
}
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let req = external::JrpcRequest {
data: make_jrpc_request("getBlockchainConfig", &()),
requires_db: true,
};
self.connection
.post(req)
.await
.map(|data| tiny_jsonrpc::parse_response(&data))?
.map(|block: GetBlockchainConfigResponse| ConfigResponse {
global_id: block.global_id,
seqno: block.seqno,
config: block.config,
})
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -198,7 +214,7 @@ impl Transport for JrpcTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -210,7 +226,7 @@ impl Transport for JrpcTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
9 changes: 9 additions & 0 deletions src/transport/jrpc/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ pub struct GetBlockResponse {
#[serde(with = "serde_ton_block")]
pub block: ton_block::Block,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetBlockchainConfigResponse {
pub global_id: i32,
pub seqno: u32,
#[serde(with = "serde_ton_block")]
pub config: ton_block::ConfigParams,
}
30 changes: 28 additions & 2 deletions src/transport/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ impl ProtoTransport {
accounts_cache: AccountsCache::new(),
}
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let data = rpc::Request {
call: Some(rpc::request::Call::GetBlockchainConfig(())),
};

let req = external::ProtoRequest {
data: data.encode_to_vec(),
requires_db: true,
};

let data = self.connection.post(req).await?;
let response = rpc::Response::decode(Bytes::from(data))?;

match response.result {
Some(rpc::response::Result::GetBlockchainConfig(res)) => {
let params = ton_block::ConfigParams::construct_from_bytes(res.config.as_ref())?;
Ok(ConfigResponse {
global_id: res.global_id,
seqno: res.seqno,
config: params,
})
}
_ => Err(ProtoClientError::InvalidResponse.into()),
}
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -289,7 +315,7 @@ impl Transport for ProtoTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -301,7 +327,7 @@ impl Transport for ProtoTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
58 changes: 27 additions & 31 deletions src/transport/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use anyhow::Result;
use futures_util::Future;
use quick_cache::sync::Cache as QuickCache;
use tokio::sync::Mutex;

use nekoton_utils::*;

use super::models::RawContractState;
use super::Transport;
use crate::core::models::NetworkCapabilities;

#[allow(unused)]
Expand Down Expand Up @@ -79,19 +79,23 @@ impl ConfigCache {
}
}

pub async fn get_blockchain_config(
pub async fn get_blockchain_config<F, Fut>(
&self,
transport: &dyn Transport,
clock: &dyn Clock,
force: bool,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> {
f: F,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ConfigResponse>>,
{
let mut cache = self.state.lock().await;

let now = clock.now_sec_u64() as u32;

Ok(match &*cache {
None => {
let (capabilities, config, key_block_seqno) = fetch_config(transport).await?;
let (capabilities, config, key_block_seqno) = fetch_config(f).await?;
let phase = compute_next_phase(now, &config, None, key_block_seqno)?;
*cache = Some(ConfigCacheState {
capabilities,
Expand All @@ -102,7 +106,7 @@ impl ConfigCache {
(capabilities, config)
}
Some(a) if force && !self.use_default_config || cache_expired(now, a.phase) => {
let (capabilities, config, key_block_seqno) = fetch_config(transport).await?;
let (capabilities, config, key_block_seqno) = fetch_config(f).await?;
let phase = compute_next_phase(
now,
&config,
Expand All @@ -122,36 +126,30 @@ impl ConfigCache {
}
}

async fn fetch_config(
transport: &dyn Transport,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> {
let block = transport.get_latest_key_block().await?;
async fn fetch_config<F, Fut>(
f: F,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ConfigResponse>>,
{
let res = f().await?;

let info = block.info.read_struct()?;

let extra = block
.read_extra()
.map_err(|_| QueryConfigError::InvalidBlock)?;

let master = extra
.read_custom()
.map_err(|_| QueryConfigError::InvalidBlock)?
.ok_or(QueryConfigError::InvalidBlock)?;

let params = master
.config()
.ok_or(QueryConfigError::InvalidBlock)?
.clone();

let config = ton_executor::BlockchainConfig::with_config(params, block.global_id)
let config = ton_executor::BlockchainConfig::with_config(res.config, res.global_id)
.map_err(|_| QueryConfigError::InvalidConfig)?;

let capabilities = NetworkCapabilities {
global_id: block.global_id,
global_id: res.global_id,
raw: config.capabilites(),
};

Ok((capabilities, config, info.seq_no()))
Ok((capabilities, config, res.seqno))
}

pub struct ConfigResponse {
pub global_id: i32,
pub seqno: u32,
pub config: ton_block::ConfigParams,
}

fn compute_next_phase(
Expand Down Expand Up @@ -203,8 +201,6 @@ enum ConfigCachePhase {

#[derive(thiserror::Error, Debug)]
enum QueryConfigError {
#[error("Invalid block")]
InvalidBlock,
#[error("Invalid config")]
InvalidConfig,
}

0 comments on commit 6e81724

Please sign in to comment.