From 76a37c9f225c8cd14b301767fd60b7bd2c8eaffe Mon Sep 17 00:00:00 2001
From: zhaishuang <1531939060@qq.com>
Date: Thu, 7 Dec 2023 17:47:07 +0800
Subject: [PATCH] add sort feature and refine test cases in upsert_test.rs

Signed-off-by: zhaishuang <1531939060@qq.com>
---
 .../src/test/upsert_tests.rs               | 498 +++++++++++++++++-
 rust/lakesoul-io/src/lakesoul_io_config.rs |   8 +
 2 files changed, 496 insertions(+), 10 deletions(-)

diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
index 77814d300..5f16f7797 100644
--- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs
+++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
@@ -22,7 +22,11 @@ mod upsert_with_io_config_tests {
     use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter;
 
     use lakesoul_io::arrow::array::Int64Array;
-
+    enum str_or_i32 {
+        v1(&'static str),
+        v2(i32),
+    }
+
     fn init_table(batch: RecordBatch, table_name: &str, pks: Vec<String>) -> LakeSoulIOConfigBuilder {
 
         let builder = LakeSoulIOConfigBuilder::new()
@@ -100,6 +104,35 @@ mod upsert_with_io_config_tests {
         RecordBatch::try_from_iter_with_nullable(iter).unwrap()
     }
 
+    fn create_batch_str_or_i32(names: Vec<&str>, values: Vec<&[str_or_i32]>) -> RecordBatch {
+        let values = values
+            .into_iter()
+            .map(|vec|
+                match vec[0] {
+                    str_or_i32::v1(_) => {
+                        let vec=vec.into_iter().map(|val|
+                            match val {
+                                str_or_i32::v1(v1) => Some(*v1),
+                                str_or_i32::v2(v2) => None,
+                            }
+                        ).map(|val| val.unwrap()).collect::<Vec<_>>();
+                        Arc::new(StringArray::from(vec)) as ArrayRef
+                    }
+                    str_or_i32::v2(_) => {
+                        let vec=vec.into_iter().map(|val|
+                            match val {
+                                str_or_i32::v1(v1) => None,
+                                str_or_i32::v2(v2) => Some(v2),
+                            }
+                        ).map(|val| *val.unwrap()).collect::<Vec<_>>();
+                        Arc::new(Int32Array::from(vec)) as ArrayRef
+                    }
+                }
+            ).collect::<Vec<ArrayRef>>();
+        let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::<Vec<_>>();
+        RecordBatch::try_from_iter_with_nullable(iter).unwrap()
+    }
+
     fn check_upsert_i32_and_timestamp(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, builder: LakeSoulIOConfigBuilder, expected: &[&str]) -> LakeSoulIOConfigBuilder {
         let builder = execute_upsert(batch, table_name, builder.clone());
         let builder = builder
@@ -145,6 +178,32 @@ mod upsert_with_io_config_tests {
         assert_batches_eq!(expected, &[result.unwrap().unwrap()]);
         builder
     }
+
+    fn check_upsert_string_or_i32(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, builder: LakeSoulIOConfigBuilder, expected: &[&str]) -> LakeSoulIOConfigBuilder {
+        let builder = execute_upsert(batch, table_name, builder.clone());
+        let builder = builder
+            .with_schema(SchemaRef::new(Schema::new(
+                selected_cols.iter().map(|col|
+                    if *col=="hash1"{
+                        Field::new(*col, arrow::datatypes::DataType::Int32, true)
+                    }else{
+                        Field::new(*col, arrow::datatypes::DataType::Utf8, true)
+                    }
+                ).collect::<Vec<Field>>()
+            )));
+        let builder = if let Some(filters) = filters {
+            builder.with_filter_str(filters)
+        } else {
+            builder
+        };
+        let config = builder.clone().build();
+
+        let mut reader = SyncSendableMutableLakeSoulReader::new(LakeSoulReader::new(config).unwrap(), Builder::new_current_thread().build().unwrap());
+        let _ = reader.start_blocked();
+        let result = reader.next_rb_blocked();
+        assert_batches_eq!(expected, &[result.unwrap().unwrap()]);
+        builder
+    }
 
     #[test]
     fn test_merge_same_column_i32() {
@@ -837,7 +896,141 @@ mod upsert_with_io_config_tests {
 
     #[test]
     fn test_create_table_with_hash_key_disordered(){
-        // mark
+        let table_name = 
"test_create_table_with_hash_key_disordered"; + + let batch1=create_batch_str_or_i32(vec!["range", "v1", "hash1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v1("a1"), str_or_i32::v1("b1"), str_or_i32::v1("c1")], + &[str_or_i32::v2(1), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("a2"), str_or_i32::v1("b2"), str_or_i32::v1("c2")], &[str_or_i32::v1("a"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let batch2=create_batch_str_or_i32(vec!["range", "hash1", "v1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v2(1), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("a11"), str_or_i32::v1("b11"), str_or_i32::v1("c11")], + &[str_or_i32::v1("a22"), str_or_i32::v1("b22"), str_or_i32::v1("c22")], &[str_or_i32::v1("a"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let batch3=create_batch_str_or_i32(vec!["range", "v1", "hash1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v1("d1"), str_or_i32::v1("b111"), str_or_i32::v1("c111")], + &[str_or_i32::v2(4), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("d2"), str_or_i32::v1("b222"), str_or_i32::v1("c222")], &[str_or_i32::v1("d"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let builder = init_table( + batch1, + table_name, + vec!["range".to_string(), "hash1".to_string(),"hash2".to_string()]); + + let builder = execute_upsert( + batch2, + table_name, + builder); + + let builder = execute_upsert( + batch3, + table_name, + builder); + + check_upsert_string_or_i32( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["range", "hash1", "hash2", "v1", "v2"], + None, + builder.clone(), + &[ + "+-------+-------+-------+------+------+", + "| range | hash1 | hash2 | v1 | v2 |", + "+-------+-------+-------+------+------+", + "| range | 1 | a | a11 | a22 |", + "| range | 2 | b | b111 | b222 |", + "| range | 3 | c | c111 | c222 |", + "| range | 4 | d | d1 | d2 |", + "+-------+-------+-------+------+------+", + ] + ); + + check_upsert_string_or_i32( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["hash1", "v1", "v2"], + None, + builder.clone(), + &[ + "+-------+------+------+", + "| hash1 | v1 | v2 |", + "+-------+------+------+", + "| 1 | a11 | a22 |", + "| 2 | b111 | b222 |", + "| 3 | c111 | c222 |", + "| 4 | d1 | d2 |", + "+-------+------+------+", + ] + ); + + check_upsert_string_or_i32( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["v1", "v2"], + None, + builder.clone(), + &[ + "+------+------+", + "| v1 | v2 |", + "+------+------+", + "| a11 | a22 |", + "| b111 | b222 |", + "| c111 | c222 |", + "| d1 | d2 |", + "+------+------+", + ] 
+        );
+
+        check_upsert_string_or_i32(
+            RecordBatch::new_empty(SchemaRef::new(Schema::new(
+                vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col|
+                    if *col=="hash1"{
+                        Field::new(*col, arrow::datatypes::DataType::Int32, true)
+                    }else{
+                        Field::new(*col, arrow::datatypes::DataType::Utf8, true)
+                    }
+                ).collect::<Vec<Field>>()
+            ))),
+            table_name,
+            vec!["range", "v2"],
+            None,
+            builder.clone(),
+            &[
+                "+-------+------+",
+                "| range | v2   |",
+                "+-------+------+",
+                "| range | a22  |",
+                "| range | b222 |",
+                "| range | c222 |",
+                "| range | d2   |",
+                "+-------+------+",
+            ]
+        );
     }
 
@@ -931,6 +1124,7 @@ mod upsert_with_metadata_tests {
     use lakesoul_io::filter::parser::Parser;
 
     use lakesoul_io::{arrow, datafusion, tokio, serde_json};
+    use lakesoul_io::datafusion::logical_expr::Sort;
     use lakesoul_io::arrow::array::*;
 
     use lakesoul_io::arrow::datatypes::{DataType, i256, GenericBinaryType, Int32Type};
@@ -945,6 +1139,8 @@ mod upsert_with_metadata_tests {
     use datafusion::assert_batches_eq;
     use datafusion::prelude::{DataFrame, SessionContext};
     use datafusion::logical_expr::LogicalPlanBuilder;
+    use datafusion::logical_expr::col;
+    use datafusion::logical_expr::Expr;
     use crate::catalog::lakesoul_sink::LakeSoulSinkProvider;
     use crate::catalog::lakesoul_source::LakeSoulSourceProvider;
     use crate::error::Result;
@@ -960,6 +1156,10 @@ mod upsert_with_metadata_tests {
 
     use crate::catalog::{create_io_config_builder, create_table, commit_data, parse_table_info_partitions};
 
+    enum str_or_i32 {
+        v1(&'static str),
+        v2(i32),
+    }
 
     fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch {
         let values = values
@@ -999,8 +1199,37 @@ mod upsert_with_metadata_tests {
         RecordBatch::try_from_iter_with_nullable(iter).unwrap()
     }
 
+    fn create_batch_str_or_i32(names: Vec<&str>, values: Vec<&[str_or_i32]>) -> RecordBatch {
+        let values = values
+            .into_iter()
+            .map(|vec|
+                match vec[0] {
+                    str_or_i32::v1(_) => {
+                        let vec=vec.into_iter().map(|val|
+                            match val {
+                                str_or_i32::v1(v1) => Some(*v1),
+                                str_or_i32::v2(v2) => None,
+                            }
+                        ).map(|val| val.unwrap()).collect::<Vec<_>>();
+                        Arc::new(StringArray::from(vec)) as ArrayRef
+                    }
+                    str_or_i32::v2(_) => {
+                        let vec=vec.into_iter().map(|val|
+                            match val {
+                                str_or_i32::v1(v1) => None,
+                                str_or_i32::v2(v2) => Some(v2),
+                            }
+                        ).map(|val| *val.unwrap()).collect::<Vec<_>>();
+                        Arc::new(Int32Array::from(vec)) as ArrayRef
+                    }
+                }
+            ).collect::<Vec<ArrayRef>>();
+        let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::<Vec<_>>();
+        RecordBatch::try_from_iter_with_nullable(iter).unwrap()
+    }
+
     async fn execute_upsert(record_batch: RecordBatch, table_name: &str, client: MetaDataClientRef) -> Result<()> {
-        let builder = create_io_config_builder(client, None).await?;
+        let builder = create_io_config_builder(client, Some(table_name)).await?;
         let sess_ctx = create_session_context(&mut builder.clone().build())?;
 
         let provider = LakeSoulSinkProvider::new(table_name).await?;
@@ -1009,8 +1238,14 @@ mod upsert_with_metadata_tests {
         let num_rows = record_batch.num_rows();
         let schema = record_batch.schema();
 
+        let primary_keys=builder.primary_keys_slice();
+        let aux_sort_cols=builder.aux_sort_cols_slice();
+        let sort_expr:Vec<Expr>=primary_keys.into_iter()
+            .chain(aux_sort_cols.iter())
+            .map(|pk| col(pk).sort(true, false)).collect();
+
         let logical_plan = LogicalPlanBuilder::insert_into(
-            sess_ctx.read_batch(record_batch)?.into_unoptimized_plan(),
+            sess_ctx.read_batch(record_batch)?.sort(sort_expr)?.into_unoptimized_plan(),
            table_name.to_string(),
            &schema.deref())?
.build()?; @@ -1343,12 +1578,108 @@ mod upsert_with_metadata_tests { async fn test_basic_upsert_same_columns() -> Result<()>{ // require metadata checker - Ok(()) + let table_name = "test_basic_upsert_same_columns"; + let client = Arc::new(MetaDataClient::from_env().await?); + + init_table( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + SchemaRef::new(Schema::new(["range", "hash", "value"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + vec!["range".to_string(), "hash".to_string()], + client.clone(), + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "hash", "value"].iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::>() + ))), + table_name, + vec!["range", "hash", "value"], + None, + client.clone(), + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 1 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 3 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ).await?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value"], + None, + client.clone(), + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 11 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 33 |", + "| 20201101 | 4 | 44 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ).await } async fn test_basic_upsert_different_columns() -> Result<()>{ // require metadata checker - Ok(()) + let table_name = "test_basic_upsert_same_columns"; + let client = Arc::new(MetaDataClient::from_env().await?); + + let builder = init_table( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + SchemaRef::new(Schema::new(["range", "hash", "value"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + vec!["range".to_string(), "hash".to_string()], + client.clone(), + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "hash", "value"].iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::>() + ))), + table_name, + vec!["range", "hash", "value"], + None, + client.clone(), + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 1 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 3 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ).await?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value"], + None, + client.clone(), + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 11 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 33 |", + "| 20201101 | 4 | 44 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ).await } async fn test_should_failed_to_upsert_external_columns_when_schema_auto_migrate_is_false() -> Result<()>{ @@ -1737,7 +2068,7 @@ mod upsert_with_metadata_tests { ).await?; check_upsert_debug( - create_batch_i32(vec!["range", "age", "hash2", "name", "hash1"], 
vec![&[20201102, 20201102, 20201102], &[2345, 3456, 4567], &[22, 32, 42], &[234, 345, 456], &[2, 3, 4]]), + create_batch_i32(vec!["range", "age", "hash2", "name", "hash1"], vec![&[20201102, 20201102, 20201102], &[4567, 2345, 3456], &[42, 22, 32], &[456, 234, 345], &[4, 2, 3]]), table_name, vec!["range", "hash1", "hash2", "value", "name", "age"], Some("and(and(noteq(range, null), eq(range, 20201102)), noteq(value, null))".to_string()), @@ -1846,7 +2177,7 @@ mod upsert_with_metadata_tests { ).await?; check_upsert( - create_batch_string(vec!["range", "age", "hash2", "name", "hash1"], vec![&["20201102", "20201102", "20201102"], &["2345", "3456", "4567"], &["22", "32", "42"], &["234", "345", "456"], &["2", "3", "4"]]), + create_batch_string(vec!["range", "age", "hash2", "name", "hash1"], vec![&["20201102", "20201102", "20201102"], &["4567", "2345", "3456"], &["42", "22", "32"], &["456", "234", "345"], &["4", "2", "3"]]), table_name, vec!["range", "hash1", "hash2", "value", "name", "age"], Some("and(and(noteq(range, null), eq(range, ________20201102__)), noteq(value, null))".to_string()), @@ -1864,8 +2195,154 @@ mod upsert_with_metadata_tests { } async fn test_create_table_with_hash_key_disordered() -> Result<()>{ - // require metadata checker - Ok(()) + let table_name = "test_create_table_with_hash_key_disordered"; + let client = Arc::new(MetaDataClient::from_env().await?); + + let batch1=create_batch_str_or_i32(vec!["range", "v1", "hash1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v1("a1"), str_or_i32::v1("b1"), str_or_i32::v1("c1")], + &[str_or_i32::v2(1), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("a2"), str_or_i32::v1("b2"), str_or_i32::v1("c2")], &[str_or_i32::v1("a"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let batch2=create_batch_str_or_i32(vec!["range", "hash1", "v1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v2(1), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("a11"), str_or_i32::v1("b11"), str_or_i32::v1("c11")], + &[str_or_i32::v1("a22"), str_or_i32::v1("b22"), str_or_i32::v1("c22")], &[str_or_i32::v1("a"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let batch3=create_batch_str_or_i32(vec!["range", "v1", "hash1", "v2", "hash2"], + vec![&[str_or_i32::v1("range"), str_or_i32::v1("range"), str_or_i32::v1("range")], &[str_or_i32::v1("d1"), str_or_i32::v1("b111"), str_or_i32::v1("c111")], + &[str_or_i32::v2(4), str_or_i32::v2(2), str_or_i32::v2(3)], &[str_or_i32::v1("d2"), str_or_i32::v1("b222"), str_or_i32::v1("c222")], &[str_or_i32::v1("d"), str_or_i32::v1("b"), str_or_i32::v1("c")]] + ); + + let builder = init_table( + batch1, + table_name, + SchemaRef::new(Schema::new(["range", "v1", "hash1", "v2", "hash2"].into_iter().map(|name| + if name=="hash1"{ + Field::new(name, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(name, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + )), + vec!["range".to_string(), "hash1".to_string(),"hash2".to_string()], + client.clone(), + ).await?; + + execute_upsert( + batch2, + table_name, + client.clone(), + ).await?; + + execute_upsert( + batch3, + table_name, + client.clone(), + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, 
arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["range", "hash1", "hash2", "v1", "v2"], + None, + client.clone(), + &[ + "+-------+-------+-------+------+------+", + "| range | hash1 | hash2 | v1 | v2 |", + "+-------+-------+-------+------+------+", + "| range | 1 | a | a11 | a22 |", + "| range | 2 | b | b111 | b222 |", + "| range | 3 | c | c111 | c222 |", + "| range | 4 | d | d1 | d2 |", + "+-------+-------+-------+------+------+", + ] + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["hash1", "v1", "v2"], + None, + client.clone(), + &[ + "+-------+------+------+", + "| hash1 | v1 | v2 |", + "+-------+------+------+", + "| 1 | a11 | a22 |", + "| 2 | b111 | b222 |", + "| 3 | c111 | c222 |", + "| 4 | d1 | d2 |", + "+-------+------+------+", + ] + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["v1", "v2"], + None, + client.clone(), + &[ + "+------+------+", + "| v1 | v2 |", + "+------+------+", + "| a11 | a22 |", + "| b111 | b222 |", + "| c111 | c222 |", + "| d1 | d2 |", + "+------+------+", + ] + ).await?; + + check_upsert( + RecordBatch::new_empty(SchemaRef::new(Schema::new( + vec!["range", "v1", "hash1", "v2", "hash2"].iter().map(|col| + if *col=="hash1"{ + Field::new(*col, arrow::datatypes::DataType::Int32, true) + }else{ + Field::new(*col, arrow::datatypes::DataType::Utf8, true) + } + ).collect::>() + ))), + table_name, + vec!["range", "v2"], + None, + client.clone(), + &[ + "+-------+------+", + "| range | v2 |", + "+-------+------+", + "| range | a22 |", + "| range | b222 |", + "| range | c222 |", + "| range | d2 |", + "+-------+------+", + ] + ).await } async fn test_merge_same_column_with_timestamp_type_i32_time() -> Result<()>{ @@ -1986,6 +2463,7 @@ mod upsert_with_metadata_tests { test_derange_hash_key_and_data_schema_order_string_type_upsert_1_times_i32().await?; test_derange_hash_key_and_data_schema_order_string_type_upsert_2_times_i32().await?; test_derange_hash_key_and_data_schema_order_string_type_upsert_3_times_i32().await?;// + test_create_table_with_hash_key_disordered().await?; test_merge_same_column_with_timestamp_type_i32_time().await?; test_merge_different_columns_with_timestamp_type_i32_time().await?; diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 08f2c2e99..9e1f57a67 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -86,6 +86,10 @@ impl LakeSoulIOConfig { pub fn files_slice(&self) -> &[String] { &self.files } + + pub fn aux_sort_cols_slice(&self) -> &[String] { + &self.aux_sort_cols + } } #[derive(Derivative, Debug)] @@ -197,6 +201,10 @@ impl LakeSoulIOConfigBuilder { pub fn primary_keys_slice(&self) -> &[String] { self.config.primary_keys_slice() } + + pub fn aux_sort_cols_slice(&self) -> &[String] { + self.config.aux_sort_cols_slice() + } } impl From for LakeSoulIOConfigBuilder {
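
Reviewer note (illustrative addition, not part of the patch): the heart of the sort feature above is that the metadata-test execute_upsert now sorts the incoming batch by the table's primary keys and then by any auxiliary sort columns (exposed through the new aux_sort_cols_slice() accessors on LakeSoulIOConfig and LakeSoulIOConfigBuilder) before building the insert plan. A minimal sketch of that expression-building step is shown below; the helper name upsert_sort_exprs is hypothetical, and only the DataFusion col/Expr::sort calls already used in the patch are assumed.

    use datafusion::logical_expr::{col, Expr};

    // Ascending, nulls-last sort expressions over the primary keys followed by the aux sort
    // columns, mirroring what the patched execute_upsert passes to DataFrame::sort before
    // LogicalPlanBuilder::insert_into.
    fn upsert_sort_exprs(primary_keys: &[String], aux_sort_cols: &[String]) -> Vec<Expr> {
        primary_keys
            .iter()
            .chain(aux_sort_cols.iter())
            .map(|name| col(name.as_str()).sort(true, false)) // asc = true, nulls_first = false
            .collect()
    }

    // Usage, as in the patch:
    // sess_ctx.read_batch(batch)?.sort(upsert_sort_exprs(pks, aux))?.into_unoptimized_plan()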