Skip to content

Commit

Permalink
Python bug fix (#128)
Browse files Browse the repository at this point in the history
* RpcClient url bug

* future_into_py simplification
  • Loading branch information
smartgoo authored Nov 29, 2024
1 parent cdbdc43 commit e491d9d
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 56 deletions.
12 changes: 6 additions & 6 deletions rpc/macros/src/wrpc/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,25 @@ impl ToTokens for RpcSubscriptions {
targets.push(quote! {
#[pymethods]
impl RpcClient {
fn #fn_subscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_subscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC subscribe on a closed connection"))
}
}

fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_unsubscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC unsubscribe on a closed connection"))
}
Expand Down
102 changes: 66 additions & 36 deletions rpc/wrpc/bindings/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use kaspa_notify::listener::ListenerId;
use kaspa_notify::notification::Notification;
use kaspa_notify::scope::{Scope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope};
use kaspa_notify::{connection::ChannelType, events::EventType};
use kaspa_python_macros::py_async;
use kaspa_rpc_core::api::rpc::RpcApi;
use kaspa_rpc_core::model::*;
use kaspa_rpc_core::notify::connection::ChannelConnection;
Expand Down Expand Up @@ -141,19 +140,26 @@ impl RpcClient {
url: Option<String>,
encoding: Option<WrpcEncoding>,
network_id: Option<NetworkId>,
) -> Result<RpcClient> {
let client = Arc::new(KaspaRpcClient::new(
encoding.unwrap_or(Encoding::Borsh),
url.as_deref(),
Some(resolver.as_ref().unwrap().clone().into()),
network_id,
None,
)?);
) -> PyResult<RpcClient> {
let encoding = encoding.unwrap_or(Encoding::Borsh);
let url = url
.map(
|url| {
if let Some(network_id) = network_id {
Self::parse_url(&url, encoding, network_id)
} else {
Ok(url.to_string())
}
},
)
.transpose()?;

let client = Arc::new(KaspaRpcClient::new(encoding, url.as_deref(), resolver.clone().map(Into::into), network_id, None)?);

let rpc_client = RpcClient {
inner: Arc::new(Inner {
client,
resolver,
resolver: resolver.map(Into::into),
notification_task: Arc::new(AtomicBool::new(false)),
notification_ctl: DuplexChannel::oneshot(),
callbacks: Arc::new(Default::default()),
Expand Down Expand Up @@ -221,15 +227,15 @@ impl RpcClient {
}

#[pyo3(signature = (block_async_connect=None, strategy=None, url=None, timeout_duration=None, retry_interval=None))]
pub fn connect(
pub fn connect<'py>(
&self,
py: Python,
py: Python<'py>,
block_async_connect: Option<bool>,
strategy: Option<String>,
url: Option<String>,
timeout_duration: Option<u64>,
retry_interval: Option<u64>,
) -> PyResult<Py<PyAny>> {
) -> PyResult<Bound<'py, PyAny>> {
let block_async_connect = block_async_connect.unwrap_or(true);
let strategy = match strategy {
Some(strategy) => ConnectStrategy::from_str(&strategy).map_err(|err| PyException::new_err(format!("{}", err)))?,
Expand All @@ -243,29 +249,29 @@ impl RpcClient {
self.start_notification_task(py)?;

let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let _ = client.connect(Some(options)).await.map_err(|e| PyException::new_err(e.to_string()));
Ok(())
}}
})
}

fn disconnect(&self, py: Python) -> PyResult<Py<PyAny>> {
fn disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.inner.client.disconnect().await?;
client.stop_notification_task().await?;
Ok(())
}}
})
}

fn start(&self, py: Python) -> PyResult<Py<PyAny>> {
fn start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.start_notification_task(py)?;
let inner = self.inner.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner.client.start().await?;
Ok(())
}}
})
}

// fn stop() PY-TODO
Expand Down Expand Up @@ -335,14 +341,20 @@ impl RpcClient {

// fn clear_event_listener PY-TODO
// fn default_port PY-TODO
// fn parse_url PY-TODO

fn remove_all_event_listeners(&self) -> PyResult<()> {
*self.inner.callbacks.lock().unwrap() = Default::default();
Ok(())
}
}

impl RpcClient {
pub fn parse_url(url: &str, encoding: Encoding, network_id: NetworkId) -> PyResult<String> {
let url_ = KaspaRpcClient::parse_url(url.to_string(), encoding, network_id.into())?;
Ok(url_)
}
}

impl RpcClient {
// fn new_with_rpc_client() PY-TODO

Expand Down Expand Up @@ -482,49 +494,67 @@ impl RpcClient {

#[pymethods]
impl RpcClient {
fn subscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn subscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn unsubscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
}

fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn subscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.start_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.start_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn unsubscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.stop_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.stop_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
Expand Down
17 changes: 7 additions & 10 deletions rpc/wrpc/bindings/python/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use kaspa_consensus_core::network::NetworkId;
use kaspa_python_macros::py_async;
use kaspa_wrpc_client::{Resolver as NativeResolver, WrpcEncoding};
use pyo3::{exceptions::PyException, prelude::*};
use std::{str::FromStr, sync::Arc};
Expand Down Expand Up @@ -36,28 +35,26 @@ impl Resolver {
self.resolver.urls().unwrap_or_default().into_iter().map(|url| (*url).clone()).collect::<Vec<_>>()
}

fn get_node(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_node<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let node = resolver.get_node(encoding, network_id).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py))
})
}}
Python::with_gil(|py| Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py)))
})
}

fn get_url(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_url<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let url = resolver.get_url(encoding, network_id).await?;
Ok(url)
}}
})
}

// fn connect() TODO
Expand Down
7 changes: 3 additions & 4 deletions wallet/core/src/bindings/python/tx/generator/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::imports::*;
use crate::tx::generator as native;
use kaspa_consensus_client::Transaction;
use kaspa_consensus_core::hashing::wasm::SighashType;
use kaspa_python_macros::py_async;
use kaspa_wallet_keys::privatekey::PrivateKey;
use kaspa_wrpc_python::client::RpcClient;

Expand Down Expand Up @@ -117,14 +116,14 @@ impl PendingTransaction {
Ok(())
}

fn submit(&self, py: Python, rpc_client: &RpcClient) -> PyResult<Py<PyAny>> {
fn submit<'py>(&self, py: Python<'py>, rpc_client: &RpcClient) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let rpc: Arc<DynRpcApi> = rpc_client.client().clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let txid = inner.try_submit(&rpc).await?;
Ok(txid.to_string())
}}
})
}

#[getter]
Expand Down

0 comments on commit e491d9d

Please sign in to comment.