From cb8faec072c64eeec05dd8e5f69f1dab47683d5f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 3 Mar 2024 15:12:12 +1100 Subject: [PATCH] Use flocking env in a few places --- src/env.rs | 4 ++++ src/lib.rs | 39 ++++++++++++++++++++++--------- src/sys/flock/flock.rs | 21 +++++++++++++++++ src/sys/flock/freebsd.rs | 20 ++++++++++++++++ src/sys/flock/mod.rs | 16 +++++++++---- src/sys/flock/{unix.rs => ofd.rs} | 25 +++++++------------- src/sys/mod.rs | 4 ++-- 7 files changed, 95 insertions(+), 34 deletions(-) create mode 100644 src/sys/flock/flock.rs create mode 100644 src/sys/flock/freebsd.rs rename src/sys/flock/{unix.rs => ofd.rs} (86%) diff --git a/src/env.rs b/src/env.rs index 19fef7d..2f915ae 100644 --- a/src/env.rs +++ b/src/env.rs @@ -16,3 +16,7 @@ pub(crate) fn emulate_freebsd() -> bool { } } } + +pub(crate) fn flocking() -> bool { + emulate_freebsd() +} diff --git a/src/lib.rs b/src/lib.rs index 9ee28f1..20637ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,6 @@ use std::io::{ErrorKind, Read, Seek, Write}; use std::num::TryFromIntError; use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; -use std::process::abort; use std::sync::{Arc, Mutex, MutexGuard, OnceLock}; use std::time::Duration; use std::{fs, io, str}; @@ -19,6 +18,7 @@ use std::{fs, io, str}; use anyhow::{bail, Context, Result}; use cfg_if::cfg_if; use chrono::NaiveDateTime; +use env::flocking; pub use error::*; use exclusive_file::ExclusiveFile; use file_id::{FileId, FileIdFancy}; @@ -346,8 +346,12 @@ impl<'handle> BatchWriter<'handle> { let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap(); for mut ef in self.exclusive_files.drain(..) { ef.committed().unwrap(); - debug!("returning exclusive file {} to handle", ef.id); - assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none()); + // When we're flocking, we can't have writers and readers at the same time and still be + // able to punch values asynchronously. + if flocking() { + debug!("returning exclusive file {} to handle", ef.id); + assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none()); + } } } } @@ -536,9 +540,9 @@ where .0; let mut file_clone = self.file_clone().unwrap().lock().unwrap(); let file = &mut file_clone.file; - let file_offset = file_offset+pos; + let file_offset = file_offset + pos; // Getting lazy: Using positioned-io's ReadAt because it works on Windows. - let res = file.read_at( file_offset, buf); + let res = file.read_at(file_offset, buf); debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read_at"); res } @@ -628,7 +632,7 @@ where } } -#[derive(Ord, PartialOrd, Eq, PartialEq)] +#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)] struct ReadExtent { pub offset: u64, pub len: u64, @@ -782,17 +786,30 @@ impl<'a> Reader<'a> { Ok(file_clone) } + fn lock_read_extents<'b>( + file: &File, + read_extents: impl Iterator, + ) -> io::Result<()> { + // This might require a conditional var if it's used everywhere + #[cfg(not(windows))] + if flocking() { + // Possibly we want to block if we're flocking. + assert!(file.flock(LockShared)?); + } + for extent in read_extents { + assert!(file.lock_segment(LockSharedNonblock, Some(extent.len), extent.offset)?); + } + Ok(()) + } + fn get_file_for_read_by_segment_locking( &self, file_id: &FileId, read_extents: &BTreeSet, ) -> PubResult>> { let mut file = open_file_id(OpenOptions::new().read(true), self.handle.dir(), file_id)?; - for extent in read_extents { - if !file.lock_segment(LockSharedNonblock, Some(extent.len), extent.offset)? { - abort(); - } - } + + Self::lock_read_extents(&file, read_extents.iter())?; let len = file.seek(std::io::SeekFrom::End(0))?; let file_clone = FileClone { file, diff --git a/src/sys/flock/flock.rs b/src/sys/flock/flock.rs new file mode 100644 index 0000000..ad464aa --- /dev/null +++ b/src/sys/flock/flock.rs @@ -0,0 +1,21 @@ +//! Implement file locking for systems that lack open file description segment locking but have flock. + +pub use nix::fcntl::FlockArg; +pub use nix::fcntl::FlockArg::*; + +use super::*; + +pub trait Flock { + fn flock(&self, arg: FlockArg) -> io::Result; +} + +impl Flock for File { + /// Locks a segment that spans the maximum possible range of offsets. + fn flock(&self, arg: FlockArg) -> io::Result { + match nix::fcntl::flock(self.as_raw_fd(), arg) { + Ok(()) => Ok(true), + Err(errno) if errno == nix::Error::EWOULDBLOCK => Ok(false), + Err(errno) => Err(std::io::Error::from_raw_os_error(errno as i32)), + } + } +} diff --git a/src/sys/flock/freebsd.rs b/src/sys/flock/freebsd.rs new file mode 100644 index 0000000..95da715 --- /dev/null +++ b/src/sys/flock/freebsd.rs @@ -0,0 +1,20 @@ +use std::fs::File; +use std::io; + +use nix::fcntl::FlockArg; + +use crate::sys::{FileLocking, Flock}; + +impl FileLocking for File { + fn trim_exclusive_lock_left(&self, _old_left: u64, _new_left: u64) -> io::Result { + Ok(true) + } + + fn lock_segment(&self, arg: FlockArg, _len: Option, _offset: u64) -> io::Result { + self.lock_max_segment(arg) + } + + fn lock_max_segment(&self, arg: FlockArg) -> io::Result { + self.flock(arg) + } +} diff --git a/src/sys/flock/mod.rs b/src/sys/flock/mod.rs index cac0a63..f0e36a8 100644 --- a/src/sys/flock/mod.rs +++ b/src/sys/flock/mod.rs @@ -1,12 +1,20 @@ use super::*; cfg_if! { - if #[cfg(unix)] { - mod unix; - pub use self::unix::*; - } else if #[cfg(windows)] { + if #[cfg(windows)] { mod windows; pub use self::windows::*; + } else if #[cfg(target_os = "freebsd")] { + mod freebsd; + } else { + mod ofd; + } +} + +cfg_if! { + if #[cfg(unix)] { + mod flock; + pub use self::flock::*; } } diff --git a/src/sys/flock/unix.rs b/src/sys/flock/ofd.rs similarity index 86% rename from src/sys/flock/unix.rs rename to src/sys/flock/ofd.rs index 62807bc..a2b6fe3 100644 --- a/src/sys/flock/unix.rs +++ b/src/sys/flock/ofd.rs @@ -1,8 +1,5 @@ use std::io::SeekFrom; -pub use nix::fcntl::FlockArg; -pub use nix::fcntl::FlockArg::*; - use super::*; fn seek_from_offset(seek_from: SeekFrom) -> i64 { @@ -60,19 +57,7 @@ pub(super) fn lock_file_segment( let l_type = l_type.try_into().unwrap(); flock_arg.l_type = l_type; flock_arg.l_whence = seek_from_whence(whence); - cfg_if! { - if #[cfg(target_os = "freebsd")] { - use libc::{F_SETLK as SetLock, F_SETLKW as SetLockWait}; - } else { - use libc::*; - #[allow(non_snake_case)] - let (SetLock, SetLockWait) = if emulate_freebsd() { - (F_SETLK, F_SETLKW) - } else { - (F_OFD_SETLK, F_OFD_SETLKW) - }; - } - } + use libc::{F_OFD_SETLK as SetLock, F_OFD_SETLKW as SetLockWait}; #[allow(deprecated)] let arg = match arg { LockShared | LockExclusive => SetLockWait, @@ -97,11 +82,17 @@ pub(super) fn lock_file_segment( impl FileLocking for File { fn trim_exclusive_lock_left(&self, old_left: u64, new_left: u64) -> io::Result { - #[allow(deprecated)] + if flocking() { + return Ok(true); + } + // #[allow(deprecated)] self.lock_segment(UnlockNonblock, Some(new_left - old_left), old_left) } fn lock_segment(&self, arg: FlockArg, len: Option, offset: u64) -> io::Result { + if flocking() { + return self.flock(arg); + } Ok(lock_file_segment( self, arg, diff --git a/src/sys/mod.rs b/src/sys/mod.rs index ac1c9de..6e13525 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -15,7 +15,7 @@ pub use flock::*; pub(crate) use pathconf::*; pub use punchfile::*; -use crate::env::emulate_freebsd; +use crate::env::flocking; cfg_if! { if #[cfg(windows)] { @@ -88,7 +88,7 @@ impl FileSystemFlags for UnixFilesystemFlags { // AFAIK there's no way to check if a filesystem supports block cloning on non-Windows // platforms, and even then it depends on where you're copying to/from, sometimes even on // the same filesystem. - if emulate_freebsd() { + if flocking() { Some(false) } else { None