From dde1a8160aa5d74ac97199411658a7bcb6ed2c57 Mon Sep 17 00:00:00 2001
From: mag1c1an1
Date: Tue, 9 Jan 2024 16:54:54 +0800
Subject: [PATCH] [Rust] Apply clippy and fix typos

Signed-off-by: mag1c1an1
---
 .gitignore                                          |  1 +
 rust/lakesoul-datafusion/src/lakesoul_table/mod.rs  |  5 +----
 rust/lakesoul-datafusion/src/test/mod.rs            |  2 +-
 rust/lakesoul-io/src/helpers.rs                     |  8 ++++----
 rust/lakesoul-io/src/lakesoul_reader.rs             | 11 +++++------
 rust/lakesoul-io/src/lakesoul_writer.rs             |  2 +-
 rust/lakesoul-io/src/sorted_merge/sort_key_range.rs |  3 ++-
 .../src/sorted_merge/sorted_stream_merger.rs        |  8 ++++----
 rust/lakesoul-io/src/transform.rs                   |  3 ++-
 9 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/.gitignore b/.gitignore
index 51b330fc9..28b13467b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,4 @@ __pycache__/
 /python/lakesoul.egg-info/
 /python/*.whl
 /wheelhouse/
+/rust/.idea
diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
index 9d511863d..4d7e33417 100644
--- a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
+++ b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
@@ -132,10 +132,7 @@ impl LakeSoulTable {
     }
 
     pub fn hash_bucket_num(&self) -> usize {
-        match self.properties.hash_bucket_num {
-            Some(hash_bucket_num) => hash_bucket_num,
-            None => 1,
-        }
+        self.properties.hash_bucket_num.unwrap_or(1)
     }
 
     pub fn schema(&self) -> SchemaRef {
diff --git a/rust/lakesoul-datafusion/src/test/mod.rs b/rust/lakesoul-datafusion/src/test/mod.rs
index 11b8847d4..738329082 100644
--- a/rust/lakesoul-datafusion/src/test/mod.rs
+++ b/rust/lakesoul-datafusion/src/test/mod.rs
@@ -14,7 +14,7 @@ mod hash_tests;
 
 #[ctor::ctor]
 fn init() {
-    lakesoul_io::tokio::runtime::Builder::new_multi_thread()
+    tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()
         .unwrap()
diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs
index 7a89c85ca..eaaf47c54 100644
--- a/rust/lakesoul-io/src/helpers.rs
+++ b/rust/lakesoul-io/src/helpers.rs
@@ -13,12 +13,12 @@ use datafusion::{
 use datafusion_common::{DFSchema, Result};
 
 pub fn create_sort_exprs(
-    colunms: &[String],
+    columns: &[String],
     input_dfschema: &DFSchema,
     input_schema: &Schema,
     session_state: &SessionState,
 ) -> Result<Vec<PhysicalSortExpr>> {
-    colunms
+    columns
         .iter()
         .map(|column| {
             create_physical_sort_expr(
@@ -32,13 +32,13 @@ pub fn create_sort_exprs(
 }
 
 pub fn create_hash_partitioning(
-    colunms: &[String],
+    columns: &[String],
     partitioning_num: usize,
     input_dfschema: &DFSchema,
     input_schema: &Schema,
     session_state: &SessionState,
 ) -> Result<Partitioning> {
-    let runtime_expr = colunms
+    let runtime_expr = columns
         .iter()
         .map(|column| {
             create_physical_expr(
diff --git a/rust/lakesoul-io/src/lakesoul_reader.rs b/rust/lakesoul-io/src/lakesoul_reader.rs
index 3402d8681..8bc0ad7bd 100644
--- a/rust/lakesoul-io/src/lakesoul_reader.rs
+++ b/rust/lakesoul-io/src/lakesoul_reader.rs
@@ -4,6 +4,7 @@
 
 use atomic_refcell::AtomicRefCell;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::physical_plan::SendableRecordBatchStream;
 use std::sync::Arc;
 
 use arrow_schema::SchemaRef;
@@ -15,8 +16,6 @@ pub use datafusion::error::{DataFusionError, Result};
 
 use datafusion::prelude::SessionContext;
 
-use core::pin::Pin;
-use datafusion::physical_plan::RecordBatchStream;
 use futures::StreamExt;
 
 use tokio::runtime::Runtime;
@@ -31,7 +30,7 @@ use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
 
 pub struct LakeSoulReader {
     sess_ctx: SessionContext,
     config: LakeSoulIOConfig,
-    stream: Option<Pin<Box<dyn RecordBatchStream + Send>>>,
+    stream: Option<SendableRecordBatchStream>,
     pub(crate) schema: Option<SchemaRef>,
 }
@@ -183,7 +182,7 @@ mod tests {
 
         while let Some(rb) = reader.next_rb().await {
             let num_rows = &rb.unwrap().num_rows();
-            row_cnt = row_cnt + num_rows;
+            row_cnt += num_rows;
         }
         assert_eq!(row_cnt, 1000);
         Ok(())
@@ -261,7 +260,7 @@ mod tests {
             let rb = rb.unwrap();
             let num_rows = &rb.num_rows();
             unsafe {
-                ROW_CNT = ROW_CNT + num_rows;
+                ROW_CNT += num_rows;
                 println!("{}", ROW_CNT);
             }
 
@@ -300,7 +299,7 @@ mod tests {
         while let Some(rb) = reader.next_rb().await {
             let num_rows = &rb.unwrap().num_rows();
             unsafe {
-                ROW_CNT = ROW_CNT + num_rows;
+                ROW_CNT += num_rows;
                 println!("{}", ROW_CNT);
             }
             sleep(Duration::from_millis(20)).await;
diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs
index c6c0a48de..cda8cc1d5 100644
--- a/rust/lakesoul-io/src/lakesoul_writer.rs
+++ b/rust/lakesoul-io/src/lakesoul_writer.rs
@@ -349,7 +349,7 @@ impl SortAsyncWriter {
                     Ok(batch) => {
                         async_writer.write_record_batch(batch).await?;
                     }
-                    // received abort singal
+                    // received abort signal
                     Err(_) => return async_writer.abort_and_close().await,
                 }
             }
diff --git a/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs b/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs
index a4abe745c..ed80ef243 100644
--- a/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs
+++ b/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs
@@ -187,7 +187,8 @@ impl Clone for SortKeyArrayRange {
     }
 }
 
-// Multiple ranges with same sorted primary key from variant source record_batch. These ranges will be merged into ONE row of target record_batch finnally.
+// Multiple ranges with the same sorted primary key from various source record_batches.
+// These ranges will be merged into ONE row of the target record_batch finally.
 #[derive(Debug, Clone)]
 pub struct SortKeyBatchRanges {
     // vector with length=column_num that holds a Vector of SortKeyArrayRange to be merged for each column
diff --git a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs
index dbfa623bf..3feeda615 100644
--- a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs
+++ b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs
@@ -28,7 +28,7 @@ pub(crate) struct SortedStream {
 
 impl Debug for SortedStream {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "InMemSorterStream")
+        write!(f, "InMemSortedStream")
     }
 }
 
@@ -271,7 +271,7 @@ impl SortedStreamMerger {
             if !range.is_finished() {
                 self.range_combiner.push_range(Reverse(range))
             } else {
-                // we should mark this stream uninitalized
+                // we should mark this stream uninitialized
                 // since its polling may return pending
                 self.initialized[stream_idx] = false;
                 self.range_finished[stream_idx] = true;
@@ -334,7 +334,7 @@ mod tests {
     #[tokio::test]
     async fn test_multi_file_merger() {
         let session_config = SessionConfig::default().with_batch_size(32);
-        let session_ctx = SessionContext::with_config(session_config);
+        let session_ctx = SessionContext::new_with_config(session_config);
         let project_dir = std::env::current_dir().unwrap();
         let files: Vec<String> = vec![
             project_dir
@@ -686,7 +686,7 @@ mod tests {
     #[tokio::test]
     async fn test_sorted_stream_merger_with_sum_and_last() {
         let session_config = SessionConfig::default().with_batch_size(2);
-        let session_ctx = SessionContext::with_config(session_config);
+        let session_ctx = SessionContext::new_with_config(session_config);
         let task_ctx = session_ctx.task_ctx();
         let s1b1 = create_batch(
             vec!["id", "a", "b", "c"],
diff --git a/rust/lakesoul-io/src/transform.rs b/rust/lakesoul-io/src/transform.rs
index ee030d9f8..20e8709e9 100644
--- a/rust/lakesoul-io/src/transform.rs
+++ b/rust/lakesoul-io/src/transform.rs
@@ -17,6 +17,7 @@ use datafusion_common::DataFusionError::{ArrowError, External};
 
 use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING};
 
+/// Adjust the time zone to UTC.
 pub fn uniform_schema(orig_schema: SchemaRef) -> SchemaRef {
     Arc::new(Schema::new(
         orig_schema
@@ -112,7 +113,7 @@ pub fn transform_record_batch(
         transform_arrays,
         &RecordBatchOptions::new().with_row_count(Some(num_rows)),
     )
-        .map_err(ArrowError)
+    .map_err(ArrowError)
 }
 
 pub fn transform_array(
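
Note for reviewers: the standalone sketch below (not part of the patch) condenses the two clippy idioms this change applies — `Option::unwrap_or` replacing a manual `match` (and the needlessly lazy `unwrap_or_else(|| 1)`), and compound assignment replacing `x = x + y`. `Properties`, its field values, and the `main` driver are hypothetical stand-ins for LakeSoulTable's properties, used only for illustration.

// Illustrative sketch only; `Properties` is a hypothetical stand-in for
// LakeSoulTable's properties struct, not actual LakeSoul code.
struct Properties {
    hash_bucket_num: Option<usize>,
}

fn hash_bucket_num(props: &Properties) -> usize {
    // clippy::manual_unwrap_or: a match that returns the inner value or a
    // default collapses to `unwrap_or`. Prefer `unwrap_or(1)` over
    // `unwrap_or_else(|| 1)`: the closure buys nothing for a constant
    // (clippy::unnecessary_lazy_evaluations).
    props.hash_bucket_num.unwrap_or(1)
}

fn main() {
    assert_eq!(hash_bucket_num(&Properties { hash_bucket_num: None }), 1);
    assert_eq!(hash_bucket_num(&Properties { hash_bucket_num: Some(4) }), 4);

    // clippy::assign_op_pattern: `row_cnt = row_cnt + n` becomes `row_cnt += n`,
    // mirroring the test-loop fixes in lakesoul_reader.rs.
    let mut row_cnt = 0usize;
    for n in [32usize, 32, 16] {
        row_cnt += n;
    }
    assert_eq!(row_cnt, 80);
}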