Skip to content

Commit

Permalink
Finish getting tests to work with shuttle
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jul 21, 2024
1 parent 6baca5f commit 05b4358
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[build]
# Setting cfg here means our IDE and CLI both use the same values.
#rustflags = "--cfg shuttle"
#rustflags = "--cfg loom"
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tracing = { version = "0.1.40", features = ["log"] }
twox-hash = { version = "1.6.3", optional = true }
once_cell = "1.19.0"
ctx-thread = "0.1.1"
shuttle = { version = "0.7.1", optional = true }

[target.'cfg(windows)'.dependencies.windows]
version = "0.52.0"
Expand All @@ -66,12 +67,10 @@ test-log = "0.2.14"
[target.'cfg(loom)'.dev-dependencies]
loom = "0.7"

[target.'cfg(shuttle)'.dependencies]
shuttle = "0.7.1"

[features]
default = []
testing = ["dep:fdlimit", "dep:rayon", "dep:twox-hash"]
shuttle = ["dep:shuttle"]

[[bench]]
name = "possum"
Expand Down
55 changes: 37 additions & 18 deletions src/concurrency/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,54 @@
pub(crate) mod sync;

#[cfg(not(shuttle))]
#[cfg(not(feature = "shuttle"))]
pub use std::thread;

// This isn't available in loom or shuttle yet. Unfortunately for shuttle it means threads are
// spawned outside its control, and it doesn't work.
#[cfg(shuttle)]
#[cfg(feature = "shuttle")]
pub use shuttle::thread;

#[cfg(not(shuttle))]
#[cfg(not(feature = "shuttle"))]
pub(crate) fn run_blocking<F, R>(f: F) -> R
where
F: FnOnce() -> R+Send,
R: Send,
F: FnOnce() -> R + Send,
R: Send,
{
// let (sender, receiver) = std::sync::mpsc::channel();
// std::thread::scope(|scope|{
// scope.spawn(f)
// });
unimplemented!()
if false {
let (sender, receiver) = std::sync::mpsc::channel();
let tx_thread = std::thread::scope(|scope| {
scope.spawn(|| {
let res = f();
sender.send(res).unwrap();
});
receiver.recv().unwrap()
});
tx_thread
} else {
f()
}
}

#[cfg(shuttle)]
#[cfg(feature = "shuttle")]
pub(crate) fn run_blocking<F, R>(f: F) -> R
where
F: FnOnce() -> R+Send,
R: Send,

F: FnOnce() -> R + Send,
R: Send,
{
let (sender, receiver) = shuttle::sync::mpsc::channel();
std::thread::scope(|scope| {
scope.spawn(f)
})
use std::sync::mpsc;
let (sender, receiver) = mpsc::channel();
let tx_thread = std::thread::scope(|scope| {
scope.spawn(||{
let res = f();
sender.send(res).unwrap();
});
loop {
shuttle::thread::yield_now();
match receiver.try_recv() {
Err(mpsc::TryRecvError::Empty) => continue,
default => return default.unwrap()
}
}
});
tx_thread
}
4 changes: 2 additions & 2 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::StableDeref;
use std::ops::{Deref, DerefMut};

#[cfg(shuttle)]
#[cfg(feature = "shuttle")]
use shuttle::sync;
#[cfg(not(shuttle))]
#[cfg(not(feature = "shuttle"))]
use std::sync;

use sync::Mutex as InnerMutex;
Expand Down
2 changes: 1 addition & 1 deletion src/exclusive_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,6 @@ impl ExclusiveFile {

impl Drop for ExclusiveFile {
fn drop(&mut self) {
debug!("dropping exclusive file {}", self.id.deref());
debug!("dropping exclusive file {}", self.id);
}
}
8 changes: 0 additions & 8 deletions src/file_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ impl Debug for FileId {
}
}

impl Deref for FileId {
type Target = FileIdInner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::str::FromStr for FileId {
type Err = std::num::ParseIntError;

Expand Down
45 changes: 14 additions & 31 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,25 +224,17 @@ impl Handle {
&self,
behaviour: TransactionBehavior,
) -> rusqlite::Result<OwnedTx> {
unsafe {
Ok(self
.start_transaction(|conn, handle| {
let conn_ptr = std::ptr::from_mut(conn);
let conn_void_ptr: CanSend<*mut ()> = std::mem::transmute(conn_ptr);
type TxRes<'a> = rusqlite::Result<rusqlite::Transaction<'a>>;
let tx_thread = std::thread::spawn(move||{
eprintln!("hello from transaction thread");
let conn: &mut Connection = std::mem::transmute(conn_void_ptr);
let tx_res: TxRes = conn.transaction_with_behavior(behaviour);
CanSend(Box::into_raw(Box::new(tx_res)))
});
let rtx_raw = tx_thread.join().unwrap();
eprintln!("joined transaction thread");
let rtx = Box::from_raw(rtx_raw.0 as *mut TxRes);
Ok(Transaction::new((*rtx)?, handle))
})?
.into())
}
Ok(self
.start_transaction(|conn, handle| {
let tx_thread = run_blocking(|| {
eprintln!("hello from transaction thread");
conn.transaction_with_behavior(behaviour).map(CanSend)
});
eprintln!("joined transaction thread");
let rtx = tx_thread?.0;
Ok(Transaction::new(rtx, handle))
})?
.into())
}

/// Starts a deferred transaction (the default). There is no guaranteed read-only transaction
Expand Down Expand Up @@ -274,15 +266,6 @@ impl Handle {
Ok(reader)
}

// pub(crate) fn associated_read<'h, H>(handle: H) -> rusqlite::Result<Reader<'h, H>> where H: WithHandle {
// let reader = Reader {
// owned_tx: handle.as_ref().start_deferred_transaction()?,
// handle,
// reads: Default::default(),
// };
// Ok(reader)
// }

pub fn read_single(&self, key: &[u8]) -> Result<Option<SnapshotValue<Value>>> {
let mut reader = self.read()?;
let Some(value) = reader.add(key)? else {
Expand Down Expand Up @@ -467,9 +450,9 @@ impl Handle {
Ok(())
}

pub fn delete_prefix(&self, prefix: &[u8]) -> PubResult<()> {
pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> {
let mut tx = self.start_deferred_transaction()?;
for item in tx.list_items(prefix)? {
for item in tx.list_items(prefix.as_ref())? {
tx.delete_key(&item.key)?;
}
tx.commit()?.complete();
Expand Down Expand Up @@ -583,4 +566,4 @@ impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {

struct CanSend<T>(T);

unsafe impl<T> Send for CanSend<T> {}
unsafe impl<T> Send for CanSend<T> {}
7 changes: 4 additions & 3 deletions src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod torrent_storage;

use std::hash::Hasher;
use std::io::{copy, SeekFrom, Write};
use std::io::{BufReader, copy, SeekFrom, Write};

use anyhow::{ensure, Result};
use rand::Rng;
Expand Down Expand Up @@ -90,6 +90,7 @@ pub fn readable_repeated_bytes(byte: u8, limit: usize) -> Vec<u8> {
pub fn condense_repeated_bytes(r: impl Read) -> (Option<u8>, u64) {
let mut count = 0;
let mut byte = None;
let r = BufReader::new(r);
for b in r.bytes() {
let b = b.unwrap();
match byte {
Expand Down Expand Up @@ -118,12 +119,12 @@ pub fn check_concurrency(
loom::model(move || f().unwrap());
Ok(())
}
#[cfg(shuttle)]
#[cfg(feature = "shuttle")]
{
shuttle::check_random(move || f().unwrap(), iterations_hint);
Ok(())
}
#[cfg(all(not(loom), not(shuttle)))]
#[cfg(all(not(loom), not(feature = "shuttle")))]
if false {
for _ in 0..1000 {
f()?
Expand Down
10 changes: 8 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ fn test_inc_array() {
/// Show that replacing keys doesn't cause a key earlier in the same values file to be punched. This
/// occurred because there were file_id values in the manifest file that had the wrong type, and so
/// the query that looked for the starting offset for hole punching would punch out the whole file
/// thinking it was empty.
/// thinking it was empty. Note sometimes this test fails and there's extra values files floating
/// around. I haven't figured out why.
#[test]
#[cfg(not(miri))]
fn test_replace_keys() -> Result<()> {
check_concurrency(
|| {
let tempdir = test_tempdir("test_replace_keys")?;
let handle = Handle::new(tempdir.path.clone())?;
handle.delete_prefix("")?;
let a = "a".as_bytes().to_vec();
let b = "b".as_bytes().to_vec();
let block_size: usize = handle.block_size().try_into()?;
Expand Down Expand Up @@ -92,9 +94,13 @@ fn test_replace_keys() -> Result<()> {
// There can be multiple value files if the value puncher is holding onto a file when another
// write occurs.
for value_file in values_files {
let mut file = File::open(&value_file.path)?;
let path = &value_file.path;
eprintln!("{:?}", path);
let mut file = File::open(path)?;
// file.sync_all()?;
for region in seekhole::Iter::new(&mut file) {
let region = region?;
eprintln!("{:?}", region);
if matches!(region.region_type, seekhole::RegionType::Data) {
allocated_space += region.length();
}
Expand Down
11 changes: 6 additions & 5 deletions tests/simple_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ fn torrent_storage_small() -> Result<()> {
}
Ok(())
},
100,
1,
)
}

Expand All @@ -244,7 +244,7 @@ fn torrent_storage_big() -> Result<()> {
view_snapshot_values: true,
})
},
100,
1,
)
}

Expand Down Expand Up @@ -288,7 +288,7 @@ fn torrent_storage_inner(opts: TorrentStorageOpts) -> Result<()> {
let piece_data = Arc::clone(&piece_data);
let start_delay = Duration::from_micros(1000 * (index / 2) as u64);
let handle = Arc::clone(&handle);
join_handles.push(std::thread::spawn(move || -> Result<()> {
join_handles.push(thread::spawn(move || -> Result<()> {
let key = offset_key(offset);
sleep(start_delay);
debug!("starting block write");
Expand Down Expand Up @@ -481,13 +481,14 @@ fn reads_update_last_used() -> Result<()> {
let uniform = UniformDuration::new(Duration::from_nanos(0), LAST_USED_RESOLUTION);
for _ in 0..100 {
let dither = uniform.sample(&mut rng);
sleep(LAST_USED_RESOLUTION + dither);
// This needs to be a real sleep or the timestamps sqlite generates don't progress.
std::thread::sleep(LAST_USED_RESOLUTION + dither);
let new_read_ts = handle.read_single(&key)?.unwrap().last_used();
assert!(new_read_ts > read_ts);
}
Ok(())
},
100,
10,
)
}

Expand Down

0 comments on commit 05b4358

Please sign in to comment.