From 8ed9baba7d49ecb9a0eb1771fa26ac75e8f62e7e Mon Sep 17 00:00:00 2001 From: Brad Richardson Date: Wed, 4 Sep 2024 09:55:35 -0400 Subject: [PATCH] S3 async read (#15) * Async read * Git instead of local * Format * Tokio dependent restructure * Format, cleanup sink finish * Unwraps, progress logs, futures unordered * Format * Update cargo files * Revert object store version * Lint * Only use object store to read pbfs * Semaphore to reduce tasks in memory, joinset * Fix semaphore --- Cargo.lock | 672 +++++++++++++++++++++++++------------------ Cargo.toml | 30 +- benches/benchmark.rs | 22 +- src/lib.rs | 207 ++++++++----- src/main.rs | 17 +- src/sink.rs | 95 +++--- src/util.rs | 14 + 7 files changed, 614 insertions(+), 443 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 614cb09..5757200 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.8.11" @@ -78,9 +84,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -93,33 +99,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -133,9 +139,9 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "arrow" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6127ea5e585a12ec9f742232442828ebaf264dfa5eefdd71282376c599562b77" +checksum = "05048a8932648b63f21c37d88b552ccc8a65afb6dfe9fc9f30ce79174c2e7a85" dependencies = [ "arrow-arith", "arrow-array", @@ -154,9 +160,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7add7f39210b7d726e2a8efc0083e7bf06e8f2d15bdb4896b564dce4410fbf5d" +checksum = "1d8a57966e43bfe9a3277984a14c24ec617ad874e4c0e1d2a1b083a39cfbf22c" dependencies = [ "arrow-array", "arrow-buffer", @@ -169,9 +175,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81c16ec702d3898c2f5cfdc148443c6cd7dbe5bac28399859eb0a3d38f072827" +checksum = "16f4a9468c882dc66862cef4e1fd8423d47e67972377d85d80e022786427768c" dependencies = [ "ahash", "arrow-buffer", @@ -185,9 +191,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae6970bab043c4fbc10aee1660ceb5b306d0c42c8cc5f6ae564efcd9759b663" +checksum = "c975484888fc95ec4a632cdc98be39c085b1bb518531b0c80c5d462063e5daa1" dependencies = [ "bytes", "half", @@ -196,9 +202,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c7ef44f26ef4f8edc392a048324ed5d757ad09135eff6d5509e6450d39e0398" +checksum = "da26719e76b81d8bc3faad1d4dbdc1bcc10d14704e63dc17fc9f3e7e1e567c8e" dependencies = [ "arrow-array", "arrow-buffer", @@ -216,9 +222,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f843490bd258c5182b66e888161bb6f198f49f3792f7c7f98198b924ae0f564" +checksum = "c13c36dc5ddf8c128df19bab27898eea64bf9da2b555ec1cd17a8ff57fba9ec2" dependencies = [ "arrow-array", "arrow-buffer", @@ -235,9 +241,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a769666ffac256dd301006faca1ca553d0ae7cffcf4cd07095f73f95eb226514" +checksum = "dd9d6f18c65ef7a2573ab498c374d8ae364b4a4edf67105357491c031f716ca5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -247,9 +253,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf9c3fb57390a1af0b7bb3b5558c1ee1f63905f3eccf49ae7676a8d1e6e5a72" +checksum = "e786e1cdd952205d9a8afc69397b317cfbb6e0095e445c69cda7e8da5c1eeb0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -261,9 +267,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "654e7f3724176b66ddfacba31af397c48e106fbe4d281c8144e7d237df5acfd7" +checksum = "fb22284c5a2a01d73cebfd88a33511a3234ab45d66086b2ca2d1228c3498e445" dependencies = [ "arrow-array", "arrow-buffer", @@ -281,9 +287,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8008370e624e8e3c68174faaf793540287106cfda8ad1da862fdc53d8e096b4" +checksum = "42745f86b1ab99ef96d1c0bcf49180848a64fe2c7a7a0d945bc64fa2b21ba9bc" dependencies = [ "arrow-array", "arrow-buffer", @@ -296,9 +302,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5e3a6b7fda8d9fe03f3b18a2d946354ea7f3c8e4076dbdb502ad50d9d44824" +checksum = "4cd09a518c602a55bd406bcc291a967b284cfa7a63edfbf8b897ea4748aad23c" dependencies = [ "ahash", "arrow-array", @@ -306,20 +312,19 @@ dependencies = [ "arrow-data", "arrow-schema", "half", - "hashbrown", ] [[package]] name = "arrow-schema" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dab1c12b40e29d9f3b699e0203c2a73ba558444c05e388a4377208f8f9c97eee" +checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" [[package]] name = "arrow-select" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e80159088ffe8c48965cb9b1a7c968b2729f29f37363df7eca177fc3281fe7c3" +checksum = "600bae05d43483d216fb3494f8c32fdbefd8aa4e1de237e790dbb3d9f44690a3" dependencies = [ "ahash", "arrow-array", @@ -331,9 +336,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fd04a6ea7de183648edbcb7a6dd925bbd04c210895f6384c780e27a9b54afcd" +checksum = "f0dc1985b67cb45f6606a248ac2b4a288849f196bab8c657ea5589f47cdd55e6" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,15 +351,37 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -388,7 +415,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] @@ -455,9 +482,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cast" @@ -467,12 +494,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.6" +version = "1.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "57b6a275aa2903740dc87da01c62040406b8812552e97129a63ea8850a17c6e6" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -491,7 +519,7 @@ dependencies = [ "iana-time-zone", "num-traits", "serde", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -523,9 +551,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.9" +version = "4.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" +checksum = "ed6719fffa43d0d87e5fd8caeab59be1554fb028cd30edc88fc4369b17971019" dependencies = [ "clap_builder", "clap_derive", @@ -533,9 +561,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.9" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" +checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" dependencies = [ "anstream", "anstyle", @@ -545,36 +573,36 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "cmake" -version = "0.1.50" +version = "0.1.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" dependencies = [ "cc", ] [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "const-random" @@ -608,9 +636,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "crc32fast" @@ -632,6 +660,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -644,6 +673,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -741,6 +771,29 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -759,9 +812,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] name = "flatbuffers" @@ -775,13 +828,13 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.30" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" dependencies = [ "crc32fast", "libz-ng-sys", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -855,7 +908,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -917,9 +970,9 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "h2" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ "atomic-waker", "bytes", @@ -969,6 +1022,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "home" version = "0.5.9" @@ -1064,9 +1123,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", "futures-channel", @@ -1093,7 +1152,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -1117,9 +1176,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown", @@ -1139,20 +1198,20 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" dependencies = [ - "hermit-abi", + "hermit-abi 0.4.0", "libc", "windows-sys 0.52.0", ] [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "itertools" @@ -1180,18 +1239,18 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jobserver" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] [[package]] name = "js-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" dependencies = [ "wasm-bindgen", ] @@ -1268,9 +1327,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libm" @@ -1280,9 +1339,9 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libz-ng-sys" -version = "1.1.15" +version = "1.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6409efc61b12687963e602df8ecf70e8ddacf95bc6576bcf16e3ac6328083c5" +checksum = "4436751a01da56f1277f323c80d584ffad94a3d14aecd959dd0dff75aa73a438" dependencies = [ "cmake", "libc", @@ -1359,15 +1418,25 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1453,21 +1522,11 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" -version = "0.36.1" +version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" dependencies = [ "memchr", ] @@ -1531,17 +1590,21 @@ dependencies = [ [[package]] name = "osm-pbf-parquet" -version = "0.6.0" +version = "0.7.0" dependencies = [ + "anyhow", "arrow", "bytes", "clap", "criterion", + "env_logger", "flate2", + "futures", + "futures-util", + "log", "object_store", "osmpbf", "parquet", - "rayon", "sysinfo", "tokio", "tokio-util", @@ -1551,15 +1614,18 @@ dependencies = [ [[package]] name = "osmpbf" version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a32d502e10d65b4ef06819cd12a931570e590aa70472aab5eee2cfaa5841d19" +source = "git+https://github.com/brad-richardson/osmpbf.git?branch=async-blob-reader#210771f725979c6fb1efe3be7c0d77119991ac28" dependencies = [ + "async-stream", "byteorder", "flate2", "memmap2", + "object_store", "protobuf", "protobuf-codegen", "rayon", + "tokio", + "tokio-stream", ] [[package]] @@ -1582,14 +1648,14 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] name = "parquet" -version = "52.1.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f22ba0d95db56dde8685e3fadcb915cdaadda31ab8abbe3ff7f0ad1ef333267" +checksum = "e977b9066b4d3b03555c22bdc442f3fadebd96a39111249113087d0edb2691cd" dependencies = [ "ahash", "arrow-array", @@ -1649,7 +1715,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -1700,9 +1766,12 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "proc-macro2" @@ -1715,9 +1784,9 @@ dependencies = [ [[package]] name = "protobuf" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df67496db1a89596beaced1579212e9b7c53c22dca1d9745de00ead76573d514" +checksum = "0bcc343da15609eaecd65f8aa76df8dc4209d325131d8219358c0aaaebab0bf6" dependencies = [ "once_cell", "protobuf-support", @@ -1726,9 +1795,9 @@ dependencies = [ [[package]] name = "protobuf-codegen" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab09155fad2d39333d3796f67845d43e29b266eea74f7bc93f153f707f126dc" +checksum = "c4d0cde5642ea4df842b13eb9f59ea6fafa26dcb43e3e1ee49120e9757556189" dependencies = [ "anyhow", "once_cell", @@ -1741,9 +1810,9 @@ dependencies = [ [[package]] name = "protobuf-parse" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a16027030d4ec33e423385f73bb559821827e9ec18c50e7874e4d6de5a4e96f" +checksum = "1b0e9b447d099ae2c4993c0cbb03c7a9d6c937b17f2d56cfc0b1550e6fcfdb76" dependencies = [ "anyhow", "indexmap", @@ -1757,18 +1826,18 @@ dependencies = [ [[package]] name = "protobuf-support" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e2d30ab1878b2e72d1e2fc23ff5517799c9929e2cf81a8516f9f4dcf2b9cf3" +checksum = "f0766e3675a627c327e4b3964582594b0e8741305d628a98a5de75a1d15f99b9" dependencies = [ "thiserror", ] [[package]] name = "quick-xml" -version = "0.36.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4091e032efecb09d7b1f711f487b85ab925632a842627e3200fb088382cde32c" +checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" dependencies = [ "memchr", "serde", @@ -1776,9 +1845,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.2" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +checksum = "a2d2fb862b7ba45e615c1429def928f2e15f815bdf933b27a2d3824e224c1f46" dependencies = [ "bytes", "pin-project-lite", @@ -1786,6 +1855,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", + "socket2", "thiserror", "tokio", "tracing", @@ -1793,9 +1863,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.3" +version = "0.11.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +checksum = "ea0a9b3a42929fad8a7c3de7f86ce0814cfa893328157672680e9fb1145549c5" dependencies = [ "bytes", "rand", @@ -1810,9 +1880,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.2" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" dependencies = [ "libc", "once_cell", @@ -1823,9 +1893,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -1891,9 +1961,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -1920,9 +1990,9 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" -version = "0.12.5" +version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64", "bytes", @@ -1960,7 +2030,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "winreg", + "windows-registry", ] [[package]] @@ -1986,24 +2056,24 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" -version = "1.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f" dependencies = [ "bitflags 2.6.0", "errno", @@ -2014,9 +2084,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.11" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ "once_cell", "ring", @@ -2028,9 +2098,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -2041,9 +2111,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64", "rustls-pki-types", @@ -2051,15 +2121,15 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" -version = "0.102.5" +version = "0.102.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" +checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" dependencies = [ "ring", "rustls-pki-types", @@ -2133,31 +2203,32 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.204" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -2174,6 +2245,12 @@ dependencies = [ "serde", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "slab" version = "0.4.9" @@ -2264,9 +2341,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.72" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -2278,32 +2355,35 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "sysinfo" -version = "0.30.13" +version = "0.31.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be" dependencies = [ - "cfg-if", "core-foundation-sys", "libc", + "memchr", "ntapi", - "once_cell", "rayon", "windows", ] [[package]] name = "tempfile" -version = "3.10.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" dependencies = [ "cfg-if", "fastrand", + "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2323,7 +2403,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -2373,30 +2453,29 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "pin-project-lite", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -2410,6 +2489,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -2440,15 +2530,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -2469,7 +2559,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -2549,9 +2639,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "walkdir" @@ -2580,34 +2670,35 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ "cfg-if", + "once_cell", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.42" +version = "0.4.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" dependencies = [ "cfg-if", "js-sys", @@ -2617,9 +2708,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2627,22 +2718,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "wasm-streams" @@ -2659,9 +2750,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" dependencies = [ "js-sys", "wasm-bindgen", @@ -2697,11 +2788,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2712,12 +2803,12 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.52.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" dependencies = [ - "windows-core", - "windows-targets 0.52.6", + "windows-core 0.57.0", + "windows-targets", ] [[package]] @@ -2726,16 +2817,80 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-sys" -version = "0.48.0" +name = "windows-core" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" dependencies = [ - "windows-targets 0.48.5", + "windows-implement", + "windows-interface", + "windows-result 0.1.2", + "windows-targets", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result 0.2.0", + "windows-strings", + "windows-targets", +] + +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result 0.2.0", + "windows-targets", ] [[package]] @@ -2744,22 +2899,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-targets" -version = "0.48.5" +name = "windows-sys" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -2768,46 +2917,28 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2820,70 +2951,37 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "winreg" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "zerocopy" version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ + "byteorder", "zerocopy-derive", ] @@ -2895,7 +2993,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.77", ] [[package]] @@ -2915,18 +3013,18 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.2.0" +version = "7.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.11+zstd.1.5.6" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75652c55c0b6f3e6f12eb786fe1bc960396bf05a1eb3bf1f3691c3610ac2e6d4" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index cc8bafa..318e3d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,24 +1,28 @@ [package] name = "osm-pbf-parquet" -version = "0.6.0" +version = "0.7.0" edition = "2021" [dependencies] -arrow = "52.1.0" -bytes = "1.6.1" -clap = { version = "4.5.9", features = ["derive"] } -flate2 = { version = "1.0.30", features = ["zlib-ng"], default-features = false } +anyhow = "1.*" +arrow = "52.*" +bytes = "1.*" +clap = { version = "4.*", features = ["derive"] } +env_logger = "0.11.*" +flate2 = { version = "1.*", features = ["zlib-ng"], default-features = false } +futures = "0.3.*" +futures-util = "0.3.*" +log = "0.4.*" object_store = {version = "0.10.2", features = ["aws"] } -osmpbf = { version = "0.3.4", features = ["zlib-ng"], default-features = false } -parquet = { version = "52.1.0", features = ["async"] } -rayon = "1.10.0" -sysinfo = "0.30.13" -tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread", "io-util"] } -tokio-util = {version = "0.7.11", features = ["io-util"] } -url = "2.5.2" +osmpbf = { version = "0.3.*", features = ["zlib-ng", "async"], default-features = false, git = "https://github.com/brad-richardson/osmpbf.git", branch = "async-blob-reader" } +parquet = { version = "52.*", features = ["async"] } +sysinfo = "0.31.*" +tokio = { version = "1.*", features = ["rt", "rt-multi-thread", "io-util"] } +tokio-util = {version = "0.7.*", features = ["io-util"] } +url = "2.*" [dev-dependencies] -criterion = "0.5.1" +criterion = { version = "0.5.*", features = ["async_futures", "async_tokio"] } [lib] name = "osm_pbf_parquet" diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 78452f0..3b37034 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -3,16 +3,22 @@ use osm_pbf_parquet::driver; use osm_pbf_parquet::util::Args; use std::fs; +async fn bench() { + let args = Args::new( + "./test/test.osm.pbf".to_string(), + "./test/bench-out/".to_string(), + 0, + ); + driver(args).await.unwrap(); +} + pub fn criterion_benchmark(c: &mut Criterion) { c.bench_function("benchmark", |b| { - b.iter(|| { - let args = Args::new( - "./test/test.osm.pbf".to_string(), - "./test/bench-out/".to_string(), - 0, - ); - let _ = driver(args); - }) + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + b.to_async(rt).iter(bench) }); let _ = fs::remove_dir_all("./test/bench-out/"); } diff --git a/src/lib.rs b/src/lib.rs index 69962d7..d16e9f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,18 @@ use std::collections::HashMap; -use std::fs::File; use std::sync::{Arc, Mutex}; +use futures::StreamExt; +use futures_util::pin_mut; +use log::info; use object_store::aws::AmazonS3Builder; use object_store::buffered::BufReader; +use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectStore; -use osmpbf::{BlobDecode, BlobReader, Element, PrimitiveBlock}; -use rayon::iter::ParallelBridge; -use rayon::iter::ParallelIterator; -use tokio_util::io::SyncIoBridge; +use osmpbf::{AsyncBlobReader, BlobDecode, Element, PrimitiveBlock}; +use tokio::runtime::Handle; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; use url::Url; pub mod osm_arrow; @@ -17,13 +20,15 @@ pub mod sink; pub mod util; use crate::osm_arrow::OSMType; use crate::sink::ElementSink; -use crate::util::{Args, ARGS, DEFAULT_BUF_READER_SIZE}; +use crate::util::{ + default_worker_thread_count, Args, ARGS, DEFAULT_BUF_READER_SIZE, ELEMENT_COUNTER, +}; fn get_sink_from_pool( osm_type: OSMType, sinkpools: Arc>>>>, filenums: Arc>>>, -) -> Result { +) -> Result { { let mut pool = sinkpools[&osm_type].lock().unwrap(); if let Some(sink) = pool.pop() { @@ -42,91 +47,138 @@ fn add_sink_to_pool( pool.push(sink); } -fn process_block( +async fn process_block( block: PrimitiveBlock, sinkpools: Arc>>>>, filenums: Arc>>>, -) { - let mut node_sink = - get_sink_from_pool(OSMType::Node, sinkpools.clone(), filenums.clone()).unwrap(); - let mut way_sink = - get_sink_from_pool(OSMType::Way, sinkpools.clone(), filenums.clone()).unwrap(); - let mut rel_sink = - get_sink_from_pool(OSMType::Relation, sinkpools.clone(), filenums.clone()).unwrap(); +) -> Result { + let mut node_sink = get_sink_from_pool(OSMType::Node, sinkpools.clone(), filenums.clone())?; + let mut way_sink = get_sink_from_pool(OSMType::Way, sinkpools.clone(), filenums.clone())?; + let mut rel_sink = get_sink_from_pool(OSMType::Relation, sinkpools.clone(), filenums.clone())?; + + let mut block_counter = 0u64; for element in block.elements() { + block_counter += 1; match element { Element::Node(ref node) => { - let _ = node_sink.add_node(node); + node_sink.add_node(node); } Element::DenseNode(ref node) => { - let _ = node_sink.add_dense_node(node); + node_sink.add_dense_node(node); } Element::Way(ref way) => { - let _ = way_sink.add_way(way); + way_sink.add_way(way); } Element::Relation(ref rel) => { - let _ = rel_sink.add_relation(rel); + rel_sink.add_relation(rel); } } } + ELEMENT_COUNTER.fetch_add(block_counter, std::sync::atomic::Ordering::Relaxed); + + node_sink.increment_and_cycle().await?; + way_sink.increment_and_cycle().await?; + rel_sink.increment_and_cycle().await?; add_sink_to_pool(node_sink, sinkpools.clone()); add_sink_to_pool(way_sink, sinkpools.clone()); add_sink_to_pool(rel_sink, sinkpools.clone()); + + Ok(block_counter) } -async fn create_s3_async_reader(url: Url) -> BufReader { - let s3_store = AmazonS3Builder::from_env() - .with_url(url.clone()) - .build() - .unwrap(); - let path = Path::parse(url.path()).unwrap(); - let meta = s3_store.head(&path).await.unwrap(); - BufReader::with_capacity(Arc::new(s3_store), &meta, DEFAULT_BUF_READER_SIZE) +async fn create_s3_buf_reader(url: Url) -> Result { + let s3_store = AmazonS3Builder::from_env().with_url(url.clone()).build()?; + let path = Path::parse(url.path())?; + let meta = s3_store.head(&path).await?; + Ok(BufReader::with_capacity( + Arc::new(s3_store), + &meta, + DEFAULT_BUF_READER_SIZE, + )) } -fn s3_read( - url: Url, +async fn create_local_buf_reader(path: &str) -> Result { + let local_store: LocalFileSystem = LocalFileSystem::new(); + let path = std::path::Path::new(path); + let filesystem_path = object_store::path::Path::from_filesystem_path(path)?; + let meta = local_store.head(&filesystem_path).await?; + Ok(BufReader::with_capacity( + Arc::new(local_store), + &meta, + DEFAULT_BUF_READER_SIZE, + )) +} + +async fn process_blobs( + buf_reader: BufReader, sinkpools: Arc>>>>, - filenums: Arc>>>, -) -> Result<(), osmpbf::Error> { - // Create sync reader because underlying BlobReader is not async - // Backed by multi-threaded runtime to allow fetch concurrency - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - let s3_async_reader = rt.block_on(create_s3_async_reader(url)); - let s3_sync_reader = SyncIoBridge::new_with_handle(s3_async_reader, rt.handle().clone()); - let sync_buf_reader = - std::io::BufReader::with_capacity(DEFAULT_BUF_READER_SIZE, s3_sync_reader); - let blob_reader = BlobReader::new(sync_buf_reader); - - // Using rayon parallelize bridge here because SyncIoBridge can't run on tokio-enabled threads - blob_reader.par_bridge().for_each(|blob| { - if let BlobDecode::OsmData(block) = blob.unwrap().decode().unwrap() { - process_block(block, sinkpools.clone(), filenums.clone()); - } - }); +) -> Result<(), anyhow::Error> { + let mut blob_reader = AsyncBlobReader::new(buf_reader); + + let stream = blob_reader.stream(); + pin_mut!(stream); + + let filenums: Arc>>> = Arc::new(HashMap::from([ + (OSMType::Node, Arc::new(Mutex::new(0))), + (OSMType::Way, Arc::new(Mutex::new(0))), + (OSMType::Relation, Arc::new(Mutex::new(0))), + ])); + + // Avoid too many tasks in memory + let active_tasks = 4 * ARGS + .get() + .unwrap() + .worker_threads + .unwrap_or(default_worker_thread_count()); + let semaphore = Arc::new(Semaphore::new(active_tasks)); + + let mut join_set = JoinSet::new(); + while let Some(Ok(blob)) = stream.next().await { + let sinkpools = sinkpools.clone(); + let filenums = filenums.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + match blob.decode() { + Ok(BlobDecode::OsmHeader(_)) => (), + Ok(BlobDecode::OsmData(block)) => { + process_block(block, sinkpools.clone(), filenums.clone()) + .await + .unwrap(); + } + Ok(BlobDecode::Unknown(unknown)) => { + panic!("Unknown blob: {}", unknown); + } + Err(error) => { + panic!("Error decoding blob: {}", error); + } + } + drop(permit); + }); + } + while let Some(result) = join_set.join_next().await { + result?; + } Ok(()) } -fn local_read( - path: &str, - sinkpools: Arc>>>>, - filenums: Arc>>>, -) -> Result<(), osmpbf::Error> { - let file = File::open(path).unwrap(); - let reader = std::io::BufReader::with_capacity(DEFAULT_BUF_READER_SIZE, file); - let blob_reader = BlobReader::new(reader); - blob_reader.par_bridge().for_each(|blob| { - if let BlobDecode::OsmData(block) = blob.unwrap().decode().unwrap() { - process_block(block, sinkpools.clone(), filenums.clone()); +async fn progress_log() { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + interval.tick().await; // First tick is immediate + + loop { + interval.tick().await; + let processed = ELEMENT_COUNTER.load(std::sync::atomic::Ordering::Relaxed); + let mut processed_str = format!("{}", processed); + if processed >= 1_000_000_000 { + processed_str = format!("{:.2}B", (processed as f64) / 1_000_000_000.0); + } else if processed >= 1_000_000 { + processed_str = format!("{:.2}M", (processed as f64) / 1_000_000.0); } - }); - Ok(()) + info!("Processed {} elements", processed_str); + } } -pub fn driver(args: Args) -> Result<(), std::io::Error> { +pub async fn driver(args: Args) -> Result<(), anyhow::Error> { // TODO - validation of args // Store value for reading across threads (write-once) let _ = ARGS.set(args.clone()); @@ -137,26 +189,35 @@ pub fn driver(args: Args) -> Result<(), std::io::Error> { (OSMType::Relation, Arc::new(Mutex::new(vec![]))), ])); - let filenums: Arc>>> = Arc::new(HashMap::from([ - (OSMType::Node, Arc::new(Mutex::new(0))), - (OSMType::Way, Arc::new(Mutex::new(0))), - (OSMType::Relation, Arc::new(Mutex::new(0))), - ])); + // Verify we're running in a tokio runtime and start separate logging thread + Handle::current().spawn(async { progress_log().await }); let full_path = args.input; - if let Ok(url) = Url::parse(&full_path) { - let _ = s3_read(url, sinkpools.clone(), filenums.clone()); + let buf_reader = if let Ok(url) = Url::parse(&full_path) { + create_s3_buf_reader(url).await? } else { - let _ = local_read(&full_path, sinkpools.clone(), filenums.clone()); - } + create_local_buf_reader(&full_path).await? + }; + process_blobs(buf_reader, sinkpools.clone()).await?; { + let handle = Handle::current(); + let mut join_set = JoinSet::new(); for sinkpool in sinkpools.values() { let mut pool = sinkpool.lock().unwrap(); for mut sink in pool.drain(..) { - sink.finish(); + join_set.spawn_on( + async move { + // TODO - handle this + sink.finish().await.unwrap(); + }, + &handle, + ); } } + while let Some(result) = join_set.join_next().await { + result?; + } } Ok(()) } diff --git a/src/main.rs b/src/main.rs index 4b4cb47..c31c94d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,23 @@ use clap::Parser; +use env_logger::{Builder, Env}; use osm_pbf_parquet::driver; -use osm_pbf_parquet::util::Args; +use osm_pbf_parquet::util::{default_worker_thread_count, Args}; fn main() { + Builder::from_env(Env::default().default_filter_or("info")).init(); + let args = Args::parse(); println!("{:?}", args); - let _ = driver(args); + + let worker_threads = args.worker_threads.unwrap_or(default_worker_thread_count()); + + tokio::runtime::Builder::new_multi_thread() + .worker_threads(worker_threads) + .enable_all() + .build() + .unwrap() + .block_on(async { + let _ = driver(args).await; + }); } diff --git a/src/sink.rs b/src/sink.rs index 76c31ea..8c2784b 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -9,7 +9,6 @@ use osmpbf::{DenseNode, Node, RelMemberType, Relation, Way}; use parquet::arrow::async_writer::AsyncArrowWriter; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; -use tokio::runtime::Runtime; use url::Url; use crate::osm_arrow::osm_arrow_schema; @@ -24,23 +23,22 @@ pub struct ElementSink { // Arrow wrappers osm_builder: Box, - writer: Option>, + writer: Option>, // Wrapped so we can replace this on the fly // State tracking for batching estimated_record_batch_bytes: usize, estimated_file_bytes: usize, target_record_batch_bytes: usize, target_file_bytes: usize, - tokio_runtime: Arc, } impl ElementSink { - pub fn new(filenum: Arc>, osm_type: OSMType) -> Result { + pub fn new(filenum: Arc>, osm_type: OSMType) -> Result { let args = ARGS.get().unwrap(); let full_path = Self::create_full_path(&args.output, &osm_type, &filenum, args.compression); - let buf_writer = Self::create_buf_writer(&full_path); - let writer = Self::create_writer(buf_writer, args.compression, args.max_row_group_count); + let buf_writer = Self::create_buf_writer(&full_path)?; + let writer = Self::create_writer(buf_writer, args.compression, args.max_row_group_count)?; let target_record_batch_bytes = args .record_batch_target_mb @@ -58,40 +56,27 @@ impl ElementSink { estimated_file_bytes: 0usize, target_record_batch_bytes, target_file_bytes: args.file_target_mb * 1_000_000usize, - - // Underlying object store writer (cloud/s3) needs to run in a tokio runtime context - tokio_runtime: Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ), }) } - pub fn finish(&mut self) { - self.finish_batch(); - let _ = self - .tokio_runtime - .block_on(self.writer.take().unwrap().close()); + pub async fn finish(&mut self) -> Result<(), anyhow::Error> { + self.finish_batch().await?; + self.writer.take().unwrap().close().await?; + Ok(()) } - fn finish_batch(&mut self) { + async fn finish_batch(&mut self) -> Result<(), anyhow::Error> { if self.estimated_record_batch_bytes == 0 { // Nothing to write - return; + return Ok(()); } - let batch = self.osm_builder.finish().unwrap(); - let _ = self - .tokio_runtime - .block_on(self.writer.as_mut().unwrap().write(&batch)); + let batch = self.osm_builder.finish()?; + self.writer.as_mut().unwrap().write(&batch).await?; // Reset writer to new path if needed self.estimated_file_bytes += self.estimated_record_batch_bytes; if self.estimated_file_bytes >= self.target_file_bytes { - let _ = self - .tokio_runtime - .block_on(self.writer.take().unwrap().close()); + self.writer.take().unwrap().close().await?; // Create new writer and output let args = ARGS.get().unwrap(); @@ -101,41 +86,39 @@ impl ElementSink { &self.filenum, args.compression, ); - let buf_writer = Self::create_buf_writer(&full_path); + let buf_writer = Self::create_buf_writer(&full_path)?; self.writer = Some(Self::create_writer( buf_writer, args.compression, args.max_row_group_count, - )); + )?); self.estimated_file_bytes = 0; } self.estimated_record_batch_bytes = 0; + Ok(()) } - fn increment_and_cycle(&mut self) -> Result<(), std::io::Error> { + pub async fn increment_and_cycle(&mut self) -> Result<(), anyhow::Error> { if self.estimated_record_batch_bytes >= self.target_record_batch_bytes { - self.finish_batch(); + self.finish_batch().await?; } Ok(()) } - fn create_buf_writer(full_path: &str) -> BufWriter { + fn create_buf_writer(full_path: &str) -> Result { // TODO - better validation of URL/paths here and error handling if let Ok(url) = Url::parse(full_path) { - let s3_store = AmazonS3Builder::from_env() - .with_url(url.clone()) - .build() - .unwrap(); - let path = Path::parse(url.path()).unwrap(); + let s3_store = AmazonS3Builder::from_env().with_url(url.clone()).build()?; + let path = Path::parse(url.path())?; - BufWriter::new(Arc::new(s3_store), path) + Ok(BufWriter::new(Arc::new(s3_store), path)) } else { let object_store = LocalFileSystem::new(); - let absolute_path = absolute(full_path).unwrap(); - let store_path = Path::from_absolute_path(absolute_path).unwrap(); + let absolute_path = absolute(full_path)?; + let store_path = Path::from_absolute_path(absolute_path)?; - BufWriter::new(Arc::new(object_store), store_path) + Ok(BufWriter::new(Arc::new(object_store), store_path)) } } @@ -143,21 +126,21 @@ impl ElementSink { buffer: BufWriter, compression: u8, max_row_group_rows: Option, - ) -> AsyncArrowWriter { + ) -> Result, anyhow::Error> { let mut props_builder = WriterProperties::builder(); if compression == 0 { props_builder = props_builder.set_compression(Compression::UNCOMPRESSED); - } else { - props_builder = props_builder.set_compression(Compression::ZSTD( - ZstdLevel::try_new(compression as i32).unwrap(), - )); + } else if compression > 0 && compression <= 22 { + props_builder = props_builder + .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression as i32)?)); } if let Some(max_rows) = max_row_group_rows { props_builder = props_builder.set_max_row_group_size(max_rows); } let props = props_builder.build(); - AsyncArrowWriter::try_new(buffer, Arc::new(osm_arrow_schema()), Some(props)).unwrap() + let writer = AsyncArrowWriter::try_new(buffer, Arc::new(osm_arrow_schema()), Some(props))?; + Ok(writer) } fn create_full_path( @@ -186,7 +169,7 @@ impl ElementSink { path } - pub fn add_node(&mut self, node: &Node) -> Result<(), std::io::Error> { + pub fn add_node(&mut self, node: &Node) { let info = node.info(); let user = info .user() @@ -211,11 +194,9 @@ impl ElementSink { Some(info.visible()), ); self.estimated_record_batch_bytes += est_size_bytes; - - self.increment_and_cycle() } - pub fn add_dense_node(&mut self, node: &DenseNode) -> Result<(), std::io::Error> { + pub fn add_dense_node(&mut self, node: &DenseNode) { let info = node.info(); let mut user: Option = None; if let Some(info) = info { @@ -239,11 +220,9 @@ impl ElementSink { info.map(|info| info.visible()), ); self.estimated_record_batch_bytes += est_size_bytes; - - self.increment_and_cycle() } - pub fn add_way(&mut self, way: &Way) -> Result<(), std::io::Error> { + pub fn add_way(&mut self, way: &Way) { let info = way.info(); let user = info .user() @@ -268,11 +247,9 @@ impl ElementSink { Some(info.visible()), ); self.estimated_record_batch_bytes += est_size_bytes; - - self.increment_and_cycle() } - pub fn add_relation(&mut self, relation: &Relation) -> Result<(), std::io::Error> { + pub fn add_relation(&mut self, relation: &Relation) { let info = relation.info(); let user = info .user() @@ -312,7 +289,5 @@ impl ElementSink { Some(info.visible()), ); self.estimated_record_batch_bytes += est_size_bytes; - - self.increment_and_cycle() } } diff --git a/src/util.rs b/src/util.rs index 7ad8a79..c1335f3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::AtomicU64; use std::sync::OnceLock; use sysinfo::System; @@ -6,6 +7,9 @@ use clap::Parser; // Write once, safe read across threads pub static ARGS: OnceLock = OnceLock::new(); +// Element counter to track read progress +pub static ELEMENT_COUNTER: AtomicU64 = AtomicU64::new(0); + // Max recommended size of an uncompressed single blob is 16MB, assumes compression ratio of 2:1 or better pub const DEFAULT_BUF_READER_SIZE: usize = 1024 * 1024 * 8; @@ -25,6 +29,10 @@ pub struct Args { #[arg(long, default_value_t = 3)] pub compression: u8, + /// Worker thread count, default CPU count + #[arg(long)] + pub worker_threads: Option, + /// Override target record batch size, balance this with available memory /// default is total memory (MB) / CPU count / 8 #[arg(long)] @@ -45,6 +53,7 @@ impl Args { input, output, compression, + worker_threads: None, record_batch_target_mb: None, max_row_group_count: None, file_target_mb: 500usize, @@ -57,3 +66,8 @@ pub fn default_record_batch_size_mb() -> usize { // Estimate per thread available memory, leaving overhead for copies and system processes return ((system.total_memory() as usize / 1_000_000usize) / system.cpus().len()) / 8usize; } + +pub fn default_worker_thread_count() -> usize { + let system = System::new_all(); + return system.cpus().len(); +}