From ce2ddc30731880f8c74be77594a6a26555fb3124 Mon Sep 17 00:00:00 2001 From: IchHabeKeineNamen <36535895+Banyc@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:43:57 +0800 Subject: [PATCH] feat: Add buffer size, replication, blocksize as file open options (#135) (#136) --- src/open_options.rs | 71 +++++++++++++++++++++++++++++++++++++++++---- tests/main.rs | 27 ++++++++++++----- 2 files changed, 85 insertions(+), 13 deletions(-) diff --git a/src/open_options.rs b/src/open_options.rs index 8bb0c47..58175ba 100644 --- a/src/open_options.rs +++ b/src/open_options.rs @@ -1,5 +1,5 @@ -use std::ffi::CString; -use std::io::{Error, Result}; +use std::ffi::{c_int, c_short, CString}; +use std::io::{Error, ErrorKind, Result}; use hdfs_sys::*; use log::debug; @@ -54,6 +54,9 @@ pub struct OpenOptions { truncate: bool, create: bool, create_new: bool, + buffer_size: usize, + replication: usize, + blocksize: usize, } /// HDFS's client handle is thread safe. @@ -71,9 +74,42 @@ impl OpenOptions { truncate: false, create: false, create_new: false, + buffer_size: 0, + replication: 0, + blocksize: 0, } } + /// Sets size of buffer for read/write. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self { + self.buffer_size = buffer_size; + self + } + + /// Sets block replication. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_replication(&mut self, replication: usize) -> &mut Self { + self.replication = replication; + self + } + + /// Sets size of block. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_blocksize(&mut self, blocksize: usize) -> &mut Self { + self.blocksize = blocksize; + self + } + /// Sets the option for read access. /// /// This option, when true, will indicate that the file should be @@ -304,7 +340,8 @@ impl OpenOptions { /// * [`AlreadyExists`]: `create_new` was specified and the file already /// exists. /// * [`InvalidInput`]: Invalid combinations of open options (truncate - /// without write access, no access mode set, etc.). + /// without write access, no access mode set, incompatible integer values, + /// etc.). /// /// The following errors don't match any existing [`io::ErrorKind`] at the moment: /// * One of the directory components of the specified file path @@ -337,8 +374,32 @@ impl OpenOptions { debug!("open file {} with flags {}", path, flags); let b = unsafe { let p = CString::new(path)?; - // TODO: we need to support buffer size, replication and block size. - hdfsOpenFile(self.fs, p.as_ptr(), flags, 0, 0, 0) + let buffer_size: c_int = self.buffer_size.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`buffer_size` {} exceeds valid `c_int`", self.buffer_size), + ) + })?; + let replication: c_short = self.replication.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`replication` {} exceeds valid `c_short`", self.replication), + ) + })?; + let blocksize: i32 = self.blocksize.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`blocksize` {} exceeds valid `i32`", self.blocksize), + ) + })?; + hdfsOpenFile( + self.fs, + p.as_ptr(), + flags, + buffer_size, + replication, + blocksize, + ) }; if b.is_null() { diff --git a/tests/main.rs b/tests/main.rs index e4fabb8..e5d0ffd 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -199,7 +199,7 @@ fn test_file() -> Result<()> { Ok(()) } -#[cfg(feature = "futures-io")] +#[cfg(feature = "async_file")] #[tokio::test] async fn test_tokio_file() -> Result<()> { use futures::io::*; @@ -225,7 +225,12 @@ async fn test_tokio_file() -> Result<()> { { // Write file debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; + let mut f = fs + .open_file() + .create(true) + .write(true) + .async_open(&path) + .await?; f.write_all(&content).await?; // Flush file debug!("test file flush"); @@ -235,7 +240,7 @@ async fn test_tokio_file() -> Result<()> { { // Read file debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let mut buf = Vec::new(); let n = f.read_to_end(&mut buf).await?; assert_eq!(n, content.len()); @@ -254,7 +259,7 @@ async fn test_tokio_file() -> Result<()> { { // Seek file. debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let offset = content.len() / 2; let size = content.len() - offset; let mut buf = Vec::new(); @@ -282,9 +287,10 @@ async fn test_tokio_file() -> Result<()> { Ok(()) } -#[cfg(feature = "tokio-io")] +#[cfg(feature = "async_file")] #[tokio::test] async fn test_futures_file() -> Result<()> { + use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::io::*; let _ = env_logger::try_init(); @@ -308,7 +314,12 @@ async fn test_futures_file() -> Result<()> { { // Write file debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; + let mut f = fs + .open_file() + .create(true) + .write(true) + .async_open(&path) + .await?; f.write_all(&content).await?; // Flush file debug!("test file flush"); @@ -318,7 +329,7 @@ async fn test_futures_file() -> Result<()> { { // Read file debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let mut buf = Vec::new(); let n = f.read_to_end(&mut buf).await?; assert_eq!(n, content.len()); @@ -337,7 +348,7 @@ async fn test_futures_file() -> Result<()> { { // Seek file. debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let offset = content.len() / 2; let size = content.len() - offset; let mut buf = Vec::new();