Skip to content

Commit

Permalink
[Rust][NativeIO]Use stable rustc for lakesoul-io feature default (#358)
Browse files Browse the repository at this point in the history
* use stable rustc for lakesoul-io feature default

Signed-off-by: zenghua <[email protected]>

* replace get_mut_unchecked with make_mut for SortKeyBatchRanges

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Nov 2, 2023
1 parent c7fdbaa commit e299795
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 204 deletions.
429 changes: 239 additions & 190 deletions rust/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod upsert_with_io_config_tests {
use lakesoul_io::datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use lakesoul_io::datafusion::prelude::SessionContext;
use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader};
use lakesoul_io::tokio::runtime::{Builder};
use lakesoul_io::tokio::runtime::Builder;
use lakesoul_io::arrow;
use lakesoul_io::arrow::array::{ArrayRef, Int32Array};
use lakesoul_io::arrow::datatypes::{Schema, SchemaRef, Field};
Expand Down
2 changes: 2 additions & 0 deletions rust/lakesoul-io-c/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ serde = { version = "1.0", default-features = false, features = ["derive", "std"

[features]
hdfs = ["lakesoul-io/hdfs"]
simd = ["lakesoul-io/simd"]
default = []

[build-dependencies]
cbindgen = "0.24.0"
8 changes: 5 additions & 3 deletions rust/lakesoul-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ version = "2.3.0"
edition = "2021"

[dependencies]
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch", features = ["simd"] }
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch"}
object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["aws"] }

tokio-stream = "0.1.9"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io", "compat"]}
derivative = "2.2.0"
atomic_refcell = "0.1.8"
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint", "simd"] }
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint"] }
arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["serde"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["simd", "chrono-tz"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["chrono-tz"] }
arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred" }
parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["async", "arrow"] }
futures = "0.3"
Expand All @@ -36,6 +36,8 @@ serde_json = { version = "1.0"}

[features]
hdfs = ["dep:hdrs"]
simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"]
default = []

[dev-dependencies]
tempfile = "3.3.0"
Expand Down
6 changes: 3 additions & 3 deletions rust/lakesoul-io/src/lakesoul_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::any::Any;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::io::ErrorKind::ResourceBusy;
use std::io::ErrorKind::AddrInUse;
use std::io::Write;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Write for InMemBuf {
let mut v = self
.0
.try_borrow_mut()
.map_err(|_| std::io::Error::from(ResourceBusy))?;
.map_err(|_| std::io::Error::from(AddrInUse))?;
v.extend(buf);
Ok(buf.len())
}
Expand All @@ -104,7 +104,7 @@ impl Write for InMemBuf {
let mut v = self
.0
.try_borrow_mut()
.map_err(|_| std::io::Error::from(ResourceBusy))?;
.map_err(|_| std::io::Error::from(AddrInUse))?;
v.extend(buf);
Ok(())
}
Expand Down
4 changes: 0 additions & 4 deletions rust/lakesoul-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
//
// SPDX-License-Identifier: Apache-2.0

#![feature(new_uninit)]
#![feature(get_mut_unchecked)]
#![feature(io_error_more)]
#![feature(sync_unsafe_cell)]

pub mod lakesoul_reader;
pub mod filter;
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/sorted_merge/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl MinHeapSortKeyBatchRangeCombiner {
}

fn get_mut_current_sort_key_range(&mut self) -> &mut SortKeyBatchRanges {
unsafe { Arc::get_mut_unchecked(&mut self.current_sort_key_range) }
Arc::make_mut(&mut self.current_sort_key_range)
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/sorted_merge/sort_key_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Clone for SortKeyArrayRange {
}

// Multiple ranges with same sorted primary key from variant source record_batch. These ranges will be merged into ONE row of target record_batch finnally.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SortKeyBatchRanges {
// vector with length=column_num that holds a Vector of SortKeyArrayRange to be merged for each column
pub(crate) sort_key_array_ranges: Vec<SmallVec<[SortKeyArrayRange; 4]>>,
Expand Down
14 changes: 13 additions & 1 deletion rust/lakesoul-metadata-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::io::Write;
use std::ptr::NonNull;
use std::ffi::{c_char, c_uchar, CString, CStr};

use lakesoul_metadata::{Runtime, Builder, Client, PreparedStatementMap};
use lakesoul_metadata::{Runtime, Builder, Client, PreparedStatementMap, MetaDataClient};
use prost::bytes::BufMut;
use proto::proto::entity;
use prost::Message;
Expand Down Expand Up @@ -244,6 +244,7 @@ pub extern "C" fn free_bytes_result(bytes: NonNull<Result<BytesResult>>) {
}



#[no_mangle]
pub extern "C" fn clean_meta_for_test(
callback: extern "C" fn(i32, *const c_char),
Expand Down Expand Up @@ -313,4 +314,15 @@ pub extern "C" fn create_prepared_statement() -> NonNull<Result<PreparedStatemen
#[no_mangle]
pub extern "C" fn free_prepared_statement(prepared: NonNull<Result<PreparedStatement>>) {
from_nonnull(prepared).free::<PreparedStatementMap>();
}

#[no_mangle]
pub extern "C" fn create_lakesoul_metadata_client() -> NonNull<Result<MetaDataClient>> {
let client = MetaDataClient::from_env();
convert_to_nonnull(Result::<MetaDataClient>::new(client))
}

#[no_mangle]
pub extern "C" fn free_lakesoul_metadata_client(prepared: NonNull<Result<MetaDataClient>>) {
from_nonnull(prepared).free::<MetaDataClient>();
}

0 comments on commit e299795

Please sign in to comment.