From 6d954d4d413be42dd2a860f0a9b6caa142e362ad Mon Sep 17 00:00:00 2001 From: Joe Date: Wed, 23 Oct 2024 12:57:44 -0700 Subject: [PATCH] add rpc streaming from ironfish node --- crates/networking/src/lib.rs | 2 +- crates/networking/src/rpc_abi.rs | 3 - crates/networking/src/rpc_handler/handler.rs | 10 +- crates/networking/src/stream.rs | 104 +++++++++++++++---- crates/oreo_errors/src/lib.rs | 3 + 5 files changed, 94 insertions(+), 28 deletions(-) diff --git a/crates/networking/src/lib.rs b/crates/networking/src/lib.rs index f9aa6b3..e8c0eb9 100644 --- a/crates/networking/src/lib.rs +++ b/crates/networking/src/lib.rs @@ -4,8 +4,8 @@ pub mod rpc_abi; pub mod rpc_handler; pub mod server_handler; pub mod socket_message; -pub mod web_abi; pub mod stream; +pub mod web_abi; use db_handler::{DBTransaction, InnerBlock, Json}; use rpc_abi::RpcBlock; diff --git a/crates/networking/src/rpc_abi.rs b/crates/networking/src/rpc_abi.rs index feba465..5604850 100644 --- a/crates/networking/src/rpc_abi.rs +++ b/crates/networking/src/rpc_abi.rs @@ -16,9 +16,6 @@ pub struct RpcResponseStream { pub data: T } - - - impl IntoResponse for RpcResponse { fn into_response(self) -> axum::response::Response { Json(json!({"code": 200, "data": self.data})).into_response() diff --git a/crates/networking/src/rpc_handler/handler.rs b/crates/networking/src/rpc_handler/handler.rs index e92a6d0..505f2e7 100644 --- a/crates/networking/src/rpc_handler/handler.rs +++ b/crates/networking/src/rpc_handler/handler.rs @@ -8,7 +8,7 @@ use ureq::{Agent, AgentBuilder, Error, Response}; use crate::{ rpc_abi::{ - RpcBroadcastTxRequest, RpcBroadcastTxResponse, RpcCreateTxRequest, RpcCreateTxResponse, RpcExportAccountResponse, RpcGetAccountStatusRequest, RpcGetAccountStatusResponse, RpcGetAccountTransactionRequest, RpcGetAccountTransactionResponse, RpcGetBalancesRequest, RpcGetBalancesResponse, RpcGetBlockRequest, RpcGetBlockResponse, RpcGetBlocksRequest, RpcGetBlocksResponse, RpcGetLatestBlockResponse, RpcGetTransactionsRequest, RpcGetTransactionsResponse, RpcImportAccountRequest, RpcImportAccountResponse, RpcRemoveAccountRequest, RpcRemoveAccountResponse, RpcResetAccountRequest, RpcResponse, RpcSetAccountHeadRequest, RpcSetScanningRequest, SendTransactionRequest, SendTransactionResponse + RpcBroadcastTxRequest, RpcBroadcastTxResponse, RpcCreateTxRequest, RpcCreateTxResponse, RpcExportAccountResponse, RpcGetAccountStatusRequest, RpcGetAccountStatusResponse, RpcGetAccountTransactionRequest, RpcGetAccountTransactionResponse, RpcGetBalancesRequest, RpcGetBalancesResponse, RpcGetBlockRequest, RpcGetBlockResponse, RpcGetBlocksRequest, RpcGetBlocksResponse, RpcGetLatestBlockResponse, RpcGetTransactionsRequest, RpcGetTransactionsResponse, RpcImportAccountRequest, RpcImportAccountResponse, RpcRemoveAccountRequest, RpcRemoveAccountResponse, RpcResetAccountRequest, RpcResponse, RpcSetAccountHeadRequest, RpcSetScanningRequest, SendTransactionRequest, SendTransactionResponse, TransactionStatus }, rpc_handler::RpcError, stream::RequestExt, }; @@ -136,16 +136,20 @@ impl RpcHandler { ) -> Result, OreoError> { let path = format!("http://{}/wallet/getAccountTransactions", self.endpoint); let resp = self.agent.clone().post(&path).send_json(&request); + match resp { Ok(response) => { - let transactions = response.collect_stream(); + let transactions: Result, OreoError> = response + .into_stream::() + .collect(); + Ok(RpcResponse { status: 200, data: RpcGetTransactionsResponse { transactions: transactions?, }, }) - }, + } Err(e) => Err(OreoError::InternalRpcError(e.to_string())), } } diff --git a/crates/networking/src/stream.rs b/crates/networking/src/stream.rs index 9f4af38..b689391 100644 --- a/crates/networking/src/stream.rs +++ b/crates/networking/src/stream.rs @@ -1,33 +1,95 @@ -use std::io::Read; +use std::io::{BufRead, BufReader, Read}; +use std::marker::PhantomData; use oreo_errors::OreoError; use serde::de::DeserializeOwned; +use serde::Deserialize; use ureq::Response; use crate::rpc_abi::RpcResponseStream; -pub trait RequestExt { - fn collect_stream(self) -> Result, OreoError>; +/// Represents a stream reader that deserializes JSON objects from a reader. +pub struct StreamReader { + reader: BufReader>, + _marker: PhantomData, } -impl RequestExt for Response { - fn collect_stream(self) -> Result, OreoError> { - let reader = self.into_reader(); - let mut buffered = std::io::BufReader::new(reader); - let mut items = Vec::new(); - let mut response_str = String::new(); - buffered.read_to_string(&mut response_str).map_err(|e| OreoError::InternalRpcError(e.to_string()))?; - let lines = response_str.split('\x0c').collect::>(); - - // Get rid of status code - for line in lines[0..lines.len()-1].into_iter() { - let line = *line; // Dereference to get &str - if !line.trim().is_empty() { - let item: RpcResponseStream = serde_json::from_str(line) - .map_err(|e| OreoError::InternalRpcError(e.to_string()))?; - items.push(item.data); +#[derive(Deserialize)] +#[serde(untagged)] +enum ResponseItem { + Data(RpcResponseStream), + Status { status: u16 }, +} + +impl StreamReader { + /// Creates a new `StreamReader` from a boxed reader. + pub fn new(reader: Box) -> Self { + Self { + reader: BufReader::new(reader), + _marker: PhantomData, + } + } + + fn read_item(&mut self) -> Result>, OreoError> { + let mut item = Vec::new(); + loop { + let bytes_read = self.reader.read_until(b'\x0c', &mut item) + .map_err(|e| OreoError::RpcStreamError(e.to_string()))?; + if bytes_read == 0 { + break; + } + if item.last() == Some(&b'\x0c') { + item.pop(); + break; + } + } + match item.len() { + 0 => Ok(None), + _ => Ok(Some(item)) + } + } + + + /// Parses a chunk of data into a `ResponseItem`. + fn parse_item(&self, chunk: &[u8]) -> Result, OreoError> { + match serde_json::from_slice::>(chunk) { + Ok(ResponseItem::Data(item)) => Ok(Some(item.data)), + Ok(ResponseItem::Status { status: 200 }) => Ok(None), + Ok(ResponseItem::Status { status }) => Err(OreoError::RpcStreamError(format!("Received error status: {}", status))), + Err(e) => { + let err_str = format!("Failed to parse JSON object: {:?}", e); + Err(OreoError::RpcStreamError(err_str)) } } - Ok(items) } -} \ No newline at end of file +} + +impl Iterator for StreamReader +where + T: DeserializeOwned, +{ + type Item = Result; + + fn next(&mut self) -> Option { + match self.read_item() { + Ok(Some(chunk)) => match self.parse_item(&chunk) { + Ok(Some(data)) => Some(Ok(data)), + Ok(None) => None, // End of stream + Err(e) => Some(Err(e)), + }, + Ok(None) => None, // EOF reached + Err(e) => Some(Err(e)), + } + } +} + +pub trait RequestExt { + fn into_stream(self) -> StreamReader; +} + +impl RequestExt for Response { + fn into_stream(self) -> StreamReader { + let reader = self.into_reader(); + StreamReader::new(Box::new(reader)) + } +} diff --git a/crates/oreo_errors/src/lib.rs b/crates/oreo_errors/src/lib.rs index a25a649..fd03040 100644 --- a/crates/oreo_errors/src/lib.rs +++ b/crates/oreo_errors/src/lib.rs @@ -43,6 +43,8 @@ pub enum OreoError { AccountStatusError(String), #[error("Unauthorized")] Unauthorized, + #[error("RPC stream error")] + RpcStreamError(String), } impl IntoResponse for OreoError { @@ -73,6 +75,7 @@ impl IntoResponse for OreoError { OreoError::DServerError(_) => (StatusCode::from_u16(614).unwrap(), self.to_string()), OreoError::AccountStatusError(_) => (StatusCode::from_u16(615).unwrap(), self.to_string()), OreoError::Unauthorized => (StatusCode::UNAUTHORIZED, self.to_string()), + OreoError::RpcStreamError(_) => (StatusCode::from_u16(615).unwrap(), self.to_string()), }; Json(json!({"code": status_code.as_u16(), "error": err_msg})).into_response() }