Skip to content

Commit

Permalink
create MetaDataClient && upsert_with_metadata_tests
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Oct 30, 2023
1 parent f82eb3c commit a013d81
Show file tree
Hide file tree
Showing 12 changed files with 968 additions and 185 deletions.
316 changes: 176 additions & 140 deletions rust/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rust/lakesoul-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ lakesoul-io = { path = "../lakesoul-io" }
lakesoul-metadata = { path = "../lakesoul-metadata" }
proto = { path = "../proto" }
prost = "0.11"
async-trait = "0.1"
futures = "0.3"
uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]}
202 changes: 201 additions & 1 deletion rust/lakesoul-datafusion/src/test/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,4 +519,204 @@ mod upsert_with_io_config_tests {
}


}
}

mod upsert_with_metadata_tests {
use std::sync::Arc;
use std::env;
use std::path::PathBuf;
use std::time::SystemTime;


use lakesoul_io::datasource::parquet_source::EmptySchemaProvider;
use lakesoul_io::serde_json::json;
use lakesoul_io::{arrow, datafusion, tokio, serde_json};

use arrow::util::pretty::print_batches;
use arrow::datatypes::{Schema, SchemaRef, Field};
use arrow::record_batch::RecordBatch;
use arrow::array::{ArrayRef, Int32Array};

use datafusion::assert_batches_eq;
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::error::Result;

use proto::proto::entity::{TableInfo, DataCommitInfo, FileOp, DataFileOp, CommitOp, Uuid};
use tokio::runtime::{Builder, Runtime};

use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, LakeSoulIOConfig};
use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader};
use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter;

use lakesoul_metadata::{Client, PreparedStatementMap, MetaDataClient};


/// Records the files listed in `config` as one append commit for `table_name`
/// in the `default` namespace.
///
/// Each path in `config.files_slice()` becomes an `Add` file operation. The
/// commit is stamped with the current time in seconds since the Unix epoch and
/// a freshly generated v4 UUID. Errors returned by the metadata client while
/// committing are wrapped in `DataFusionError::IoError`.
fn commit_data(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()> {
    let table_name_id = client.get_table_name_id_by_table_name(table_name, "default")?;

    // One `Add` operation per file referenced by the IO config.
    let file_ops = config
        .files_slice()
        .iter()
        .map(|path| DataFileOp {
            file_op: FileOp::Add as i32,
            path: path.clone(),
            ..Default::default()
        })
        .collect();

    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs() as i64;

    // The proto `Uuid` stores the 128-bit id as two 64-bit halves.
    let (high, low) = uuid::Uuid::new_v4().as_u64_pair();

    let commit_info = DataCommitInfo {
        table_id: table_name_id.table_id,
        partition_desc: "-5".to_string(),
        file_ops,
        commit_op: CommitOp::AppendCommit as i32,
        timestamp,
        commit_id: Some(Uuid { high, low }),
        ..Default::default()
    };

    client
        .commit_data_commit_info(commit_info)
        .map_err(lakesoul_io::lakesoul_reader::DataFusionError::IoError)
}

fn create_table(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()> {
match client.create_table(
TableInfo {
table_id: format!("table_{}", uuid::Uuid::new_v4().to_string()),
table_name: table_name.to_string(),
table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::<PathBuf>().to_str().unwrap().to_string(),
table_schema: serde_json::to_string(&config.schema()).unwrap(),
table_namespace: "default".to_string(),
properties: "{}".to_string(),
partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::<Vec<_>>().join(",").as_str(),
domain: "public".to_string(),
})
{
Ok(()) => Ok(()),
Err(e) => Err(lakesoul_io::lakesoul_reader::DataFusionError::IoError(e))
}
}

/// Builds an IO config builder for `table_name` (namespace `default`) from the
/// metadata store.
///
/// The builder is seeded with the table's data files, its schema (parsed from
/// the stored JSON string) and, as primary keys, the second component returned
/// by `parse_table_info_partitions` for the table's `partitions` descriptor.
///
/// # Panics
/// Panics if any metadata lookup fails or the stored schema is not valid JSON
/// for a `Schema`.
fn create_io_config_builder(client: &mut MetaDataClient, table_name: &str) -> LakeSoulIOConfigBuilder {
    let namespace = "default";
    let table_info = client.get_table_info_by_table_name(table_name, namespace).unwrap();
    let data_files = client.get_data_files_by_table_name(table_name, vec![], namespace).unwrap();
    let schema_json = client.get_schema_by_table_name(table_name, namespace).unwrap();
    let schema: Schema = serde_json::from_str(schema_json.as_str()).unwrap();

    // Only the keys listed after the ';' are used as primary keys here.
    let (_range_keys, hash_keys) = parse_table_info_partitions(table_info.partitions);

    LakeSoulIOConfigBuilder::new()
        .with_files(data_files)
        .with_schema(Arc::new(schema))
        .with_primary_keys(hash_keys)
}

/// Splits a table `partitions` descriptor into `(range_keys, hash_keys)`.
///
/// The descriptor has the form `"<range keys>;<hash keys>"`, where each side is
/// a comma-separated list of column names. The split happens at the first
/// `';'`. Empty entries are dropped, so an empty side (as in `";a,b"` or
/// `"a;"`) yields an empty vector rather than a vector holding one
/// empty-string key.
///
/// # Panics
/// Panics if `partitions` contains no `';'` separator.
fn parse_table_info_partitions(partitions: String) -> (Vec<String>, Vec<String>) {
    /// Parses one comma-separated key list, skipping empty entries.
    fn parse_keys(list: &str) -> Vec<String> {
        list.split(',')
            .filter(|key| !key.is_empty())
            .map(String::from)
            .collect()
    }

    let (range_keys, hash_keys) = partitions
        .split_once(';')
        .expect("partitions descriptor must contain a ';' separator");
    (parse_keys(range_keys), parse_keys(hash_keys))
}

/// Builds a `RecordBatch` of nullable `Int32` columns.
///
/// `names[i]` is paired with `values[i]`; each slice is copied into an
/// `Int32Array`.
///
/// # Panics
/// Panics if the batch cannot be constructed from the given columns.
fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch {
    let columns = names.into_iter().zip(values).map(|(name, column)| {
        let array: ArrayRef = Arc::new(Int32Array::from(column.to_vec()));
        // The trailing `true` marks the column as nullable.
        (name, array, true)
    });
    RecordBatch::try_from_iter_with_nullable(columns).unwrap()
}


fn execute_upsert(batch: RecordBatch, table_name: &str, client: &mut MetaDataClient) -> Result<()> {
let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
let builder = create_io_config_builder(client, table_name).with_file(file.clone()).with_schema(batch.schema());
let config = builder.clone().build();

let writer = SyncSendableMutableLakeSoulWriter::try_new(config, Builder::new_current_thread().build().unwrap()).unwrap();
writer.write_batch(batch)?;
writer.flush_and_close()?;
commit_data(client, table_name, builder.clone().build())
}




/// Creates `table_name` with the schema of `batch` and primary keys `pks`,
/// then upserts `batch` as the table's first data.
fn init_table(batch: RecordBatch, table_name: &str, pks:Vec<String>, client: &mut MetaDataClient) -> Result<()> {
    let config = LakeSoulIOConfigBuilder::new()
        .with_schema(batch.schema())
        .with_primary_keys(pks)
        .build();
    create_table(client, table_name, config)?;
    execute_upsert(batch, table_name, client)
}



/// Upserts `batch` into `table_name`, reads the table back and asserts that the
/// pretty-printed result equals `expected`.
///
/// * `selected_cols` - columns to read back; each is declared as a nullable
///   `Int32` field in the read schema.
/// * `filters` - optional filter string passed to the reader configuration.
/// * `expected` - expected table rendering, one line per element.
///
/// Every batch produced by the reader is collected before comparing, so a
/// reader that yields nothing fails against a non-empty `expected` instead of
/// passing silently, and output spread over several batches is checked in full.
///
/// # Errors
/// Returns any error raised while upserting, starting the reader or reading a
/// batch.
///
/// # Panics
/// Panics if the reader cannot be created or the read content differs from
/// `expected`.
fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, client: &mut MetaDataClient, expected: &[&str]) -> Result<()> {
    execute_upsert(batch, table_name, client)?;

    let read_schema = SchemaRef::new(Schema::new(
        selected_cols
            .iter()
            .map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true))
            .collect::<Vec<_>>(),
    ));
    let builder = create_io_config_builder(client, table_name).with_schema(read_schema);
    let builder = match filters {
        Some(filters) => builder.with_filter_str(filters),
        None => builder,
    };

    let mut reader = SyncSendableMutableLakeSoulReader::new(
        LakeSoulReader::new(builder.build()).unwrap(),
        Builder::new_current_thread().build().unwrap(),
    );
    reader.start_blocked()?;

    // Drain the reader so the assertion covers the whole result set.
    // NOTE(review): assumes `next_rb_blocked` returns `None` once the stream is
    // exhausted — confirm against the reader implementation.
    let mut batches = Vec::new();
    while let Some(next) = reader.next_rb_blocked() {
        batches.push(next?);
    }

    assert_batches_eq!(expected, &batches);
    Ok(())
}

/// Upserting rows whose columns match the table's columns must overwrite the
/// `value` of rows with an existing (`range`, `hash`) key and add rows with a
/// new key.
#[test]
fn test_merge_same_column_i32() -> Result<()> {
    let table_name = "merge-same_column";
    let columns = vec!["range", "hash", "value"];

    // NOTE(review): `from_env` presumably takes its connection settings from
    // the environment; `MetaDataClient::from_config(<connection string>)` can be
    // used instead to target a specific database — confirm against the client.
    let mut client = MetaDataClient::from_env();
    client.meta_cleanup()?;

    let initial_rows = create_batch_i32(
        columns.clone(),
        vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]],
    );
    let primary_keys = vec!["range".to_string(), "hash".to_string()];
    init_table(initial_rows, table_name, primary_keys, &mut client)?;

    let upserted_rows = create_batch_i32(
        columns.clone(),
        vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]],
    );
    let expected = [
        "+----------+------+-------+",
        "| range | hash | value |",
        "+----------+------+-------+",
        "| 20201101 | 1 | 11 |",
        "| 20201101 | 2 | 2 |",
        "| 20201101 | 3 | 33 |",
        "| 20201101 | 4 | 44 |",
        "| 20201102 | 4 | 4 |",
        "+----------+------+-------+",
    ];

    check_upsert(upserted_rows, table_name, columns, None, &mut client, &expected)
}
}
8 changes: 5 additions & 3 deletions rust/lakesoul-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ version = "2.3.0"
edition = "2021"

[dependencies]
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch", features = ["simd"] }
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch"}
object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["aws"] }

tokio-stream = "0.1.9"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io", "compat"]}
derivative = "2.2.0"
atomic_refcell = "0.1.8"
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint", "simd"] }
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint"] }
arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["serde"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["simd", "chrono-tz"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["chrono-tz"] }
arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred" }
parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["async", "arrow"] }
futures = "0.3"
Expand All @@ -36,6 +36,8 @@ serde_json = { version = "1.0"}

[features]
hdfs = ["dep:hdrs"]
simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"]
default = []

[dev-dependencies]
tempfile = "3.3.0"
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_source;
pub mod parquet_sink;
Loading

0 comments on commit a013d81

Please sign in to comment.