Skip to content

Commit

Permalink
rust\lakesoul-datafusion\test\upsert_tests.rs add some test cases
Browse files Browse the repository at this point in the history
Signed-off-by: zhaishuang <[email protected]>
  • Loading branch information
zhaishuangszszs committed Nov 10, 2023
1 parent b16c6ff commit 10a7ddd
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 1 deletion.
8 changes: 8 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lakesoul-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ prost = "0.11"
async-trait = "0.1"
futures = "0.3"
uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]}
chrono = { version = "0.4", features = ["unstable-locales"] }
282 changes: 281 additions & 1 deletion rust/lakesoul-datafusion/src/test/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ mod upsert_with_io_config_tests {
use std::env;
use std::path::PathBuf;
use std::time::SystemTime;

use chrono::naive::NaiveDate;

use lakesoul_io::arrow::record_batch::RecordBatch;
use lakesoul_io::arrow::util::pretty::print_batches;
use lakesoul_io::datafusion::assert_batches_eq;
Expand All @@ -20,6 +21,7 @@ mod upsert_with_io_config_tests {
use lakesoul_io::arrow::datatypes::{Schema, SchemaRef, Field};
use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder;
use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter;
use lakesoul_io::arrow::array::Int64Array;


fn init_table(batch: RecordBatch, table_name: &str, pks:Vec<String>) -> LakeSoulIOConfigBuilder {
Expand Down Expand Up @@ -79,6 +81,34 @@ mod upsert_with_io_config_tests {
RecordBatch::try_from_iter_with_nullable(iter).unwrap()
}

/// Builds a `RecordBatch` of nullable `Int64` columns.
///
/// `names[i]` is paired with `values[i]`; each slice is copied into its own
/// `Int64Array`. Every column is registered as nullable.
///
/// # Panics
/// Panics if `RecordBatch::try_from_iter_with_nullable` rejects the columns.
fn create_batch_i64(names: Vec<&str>, values: Vec<&[i64]>) -> RecordBatch {
    let columns = names.into_iter().zip(values).map(|(name, column)| {
        let array: ArrayRef = Arc::new(Int64Array::from(column.to_vec()));
        (name, array, true)
    });
    RecordBatch::try_from_iter_with_nullable(columns).unwrap()
}

/// Upserts `batch` into `table_name`, reads the table back with every selected
/// column typed as nullable `Int64`, and asserts that the first record batch
/// returned by the reader prints as `expected`.
///
/// * `selected_cols` - column names, in order, used to build the read schema.
/// * `filters` - optional filter expression string passed to `with_filter_str`.
/// * `builder` - configuration builder carried over from the previous step.
/// * `expected` - pretty-printed table lines compared by `assert_batches_eq!`.
///
/// Returns the builder with the read schema (and filter, if any) applied.
///
/// # Panics
/// Panics if the reader cannot be created or started, if it yields no batch,
/// or if the batch does not match `expected`.
fn check_upsert_i64(
    batch: RecordBatch,
    table_name: &str,
    selected_cols: Vec<&str>,
    filters: Option<String>,
    builder: LakeSoulIOConfigBuilder,
    expected: &[&str],
) -> LakeSoulIOConfigBuilder {
    // `builder` is owned here, so it can be moved into the upsert directly.
    let builder = execute_upsert(batch, table_name, builder);

    let read_fields = selected_cols
        .iter()
        .map(|col| Field::new(*col, arrow::datatypes::DataType::Int64, true))
        .collect::<Vec<_>>();
    let builder = builder.with_schema(SchemaRef::new(Schema::new(read_fields)));
    let builder = match filters {
        Some(filters) => builder.with_filter_str(filters),
        None => builder,
    };
    // The builder is returned to the caller, so build the config from a copy.
    let config = builder.clone().build();

    let mut reader = SyncSendableMutableLakeSoulReader::new(
        LakeSoulReader::new(config).unwrap(),
        Builder::new_current_thread().build().unwrap(),
    );
    // Fail at the point of the real problem instead of discarding a start error
    // and panicking later on an unrelated unwrap.
    reader
        .start_blocked()
        .expect("failed to start LakeSoul reader");
    let first_batch = reader
        .next_rb_blocked()
        .expect("reader returned no record batch")
        .expect("reader failed to produce a record batch");
    assert_batches_eq!(expected, &[first_batch]);
    builder
}

#[test]
fn test_merge_same_column_i32() {
Expand Down Expand Up @@ -518,6 +548,256 @@ mod upsert_with_io_config_tests {
);
}

/// One upsert into a table keyed by `range`, `hash1`, `hash2`, using a batch
/// that carries only `range`, `hash1`, `hash2`, `value`; the rows with
/// `range = 20201102` are then read back.
///
/// `name` and `age` are not part of the upsert batch and are expected empty.
#[test]
fn test_derange_hash_key_and_data_schema_order_int_type_upsert_1_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_int_type_upsert_1_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+-----+",
            "| range    | hash1 | hash2 | value | name | age |",
            "+----------+-------+-------+-------+------+-----+",
            "| 20201102 | 1     | 12    | 1     |      |     |",
            "| 20201102 | 3     | 32    | 3     |      |     |",
            "| 20201102 | 4     | 42    | 4     |      |     |",
            "+----------+-------+-------+-------+------+-----+",
        ],
    );
}

/// Two upserts into a table keyed by `range`, `hash1`, `hash2`. The second
/// batch lists its columns in a different order (`range`, `hash2`, `name`,
/// `hash1`) than the table schema and omits `value`.
///
/// The rows with `range = 20201102` are read back; cells never written by
/// either upsert are expected empty.
#[test]
fn test_derange_hash_key_and_data_schema_order_int_type_upsert_2_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_int_type_upsert_2_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        builder,
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "hash2", "name", "hash1"],
            vec![&[20201102, 20201102, 20201102], &[12, 22, 32], &[11, 22, 33], &[1, 2, 3]],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+-----+",
            "| range    | hash1 | hash2 | value | name | age |",
            "+----------+-------+-------+-------+------+-----+",
            "| 20201102 | 1     | 12    | 1     | 11   |     |",
            "| 20201102 | 2     | 22    |       | 22   |     |",
            "| 20201102 | 3     | 32    | 3     | 33   |     |",
            "| 20201102 | 4     | 42    | 4     |      |     |",
            "+----------+-------+-------+-------+------+-----+",
        ],
    );
}

/// Three upserts into a table keyed by `range`, `hash1`, `hash2`, each batch
/// carrying a different subset and ordering of the table's columns.
///
/// The read-back keeps rows with `range = 20201102` and a non-null `value`,
/// so the row for `hash1 = 2` (never given a `value`) is expected to be absent.
#[test]
fn test_derange_hash_key_and_data_schema_order_int_type_upsert_3_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_int_type_upsert_3_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        builder,
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash2", "name", "hash1"],
            vec![&[20201102, 20201102, 20201102], &[12, 22, 32], &[11, 22, 33], &[1, 2, 3]],
        ),
        table_name,
        builder,
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "age", "hash2", "name", "hash1"],
            vec![
                &[20201102, 20201102, 20201102],
                &[4567, 2345, 3456],
                &[42, 22, 32],
                &[456, 234, 345],
                &[4, 2, 3],
            ],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(and(noteq(range, null), eq(range, 20201102)), noteq(value, null))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+------+",
            "| range    | hash1 | hash2 | value | name | age  |",
            "+----------+-------+-------+-------+------+------+",
            "| 20201102 | 1     | 12    | 1     | 11   |      |",
            "| 20201102 | 3     | 32    | 3     | 345  | 3456 |",
            "| 20201102 | 4     | 42    | 4     | 456  | 4567 |",
            "+----------+-------+-------+-------+------+------+",
        ],
    );
}

/// One upsert into a table keyed by `range`, `hash1`, `hash2`, using a batch
/// that carries only `range`, `hash1`, `hash2`, `value`; the rows with
/// `range = 20201102` are then read back.
///
/// `name` and `age` are not part of the upsert batch and are expected empty.
///
/// NOTE(review): despite `string_type` in the name, every column here is built
/// with `create_batch_i32`, so no string column is exercised - confirm intent.
#[test]
fn test_derange_hash_key_and_data_schema_order_string_type_upsert_1_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_string_type_upsert_1_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+-----+",
            "| range    | hash1 | hash2 | value | name | age |",
            "+----------+-------+-------+-------+------+-----+",
            "| 20201102 | 1     | 12    | 1     |      |     |",
            "| 20201102 | 3     | 32    | 3     |      |     |",
            "| 20201102 | 4     | 42    | 4     |      |     |",
            "+----------+-------+-------+-------+------+-----+",
        ],
    );
}

/// Two upserts into a table keyed by `range`, `hash1`, `hash2`. The second
/// batch lists its columns in a different order (`range`, `hash2`, `name`,
/// `hash1`) than the table schema and omits `value`.
///
/// The rows with `range = 20201102` are read back; cells never written by
/// either upsert are expected empty.
///
/// NOTE(review): despite `string_type` in the name, every column here is built
/// with `create_batch_i32`, so no string column is exercised - confirm intent.
#[test]
fn test_derange_hash_key_and_data_schema_order_string_type_upsert_2_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_string_type_upsert_2_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        builder,
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "hash2", "name", "hash1"],
            vec![&[20201102, 20201102, 20201102], &[12, 22, 32], &[11, 22, 33], &[1, 2, 3]],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(noteq(range, null), eq(range, 20201102))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+-----+",
            "| range    | hash1 | hash2 | value | name | age |",
            "+----------+-------+-------+-------+------+-----+",
            "| 20201102 | 1     | 12    | 1     | 11   |     |",
            "| 20201102 | 2     | 22    |       | 22   |     |",
            "| 20201102 | 3     | 32    | 3     | 33   |     |",
            "| 20201102 | 4     | 42    | 4     |      |     |",
            "+----------+-------+-------+-------+------+-----+",
        ],
    );
}

/// Three upserts into a table keyed by `range`, `hash1`, `hash2`, each batch
/// carrying a different subset and ordering of the table's columns.
///
/// The read-back keeps rows with `range = 20201102` and a non-null `value`,
/// so the row for `hash1 = 2` (never given a `value`) is expected to be absent.
///
/// NOTE(review): despite `string_type` in the name, every column here is built
/// with `create_batch_i32`, so no string column is exercised - confirm intent.
#[test]
fn test_derange_hash_key_and_data_schema_order_string_type_upsert_3_times_i32() {
    let table_name = "derange_hash_key_and_data_schema_order_string_type_upsert_3_times_i32";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value", "name", "age"],
            vec![&[20201101, 20201101], &[1, 2], &[1, 2], &[1, 2], &[1, 2], &[1, 2]],
        ),
        table_name,
        vec!["range".to_string(), "hash1".to_string(), "hash2".to_string()],
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash1", "hash2", "value"],
            vec![&[20201102, 20201102, 20201102], &[1, 3, 4], &[12, 32, 42], &[1, 3, 4]],
        ),
        table_name,
        builder,
    );
    let builder = execute_upsert(
        create_batch_i32(
            vec!["range", "hash2", "name", "hash1"],
            vec![&[20201102, 20201102, 20201102], &[12, 22, 32], &[11, 22, 33], &[1, 2, 3]],
        ),
        table_name,
        builder,
    );
    check_upsert(
        create_batch_i32(
            vec!["range", "age", "hash2", "name", "hash1"],
            vec![
                &[20201102, 20201102, 20201102],
                &[4567, 2345, 3456],
                &[42, 22, 32],
                &[456, 234, 345],
                &[4, 2, 3],
            ],
        ),
        table_name,
        vec!["range", "hash1", "hash2", "value", "name", "age"],
        Some("and(and(noteq(range, null), eq(range, 20201102)), noteq(value, null))".to_string()),
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+-------+-------+-------+------+------+",
            "| range    | hash1 | hash2 | value | name | age  |",
            "+----------+-------+-------+-------+------+------+",
            "| 20201102 | 1     | 12    | 1     | 11   |      |",
            "| 20201102 | 3     | 32    | 3     | 345  | 3456 |",
            "| 20201102 | 4     | 42    | 4     | 456  | 4567 |",
            "+----------+-------+-------+-------+------+------+",
        ],
    );
}

/// Upserts new `value`s into a table that also holds a `timestamp` column
/// stored as `i64` microsecond timestamps (from `timestamp_micros()`).
///
/// The upsert batch has no `timestamp` column: rows that already existed are
/// expected to keep their timestamp, and the newly inserted key
/// (`range = 20201101`, `hash = 4`) is expected to have an empty one.
#[test]
fn test_merge_same_column_with_timestamp_type_i64_time() {
    // Four date-times sharing the same time of day, spanning years 1000-2018,
    // so that negative (pre-1970) values are covered as well.
    let dt1 = NaiveDate::from_ymd_opt(1000, 6, 14).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();
    let dt2 = NaiveDate::from_ymd_opt(1582, 6, 15).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();
    let dt3 = NaiveDate::from_ymd_opt(1900, 6, 16).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();
    let dt4 = NaiveDate::from_ymd_opt(2018, 6, 17).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();

    let val1 = dt1.timestamp_micros();
    let val2 = dt2.timestamp_micros();
    let val3 = dt3.timestamp_micros();
    let val4 = dt4.timestamp_micros();

    let table_name = "test_merge_same_column_with_timestamp_type_i64_time";
    let builder = init_table(
        create_batch_i64(
            vec!["range", "hash", "value", "timestamp"],
            vec![
                &[20201101, 20201101, 20201101, 20201102],
                &[1, 2, 3, 4],
                &[1, 2, 3, 4],
                &[val1, val2, val3, val4],
            ],
        ),
        table_name,
        vec!["range".to_string(), "hash".to_string()],
    );
    check_upsert_i64(
        create_batch_i64(
            vec!["range", "hash", "value"],
            vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]],
        ),
        table_name,
        vec!["range", "hash", "value", "timestamp"],
        None,
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+------+-------+--------------------+",
            "| range    | hash | value | timestamp          |",
            "+----------+------+-------+--------------------+",
            "| 20201101 | 1    | 11    | -30596023866876544 |",
            "| 20201101 | 2    | 2     | -12229803066876544 |",
            "| 20201101 | 3    | 33    | -2194615866876544  |",
            "| 20201101 | 4    | 44    |                    |",
            "| 20201102 | 4    | 4     | 1529224133123456   |",
            "+----------+------+-------+--------------------+",
        ],
    );
}

/// Upserts a batch that adds two columns (`name`, `timestamp`) the initial
/// table did not have. `timestamp` holds `i64` microsecond timestamps from
/// `timestamp_micros()`.
///
/// The table is initialised from an `i32` batch while the upsert batch and the
/// read schema use `i64`. Rows untouched by the upsert are expected to have
/// empty `name`/`timestamp`, and the newly inserted key (`range = 20201101`,
/// `hash = 4`) is expected to have an empty `value`.
#[test]
fn test_merge_different_columns_with_timestamp_type_i64_time() {
    // Date-times sharing the same time of day; the year-1000 and year-1900
    // values are pre-1970 and therefore negative.
    let dt1 = NaiveDate::from_ymd_opt(1000, 6, 14).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();
    let dt3 = NaiveDate::from_ymd_opt(1900, 6, 16).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();
    let dt4 = NaiveDate::from_ymd_opt(2018, 6, 17).unwrap().and_hms_micro_opt(8, 28, 53, 123456).unwrap();

    let val1 = dt1.timestamp_micros();
    let val3 = dt3.timestamp_micros();
    let val4 = dt4.timestamp_micros();

    // Named after this test; the previous literal ended in `i32_time`, which
    // did not match the test and could clash with an i32 variant's table.
    let table_name = "merge_different_columns_with_timestamp_type_i64_time";
    let builder = init_table(
        create_batch_i32(
            vec!["range", "hash", "value"],
            vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]],
        ),
        table_name,
        vec!["range".to_string(), "hash".to_string()],
    );
    check_upsert_i64(
        create_batch_i64(
            vec!["range", "hash", "name", "timestamp"],
            vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44], &[val1, val3, val4]],
        ),
        table_name,
        vec!["range", "hash", "value", "name", "timestamp"],
        None,
        builder,
        // Lines must match the pretty-printed batch exactly, so every cell is
        // padded to the column width given by the border rows.
        &[
            "+----------+------+-------+------+--------------------+",
            "| range    | hash | value | name | timestamp          |",
            "+----------+------+-------+------+--------------------+",
            "| 20201101 | 1    | 1     | 11   | -30596023866876544 |",
            "| 20201101 | 2    | 2     |      |                    |",
            "| 20201101 | 3    | 3     | 33   | -2194615866876544  |",
            "| 20201101 | 4    |       | 44   | 1529224133123456   |",
            "| 20201102 | 4    | 4     |      |                    |",
            "+----------+------+-------+------+--------------------+",
        ],
    );
}


}

Expand Down

0 comments on commit 10a7ddd

Please sign in to comment.