Skip to content

Commit

Permalink
[Rust]DataFusion connector supports partition column (#449)
Browse files Browse the repository at this point in the history
* datafusion connector supports partition column

Signed-off-by: zenghua <[email protected]>

* fix ci test

Signed-off-by: zenghua <[email protected]>

* fix ci test

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Mar 4, 2024
1 parent 78a0456 commit 2224829
Show file tree
Hide file tree
Showing 28 changed files with 1,528 additions and 1,102 deletions.
2 changes: 2 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-r
arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
arrow-cast = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
arrow-arith = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-48-parquet-bufferred" }
object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "object_store_0.7_opt", features = ["aws", "http"] }

Expand Down
2 changes: 2 additions & 0 deletions rust/lakesoul-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ edition = "2021"
datafusion = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
arrow-cast = { workspace = true }
arrow-arith = { workspace = true }
parquet = { workspace = true }
lakesoul-io = { path = "../lakesoul-io" }
lakesoul-metadata = { path = "../lakesoul-metadata" }
Expand Down
21 changes: 9 additions & 12 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
})?,
partitions: format!(
"{};{}",
"",
config
.range_partitions_slice()
.iter()
.map(String::as_str)
.collect::<Vec<_>>()
.join(","),
config
.primary_keys_slice()
.iter()
Expand All @@ -77,7 +82,7 @@ pub(crate) async fn create_io_config_builder(
let table_info = client.get_table_info_by_table_name(table_name, namespace).await?;
let data_files = if fetch_files {
client
.get_data_files_by_table_name(table_name, vec![], namespace)
.get_data_files_by_table_name(table_name, namespace)
.await?
} else {
vec![]
Expand Down Expand Up @@ -114,7 +119,7 @@ pub(crate) fn parse_table_info_partitions(partitions: String) -> Result<(Vec<Str
pub(crate) async fn commit_data(
client: MetaDataClientRef,
table_name: &str,
partitions: Vec<(String, String)>,
partition_desc: String,
files: &[String],
) -> Result<()> {
let table_ref = TableReference::from(table_name);
Expand All @@ -124,15 +129,7 @@ pub(crate) async fn commit_data(
client
.commit_data_commit_info(DataCommitInfo {
table_id: table_name_id.table_id,
partition_desc: if partitions.is_empty() {
"-5".to_string()
} else {
partitions
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",")
},
partition_desc,
file_ops: files
.iter()
.map(|file| DataFileOp {
Expand Down
Loading

0 comments on commit 2224829

Please sign in to comment.