Skip to content

Commit

Permalink
[Rust] Apply clippy and fix typos;
Browse files Browse the repository at this point in the history
  • Loading branch information
mag1c1an1 committed Jan 10, 2024
1 parent 024890a commit 81f5274
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ __pycache__/
/python/lakesoul.egg-info/
/python/*.whl
/wheelhouse/
/rust/.idea
5 changes: 1 addition & 4 deletions rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ impl LakeSoulTable {
}

pub fn hash_bucket_num(&self) -> usize {
match self.properties.hash_bucket_num {
Some(hash_bucket_num) => hash_bucket_num,
None => 1,
}
self.properties.hash_bucket_num.unwrap_or_else(|| 1)
}

pub fn schema(&self) -> SchemaRef {
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod hash_tests;

#[ctor::ctor]
fn init() {
lakesoul_io::tokio::runtime::Builder::new_multi_thread()
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
Expand Down
8 changes: 4 additions & 4 deletions rust/lakesoul-io/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use datafusion::{
use datafusion_common::{DFSchema, Result};

pub fn create_sort_exprs(
colunms: &[String],
columns: &[String],
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Vec<PhysicalSortExpr>> {
colunms
columns
.iter()
.map(|column| {
create_physical_sort_expr(
Expand All @@ -32,13 +32,13 @@ pub fn create_sort_exprs(
}

pub fn create_hash_partitioning(
colunms: &[String],
columns: &[String],
partitioning_num: usize,
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Partitioning> {
let runtime_expr = colunms
let runtime_expr = columns
.iter()
.map(|column| {
create_physical_expr(
Expand Down
11 changes: 5 additions & 6 deletions rust/lakesoul-io/src/lakesoul_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use atomic_refcell::AtomicRefCell;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::physical_plan::SendableRecordBatchStream;
use std::sync::Arc;

use arrow_schema::SchemaRef;
Expand All @@ -15,8 +16,6 @@ pub use datafusion::error::{DataFusionError, Result};

use datafusion::prelude::SessionContext;

use core::pin::Pin;
use datafusion::physical_plan::RecordBatchStream;
use futures::StreamExt;

use tokio::runtime::Runtime;
Expand All @@ -31,7 +30,7 @@ use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
pub struct LakeSoulReader {
sess_ctx: SessionContext,
config: LakeSoulIOConfig,
stream: Option<Pin<Box<dyn RecordBatchStream + Send>>>,
stream: Option<SendableRecordBatchStream>,
pub(crate) schema: Option<SchemaRef>,
}

Expand Down Expand Up @@ -183,7 +182,7 @@ mod tests {

while let Some(rb) = reader.next_rb().await {
let num_rows = &rb.unwrap().num_rows();
row_cnt = row_cnt + num_rows;
row_cnt += num_rows;
}
assert_eq!(row_cnt, 1000);
Ok(())
Expand Down Expand Up @@ -261,7 +260,7 @@ mod tests {
let rb = rb.unwrap();
let num_rows = &rb.num_rows();
unsafe {
ROW_CNT = ROW_CNT + num_rows;
ROW_CNT += num_rows;
println!("{}", ROW_CNT);
}

Expand Down Expand Up @@ -300,7 +299,7 @@ mod tests {
while let Some(rb) = reader.next_rb().await {
let num_rows = &rb.unwrap().num_rows();
unsafe {
ROW_CNT = ROW_CNT + num_rows;
ROW_CNT += num_rows;
println!("{}", ROW_CNT);
}
sleep(Duration::from_millis(20)).await;
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/lakesoul_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl SortAsyncWriter {
Ok(batch) => {
async_writer.write_record_batch(batch).await?;
}
// received abort singal
// received abort signal
Err(_) => return async_writer.abort_and_close().await,
}
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lakesoul-io/src/sorted_merge/sort_key_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ impl Clone for SortKeyArrayRange {
}
}

// Multiple ranges with same sorted primary key from variant source record_batch. These ranges will be merged into ONE row of target record_batch finnally.
// Multiple ranges with same sorted primary key from variant source record_batch.
// These ranges will be merged into ONE row of target record_batch finally.
#[derive(Debug, Clone)]
pub struct SortKeyBatchRanges {
// vector with length=column_num that holds a Vector of SortKeyArrayRange to be merged for each column
Expand Down
8 changes: 4 additions & 4 deletions rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) struct SortedStream {

impl Debug for SortedStream {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "InMemSorterStream")
write!(f, "InMemSortedStream")
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ impl SortedStreamMerger {
if !range.is_finished() {
self.range_combiner.push_range(Reverse(range))
} else {
// we should mark this stream uninitalized
// we should mark this stream uninitialized
// since its polling may return pending
self.initialized[stream_idx] = false;
self.range_finished[stream_idx] = true;
Expand Down Expand Up @@ -334,7 +334,7 @@ mod tests {
#[tokio::test]
async fn test_multi_file_merger() {
let session_config = SessionConfig::default().with_batch_size(32);
let session_ctx = SessionContext::with_config(session_config);
let session_ctx = SessionContext::new_with_config(session_config);
let project_dir = std::env::current_dir().unwrap();
let files: Vec<String> = vec![
project_dir
Expand Down Expand Up @@ -686,7 +686,7 @@ mod tests {
#[tokio::test]
async fn test_sorted_stream_merger_with_sum_and_last() {
let session_config = SessionConfig::default().with_batch_size(2);
let session_ctx = SessionContext::with_config(session_config);
let session_ctx = SessionContext::new_with_config(session_config);
let task_ctx = session_ctx.task_ctx();
let s1b1 = create_batch(
vec!["id", "a", "b", "c"],
Expand Down
3 changes: 2 additions & 1 deletion rust/lakesoul-io/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use datafusion_common::DataFusionError::{ArrowError, External};

use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING};

/// adjust time zone to UTC
pub fn uniform_schema(orig_schema: SchemaRef) -> SchemaRef {
Arc::new(Schema::new(
orig_schema
Expand Down Expand Up @@ -112,7 +113,7 @@ pub fn transform_record_batch(
transform_arrays,
&RecordBatchOptions::new().with_row_count(Some(num_rows)),
)
.map_err(ArrowError)
.map_err(ArrowError)
}

pub fn transform_array(
Expand Down

0 comments on commit 81f5274

Please sign in to comment.