From 681d7a4b032c4b42a915f8d0f75994ecc23af0be Mon Sep 17 00:00:00 2001 From: thetheveloper Date: Sat, 15 Jun 2024 14:00:08 +0200 Subject: [PATCH] feat: implement JsonRpc transport for batch requests --- examples/jsonrpc_batch.rs | 35 ++++++++++ starknet-providers/src/any.rs | 44 ++++++++++++- starknet-providers/src/jsonrpc/mod.rs | 66 +++++++++++++++++++ .../src/jsonrpc/transports/http.rs | 42 ++++++++++++ .../src/jsonrpc/transports/mod.rs | 9 +++ starknet-providers/src/provider.rs | 18 +++++ starknet-providers/src/sequencer/provider.rs | 23 +++++++ 7 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 examples/jsonrpc_batch.rs diff --git a/examples/jsonrpc_batch.rs b/examples/jsonrpc_batch.rs new file mode 100644 index 00000000..c1b17ea4 --- /dev/null +++ b/examples/jsonrpc_batch.rs @@ -0,0 +1,35 @@ +use starknet::{ + core::types::{BlockId, BlockTag}, + providers::{ + jsonrpc::{HttpTransport, JsonRpcClient}, + Provider, Url, + }, +}; +use starknet_core::types::requests::{GetBlockWithTxHashesRequestRef, GetBlockWithTxsRequestRef}; +use starknet_providers::jsonrpc::{JsonRpcMethod, JsonRpcRequestParams}; + +#[tokio::main] +async fn main() { + let provider = JsonRpcClient::new(HttpTransport::new( + Url::parse("https://starknet-sepolia.public.blastapi.io/rpc/v0_7").unwrap(), + )); + + let batch_mixed_results = provider.batch_requests(vec![(JsonRpcMethod::GetBlockWithTxHashes, JsonRpcRequestParams::GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef { + block_id: BlockId::Tag(BlockTag::Latest).as_ref(), + })), (JsonRpcMethod::GetBlockWithTxs, JsonRpcRequestParams::GetBlockWithTxs(GetBlockWithTxsRequestRef { + block_id: BlockId::Tag(BlockTag::Latest).as_ref(), + }))]).await; + + match batch_mixed_results { + Ok(v) => println!("{v:#?}"), + Err(e) => println!("Error: {e:#?}"), + } + + let batched_blocks = provider.get_block_with_tx_hashes_batch(vec![BlockId::Tag(BlockTag::Latest), BlockId::Tag(BlockTag::Latest)]).await; + + match batched_blocks { + Ok(v) => println!("{v:#?}"), + Err(e) => println!("Error: {e:#?}"), + } + +} diff --git a/starknet-providers/src/any.rs b/starknet-providers/src/any.rs index beff9c47..7af0d2d5 100644 --- a/starknet-providers/src/any.rs +++ b/starknet-providers/src/any.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -11,7 +12,7 @@ use starknet_core::types::{ }; use crate::{ - jsonrpc::{HttpTransport, JsonRpcClient}, + jsonrpc::{HttpTransport, JsonRpcClient, JsonRpcMethod}, Provider, ProviderError, SequencerGatewayProvider, }; @@ -663,4 +664,45 @@ impl Provider for AnyProvider { } } } + + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + match self { + Self::JsonRpcHttp(inner) => { + as Provider>::batch_requests(inner, requests).await + } + Self::SequencerGateway(inner) => { + ::batch_requests(inner, requests).await + } + } + } + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + match self { + Self::JsonRpcHttp(inner) => { + as Provider>::get_block_with_tx_hashes_batch( + inner, block_ids, + ) + .await + } + Self::SequencerGateway(inner) => { + ::get_block_with_tx_hashes_batch( + inner, block_ids, + ) + .await + } + } + } + } diff --git a/starknet-providers/src/jsonrpc/mod.rs b/starknet-providers/src/jsonrpc/mod.rs index be9b9571..eeea4f18 100644 --- a/starknet-providers/src/jsonrpc/mod.rs +++ b/starknet-providers/src/jsonrpc/mod.rs @@ -91,6 +91,13 @@ pub enum JsonRpcMethod { TraceBlockTransactions, } +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub enum JsonRpcRequestParams<'a> { + GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef<'a>), + GetBlockWithTxs(GetBlockWithTxsRequestRef<'a>), +} + #[derive(Debug, Clone)] pub struct JsonRpcRequest { pub id: u64, @@ -98,6 +105,11 @@ pub struct JsonRpcRequest { } #[derive(Debug, Clone)] +pub struct JsonRpcRequests { + pub requests: Vec, +} + +#[derive(Debug, Clone, Serialize)] pub enum JsonRpcRequestData { SpecVersion(SpecVersionRequest), GetBlockWithTxHashes(GetBlockWithTxHashesRequest), @@ -204,6 +216,31 @@ where } } } + async fn send_requests(&self, requests: I) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + R: DeserializeOwned, + { + let responses = self + .transport + .send_requests(requests) + .await + .map_err(JsonRpcClientError::TransportError)?; + + responses + .into_iter() + .map(|response| match response { + JsonRpcResponse::Success { result, .. } => Ok(result), + JsonRpcResponse::Error { error, .. } => { + Err(match TryInto::::try_into(&error) { + Ok(error) => ProviderError::StarknetError(error), + Err(_) => JsonRpcClientError::::JsonRpcError(error).into(), + }) + } + }) + .collect() + } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -218,6 +255,35 @@ where .await } + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + self.send_requests(requests).await + } + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + let requests = block_ids.iter().map(|b_id| { + ( + JsonRpcMethod::GetBlockWithTxHashes, + GetBlockWithTxHashesRequestRef { + block_id: b_id.as_ref(), + }, + ) + }); + self.send_requests(requests).await + } + /// Get block information with transaction hashes given the block id async fn get_block_with_tx_hashes( &self, diff --git a/starknet-providers/src/jsonrpc/transports/http.rs b/starknet-providers/src/jsonrpc/transports/http.rs index f50bb46a..154ea4b6 100644 --- a/starknet-providers/src/jsonrpc/transports/http.rs +++ b/starknet-providers/src/jsonrpc/transports/http.rs @@ -101,4 +101,46 @@ impl JsonRpcTransport for HttpTransport { Ok(parsed_response) } + + async fn send_requests( + &self, + requests: I, + ) -> Result>, Self::Error> + where + I: IntoIterator + Send, + P: Serialize + Send, + R: DeserializeOwned, + { + let batch_requests: Vec<_> = requests + .into_iter() + .enumerate() + .map(|(id, (method, params))| JsonRpcRequest { + id: id as u64 + 1, + jsonrpc: "2.0", + method, + params, + }) + .collect(); + + let serialized_batch = serde_json::to_string(&batch_requests).map_err(Self::Error::Json)?; + + let mut request = self + .client + .post(self.url.clone()) + .body(serialized_batch) + .header("Content-Type", "application/json"); + + for (name, value) in self.headers.iter() { + request = request.header(name, value); + } + + let response = request.send().await.map_err(Self::Error::Reqwest)?; + + let response_body = response.text().await.map_err(Self::Error::Reqwest)?; + trace!("Response from JSON-RPC: {}", response_body); + + let parsed_response = serde_json::from_str(&response_body).map_err(Self::Error::Json)?; + + Ok(parsed_response) + } } diff --git a/starknet-providers/src/jsonrpc/transports/mod.rs b/starknet-providers/src/jsonrpc/transports/mod.rs index 7b119f74..57e1ebc1 100644 --- a/starknet-providers/src/jsonrpc/transports/mod.rs +++ b/starknet-providers/src/jsonrpc/transports/mod.rs @@ -22,4 +22,13 @@ pub trait JsonRpcTransport { where P: Serialize + Send + Sync, R: DeserializeOwned; + + async fn send_requests( + &self, + requests: I, + ) -> Result>, Self::Error> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + R: DeserializeOwned; } diff --git a/starknet-providers/src/provider.rs b/starknet-providers/src/provider.rs index 1d0588da..efb2daf4 100644 --- a/starknet-providers/src/provider.rs +++ b/starknet-providers/src/provider.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use auto_impl::auto_impl; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -12,6 +13,8 @@ use starknet_core::types::{ }; use std::{any::Any, error::Error, fmt::Debug}; +use crate::jsonrpc::JsonRpcMethod; + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[auto_impl(&, Box, Arc)] @@ -298,6 +301,21 @@ pub trait Provider { Err(ProviderError::ArrayLengthMismatch) } } + + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync; + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync; } /// Trait for implementation-specific error type. These errors are irrelevant in most cases, diff --git a/starknet-providers/src/sequencer/provider.rs b/starknet-providers/src/sequencer/provider.rs index 313cda9c..fadb46c9 100644 --- a/starknet-providers/src/sequencer/provider.rs +++ b/starknet-providers/src/sequencer/provider.rs @@ -3,6 +3,7 @@ use std::any::Any; use async_trait::async_trait; +use serde::Serialize; use starknet_core::types::{ BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction, @@ -15,6 +16,7 @@ use starknet_core::types::{ }; use crate::{ + jsonrpc::JsonRpcMethod, provider::ProviderImplError, sequencer::{models::conversions::ConversionError, GatewayClientError}, Provider, ProviderError, SequencerGatewayProvider, @@ -30,6 +32,27 @@ impl Provider for SequencerGatewayProvider { Ok(String::from("0.7.1")) } + async fn batch_requests( + &self, + requests: I, + ) -> Result, ProviderError> + where + I: IntoIterator + Send + Sync, + P: Serialize + Send + Sync, + { + Ok(self.batch_requests(requests).await?) + } + + async fn get_block_with_tx_hashes_batch( + &self, + block_ids: Vec, + ) -> Result, ProviderError> + where + B: AsRef + Send + Sync, + { + Ok(self.get_block_with_tx_hashes_batch(block_ids).await?) + } + async fn get_block_with_tx_hashes( &self, block_id: B,