Skip to content

Commit

Permalink
compile lakesoul-io lakesoul-io-c
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Apr 25, 2024
1 parent 5f6c3b2 commit c569cd0
Show file tree
Hide file tree
Showing 19 changed files with 4,982 additions and 667 deletions.
4,193 changes: 4,193 additions & 0 deletions rust/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
"lakesoul-metadata",
"lakesoul-metadata-c",
"lakesoul-io",
# "lakesoul-io-c",
"lakesoul-io-c",
# "lakesoul-datafusion"
]
resolver = "2"
Expand Down
1 change: 0 additions & 1 deletion rust/justfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
copy-to-java:
cargo build
cp target/debug/liblakesoul_io_c.dylib ../lakesoul-common/target/classes/
cp target/debug/liblakesoul_metadata_c.dylib ../lakesoul-common/target/classes/
11 changes: 5 additions & 6 deletions rust/lakesoul-io-c/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crate-type = ["cdylib"]

[dependencies]
lakesoul-io = { path = "../lakesoul-io" }
arrow = { workspace = true, features = ["ffi"] }
arrow = { version = "51", features = ["prettyprint", "ffi"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1.0"
serde = { version = "1.0", default-features = false, features = ["derive", "std"], optional = true }
Expand All @@ -23,15 +23,14 @@ env_logger = "0.11"


[target.'cfg(target_os = "windows")'.dependencies]
datafusion-substrait = { workspace = true }
datafusion-substrait = { version = "37" }

[target.'cfg(not(target_os = "windows"))'.dependencies]
datafusion-substrait = { workspace = true, features = ["protoc"] }

datafusion-substrait = { version = "37", features = ["protoc"] }

[features]
hdfs = ["lakesoul-io/hdfs"]
simd = ["lakesoul-io/simd"]
#hdfs = ["lakesoul-io/hdfs"]
#simd = ["lakesoul-io/simd"]
default = []

[build-dependencies]
Expand Down
10 changes: 6 additions & 4 deletions rust/lakesoul-io-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,11 @@ pub extern "C" fn free_tokio_runtime(runtime: NonNull<CResult<TokioRuntime>>) {
from_nonnull(runtime).free::<Runtime>();
}

#[no_mangle]
pub extern "C" fn rust_logger_init() {
let _ = env_logger::try_init();
}

#[cfg(test)]
mod tests {
use core::ffi::c_ptrdiff_t;
Expand Down Expand Up @@ -1227,7 +1232,4 @@ mod tests {
}


#[no_mangle]
pub extern "C" fn rust_logger_init() {
let _ = env_logger::try_init();
}

29 changes: 16 additions & 13 deletions rust/lakesoul-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@ version = "2.5.0"
edition = "2021"

[dependencies]
datafusion = { workspace = true }
object_store = { workspace = true }
# originally inherited from the workspace manifest
datafusion = { version = "37" }
datafusion-common = { version = "37" }
object_store = { version = "0.9.1", default-features = false, features = ["aws", "http"] } # datafusion uses this version; don't change
arrow = { version = "51", features = ["prettyprint"] }
arrow-schema = { version = "51.0.0", features = ["serde"] }
arrow-array = { version = "51.0.0", features = ["chrono-tz"] }
arrow-buffer = { version = "51.0.0" }
#arrow-cast = { workspace = true }
parquet = { version = "51", features = ["async"] }


tokio-stream = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
derivative = { workspace = true }
atomic_refcell = { workspace = true }
arrow = { workspace = true, features = ["prettyprint"] }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-array = { workspace = true, features = ["chrono-tz"] }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
parquet = { workspace = true, features = ["async", "arrow"] }
futures = { workspace = true }
datafusion-common = { workspace = true }
serde = { workspace = true }
url = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -42,19 +44,20 @@ half = { workspace = true }
log = "0.4.20"
anyhow = { workspace = true, features = [] }
prost = "0.12.3"
async-recursion = "1.1"
env_logger = "0.11"


[features]
hdfs = ["dep:hdrs"]
simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"]
#simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"]
default = []

[target.'cfg(target_os = "windows")'.dependencies]
datafusion-substrait = { workspace = true }
datafusion-substrait = { version = "37" }

[target.'cfg(not(target_os = "windows"))'.dependencies]
datafusion-substrait = { workspace = true, features = ["protoc"] }
datafusion-substrait = { version = "37", features = ["protoc"] }



Expand Down
6 changes: 2 additions & 4 deletions rust/lakesoul-io/src/datasource/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl FileFormat for LakeSoulParquetFormat {
// will not prune data based on the statistics.
let predicate = self
.parquet_format
.enable_pruning(state.config_options())
.enable_pruning()
.then(|| filters.cloned())
.flatten();

Expand All @@ -113,7 +113,7 @@ impl FileFormat for LakeSoulParquetFormat {
merged_schema.clone(),
flatten_conf,
predicate,
self.parquet_format.metadata_size_hint(state.config_options()),
self.parquet_format.metadata_size_hint(),
self.conf.clone(),
)?);

Expand Down Expand Up @@ -174,7 +174,6 @@ pub async fn flatten_file_scan_config(
let limit = conf.limit;
let table_partition_cols = conf.table_partition_cols.clone();
let output_ordering = conf.output_ordering.clone();
let infinite_source = conf.infinite_source;
let config = FileScanConfig {
object_store_url: object_store_url.clone(),
file_schema,
Expand All @@ -184,7 +183,6 @@ pub async fn flatten_file_scan_config(
limit,
table_partition_cols,
output_ordering,
infinite_source,
};
flatten_configs.push(config);
}
Expand Down
28 changes: 15 additions & 13 deletions rust/lakesoul-io/src/datasource/physical_plan/defatul_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::{
execution::TaskContext,
physical_expr::PhysicalSortExpr,
execution::TaskContext
,
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream},
};
use datafusion::physical_plan::{ExecutionPlanProperties, PlanProperties};
use datafusion_common::{DataFusionError, Result};

use crate::default_column_stream::DefaultColumnStream;
Expand All @@ -19,19 +20,24 @@ use crate::default_column_stream::DefaultColumnStream;
pub struct DefaultColumnExec {
input: Arc<dyn ExecutionPlan>,
target_schema: SchemaRef,
default_column_value: Arc<HashMap<String, String>>
default_column_value: Arc<HashMap<String, String>>,
cache: PlanProperties,
}

impl DefaultColumnExec {
pub fn new(
input: Arc<dyn ExecutionPlan>,
input: Arc<dyn ExecutionPlan>,
target_schema: SchemaRef,
default_column_value: Arc<HashMap<String, String>>
default_column_value: Arc<HashMap<String, String>>,
) -> Result<Self> {
let eq_prop = input.equivalence_properties().clone();
let partitioning = input.output_partitioning();
let cache = PlanProperties::new(eq_prop, partitioning.clone(), input.execution_mode());
Ok(Self {
input,
target_schema,
default_column_value
default_column_value,
cache,
})
}
}
Expand All @@ -51,16 +57,12 @@ impl ExecutionPlan for DefaultColumnExec {
self.target_schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
vec![self.input.clone()]
}

fn with_new_children(self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
24 changes: 11 additions & 13 deletions rust/lakesoul-io/src/datasource/physical_plan/empty_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use datafusion::{
execution::TaskContext,
physical_expr::PhysicalSortExpr,
execution::TaskContext
,
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream},
};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::{ExecutionMode, PlanProperties};
use datafusion_common::Result;

use crate::default_column_stream::empty_schema_stream::EmptySchemaStream;
Expand All @@ -19,13 +21,17 @@ use crate::default_column_stream::empty_schema_stream::EmptySchemaStream;
pub struct EmptySchemaScanExec {
count: usize,
empty_schema: SchemaRef,
cache: PlanProperties,
}

impl EmptySchemaScanExec {
pub fn new(count: usize) -> Self {
let empty_schema = SchemaRef::new(Schema::empty());
let eq_prop = EquivalenceProperties::new(empty_schema.clone());
Self {
count,
empty_schema: SchemaRef::new(Schema::empty()),
empty_schema,
cache: PlanProperties::new(eq_prop, Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded),
}
}
}
Expand All @@ -41,16 +47,8 @@ impl ExecutionPlan for EmptySchemaScanExec {
self
}

fn schema(&self) -> SchemaRef {
self.empty_schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.cache
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
Loading

0 comments on commit c569cd0

Please sign in to comment.