diff --git a/src/dir.rs b/src/dir.rs
index a4e8252..9909345 100644
--- a/src/dir.rs
+++ b/src/dir.rs
@@ -60,4 +60,9 @@ impl Dir {
     pub fn supports_file_cloning(&self) -> bool {
         self.supports_file_cloning
     }
+
+    /// Walks the underlying files in the possum directory.
+    pub fn walk_dir(&self) -> Result<Vec<walk::Entry>> {
+        crate::walk::walk_dir(self)
+    }
 }
diff --git a/src/file_id.rs b/src/file_id.rs
index 8742702..8e55480 100644
--- a/src/file_id.rs
+++ b/src/file_id.rs
@@ -14,7 +14,7 @@ impl FileIdFancy {
 }
 
 /// Value file identifier
-#[derive(Clone, Eq, PartialEq, Hash)]
+#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
 pub struct FileId(OsString);
 
 impl Deref for FileId {
diff --git a/src/handle.rs b/src/handle.rs
index 8db80f5..9f38e6b 100644
--- a/src/handle.rs
+++ b/src/handle.rs
@@ -23,6 +23,7 @@ pub struct Handle {
     pub(crate) instance_limits: Limits,
     deleted_values: Option<SyncSender<Vec<NonzeroValueLocation>>>,
     _value_puncher: Option<JoinHandle<()>>,
+    pub(crate) value_puncher_done: Arc<Mutex<Receiver<()>>>,
 }
 
 /// 4 bytes stored in the database header https://sqlite.org/fileformat2.html#database_header.
@@ -106,6 +107,8 @@ impl Handle {
         let mut conn = Connection::open(dir.path().join(MANIFEST_DB_FILE_NAME))?;
         Self::init_sqlite_conn(&mut conn)?;
         let (deleted_values, receiver) = std::sync::mpsc::sync_channel(10);
+        let (value_puncher_done_sender, value_puncher_done) = std::sync::mpsc::sync_channel(0);
+        let value_puncher_done = Arc::new(Mutex::new(value_puncher_done));
         let handle = Self {
             conn: Mutex::new(conn),
             exclusive_files: Default::default(),
@@ -113,11 +116,15 @@ impl Handle {
             clones: Default::default(),
             instance_limits: Default::default(),
             deleted_values: Some(deleted_values),
-            _value_puncher: Some(std::thread::spawn(|| -> () {
+            // Don't wait on this, at least in the Drop handler, because it stays alive until it
+            // succeeds in punching everything.
+            _value_puncher: Some(std::thread::spawn(move || -> () {
+                let _value_puncher_done_sender = value_puncher_done_sender;
                 if let Err(err) = Self::value_puncher(dir, receiver) {
                     error!("value puncher thread failed with {err:?}");
                 }
             })),
+            value_puncher_done,
         };
         Ok(handle)
     }
@@ -289,15 +296,43 @@ impl Handle {
                 | OpenFlags::SQLITE_OPEN_NO_MUTEX
                 | OpenFlags::SQLITE_OPEN_URI,
         )?;
-        while let Ok(mut values) = values_receiver.recv() {
-            while let Ok(mut more_values) = values_receiver.try_recv() {
-                values.append(&mut more_values);
+        const RETRY_DURATION: Duration = Duration::from_secs(1);
+        let mut pending_values: Vec<_> = Default::default();
+        let mut values_receiver_opt = Some(values_receiver);
+        while values_receiver_opt.is_some() || !pending_values.is_empty() {
+            match &values_receiver_opt {
+                Some(values_receiver) => {
+                    let timeout = if pending_values.is_empty() {
+                        Duration::MAX
+                    } else {
+                        RETRY_DURATION
+                    };
+                    let recv_result = values_receiver.recv_timeout(timeout);
+                    use std::sync::mpsc::RecvTimeoutError;
+                    match recv_result {
+                        Ok(mut values) => {
+                            pending_values.append(&mut values);
+                            // Drain the channel
+                            while let Ok(more_values) = values_receiver.try_recv() {
+                                pending_values.extend(more_values);
+                            }
+                        }
+                        Err(RecvTimeoutError::Timeout) => {}
+                        Err(RecvTimeoutError::Disconnected) => {
+                            // Don't try receiving again.
+                            values_receiver_opt = None;
+                        }
+                    }
+                }
+                None => {
+                    std::thread::sleep(RETRY_DURATION);
+                }
             }
             let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
             let tx = ReadTransaction {
                 tx: ReadOnlyRusqliteTransaction { conn: tx },
             };
-            Self::punch_values(&dir, &values, &tx)?;
+            pending_values = Self::punch_values(&dir, pending_values, &tx)?;
         }
         Ok(())
     }
@@ -306,16 +341,17 @@ impl Handle {
     /// offsets above the targeted values, ongoing writes should not be affected.
     pub(crate) fn punch_values(
         dir: &Dir,
-        values: &[NonzeroValueLocation],
+        values: Vec<NonzeroValueLocation>,
         transaction: &ReadTransactionOwned,
-    ) -> PubResult<()> {
+    ) -> PubResult<Vec<NonzeroValueLocation>> {
+        let mut failed = Vec::with_capacity(values.len());
         for v in values {
             let NonzeroValueLocation {
                 file_id,
                 file_offset,
                 length,
                 ..
-            } = v;
+            } = &v;
             let value_length = length;
             let msg = format!(
                 "deleting value at {:?} {} {}",
@@ -323,7 +359,7 @@ impl Handle {
             );
             debug!("{}", msg);
             // self.handle.clones.lock().unwrap().remove(&file_id);
-            punch_value(PunchValueOptions {
+            if !punch_value(PunchValueOptions {
                 dir: dir.path(),
                 file_id,
                 offset: *file_offset,
@@ -332,9 +368,12 @@ impl Handle {
                 block_size: dir.block_size(),
                 constraints: Default::default(),
             })
-            .context(msg)?;
+            .context(msg)?
+            {
+                failed.push(v);
+            }
         }
-        Ok(())
+        Ok(failed)
     }
 
     pub(crate) fn send_values_for_delete(&self, values: Vec<NonzeroValueLocation>) {
diff --git a/src/lib.rs b/src/lib.rs
index b2a504f..722fe54 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -383,7 +383,7 @@ pub struct Value {
 }
 
 /// Storage location info for a non-zero-length value.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq)]
 pub struct NonzeroValueLocation {
     pub file_id: FileId,
     pub file_offset: u64,
@@ -552,7 +552,13 @@ where
         let file_offset = file_offset + pos;
         // Getting lazy: Using positioned-io's ReadAt because it works on Windows.
         let res = file.read_at(file_offset, buf);
-        debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read_at");
+        debug!(
+            ?file,
+            ?file_offset,
+            len = buf.len(),
+            ?res,
+            "snapshot value read_at"
+        );
         res
     }
 }
@@ -602,7 +608,13 @@ where
         let file = &mut file_clone.file;
         file.seek(Start(file_offset))?;
         let res = file.read(buf);
-        debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read");
+        debug!(
+            ?file,
+            ?file_offset,
+            len = buf.len(),
+            ?res,
+            "snapshot value read"
+        );
         res.map_err(Into::into)
     }
 }
@@ -915,7 +927,7 @@ struct PunchValueOptions<'a> {
 }
 
 // Can't do this as &mut self for dumb Rust reasons.
-fn punch_value(opts: PunchValueOptions) -> Result<()> {
+fn punch_value(opts: PunchValueOptions) -> Result<bool> {
     let PunchValueOptions {
         dir,
         file_id,
@@ -941,7 +953,7 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
     // Punching values probably requires write permission.
    let mut file = match OpenOptions::new().write(true).open(&file_path) {
        // The file could have already been deleted by a previous punch.
-        Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(()),
+        Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(true),
        Err(err) => return Err(err).context("opening value file"),
        Ok(ok) => ok,
    };
@@ -970,10 +982,10 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
        // because there are no values in this file to clone.
        if offset == 0 && allow_remove {
            remove_file(file_path).context("removing value file")?;
-            return Ok(());
+            return Ok(true);
        } else if allow_truncate {
            file.set_len(offset as u64)?;
-            return Ok(());
+            return Ok(true);
        }
        file_end
    } else if cloning_lock_aware {
@@ -997,14 +1009,14 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
    // full block.
    assert!(length >= -block_size);
    if length <= 0 {
-        return Ok(());
+        return Ok(true);
    }
    assert_eq!(offset % block_size, 0);
    if !file.lock_segment(LockExclusiveNonblock, Some(length as u64), offset as u64)? {
        // TODO: If we can't delete immediately, we should schedule to try again later. Maybe
        // spinning up a thread, or putting in a slow queue.
        warn!(%file_id, %offset, %length, "can't punch, file segment locked");
-        return Ok(());
+        return Ok(false);
    }
    debug!(?file, %offset, %length, "punching");
    punchfile(
@@ -1013,14 +1025,14 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
        length.try_into().unwrap(),
    )
    .with_context(|| format!("length {}", length))?;
-    // fcntl(file.as_raw_fd(), nix::fcntl::F_FULLFSYNC)?;
+    // nix::fcntl::fcntl(file.as_raw_fd(), nix::fcntl::F_FULLFSYNC)?;
    // file.flush()?;
    if check_holes {
        if let Err(err) = check_hole(&mut file, offset as u64, length as u64) {
            warn!("checking hole: {}", err);
        }
    }
-    Ok(())
+    Ok(true)
 }
 
 /// Checks that there's no data allocated in the region provided.
diff --git a/src/sys/flock/mod.rs b/src/sys/flock/mod.rs
index b431ede..8252a97 100644
--- a/src/sys/flock/mod.rs
+++ b/src/sys/flock/mod.rs
@@ -48,7 +48,7 @@ mod tests {
        // This won't work with flock, because the entire file is exclusively locked, not just a
        // different segment.
        if !flocking() {
-            assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0, )?);
+            assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0,)?);
        }
        Ok(())
    }
diff --git a/src/tests.rs b/src/tests.rs
index 42f998f..6d0b821 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -72,25 +72,33 @@ fn test_replace_keys() -> Result<()> {
        handle.read_single(&a).unwrap().unwrap().new_reader(),
        a_value.as_slice(),
    );
-    let entries = handle.walk_dir()?;
+
+    let dir = handle.dir.clone();
+    let values_punched = Arc::clone(&handle.value_puncher_done);
+    drop(handle);
+    // Wait for it to recv, which should be a disconnect when the value_puncher hangs up.
+    values_punched.lock().unwrap().recv();
+
+    let entries = dir.walk_dir()?;
    let values_files: Vec<_> = entries
        .iter()
        .filter(|entry| entry.entry_type == walk::EntryType::ValuesFile)
        .collect();
-    // Make sure there's only a single values file.
-    assert_eq!(values_files.len(), 1);
-    let value_file = values_files[0];
-    let mut file = File::open(&value_file.path)?;
+
    let mut allocated_space = 0;
-    for region in seekhole::Iter::new(&mut file) {
-        let region = region?;
-        if matches!(region.region_type, seekhole::RegionType::Data) {
-            allocated_space += region.length();
+    // There can be multiple value files if the value puncher is holding onto a file when another
+    // write occurs.
+    for value_file in values_files {
+        let mut file = File::open(&value_file.path)?;
+        for region in seekhole::Iter::new(&mut file) {
+            let region = region?;
+            if matches!(region.region_type, seekhole::RegionType::Data) {
+                allocated_space += region.length();
+            }
        }
    }
    assert!(
-        [2, 3]
-            .map(|num_blocks| num_blocks * block_size as seekhole::RegionOffset)
+        [2].map(|num_blocks| num_blocks * block_size as seekhole::RegionOffset)
            .contains(&allocated_space),
        "block_size={}, allocated_space={}",
        block_size,
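
Note (illustrative, not part of the patch): the shutdown-signalling pattern this diff adds to `Handle` is a zero-capacity `sync_channel(0)` whose `SyncSender` is moved into the value-puncher thread and never sent on; when that thread exits, the sender is dropped and a `recv()` on the stored `Receiver` observes the disconnect, which is what the updated test relies on to wait for punching before walking the directory. A minimal standalone sketch of the same pattern, using plain std and hypothetical names:

    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;

    fn main() {
        // Zero-capacity channel: nothing is ever sent; the worker only holds the sender.
        let (done_sender, done_receiver) = mpsc::sync_channel::<()>(0);
        let done_receiver = Arc::new(Mutex::new(done_receiver));

        let worker = thread::spawn(move || {
            let _done_sender = done_sender; // dropped when the thread body returns
            // ... background work, e.g. punching freed value ranges ...
        });

        // Blocks until the worker drops its sender (thread exit); recv() then returns Err(RecvError).
        let _ = done_receiver.lock().unwrap().recv();
        worker.join().unwrap();
    }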