From a013d81f6abefa3b94a5f9970ba206a5bac6be0c Mon Sep 17 00:00:00 2001 From: zenghua Date: Mon, 30 Oct 2023 17:23:21 +0800 Subject: [PATCH] create MetaDataClient && upsert_with_metadata_tests Signed-off-by: zenghua --- rust/Cargo.lock | 316 +++++++++-------- rust/lakesoul-datafusion/Cargo.toml | 3 + .../src/test/upsert_tests.rs | 202 ++++++++++- rust/lakesoul-io/Cargo.toml | 8 +- rust/lakesoul-io/src/datasource/mod.rs | 1 + .../src/datasource/parquet_sink.rs | 192 ++++++++++ .../src/datasource/parquet_source.rs | 69 ++-- rust/lakesoul-io/src/lakesoul_io_config.rs | 14 + rust/lakesoul-io/src/lib.rs | 8 +- rust/lakesoul-metadata/Cargo.toml | 1 + rust/lakesoul-metadata/src/lib.rs | 7 +- rust/lakesoul-metadata/src/metadata_client.rs | 332 ++++++++++++++++++ 12 files changed, 968 insertions(+), 185 deletions(-) create mode 100644 rust/lakesoul-io/src/datasource/parquet_sink.rs create mode 100644 rust/lakesoul-metadata/src/metadata_client.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ec1a799d3..110a210f9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -19,15 +19,16 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if", "const-random", "getrandom", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -102,7 +103,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "ahash", "arrow-arith", @@ -123,7 +124,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -137,7 +138,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "ahash", "arrow-buffer", @@ -146,15 +147,15 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "num", - "packed_simd_2", + "packed_simd", ] [[package]] name = "arrow-buffer" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "half", "num", @@ -163,7 +164,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -171,7 +172,7 @@ dependencies = [ "arrow-schema", "arrow-select", "chrono", - "comfy-table 7.0.1", + "comfy-table 7.1.0", "half", "lexical-core", "num", @@ -180,7 +181,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -198,7 +199,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-buffer", "arrow-schema", @@ -209,7 +210,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -222,7 +223,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -241,7 +242,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -255,7 +256,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "ahash", "arrow-array", @@ -263,22 +264,22 @@ dependencies = [ "arrow-data", "arrow-schema", "half", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] name = "arrow-schema" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "serde", ] [[package]] name = "arrow-select" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -290,7 +291,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "arrow-array", "arrow-buffer", @@ -314,9 +315,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb42b2197bf15ccb092b62c74515dbd8b86d0effd934795f6687c93b6e679a2c" +checksum = "f658e2baef915ba0f26f1f7c42bfb8e12f532a01f449a090ded75ae7a07e9ba2" dependencies = [ "bzip2", "flate2", @@ -326,8 +327,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd", - "zstd-safe", + "zstd 0.13.0", + "zstd-safe 7.0.0", ] [[package]] @@ -341,15 +342,15 @@ dependencies = [ [[package]] name = "async-task" -version = "4.4.1" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" +checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1" [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -411,9 +412,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "bitflags" @@ -423,9 +424,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "blake2" @@ -644,12 +645,12 @@ dependencies = [ [[package]] name = "comfy-table" -version = "7.0.1" +version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ab77dbd8adecaf3f0db40581631b995f312a8a5ae3aa9993188bb8f23d83a5b" +checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ - "strum 0.24.1", - "strum_macros 0.24.3", + "strum 0.25.0", + "strum_macros 0.25.3", "unicode-width", ] @@ -664,23 +665,21 @@ dependencies = [ [[package]] name = "const-random" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" +checksum = "11df32a13d7892ec42d51d3d175faba5211ffe13ed25d4fb348ac9e9ce835593" dependencies = [ "const-random-macro", - "proc-macro-hack", ] [[package]] name = "const-random-macro" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ "getrandom", "once_cell", - "proc-macro-hack", "tiny-keccak", ] @@ -708,9 +707,9 @@ checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" dependencies = [ "libc", ] @@ -808,7 +807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "lock_api", "once_cell", "parking_lot_core", @@ -840,7 +839,7 @@ dependencies = [ "futures", "futures-util", "glob", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "indexmap 1.9.3", "itertools 0.11.0", "lazy_static", @@ -861,7 +860,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd", + "zstd 0.12.4", ] [[package]] @@ -886,7 +885,7 @@ dependencies = [ "dashmap", "datafusion-common", "datafusion-expr", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "log", "object_store", "parking_lot", @@ -906,7 +905,7 @@ dependencies = [ "lazy_static", "sqlparser", "strum 0.25.0", - "strum_macros 0.25.2", + "strum_macros 0.25.3", ] [[package]] @@ -920,7 +919,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "itertools 0.11.0", "log", "regex-syntax 0.7.5", @@ -943,7 +942,7 @@ dependencies = [ "datafusion-expr", "datafusion-row", "half", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "indexmap 1.9.3", "itertools 0.11.0", "lazy_static", @@ -1083,9 +1082,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -1276,9 +1275,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" dependencies = [ "ahash", "allocator-api2", @@ -1402,7 +1401,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1425,16 +1424,16 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows-core", ] [[package]] @@ -1473,7 +1472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] @@ -1484,9 +1483,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" @@ -1534,10 +1533,13 @@ dependencies = [ name = "lakesoul-datafusion" version = "0.1.0" dependencies = [ + "async-trait", + "futures", "lakesoul-io", "lakesoul-metadata", "prost", "proto", + "uuid", ] [[package]] @@ -1598,6 +1600,7 @@ dependencies = [ "serde_json", "tokio", "tokio-postgres", + "url", "uuid", ] @@ -1686,12 +1689,6 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" -[[package]] -name = "libm" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a" - [[package]] name = "libm" version = "0.2.8" @@ -1706,9 +1703,9 @@ checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" dependencies = [ "autocfg", "scopeguard", @@ -1784,9 +1781,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "log", @@ -1874,7 +1871,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", - "libm 0.2.8", + "libm", ] [[package]] @@ -1920,7 +1917,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.6.1" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "async-trait", "base64", @@ -1962,18 +1959,18 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.5.1" +version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" +checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] -name = "packed_simd_2" -version = "0.3.8" +name = "packed_simd" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1914cd452d8fccd6f9db48147b29fd4ae05bea9dc5d9ad578509f72415de282" +checksum = "1f9f08af0c877571712e2e3e686ad79efad9657dbf0f7c3c8ba943ff6c38932d" dependencies = [ "cfg-if", - "libm 0.1.4", + "num-traits", ] [[package]] @@ -1988,13 +1985,13 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets", ] @@ -2002,7 +1999,7 @@ dependencies = [ [[package]] name = "parquet" version = "42.0.0" -source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#3fb0b78575aac68fe9c62d8e0a45790a9e2b98fa" +source = "git+https://github.com/lakesoul-io/arrow-rs.git?branch=arrow-rs-42-parquet-bufferred#fc24a4a00ca00917dad1fd531dbc0dcbbdba56dd" dependencies = [ "ahash", "arrow-array", @@ -2018,7 +2015,7 @@ dependencies = [ "chrono", "flate2", "futures", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "lz4", "num", "num-bigint", @@ -2029,7 +2026,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.12.4", ] [[package]] @@ -2216,12 +2213,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.69" @@ -2362,27 +2353,36 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" -version = "1.10.0" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", "regex-automata", - "regex-syntax 0.8.0", + "regex-syntax 0.8.2", ] [[package]] name = "regex-automata" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.0", + "regex-syntax 0.8.2", ] [[package]] @@ -2393,9 +2393,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "regex-syntax" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -2471,11 +2471,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.18" +version = "0.38.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a74ee2d7c2581cd139b42447d7d9389b889bdaad3a73f1ebb16f2a3237bb19c" +checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", @@ -2564,18 +2564,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", @@ -2697,9 +2697,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -2707,9 +2707,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys", @@ -2777,7 +2777,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.2", + "strum_macros 0.25.3", ] [[package]] @@ -2795,9 +2795,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ "heck", "proc-macro2", @@ -2863,7 +2863,7 @@ checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", + "redox_syscall 0.3.5", "rustix", "windows-sys", ] @@ -2932,7 +2932,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys", ] @@ -2968,7 +2968,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", "tokio-util", "whoami", @@ -3021,9 +3021,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" [[package]] name = "toml_edit" @@ -3044,11 +3044,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3056,9 +3055,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -3067,9 +3066,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", ] @@ -3148,9 +3147,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom", "rand", @@ -3159,9 +3158,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e1ba1f333bd65ce3c9f27de592fcbc256dafe3af2717f56d7c87761fbaccf4" +checksum = "3d8c6bba9b149ee82950daefc9623b32bb1dacbfb1890e352f6b887bd582adaf" dependencies = [ "proc-macro2", "quote", @@ -3348,10 +3347,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows" -version = "0.48.0" +name = "windows-core" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] @@ -3424,9 +3423,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] @@ -3450,13 +3449,42 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "zerocopy" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "zstd" version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" dependencies = [ - "zstd-safe", + "zstd-safe 6.0.6", +] + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe 7.0.0", ] [[package]] @@ -3469,13 +3497,21 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + [[package]] name = "zstd-sys" -version = "2.0.8+zstd.1.5.5" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", - "libc", "pkg-config", ] diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml index 810c9e61b..8e0aa7dd9 100644 --- a/rust/lakesoul-datafusion/Cargo.toml +++ b/rust/lakesoul-datafusion/Cargo.toml @@ -14,3 +14,6 @@ lakesoul-io = { path = "../lakesoul-io" } lakesoul-metadata = { path = "../lakesoul-metadata" } proto = { path = "../proto" } prost = "0.11" +async-trait = "0.1" +futures = "0.3" +uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]} diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs index aa55b06dc..4e96a21a9 100644 --- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs +++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs @@ -519,4 +519,204 @@ mod upsert_with_io_config_tests { } -} \ No newline at end of file +} + +mod upsert_with_metadata_tests { + use std::sync::Arc; + use std::env; + use std::path::PathBuf; + use std::time::SystemTime; + + + use lakesoul_io::datasource::parquet_source::EmptySchemaProvider; + use lakesoul_io::serde_json::json; + use lakesoul_io::{arrow, datafusion, tokio, serde_json}; + + use arrow::util::pretty::print_batches; + use arrow::datatypes::{Schema, SchemaRef, Field}; + use arrow::record_batch::RecordBatch; + use arrow::array::{ArrayRef, Int32Array}; + + use datafusion::assert_batches_eq; + use datafusion::prelude::{DataFrame, SessionContext}; + use datafusion::logical_expr::LogicalPlanBuilder; + use datafusion::error::Result; + + use proto::proto::entity::{TableInfo, DataCommitInfo, FileOp, DataFileOp, CommitOp, Uuid}; + use tokio::runtime::{Builder, Runtime}; + + use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, LakeSoulIOConfig}; + use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader}; + use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; + + use lakesoul_metadata::{Client, PreparedStatementMap, MetaDataClient}; + + + fn commit_data(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()>{ + let table_name_id = client.get_table_name_id_by_table_name(table_name, "default")?; + match client.commit_data_commit_info(DataCommitInfo { + table_id: table_name_id.table_id, + partition_desc: "-5".to_string(), + file_ops: config.files_slice() + .iter() + .map(|file| DataFileOp { + file_op: FileOp::Add as i32, + path: file.clone(), + ..Default::default() + }) + .collect(), + commit_op: CommitOp::AppendCommit as i32, + timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64, + commit_id: { + let (high, low) = uuid::Uuid::new_v4().as_u64_pair(); + Some(Uuid{high, low}) + }, + ..Default::default() + }) + { + Ok(()) => Ok(()), + Err(e) => Err(lakesoul_io::lakesoul_reader::DataFusionError::IoError(e)) + } + } + + fn create_table(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()> { + match client.create_table( + TableInfo { + table_id: format!("table_{}", uuid::Uuid::new_v4().to_string()), + table_name: table_name.to_string(), + table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::().to_str().unwrap().to_string(), + table_schema: serde_json::to_string(&config.schema()).unwrap(), + table_namespace: "default".to_string(), + properties: "{}".to_string(), + partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::>().join(",").as_str(), + domain: "public".to_string(), + }) + { + Ok(()) => Ok(()), + Err(e) => Err(lakesoul_io::lakesoul_reader::DataFusionError::IoError(e)) + } + } + + fn create_io_config_builder(client: &mut MetaDataClient, table_name: &str) -> LakeSoulIOConfigBuilder { + let table_info = client.get_table_info_by_table_name(table_name, "default").unwrap(); + let data_files = client.get_data_files_by_table_name(table_name, vec![], "default").unwrap(); + let schema_str = client.get_schema_by_table_name(table_name, "default").unwrap(); + let schema = serde_json::from_str::(schema_str.as_str()).unwrap(); + + LakeSoulIOConfigBuilder::new() + .with_files(data_files) + .with_schema(Arc::new(schema)) + .with_primary_keys( + parse_table_info_partitions(table_info.partitions).1 + ) + } + + fn parse_table_info_partitions(partitions: String) -> (Vec, Vec) { + let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap()); + let hash_keys = &hash_keys[1..]; + ( + range_keys.split(',') + .collect::>() + .iter() + .map(|str|str.to_string()) + .collect::>(), + hash_keys.split(',') + .collect::>() + .iter() + .map(|str|str.to_string()) + .collect::>() + ) + } + + fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch { + let values = values + .into_iter() + .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef) + .collect::>(); + let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::>(); + RecordBatch::try_from_iter_with_nullable(iter).unwrap() + } + + + fn execute_upsert(batch: RecordBatch, table_name: &str, client: &mut MetaDataClient) -> Result<()> { + let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::().to_str().unwrap().to_string(); + let builder = create_io_config_builder(client, table_name).with_file(file.clone()).with_schema(batch.schema()); + let config = builder.clone().build(); + + let writer = SyncSendableMutableLakeSoulWriter::try_new(config, Builder::new_current_thread().build().unwrap()).unwrap(); + writer.write_batch(batch)?; + writer.flush_and_close()?; + commit_data(client, table_name, builder.clone().build()) + } + + + + + fn init_table(batch: RecordBatch, table_name: &str, pks:Vec, client: &mut MetaDataClient) -> Result<()> { + let schema = batch.schema(); + let builder = LakeSoulIOConfigBuilder::new() + .with_schema(schema.clone()) + .with_primary_keys(pks); + create_table(client, table_name, builder.build())?; + execute_upsert(batch, table_name, client) + } + + + + fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option, client: &mut MetaDataClient, expected: &[&str]) -> Result<()> { + execute_upsert(batch, table_name, client)?; + let builder = create_io_config_builder(client, table_name); + let builder = builder + .with_schema(SchemaRef::new(Schema::new( + selected_cols.iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::>() + ))); + let builder = if let Some(filters) = filters { + builder.with_filter_str(filters) + } else { + builder + }; + let mut reader = SyncSendableMutableLakeSoulReader::new(LakeSoulReader::new(builder.build()).unwrap(), Builder::new_current_thread().build().unwrap()); + reader.start_blocked()?; + let result = reader.next_rb_blocked(); + match result { + Some(result) => { + assert_batches_eq!(expected, &[result?]); + Ok(()) + }, + None => Ok(()) + } + } + + #[test] + fn test_merge_same_column_i32() -> Result<()>{ + let table_name = "merge-same_column"; + let mut client = MetaDataClient::from_env(); + // let mut client = MetaDataClient::from_config("host=127.0.0.1 port=5433 dbname=test_lakesoul_meta user=yugabyte password=yugabyte".to_string()); + client.meta_cleanup()?; + init_table( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + vec!["range".to_string(), "hash".to_string()], + &mut client, + )?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value"], + None, + &mut client, + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 11 |", + "| 20201101 | 2 | 2 |", + "| 20201101 | 3 | 33 |", + "| 20201101 | 4 | 44 |", + "| 20201102 | 4 | 4 |", + "+----------+------+-------+", + ] + ) + } +} diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index 71607c006..04e928cb3 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -8,7 +8,7 @@ version = "2.3.0" edition = "2021" [dependencies] -datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch", features = ["simd"] } +datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-27-parquet-prefetch"} object_store = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["aws"] } tokio-stream = "0.1.9" @@ -16,9 +16,9 @@ tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["io", "compat"]} derivative = "2.2.0" atomic_refcell = "0.1.8" -arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint", "simd"] } +arrow = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["prettyprint"] } arrow-schema = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["serde"] } -arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["simd", "chrono-tz"] } +arrow-array = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["chrono-tz"] } arrow-buffer = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred" } parquet = { git = "https://github.com/lakesoul-io/arrow-rs.git", branch = "arrow-rs-42-parquet-bufferred", features = ["async", "arrow"] } futures = "0.3" @@ -36,6 +36,8 @@ serde_json = { version = "1.0"} [features] hdfs = ["dep:hdrs"] +simd = ["datafusion/simd", "arrow/simd", "arrow-array/simd"] +default = [] [dev-dependencies] tempfile = "3.3.0" diff --git a/rust/lakesoul-io/src/datasource/mod.rs b/rust/lakesoul-io/src/datasource/mod.rs index de47b1339..eeb3c244b 100644 --- a/rust/lakesoul-io/src/datasource/mod.rs +++ b/rust/lakesoul-io/src/datasource/mod.rs @@ -3,3 +3,4 @@ // SPDX-License-Identifier: Apache-2.0 pub mod parquet_source; +pub mod parquet_sink; diff --git a/rust/lakesoul-io/src/datasource/parquet_sink.rs b/rust/lakesoul-io/src/datasource/parquet_sink.rs new file mode 100644 index 000000000..45a41bac3 --- /dev/null +++ b/rust/lakesoul-io/src/datasource/parquet_sink.rs @@ -0,0 +1,192 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + + +use std::any::Any; +use std::fmt::{self, Debug}; +use std::sync::Arc; + +use async_trait::async_trait; + +use datafusion::common::{Statistics, DataFusionError}; + +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{ + Expr, TableType, +}; +use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Distribution, stream::RecordBatchStreamAdapter +}; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; + +use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig}; +use crate::lakesoul_writer::MultiPartAsyncWriter; +use crate::transform::uniform_schema; + +#[derive(Debug, Clone)] +pub struct LakeSoulParquetSinkProvider{ + schema: SchemaRef, + config: LakeSoulIOConfig +} + +#[async_trait] +impl TableProvider for LakeSoulParquetSinkProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + _projections: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let msg = "Scan not implemented for LakeSoulParquetSinkProvider".to_owned(); + Err(DataFusionError::NotImplemented(msg)) + } + + + async fn insert_into( + &self, + _state: &SessionState, + input: Arc, + ) -> Result> { + let writer_schema = self.schema(); + let mut writer_config = self.config.clone(); + writer_config.schema = IOSchema(uniform_schema(writer_schema)); + let _writer = MultiPartAsyncWriter::try_new(writer_config).await?; + Ok(Arc::new(LakeSoulParquetSinkExec::new(input))) + } + +} + +#[derive(Debug, Clone)] +struct LakeSoulParquetSinkExec { + /// Input plan that produces the record batches to be written. + input: Arc, + /// Sink to whic to write + // sink: Arc, + /// Schema describing the structure of the data. + schema: SchemaRef, + +} + +impl LakeSoulParquetSinkExec { + fn new( + input: Arc, + ) -> Self { + Self { + input, + schema: Arc::new(Schema::empty()) + } + } +} + +impl DisplayAs for LakeSoulParquetSinkExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { + write!(f, "LakeSoulParquetSinkExec") + } +} + +impl ExecutionPlan for LakeSoulParquetSinkExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] + } + + fn required_input_ordering(&self) -> Vec>> { + // Require that the InsertExec gets the data in the order the + // input produced it (otherwise the optimizer may chose to reorder + // the input which could result in unintended / poor UX) + // + // More rationale: + // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 + vec![self + .input + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs)] + } + + fn maintains_input_order(&self) -> Vec { + vec![false] + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self { + input: children[0].clone(), + schema: self.schema.clone(), + })) + } + + /// Execute the plan and return a stream of `RecordBatch`es for + /// the specified partition. + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if partition != 0 { + return Err(DataFusionError::Internal( + format!("Invalid requested partition {partition}. InsertExec requires a single input partition." + ))); + } + + // Execute each of our own input's partitions and pass them to the sink + let input_partition_count = self.input.output_partitioning().partition_count(); + if input_partition_count != 1 { + return Err(DataFusionError::Internal(format!( + "Invalid input partition count {input_partition_count}. \ + InsertExec needs only a single partition." + ))); + } + + let data = self.input.execute(0, context)?; + let schema = self.schema.clone(); + + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, data))) + } + + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} diff --git a/rust/lakesoul-io/src/datasource/parquet_source.rs b/rust/lakesoul-io/src/datasource/parquet_source.rs index e47a6de83..bfe18399e 100644 --- a/rust/lakesoul-io/src/datasource/parquet_source.rs +++ b/rust/lakesoul-io/src/datasource/parquet_source.rs @@ -75,6 +75,18 @@ impl TableProvider for EmptySchemaProvider { empty_schema: self.empty_schema.clone(), })) } + + async fn insert_into( + &self, + _state: &SessionState, + _input: Arc, + ) -> Result> { + Ok(Arc::new(EmptySchemaScanExec { + count: self.count, + empty_schema: self.empty_schema.clone(), + })) + } + } #[derive(Debug)] @@ -136,7 +148,7 @@ impl LakeSoulParquetProvider { } } - pub(crate) async fn build_with_context(&self, context: &SessionContext) -> Result { + pub async fn build_with_context(&self, context: &SessionContext) -> Result { let mut plans = vec![]; let mut full_schema = uniform_schema(self.config.schema.0.clone()).to_dfschema().unwrap(); for i in 0..self.config.files.len() { @@ -157,15 +169,15 @@ impl LakeSoulParquetProvider { self.full_schema.clone() } - pub(crate) async fn create_physical_plan( + pub async fn create_physical_plan( &self, projections: Option<&Vec>, - full_schema: SchemaRef, + schema: SchemaRef, inputs: Vec>, ) -> Result> { Ok(Arc::new(LakeSoulParquetScanExec::new( projections, - full_schema, + schema, inputs, Arc::new(self.config.default_column_value.clone()), Arc::new(self.config.merge_operators.clone()), @@ -242,7 +254,7 @@ impl TableProvider for LakeSoulParquetProvider { inputs.push(phycical_plan); } - let full_schema = SchemaRef::new(Schema::new( + let physical_schema = SchemaRef::new(Schema::new( self.get_full_schema() .fields() .iter() @@ -263,7 +275,7 @@ impl TableProvider for LakeSoulParquetProvider { .collect::>(), )); - self.create_physical_plan(projections, full_schema, inputs).await + self.create_physical_plan(projections, physical_schema, inputs).await } } @@ -271,8 +283,7 @@ impl TableProvider for LakeSoulParquetProvider { struct LakeSoulParquetScanExec { projections: Vec, origin_schema: SchemaRef, - target_schema_with_pks: SchemaRef, - target_schema: SchemaRef, + projected_schema: SchemaRef, inputs: Vec>, default_column_value: Arc>, merge_operators: Arc>, @@ -282,30 +293,16 @@ struct LakeSoulParquetScanExec { impl LakeSoulParquetScanExec { fn new( projections: Option<&Vec>, - full_schema: SchemaRef, + schema: SchemaRef, inputs: Vec>, default_column_value: Arc>, merge_operators: Arc>, primary_keys: Arc>, ) -> Self { - let target_schema_with_pks = if let Some(proj) = projections { - let mut proj_with_pks = proj.clone(); - for idx in 0..primary_keys.len() { - let field_idx = full_schema.index_of(primary_keys[idx].as_str()).unwrap(); - if !projections.unwrap().contains(&field_idx) { - proj_with_pks.push(field_idx); - } - } - project_schema(&full_schema, Some(&proj_with_pks)).unwrap() - } else { - full_schema.clone() - }; - Self { projections: projections.unwrap().clone(), - origin_schema: full_schema.clone(), - target_schema_with_pks, - target_schema: project_schema(&full_schema, projections).unwrap(), + origin_schema: schema.clone(), + projected_schema: project_schema(&schema, projections).unwrap(), inputs, default_column_value, merge_operators, @@ -330,7 +327,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec { } fn schema(&self) -> SchemaRef { - self.target_schema.clone() + self.projected_schema.clone() } fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { @@ -358,7 +355,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec { } let merged_stream = merge_stream( stream_init_futs, - self.target_schema_with_pks.clone(), + self.schema(), self.primary_keys.clone(), self.default_column_value.clone(), self.merge_operators.clone(), @@ -377,7 +374,7 @@ impl ExecutionPlan for LakeSoulParquetScanExec { .unwrap() }) .collect::>(), - schema: self.target_schema.clone(), + schema: self.projected_schema.clone(), input: merged_stream, }; @@ -447,15 +444,17 @@ pub fn merge_stream( } fn schema_intersection(df_schema: DFSchemaRef, schema: SchemaRef, primary_keys: &[String]) -> Vec { - df_schema + schema .fields() .iter() - .filter_map(|df_field| match schema.field_with_name(df_field.name()) { - // datafusion's select is case sensitive, but col will transform field name to lower case - // so we use Column::new_unqualified instead - Ok(_) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))), - _ if primary_keys.contains(df_field.name()) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))), - _ => None + .filter_map(|field| match df_schema.field_with_name(None, field.name()) { + // datafusion's select is case sensitive, but col will transform field name to lower case + // so we use Column::new_unqualified instead + Ok(df_field) => Some(Column(datafusion::common::Column::new_unqualified(df_field.name()))), + _ if primary_keys.contains(field.name()) => { + Some(Column(datafusion::common::Column::new_unqualified(field.name()))) + } + _ => None, }) .collect() } diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 279426c64..a029d22be 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -74,6 +74,20 @@ pub struct LakeSoulIOConfig { pub(crate) default_fs: String, } +impl LakeSoulIOConfig { + pub fn schema(&self) -> SchemaRef { + self.schema.0.clone() + } + + pub fn primary_keys_slice(&self) -> &[String] { + &self.primary_keys + } + + pub fn files_slice(&self) -> &[String] { + &self.files + } +} + #[derive(Derivative, Debug)] #[derivative(Clone, Default)] pub struct LakeSoulIOConfigBuilder { diff --git a/rust/lakesoul-io/src/lib.rs b/rust/lakesoul-io/src/lib.rs index 99213bc20..a884a0518 100644 --- a/rust/lakesoul-io/src/lib.rs +++ b/rust/lakesoul-io/src/lib.rs @@ -12,15 +12,15 @@ pub mod filter; pub mod lakesoul_writer; pub mod lakesoul_io_config; pub mod sorted_merge; -mod datasource; +pub mod datasource; mod projection; #[cfg(feature = "hdfs")] mod hdfs; -pub mod default_column_stream; -pub mod constant; -pub mod transform; +mod default_column_stream; +mod constant; +mod transform; pub use datafusion::arrow::error::Result; pub use tokio; diff --git a/rust/lakesoul-metadata/Cargo.toml b/rust/lakesoul-metadata/Cargo.toml index b2574d21d..730808d5c 100644 --- a/rust/lakesoul-metadata/Cargo.toml +++ b/rust/lakesoul-metadata/Cargo.toml @@ -22,4 +22,5 @@ prost = "0.11" num_enum = "0.5.1" uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]} serde_json = { version = "1.0"} +url = "2.4.1" diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs index 8310bda36..a23dc090d 100644 --- a/rust/lakesoul-metadata/src/lib.rs +++ b/rust/lakesoul-metadata/src/lib.rs @@ -14,6 +14,9 @@ pub use tokio::runtime::{Builder, Runtime}; pub use tokio_postgres::{NoTls, Client, Statement}; use postgres_types::{ToSql, FromSql}; +mod metadata_client; +pub use metadata_client::MetaDataClient; + pub const DAO_TYPE_QUERY_ONE_OFFSET : i32 = 0; pub const DAO_TYPE_QUERY_LIST_OFFSET : i32 = 100; pub const DAO_TYPE_INSERT_ONE_OFFSET : i32 = 200; @@ -216,7 +219,7 @@ fn get_prepared_statement( // Select PartitionInfo DaoType::SelectPartitionVersionByTableIdAndDescAndVersion => "select table_id, partition_desc, version, commit_op, snapshot, expression, domain - from partition_info + from partition_info from partition_info where table_id = $1::TEXT and partition_desc = $2::TEXT and version = $3::INT", DaoType::SelectOnePartitionVersionByTableIdAndDesc => "select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from ( @@ -1286,7 +1289,7 @@ pub fn execute_query_scalar( }); match result { Ok(Some(row)) => { - let ts = row.get::<_, Option>(0); + let ts = row.get::<_, Option>(0); match ts { Some(ts) => Ok(Some(format!("{}", ts))), None => Ok(None) diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs new file mode 100644 index 000000000..792cc820c --- /dev/null +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -0,0 +1,332 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{io::Result, collections::HashMap, vec, env, fs}; + +use proto::proto::entity::{TablePathId, TableNameId, TableInfo, PartitionInfo, JniWrapper, DataCommitInfo, MetaInfo, CommitOp, self}; +use prost::Message; +use tokio::runtime::{Runtime, Builder}; +use tokio_postgres::Client; + +use url::Url; + +use crate::{execute_insert, PreparedStatementMap, DaoType, create_connection, clean_meta_for_test, execute_query, PARAM_DELIM, PARTITION_DESC_DELIM}; + +pub struct MetaDataClient { + runtime: Runtime, + client: Client, + prepared: PreparedStatementMap, +} + +impl MetaDataClient { + pub fn from_env() -> Self{ + let config_path = env::var("lakesoul_home").expect("lakesoul_home should be configured"); + let config = fs::read_to_string(config_path).expect(""); + let config_map = config.split('\n').filter_map(|property| { + property.find('=').map(|idx| property.split_at(idx + 1)) + }).collect::>(); + let url = Url::parse(&config_map.get("lakesoul.pg.url=").expect("")[5..]).unwrap(); + Self::from_config( + format!( + "host={} port={} dbname={} user={} password={}", + url.host_str().unwrap(), + url.port().unwrap(), + url.path_segments().unwrap().next().unwrap(), + config_map.get("lakesoul.pg.username=").unwrap(), + config_map.get("lakesoul.pg.password=").unwrap()) + ) + } + + pub fn from_config(config: String) -> Self { + let runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .max_blocking_threads(8) + .build() + .unwrap(); + let client = create_connection(&runtime, config).unwrap(); + let prepared = PreparedStatementMap::new(); + Self { + runtime, + client, + prepared + } + } + + pub fn create_table( + &mut self, + table_info: TableInfo + ) -> Result<()> { + let _ = self.insert_table_path_id(&table_path_id_from_table_info(&table_info))?; + let _ = self.insert_table_name_id(&table_name_id_from_table_info(&table_info))?; + let _ = self.insert_table_info(&table_info)?; + Ok(()) + } + + fn execute_insert(&mut self, insert_type: i32, wrapper: JniWrapper) -> Result { + execute_insert(&self.runtime, &mut self.client, &mut self.prepared, insert_type, wrapper) + } + + fn execute_query(&mut self, query_type: i32, joined_string: String) -> Result { + let encoded = execute_query(&self.runtime, &self.client, &mut self.prepared, query_type, joined_string)?; + match JniWrapper::decode(prost::bytes::Bytes::from(encoded)) { + Ok(wrapper) => Ok(wrapper), + Err(err) => Err(std::io::Error::other(err)) + } + } + + fn insert_table_info(&mut self, table_info: &TableInfo) -> Result { + self.execute_insert(DaoType::InsertTableInfo as i32, JniWrapper{table_info: vec![table_info.clone()], ..Default::default()}) + } + + fn insert_table_name_id(&mut self, table_name_id: &TableNameId) -> Result{ + self.execute_insert(DaoType::InsertTableNameId as i32, JniWrapper{table_name_id: vec![table_name_id.clone()], ..Default::default()}) + } + + fn insert_table_path_id(&mut self, table_path_id: &TablePathId) -> Result{ + self.execute_insert(DaoType::InsertTablePathId as i32, JniWrapper{table_path_id: vec![table_path_id.clone()], ..Default::default()}) + } + + fn insert_data_commit_info(&mut self, data_commit_info: &DataCommitInfo) -> Result { + self.execute_insert(DaoType::InsertDataCommitInfo as i32, JniWrapper{data_commit_info: vec![data_commit_info.clone()], ..Default::default()}) + } + + fn transaction_insert_partition_info(&mut self, partition_info_list: Vec) -> Result { + self.execute_insert(DaoType::TransactionInsertPartitionInfo as i32, JniWrapper { partition_info: partition_info_list, ..Default::default()}) + } + + pub fn meta_cleanup(&mut self) -> Result { + clean_meta_for_test(&self.runtime, &self.client) + } + + pub fn commit_data(&mut self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> { + let table_info = meta_info.table_info.unwrap(); + if !table_info.table_name.is_empty() { + // todo: updateTableShortName + + + } + // todo: updateTableProperties + + // conflict handling + let _raw_map = meta_info.list_partition + .iter() + .map(|partition_info| (partition_info.partition_desc.clone(), partition_info.clone())) + .collect::>(); + + let partition_desc_list = meta_info.list_partition + .iter() + .map(|partition_info| partition_info.partition_desc.clone()) + .collect::>(); + + let _snapshot_list = meta_info.list_partition + .iter() + .flat_map(|partition_info| partition_info.snapshot.clone()) + .collect::>(); + + // conflict handling + let cur_map = self.get_cur_partition_map(&table_info.table_id, &partition_desc_list)?; + + + match commit_op { + CommitOp::AppendCommit | CommitOp::MergeCommit => { + let new_partition_list = meta_info.list_partition + .iter() + .map(|partition_info| { + let partition_desc = &partition_info.partition_desc; + match cur_map.get(partition_desc) { + Some(cur_partition_info) => { + let mut cur_partition_info = cur_partition_info.clone(); + cur_partition_info.domain = self.get_table_domain(&table_info.table_id).unwrap(); + cur_partition_info.snapshot.extend_from_slice(&partition_info.snapshot[..]); + cur_partition_info.version += 1; + cur_partition_info.commit_op = commit_op as i32; + cur_partition_info.expression = partition_info.expression.clone(); + cur_partition_info + } + None => PartitionInfo { + table_id: table_info.table_id.clone(), + partition_desc: partition_desc.clone(), + version: 0, + snapshot: Vec::from(&partition_info.snapshot[..]), + domain: self.get_table_domain(&table_info.table_id).unwrap(), + commit_op: commit_op as i32, + expression: partition_info.expression.clone(), + ..Default::default() + } + } + }) + .collect::>(); + match self.transaction_insert_partition_info(new_partition_list) { + Ok(_) => Ok(()), + Err(e) => Err(e) + } + } + _ => { + todo!() + } + } + } + + fn get_cur_partition_map(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + Ok(self.get_partition_info_by_table_id_and_partition_list(table_id, partition_desc_list)? + .iter() + .map(|partition_info|(partition_info.partition_desc.clone(), partition_info.clone())) + .collect() + ) + } + + pub fn commit_data_commit_info(&mut self, data_commit_info: DataCommitInfo) -> Result<()> { + let table_id = &data_commit_info.table_id; + let partition_desc = &data_commit_info.partition_desc; + let commit_op = data_commit_info.commit_op; + let commit_id = &data_commit_info.commit_id.clone().unwrap(); + let commit_id_str = uuid::Uuid::from_u64_pair(commit_id.high, commit_id.low).to_string(); + match self.get_single_data_commit_info(table_id, partition_desc, &commit_id_str)? { + Some(data_commit_info) if data_commit_info.committed => { + return Ok(()); + } + None => { + let _ = self.insert_data_commit_info(&data_commit_info); + } + _ => {} + }; + let table_info = Some(self.get_table_info_by_table_id(table_id)?); + let domain = self.get_table_domain(table_id)?; + self.commit_data(MetaInfo { + table_info, + list_partition: vec![PartitionInfo { + table_id: table_id.clone(), + partition_desc: partition_desc.clone(), + commit_op, + domain, + snapshot: vec![commit_id.clone()], + ..Default::default() + }], + ..Default::default() + }, CommitOp::from_i32(commit_op).unwrap()) + } + + pub fn get_table_domain(&mut self, _table_id: &str) -> Result { + Ok("public".to_string()) + } + + pub fn get_table_name_id_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + match self.execute_query(DaoType::SelectTableNameIdByTableName as i32, [table_name, namespace].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.table_name_id[0].clone()), + Err(err) => Err(err) + } + } + + pub fn get_table_info_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + match self.execute_query(DaoType::SelectTableInfoByTableNameAndNameSpace as i32, [table_name, namespace].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.table_info[0].clone()), + Err(err) => Err(err) + } + } + + pub fn get_table_info_by_table_id(&mut self, table_id: &str) -> Result { + match self.execute_query(DaoType::SelectTableInfoByTableId as i32, table_id.to_string()) { + Ok(wrapper) => Ok(wrapper.table_info[0].clone()), + Err(err) => Err(err) + } + } + + + pub fn get_data_files_by_table_name(&mut self, table_name: &str, partitions: Vec<(&str, &str)>, namespace: &str) -> Result> { + let partition_filter = partitions + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>(); + let table_info = self.get_table_info_by_table_name(table_name, namespace)?; + let partition_list = self.get_all_partition_info(table_info.table_id.as_str())?; + let mut data_commit_info_list = Vec::::new(); + for idx in 0..partition_list.len() { + let partition_info = partition_list.get(idx).unwrap(); + let partition_desc = partition_info.partition_desc.clone(); + if partition_filter.contains(&partition_desc) { + continue; + } else { + let _data_commit_info_list = self.get_data_commit_info_of_single_partition(partition_info).unwrap(); + // let data_commit_info_list = Vec::::new(); + let _data_file_list = _data_commit_info_list + .iter() + .flat_map(|data_commit_info| { + data_commit_info.file_ops + .iter() + .map(|file_op| file_op.path.clone()) + .collect::>() + }) + .collect::>(); + data_commit_info_list.extend_from_slice(&_data_file_list); + } + } + Ok(data_commit_info_list) + } + + fn get_data_commit_info_of_single_partition(&mut self, partition_info: &PartitionInfo) -> Result> { + let table_id = &partition_info.table_id; + let partition_desc = &partition_info.partition_desc; + let joined_commit_id = &partition_info.snapshot + .iter() + .map(|commit_id| format!("{:0>16x}{:0>16x}", commit_id.high, commit_id.low)) + .collect::>() + .join(""); + let joined_string = [table_id.as_str(), partition_desc.as_str(), joined_commit_id.as_str()].join(PARAM_DELIM); + match self.execute_query(DaoType::ListDataCommitInfoByTableIdAndPartitionDescAndCommitList as i32, joined_string) { + Ok(wrapper) => Ok(wrapper.data_commit_info), + Err(e) => Err(e), + } + } + + pub fn get_schema_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + let table_info = self.get_table_info_by_table_name(table_name, namespace)?; + Ok(table_info.table_schema) + } + + pub fn get_all_partition_info(&mut self, table_id: &str) -> Result> { + match self.execute_query(DaoType::ListPartitionByTableId as i32, table_id.to_string()) { + Ok(wrapper) => Ok(wrapper.partition_info), + Err(e) => Err(e), + } + } + + pub fn get_single_data_commit_info(&mut self, table_id: &str, partition_desc: &str, commit_id: &str) -> Result> { + match self.execute_query(DaoType::SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId as i32, [table_id, partition_desc, commit_id].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(if wrapper.data_commit_info.is_empty() { + None + } else { + Some(wrapper.data_commit_info[0].clone()) + }), + Err(e) => Err(e), + } + } + + pub fn get_partition_info_by_table_id_and_partition_list(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + match self.execute_query(DaoType::ListPartitionDescByTableIdAndParList as i32, [table_id, partition_desc_list.join(PARTITION_DESC_DELIM).as_str()].join(PARAM_DELIM)) { + Ok(wrapper) => Ok(wrapper.partition_info), + Err(e) => Err(e), + } + + } + +} + +pub fn table_path_id_from_table_info(table_info: &TableInfo) -> TablePathId { + TablePathId { + table_path: table_info.table_path.clone(), + table_id: table_info.table_id.clone(), + table_namespace: table_info.table_namespace.clone(), + domain: table_info.domain.clone() + } +} +pub fn table_name_id_from_table_info(table_info: &TableInfo) -> TableNameId { + TableNameId { + table_name: table_info.table_name.clone(), + table_id: table_info.table_id.clone(), + table_namespace: table_info.table_namespace.clone(), + domain: table_info.domain.clone() + } +} +