Skip to content

Commit

Permalink
fix rust ci cases
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Dec 8, 2023
1 parent ce5562a commit 1529870
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 42 deletions.
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
TableInfo {
table_id: format!("table_{}", uuid::Uuid::new_v4()),
table_name: table_name.to_string(),
table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::<PathBuf>().to_str().unwrap().to_string(),
table_path: format!("{}default/{}", env::temp_dir().to_str().unwrap(), table_name),
table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into()).unwrap(),
table_namespace: "default".to_string(),
properties: "{}".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl LakeSoulParquetSink {
fn table_info(&self) -> Arc<TableInfo> {
self.table_info.clone()
}

}

#[async_trait]
Expand Down
6 changes: 3 additions & 3 deletions rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc<TableInfo
}


pub(crate) fn create_sort_exprs(pks: &[String]) -> Vec<Expr> {
pks
pub(crate) fn create_sort_exprs(colunms: &[String]) -> Vec<Expr> {
colunms
.iter()
.map(|pk| col(pk).sort(true, true))
.map(|column| col(column).sort(true, true))
.collect()
}
10 changes: 5 additions & 5 deletions rust/lakesoul-datafusion/src/planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner {
}) => {
let name = table_name.table();
// let schema = session_state.schema_for_ref(table_name)?;
let table = LakeSoulTable::for_name(name).await.unwrap();
match table.as_sink_provider(session_state).await {
let lakesoul_table = LakeSoulTable::for_name(name).await.unwrap();
match lakesoul_table.as_sink_provider(session_state).await {
Ok(provider) => {
let builder = LogicalPlanBuilder::from(input.deref().clone());

let builder = if table.primary_keys().is_empty() {
if !table
let builder = if lakesoul_table.primary_keys().is_empty() {
if !lakesoul_table
.schema()
.logically_equivalent_names_and_types(&Schema::from(input.schema().as_ref()))
{
Expand All @@ -71,7 +71,7 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner {

builder
} else {
let sort_exprs = create_sort_exprs(table.primary_keys());
let sort_exprs = create_sort_exprs(lakesoul_table.primary_keys());
builder.sort(sort_exprs)?
};

Expand Down
31 changes: 0 additions & 31 deletions rust/lakesoul-datafusion/src/test/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,37 +1220,6 @@ mod upsert_with_metadata_tests {
lakesoul_table.execute_upsert(record_batch).await
}

async fn check_upsert_print(table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, client: MetaDataClientRef, expected: &[&str]) -> Result<()> {
let lakesoul_table = LakeSoulTable::for_name(table_name).await?;
let builder = create_io_config_builder(client, None, false).await?;
let sess_ctx = create_session_context(&mut builder.clone().build())?;


let dataframe = lakesoul_table.to_dataframe(&sess_ctx).await?;
let schema = SchemaRef::new(dataframe.schema().into());

let dataframe = if let Some(f) = filters {
dataframe.filter(Parser::parse(f.clone(), schema))?
} else {
dataframe
};

let dataframe = if selected_cols.is_empty() {
dataframe
} else {
dataframe.select_columns(&selected_cols)?
};

let result = dataframe
// .explain(true, false)?
.collect()
.await?;

// print_batches(&result);
assert_batches_eq!(expected, &result);
Ok(())
}

async fn check_upsert_debug(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option<String>, client: MetaDataClientRef, expected: &[&str]) -> Result<()> {
let lakesoul_table = LakeSoulTable::for_name(table_name).await?;
lakesoul_table.execute_upsert(batch).await?;
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/datasource/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl LakeSoulListingTable {
.with_table_partition_cols(table_partition_cols)
.with_insert_mode(datafusion::datasource::listing::ListingTableInsertMode::AppendNewFiles);

let prefix = ListingTableUrl::parse(lakesoul_io_config.prefix.clone())?;
let prefix = ListingTableUrl::parse_create_local_if_not_exists(lakesoul_io_config.prefix.clone(), true)?;

ListingTableConfig::new(prefix)
.with_listing_options(listing_options)
Expand Down

0 comments on commit 1529870

Please sign in to comment.