Commit

create rust metadata client
Signed-off-by: zenghua <[email protected]>
zenghua committed Oct 31, 2023
1 parent f82eb3c commit ce1e1f7
Showing 10 changed files with 731 additions and 149 deletions.
316 changes: 176 additions & 140 deletions rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/lakesoul-io-c/Cargo.toml
@@ -19,6 +19,7 @@ serde = { version = "1.0", default-features = false, features = ["derive", "std"]

[features]
hdfs = ["lakesoul-io/hdfs"]
simd = ["lakesoul-io/simd"]

[build-dependencies]
cbindgen = "0.24.0"
8 changes: 5 additions & 3 deletions rust/lakesoul-io/Cargo.toml
@@ -8,17 +8,17 @@ version = "2.3.0"
edition = "2021"

[dependencies]
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch", features = ["simd"] }
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch"}
object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["aws"] }

tokio-stream = "0.1.9"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io", "compat"]}
derivative = "2.2.0"
atomic_refcell = "0.1.8"
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint", "simd"] }
arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint"] }
arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["serde"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["simd", "chrono-tz"] }
arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["chrono-tz"] }
arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred" }
parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["async", "arrow"] }
futures = "0.3"
@@ -36,6 +36,8 @@ serde_json = { version = "1.0"}

[features]
hdfs = ["dep:hdrs"]
simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"]
default = []

[dev-dependencies]
tempfile = "3.3.0"
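With this change SIMD support is no longer hard-wired into the datafusion, arrow and arrow-array dependencies; it becomes an opt-in simd feature of lakesoul-io (forwarded by lakesoul-io-c above) that is enabled at build time with, for example, cargo build --features simd. As a minimal sketch, assuming nothing beyond the feature name, downstream code can gate SIMD-specific paths the same way the existing hdfs feature gates its module; the function below is purely illustrative and not part of this commit.

// Illustrative only: built with `cargo build --features simd` the first
// definition is compiled; without the feature the scalar fallback is kept.
#[cfg(feature = "simd")]
fn sum_f32(values: &[f32]) -> f32 {
    // SIMD-enabled arrow/datafusion kernels would sit behind paths like this
    values.iter().sum()
}

#[cfg(not(feature = "simd"))]
fn sum_f32(values: &[f32]) -> f32 {
    values.iter().sum()
}

fn main() {
    println!("{}", sum_f32(&[1.0, 2.0, 3.0]));
}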
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/datasource/mod.rs
@@ -3,3 +3,4 @@
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_source;
pub mod parquet_sink;
192 changes: 192 additions & 0 deletions rust/lakesoul-io/src/datasource/parquet_sink.rs
@@ -0,0 +1,192 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0


use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;

use async_trait::async_trait;

use datafusion::common::{Statistics, DataFusionError};

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::logical_expr::{
Expr, TableType,
};
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Distribution, stream::RecordBatchStreamAdapter
};
use datafusion::arrow::datatypes::{Schema, SchemaRef};

use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig};
use crate::lakesoul_writer::MultiPartAsyncWriter;
use crate::transform::uniform_schema;

#[derive(Debug, Clone)]
pub struct LakeSoulParquetSinkProvider {
schema: SchemaRef,
config: LakeSoulIOConfig
}

#[async_trait]
impl TableProvider for LakeSoulParquetSinkProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
_state: &SessionState,
_projections: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let msg = "Scan not implemented for LakeSoulParquetSinkProvider".to_owned();
Err(DataFusionError::NotImplemented(msg))
}


async fn insert_into(
&self,
_state: &SessionState,
input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let writer_schema = self.schema();
let mut writer_config = self.config.clone();
writer_config.schema = IOSchema(uniform_schema(writer_schema));
let _writer = MultiPartAsyncWriter::try_new(writer_config).await?;
Ok(Arc::new(LakeSoulParquetSinkExec::new(input)))
}

}

#[derive(Debug, Clone)]
struct LakeSoulParquetSinkExec {
/// Input plan that produces the record batches to be written.
input: Arc<dyn ExecutionPlan>,
/// Sink to which to write
// sink: Arc<dyn DataSink>,
/// Schema describing the structure of the data.
schema: SchemaRef,

}

impl LakeSoulParquetSinkExec {
fn new(
input: Arc<dyn ExecutionPlan>,
) -> Self {
Self {
input,
schema: Arc::new(Schema::empty())
}
}
}

impl DisplayAs for LakeSoulParquetSinkExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "LakeSoulParquetSinkExec")
}
}

impl ExecutionPlan for LakeSoulParquetSinkExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
// Require that the InsertExec gets the data in the order the
// input produced it (otherwise the optimizer may choose to reorder
// the input which could result in unintended / poor UX)
//
// More rationale:
// https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
vec![self
.input
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)]
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![false]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
input: children[0].clone(),
schema: self.schema.clone(),
}))
}

/// Execute the plan and return a stream of `RecordBatch`es for
/// the specified partition.
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(
format!("Invalid requested partition {partition}. InsertExec requires a single input partition."
)));
}

// Execute each of our own input's partitions and pass them to the sink
let input_partition_count = self.input.output_partitioning().partition_count();
if input_partition_count != 1 {
return Err(DataFusionError::Internal(format!(
"Invalid input partition count {input_partition_count}. \
InsertExec needs only a single partition."
)));
}

let data = self.input.execute(0, context)?;
let schema = self.schema.clone();


Ok(Box::pin(RecordBatchStreamAdapter::new(schema, data)))
}


fn statistics(&self) -> Statistics {
Statistics::default()
}
}
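A rough usage sketch for the new sink provider, not part of this commit: registered as a DataFusion table, an INSERT INTO statement is planned through TableProvider::insert_into above, which builds the MultiPartAsyncWriter and wraps the input plan in LakeSoulParquetSinkExec. How the provider itself is constructed (schema plus LakeSoulIOConfig) is not shown in this diff, and the sketch assumes the pinned DataFusion branch routes INSERT INTO through insert_into.

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
// path follows the module layout introduced in this commit
use lakesoul_io::datasource::parquet_sink::LakeSoulParquetSinkProvider;

// `provider` stands in for an already configured sink provider.
async fn write_through_sink(provider: Arc<LakeSoulParquetSinkProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("lakesoul_sink", provider)?;
    // INSERT INTO is resolved against the registered provider and executed here.
    ctx.sql("INSERT INTO lakesoul_sink SELECT * FROM some_source")
        .await?
        .collect()
        .await?;
    Ok(())
}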
14 changes: 14 additions & 0 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -74,6 +74,20 @@ pub struct LakeSoulIOConfig {
pub(crate) default_fs: String,
}

impl LakeSoulIOConfig {
pub fn schema(&self) -> SchemaRef {
self.schema.0.clone()
}

pub fn primary_keys_slice(&self) -> &[String] {
&self.primary_keys
}

pub fn files_slice(&self) -> &[String] {
&self.files
}
}

#[derive(Derivative, Debug)]
#[derivative(Clone, Default)]
pub struct LakeSoulIOConfigBuilder {
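The accessors added above expose a LakeSoulIOConfig's schema, primary-key list and file list for read-only use. A minimal sketch relying only on what this diff shows (the public module path from lib.rs and the three new methods); how the config is built, for example via LakeSoulIOConfigBuilder, is left out because the builder body is not part of this hunk.

use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig;

// Read-only summary of a config, using solely the new accessors.
fn describe(config: &LakeSoulIOConfig) {
    println!("schema fields: {}", config.schema().fields().len());
    println!("primary keys:  {:?}", config.primary_keys_slice());
    println!("files:         {:?}", config.files_slice());
}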
8 changes: 4 additions & 4 deletions rust/lakesoul-io/src/lib.rs
@@ -12,15 +12,15 @@ pub mod filter;
pub mod lakesoul_writer;
pub mod lakesoul_io_config;
pub mod sorted_merge;
mod datasource;
pub mod datasource;
mod projection;

#[cfg(feature = "hdfs")]
mod hdfs;

pub mod default_column_stream;
pub mod constant;
pub mod transform;
mod default_column_stream;
mod constant;
mod transform;

pub use datafusion::arrow::error::Result;
pub use tokio;
1 change: 1 addition & 0 deletions rust/lakesoul-metadata/Cargo.toml
@@ -22,4 +22,5 @@ prost = "0.11"
num_enum = "0.5.1"
uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]}
serde_json = { version = "1.0"}
url = "2.4.1"
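The new url dependency is presumably used by the metadata client for handling connection settings; its call sites are not rendered in this diff. A self-contained sketch of the url crate API such code would rely on, with a purely illustrative helper and DSN:

use url::Url;

// Illustrative only: split a Postgres-style connection URL into its parts.
fn parse_pg_url(dsn: &str) -> Result<(String, u16, String), url::ParseError> {
    let parsed = Url::parse(dsn)?;
    let host = parsed.host_str().unwrap_or("127.0.0.1").to_string();
    let port = parsed.port().unwrap_or(5432);
    let database = parsed.path().trim_start_matches('/').to_string();
    Ok((host, port, database))
}

fn main() {
    let (host, port, db) = parse_pg_url("postgres://user:pass@localhost:5432/lakesoul").unwrap();
    println!("{host}:{port}/{db}");
}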

7 changes: 5 additions & 2 deletions rust/lakesoul-metadata/src/lib.rs
@@ -14,6 +14,9 @@ pub use tokio::runtime::{Builder, Runtime};
pub use tokio_postgres::{NoTls, Client, Statement};
use postgres_types::{ToSql, FromSql};

mod metadata_client;
pub use metadata_client::MetaDataClient;

pub const DAO_TYPE_QUERY_ONE_OFFSET : i32 = 0;
pub const DAO_TYPE_QUERY_LIST_OFFSET : i32 = 100;
pub const DAO_TYPE_INSERT_ONE_OFFSET : i32 = 200;
@@ -216,7 +219,7 @@ fn get_prepared_statement(
// Select PartitionInfo
DaoType::SelectPartitionVersionByTableIdAndDescAndVersion =>
"select table_id, partition_desc, version, commit_op, snapshot, expression, domain
from partition_info
from partition_info
where table_id = $1::TEXT and partition_desc = $2::TEXT and version = $3::INT",
DaoType::SelectOnePartitionVersionByTableIdAndDesc =>
"select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from (
@@ -1286,7 +1289,7 @@ pub fn execute_query_scalar(
});
match result {
Ok(Some(row)) => {
let ts = row.get::<_, Option<i32>>(0);
let ts = row.get::<_, Option<i64>>(0);
match ts {
Some(ts) => Ok(Some(format!("{}", ts))),
None => Ok(None)
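The last hunk corrects the scalar-query path: tokio-postgres maps Postgres BIGINT (INT8) to Rust i64, so reading the first column as Option<i32> would fail at runtime for BIGINT results such as timestamps. A small sketch of the corrected read, assuming only that column 0 is a nullable BIGINT:

use tokio_postgres::Row;

// Mirrors the fix above: a nullable BIGINT column is read as Option<i64>;
// asking for Option<i32> would fail the type check inside row.get and panic.
fn scalar_to_string(row: &Row) -> Option<String> {
    row.get::<_, Option<i64>>(0).map(|v| v.to_string())
}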