Skip to content

Commit

Permalink
future_into_py simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
smartgoo committed Nov 29, 2024
1 parent 586c906 commit 93ff8e2
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 46 deletions.
12 changes: 6 additions & 6 deletions rpc/macros/src/wrpc/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,25 @@ impl ToTokens for RpcSubscriptions {
targets.push(quote! {
#[pymethods]
impl RpcClient {
fn #fn_subscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_subscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC subscribe on a closed connection"))
}
}

fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_unsubscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC unsubscribe on a closed connection"))
}
Expand Down
69 changes: 43 additions & 26 deletions rpc/wrpc/bindings/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use kaspa_notify::listener::ListenerId;
use kaspa_notify::notification::Notification;
use kaspa_notify::scope::{Scope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope};
use kaspa_notify::{connection::ChannelType, events::EventType};
use kaspa_python_macros::py_async;
use kaspa_rpc_core::api::rpc::RpcApi;
use kaspa_rpc_core::model::*;
use kaspa_rpc_core::notify::connection::ChannelConnection;
Expand Down Expand Up @@ -228,15 +227,15 @@ impl RpcClient {
}

#[pyo3(signature = (block_async_connect=None, strategy=None, url=None, timeout_duration=None, retry_interval=None))]
pub fn connect(
pub fn connect<'py>(
&self,
py: Python,
py: Python<'py>,
block_async_connect: Option<bool>,
strategy: Option<String>,
url: Option<String>,
timeout_duration: Option<u64>,
retry_interval: Option<u64>,
) -> PyResult<Py<PyAny>> {
) -> PyResult<Bound<'py, PyAny>> {
let block_async_connect = block_async_connect.unwrap_or(true);
let strategy = match strategy {
Some(strategy) => ConnectStrategy::from_str(&strategy).map_err(|err| PyException::new_err(format!("{}", err)))?,
Expand All @@ -250,29 +249,29 @@ impl RpcClient {
self.start_notification_task(py)?;

let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let _ = client.connect(Some(options)).await.map_err(|e| PyException::new_err(e.to_string()));
Ok(())
}}
})
}

fn disconnect(&self, py: Python) -> PyResult<Py<PyAny>> {
fn disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.inner.client.disconnect().await?;
client.stop_notification_task().await?;
Ok(())
}}
})
}

fn start(&self, py: Python) -> PyResult<Py<PyAny>> {
fn start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.start_notification_task(py)?;
let inner = self.inner.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner.client.start().await?;
Ok(())
}}
})
}

// fn stop() PY-TODO
Expand Down Expand Up @@ -495,49 +494,67 @@ impl RpcClient {

#[pymethods]
impl RpcClient {
fn subscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn subscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn unsubscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
}

fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn subscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.start_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.start_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn unsubscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.stop_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.stop_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
Expand Down
17 changes: 7 additions & 10 deletions rpc/wrpc/bindings/python/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use kaspa_consensus_core::network::NetworkId;
use kaspa_python_macros::py_async;
use kaspa_wrpc_client::{Resolver as NativeResolver, WrpcEncoding};
use pyo3::{exceptions::PyException, prelude::*};
use std::{str::FromStr, sync::Arc};
Expand Down Expand Up @@ -36,28 +35,26 @@ impl Resolver {
self.resolver.urls().unwrap_or_default().into_iter().map(|url| (*url).clone()).collect::<Vec<_>>()
}

fn get_node(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_node<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let node = resolver.get_node(encoding, network_id).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py))
})
}}
Python::with_gil(|py| Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py)))
})
}

fn get_url(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_url<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let url = resolver.get_url(encoding, network_id).await?;
Ok(url)
}}
})
}

// fn connect() TODO
Expand Down
7 changes: 3 additions & 4 deletions wallet/core/src/bindings/python/tx/generator/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::imports::*;
use crate::tx::generator as native;
use kaspa_consensus_client::Transaction;
use kaspa_consensus_core::hashing::wasm::SighashType;
use kaspa_python_macros::py_async;
use kaspa_wallet_keys::privatekey::PrivateKey;
use kaspa_wrpc_python::client::RpcClient;

Expand Down Expand Up @@ -117,14 +116,14 @@ impl PendingTransaction {
Ok(())
}

fn submit(&self, py: Python, rpc_client: &RpcClient) -> PyResult<Py<PyAny>> {
fn submit<'py>(&self, py: Python<'py>, rpc_client: &RpcClient) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let rpc: Arc<DynRpcApi> = rpc_client.client().clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let txid = inner.try_submit(&rpc).await?;
Ok(txid.to_string())
}}
})
}

#[getter]
Expand Down

0 comments on commit 93ff8e2

Please sign in to comment.