From e491d9d0b55d25ab2ea6f64249d86d5e45c76195 Mon Sep 17 00:00:00 2001 From: smartgoo Date: Fri, 29 Nov 2024 12:43:20 -0500 Subject: [PATCH] Python bug fix (#128) * RpcClient url bug * future_into_py simplification --- rpc/macros/src/wrpc/python.rs | 12 +-- rpc/wrpc/bindings/python/src/client.rs | 102 +++++++++++------- rpc/wrpc/bindings/python/src/resolver.rs | 17 ++- .../bindings/python/tx/generator/pending.rs | 7 +- 4 files changed, 82 insertions(+), 56 deletions(-) diff --git a/rpc/macros/src/wrpc/python.rs b/rpc/macros/src/wrpc/python.rs index 630fa3a95..ccfbf95b9 100644 --- a/rpc/macros/src/wrpc/python.rs +++ b/rpc/macros/src/wrpc/python.rs @@ -155,25 +155,25 @@ impl ToTokens for RpcSubscriptions { targets.push(quote! { #[pymethods] impl RpcClient { - fn #fn_subscribe_snake(&self, py: Python) -> PyResult> { + fn #fn_subscribe_snake<'py>(&self, py: Python<'py>) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?; Ok(()) - }} + }) } else { Err(PyErr::new::("RPC subscribe on a closed connection")) } } - fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult> { + fn #fn_unsubscribe_snake<'py>(&self, py: Python<'py>) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?; Ok(()) - }} + }) } else { Err(PyErr::new::("RPC unsubscribe on a closed connection")) } diff --git a/rpc/wrpc/bindings/python/src/client.rs b/rpc/wrpc/bindings/python/src/client.rs index b905305d4..6d2cf6d6a 100644 --- a/rpc/wrpc/bindings/python/src/client.rs +++ b/rpc/wrpc/bindings/python/src/client.rs @@ -6,7 +6,6 @@ use kaspa_notify::listener::ListenerId; use kaspa_notify::notification::Notification; use kaspa_notify::scope::{Scope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope}; use kaspa_notify::{connection::ChannelType, events::EventType}; -use kaspa_python_macros::py_async; use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::model::*; use kaspa_rpc_core::notify::connection::ChannelConnection; @@ -141,19 +140,26 @@ impl RpcClient { url: Option, encoding: Option, network_id: Option, - ) -> Result { - let client = Arc::new(KaspaRpcClient::new( - encoding.unwrap_or(Encoding::Borsh), - url.as_deref(), - Some(resolver.as_ref().unwrap().clone().into()), - network_id, - None, - )?); + ) -> PyResult { + let encoding = encoding.unwrap_or(Encoding::Borsh); + let url = url + .map( + |url| { + if let Some(network_id) = network_id { + Self::parse_url(&url, encoding, network_id) + } else { + Ok(url.to_string()) + } + }, + ) + .transpose()?; + + let client = Arc::new(KaspaRpcClient::new(encoding, url.as_deref(), resolver.clone().map(Into::into), network_id, None)?); let rpc_client = RpcClient { inner: Arc::new(Inner { client, - resolver, + resolver: resolver.map(Into::into), notification_task: Arc::new(AtomicBool::new(false)), notification_ctl: DuplexChannel::oneshot(), callbacks: Arc::new(Default::default()), @@ -221,15 +227,15 @@ impl RpcClient { } #[pyo3(signature = (block_async_connect=None, strategy=None, url=None, timeout_duration=None, retry_interval=None))] - pub fn connect( + pub fn connect<'py>( &self, - py: Python, + py: Python<'py>, block_async_connect: Option, strategy: Option, url: Option, timeout_duration: Option, retry_interval: Option, - ) -> PyResult> { + ) -> PyResult> { let block_async_connect = block_async_connect.unwrap_or(true); let strategy = match strategy { Some(strategy) => ConnectStrategy::from_str(&strategy).map_err(|err| PyException::new_err(format!("{}", err)))?, @@ -243,29 +249,29 @@ impl RpcClient { self.start_notification_task(py)?; let client = self.inner.client.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let _ = client.connect(Some(options)).await.map_err(|e| PyException::new_err(e.to_string())); Ok(()) - }} + }) } - fn disconnect(&self, py: Python) -> PyResult> { + fn disconnect<'py>(&self, py: Python<'py>) -> PyResult> { let client = self.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { client.inner.client.disconnect().await?; client.stop_notification_task().await?; Ok(()) - }} + }) } - fn start(&self, py: Python) -> PyResult> { + fn start<'py>(&self, py: Python<'py>) -> PyResult> { self.start_notification_task(py)?; let inner = self.inner.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { inner.client.start().await?; Ok(()) - }} + }) } // fn stop() PY-TODO @@ -335,7 +341,6 @@ impl RpcClient { // fn clear_event_listener PY-TODO // fn default_port PY-TODO - // fn parse_url PY-TODO fn remove_all_event_listeners(&self) -> PyResult<()> { *self.inner.callbacks.lock().unwrap() = Default::default(); @@ -343,6 +348,13 @@ impl RpcClient { } } +impl RpcClient { + pub fn parse_url(url: &str, encoding: Encoding, network_id: NetworkId) -> PyResult { + let url_ = KaspaRpcClient::parse_url(url.to_string(), encoding, network_id.into())?; + Ok(url_) + } +} + impl RpcClient { // fn new_with_rpc_client() PY-TODO @@ -482,49 +494,67 @@ impl RpcClient { #[pymethods] impl RpcClient { - fn subscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { + fn subscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec
) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; Ok(()) - }} + }) } else { Err(PyException::new_err("RPC subscribe on a closed connection")) } } - fn unsubscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { + fn unsubscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec
) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; Ok(()) - }} + }) } else { Err(PyException::new_err("RPC unsubscribe on a closed connection")) } } - fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult> { + fn subscribe_virtual_chain_changed<'py>( + &self, + py: Python<'py>, + include_accepted_transaction_ids: bool, + ) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { - client.start_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?; + pyo3_async_runtimes::tokio::future_into_py(py, async move { + client + .start_notify( + listener_id, + Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }), + ) + .await?; Ok(()) - }} + }) } else { Err(PyException::new_err("RPC subscribe on a closed connection")) } } - fn unsubscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult> { + fn unsubscribe_virtual_chain_changed<'py>( + &self, + py: Python<'py>, + include_accepted_transaction_ids: bool, + ) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); - py_async! {py, async move { - client.stop_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?; + pyo3_async_runtimes::tokio::future_into_py(py, async move { + client + .stop_notify( + listener_id, + Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }), + ) + .await?; Ok(()) - }} + }) } else { Err(PyException::new_err("RPC unsubscribe on a closed connection")) } diff --git a/rpc/wrpc/bindings/python/src/resolver.rs b/rpc/wrpc/bindings/python/src/resolver.rs index 71b4a007b..8e8215642 100644 --- a/rpc/wrpc/bindings/python/src/resolver.rs +++ b/rpc/wrpc/bindings/python/src/resolver.rs @@ -1,5 +1,4 @@ use kaspa_consensus_core::network::NetworkId; -use kaspa_python_macros::py_async; use kaspa_wrpc_client::{Resolver as NativeResolver, WrpcEncoding}; use pyo3::{exceptions::PyException, prelude::*}; use std::{str::FromStr, sync::Arc}; @@ -36,28 +35,26 @@ impl Resolver { self.resolver.urls().unwrap_or_default().into_iter().map(|url| (*url).clone()).collect::>() } - fn get_node(&self, py: Python, encoding: &str, network_id: &str) -> PyResult> { + fn get_node<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult> { let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?; let network_id = NetworkId::from_str(network_id)?; let resolver = self.resolver.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let node = resolver.get_node(encoding, network_id).await?; - Python::with_gil(|py| { - Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py)) - }) - }} + Python::with_gil(|py| Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py))) + }) } - fn get_url(&self, py: Python, encoding: &str, network_id: &str) -> PyResult> { + fn get_url<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult> { let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?; let network_id = NetworkId::from_str(network_id)?; let resolver = self.resolver.clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let url = resolver.get_url(encoding, network_id).await?; Ok(url) - }} + }) } // fn connect() TODO diff --git a/wallet/core/src/bindings/python/tx/generator/pending.rs b/wallet/core/src/bindings/python/tx/generator/pending.rs index f1842c3db..7f57f416c 100644 --- a/wallet/core/src/bindings/python/tx/generator/pending.rs +++ b/wallet/core/src/bindings/python/tx/generator/pending.rs @@ -2,7 +2,6 @@ use crate::imports::*; use crate::tx::generator as native; use kaspa_consensus_client::Transaction; use kaspa_consensus_core::hashing::wasm::SighashType; -use kaspa_python_macros::py_async; use kaspa_wallet_keys::privatekey::PrivateKey; use kaspa_wrpc_python::client::RpcClient; @@ -117,14 +116,14 @@ impl PendingTransaction { Ok(()) } - fn submit(&self, py: Python, rpc_client: &RpcClient) -> PyResult> { + fn submit<'py>(&self, py: Python<'py>, rpc_client: &RpcClient) -> PyResult> { let inner = self.inner.clone(); let rpc: Arc = rpc_client.client().clone(); - py_async! {py, async move { + pyo3_async_runtimes::tokio::future_into_py(py, async move { let txid = inner.try_submit(&rpc).await?; Ok(txid.to_string()) - }} + }) } #[getter]