From a2e0037dd3f742fc973e4bd5f18565ab0199aa65 Mon Sep 17 00:00:00 2001 From: clangenb <37865735+clangenb@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:41:17 +0100 Subject: [PATCH] support json rpc 2.0 with `author_submitAndWatchExtrinsic` trusted calls (#1623) * add workaround to support json rpc 2.0 with `author_submitAndWatch` trusted call * update trusted_op_direct_request * fix obsolete imports in rpc_responder * make direct request no longer generic, it can only return the hash anyhow, we can easily introduce it later. * fix deserializing rpc response * [cli/trusted_operation] remove unnecessary nesting * fix clippy * extract await status update * [cli] fix correctly extract hash in benchmarks * fmt * add check that we can only us direct_calls in send direct request * improve logs * add comment about storing initial top hash * add todos for rpc refactor. * better comment * add more comments * fix handling of indirect calls * derive copy for trusted operation status * fix type inference in trusted transfer cmd * [cli] merge await status update from benchmark and send_direct_request into one function. * [cli] partially fix benchmarks * fix clippy * fix local setup for two workers * [cli] fix: always close client in trusted_operation and return if the status is invalid. --- cli/src/benchmark/mod.rs | 38 +-- cli/src/evm/commands/evm_call.rs | 12 +- cli/src/evm/commands/evm_create.rs | 9 +- cli/src/guess_the_number/commands/guess.rs | 9 +- .../commands/push_by_one_day.rs | 9 +- .../guess_the_number/commands/set_winnings.rs | 10 +- .../trusted_base_cli/commands/set_balance.rs | 10 +- cli/src/trusted_base_cli/commands/transfer.rs | 13 +- .../commands/unshield_funds.rs | 11 +- cli/src/trusted_operation.rs | 244 +++++++++--------- core-primitives/rpc/src/lib.rs | 21 ++ core-primitives/types/src/lib.rs | 2 +- core/direct-rpc-server/src/rpc_responder.rs | 45 +++- .../src/rpc_watch_extractor.rs | 20 +- local-setup/config/two-workers.json | 3 +- .../rpc-handler/src/direct_top_pool_api.rs | 12 +- 16 files changed, 291 insertions(+), 177 deletions(-) diff --git a/cli/src/benchmark/mod.rs b/cli/src/benchmark/mod.rs index d514f35a68..eb5aa559f6 100644 --- a/cli/src/benchmark/mod.rs +++ b/cli/src/benchmark/mod.rs @@ -20,7 +20,10 @@ use crate::{ get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_identifiers, get_keystore_path, get_pair_from_str}, - trusted_operation::{get_json_request, get_state, perform_trusted_operation, wait_until}, + trusted_operation::{ + await_status, await_subscription_response, get_json_request, get_state, + perform_trusted_operation, + }, Cli, CliResult, CliResultOk, SR25519_KEY_TYPE, }; use codec::Decode; @@ -35,7 +38,7 @@ use itp_stf_primitives::{ }; use itp_types::{ AccountInfo, Balance, ShardIdentifier, TrustedOperationStatus, - TrustedOperationStatus::{InSidechainBlock, Submitted}, + TrustedOperationStatus::InSidechainBlock, }; use log::*; use rand::Rng; @@ -47,7 +50,7 @@ use sp_keystore::Keystore; use std::{ boxed::Box, string::ToString, - sync::mpsc::{channel, Receiver}, + sync::mpsc::{channel, Receiver, Sender}, thread, time, time::Instant, vec::Vec, @@ -89,6 +92,7 @@ struct BenchmarkClient { account: sr25519_core::Pair, current_balance: u128, client_api: DirectClient, + sender: Sender, receiver: Receiver, } @@ -104,8 +108,8 @@ impl BenchmarkClient { debug!("setup sender and receiver"); let (sender, receiver) = channel(); - client_api.watch(initial_request, sender); - BenchmarkClient { account, current_balance: initial_balance, client_api, receiver } + client_api.watch(initial_request, sender.clone()); + BenchmarkClient { account, current_balance: initial_balance, client_api, sender, receiver } } } @@ -161,7 +165,7 @@ impl BenchmarkCommand { &mrenclave, &shard, ) - .into_trusted_operation(trusted_args.direct); + .into_trusted_operation(true); // For the last account we wait for confirmation in order to ensure all accounts were setup correctly let wait_for_confirmation = i == self.number_clients - 1; @@ -218,7 +222,8 @@ impl BenchmarkCommand { let last_iteration = i == self.number_iterations - 1; let jsonrpc_call = get_json_request(shard, &top, shielding_pubkey); - client.client_api.send(&jsonrpc_call).unwrap(); + + client.client_api.watch(jsonrpc_call, client.sender.clone()); let result = wait_for_top_confirmation( self.wait_for_confirmation || last_iteration, &client, @@ -337,15 +342,22 @@ fn wait_for_top_confirmation( ) -> BenchmarkTransaction { let started = Instant::now(); - let submitted = wait_until(&client.receiver, is_submitted); + // the first response of `submitAndWatch` is just the plain top hash + let submitted = match await_subscription_response(&client.receiver) { + Ok(hash) => Some((hash, Instant::now())), + Err(e) => { + error!("recv error: {e:?}"); + None + }, + }; let confirmed = if wait_for_sidechain_block { // We wait for the transaction hash that actually matches the submitted hash loop { - let transaction_information = wait_until(&client.receiver, is_sidechain_block); - if let Some((hash, _)) = transaction_information { + let transaction_information = await_status(&client.receiver, is_sidechain_block).ok(); + if let Some((hash, _status)) = transaction_information { if hash == submitted.unwrap().0 { - break transaction_information + break Some((hash, Instant::now())) } } } @@ -364,10 +376,6 @@ fn wait_for_top_confirmation( } } -fn is_submitted(s: TrustedOperationStatus) -> bool { - matches!(s, Submitted) -} - fn is_sidechain_block(s: TrustedOperationStatus) -> bool { matches!(s, InSidechainBlock(_)) } diff --git a/cli/src/evm/commands/evm_call.rs b/cli/src/evm/commands/evm_call.rs index 294b6a0331..bf72494185 100644 --- a/cli/src/evm/commands/evm_call.rs +++ b/cli/src/evm/commands/evm_call.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_evm_nonce, get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use ita_stf::{Index, TrustedCall, TrustedGetter}; @@ -31,6 +31,7 @@ use itp_types::AccountId; use log::*; use sp_core::{crypto::Ss58Codec, Pair, H160, U256}; use std::{boxed::Box, vec::Vec}; + #[derive(Parser)] pub struct EvmCallCommands { /// Sender's incognito AccountId in ss58check format, mnemonic or hex seed @@ -81,7 +82,12 @@ impl EvmCallCommands { ) .sign(&KeyPair::Sr25519(Box::new(sender)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &function_call) - .map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &function_call).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &function_call) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/evm/commands/evm_create.rs b/cli/src/evm/commands/evm_create.rs index d0e693548a..04be461cfe 100644 --- a/cli/src/evm/commands/evm_create.rs +++ b/cli/src/evm/commands/evm_create.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_evm_nonce, get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use ita_stf::{evm_helpers::evm_create_address, Index, TrustedCall, TrustedGetter}; @@ -33,6 +33,7 @@ use pallet_evm::{AddressMapping, HashedAddressMapping}; use sp_core::{crypto::Ss58Codec, Pair, H160, U256}; use sp_runtime::traits::BlakeTwo256; use std::vec::Vec; + #[derive(Parser)] pub struct EvmCreateCommands { /// Sender's incognito AccountId in ss58check format, mnemonic or hex seed @@ -79,7 +80,11 @@ impl EvmCreateCommands { .sign(&from.into(), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - perform_trusted_operation::<()>(cli, trusted_args, &top)?; + if trusted_args.direct { + send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?; + } else { + perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?; + } let execution_address = evm_create_address(sender_evm_acc, evm_account_nonce); info!("trusted call evm_create executed"); diff --git a/cli/src/guess_the_number/commands/guess.rs b/cli/src/guess_the_number/commands/guess.rs index 932d1786e5..38ec145cee 100644 --- a/cli/src/guess_the_number/commands/guess.rs +++ b/cli/src/guess_the_number/commands/guess.rs @@ -23,6 +23,7 @@ use crate::{ Cli, CliResult, CliResultOk, }; +use crate::trusted_operation::send_direct_request; use ita_stf::{ guess_the_number::GuessTheNumberTrustedCall, Getter, Index, TrustedCall, TrustedCallSigned, }; @@ -55,6 +56,12 @@ impl GuessCommand { ) .sign(&KeyPair::Sr25519(Box::new(signer)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/guess_the_number/commands/push_by_one_day.rs b/cli/src/guess_the_number/commands/push_by_one_day.rs index 1a1cfb2358..8b9d3377bf 100644 --- a/cli/src/guess_the_number/commands/push_by_one_day.rs +++ b/cli/src/guess_the_number/commands/push_by_one_day.rs @@ -23,6 +23,7 @@ use crate::{ Cli, CliResult, CliResultOk, }; +use crate::trusted_operation::send_direct_request; use ita_stf::{ guess_the_number::GuessTheNumberTrustedCall, Getter, Index, TrustedCall, TrustedCallSigned, }; @@ -53,6 +54,12 @@ impl PushByOneDayCommand { ) .sign(&KeyPair::Sr25519(Box::new(signer)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/guess_the_number/commands/set_winnings.rs b/cli/src/guess_the_number/commands/set_winnings.rs index beab2475a2..f7b814d2e1 100644 --- a/cli/src/guess_the_number/commands/set_winnings.rs +++ b/cli/src/guess_the_number/commands/set_winnings.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use ita_parentchain_interface::integritee::Balance; @@ -59,6 +59,12 @@ impl SetWinningsCommand { ) .sign(&KeyPair::Sr25519(Box::new(signer)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/trusted_base_cli/commands/set_balance.rs b/cli/src/trusted_base_cli/commands/set_balance.rs index 7d8dc1f193..74cd3c9f34 100644 --- a/cli/src/trusted_base_cli/commands/set_balance.rs +++ b/cli/src/trusted_base_cli/commands/set_balance.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use ita_parentchain_interface::integritee::Balance; @@ -59,6 +59,12 @@ impl SetBalanceCommand { ) .sign(&KeyPair::Sr25519(Box::new(signer)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/trusted_base_cli/commands/transfer.rs b/cli/src/trusted_base_cli/commands/transfer.rs index 427d9054f6..b2e050adbb 100644 --- a/cli/src/trusted_base_cli/commands/transfer.rs +++ b/cli/src/trusted_base_cli/commands/transfer.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_accountid_from_str, get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use base58::ToBase58; @@ -65,9 +65,12 @@ impl TransferCommand { TrustedCall::balance_transfer(from.public().into(), to, self.amount) .sign(&KeyPair::Sr25519(Box::new(from)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - let res = - perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?; - info!("trusted call transfer executed"); - Ok(res) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/trusted_base_cli/commands/unshield_funds.rs b/cli/src/trusted_base_cli/commands/unshield_funds.rs index 9d0e66d07b..fffe4737b8 100644 --- a/cli/src/trusted_base_cli/commands/unshield_funds.rs +++ b/cli/src/trusted_base_cli/commands/unshield_funds.rs @@ -19,7 +19,7 @@ use crate::{ get_layer_two_nonce, trusted_cli::TrustedCli, trusted_command_utils::{get_accountid_from_str, get_identifiers, get_pair_from_str}, - trusted_operation::perform_trusted_operation, + trusted_operation::{perform_trusted_operation, send_direct_request}, Cli, CliResult, CliResultOk, }; use ita_parentchain_interface::integritee::Balance; @@ -31,6 +31,7 @@ use itp_stf_primitives::{ use log::*; use sp_core::{crypto::Ss58Codec, Pair}; use std::boxed::Box; + #[derive(Parser)] pub struct UnshieldFundsCommand { /// Sender's incognito AccountId in ss58check format, mnemonic or hex seed @@ -63,6 +64,12 @@ impl UnshieldFundsCommand { TrustedCall::balance_unshield(from.public().into(), to, self.amount, shard) .sign(&KeyPair::Sr25519(Box::new(from)), nonce, &mrenclave, &shard) .into_trusted_operation(trusted_args.direct); - Ok(perform_trusted_operation::<()>(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + + if trusted_args.direct { + Ok(send_direct_request(cli, trusted_args, &top).map(|_| CliResultOk::None)?) + } else { + Ok(perform_trusted_operation::<()>(cli, trusted_args, &top) + .map(|_| CliResultOk::None)?) + } } } diff --git a/cli/src/trusted_operation.rs b/cli/src/trusted_operation.rs index 0ee160741c..321e8626d4 100644 --- a/cli/src/trusted_operation.rs +++ b/cli/src/trusted_operation.rs @@ -26,7 +26,7 @@ use enclave_bridge_primitives::Request; use ita_stf::{Getter, TrustedCallSigned}; use itc_rpc_client::direct_client::{DirectApi, DirectClient}; use itp_node_api::api_client::{ApiClientError, ENCLAVE_BRIDGE}; -use itp_rpc::{RpcRequest, RpcResponse, RpcReturnValue}; +use itp_rpc::{RpcRequest, RpcResponse, RpcReturnValue, RpcSubscriptionUpdate}; use itp_sgx_crypto::ShieldingCryptoEncrypt; use itp_stf_primitives::types::{ShardIdentifier, TrustedOperation}; use itp_types::{ @@ -42,7 +42,6 @@ use std::{ fmt::Debug, result::Result as StdResult, sync::mpsc::{channel, Receiver}, - time::Instant, }; use substrate_api_client::{ ac_compose_macros::compose_extrinsic, GetChainInfo, SubmitAndWatch, SubscribeEvents, XtStatus, @@ -72,6 +71,10 @@ pub(crate) enum TrustedOperationError { Default { msg: String }, } +pub(crate) fn into_default_trusted_op_err(err_msg: impl ToString) -> TrustedOperationError { + TrustedOperationError::Default { msg: err_msg.to_string() } +} + impl From for TrustedOperationError { fn from(error: ApiClientError) -> Self { Self::ApiClient(error) @@ -87,9 +90,12 @@ pub(crate) fn perform_trusted_operation( ) -> TrustedOpResult { match top { TrustedOperation::indirect_call(_) => send_indirect_request::(cli, trusted_args, top), - TrustedOperation::direct_call(_) => send_direct_request::(cli, trusted_args, top), TrustedOperation::get(getter) => execute_getter_from_cli_args::(cli, trusted_args, getter), + TrustedOperation::direct_call(_) => Err(TrustedOperationError::Default { + msg: "Invalid call to `perform_trusted_operation`, use `send_direct_request` directly" + .into(), + }), } } @@ -268,11 +274,16 @@ pub fn read_shard(trusted_args: &TrustedCli) -> StdResult( +pub(crate) fn send_direct_request( cli: &Cli, trusted_args: &TrustedCli, operation_call: &TrustedOperation, -) -> TrustedOpResult { +) -> TrustedOpResult { + if !matches!(operation_call, TrustedOperation::direct_call(_)) { + return Err(TrustedOperationError::Default { + msg: "can only use direct_calls in this function".into(), + }) + } let encryption_key = get_shielding_key(cli).unwrap(); let shard = read_shard(trusted_args).unwrap(); let jsonrpc_call: String = get_json_request(shard, operation_call, encryption_key); @@ -287,63 +298,117 @@ fn send_direct_request( let (sender, receiver) = channel(); direct_api.watch(jsonrpc_call, sender); - debug!("waiting for rpc response"); + // the first response of `submitAndWatch` is just the plain top hash + let top_hash = await_subscription_response(&receiver).map_err(|e| { + error!("Error getting subscription response: {:?}", e); + direct_api.close().unwrap(); + e + })?; + + debug!("subscribing to updates for top with hash: {top_hash:?}"); + + match await_status(&receiver, connection_can_be_closed) { + Ok((_hash, status)) => { + debug!("Trusted operation reached status {status:?}"); + direct_api.close().unwrap(); + Ok(top_hash) + }, + Err(e) => { + error!("Error submitting top: {e:?}"); + direct_api.close().unwrap(); + Err(e) + }, + } +} + +pub(crate) fn await_status( + receiver: &Receiver, + wait_until: impl Fn(TrustedOperationStatus) -> bool, +) -> TrustedOpResult<(Hash, TrustedOperationStatus)> { loop { - match receiver.recv() { - Ok(response) => { - debug!("received response"); - let response: RpcResponse = serde_json::from_str(&response).unwrap(); - if let Ok(return_value) = RpcReturnValue::from_hex(&response.result) { - debug!("successfully decoded rpc response: {:?}", return_value); - match return_value.status { - DirectRequestStatus::Error => { - debug!("request status is error"); - if let Ok(value) = String::decode(&mut return_value.value.as_slice()) { - error!("{}", value); - } - direct_api.close().unwrap(); - return Err(TrustedOperationError::Default { - msg: "[Error] DirectRequestStatus::Error".to_string(), - }) - }, - DirectRequestStatus::TrustedOperationStatus(status) => { - debug!("request status is: {:?}", status); - if let Ok(value) = Hash::decode(&mut return_value.value.as_slice()) { - println!("Trusted call {:?} is {:?}", value, status); - } - if connection_can_be_closed(status) { - direct_api.close().unwrap(); - let value = - decode_response_value(&mut return_value.value.as_slice())?; - return Ok(value) - } - }, - DirectRequestStatus::Ok => { - debug!("request status is ignored"); - direct_api.close().unwrap(); - let value = decode_response_value(&mut return_value.value.as_slice())?; - return Ok(value) - }, - } - if !return_value.do_watch { - debug!("do watch is false, closing connection"); - direct_api.close().unwrap(); - let value = decode_response_value(&mut return_value.value.as_slice())?; - return Ok(value) - } - }; + debug!("waiting for update"); + let (subscription_update, direct_request_status) = + await_status_update(receiver).map_err(|e| { + error!("Error getting status update: {:?}", e); + e + })?; + + debug!("successfully decoded request status: {:?}", direct_request_status); + match direct_request_status { + DirectRequestStatus::Error => { + let err = subscription_update.params.error.unwrap_or("{}".into()); + debug!("request status is error"); + return Err(into_default_trusted_op_err(format!( + "[Error] DirectRequestStatus::Error: {err}" + ))) + }, + DirectRequestStatus::TrustedOperationStatus(status) => { + debug!("request status is: {:?}", status); + let hash = + Hash::from_hex(&subscription_update.params.subscription).map_err(|e| { + into_default_trusted_op_err(format!("Invalid subscription top hash: {e:?}")) + })?; + + println!("Trusted call {:?} is {:?}", hash, status); + + if is_cancelled(status) { + debug!("trusted call has been cancelled"); + return Ok((hash, status)) + } + + if wait_until(status) { + return Ok((hash, status)) + } }, - Err(e) => { - error!("failed to receive rpc response: {:?}", e); - direct_api.close().unwrap(); - return Err(TrustedOperationError::Default { - msg: "failed to receive rpc response".to_string(), - }) + DirectRequestStatus::Ok => { + // Todo: #1625. When sending `author_submitAndWatchExtrinsic` this can never happen. + // our cli tries to do too much in one method, we should have better separation of + // concerns. + panic!("`DirectRequestStatus::Ok` should never occur with `author_submitAndWatchExtrinsic`.\ + This is a bug in the usage of the cli."); }, - }; + } } } +pub(crate) fn await_subscription_response(receiver: &Receiver) -> TrustedOpResult { + let response_string = receiver.recv().map_err(|e| { + into_default_trusted_op_err(format!("failed to receive rpc response: {e:?}")) + })?; + + let response: RpcResponse = serde_json::from_str(&response_string).map_err(|e| { + into_default_trusted_op_err(format!("Error deserializing RpcResponse: {e:?}")) + })?; + + let top_hash = Hash::from_hex(&response.result) + .map_err(|e| into_default_trusted_op_err(format!("Error decoding top hash: {e:?}")))?; + + Ok(top_hash) +} + +pub(crate) fn await_status_update( + receiver: &Receiver, +) -> TrustedOpResult<(RpcSubscriptionUpdate, DirectRequestStatus)> { + let response = receiver.recv().map_err(|e| { + into_default_trusted_op_err(format!("error receiving subscription update: {e:?}")) + })?; + debug!("received subscription update"); + + let subscription_update: RpcSubscriptionUpdate = + serde_json::from_str(&response).map_err(|e| { + into_default_trusted_op_err(format!("error deserializing subscription update: {e:?}")) + })?; + + trace!("successfully decoded subscription update: {:?}", subscription_update); + + let direct_request_status = DirectRequestStatus::from_hex(&subscription_update.params.result) + .map_err(|e| { + into_default_trusted_op_err(format!("Error decoding direct_request_status: {e:?}")) + })?; + + Ok((subscription_update, direct_request_status)) +} + fn decode_response_value( value: &mut I, ) -> StdResult { @@ -368,67 +433,12 @@ pub(crate) fn get_json_request( .unwrap() } -pub(crate) fn wait_until( - receiver: &Receiver, - until: impl Fn(TrustedOperationStatus) -> bool, -) -> Option<(H256, Instant)> { - debug!("waiting for rpc response"); - loop { - match receiver.recv() { - Ok(response) => { - debug!("received response: {}", response); - let parse_result: StdResult = serde_json::from_str(&response); - if let Ok(response) = parse_result { - if let Ok(return_value) = RpcReturnValue::from_hex(&response.result) { - debug!("successfully decoded rpc response: {:?}", return_value); - match return_value.status { - DirectRequestStatus::Error => { - debug!("request status is error"); - if let Ok(value) = - String::decode(&mut return_value.value.as_slice()) - { - error!("{}", value); - } - return None - }, - DirectRequestStatus::TrustedOperationStatus(status) => { - debug!("request status is: {:?}", status); - if let Ok(value) = Hash::decode(&mut return_value.value.as_slice()) - { - println!("Trusted call {:?} is {:?}", value, status); - if until(status.clone()) { - return Some((value, Instant::now())) - } else if status == TrustedOperationStatus::Invalid { - error!("Invalid request"); - return None - } - } - }, - DirectRequestStatus::Ok => { - debug!("request status is ignored"); - return None - }, - } - }; - } else { - error!("Could not parse response"); - }; - }, - Err(e) => { - error!("failed to receive rpc response: {:?}", e); - return None - }, - }; - } +fn connection_can_be_closed(top_status: TrustedOperationStatus) -> bool { + use TrustedOperationStatus::*; + !matches!(top_status, Submitted | Future | Ready | Broadcast) } -fn connection_can_be_closed(top_status: TrustedOperationStatus) -> bool { - !matches!( - top_status, - TrustedOperationStatus::Submitted - | TrustedOperationStatus::Future - | TrustedOperationStatus::Ready - | TrustedOperationStatus::Broadcast - | TrustedOperationStatus::Invalid - ) +fn is_cancelled(top_status: TrustedOperationStatus) -> bool { + use TrustedOperationStatus::*; + matches!(top_status, Invalid | Usurped | Dropped | Retracted) } diff --git a/core-primitives/rpc/src/lib.rs b/core-primitives/rpc/src/lib.rs index 06cd8e6737..aab1fed564 100644 --- a/core-primitives/rpc/src/lib.rs +++ b/core-primitives/rpc/src/lib.rs @@ -62,6 +62,27 @@ pub struct RpcResponse { pub id: Id, } +#[derive(Clone, Encode, Decode, Debug, Serialize, Deserialize)] +pub struct RpcSubscriptionUpdate { + pub jsonrpc: String, + pub id: Id, + pub method: String, + pub params: SubscriptionParams, +} + +#[derive(Clone, Encode, Decode, Debug, Serialize, Deserialize)] +pub struct SubscriptionParams { + pub error: Option, + pub result: String, + pub subscription: String, +} + +impl RpcSubscriptionUpdate { + pub fn new(method: String, params: SubscriptionParams) -> Self { + Self { jsonrpc: "2.0".to_owned(), id: Id::Number(1), method, params } + } +} + #[derive(Clone, Encode, Decode, Serialize, Deserialize)] pub struct RpcRequest { pub jsonrpc: String, diff --git a/core-primitives/types/src/lib.rs b/core-primitives/types/src/lib.rs index 08ff827db2..19563a7de5 100644 --- a/core-primitives/types/src/lib.rs +++ b/core-primitives/types/src/lib.rs @@ -82,7 +82,7 @@ pub enum DirectRequestStatus { Error, } -#[derive(Debug, Clone, PartialEq, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Encode, Decode, Copy)] pub enum TrustedOperationStatus { /// TrustedOperation is submitted to the top pool. Submitted, diff --git a/core/direct-rpc-server/src/rpc_responder.rs b/core/direct-rpc-server/src/rpc_responder.rs index bc288abf65..ed39f17586 100644 --- a/core/direct-rpc-server/src/rpc_responder.rs +++ b/core/direct-rpc-server/src/rpc_responder.rs @@ -19,10 +19,10 @@ use crate::{ response_channel::ResponseChannel, DirectRpcError, DirectRpcResult, RpcConnectionRegistry, RpcHash, SendRpcResponse, }; -use alloc::format; -use itp_rpc::{RpcResponse, RpcReturnValue}; +use alloc::string::ToString; +use itp_rpc::{RpcResponse, RpcReturnValue, RpcSubscriptionUpdate, SubscriptionParams}; use itp_types::{DirectRequestStatus, TrustedOperationStatus}; -use itp_utils::{FromHexPrefixed, ToHexPrefixed}; +use itp_utils::ToHexPrefixed; use log::*; use std::{sync::Arc, vec::Vec}; @@ -59,6 +59,17 @@ where self.response_channel.respond(connection, string_response).map_err(|e| e.into()) } + + fn encode_and_send_subscription_update( + &self, + connection: Registry::Connection, + rpc_response: &RpcSubscriptionUpdate, + ) -> DirectRpcResult<()> { + let string_response = + serde_json::to_string(&rpc_response).map_err(DirectRpcError::SerializationError)?; + + self.response_channel.respond(connection, string_response).map_err(|e| e.into()) + } } impl SendRpcResponse @@ -83,22 +94,28 @@ where .withdraw(&hash) .ok_or(DirectRpcError::InvalidConnectionHash)?; - let mut new_response = rpc_response.clone(); - - let mut result = RpcReturnValue::from_hex(&rpc_response.result) - .map_err(|e| DirectRpcError::Other(format!("{:?}", e).into()))?; - let do_watch = continue_watching(&status_update); - // update response - result.do_watch = do_watch; - result.status = DirectRequestStatus::TrustedOperationStatus(status_update); - new_response.result = result.to_hex(); + let sub = RpcSubscriptionUpdate::new( + "author_submitAndWatchExtrinsic".to_string(), + SubscriptionParams { + error: None, + result: DirectRequestStatus::TrustedOperationStatus(status_update).to_hex(), + subscription: hash.to_hex(), + }, + ); - self.encode_and_send_response(connection_token, &new_response)?; + self.encode_and_send_subscription_update(connection_token, &sub)?; if do_watch { - self.connection_registry.store(hash, connection_token, new_response); + // We just store back the initial response, which is the top hash. + // This was implemented before we added the `RpcSubscriptionUpdate`, + // which can't be stored due to type incompatibilities. + // This should probably be refactored in the future, see #1624. + // + // But for now this is fine, as we only use the connection token + // to track ongoing connections, the response is ignored. + self.connection_registry.store(hash, connection_token, rpc_response); } debug!("updating status event successful"); diff --git a/core/direct-rpc-server/src/rpc_watch_extractor.rs b/core/direct-rpc-server/src/rpc_watch_extractor.rs index a117a34b4a..8d688d1d18 100644 --- a/core/direct-rpc-server/src/rpc_watch_extractor.rs +++ b/core/direct-rpc-server/src/rpc_watch_extractor.rs @@ -21,6 +21,7 @@ use codec::Decode; use itp_rpc::{RpcResponse, RpcReturnValue}; use itp_types::DirectRequestStatus; use itp_utils::FromHexPrefixed; +use log::debug; use std::marker::PhantomData; pub struct RpcWatchExtractor @@ -55,8 +56,23 @@ where type Hash = Hash; fn must_be_watched(&self, rpc_response: &RpcResponse) -> DirectRpcResult> { - let rpc_return_value = RpcReturnValue::from_hex(&rpc_response.result) - .map_err(|e| DirectRpcError::Other(format!("{:?}", e).into()))?; + let rpc_return_value = match RpcReturnValue::from_hex(&rpc_response.result) { + Ok(return_value) => return_value, + Err(e) => { + // `author_submitAndWatchExtrinsic` does currently only return the top hash + // as the first subscription response in order to comply with JSON RPC 2.0. + // + // We support this for now with this hack here, but it should be properly + // refactored in #1624. + if let Ok(hash) = Self::Hash::from_hex(&rpc_response.result) { + // fixme: fix hack in #1624. + debug!("returning hash as connection token: {hash:?}"); + return Ok(Some(hash)) + } + + return Err(DirectRpcError::Other(format!("{:?}", e).into())) + }, + }; if !rpc_return_value.do_watch { return Ok(None) diff --git a/local-setup/config/two-workers.json b/local-setup/config/two-workers.json index 07b9d8b6ef..a52d79fc52 100644 --- a/local-setup/config/two-workers.json +++ b/local-setup/config/two-workers.json @@ -63,8 +63,7 @@ ], "subcommand_flags": [ "--skip-ra", - "--dev", - "--request-state" + "--dev" ] } ] diff --git a/sidechain/rpc-handler/src/direct_top_pool_api.rs b/sidechain/rpc-handler/src/direct_top_pool_api.rs index bf4ce02f0d..6b64726254 100644 --- a/sidechain/rpc-handler/src/direct_top_pool_api.rs +++ b/sidechain/rpc-handler/src/direct_top_pool_api.rs @@ -60,14 +60,10 @@ pub fn add_top_pool_direct_rpc_methods( ]) .unwrap_or_else(|e| error!("failed to update prometheus metric: {:?}", e)); let json_value = match author_submit_extrinsic_inner(local_author.clone(), params) { - Ok(hash_value) => RpcReturnValue { - do_watch: true, - value: hash_value.encode(), - status: DirectRequestStatus::TrustedOperationStatus( - TrustedOperationStatus::Submitted, - ), - } - .to_hex(), + // Only return hash to support JSON RPC 2.0. + // Other methods will follow this pattern when + // we tackle #1624. + Ok(hash_value) => hash_value.to_hex(), Err(error) => compute_hex_encoded_return_error(error.as_str()), }; Ok(json!(json_value))