Skip to content

Commit

Permalink
Get start_transaction working with spawned thread
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jul 21, 2024
1 parent 1df00d7 commit 6baca5f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
26 changes: 26 additions & 0 deletions src/concurrency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,29 @@ pub use std::thread;
// spawned outside its control, and it doesn't work.
#[cfg(shuttle)]
pub use shuttle::thread;

#[cfg(not(shuttle))]
pub(crate) fn run_blocking<F, R>(f: F) -> R
where
F: FnOnce() -> R+Send,
R: Send,
{
// let (sender, receiver) = std::sync::mpsc::channel();
// std::thread::scope(|scope|{
// scope.spawn(f)
// });
unimplemented!()
}

#[cfg(shuttle)]
pub(crate) fn run_blocking<F, R>(f: F) -> R
where
F: FnOnce() -> R+Send,
R: Send,

{
let (sender, receiver) = shuttle::sync::mpsc::channel();
std::thread::scope(|scope| {
scope.spawn(f)
})
}
29 changes: 23 additions & 6 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,25 @@ impl Handle {
&self,
behaviour: TransactionBehavior,
) -> rusqlite::Result<OwnedTx> {
Ok(self
.start_transaction(|conn, handle| {
let rtx = conn.transaction_with_behavior(behaviour)?;
Ok(Transaction::new(rtx, handle))
})?
.into())
unsafe {
Ok(self
.start_transaction(|conn, handle| {
let conn_ptr = std::ptr::from_mut(conn);
let conn_void_ptr: CanSend<*mut ()> = std::mem::transmute(conn_ptr);
type TxRes<'a> = rusqlite::Result<rusqlite::Transaction<'a>>;
let tx_thread = std::thread::spawn(move||{
eprintln!("hello from transaction thread");
let conn: &mut Connection = std::mem::transmute(conn_void_ptr);
let tx_res: TxRes = conn.transaction_with_behavior(behaviour);
CanSend(Box::into_raw(Box::new(tx_res)))
});
let rtx_raw = tx_thread.join().unwrap();
eprintln!("joined transaction thread");
let rtx = Box::from_raw(rtx_raw.0 as *mut TxRes);
Ok(Transaction::new((*rtx)?, handle))
})?
.into())
}
}

/// Starts a deferred transaction (the default). There is no guaranteed read-only transaction
Expand Down Expand Up @@ -567,3 +580,7 @@ impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {
self.deref()
}
}

struct CanSend<T>(T);

unsafe impl<T> Send for CanSend<T> {}

0 comments on commit 6baca5f

Please sign in to comment.