diff --git a/src/exclusive_file.rs b/src/exclusive_file.rs
index 57bce0d..01d090c 100644
--- a/src/exclusive_file.rs
+++ b/src/exclusive_file.rs
@@ -6,11 +6,19 @@ use std::path::{Path, PathBuf};
 use super::*;
 use crate::FileId;
 
+#[derive(Debug)]
+enum LockLevel {
+    Shared,
+    Exclusive,
+}
+use LockLevel::*;
+
 #[derive(Debug)]
 pub(crate) struct ExclusiveFile {
     pub(crate) inner: File,
     pub(crate) id: FileId,
     last_committed_offset: u64,
+    lock_level: LockLevel,
 }
 
 impl ExclusiveFile {
@@ -68,6 +76,7 @@ impl ExclusiveFile {
             inner: file,
             id,
             last_committed_offset: end,
+            lock_level: Exclusive,
         }))
     }
 
@@ -90,6 +99,22 @@ impl ExclusiveFile {
     pub(crate) fn next_write_offset(&mut self) -> io::Result<u64> {
         self.inner.stream_position()
     }
+
+    pub(crate) fn downgrade_lock(&mut self) -> io::Result<bool> {
+        assert!(flocking());
+        assert!(matches!(self.lock_level, Exclusive));
+        cfg_if! {
+            if #[cfg(unix)] {
+                if !self.inner.flock(LockSharedNonblock)? {
+                    return Ok(false);
+                }
+                self.lock_level = Shared;
+                Ok(true)
+            } else {
+                unimplemented!()
+            }
+        }
+    }
 }
 
 impl Drop for ExclusiveFile {
diff --git a/src/lib.rs b/src/lib.rs
index 20637ae..b2a504f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -321,6 +321,11 @@ impl<'handle> BatchWriter<'handle> {
     }
 
     fn commit_inner(mut self, before_write: impl Fn()) -> Result<WriteCommitResult> {
+        if flocking() {
+            for ef in &mut self.exclusive_files {
+                assert!(ef.downgrade_lock()?);
+            }
+        }
         let mut transaction: OwnedTx = self.handle.start_immediate_transaction()?;
         let mut write_commit_res = WriteCommitResult { count: 0 };
         for pw in self.pending_writes.drain(..) {
@@ -343,28 +348,32 @@
 
     /// Flush Writer's exclusive files and return them to the Handle pool.
     fn flush_exclusive_files(&mut self) {
-        let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
-        for mut ef in self.exclusive_files.drain(..) {
+        for ef in &mut self.exclusive_files {
             ef.committed().unwrap();
-            // When we're flocking, we can't have writers and readers at the same time and still be
-            // able to punch values asynchronously.
-            if flocking() {
-                debug!("returning exclusive file {} to handle", ef.id);
-                assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
-            }
         }
+        self.return_exclusive_files_to_handle()
     }
-}
 
-impl Drop for BatchWriter<'_> {
-    fn drop(&mut self) {
+    fn return_exclusive_files_to_handle(&mut self) {
+        // When we're flocking, we can't have writers and readers at the same time and still be
+        // able to punch values asynchronously.
+        if flocking() {
+            return;
+        }
         let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
         for ef in self.exclusive_files.drain(..) {
+            debug!("returning exclusive file {} to handle", ef.id);
             assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
         }
     }
 }
 
+impl Drop for BatchWriter<'_> {
+    fn drop(&mut self) {
+        self.return_exclusive_files_to_handle()
+    }
+}
+
 type ValueLength = u64;
 
 #[derive(Debug, Clone, PartialEq)]
@@ -543,7 +552,7 @@ where
         let file_offset = file_offset + pos;
         // Getting lazy: Using positioned-io's ReadAt because it works on Windows.
         let res = file.read_at(file_offset, buf);
-        debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read_at");
+        debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read_at");
         res
     }
 }
@@ -593,7 +602,7 @@ where
         let file = &mut file_clone.file;
         file.seek(Start(file_offset))?;
         let res = file.read(buf);
-        debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read");
+        debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read");
         res.map_err(Into::into)
     }
 }
diff --git a/src/sys/flock/mod.rs b/src/sys/flock/mod.rs
index f0e36a8..b431ede 100644
--- a/src/sys/flock/mod.rs
+++ b/src/sys/flock/mod.rs
@@ -45,7 +45,11 @@ mod tests {
         // Trying to exclusively lock from another file handle fails immediately.
         assert!(!file_reopen.lock_segment(LockExclusiveNonblock, None, 2)?);
         let file_reader = OpenOptions::new().read(true).open(file1_named.path())?;
-        assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0,)?);
+        // This won't work with flock, because the entire file is exclusively locked, not just a
+        // different segment.
+        if !flocking() {
+            assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0,)?);
+        }
         Ok(())
     }
 }
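
A note on the downgrade path above: flock(2) has no atomic lock conversion. Downgrading exclusive to shared releases the existing lock and then acquires the new one, so a competing locker can be granted the lock in between, and a nonblocking request can fail with EWOULDBLOCK. That is why `downgrade_lock` reports the outcome as `io::Result<bool>` and `commit_inner` asserts on it. For reference, a minimal standalone sketch of the same pattern, assuming Unix and the `libc` crate (`downgrade_to_shared` is an illustrative name, not part of this crate):

```rust
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;

/// Try to convert an exclusive flock on `file` to a shared one without
/// blocking. Returns Ok(false) if the shared lock could not be granted
/// immediately (the conversion drops the old lock first, so another
/// process may win the race).
fn downgrade_to_shared(file: &File) -> io::Result<bool> {
    // LOCK_SH | LOCK_NB: ask for a shared lock, erroring with EWOULDBLOCK
    // instead of sleeping if the kernel cannot grant it right away.
    if unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_SH | libc::LOCK_NB) } == 0 {
        return Ok(true);
    }
    let err = io::Error::last_os_error();
    if err.raw_os_error() == Some(libc::EWOULDBLOCK) {
        Ok(false)
    } else {
        Err(err)
    }
}
```

The `commit_inner` hunk leans on the nonblocking variant for the same reason: if the downgrade cannot be granted immediately, the `assert!` fails loudly rather than leaving the file at the wrong lock level.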