From d5c03cded7f8b6460aeb387ea2c35dd683908e6a Mon Sep 17 00:00:00 2001 From: Banyc <36535895+Banyc@users.noreply.github.com> Date: Wed, 13 Nov 2024 13:52:16 +0800 Subject: [PATCH 1/3] feat: Add buffer size, replication, blocksize as file open options --- src/open_options.rs | 48 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/src/open_options.rs b/src/open_options.rs index 8bb0c47..d4b0fa4 100644 --- a/src/open_options.rs +++ b/src/open_options.rs @@ -1,4 +1,4 @@ -use std::ffi::CString; +use std::ffi::{c_int, c_short, CString}; use std::io::{Error, Result}; use hdfs_sys::*; @@ -54,6 +54,9 @@ pub struct OpenOptions { truncate: bool, create: bool, create_new: bool, + buffer_size: c_int, + replication: c_short, + blocksize: i32, } /// HDFS's client handle is thread safe. @@ -71,9 +74,42 @@ impl OpenOptions { truncate: false, create: false, create_new: false, + buffer_size: 0, + replication: 0, + blocksize: 0, } } + /// Sets size of buffer for read/write. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_buffer_size(&mut self, buffer_size: c_int) -> &mut Self { + self.buffer_size = buffer_size; + self + } + + /// Sets block replication. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_replication(&mut self, replication: c_short) -> &mut Self { + self.replication = replication; + self + } + + /// Sets size of block. + /// + /// Pass `0` if you want to use the default configured values. + /// + /// `0` by default. + pub fn with_blocksize(&mut self, blocksize: i32) -> &mut Self { + self.blocksize = blocksize; + self + } + /// Sets the option for read access. /// /// This option, when true, will indicate that the file should be @@ -337,8 +373,14 @@ impl OpenOptions { debug!("open file {} with flags {}", path, flags); let b = unsafe { let p = CString::new(path)?; - // TODO: we need to support buffer size, replication and block size. - hdfsOpenFile(self.fs, p.as_ptr(), flags, 0, 0, 0) + hdfsOpenFile( + self.fs, + p.as_ptr(), + flags, + self.buffer_size, + self.replication, + self.blocksize, + ) }; if b.is_null() { From 1c9991337391cf2633883997dc3ac9e81169c6f1 Mon Sep 17 00:00:00 2001 From: Banyc <36535895+Banyc@users.noreply.github.com> Date: Wed, 13 Nov 2024 15:10:51 +0800 Subject: [PATCH 2/3] fix: Clean up unnecessary feature gates and fix sync uses in async test cases (#136) --- tests/main.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/tests/main.rs b/tests/main.rs index e4fabb8..e5d0ffd 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -199,7 +199,7 @@ fn test_file() -> Result<()> { Ok(()) } -#[cfg(feature = "futures-io")] +#[cfg(feature = "async_file")] #[tokio::test] async fn test_tokio_file() -> Result<()> { use futures::io::*; @@ -225,7 +225,12 @@ async fn test_tokio_file() -> Result<()> { { // Write file debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; + let mut f = fs + .open_file() + .create(true) + .write(true) + .async_open(&path) + .await?; f.write_all(&content).await?; // Flush file debug!("test file flush"); @@ -235,7 +240,7 @@ async fn test_tokio_file() -> Result<()> { { // Read file debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let mut buf = Vec::new(); let n = f.read_to_end(&mut buf).await?; assert_eq!(n, content.len()); @@ -254,7 +259,7 @@ async fn test_tokio_file() -> Result<()> { { // Seek file. debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let offset = content.len() / 2; let size = content.len() - offset; let mut buf = Vec::new(); @@ -282,9 +287,10 @@ async fn test_tokio_file() -> Result<()> { Ok(()) } -#[cfg(feature = "tokio-io")] +#[cfg(feature = "async_file")] #[tokio::test] async fn test_futures_file() -> Result<()> { + use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::io::*; let _ = env_logger::try_init(); @@ -308,7 +314,12 @@ async fn test_futures_file() -> Result<()> { { // Write file debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; + let mut f = fs + .open_file() + .create(true) + .write(true) + .async_open(&path) + .await?; f.write_all(&content).await?; // Flush file debug!("test file flush"); @@ -318,7 +329,7 @@ async fn test_futures_file() -> Result<()> { { // Read file debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let mut buf = Vec::new(); let n = f.read_to_end(&mut buf).await?; assert_eq!(n, content.len()); @@ -337,7 +348,7 @@ async fn test_futures_file() -> Result<()> { { // Seek file. debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; + let mut f = fs.open_file().read(true).async_open(&path).await?; let offset = content.len() / 2; let size = content.len() - offset; let mut buf = Vec::new(); From 8f4839ba94aeffd8f642720926d5ea3c50a96426 Mon Sep 17 00:00:00 2001 From: Banyc <36535895+Banyc@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:29:33 +0800 Subject: [PATCH 3/3] fix: Expose only Rust primitive integer types --- src/open_options.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/open_options.rs b/src/open_options.rs index d4b0fa4..58175ba 100644 --- a/src/open_options.rs +++ b/src/open_options.rs @@ -1,5 +1,5 @@ use std::ffi::{c_int, c_short, CString}; -use std::io::{Error, Result}; +use std::io::{Error, ErrorKind, Result}; use hdfs_sys::*; use log::debug; @@ -54,9 +54,9 @@ pub struct OpenOptions { truncate: bool, create: bool, create_new: bool, - buffer_size: c_int, - replication: c_short, - blocksize: i32, + buffer_size: usize, + replication: usize, + blocksize: usize, } /// HDFS's client handle is thread safe. @@ -85,7 +85,7 @@ impl OpenOptions { /// Pass `0` if you want to use the default configured values. /// /// `0` by default. - pub fn with_buffer_size(&mut self, buffer_size: c_int) -> &mut Self { + pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self { self.buffer_size = buffer_size; self } @@ -95,7 +95,7 @@ impl OpenOptions { /// Pass `0` if you want to use the default configured values. /// /// `0` by default. - pub fn with_replication(&mut self, replication: c_short) -> &mut Self { + pub fn with_replication(&mut self, replication: usize) -> &mut Self { self.replication = replication; self } @@ -105,7 +105,7 @@ impl OpenOptions { /// Pass `0` if you want to use the default configured values. /// /// `0` by default. - pub fn with_blocksize(&mut self, blocksize: i32) -> &mut Self { + pub fn with_blocksize(&mut self, blocksize: usize) -> &mut Self { self.blocksize = blocksize; self } @@ -340,7 +340,8 @@ impl OpenOptions { /// * [`AlreadyExists`]: `create_new` was specified and the file already /// exists. /// * [`InvalidInput`]: Invalid combinations of open options (truncate - /// without write access, no access mode set, etc.). + /// without write access, no access mode set, incompatible integer values, + /// etc.). /// /// The following errors don't match any existing [`io::ErrorKind`] at the moment: /// * One of the directory components of the specified file path @@ -373,13 +374,31 @@ impl OpenOptions { debug!("open file {} with flags {}", path, flags); let b = unsafe { let p = CString::new(path)?; + let buffer_size: c_int = self.buffer_size.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`buffer_size` {} exceeds valid `c_int`", self.buffer_size), + ) + })?; + let replication: c_short = self.replication.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`replication` {} exceeds valid `c_short`", self.replication), + ) + })?; + let blocksize: i32 = self.blocksize.try_into().map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + format!("`blocksize` {} exceeds valid `i32`", self.blocksize), + ) + })?; hdfsOpenFile( self.fs, p.as_ptr(), flags, - self.buffer_size, - self.replication, - self.blocksize, + buffer_size, + replication, + blocksize, ) };