Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pending transactions subscription #647

Merged
merged 84 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
6fdf0d8
Improve readability in json_rpc/mod.rs
FabijanC Oct 18, 2024
062c55b
Add communication via channel [skip ci] [TODO]
FabijanC Oct 22, 2024
6942603
Use tokyo::sync::mpsc; edit TODOs
FabijanC Oct 23, 2024
499151e
Improve websocket/subscription types [skip ci]
FabijanC Oct 23, 2024
960f6cb
Add broadcast_changes skeleton [skip ci]
FabijanC Oct 24, 2024
f7f248c
Enable NewHeads subscription - many TODOs [skip ci]
FabijanC Oct 25, 2024
0503916
Merge branch 'json-rpc-v0.8.0' into ws-methods [skip ci]
FabijanC Oct 25, 2024
cb8a99f
Restructure SubscriptionResponse [skip ci]
FabijanC Oct 28, 2024
3a719af
Merge branch 'json-rpc-v0.8.0' into ws-methods [skip ci]
FabijanC Oct 29, 2024
b5b2272
Introduce ID (rpc and subscription) param [skip ci]
FabijanC Oct 29, 2024
73120bd
Return confirmation on subscribe request [skip ci]
FabijanC Oct 30, 2024
4bb5ef8
Add subscribe and notify methods to SocketContext [skip ci]
FabijanC Oct 30, 2024
9a4a6fa
Remove mpsc channel; use ws writer directly [skip ci]
FabijanC Oct 30, 2024
c638e3f
Remove unwraps in subscribe.rs [skip ci]
FabijanC Oct 30, 2024
beacd6a
Ignore general websocket RPC tests; introduce subscription test file …
FabijanC Oct 31, 2024
62952b7
Merge branch 'json-rpc-v0.8.0' into ws-methods [skip ci]
FabijanC Oct 31, 2024
d973f1e
Fix serialization; add block subscription happy path test [skip ci]
FabijanC Oct 31, 2024
a5bc0b3
Add helper for receiving rpc via ws [skip ci]
FabijanC Oct 31, 2024
fc94d3f
Add unhappy path test [skip ci]
FabijanC Oct 31, 2024
3382a30
Add multiple subscribers test [skip ci]
FabijanC Oct 31, 2024
225a601
Test subscribing to old block [skip ci]
FabijanC Oct 31, 2024
72ea6d2
Rename to test_subscription_to_blocks.rs [skip ci]
FabijanC Oct 31, 2024
cc05e4f
Start starknet_unsubscribe [skip ci]
FabijanC Oct 31, 2024
77eff45
Implement unsubscription TODOs [skip ci]
FabijanC Nov 4, 2024
65307e5
Add unsubscription tests [skip ci]
FabijanC Nov 4, 2024
e89dae2
Replace panic! with todo! [skip ci]
FabijanC Nov 4, 2024
d0bf128
Refactor ws error sending [skip ci]
FabijanC Nov 4, 2024
5fb0511
Improve on_websocket_call [skip ci]
FabijanC Nov 5, 2024
8821f68
Expand testing to pending and block hash [skip ci]
FabijanC Nov 5, 2024
d76675b
Refactor broadcast_changes
FabijanC Nov 5, 2024
ef355c8
Update docs; address TODO [skip ci]
FabijanC Nov 5, 2024
e18c8c4
Do interval block creation via request
FabijanC Nov 5, 2024
aa9417e
Add tests for blocks on demand and on interval
FabijanC Nov 5, 2024
58091d1
Add util function for subscribing to new heads
FabijanC Nov 6, 2024
035b6d4
Add util for unsubscribing; add test for read-only methods
FabijanC Nov 6, 2024
b392b37
Improve code clarity of subscribe_new_heads handler
FabijanC Nov 6, 2024
20ae07f
Remove TODOs for random ID generation
FabijanC Nov 6, 2024
82bb7f5
Improve testing: invalid and aborted blocks
FabijanC Nov 6, 2024
38dfed7
Improve tracing and code clarity
FabijanC Nov 6, 2024
445199a
Add tx status subscription (no tests)
FabijanC Nov 6, 2024
d843f34
Start testing; move unsubscribe util
FabijanC Nov 6, 2024
ff1cbfc
Add missing tx subscription logic and test placeholder [skip ci]
FabijanC Nov 6, 2024
3288a1b
Simplify str assertion; move block subscription function to utils
FabijanC Nov 7, 2024
669c877
Add tests; still no actual tx notification support [skip ci]
FabijanC Nov 7, 2024
7c7c663
Remove allows_method in on_websocket_rpc_call
FabijanC Nov 7, 2024
9369969
Add explanatory comment to get_latest_block [skip ci]
FabijanC Nov 7, 2024
10c19fd
Replace From<RpcError> with RpcResponse::from_rpc_error
FabijanC Nov 7, 2024
5fa5006
Merge branch 'ws-methods' into ws-methods-tx
FabijanC Nov 7, 2024
ed32a36
Edit TODO comments [skip ci]
FabijanC Nov 7, 2024
1ed8a4f
Merge branch 'json-rpc-v0.8.0' into ws-methods-tx
FabijanC Nov 8, 2024
48d527b
Merge branch 'json-rpc-v0.8.0' into ws-methods-tx
FabijanC Nov 8, 2024
3e3291c
Fix block id input; fix subscribe; improve tx subscription impl [skip…
FabijanC Nov 8, 2024
0da501a
Merge branch 'json-rpc-v0.8.0' into ws-methods-tx
FabijanC Nov 11, 2024
08f3880
Add tx status subscription logic [skip ci] (pending testing)
FabijanC Nov 11, 2024
c5824cb
Simplify SubscriptionNotification enum variant names
FabijanC Nov 12, 2024
c165afb
Replace match with if and return error in tx status subscription
FabijanC Nov 12, 2024
cef0ec6
Fix test: subscribe_to_new_tx_status_happy_path
FabijanC Nov 12, 2024
88ade87
Add extensive tx status subscription testing
FabijanC Nov 12, 2024
03843de
Fix impossible case matching
FabijanC Nov 12, 2024
4b4f087
Add TODOs related to pending block diff [skip ci]
FabijanC Nov 12, 2024
e11ea57
Merge branch 'json-rpc-v0.8.0' into ws-methods-tx
FabijanC Nov 13, 2024
823778b
Fix block and tx hash in subscription; all tests passing
FabijanC Nov 13, 2024
b3e50ce
Refactor subscription matching; remove unused struct and derive
FabijanC Nov 13, 2024
1b45ead
Minor refactor
FabijanC Nov 13, 2024
1657939
Refactor subscription tag extraction
FabijanC Nov 13, 2024
9d2572e
Improve clarity of change broadcasting
FabijanC Nov 13, 2024
2fb8b45
Rework tests after review
FabijanC Nov 13, 2024
5f131bc
Latest tx should not be notified of with pending subscription
FabijanC Nov 14, 2024
8a7ed14
Add pending tx subscription [WIP] [skip ci]
FabijanC Nov 14, 2024
b34297f
Remove todo! marks
FabijanC Nov 19, 2024
94fa114
Hash nested - wrong!
FabijanC Nov 19, 2024
d33fa6a
Test for block generation on transaction and demand [skip ci] - hash …
FabijanC Nov 19, 2024
65f40f0
Fix pending tx subscription; all complete tests passing, many unimple…
FabijanC Nov 20, 2024
7bb9266
Implement all tests except address filtering
FabijanC Nov 20, 2024
3b36a16
Implement all tests
FabijanC Nov 20, 2024
cfc5e74
Merge branch 'json-rpc-v0.8.0' into ws-methods-pending
FabijanC Nov 20, 2024
75343c9
Remove TODOs, simplify confirmation enum
FabijanC Nov 20, 2024
a0b11ec
Polish after self-review
FabijanC Nov 20, 2024
30c7193
Merge branch 'json-rpc-v0.8.0' into ws-methods-pending
FabijanC Nov 22, 2024
10bab75
Fix typo: passess
FabijanC Nov 22, 2024
3d7892d
Simplify TransactionHashWrapper serialization
FabijanC Nov 22, 2024
8b03a08
Improve explanatory comment
FabijanC Nov 22, 2024
a4fcd7c
Rename: with_pending_block -> uses_pending_block
FabijanC Nov 22, 2024
bf05121
Refactor pending txs extraction
FabijanC Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/starknet-devnet-core/src/starknet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use starknet_api::block::{BlockNumber, BlockStatus, BlockTimestamp, GasPrice, Ga
use starknet_api::core::SequencerContractAddress;
use starknet_api::felt;
use starknet_api::transaction::Fee;
use starknet_config::BlockGenerationOn;
use starknet_rs_core::types::{
BlockId, BlockTag, Call, ExecutionResult, Felt, Hash256, MsgFromL1, TransactionFinalityStatus,
};
Expand Down Expand Up @@ -409,7 +408,7 @@ impl Starknet {
self.transactions.insert(transaction_hash, transaction_to_add);

// create new block from pending one, only in block-generation-on-transaction mode
if self.config.block_generation_on == BlockGenerationOn::Transaction {
if !self.config.uses_pending_block() {
self.generate_new_block_and_state()?;
}

Expand Down
9 changes: 9 additions & 0 deletions crates/starknet-devnet-core/src/starknet/starknet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ pub struct StarknetConfig {
pub strk_erc20_contract_class: String,
}

impl StarknetConfig {
pub fn uses_pending_block(&self) -> bool {
match self.block_generation_on {
BlockGenerationOn::Transaction => false,
BlockGenerationOn::Demand | BlockGenerationOn::Interval(_) => true,
}
}
}

#[allow(clippy::unwrap_used)]
impl Default for StarknetConfig {
fn default() -> Self {
Expand Down
107 changes: 92 additions & 15 deletions crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use starknet_core::error::Error;
use starknet_core::starknet::starknet_config::BlockGenerationOn;
use starknet_rs_core::types::{BlockId, BlockTag};
use starknet_types::rpc::block::{BlockResult, PendingBlock};
use starknet_types::rpc::transactions::{TransactionWithHash, Transactions};
use starknet_types::starknet_api::block::{BlockNumber, BlockStatus};

use super::error::ApiError;
use super::models::{BlockInput, SubscriptionIdInput, TransactionBlockInput};
use super::models::{
BlockInput, PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput,
};
use super::{JsonRpcHandler, JsonRpcSubscriptionRequest};
use crate::rpc_core::request::Id;
use crate::subscribe::{NewTransactionStatus, SocketId, Subscription, SubscriptionNotification};
use crate::subscribe::{
AddressFilter, NewTransactionStatus, PendingTransactionNotification, SocketId, Subscription,
SubscriptionNotification, TransactionHashWrapper,
};

/// The definitions of JSON-RPC read endpoints defined in starknet_ws_api.json
impl JsonRpcHandler {
Expand All @@ -24,7 +30,9 @@ impl JsonRpcHandler {
JsonRpcSubscriptionRequest::TransactionStatus(data) => {
self.subscribe_tx_status(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::PendingTransactions => todo!(),
JsonRpcSubscriptionRequest::PendingTransactions(data) => {
self.subscribe_pending_txs(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::Events => todo!(),
JsonRpcSubscriptionRequest::Unsubscribe(SubscriptionIdInput { subscription_id }) => {
let mut sockets = self.api.sockets.lock().await;
Expand Down Expand Up @@ -120,26 +128,95 @@ impl JsonRpcHandler {
.get_block(&BlockId::Number(block_n))
.map_err(ApiError::StarknetDevnetError)?;

let old_header = old_block.into();
let notification = SubscriptionNotification::NewHeads(Box::new(old_header));
let old_header = Box::new(old_block.into());
let notification = SubscriptionNotification::NewHeads(old_header);
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}

/// Based on block generation mode and specified block ID, decide on subscription's sensitivity:
/// Based on pending block usage and specified block ID, decide on subscription's sensitivity:
/// notify of changes in pending or latest block
fn get_subscription_tag(&self, block_id: BlockId) -> BlockTag {
match self.starknet_config.block_generation_on {
BlockGenerationOn::Transaction => BlockTag::Latest,
BlockGenerationOn::Demand | BlockGenerationOn::Interval(_) => match block_id {
if self.starknet_config.uses_pending_block() {
match block_id {
BlockId::Tag(tag) => tag,
BlockId::Hash(_) | BlockId::Number(_) => BlockTag::Pending,
},
}
} else {
BlockTag::Latest
}
}

async fn get_pending_txs(&self) -> Result<Vec<TransactionWithHash>, ApiError> {
let starknet = self.api.starknet.lock().await;
let block = starknet.get_block_with_transactions(&BlockId::Tag(BlockTag::Pending))?;
match block {
BlockResult::PendingBlock(PendingBlock {
transactions: Transactions::Full(txs),
..
}) => Ok(txs),
_ => {
// Never reached if get_block_with_transactions properly implemented.
Err(ApiError::StarknetDevnetError(Error::UnexpectedInternalError {
msg: "Invalid block".into(),
}))
}
}
}

/// Does not return TOO_MANY_ADDRESSES_IN_FILTER
pub async fn subscribe_pending_txs(
&self,
maybe_subscription_input: Option<PendingTransactionsSubscriptionInput>,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let with_details = maybe_subscription_input
.as_ref()
.and_then(|subscription_input| subscription_input.transaction_details)
.unwrap_or_default();

let address_filter = AddressFilter::new(
maybe_subscription_input
.and_then(|subscription_input| subscription_input.sender_address)
.unwrap_or_default(),
);

let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError(
Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") },
))?;

let subscription = if with_details {
Subscription::PendingTransactionsFull { address_filter }
} else {
Subscription::PendingTransactionsHash { address_filter }
};
let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await;

// Only check pending. Regardless of block generation mode, ignore txs in latest block.
let pending_txs = self.get_pending_txs().await?;
for tx in pending_txs {
let notification = if with_details {
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Full(
Box::new(tx),
))
} else {
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Hash(
TransactionHashWrapper {
hash: *tx.get_transaction_hash(),
sender_address: tx.get_sender_address(),
},
))
};
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}

async fn subscribe_tx_status(
&self,
transaction_block_input: TransactionBlockInput,
Expand Down Expand Up @@ -167,10 +244,9 @@ impl JsonRpcHandler {
// TODO if tx present, but in a block before the one specified, no point in subscribing -
// its status shall never change (unless considering block abortion). It would make
// sense to just add a ReorgSubscription
let subscription = Subscription::TransactionStatus {
tag: self.get_subscription_tag(query_block_id),
transaction_hash,
};
let subscription_tag = self.get_subscription_tag(query_block_id);
let subscription =
Subscription::TransactionStatus { tag: subscription_tag, transaction_hash };
let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await;

let starknet = self.api.starknet.lock().await;
Expand All @@ -179,6 +255,7 @@ impl JsonRpcHandler {
let notification = SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash,
status: tx.get_status(),
origin_tag: subscription_tag,
});
match tx.get_block_number() {
Some(BlockNumber(block_number))
Expand Down
89 changes: 66 additions & 23 deletions crates/starknet-devnet-server/src/api/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use futures::{SinkExt, StreamExt};
use models::{
BlockAndClassHashInput, BlockAndContractAddressInput, BlockAndIndexInput, BlockInput,
CallInput, EstimateFeeInput, EventsInput, GetStorageInput, L1TransactionHashInput,
SubscriptionIdInput, TransactionBlockInput, TransactionHashInput, TransactionHashOutput,
PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput,
TransactionHashInput, TransactionHashOutput,
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::json;
use starknet_core::starknet::starknet_config::{BlockGenerationOn, DumpOn, StarknetConfig};
use starknet_core::starknet::starknet_config::{DumpOn, StarknetConfig};
use starknet_core::{CasmContractClass, StarknetBlock};
use starknet_rs_core::types::{BlockId, BlockTag, ContractClass as CodegenContractClass, Felt};
use starknet_types::messaging::{MessageToL1, MessageToL2};
Expand Down Expand Up @@ -71,7 +72,10 @@ use crate::rpc_core::error::{ErrorCode, RpcError};
use crate::rpc_core::request::RpcMethodCall;
use crate::rpc_core::response::{ResponseResult, RpcResponse};
use crate::rpc_handler::RpcHandler;
use crate::subscribe::{NewTransactionStatus, SocketContext, SocketId, SubscriptionNotification};
use crate::subscribe::{
NewTransactionStatus, PendingTransactionNotification, SocketContext, SocketId,
SubscriptionNotification, TransactionHashWrapper,
};
use crate::ServerConfig;

/// Helper trait to easily convert results to rpc results
Expand Down Expand Up @@ -227,21 +231,42 @@ impl JsonRpcHandler {

if new_pending_txs.len() > old_pending_txs.len() {
#[allow(clippy::expect_used)]
let new_tx = new_pending_txs.last().expect("has at least one element");
let new_tx_hash = new_pending_txs.last().expect("has at least one element");

let starknet = self.api.starknet.lock().await;

let status = starknet
.get_transaction_execution_and_finality_status(*new_tx)
.get_transaction_execution_and_finality_status(*new_tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
let tx_status_notification =
SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash: *new_tx,
transaction_hash: *new_tx_hash,
status,
origin_tag: BlockTag::Pending,
});

let tx = starknet
.get_transaction_by_hash(*new_tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
let pending_tx_notification = SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(Box::new(tx.clone())),
);

let pending_tx_hash_notification = SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(TransactionHashWrapper {
hash: *tx.get_transaction_hash(),
sender_address: tx.get_sender_address(),
}),
);

let notifications =
[tx_status_notification, pending_tx_notification, pending_tx_hash_notification];

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&tx_status_notification, BlockTag::Pending).await;
for notification in &notifications {
socket_context.notify_subscribers(notification).await;
}
}
}

Expand All @@ -252,27 +277,46 @@ impl JsonRpcHandler {
&self,
new_latest_block: StarknetBlock,
) -> Result<(), error::ApiError> {
let block_header = (&new_latest_block).into();
let block_notification = SubscriptionNotification::NewHeads(Box::new(block_header));
let block_header = Box::new((&new_latest_block).into());
let mut notifications = vec![SubscriptionNotification::NewHeads(block_header)];

let starknet = self.api.starknet.lock().await;

let mut tx_status_notifications = vec![];
for tx_hash in new_latest_block.get_transactions() {
let status = starknet
.get_transaction_execution_and_finality_status(*tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;

tx_status_notifications.push(SubscriptionNotification::TransactionStatus(
NewTransactionStatus { transaction_hash: *tx_hash, status },
));
notifications.push(SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash: *tx_hash,
status,
origin_tag: BlockTag::Latest,
}));

// There are no pending txs in this mode, but basically we are pretending that the
// transaction existed for a short period of time in the pending block, thus triggering
// the notification. This is important for users depending on this subscription type to
// find out about all new transactions.
if !self.starknet_config.uses_pending_block() {
let tx = starknet
.get_transaction_by_hash(*tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
notifications.push(SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(Box::new(tx.clone())),
));
notifications.push(SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(TransactionHashWrapper {
hash: *tx_hash,
sender_address: tx.get_sender_address(),
}),
));
}
}

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&block_notification, BlockTag::Latest).await;
for tx_status_notification in tx_status_notifications.iter() {
socket_context.notify_subscribers(tx_status_notification, BlockTag::Latest).await;
for notification in &notifications {
socket_context.notify_subscribers(notification).await;
}
}

Expand Down Expand Up @@ -312,11 +356,10 @@ impl JsonRpcHandler {

// for later comparison and subscription notifications
let old_latest_block = self.get_block(BlockTag::Latest).await;
let old_pending_block = match self.starknet_config.block_generation_on {
BlockGenerationOn::Transaction => None,
BlockGenerationOn::Interval(_) | BlockGenerationOn::Demand => {
Some(self.get_block(BlockTag::Pending).await)
}
let old_pending_block = if self.starknet_config.uses_pending_block() {
Some(self.get_block(BlockTag::Pending).await)
} else {
None
};

// true if origin should be tried after request fails; relevant in forking mode
Expand Down Expand Up @@ -711,8 +754,8 @@ pub enum JsonRpcSubscriptionRequest {
NewHeads(Option<BlockInput>),
#[serde(rename = "starknet_subscribeTransactionStatus")]
TransactionStatus(TransactionBlockInput),
#[serde(rename = "starknet_subscribePendingTransactions")]
PendingTransactions,
#[serde(rename = "starknet_subscribePendingTransactions", with = "optional_params")]
PendingTransactions(Option<PendingTransactionsSubscriptionInput>),
#[serde(rename = "starknet_subscribeEvents")]
Events,
#[serde(rename = "starknet_unsubscribe")]
Expand Down
7 changes: 7 additions & 0 deletions crates/starknet-devnet-server/src/api/json_rpc/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ pub struct TransactionBlockInput {
pub block: Option<BlockId>,
}

#[derive(Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PendingTransactionsSubscriptionInput {
pub transaction_details: Option<bool>,
pub sender_address: Option<Vec<ContractAddress>>,
}

#[cfg(test)]
mod tests {
use starknet_rs_core::types::{BlockId as ImportedBlockId, BlockTag, Felt};
Expand Down
Loading