Skip to content

Commit

Permalink
async future_into_py cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smartgoo committed Nov 29, 2024
1 parent 4a96076 commit 4f3ff10
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 45 deletions.
12 changes: 6 additions & 6 deletions rpc/macros/src/wrpc/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,25 @@ impl ToTokens for RpcSubscriptions {
targets.push(quote! {
#[pymethods]
impl RpcClient {
fn #fn_subscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_subscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC subscribe on a closed connection"))
}
}

fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult<Py<PyAny>> {
fn #fn_unsubscribe_snake<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?;
Ok(())
}}
})
} else {
Err(PyErr::new::<PyException, _>("RPC unsubscribe on a closed connection"))
}
Expand Down
64 changes: 38 additions & 26 deletions rpc/wrpc/bindings/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,34 +251,28 @@ impl RpcClient {
self.start_notification_task(py)?;

let client = self.inner.client.clone();
// py_async! {py, async move {
// let _ = client.connect(Some(options)).await.map_err(|e| PyException::new_err(e.to_string()))?;
// Ok(())
// }}

pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.connect(Some(options)).await?;
Ok(())
})
}

fn disconnect(&self, py: Python) -> PyResult<Py<PyAny>> {
fn disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.inner.client.disconnect().await?;
client.stop_notification_task().await?;
Ok(())
}}
})
}

fn start(&self, py: Python) -> PyResult<Py<PyAny>> {
fn start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.start_notification_task(py)?;
let inner = self.inner.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner.client.start().await?;
Ok(())
}}
})
}

// fn stop() PY-TODO
Expand Down Expand Up @@ -500,49 +494,67 @@ impl RpcClient {

#[pymethods]
impl RpcClient {
fn subscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn subscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
fn unsubscribe_utxos_changed<'py>(&self, py: Python<'py>, addresses: Vec<Address>) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
}

fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn subscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.start_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.start_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC subscribe on a closed connection"))
}
}

fn unsubscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult<Py<PyAny>> {
fn unsubscribe_virtual_chain_changed<'py>(
&self,
py: Python<'py>,
include_accepted_transaction_ids: bool,
) -> PyResult<Bound<'py, PyAny>> {
if let Some(listener_id) = self.listener_id() {
let client = self.inner.client.clone();
py_async! {py, async move {
client.stop_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.stop_notify(
listener_id,
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}}
})
} else {
Err(PyException::new_err("RPC unsubscribe on a closed connection"))
}
Expand Down
16 changes: 7 additions & 9 deletions rpc/wrpc/bindings/python/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,26 @@ impl Resolver {
self.resolver.urls().unwrap_or_default().into_iter().map(|url| (*url).clone()).collect::<Vec<_>>()
}

fn get_node(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_node<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let node = resolver.get_node(encoding, network_id).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py))
})
}}
Python::with_gil(|py| Ok(serde_pyobject::to_pyobject(py, &node)?.to_object(py)))
})
}

fn get_url(&self, py: Python, encoding: &str, network_id: &str) -> PyResult<Py<PyAny>> {
fn get_url<'py>(&self, py: Python<'py>, encoding: &str, network_id: &str) -> PyResult<Bound<'py, PyAny>> {
let encoding = WrpcEncoding::from_str(encoding).map_err(|err| PyException::new_err(format!("{}", err)))?;
let network_id = NetworkId::from_str(network_id)?;

let resolver = self.resolver.clone();
py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let url = resolver.get_url(encoding, network_id).await?;
Ok(url)
}}
})
}

// fn connect() TODO
Expand Down
7 changes: 3 additions & 4 deletions wallet/core/src/bindings/python/tx/generator/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,13 @@ impl PendingTransaction {
Ok(())
}

fn submit(&self, py: Python, rpc_client: &RpcClient) -> PyResult<Py<PyAny>> {
fn submit<'py>(&self, py: Python<'py>, rpc_client: &RpcClient) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let rpc: Arc<DynRpcApi> = rpc_client.client().clone();

py_async! {py, async move {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let txid = inner.try_submit(&rpc).await?;
Ok(txid.to_string())
}}
})
}

#[getter]
Expand Down

0 comments on commit 4f3ff10

Please sign in to comment.