From 7d9e86a4bc9731596665fa526fea4ba1b44678d9 Mon Sep 17 00:00:00 2001 From: Brad Richardson Date: Mon, 22 Jul 2024 11:32:13 -0400 Subject: [PATCH] Async writer (#11) * Probably working * Cleanup * File targets, tokio fixes, fix max row group value * Clippy + format * Current thread only * Shared runtime * Format * Update test workflow * Bump dependencies --------- Co-authored-by: Brad Richardson --- .github/workflows/build_test.yaml | 11 +- Cargo.lock | 451 ++++++++++++++++++------------ Cargo.toml | 14 +- benches/benchmark.rs | 4 +- src/lib.rs | 2 +- src/osm_arrow.rs | 5 +- src/sink.rs | 219 +++++++++------ src/util.rs | 21 +- test/prepare.sh | 16 -- test/test.sh | 26 ++ test/validate.py | 3 +- 11 files changed, 463 insertions(+), 309 deletions(-) delete mode 100755 test/prepare.sh create mode 100755 test/test.sh diff --git a/.github/workflows/build_test.yaml b/.github/workflows/build_test.yaml index 44a38d0..35ed3af 100644 --- a/.github/workflows/build_test.yaml +++ b/.github/workflows/build_test.yaml @@ -26,15 +26,16 @@ jobs: - name: Install python test dependencies working-directory: ./test run: pip3 install -r requirements.txt - - name: Prepare test files - run: ./prepare.sh + - name: Prepare and run test + if: success() || failure() + run: ./test.sh working-directory: ./test - - name: Run test - working-directory: ./test - run: python3 ./validate.py - name: Benchmark + if: success() || failure() run: cargo bench - name: Run clippy + if: success() || failure() run: cargo clippy --all-targets --all-features - name: Run cargo fmt + if: success() || failure() run: cargo fmt --all -- --check diff --git a/Cargo.lock b/Cargo.lock index 51c1f62..134dd8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,12 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "adler32" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" - [[package]] name = "ahash" version = "0.8.11" @@ -76,6 +70,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.14" @@ -125,6 +125,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + [[package]] name = "arrow" version = "52.1.0" @@ -157,7 +163,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", - "half 2.4.1", + "half", "num", ] @@ -172,7 +178,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", - "half 2.4.1", + "half", "hashbrown", "num", ] @@ -184,7 +190,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cae6970bab043c4fbc10aee1660ceb5b306d0c42c8cc5f6ae564efcd9759b663" dependencies = [ "bytes", - "half 2.4.1", + "half", "num", ] @@ -202,7 +208,7 @@ dependencies = [ "atoi", "base64", "chrono", - "half 2.4.1", + "half", "lexical-core", "num", "ryu", @@ -235,7 +241,7 @@ checksum = "a769666ffac256dd301006faca1ca553d0ae7cffcf4cd07095f73f95eb226514" dependencies = [ "arrow-buffer", "arrow-schema", - "half 2.4.1", + "half", "num", ] @@ -265,7 +271,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", - "half 2.4.1", + "half", "indexmap", "lexical-core", "num", @@ -284,7 +290,7 @@ dependencies = [ "arrow-data", "arrow-schema", "arrow-select", - "half 2.4.1", + "half", "num", ] @@ -299,7 +305,7 @@ dependencies = [ "arrow-buffer", "arrow-data", "arrow-schema", - "half 2.4.1", + "half", "hashbrown", ] @@ -348,7 +354,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -366,17 +372,6 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.3.0" @@ -472,13 +467,12 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.101" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d" +checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -497,25 +491,41 @@ dependencies = [ "iana-time-zone", "num-traits", "serde", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] -name = "clap" -version = "2.34.0" +name = "ciborium" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" dependencies = [ - "bitflags 1.3.2", - "textwrap", - "unicode-width", + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", ] [[package]] name = "clap" -version = "4.5.8" +version = "4.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d" +checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" dependencies = [ "clap_builder", "clap_derive", @@ -523,9 +533,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.8" +version = "4.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708" +checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" dependencies = [ "anstream", "anstyle", @@ -542,7 +552,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -613,24 +623,24 @@ dependencies = [ [[package]] name = "criterion" -version = "0.3.6" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" dependencies = [ - "atty", + "anes", "cast", - "clap 2.34.0", + "ciborium", + "clap", "criterion-plot", - "csv", + "is-terminal", "itertools 0.10.5", - "lazy_static", "num-traits", + "once_cell", "oorandom", "plotters", "rayon", "regex", "serde", - "serde_cbor", "serde_derive", "serde_json", "tinytemplate", @@ -639,9 +649,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.5" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", "itertools 0.10.5", @@ -737,6 +747,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "flatbuffers" version = "24.3.25" @@ -829,7 +855,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -908,12 +934,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "half" -version = "1.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" - [[package]] name = "half" version = "2.4.1" @@ -945,11 +965,17 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" -version = "0.1.19" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "home" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "libc", + "windows-sys 0.52.0", ] [[package]] @@ -1099,15 +1125,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "inflate" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cdb29978cc5797bd8dcc8e5bf7de604891df2a8dc576973d71a281e916db2ff" -dependencies = [ - "adler32", -] - [[package]] name = "integer-encoding" version = "3.0.4" @@ -1120,6 +1137,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1137,9 +1165,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -1260,6 +1288,12 @@ dependencies = [ "libc", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.12" @@ -1302,13 +1336,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] -name = "memmap" -version = "0.7.0" +name = "memmap2" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" dependencies = [ "libc", - "winapi", ] [[package]] @@ -1420,6 +1453,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.1" @@ -1431,9 +1474,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" +checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3" dependencies = [ "async-trait", "base64", @@ -1442,7 +1485,7 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.12.1", + "itertools 0.13.0", "md-5", "parking_lot", "percent-encoding", @@ -1492,7 +1535,7 @@ version = "0.1.0" dependencies = [ "arrow", "bytes", - "clap 4.5.8", + "clap", "criterion", "flate2", "futures", @@ -1507,16 +1550,15 @@ dependencies = [ [[package]] name = "osmpbf" -version = "0.2.8" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c231c0fda8c367e0de57c98409a80c51b9aa0475d77ddca5f7a05fe0b51572e9" +checksum = "5a32d502e10d65b4ef06819cd12a931570e590aa70472aab5eee2cfaa5841d19" dependencies = [ "byteorder", "flate2", - "inflate", - "memmap", + "memmap2", "protobuf", - "protobuf-codegen-pure", + "protobuf-codegen", "rayon", ] @@ -1540,14 +1582,14 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] name = "parquet" -version = "52.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" +checksum = "0f22ba0d95db56dde8685e3fadcb915cdaadda31ab8abbe3ff7f0ad1ef333267" dependencies = [ "ahash", "arrow-array", @@ -1562,7 +1604,8 @@ dependencies = [ "bytes", "chrono", "flate2", - "half 2.4.1", + "futures", + "half", "hashbrown", "lz4_flex", "num", @@ -1571,6 +1614,7 @@ dependencies = [ "seq-macro", "snap", "thrift", + "tokio", "twox-hash", "zstd", "zstd-sys", @@ -1605,7 +1649,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -1671,34 +1715,60 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.28.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +checksum = "df67496db1a89596beaced1579212e9b7c53c22dca1d9745de00ead76573d514" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror", +] [[package]] name = "protobuf-codegen" -version = "2.28.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +checksum = "eab09155fad2d39333d3796f67845d43e29b266eea74f7bc93f153f707f126dc" dependencies = [ + "anyhow", + "once_cell", "protobuf", + "protobuf-parse", + "regex", + "tempfile", + "thiserror", ] [[package]] -name = "protobuf-codegen-pure" -version = "2.28.0" +name = "protobuf-parse" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865" +checksum = "1a16027030d4ec33e423385f73bb559821827e9ec18c50e7874e4d6de5a4e96f" dependencies = [ + "anyhow", + "indexmap", + "log", "protobuf", - "protobuf-codegen", + "protobuf-support", + "tempfile", + "thiserror", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e2d30ab1878b2e72d1e2fc23ff5517799c9929e2cf81a8516f9f4dcf2b9cf3" +dependencies = [ + "thiserror", ] [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "4091e032efecb09d7b1f711f487b85ab925632a842627e3200fb088382cde32c" dependencies = [ "memchr", "serde", @@ -1812,9 +1882,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -1929,6 +1999,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.23.11" @@ -2015,9 +2098,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -2028,9 +2111,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -2050,39 +2133,29 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.203" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" dependencies = [ "serde_derive", ] -[[package]] -name = "serde_cbor" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" -dependencies = [ - "half 1.8.3", - "serde", -] - [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] name = "serde_json" -version = "1.0.118" +version = "1.0.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4" +checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" dependencies = [ "itoa", "ryu", @@ -2191,9 +2264,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -2222,32 +2295,35 @@ dependencies = [ ] [[package]] -name = "textwrap" -version = "0.11.0" +name = "tempfile" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ - "unicode-width", + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", ] [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2297,14 +2373,15 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes", "libc", "mio", + "num_cpus", "pin-project-lite", "socket2", "tokio-macros", @@ -2319,7 +2396,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2392,7 +2469,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2447,12 +2524,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-width" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" - [[package]] name = "untrusted" version = "0.9.0" @@ -2528,7 +2599,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", "wasm-bindgen-shared", ] @@ -2562,7 +2633,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2596,6 +2667,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2634,7 +2717,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -2643,7 +2726,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -2661,7 +2744,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -2681,18 +2764,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.5", - "windows_aarch64_msvc 0.52.5", - "windows_i686_gnu 0.52.5", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.5", - "windows_x86_64_gnu 0.52.5", - "windows_x86_64_gnullvm 0.52.5", - "windows_x86_64_msvc 0.52.5", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -2703,9 +2786,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -2715,9 +2798,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -2727,15 +2810,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" [[package]] name = "windows_i686_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -2745,9 +2828,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -2757,9 +2840,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -2769,9 +2852,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -2781,9 +2864,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winreg" @@ -2797,22 +2880,22 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2823,27 +2906,27 @@ checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" [[package]] name = "zstd" -version = "0.13.0" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "7.0.0" +version = "7.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.9+zstd.1.5.5" +version = "2.0.11+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +checksum = "75652c55c0b6f3e6f12eb786fe1bc960396bf05a1eb3bf1f3691c3610ac2e6d4" dependencies = [ "cc", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 503f8ef..2a681a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,19 +6,19 @@ edition = "2021" [dependencies] arrow = "52.1.0" bytes = "1.6.1" -clap = { version = "4.5.8", features = ["derive"] } +clap = { version = "4.5.9", features = ["derive"] } flate2 = { version = "1.0.30", features = ["zlib-ng"], default-features = false } futures = "0.3.30" -object_store = {version = "0.10.1", features = ["aws"] } -osmpbf = "0.2.8" -parquet = "52.0.0" -rayon = "1.5.0" +object_store = {version = "0.10.2", features = ["aws"] } +osmpbf = "0.3.4" +parquet = { version = "52.1.0", features = ["async"] } +rayon = "1.10.0" sysinfo = "0.30.13" -tokio = { version = "1.38.0", features = ["rt"] } +tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread"] } url = "2.5.2" [dev-dependencies] -criterion = "0.3" +criterion = "0.5.1" [lib] name = "osm_pbf_parquet" diff --git a/benches/benchmark.rs b/benches/benchmark.rs index b577b54..78452f0 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -7,8 +7,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { c.bench_function("benchmark", |b| { b.iter(|| { let args = Args::new( - "./test/el-salvador-latest.osm.pbf".to_string(), - "./test/bench-out/".to_string(), // Will just overwrite files on each run + "./test/test.osm.pbf".to_string(), + "./test/bench-out/".to_string(), 0, ); let _ = driver(args); diff --git a/src/lib.rs b/src/lib.rs index 061e0b0..99f753e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,7 +82,7 @@ pub fn driver(args: Args) -> Result<(), io::Error> { for sinkpool in sinkpools.values() { let mut pool = sinkpool.lock().unwrap(); for mut sink in pool.drain(..) { - sink.finish_batch(); + sink.finish(); } } } diff --git a/src/osm_arrow.rs b/src/osm_arrow.rs index 638960e..12c81cb 100644 --- a/src/osm_arrow.rs +++ b/src/osm_arrow.rs @@ -96,8 +96,6 @@ pub fn osm_arrow_schema() -> Schema { } pub struct OSMArrowBuilder { - schema: Arc, - id_builder: Box, tags_builder: Box>, lat_builder: Box, @@ -148,7 +146,6 @@ impl OSMArrowBuilder { let visible_builder = Box::new(BooleanBuilder::new()); OSMArrowBuilder { - schema: Arc::new(osm_arrow_schema()), id_builder, tags_builder, lat_builder, @@ -263,6 +260,6 @@ impl OSMArrowBuilder { Arc::new(self.visible_builder.finish()), ]; - RecordBatch::try_new(self.schema.clone(), array_refs) + RecordBatch::try_new(Arc::new(osm_arrow_schema()), array_refs) } } diff --git a/src/sink.rs b/src/sink.rs index de7dd05..76c31ea 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,127 +1,186 @@ -use futures::executor::block_on; +use object_store::buffered::BufWriter; use std::path::absolute; use std::sync::{Arc, Mutex}; -use bytes::Bytes; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; use osmpbf::{DenseNode, Node, RelMemberType, Relation, Way}; -use parquet::arrow::ArrowWriter; +use parquet::arrow::async_writer::AsyncArrowWriter; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; +use tokio::runtime::Runtime; use url::Url; +use crate::osm_arrow::osm_arrow_schema; use crate::osm_arrow::OSMArrowBuilder; use crate::osm_arrow::OSMType; -use crate::util::{default_record_batch_size, ARGS}; +use crate::util::{default_record_batch_size_mb, ARGS}; pub struct ElementSink { - osm_builder: Box, - num_elements: u64, - estimated_current_size_bytes: usize, - filenum: Arc>, - output_path: String, + // Config for writing file pub osm_type: OSMType, - target_record_batch_size: usize, + filenum: Arc>, + + // Arrow wrappers + osm_builder: Box, + writer: Option>, + + // State tracking for batching + estimated_record_batch_bytes: usize, + estimated_file_bytes: usize, + target_record_batch_bytes: usize, + target_file_bytes: usize, + tokio_runtime: Arc, } impl ElementSink { - pub fn new( - filenum: Arc>, - // output_dir: String, - osm_type: OSMType, - ) -> Result { + pub fn new(filenum: Arc>, osm_type: OSMType) -> Result { let args = ARGS.get().unwrap(); + + let full_path = Self::create_full_path(&args.output, &osm_type, &filenum, args.compression); + let buf_writer = Self::create_buf_writer(&full_path); + let writer = Self::create_writer(buf_writer, args.compression, args.max_row_group_count); + + let target_record_batch_bytes = args + .record_batch_target_mb + .unwrap_or(default_record_batch_size_mb()) + * 1_000_000usize; + Ok(ElementSink { - osm_builder: Box::new(OSMArrowBuilder::new()), - num_elements: 0, - estimated_current_size_bytes: 0usize, - filenum, - output_path: args.output.clone(), osm_type, - target_record_batch_size: args - .record_batch_target_bytes - .unwrap_or(default_record_batch_size()), + filenum, + + osm_builder: Box::new(OSMArrowBuilder::new()), + writer: Some(writer), + + estimated_record_batch_bytes: 0usize, + estimated_file_bytes: 0usize, + target_record_batch_bytes, + target_file_bytes: args.file_target_mb * 1_000_000usize, + + // Underlying object store writer (cloud/s3) needs to run in a tokio runtime context + tokio_runtime: Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ), }) } - pub fn finish_batch(&mut self) { - let mut props_builder = WriterProperties::builder(); - let args = ARGS.get().unwrap(); - if args.compression == 0 { - props_builder = props_builder.set_compression(Compression::UNCOMPRESSED); - } else { - props_builder = props_builder.set_compression(Compression::ZSTD( - ZstdLevel::try_new(args.compression as i32).unwrap(), - )); - } - if let Some(max_row_group_size) = args.max_row_group_size { - props_builder = props_builder.set_max_row_group_size(max_row_group_size); + pub fn finish(&mut self) { + self.finish_batch(); + let _ = self + .tokio_runtime + .block_on(self.writer.take().unwrap().close()); + } + + fn finish_batch(&mut self) { + if self.estimated_record_batch_bytes == 0 { + // Nothing to write + return; } - let props = props_builder.build(); + let batch = self.osm_builder.finish().unwrap(); + let _ = self + .tokio_runtime + .block_on(self.writer.as_mut().unwrap().write(&batch)); - let trailing_path = self.new_trailing_path(&self.filenum, args.compression != 0); - // Remove trailing `/`s to avoid empty path segment - let full_path = format!( - "{0}{trailing_path}", - &self.output_path.trim_end_matches('/') - ); + // Reset writer to new path if needed + self.estimated_file_bytes += self.estimated_record_batch_bytes; + if self.estimated_file_bytes >= self.target_file_bytes { + let _ = self + .tokio_runtime + .block_on(self.writer.take().unwrap().close()); - let batch = self.osm_builder.finish().unwrap(); + // Create new writer and output + let args = ARGS.get().unwrap(); + let full_path = Self::create_full_path( + &args.output, + &self.osm_type, + &self.filenum, + args.compression, + ); + let buf_writer = Self::create_buf_writer(&full_path); + self.writer = Some(Self::create_writer( + buf_writer, + args.compression, + args.max_row_group_count, + )); + self.estimated_file_bytes = 0; + } - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap(); - writer.write(&batch).expect("Writing batch"); - writer.close().unwrap(); + self.estimated_record_batch_bytes = 0; + } - let payload = PutPayload::from_bytes(Bytes::from(buffer)); + fn increment_and_cycle(&mut self) -> Result<(), std::io::Error> { + if self.estimated_record_batch_bytes >= self.target_record_batch_bytes { + self.finish_batch(); + } + Ok(()) + } - // TODO - create this when sink is created for reuse - if let Ok(url) = Url::parse(&full_path) { - let object_store = AmazonS3Builder::from_env() - .with_url(&full_path) + fn create_buf_writer(full_path: &str) -> BufWriter { + // TODO - better validation of URL/paths here and error handling + if let Ok(url) = Url::parse(full_path) { + let s3_store = AmazonS3Builder::from_env() + .with_url(url.clone()) .build() .unwrap(); - let path = Path::parse(url.path()).unwrap(); - // S3 object store put needs to in a tokio runtime context - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { object_store.put(&path, payload).await }) - .unwrap_or_else(|_| panic!("Failed to write to path {0}", &path)); + BufWriter::new(Arc::new(s3_store), path) } else { - let absolute_path = absolute(&full_path).unwrap(); - let store_path = Path::from_absolute_path(&absolute_path).unwrap(); let object_store = LocalFileSystem::new(); + let absolute_path = absolute(full_path).unwrap(); + let store_path = Path::from_absolute_path(absolute_path).unwrap(); - block_on(object_store.put(&store_path, payload)).unwrap_or_else(|_| { - panic!("Failed to write to path {0}", &absolute_path.display()) - }); + BufWriter::new(Arc::new(object_store), store_path) } - - self.num_elements = 0; - self.estimated_current_size_bytes = 0; } - fn increment_and_cycle(&mut self) -> Result<(), std::io::Error> { - self.num_elements += 1; - if self.estimated_current_size_bytes >= self.target_record_batch_size { - self.finish_batch(); + fn create_writer( + buffer: BufWriter, + compression: u8, + max_row_group_rows: Option, + ) -> AsyncArrowWriter { + let mut props_builder = WriterProperties::builder(); + if compression == 0 { + props_builder = props_builder.set_compression(Compression::UNCOMPRESSED); + } else { + props_builder = props_builder.set_compression(Compression::ZSTD( + ZstdLevel::try_new(compression as i32).unwrap(), + )); } - Ok(()) + if let Some(max_rows) = max_row_group_rows { + props_builder = props_builder.set_max_row_group_size(max_rows); + } + let props = props_builder.build(); + + AsyncArrowWriter::try_new(buffer, Arc::new(osm_arrow_schema()), Some(props)).unwrap() + } + + fn create_full_path( + output_path: &str, + osm_type: &OSMType, + filenum: &Arc>, + compression: u8, + ) -> String { + let trailing_path = Self::new_trailing_path(osm_type, filenum, compression != 0); + // Remove trailing `/`s to avoid empty path segment + format!("{0}{trailing_path}", &output_path.trim_end_matches('/')) } - fn new_trailing_path(&self, filenum: &Arc>, is_zstd_compression: bool) -> String { + fn new_trailing_path( + osm_type: &OSMType, + filenum: &Arc>, + is_zstd_compression: bool, + ) -> String { let mut num = filenum.lock().unwrap(); let compression_stem = if is_zstd_compression { ".zstd" } else { "" }; let path = format!( "/type={}/{}_{:04}{}.parquet", - self.osm_type, self.osm_type, num, compression_stem + osm_type, osm_type, num, compression_stem ); *num += 1; path @@ -151,7 +210,7 @@ impl ElementSink { info.version(), Some(info.visible()), ); - self.estimated_current_size_bytes += est_size_bytes; + self.estimated_record_batch_bytes += est_size_bytes; self.increment_and_cycle() } @@ -179,7 +238,7 @@ impl ElementSink { info.map(|info| info.version()), info.map(|info| info.visible()), ); - self.estimated_current_size_bytes += est_size_bytes; + self.estimated_record_batch_bytes += est_size_bytes; self.increment_and_cycle() } @@ -208,7 +267,7 @@ impl ElementSink { info.version(), Some(info.visible()), ); - self.estimated_current_size_bytes += est_size_bytes; + self.estimated_record_batch_bytes += est_size_bytes; self.increment_and_cycle() } @@ -252,7 +311,7 @@ impl ElementSink { info.version(), Some(info.visible()), ); - self.estimated_current_size_bytes += est_size_bytes; + self.estimated_record_batch_bytes += est_size_bytes; self.increment_and_cycle() } diff --git a/src/util.rs b/src/util.rs index 61277fe..c69a7b6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -19,17 +19,21 @@ pub struct Args { pub output: String, /// Zstd compression level, 1-22, 0 for no compression - #[arg(long, default_value = "3")] + #[arg(long, default_value_t = 3)] pub compression: u8, /// Override target record batch size, balance this with available memory - /// default is total memory / CPU count / 4 + /// default is total memory (MB) / CPU count / 8 #[arg(long)] - pub record_batch_target_bytes: Option, + pub record_batch_target_mb: Option, /// Max feature count per row group #[arg(long)] - pub max_row_group_size: Option, + pub max_row_group_count: Option, + + /// Override target parquet file size + #[arg(long, default_value_t = 500usize)] + pub file_target_mb: usize, } impl Args { @@ -38,14 +42,15 @@ impl Args { input, output, compression, - record_batch_target_bytes: None, - max_row_group_size: None, + record_batch_target_mb: None, + max_row_group_count: None, + file_target_mb: 500usize, } } } -pub fn default_record_batch_size() -> usize { +pub fn default_record_batch_size_mb() -> usize { let system = System::new_all(); // Estimate per thread available memory, leaving overhead for copies and system processes - return (system.total_memory() as usize / system.cpus().len()) / 4usize; + return ((system.total_memory() as usize / 1_000_000usize) / system.cpus().len()) / 8usize; } diff --git a/test/prepare.sh b/test/prepare.sh deleted file mode 100755 index 2ea3c91..0000000 --- a/test/prepare.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -# Cleanup old artifacts if exist -rm cook-islands-latest.* -rm -rf ./parquet/ -mkdir -p ./parquet/ - -# Download PBF, convert to OSM XML -curl -L "https://download.geofabrik.de/australia-oceania/cook-islands-latest.osm.pbf" -o "cook-islands-latest.osm.pbf" -osmium cat cook-islands-latest.osm.pbf -o cook-islands-latest.osm - -# Also download larger PBF for benchmarking -curl -L "https://download.geofabrik.de/central-america/el-salvador-latest.osm.pbf" -o "el-salvador-latest.osm.pbf" - -# Run parquet conversion -cargo run -- --input cook-islands-latest.osm.pbf --output ./parquet/ diff --git a/test/test.sh b/test/test.sh new file mode 100755 index 0000000..dbedc5e --- /dev/null +++ b/test/test.sh @@ -0,0 +1,26 @@ +#!/bin/sh +set -e + +# Cleanup old artifacts if exist +rm -rf ./parquet/**/*.parquet +mkdir -p ./parquet/ + +test_file="test" + +# Download PBF, convert to OSM XML +if [ ! -f "./${test_file}.osm.pbf" ]; then + echo "Downloading file" + curl -L "https://download.geofabrik.de/australia-oceania/cook-islands-latest.osm.pbf" -o "${test_file}.osm.pbf" + echo "Creating OSM XML" + osmium cat ${test_file}.osm.pbf -o ${test_file}.osm +fi + +# Run parquet conversion +echo "Running conversion" +cargo run --release -- --input ${test_file}.osm.pbf --output ./parquet/ + +echo "Running validation" +python3 ./validate.py + +# Cleanup artifacts +# rm -rf ./parquet/**/*.parquet diff --git a/test/validate.py b/test/validate.py index 2e2c577..99d35cf 100644 --- a/test/validate.py +++ b/test/validate.py @@ -4,8 +4,7 @@ from pathlib import Path from datetime import datetime -OSM_XML_FILE = "cook-islands-latest.osm" -OSM_GEOJSON_FILE = "cook-islands-latest.geojsonseq" +OSM_XML_FILE = "test.osm" PARQUET_DIR = "./parquet" # Parse, process OSM XML file into a dataframe