Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/cache-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 17, 2024
2 parents d78b778 + d10083e commit 2e3b9ae
Show file tree
Hide file tree
Showing 29 changed files with 550 additions and 119 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: run
args: --example declare --all-features
args: --example declare --features bytes,tokio

benchmark:
name: Rust benchmark
Expand All @@ -109,7 +109,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: bench
args: --all-features
args: --features bench

- name: Comment on PR using GitHub CLI
env:
Expand Down
62 changes: 62 additions & 0 deletions .github/workflows/ci_wasm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
name: WASM CI

on:
push:
pull_request:
workflow_dispatch:

env:
CARGO_TERM_COLOR: always
CARGO_REGISTRIES_MY_REGISTRY_INDEX: https://github.com/rust-lang/crates.io-index


jobs:
check:
name: Rust project wasm check
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy

- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
command: check

- name: Setup for wasm32
run: |
rustup target add wasm32-unknown-unknown
- name: Run cargo build
uses: actions-rs/cargo@v1
with:
command: build
args: --target wasm32-unknown-unknown --no-default-features --features aws,bytes,opfs

- name: Install Chrome Environment
run: |
mkdir -p /tmp/chrome
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url')
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url')
unzip chromedriver-linux64.zip
unzip chrome-linux64.zip
cp -r chrome-linux64/ /tmp/chrome/
cp -r chromedriver-linux64 /tmp/chrome/chromedriver
- name: Setup wasm-pack
run: |
cargo install wasm-pack
- name: Run wasm-pack test
run: |
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
wasm-pack test --chrome --headless --test wasm --no-default-features --features aws,bytes,opfs
60 changes: 43 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,31 @@ version = "0.2.0"
msrv = "1.79.0"

[features]
aws = ["fusio-dispatch/aws", "fusio/aws"]
bench = ["redb", "rocksdb", "sled"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
default = ["bytes", "tokio"]
default = ["aws", "bytes", "tokio", "tokio-http"]
load_tbl = []
object-store = ["fusio/object_store"]
opfs = [
"dep:wasm-bindgen-futures",
"fusio-dispatch/opfs",
"fusio-parquet/opfs",
"fusio/opfs",
]
redb = ["dep:redb"]
rocksdb = ["dep:rocksdb"]
sled = ["dep:sled"]
tokio = ["tokio/fs"]
tokio = [
"fusio-dispatch/tokio",
"fusio-parquet/tokio",
"fusio/tokio",
"parquet/default",
"tokio/fs",
]
tokio-http = ["fusio/tokio-http"]
wasm = ["aws", "bytes", "opfs"]

[[example]]
name = "declare"
Expand Down Expand Up @@ -58,25 +74,25 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { package = "fusio", version = "0.3.3", features = [
"aws",
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.3", features = [
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [
"aws",
"tokio",
] }
fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
lockable = "0.1.1"
once_cell = "1"
parquet = { version = "53", features = ["async"] }
parquet = { version = "53", default-features = false, features = [
"async",
"base64",
"brotli",
"flate2",
"lz4",
"snap",
] }
parquet-lru = { version = "0.1.0", path = "parquet-lru" }
pin-project-lite = "0.2"
regex = "1"
Expand All @@ -93,18 +109,28 @@ redb = { version = "2", optional = true }
rocksdb = { version = "0.22", optional = true }
sled = { version = "0.34", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = "0.2.95"
wasm-bindgen-futures = { version = "0.4.45", optional = true }

[dev-dependencies]
bincode = "1"
comfy-table = "7"
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
fastrand = "2"
futures = { version = "0.3" }
mimalloc = "0.1"
serde = "1"
tempfile = "3"
tokio = { version = "1", features = ["full"] }
trybuild = "1.0"

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
comfy-table = "7"
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
mimalloc = "0.1"
tokio = { version = "1", features = ["full"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = "0.2.95"
wasm-bindgen-test = "0.3.9"

[target.'cfg(unix)'.dev-dependencies]
pprof = { version = "0.13", features = ["criterion", "flamegraph"] }

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,4 @@ async fn main() {

## Contributing to Tonbo
Follow the Contributing Guide to [contribute](https://github.com/tonbo-io/tonbo/blob/main/CONTRIBUTING.md).
Please feel free to ask any question or contact us on Github [Discussions](https://github.com/orgs/tonbo-io/discussions) or [issues](https://github.com/tonbo-io/tonbo/issues).
Please feel free to ask any question or contact us on Github [Discussions](https://github.com/tonbo-io/tonbo/discussions) or [issues](https://github.com/tonbo-io/tonbo/issues).
7 changes: 5 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.1", features = [
"aws",
"tokio",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.0", features = [
"aws",
"tokio",
] }
Expand Down
19 changes: 9 additions & 10 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
use parquet_lru::LruCache;
use thiserror::Error;
use tokio::sync::oneshot;
use ulid::Ulid;

use crate::{
fs::{manager::StoreManager, FileId, FileType},
Expand Down Expand Up @@ -66,7 +65,7 @@ where
parquet_lru_cache: C,
) -> Result<(), CompactionError<R>>
where
C: LruCache<Ulid> + Unpin,
C: LruCache<FileId> + Unpin,
{
let mut guard = self.schema.write().await;

Expand Down Expand Up @@ -156,7 +155,7 @@ where
AsyncWriter::new(
level_0_fs
.open_options(
&option.table_path(&gen, 0),
&option.table_path(gen, 0),
FileType::Parquet.open_options(false),
)
.await?,
Expand Down Expand Up @@ -206,7 +205,7 @@ where
parquet_cache: &C,
) -> Result<(), CompactionError<R>>
where
C: LruCache<Ulid> + Unpin,
C: LruCache<FileId> + Unpin,
{
let mut level = 0;

Expand All @@ -226,7 +225,7 @@ where
for scope in meet_scopes_l.iter() {
let file = level_fs
.open_options(
&option.table_path(&scope.gen, level),
&option.table_path(scope.gen, level),
FileType::Parquet.open_options(true),
)
.await?;
Expand Down Expand Up @@ -401,7 +400,7 @@ where
fs: &Arc<dyn DynFs>,
) -> Result<(), CompactionError<R>>
where
C: LruCache<Ulid> + Unpin,
C: LruCache<FileId> + Unpin,
{
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;

Expand Down Expand Up @@ -472,12 +471,12 @@ where
debug_assert!(min.is_some());
debug_assert!(max.is_some());

let gen = Ulid::new();
let gen = FileId::new();
let columns = builder.finish(None);
let mut writer = AsyncArrowWriter::try_new(
AsyncWriter::new(
fs.open_options(
&option.table_path(&gen, level),
&option.table_path(gen, level),
FileType::Parquet.open_options(false),
)
.await?,
Expand Down Expand Up @@ -521,7 +520,7 @@ where
EmptyLevel,
}

#[cfg(test)]
#[cfg(all(test, feature = "tokio"))]
pub(crate) mod tests {
use std::sync::{atomic::AtomicU32, Arc};

Expand Down Expand Up @@ -582,7 +581,7 @@ pub(crate) mod tests {
let mut writer = AsyncArrowWriter::try_new(
AsyncWriter::new(
fs.open_options(
&option.table_path(&gen, level),
&option.table_path(gen, level),
FileType::Parquet.open_options(false),
)
.await?,
Expand Down
43 changes: 40 additions & 3 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::future::Future;

use fusio::MaybeSend;

pub trait Executor {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
F: Future<Output = ()> + MaybeSend + 'static;
}

#[cfg(any(test, feature = "tokio"))]
#[cfg(feature = "tokio")]
pub mod tokio {
use std::future::Future;

use fusio::MaybeSend;
use tokio::runtime::Handle;

use super::Executor;
Expand All @@ -36,9 +39,43 @@ pub mod tokio {
impl Executor for TokioExecutor {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
F: Future<Output = ()> + MaybeSend + 'static,
{
self.handle.spawn(future);
}
}
}

#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
pub mod opfs {
use std::future::Future;

use fusio::MaybeSend;
use wasm_bindgen::prelude::*;

use super::Executor;

#[wasm_bindgen]
pub struct OpfsExecutor();

impl Default for OpfsExecutor {
fn default() -> Self {
Self {}
}
}

impl OpfsExecutor {
pub fn new() -> Self {
Self {}
}
}

impl Executor for OpfsExecutor {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + MaybeSend + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}
}
}
2 changes: 1 addition & 1 deletion src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ where
}
}

#[cfg(test)]
#[cfg(all(test, feature = "tokio"))]
pub(crate) mod tests {
use std::{mem, sync::Arc};

Expand Down
Loading

0 comments on commit 2e3b9ae

Please sign in to comment.