Commit 6b56255

change splitdesc def

Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Mar 4, 2024
1 parent be14656 commit 6b56255
Showing 4 changed files with 45 additions and 21 deletions.
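In short: SplitDesc's partition_desc_array, a list of "k=v" strings, becomes a partition_desc map keyed by range-partition column, updated across the Java class, the Scala test, the C bindings, and the Rust definition. Assuming serde's default field naming on the Rust struct (and the matching @JSONField names on the Java side), the wire format changes roughly like this; the "date" key is a made-up example:

Before: {"file_paths": [...], "primary_keys": [...], "partition_desc_array": ["date=2024-03-04"], "table_schema": "..."}
After:  {"file_paths": [...], "primary_keys": [...], "partition_desc": {"date": "2024-03-04"}, "table_schema": "..."}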
@@ -3,16 +3,18 @@
import com.alibaba.fastjson.annotation.JSONField;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.util.hash.Hash;

+import java.util.HashMap;
import java.util.List;

public class SplitDesc {
@JSONField(name = "file_paths")
private List<String> filePaths;
@JSONField(name = "primary_keys")
private List<String> primaryKeys;
@JSONField(name = "partition_desc_array")
private List<String> partitionDescArray;
@JSONField(name = "partition_desc")
private HashMap<String, String> partitionDesc;
@JSONField(name = "table_schema")
private String tableSchema;

@@ -32,12 +34,12 @@ public void setPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys = primaryKeys;
}

-public List<String> getPartitionDescArray() {
-return partitionDescArray;
+public HashMap<String, String> getPartitionDesc() {
+return partitionDesc;
}

-public void setPartitionDescArray(List<String> partitionDescArray) {
-this.partitionDescArray = partitionDescArray;
+public void setPartitionDesc(HashMap<String, String> partitionDesc) {
+this.partitionDesc = partitionDesc;
}

public String getTableSchema() {
@@ -47,12 +49,13 @@ public String getTableSchema() {
public void setTableSchema(String tableSchema) {
this.tableSchema = tableSchema;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
.append("file_paths", filePaths)
.append("primary_keys", primaryKeys)
.append("partition_desc_array", partitionDescArray)
.append("partition_desc", partitionDesc)
.append("table_schema", tableSchema)
.toString();
}
@@ -11,7 +11,6 @@ import org.scalatestplus.junit.JUnitRunner
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.util.Random

-// TODO scala_test normalize
@RunWith(classOf[JUnitRunner])
class SplitDescSuite extends QueryTest
with SharedSparkSession
@@ -44,7 +43,6 @@ class SplitDescSuite extends QueryTest
df
}


test("no range, no hash") {
val tName = generateRandomString(name_length);
withTable(tName) {
@@ -65,7 +63,7 @@ class SplitDescSuite extends QueryTest
val desc = descs(0);
assert(!desc.getFilePaths.isEmpty)
assert(desc.getPrimaryKeys.isEmpty)
-assert(desc.getPartitionDescArray.isEmpty)
+assert(desc.getPartitionDesc.isEmpty)
}
}

11 changes: 5 additions & 6 deletions rust/lakesoul-metadata-c/src/lib.rs
@@ -7,17 +7,18 @@
extern crate core;

use core::ffi::c_ptrdiff_t;
+use std::collections::HashMap;
use std::ffi::{c_char, c_uchar, CStr, CString};
use std::io::Write;
-use std::ptr::{null, null_mut, NonNull};
+use std::ptr::{NonNull, null, null_mut};

use log::debug;
use prost::bytes::BufMut;
use prost::Message;

-use lakesoul_metadata::{Builder, Client, MetaDataClient, PreparedStatementMap, Runtime};
use lakesoul_metadata::error::LakeSoulMetaDataError;
use lakesoul_metadata::transfusion::SplitDesc;
+use lakesoul_metadata::{Builder, Client, MetaDataClient, PreparedStatementMap, Runtime};
use proto::proto::entity;

#[repr(C)]
@@ -422,11 +423,9 @@ pub extern "C" fn debug(callback: extern "C" fn(bool, *const c_char)) -> *mut c_
SplitDesc {
file_paths: vec!["hello jnr".into()],
primary_keys: vec![],
-partition_desc_array: vec![],
+partition_desc: HashMap::new(),
table_schema: "".to_string(),
-};
-1
-];
+};1];
let array = lakesoul_metadata::transfusion::SplitDescArray(x);
let json_vec = serde_json::to_vec(&array).unwrap();
let c_string = CString::new(json_vec).unwrap();
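The debug hook above serializes a SplitDescArray with serde_json and hands the bytes to C as a CString. A minimal sketch of that ownership pattern, with illustrative function names that are not part of the crate:

use std::ffi::{c_char, CString};

// Serialize any serde value and move the NUL-terminated buffer to the C caller.
fn to_c_json<T: serde::Serialize>(value: &T) -> *mut c_char {
    let json_vec = serde_json::to_vec(value).unwrap();
    CString::new(json_vec).unwrap().into_raw()
}

// The caller must hand the pointer back exactly once so Rust can reclaim it;
// freeing it on the C side, or twice here, is undefined behavior.
unsafe fn free_c_json(ptr: *mut c_char) {
    drop(CString::from_raw(ptr));
}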
34 changes: 29 additions & 5 deletions rust/lakesoul-metadata/src/transfusion.rs
@@ -14,11 +14,12 @@ use tokio_postgres::Client;

use proto::proto::entity::{DataCommitInfo, DataFileOp, FileOp, PartitionInfo, TableInfo};

-use crate::{error::Result, MetaDataClient, PreparedStatementMap};
+use crate::error::LakeSoulMetaDataError;
use crate::transfusion::config::{
LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_NON_PARTITION_TABLE_PART_DESC,
LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH, LAKESOUL_RANGE_PARTITION_SPLITTER,
};
+use crate::{error::Result, MetaDataClient, PreparedStatementMap};

mod config {
#![allow(unused)]
@@ -122,18 +123,29 @@ async fn split_desc_array_inner(db: &MetaDataClient, table_name: &str, namespace
let (_rk, pk) = parse_table_info_partitions(&table_info.partitions);

for (range_key, value_map) in map {
-let mut partition_descs = Vec::new();
+let mut range_desc = HashMap::new();
if !table_without_range(range_key) {
-partition_descs = range_key
+let keys: Vec<String> = range_key
.split(LAKESOUL_RANGE_PARTITION_SPLITTER)
.map(ToString::to_string)
.collect();
+for k in keys {
+let (k, v) = match k.split_once('=') {
+None => {
+return Err(LakeSoulMetaDataError::Internal("split error".to_string()));
+}
+Some((k, v)) => {
+(k.to_string(), v.to_string())
+}
+};
+range_desc.insert(k, v);
+}
}
for physical_files in value_map {
let sd = SplitDesc {
file_paths: physical_files.1,
primary_keys: pk.clone(),
-partition_desc_array: partition_descs.clone(),
+partition_desc: range_desc.clone(),
table_schema: table_info.table_schema.clone(),
};
splits.push(sd)
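The new loop turns a range key such as "date=2024-03-04,region=cn" into a map, and any segment without '=' becomes an Internal error. A standalone sketch of the same parsing, assuming ',' stands in for LAKESOUL_RANGE_PARTITION_SPLITTER (the real value lives in the config module):

use std::collections::HashMap;

// Parse "k1=v1,k2=v2" into a map; a segment lacking '=' is an error,
// mirroring the Internal("split error") branch above.
fn parse_range_desc(range_key: &str) -> Result<HashMap<String, String>, String> {
    let mut range_desc = HashMap::new();
    for seg in range_key.split(',') {
        let (k, v) = seg
            .split_once('=')
            .ok_or_else(|| format!("split error: {seg}"))?;
        range_desc.insert(k.to_string(), v.to_string());
    }
    Ok(range_desc)
}

fn main() {
    let m = parse_range_desc("date=2024-03-04,region=cn").unwrap();
    assert_eq!(m["date"], "2024-03-04");
    assert!(parse_range_desc("malformed").is_err());
}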
@@ -235,7 +247,7 @@ pub fn parse_table_info_partitions(partitions: &str) -> (Vec<String>, Vec<String
pub struct SplitDesc {
pub file_paths: Vec<String>,
pub primary_keys: Vec<String>,
-pub partition_desc_array: Vec<String>,
+pub partition_desc: HashMap<String, String>,
pub table_schema: String,
}
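Since serialize_test below calls serde_json::to_string on this struct, it must derive serde's traits; a minimal round-trip sketch under that assumption, using a stand-in mirror of the struct with default field names:

use std::collections::HashMap;
use serde::{Deserialize, Serialize};

// Stand-in mirror of the struct above; the derives and field names are assumed.
#[derive(Serialize, Deserialize)]
struct SplitDesc {
    file_paths: Vec<String>,
    primary_keys: Vec<String>,
    partition_desc: HashMap<String, String>,
    table_schema: String,
}

fn main() {
    let json = r#"{"file_paths":[],"primary_keys":[],"partition_desc":{"date":"2024-03-04"},"table_schema":""}"#;
    let sd: SplitDesc = serde_json::from_str(json).unwrap();
    assert_eq!(sd.partition_desc["date"], "2024-03-04");
}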

@@ -291,4 +303,16 @@ mod test {
let _f = Box::from_raw(raw2);
}
}

+#[test]
+fn serialize_test() {
+let sd = SplitDesc {
+file_paths: vec![],
+primary_keys: vec![],
+partition_desc: Default::default(),
+table_schema: "".to_string(),
+};
+let s = serde_json::to_string(&sd).unwrap();
+println!("{s}");
+}
}
