diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml index 8ff3153f3..f70b82357 100644 --- a/.github/workflows/native-build.yml +++ b/.github/workflows/native-build.yml @@ -78,6 +78,7 @@ jobs: - uses: Swatinem/rust-cache@v2 with: workspaces: "./rust -> target" + env-vars: "JAVA_HOME" - uses: actions-rs/cargo@v1 with: command: build diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 34678ae37..52327a22e 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -24,6 +24,27 @@ env: jobs: rust_ci: runs-on: ubuntu-latest + services: + # Label used to access the service container + postgres: + # Docker Hub image + image: postgres:14.5 + # Provide the password for postgres + env: + POSTGRES_PASSWORD: lakesoul_test + POSTGRES_USER: lakesoul_test + POSTGRES_DB: lakesoul_test + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + --name lakesoul-test-pg + ports: + # Maps tcp port 5432 on service container to the host + - 5432:5432 + steps: - uses: actions/checkout@v3 - uses: actions-rs/toolchain@v1 @@ -32,6 +53,11 @@ jobs: toolchain: nightly-2023-05-20 components: clippy default: true + - name: Install psql + run: sudo apt-get install -y postgresql-client-14 + - name: Init PG + run: | + ./script/meta_init_for_local_test.sh -j 2 - name: Install Protoc uses: arduino/setup-protoc@v2 with: diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ec1a799d3..eb8073522 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1534,10 +1534,13 @@ dependencies = [ name = "lakesoul-datafusion" version = "0.1.0" dependencies = [ + "async-trait", + "futures", "lakesoul-io", "lakesoul-metadata", "prost", "proto", + "uuid", ] [[package]] @@ -1598,6 +1601,7 @@ dependencies = [ "serde_json", "tokio", "tokio-postgres", + "url", "uuid", ] diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml index 810c9e61b..8e0aa7dd9 100644 --- a/rust/lakesoul-datafusion/Cargo.toml +++ b/rust/lakesoul-datafusion/Cargo.toml @@ -14,3 +14,6 @@ lakesoul-io = { path = "../lakesoul-io" } lakesoul-metadata = { path = "../lakesoul-metadata" } proto = { path = "../proto" } prost = "0.11" +async-trait = "0.1" +futures = "0.3" +uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]} diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs index aa55b06dc..4e96a21a9 100644 --- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs +++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs @@ -519,4 +519,204 @@ mod upsert_with_io_config_tests { } -} \ No newline at end of file +} + +mod upsert_with_metadata_tests { + use std::sync::Arc; + use std::env; + use std::path::PathBuf; + use std::time::SystemTime; + + + use lakesoul_io::datasource::parquet_source::EmptySchemaProvider; + use lakesoul_io::serde_json::json; + use lakesoul_io::{arrow, datafusion, tokio, serde_json}; + + use arrow::util::pretty::print_batches; + use arrow::datatypes::{Schema, SchemaRef, Field}; + use arrow::record_batch::RecordBatch; + use arrow::array::{ArrayRef, Int32Array}; + + use datafusion::assert_batches_eq; + use datafusion::prelude::{DataFrame, SessionContext}; + use datafusion::logical_expr::LogicalPlanBuilder; + use datafusion::error::Result; + + use proto::proto::entity::{TableInfo, DataCommitInfo, FileOp, DataFileOp, CommitOp, Uuid}; + use tokio::runtime::{Builder, Runtime}; + + use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, LakeSoulIOConfig}; + use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader}; + use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; + + use lakesoul_metadata::{Client, PreparedStatementMap, MetaDataClient}; + + + fn commit_data(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()>{ + let table_name_id = client.get_table_name_id_by_table_name(table_name, "default")?; + match client.commit_data_commit_info(DataCommitInfo { + table_id: table_name_id.table_id, + partition_desc: "-5".to_string(), + file_ops: config.files_slice() + .iter() + .map(|file| DataFileOp { + file_op: FileOp::Add as i32, + path: file.clone(), + ..Default::default() + }) + .collect(), + commit_op: CommitOp::AppendCommit as i32, + timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64, + commit_id: { + let (high, low) = uuid::Uuid::new_v4().as_u64_pair(); + Some(Uuid{high, low}) + }, + ..Default::default() + }) + { + Ok(()) => Ok(()), + Err(e) => Err(lakesoul_io::lakesoul_reader::DataFusionError::IoError(e)) + } + } + + fn create_table(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()> { + match client.create_table( + TableInfo { + table_id: format!("table_{}", uuid::Uuid::new_v4().to_string()), + table_name: table_name.to_string(), + table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::().to_str().unwrap().to_string(), + table_schema: serde_json::to_string(&config.schema()).unwrap(), + table_namespace: "default".to_string(), + properties: "{}".to_string(), + partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::>().join(",").as_str(), + domain: "public".to_string(), + }) + { + Ok(()) => Ok(()), + Err(e) => Err(lakesoul_io::lakesoul_reader::DataFusionError::IoError(e)) + } + } + + fn create_io_config_builder(client: &mut MetaDataClient, table_name: &str) -> LakeSoulIOConfigBuilder { + let table_info = client.get_table_info_by_table_name(table_name, "default").unwrap(); + let data_files = client.get_data_files_by_table_name(table_name, vec![], "default").unwrap(); + let schema_str = client.get_schema_by_table_name(table_name, "default").unwrap(); + let schema = serde_json::from_str::(schema_str.as_str()).unwrap(); + + LakeSoulIOConfigBuilder::new() + .with_files(data_files) + .with_schema(Arc::new(schema)) + .with_primary_keys( + parse_table_info_partitions(table_info.partitions).1 + ) + } + + fn parse_table_info_partitions(partitions: String) -> (Vec, Vec) { + let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap()); + let hash_keys = &hash_keys[1..]; + ( + range_keys.split(',') + .collect::>() + .iter() + .map(|str|str.to_string()) + .collect::>(), + hash_keys.split(',') + .collect::>() + .iter() + .map(|str|str.to_string()) + .collect::>() + ) + } + + fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch { + let values = values + .into_iter() + .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef) + .collect::>(); + let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::>(); + RecordBatch::try_from_iter_with_nullable(iter).unwrap() + } + + + fn execute_upsert(batch: RecordBatch, table_name: &str, client: &mut MetaDataClient) -> Result<()> { + let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::().to_str().unwrap().to_string(); + let builder = create_io_config_builder(client, table_name).with_file(file.clone()).with_schema(batch.schema()); + let config = builder.clone().build(); + + let writer = SyncSendableMutableLakeSoulWriter::try_new(config, Builder::new_current_thread().build().unwrap()).unwrap(); + writer.write_batch(batch)?; + writer.flush_and_close()?; + commit_data(client, table_name, builder.clone().build()) + } + + + + + fn init_table(batch: RecordBatch, table_name: &str, pks:Vec, client: &mut MetaDataClient) -> Result<()> { + let schema = batch.schema(); + let builder = LakeSoulIOConfigBuilder::new() + .with_schema(schema.clone()) + .with_primary_keys(pks); + create_table(client, table_name, builder.build())?; + execute_upsert(batch, table_name, client) + } + + + + fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option, client: &mut MetaDataClient, expected: &[&str]) -> Result<()> { + execute_upsert(batch, table_name, client)?; + let builder = create_io_config_builder(client, table_name); + let builder = builder + .with_schema(SchemaRef::new(Schema::new( + selected_cols.iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::>() + ))); + let builder = if let Some(filters) = filters { + builder.with_filter_str(filters) + } else { + builder + }; + let mut reader = SyncSendableMutableLakeSoulReader::new(LakeSoulReader::new(builder.build()).unwrap(), Builder::new_current_thread().build().unwrap()); + reader.start_blocked()?; + let result = reader.next_rb_blocked(); + match result { + Some(result) => { + assert_batches_eq!(expected, &[result?]); + Ok(()) + }, + None => Ok(()) + } + } + + #[test] + fn test_merge_same_column_i32() -> Result<()>{ + let table_name = "merge-same_column"; + let mut client = MetaDataClient::from_env(); + // let mut client = MetaDataClient::from_config("host=127.0.0.1 port=5433 dbname=test_lakesoul_meta user=yugabyte password=yugabyte".to_string()); + client.meta_cleanup()?; + init_table( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + vec!["range".to_string(), "hash".to_string()], + &mut client, + )?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value"], + None, + &mut client, + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 11 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 33 |", + "| 20201101 | 4 | 44 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ) + } +} diff --git a/rust/lakesoul-io/src/datasource/mod.rs b/rust/lakesoul-io/src/datasource/mod.rs index de47b1339..eeb3c244b 100644 --- a/rust/lakesoul-io/src/datasource/mod.rs +++ b/rust/lakesoul-io/src/datasource/mod.rs @@ -3,3 +3,4 @@ // SPDX-License-Identifier: Apache-2.0 pub mod parquet_source; +pub mod parquet_sink; diff --git a/rust/lakesoul-io/src/datasource/parquet_sink.rs b/rust/lakesoul-io/src/datasource/parquet_sink.rs new file mode 100644 index 000000000..45a41bac3 --- /dev/null +++ b/rust/lakesoul-io/src/datasource/parquet_sink.rs @@ -0,0 +1,192 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + + +use std::any::Any; +use std::fmt::{self, Debug}; +use std::sync::Arc; + +use async_trait::async_trait; + +use datafusion::common::{Statistics, DataFusionError}; + +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{ + Expr, TableType, +}; +use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Distribution, stream::RecordBatchStreamAdapter +}; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; + +use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig}; +use crate::lakesoul_writer::MultiPartAsyncWriter; +use crate::transform::uniform_schema; + +#[derive(Debug, Clone)] +pub struct LakeSoulParquetSinkProvider{ + schema: SchemaRef, + config: LakeSoulIOConfig +} + +#[async_trait] +impl TableProvider for LakeSoulParquetSinkProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + _projections: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let msg = "Scan not implemented for LakeSoulParquetSinkProvider".to_owned(); + Err(DataFusionError::NotImplemented(msg)) + } + + + async fn insert_into( + &self, + _state: &SessionState, + input: Arc, + ) -> Result> { + let writer_schema = self.schema(); + let mut writer_config = self.config.clone(); + writer_config.schema = IOSchema(uniform_schema(writer_schema)); + let _writer = MultiPartAsyncWriter::try_new(writer_config).await?; + Ok(Arc::new(LakeSoulParquetSinkExec::new(input))) + } + +} + +#[derive(Debug, Clone)] +struct LakeSoulParquetSinkExec { + /// Input plan that produces the record batches to be written. + input: Arc, + /// Sink to whic to write + // sink: Arc, + /// Schema describing the structure of the data. + schema: SchemaRef, + +} + +impl LakeSoulParquetSinkExec { + fn new( + input: Arc, + ) -> Self { + Self { + input, + schema: Arc::new(Schema::empty()) + } + } +} + +impl DisplayAs for LakeSoulParquetSinkExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { + write!(f, "LakeSoulParquetSinkExec") + } +} + +impl ExecutionPlan for LakeSoulParquetSinkExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] + } + + fn required_input_ordering(&self) -> Vec>> { + // Require that the InsertExec gets the data in the order the + // input produced it (otherwise the optimizer may chose to reorder + // the input which could result in unintended / poor UX) + // + // More rationale: + // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 + vec![self + .input + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs)] + } + + fn maintains_input_order(&self) -> Vec { + vec![false] + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self { + input: children[0].clone(), + schema: self.schema.clone(), + })) + } + + /// Execute the plan and return a stream of `RecordBatch`es for + /// the specified partition. + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if partition != 0 { + return Err(DataFusionError::Internal( + format!("Invalid requested partition {partition}. InsertExec requires a single input partition." + ))); + } + + // Execute each of our own input's partitions and pass them to the sink + let input_partition_count = self.input.output_partitioning().partition_count(); + if input_partition_count != 1 { + return Err(DataFusionError::Internal(format!( + "Invalid input partition count {input_partition_count}. \ + InsertExec needs only a single partition." + ))); + } + + let data = self.input.execute(0, context)?; + let schema = self.schema.clone(); + + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, data))) + } + + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 279426c64..a029d22be 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -74,6 +74,20 @@ pub struct LakeSoulIOConfig { pub(crate) default_fs: String, } +impl LakeSoulIOConfig { + pub fn schema(&self) -> SchemaRef { + self.schema.0.clone() + } + + pub fn primary_keys_slice(&self) -> &[String] { + &self.primary_keys + } + + pub fn files_slice(&self) -> &[String] { + &self.files + } +} + #[derive(Derivative, Debug)] #[derivative(Clone, Default)] pub struct LakeSoulIOConfigBuilder { diff --git a/rust/lakesoul-io/src/lib.rs b/rust/lakesoul-io/src/lib.rs index 99213bc20..a884a0518 100644 --- a/rust/lakesoul-io/src/lib.rs +++ b/rust/lakesoul-io/src/lib.rs @@ -12,15 +12,15 @@ pub mod filter; pub mod lakesoul_writer; pub mod lakesoul_io_config; pub mod sorted_merge; -mod datasource; +pub mod datasource; mod projection; #[cfg(feature = "hdfs")] mod hdfs; -pub mod default_column_stream; -pub mod constant; -pub mod transform; +mod default_column_stream; +mod constant; +mod transform; pub use datafusion::arrow::error::Result; pub use tokio; diff --git a/rust/lakesoul-metadata/Cargo.toml b/rust/lakesoul-metadata/Cargo.toml index b2574d21d..730808d5c 100644 --- a/rust/lakesoul-metadata/Cargo.toml +++ b/rust/lakesoul-metadata/Cargo.toml @@ -22,4 +22,5 @@ prost = "0.11" num_enum = "0.5.1" uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]} serde_json = { version = "1.0"} +url = "2.4.1" diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs index 8310bda36..4696a33b9 100644 --- a/rust/lakesoul-metadata/src/lib.rs +++ b/rust/lakesoul-metadata/src/lib.rs @@ -14,6 +14,9 @@ pub use tokio::runtime::{Builder, Runtime}; pub use tokio_postgres::{NoTls, Client, Statement}; use postgres_types::{ToSql, FromSql}; +mod metadata_client; +pub use metadata_client::MetaDataClient; + pub const DAO_TYPE_QUERY_ONE_OFFSET : i32 = 0; pub const DAO_TYPE_QUERY_LIST_OFFSET : i32 = 100; pub const DAO_TYPE_INSERT_ONE_OFFSET : i32 = 200; diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs new file mode 100644 index 000000000..6463118fc --- /dev/null +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -0,0 +1,345 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{io::Result, collections::HashMap, vec, env, fs}; + +use proto::proto::entity::{TablePathId, TableNameId, TableInfo, PartitionInfo, JniWrapper, DataCommitInfo, MetaInfo, CommitOp, self}; +use prost::Message; +use tokio::runtime::{Runtime, Builder}; +use tokio_postgres::Client; + +use url::Url; + +use crate::{execute_insert, PreparedStatementMap, DaoType, create_connection, clean_meta_for_test, execute_query, PARAM_DELIM, PARTITION_DESC_DELIM}; + +pub struct MetaDataClient { + runtime: Runtime, + client: Client, + prepared: PreparedStatementMap, +} + +impl Default for MetaDataClient { + fn default() -> Self { + Self::from_config( + "host=127.0.0.1 port=5432 dbname=lakesoul_test user=lakesoul_test password=lakesoul_test".to_string() + ) + } +} + +impl MetaDataClient { + pub fn from_env() -> Self{ + match env::var("lakesoul_home") { + Ok(config_path) => { + let config = fs::read_to_string(&config_path).unwrap_or_else(|_| panic!("Fails at reading config file {}", &config_path)); + let config_map = config.split('\n').filter_map(|property| { + property.find('=').map(|idx| property.split_at(idx + 1)) + }).collect::>(); + let url = Url::parse(&config_map.get("lakesoul.pg.url=").unwrap_or(&"jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified")[5..]).unwrap(); + Self::from_config( + format!( + "host={} port={} dbname={} user={} password={}", + url.host_str().unwrap(), + url.port().unwrap(), + url.path_segments().unwrap().next().unwrap(), + config_map.get("lakesoul.pg.username=").unwrap_or(&"lakesoul_test"), + config_map.get("lakesoul.pg.password=").unwrap_or(&"lakesoul_test")) + ) + } + Err(_) => MetaDataClient::default() + } + + } + + pub fn from_config(config: String) -> Self { + let runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .max_blocking_threads(8) + .build() + .unwrap(); + let client = create_connection(&runtime, config).unwrap(); + let prepared = PreparedStatementMap::new(); + Self { + runtime, + client, + prepared + } + } + + pub fn create_table( + &mut self, + table_info: TableInfo + ) -> Result<()> { + let _ = self.insert_table_path_id(&table_path_id_from_table_info(&table_info))?; + let _ = self.insert_table_name_id(&table_name_id_from_table_info(&table_info))?; + let _ = self.insert_table_info(&table_info)?; + Ok(()) + } + + fn execute_insert(&mut self, insert_type: i32, wrapper: JniWrapper) -> Result { + execute_insert(&self.runtime, &mut self.client, &mut self.prepared, insert_type, wrapper) + } + + fn execute_query(&mut self, query_type: i32, joined_string: String) -> Result { + let encoded = execute_query(&self.runtime, &self.client, &mut self.prepared, query_type, joined_string)?; + match JniWrapper::decode(prost::bytes::Bytes::from(encoded)) { + Ok(wrapper) => Ok(wrapper), + Err(err) => Err(std::io::Error::other(err)) + } + } + + fn insert_table_info(&mut self, table_info: &TableInfo) -> Result { + self.execute_insert(DaoType::InsertTableInfo as i32, JniWrapper{table_info: vec![table_info.clone()], ..Default::default()}) + } + + fn insert_table_name_id(&mut self, table_name_id: &TableNameId) -> Result{ + self.execute_insert(DaoType::InsertTableNameId as i32, JniWrapper{table_name_id: vec![table_name_id.clone()], ..Default::default()}) + } + + fn insert_table_path_id(&mut self, table_path_id: &TablePathId) -> Result{ + self.execute_insert(DaoType::InsertTablePathId as i32, JniWrapper{table_path_id: vec![table_path_id.clone()], ..Default::default()}) + } + + fn insert_data_commit_info(&mut self, data_commit_info: &DataCommitInfo) -> Result { + self.execute_insert(DaoType::InsertDataCommitInfo as i32, JniWrapper{data_commit_info: vec![data_commit_info.clone()], ..Default::default()}) + } + + fn transaction_insert_partition_info(&mut self, partition_info_list: Vec) -> Result { + self.execute_insert(DaoType::TransactionInsertPartitionInfo as i32, JniWrapper { partition_info: partition_info_list, ..Default::default()}) + } + + pub fn meta_cleanup(&mut self) -> Result { + clean_meta_for_test(&self.runtime, &self.client) + } + + pub fn commit_data(&mut self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> { + let table_info = meta_info.table_info.unwrap(); + if !table_info.table_name.is_empty() { + // todo: updateTableShortName + + + } + // todo: updateTableProperties + + // conflict handling + let _raw_map = meta_info.list_partition + .iter() + .map(|partition_info| (partition_info.partition_desc.clone(), partition_info.clone())) + .collect::>(); + + let partition_desc_list = meta_info.list_partition + .iter() + .map(|partition_info| partition_info.partition_desc.clone()) + .collect::>(); + + let _snapshot_list = meta_info.list_partition + .iter() + .flat_map(|partition_info| partition_info.snapshot.clone()) + .collect::>(); + + // conflict handling + let cur_map = self.get_cur_partition_map(&table_info.table_id, &partition_desc_list)?; + + + match commit_op { + CommitOp::AppendCommit | CommitOp::MergeCommit => { + let new_partition_list = meta_info.list_partition + .iter() + .map(|partition_info| { + let partition_desc = &partition_info.partition_desc; + match cur_map.get(partition_desc) { + Some(cur_partition_info) => { + let mut cur_partition_info = cur_partition_info.clone(); + cur_partition_info.domain = self.get_table_domain(&table_info.table_id).unwrap(); + cur_partition_info.snapshot.extend_from_slice(&partition_info.snapshot[..]); + cur_partition_info.version += 1; + cur_partition_info.commit_op = commit_op as i32; + cur_partition_info.expression = partition_info.expression.clone(); + cur_partition_info + } + None => PartitionInfo { + table_id: table_info.table_id.clone(), + partition_desc: partition_desc.clone(), + version: 0, + snapshot: Vec::from(&partition_info.snapshot[..]), + domain: self.get_table_domain(&table_info.table_id).unwrap(), + commit_op: commit_op as i32, + expression: partition_info.expression.clone(), + ..Default::default() + } + } + }) + .collect::>(); + match self.transaction_insert_partition_info(new_partition_list) { + Ok(_) => Ok(()), + Err(e) => Err(e) + } + } + _ => { + todo!() + } + } + } + + fn get_cur_partition_map(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + Ok(self.get_partition_info_by_table_id_and_partition_list(table_id, partition_desc_list)? + .iter() + .map(|partition_info|(partition_info.partition_desc.clone(), partition_info.clone())) + .collect() + ) + } + + pub fn commit_data_commit_info(&mut self, data_commit_info: DataCommitInfo) -> Result<()> { + let table_id = &data_commit_info.table_id; + let partition_desc = &data_commit_info.partition_desc; + let commit_op = data_commit_info.commit_op; + let commit_id = &data_commit_info.commit_id.clone().unwrap(); + let commit_id_str = uuid::Uuid::from_u64_pair(commit_id.high, commit_id.low).to_string(); + match self.get_single_data_commit_info(table_id, partition_desc, &commit_id_str)? { + Some(data_commit_info) if data_commit_info.committed => { + return Ok(()); + } + None => { + let _ = self.insert_data_commit_info(&data_commit_info); + } + _ => {} + }; + let table_info = Some(self.get_table_info_by_table_id(table_id)?); + let domain = self.get_table_domain(table_id)?; + self.commit_data(MetaInfo { + table_info, + list_partition: vec![PartitionInfo { + table_id: table_id.clone(), + partition_desc: partition_desc.clone(), + commit_op, + domain, + snapshot: vec![commit_id.clone()], + ..Default::default() + }], + ..Default::default() + }, CommitOp::from_i32(commit_op).unwrap()) + } + + pub fn get_table_domain(&mut self, _table_id: &str) -> Result { + Ok("public".to_string()) + } + + pub fn get_table_name_id_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + match self.execute_query(DaoType::SelectTableNameIdByTableName as i32, [table_name, namespace].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.table_name_id[0].clone()), + Err(err) => Err(err) + } + } + + pub fn get_table_info_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + match self.execute_query(DaoType::SelectTableInfoByTableNameAndNameSpace as i32, [table_name, namespace].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.table_info[0].clone()), + Err(err) => Err(err) + } + } + + pub fn get_table_info_by_table_id(&mut self, table_id: &str) -> Result { + match self.execute_query(DaoType::SelectTableInfoByTableId as i32, table_id.to_string()) { + Ok(wrapper) => Ok(wrapper.table_info[0].clone()), + Err(err) => Err(err) + } + } + + + pub fn get_data_files_by_table_name(&mut self, table_name: &str, partitions: Vec<(&str, &str)>, namespace: &str) -> Result> { + let partition_filter = partitions + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>(); + let table_info = self.get_table_info_by_table_name(table_name, namespace)?; + let partition_list = self.get_all_partition_info(table_info.table_id.as_str())?; + let mut data_commit_info_list = Vec::::new(); + for idx in 0..partition_list.len() { + let partition_info = partition_list.get(idx).unwrap(); + let partition_desc = partition_info.partition_desc.clone(); + if partition_filter.contains(&partition_desc) { + continue; + } else { + let _data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).unwrap(); + // let data_commit_info_list = Vec::::new(); + let _data_file_list = _data_commit_info_list + .iter() + .flat_map(|data_commit_info| { + data_commit_info.file_ops + .iter() + .map(|file_op| file_op.path.clone()) + .collect::>() + }) + .collect::>(); + data_commit_info_list.extend_from_slice(&_data_file_list); + } + } + Ok(data_commit_info_list) + } + + fn get_data_commit_info_of_single_partition(&mut self, partition_info: &PartitionInfo) -> Result> { + let table_id = &partition_info.table_id; + let partition_desc = &partition_info.partition_desc; + let joined_commit_id = &partition_info.snapshot + .iter() + .map(|commit_id| format!("{:0>16x}{:0>16x}", commit_id.high, commit_id.low)) + .collect::>() + .join(""); + let joined_string = [table_id.as_str(), partition_desc.as_str(), joined_commit_id.as_str()].join(PARAM_DELIM); + match self.execute_query(DaoType::ListDataCommitInfoByTableIdAndPartitionDescAndCommitList as i32, joined_string) { + Ok(wrapper) => Ok(wrapper.data_commit_info), + Err(e) => Err(e), + } + } + + pub fn get_schema_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + let table_info = self.get_table_info_by_table_name(table_name, namespace)?; + Ok(table_info.table_schema) + } + + pub fn get_all_partition_info(&mut self, table_id: &str) -> Result> { + match self.execute_query(DaoType::ListPartitionByTableId as i32, table_id.to_string()) { + Ok(wrapper) => Ok(wrapper.partition_info), + Err(e) => Err(e), + } + } + + pub fn get_single_data_commit_info(&mut self, table_id: &str, partition_desc: &str, commit_id: &str) -> Result> { + match self.execute_query(DaoType::SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId as i32, [table_id, partition_desc, commit_id].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(if wrapper.data_commit_info.is_empty() { + None + } else { + Some(wrapper.data_commit_info[0].clone()) + }), + Err(e) => Err(e), + } + } + + pub fn get_partition_info_by_table_id_and_partition_list(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + match self.execute_query(DaoType::ListPartitionDescByTableIdAndParList as i32, [table_id, partition_desc_list.join(PARTITION_DESC_DELIM).as_str()].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.partition_info), + Err(e) => Err(e), + } + + } + +} + +pub fn table_path_id_from_table_info(table_info: &TableInfo) -> TablePathId { + TablePathId { + table_path: table_info.table_path.clone(), + table_id: table_info.table_id.clone(), + table_namespace: table_info.table_namespace.clone(), + domain: table_info.domain.clone() + } +} +pub fn table_name_id_from_table_info(table_info: &TableInfo) -> TableNameId { + TableNameId { + table_name: table_info.table_name.clone(), + table_id: table_info.table_id.clone(), + table_namespace: table_info.table_namespace.clone(), + domain: table_info.domain.clone() + } +} +