diff --git a/.cargo/config.toml b/.cargo/config.toml index f7f01f9..7f0c0fc 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [build] # Setting cfg here means our IDE and CLI both use the same values. -#rustflags = "--cfg shuttle" +#rustflags = "--cfg loom" diff --git a/Cargo.toml b/Cargo.toml index dc38562..1e3f047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ tracing = { version = "0.1.40", features = ["log"] } twox-hash = { version = "1.6.3", optional = true } once_cell = "1.19.0" ctx-thread = "0.1.1" +shuttle = { version = "0.7.1", optional = true } [target.'cfg(windows)'.dependencies.windows] version = "0.52.0" @@ -66,12 +67,10 @@ test-log = "0.2.14" [target.'cfg(loom)'.dev-dependencies] loom = "0.7" -[target.'cfg(shuttle)'.dependencies] -shuttle = "0.7.1" - [features] default = [] testing = ["dep:fdlimit", "dep:rayon", "dep:twox-hash"] +shuttle = ["dep:shuttle"] [[bench]] name = "possum" diff --git a/src/concurrency/mod.rs b/src/concurrency/mod.rs index 8805463..ee2ac1a 100644 --- a/src/concurrency/mod.rs +++ b/src/concurrency/mod.rs @@ -1,35 +1,54 @@ pub(crate) mod sync; -#[cfg(not(shuttle))] +#[cfg(not(feature = "shuttle"))] pub use std::thread; // This isn't available in loom or shuttle yet. Unfortunately for shuttle it means threads are // spawned outside its control, and it doesn't work. -#[cfg(shuttle)] +#[cfg(feature = "shuttle")] pub use shuttle::thread; -#[cfg(not(shuttle))] +#[cfg(not(feature = "shuttle"))] pub(crate) fn run_blocking(f: F) -> R where - F: FnOnce() -> R+Send, -R: Send, + F: FnOnce() -> R + Send, + R: Send, { - // let (sender, receiver) = std::sync::mpsc::channel(); - // std::thread::scope(|scope|{ - // scope.spawn(f) - // }); - unimplemented!() + if false { + let (sender, receiver) = std::sync::mpsc::channel(); + let tx_thread = std::thread::scope(|scope| { + scope.spawn(|| { + let res = f(); + sender.send(res).unwrap(); + }); + receiver.recv().unwrap() + }); + tx_thread + } else { + f() + } } -#[cfg(shuttle)] +#[cfg(feature = "shuttle")] pub(crate) fn run_blocking(f: F) -> R where - F: FnOnce() -> R+Send, -R: Send, - + F: FnOnce() -> R + Send, + R: Send, { - let (sender, receiver) = shuttle::sync::mpsc::channel(); - std::thread::scope(|scope| { - scope.spawn(f) - }) + use std::sync::mpsc; + let (sender, receiver) = mpsc::channel(); + let tx_thread = std::thread::scope(|scope| { + scope.spawn(||{ + let res = f(); + sender.send(res).unwrap(); + }); + loop { + shuttle::thread::yield_now(); + match receiver.try_recv() { + Err(mpsc::TryRecvError::Empty) => continue, + default => return default.unwrap() + } + } + }); + tx_thread } diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index 4438f8f..ea0535c 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -1,9 +1,9 @@ use crate::StableDeref; use std::ops::{Deref, DerefMut}; -#[cfg(shuttle)] +#[cfg(feature = "shuttle")] use shuttle::sync; -#[cfg(not(shuttle))] +#[cfg(not(feature = "shuttle"))] use std::sync; use sync::Mutex as InnerMutex; diff --git a/src/exclusive_file.rs b/src/exclusive_file.rs index d377fbd..c6436e8 100644 --- a/src/exclusive_file.rs +++ b/src/exclusive_file.rs @@ -131,6 +131,6 @@ impl ExclusiveFile { impl Drop for ExclusiveFile { fn drop(&mut self) { - debug!("dropping exclusive file {}", self.id.deref()); + debug!("dropping exclusive file {}", self.id); } } diff --git a/src/file_id.rs b/src/file_id.rs index 279faa4..aebf60a 100644 --- a/src/file_id.rs +++ b/src/file_id.rs @@ -15,14 +15,6 @@ impl Debug for FileId { } } -impl Deref for FileId { - type Target = FileIdInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl std::str::FromStr for FileId { type Err = std::num::ParseIntError; diff --git a/src/handle.rs b/src/handle.rs index ff993d9..f58b04d 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -224,25 +224,17 @@ impl Handle { &self, behaviour: TransactionBehavior, ) -> rusqlite::Result { - unsafe { - Ok(self - .start_transaction(|conn, handle| { - let conn_ptr = std::ptr::from_mut(conn); - let conn_void_ptr: CanSend<*mut ()> = std::mem::transmute(conn_ptr); - type TxRes<'a> = rusqlite::Result>; - let tx_thread = std::thread::spawn(move||{ - eprintln!("hello from transaction thread"); - let conn: &mut Connection = std::mem::transmute(conn_void_ptr); - let tx_res: TxRes = conn.transaction_with_behavior(behaviour); - CanSend(Box::into_raw(Box::new(tx_res))) - }); - let rtx_raw = tx_thread.join().unwrap(); - eprintln!("joined transaction thread"); - let rtx = Box::from_raw(rtx_raw.0 as *mut TxRes); - Ok(Transaction::new((*rtx)?, handle)) - })? - .into()) - } + Ok(self + .start_transaction(|conn, handle| { + let tx_thread = run_blocking(|| { + eprintln!("hello from transaction thread"); + conn.transaction_with_behavior(behaviour).map(CanSend) + }); + eprintln!("joined transaction thread"); + let rtx = tx_thread?.0; + Ok(Transaction::new(rtx, handle)) + })? + .into()) } /// Starts a deferred transaction (the default). There is no guaranteed read-only transaction @@ -274,15 +266,6 @@ impl Handle { Ok(reader) } - // pub(crate) fn associated_read<'h, H>(handle: H) -> rusqlite::Result> where H: WithHandle { - // let reader = Reader { - // owned_tx: handle.as_ref().start_deferred_transaction()?, - // handle, - // reads: Default::default(), - // }; - // Ok(reader) - // } - pub fn read_single(&self, key: &[u8]) -> Result>> { let mut reader = self.read()?; let Some(value) = reader.add(key)? else { @@ -467,9 +450,9 @@ impl Handle { Ok(()) } - pub fn delete_prefix(&self, prefix: &[u8]) -> PubResult<()> { + pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> { let mut tx = self.start_deferred_transaction()?; - for item in tx.list_items(prefix)? { + for item in tx.list_items(prefix.as_ref())? { tx.delete_key(&item.key)?; } tx.commit()?.complete(); @@ -583,4 +566,4 @@ impl AsRef for Rc> { struct CanSend(T); -unsafe impl Send for CanSend {} \ No newline at end of file +unsafe impl Send for CanSend {} diff --git a/src/testing.rs b/src/testing.rs index 1fa367e..96f71a7 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -1,7 +1,7 @@ pub mod torrent_storage; use std::hash::Hasher; -use std::io::{copy, SeekFrom, Write}; +use std::io::{BufReader, copy, SeekFrom, Write}; use anyhow::{ensure, Result}; use rand::Rng; @@ -90,6 +90,7 @@ pub fn readable_repeated_bytes(byte: u8, limit: usize) -> Vec { pub fn condense_repeated_bytes(r: impl Read) -> (Option, u64) { let mut count = 0; let mut byte = None; + let r = BufReader::new(r); for b in r.bytes() { let b = b.unwrap(); match byte { @@ -118,12 +119,12 @@ pub fn check_concurrency( loom::model(move || f().unwrap()); Ok(()) } - #[cfg(shuttle)] + #[cfg(feature = "shuttle")] { shuttle::check_random(move || f().unwrap(), iterations_hint); Ok(()) } - #[cfg(all(not(loom), not(shuttle)))] + #[cfg(all(not(loom), not(feature = "shuttle")))] if false { for _ in 0..1000 { f()? diff --git a/src/tests.rs b/src/tests.rs index 29c95a2..79d78ed 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -53,7 +53,8 @@ fn test_inc_array() { /// Show that replacing keys doesn't cause a key earlier in the same values file to be punched. This /// occurred because there were file_id values in the manifest file that had the wrong type, and so /// the query that looked for the starting offset for hole punching would punch out the whole file -/// thinking it was empty. +/// thinking it was empty. Note sometimes this test fails and there's extra values files floating +/// around. I haven't figured out why. #[test] #[cfg(not(miri))] fn test_replace_keys() -> Result<()> { @@ -61,6 +62,7 @@ fn test_replace_keys() -> Result<()> { || { let tempdir = test_tempdir("test_replace_keys")?; let handle = Handle::new(tempdir.path.clone())?; + handle.delete_prefix("")?; let a = "a".as_bytes().to_vec(); let b = "b".as_bytes().to_vec(); let block_size: usize = handle.block_size().try_into()?; @@ -92,9 +94,13 @@ fn test_replace_keys() -> Result<()> { // There can be multiple value files if the value puncher is holding onto a file when another // write occurs. for value_file in values_files { - let mut file = File::open(&value_file.path)?; + let path = &value_file.path; + eprintln!("{:?}", path); + let mut file = File::open(path)?; + // file.sync_all()?; for region in seekhole::Iter::new(&mut file) { let region = region?; + eprintln!("{:?}", region); if matches!(region.region_type, seekhole::RegionType::Data) { allocated_space += region.length(); } diff --git a/tests/simple_tests.rs b/tests/simple_tests.rs index 12a3735..0ddbec9 100644 --- a/tests/simple_tests.rs +++ b/tests/simple_tests.rs @@ -228,7 +228,7 @@ fn torrent_storage_small() -> Result<()> { } Ok(()) }, - 100, + 1, ) } @@ -244,7 +244,7 @@ fn torrent_storage_big() -> Result<()> { view_snapshot_values: true, }) }, - 100, + 1, ) } @@ -288,7 +288,7 @@ fn torrent_storage_inner(opts: TorrentStorageOpts) -> Result<()> { let piece_data = Arc::clone(&piece_data); let start_delay = Duration::from_micros(1000 * (index / 2) as u64); let handle = Arc::clone(&handle); - join_handles.push(std::thread::spawn(move || -> Result<()> { + join_handles.push(thread::spawn(move || -> Result<()> { let key = offset_key(offset); sleep(start_delay); debug!("starting block write"); @@ -481,13 +481,14 @@ fn reads_update_last_used() -> Result<()> { let uniform = UniformDuration::new(Duration::from_nanos(0), LAST_USED_RESOLUTION); for _ in 0..100 { let dither = uniform.sample(&mut rng); - sleep(LAST_USED_RESOLUTION + dither); + // This needs to be a real sleep or the timestamps sqlite generates don't progress. + std::thread::sleep(LAST_USED_RESOLUTION + dither); let new_read_ts = handle.read_single(&key)?.unwrap().last_used(); assert!(new_read_ts > read_ts); } Ok(()) }, - 100, + 10, ) }