From 2c70971ecc66960dcf4bfb2fc6618c7f9f60980f Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 21 May 2024 16:36:01 -0500 Subject: [PATCH 1/2] Upgrade `arrow` to 16.1 (#15787) This PR upgrades arrow to 16.1 Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Bradley Dice (https://github.com/bdice) - Ray Douglass (https://github.com/raydouglass) - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/15787 --- .../all_cuda-118_arch-x86_64.yaml | 10 ++++----- .../all_cuda-122_arch-x86_64.yaml | 10 ++++----- conda/recipes/cudf/meta.yaml | 4 ++-- conda/recipes/libcudf/conda_build_config.yaml | 2 +- conda/recipes/libcudf/meta.yaml | 9 -------- cpp/cmake/thirdparty/get_arrow.cmake | 2 +- dependencies.yaml | 22 +++++++++---------- python/cudf/cudf/tests/test_orc.py | 3 +++ python/cudf/pyproject.toml | 4 ++-- python/cudf_kafka/pyproject.toml | 2 +- 10 files changed, 31 insertions(+), 37 deletions(-) diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 48699b81eed..804b09bab59 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -36,15 +36,15 @@ dependencies: - hypothesis - identify>=2.5.20 - ipython -- libarrow-acero==16.0.0.* -- libarrow-dataset==16.0.0.* -- libarrow==16.0.0.* +- libarrow-acero==16.1.0.* +- libarrow-dataset==16.1.0.* +- libarrow==16.1.0.* - libcufile-dev=1.4.0.31 - libcufile=1.4.0.31 - libcurand-dev=10.3.0.86 - libcurand=10.3.0.86 - libkvikio==24.6.* -- libparquet==16.0.0.* +- libparquet==16.1.0.* - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.6.* - make @@ -66,7 +66,7 @@ dependencies: - pip - pre-commit - ptxcompiler -- pyarrow==16.0.0.* +- pyarrow==16.1.0.* - pydata-sphinx-theme!=0.14.2 - pytest-benchmark - pytest-cases>=3.8.2 diff --git a/conda/environments/all_cuda-122_arch-x86_64.yaml b/conda/environments/all_cuda-122_arch-x86_64.yaml index d06a727f331..89eac98f652 100644 --- a/conda/environments/all_cuda-122_arch-x86_64.yaml +++ b/conda/environments/all_cuda-122_arch-x86_64.yaml @@ -37,13 +37,13 @@ dependencies: - hypothesis - identify>=2.5.20 - ipython -- libarrow-acero==16.0.0.* -- libarrow-dataset==16.0.0.* -- libarrow==16.0.0.* +- libarrow-acero==16.1.0.* +- libarrow-dataset==16.1.0.* +- libarrow==16.1.0.* - libcufile-dev - libcurand-dev - libkvikio==24.6.* -- libparquet==16.0.0.* +- libparquet==16.1.0.* - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.6.* - make @@ -63,7 +63,7 @@ dependencies: - pandoc - pip - pre-commit -- pyarrow==16.0.0.* +- pyarrow==16.1.0.* - pydata-sphinx-theme!=0.14.2 - pynvjitlink - pytest-benchmark diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index 12e29c77a98..e7245e67659 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -64,7 +64,7 @@ requirements: - scikit-build-core >=0.7.0 - dlpack >=0.8,<1.0 - numpy 1.23 - - pyarrow ==16.0.0.* + - pyarrow ==16.1.0.* - libcudf ={{ version }} - rmm ={{ minor_version }} {% if cuda_major == "11" %} @@ -82,7 +82,7 @@ requirements: - cupy >=12.0.0 - numba >=0.57 - {{ pin_compatible('numpy', max_pin='x') }} - - {{ pin_compatible('pyarrow', max_pin='x') }} + - {{ pin_compatible('pyarrow', max_pin='x.x') }} - libcudf ={{ version }} - {{ pin_compatible('rmm', max_pin='x.x') }} - fsspec >=0.6.0 diff --git a/conda/recipes/libcudf/conda_build_config.yaml b/conda/recipes/libcudf/conda_build_config.yaml index 61ffcf3c3de..c01178bf732 100644 --- 
a/conda/recipes/libcudf/conda_build_config.yaml +++ b/conda/recipes/libcudf/conda_build_config.yaml @@ -20,7 +20,7 @@ cmake_version: - ">=3.26.4" libarrow_version: - - "==16.0.0" + - "==16.1.0" dlpack_version: - ">=0.8,<1.0" diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index ad2e840c71d..76115362b6c 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -86,9 +86,6 @@ outputs: {% else %} - {{ compiler('cuda') }} {% endif %} - # TODO: start taking libarrow's run exports again wwhen they're correct for 16.0 - # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 - - libarrow requirements: build: - cmake {{ cmake_version }} @@ -108,12 +105,6 @@ outputs: - librmm ={{ minor_version }} - libkvikio ={{ minor_version }} - dlpack {{ dlpack_version }} - # TODO: start taking libarrow's run exports again wwhen they're correct for 16.0 - # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 - - libarrow>=16.0.0,<16.1.0a0 - - libarrow-acero>=16.0.0,<16.1.0a0 - - libarrow-dataset>=16.0.0,<16.1.0a0 - - libparquet>=16.0.0,<16.1.0a0 test: commands: - test -f $PREFIX/lib/libcudf.so diff --git a/cpp/cmake/thirdparty/get_arrow.cmake b/cpp/cmake/thirdparty/get_arrow.cmake index 73e66cce608..0afdc526981 100644 --- a/cpp/cmake/thirdparty/get_arrow.cmake +++ b/cpp/cmake/thirdparty/get_arrow.cmake @@ -430,7 +430,7 @@ if(NOT DEFINED CUDF_VERSION_Arrow) set(CUDF_VERSION_Arrow # This version must be kept in sync with the libarrow version pinned for builds in # dependencies.yaml. - 16.0.0 + 16.1.0 CACHE STRING "The version of Arrow to find (or build)" ) endif() diff --git a/dependencies.yaml b/dependencies.yaml index f20c1591e73..0844d86fb66 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -289,7 +289,7 @@ dependencies: - cython>=3.0.3 # Hard pin the patch version used during the build. This must be kept # in sync with the version pinned in get_arrow.cmake. - - pyarrow==16.0.0.* + - pyarrow==16.1.0.* - output_types: conda packages: - scikit-build-core>=0.7.0 @@ -332,25 +332,25 @@ dependencies: packages: # Hard pin the Arrow patch version used during the build. This must # be kept in sync with the version pinned in get_arrow.cmake. 
- - libarrow-acero==16.0.0.* - - libarrow-dataset==16.0.0.* - - libarrow==16.0.0.* - - libparquet==16.0.0.* + - libarrow-acero==16.1.0.* + - libarrow-dataset==16.1.0.* + - libarrow==16.1.0.* + - libparquet==16.1.0.* libarrow_run: common: - output_types: conda packages: - # Allow runtime version to float up to minor version - - libarrow-acero>=16.0.0,<16.1.0a0 - - libarrow-dataset>=16.0.0,<16.1.0a0 - - libarrow>=16.0.0,<16.1.0a0 - - libparquet>=16.0.0,<16.1.0a0 + # Allow runtime version to float up to patch version + - libarrow-acero>=16.1.0,<16.2.0a0 + - libarrow-dataset>=16.1.0,<16.2.0a0 + - libarrow>=16.1.0,<16.2.0a0 + - libparquet>=16.1.0,<16.2.0a0 pyarrow_run: common: - output_types: [conda, requirements, pyproject] packages: # Allow runtime version to float up to patch version - - pyarrow>=16.0.0,<16.1.0a0 + - pyarrow>=16.1.0,<16.2.0a0 cuda_version: specific: - output_types: conda diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index a9bca7d8b98..83b7353ad89 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1833,6 +1833,9 @@ def test_orc_writer_negative_timestamp(negative_timestamp_df): ) +@pytest.mark.skip( + reason="Bug specific to rockylinux8: https://github.com/rapidsai/cudf/issues/15802", +) def test_orc_reader_apache_negative_timestamp(datadir): path = datadir / "TestOrcFile.apache_timestamp.orc" diff --git a/python/cudf/pyproject.toml b/python/cudf/pyproject.toml index 826362f0632..38aa6eeb24e 100644 --- a/python/cudf/pyproject.toml +++ b/python/cudf/pyproject.toml @@ -7,7 +7,7 @@ requires = [ "cython>=3.0.3", "ninja", "numpy==1.23.*", - "pyarrow==16.0.0.*", + "pyarrow==16.1.0.*", "rmm==24.6.*", "scikit-build-core[pyproject]>=0.7.0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. @@ -34,7 +34,7 @@ dependencies = [ "packaging", "pandas>=2.0,<2.2.3dev0", "ptxcompiler", - "pyarrow>=16.0.0,<16.1.0a0", + "pyarrow>=16.1.0,<16.2.0a0", "rich", "rmm==24.6.*", "typing_extensions>=4.0.0", diff --git a/python/cudf_kafka/pyproject.toml b/python/cudf_kafka/pyproject.toml index 787dd8a97d7..80e30e000c0 100644 --- a/python/cudf_kafka/pyproject.toml +++ b/python/cudf_kafka/pyproject.toml @@ -7,7 +7,7 @@ requires = [ "cython>=3.0.3", "ninja", "numpy==1.23.*", - "pyarrow==16.0.0.*", + "pyarrow==16.1.0.*", "scikit-build-core[pyproject]>=0.7.0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. From fea8fd611f38dc2610d97caded44b17905efbfa5 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Tue, 21 May 2024 17:51:38 -0500 Subject: [PATCH 2/2] Add multithreaded parquet reader benchmarks. (#15585) Addresses: https://github.com/rapidsai/cudf/issues/12700 Adds multithreaded benchmarks for the parquet reader. Separate benchmarks for the chunked and non-chunked readers. In both cases, the primary cases are 2, 4 and 8 threads running reads at the same time. There is not a ton of variability in the other benchmarking axes. The primary use of this particular benchmark is to see inter-kernel performance (that is, how well do our many different kernel types coexist with each other). Whereas normal benchmarks tend to be more for intra-kernel performance checking. NVTX ranges are included to help visually group the bundles of reads together in nsight-sys. 
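For readers who want the shape of the benchmark without reading the whole file, the per-read work boils down to the condensed sketch below (assembled from the code added in this PR; the `run_reads` wrapper, the include paths, and the `sources` argument are illustrative rather than verbatim, and the benchmark's timing/pausing plumbing is omitted):

```
// Condensed sketch of the multithreaded read pattern used by the new benchmark.
// Each thread issues a full cudf::io::read_parquet() on its own stream, and the
// whole bundle of reads is wrapped in an NVTX range so it groups nicely in nsight-sys.
// Include paths are approximate; thread_pool.hpp is the header moved to
// cpp/include/cudf/utilities/ by this PR.
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/thread_pool.hpp>

#include <rmm/mr/device/per_device_resource.hpp>

#include <nvtx3/nvToolsExt.h>

#include <cstddef>
#include <vector>

void run_reads(std::vector<cudf::io::source_info> const& sources, std::size_t num_threads)
{
  // One non-default stream per thread so the reads can overlap on the device.
  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
  cudf::detail::thread_pool threads(num_threads);

  nvtxRangePushA("parquet multithreaded read");  // groups the bundle of reads in nsight-sys
  for (std::size_t i = 0; i < sources.size(); ++i) {
    threads.submit([&, i] {
      auto const opts = cudf::io::parquet_reader_options::builder(sources[i]).build();
      cudf::io::read_parquet(
        opts, streams[i % num_threads], rmm::mr::get_current_device_resource());
    });
  }
  threads.wait_for_tasks();  // block until every read has completed
  cudf::detail::join_streams(streams, cudf::get_default_stream());
  nvtxRangePop();
}
```

The chunked variants follow the same pattern, but each thread drives a `cudf::io::chunked_parquet_reader`, with the input/output chunk limits divided by the thread count so every configuration produces the same number of chunks.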
I also posted a new issue which would help along these lines: https://github.com/rapidsai/cudf/issues/15575

Update: I've tweaked some of the numbers to demonstrate mild performance improvements as the thread count goes up, and included 1 thread as a case.

Some examples:

```
## parquet_multithreaded_read_decode_mixed

| cardinality | total_data_size | num_threads | num_cols | bytes_per_second |
|-------------|-----------------|-------------|----------|------------------|
| 1000 | 536870912 | 1 | 4 | 28874731473 |
| 1000 | 1073741824 | 1 | 4 | 30564139526 |
| 1000 | 536870912 | 2 | 4 | 29399214255 |
| 1000 | 1073741824 | 2 | 4 | 31486327920 |
| 1000 | 536870912 | 4 | 4 | 27009769400 |
| 1000 | 1073741824 | 4 | 4 | 32234841632 |
| 1000 | 536870912 | 8 | 4 | 24416650118 |
| 1000 | 1073741824 | 8 | 4 | 30841124677 |
```

```
## parquet_multithreaded_read_decode_chunked_string

| cardinality | total_data_size | num_threads | num_cols | bytes_per_second |
|-------------|-----------------|-------------|----------|------------------|
| 1000 | 536870912 | 1 | 4 | 14637004584 |
| 1000 | 1073741824 | 1 | 4 | 16025843421 |
| 1000 | 536870912 | 2 | 4 | 15333491977 |
| 1000 | 1073741824 | 2 | 4 | 17164197747 |
| 1000 | 536870912 | 4 | 4 | 16556300728 |
| 1000 | 1073741824 | 4 | 4 | 17711338934 |
| 1000 | 536870912 | 8 | 4 | 15788371298 |
| 1000 | 1073741824 | 8 | 4 | 17911649578 |
```

In addition, this benchmark clearly shows multi-thread-only regressions. An example case is shown below, using the pageable error-code regression we've seen in the past.

Example without regression:

```
## parquet_multithreaded_read_decode_chunked_fixed_width

total_data_size | num_threads | bytes_per_second |
----------------|-------------|------------------|
536870912 | 1 | 25681728660 |
1073741824 | 1 | 26281335927 |
536870912 | 2 | 25597258848 |
1073741824 | 2 | 26733626352 |
536870912 | 4 | 25190211717 |
1073741824 | 4 | 28117411682 |
536870912 | 8 | 25805791994 |
1073741824 | 8 | 27788485204 |
```

Example with regression (pageable error-code return values):

```
## parquet_multithreaded_read_decode_chunked_fixed_width

total_data_size | num_threads | bytes_per_second |
----------------|-------------|------------------|
536870912 | 1 | 25660470283 |
1073741824 | 1 | 26146862480 |
536870912 | 2 | 25040145602 |
1073741824 | 2 | 25460591520 |
536870912 | 4 | 22917046969 |
1073741824 | 4 | 24922624784 |
536870912 | 8 | 20529770200 |
1073741824 | 8 | 23333751767 |
```

In both cases, we can see that the single-thread numbers remain the same, but there is a regression in the multi-thread cases, particularly at 4 and 8 threads.
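For reference when comparing the two runs: in the 536870912-byte rows above, throughput drops by roughly 9% at 4 threads and roughly 20% at 8 threads, while the 1-thread numbers are essentially unchanged. The bytes_per_second column itself is the total requested data size divided by the mean measured GPU time for the timed region. A condensed sketch of that calculation is below; `state` and `data_size` are the benchmark's locals, and the `double` cast target is an assumption:

```
// Sketch of how the throughput column is produced (condensed from the benchmark
// code added below): nvbench reports the mean GPU time for the timed region, and
// the total requested data size is divided by it.
auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
```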
Authors: - https://github.com/nvdbaranec - Mike Wilson (https://github.com/hyperbolic2346) Approvers: - Nghia Truong (https://github.com/ttnghia) - Vukasin Milovanovic (https://github.com/vuule) - Mike Wilson (https://github.com/hyperbolic2346) URL: https://github.com/rapidsai/cudf/pull/15585 --- cpp/benchmarks/CMakeLists.txt | 5 + cpp/benchmarks/io/cuio_common.hpp | 4 + .../io/parquet/parquet_reader_multithread.cpp | 351 ++++++++++++++++++ .../cudf}/utilities/thread_pool.hpp | 0 cpp/src/io/utilities/file_io_utilities.hpp | 4 +- 5 files changed, 362 insertions(+), 2 deletions(-) create mode 100644 cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp rename cpp/{src/io => include/cudf}/utilities/thread_pool.hpp (100%) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 4586a12f466..170cf27b72b 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -256,6 +256,11 @@ ConfigureNVBench( PARQUET_READER_NVBENCH io/parquet/parquet_reader_input.cpp io/parquet/parquet_reader_options.cpp ) +# ################################################################################################## +# * parquet multithread reader benchmark +# ---------------------------------------------------------------------- +ConfigureNVBench(PARQUET_MULTITHREAD_READER_NVBENCH io/parquet/parquet_reader_multithread.cpp) + # ################################################################################################## # * orc reader benchmark -------------------------------------------------------------------------- ConfigureNVBench(ORC_READER_NVBENCH io/orc/orc_reader_input.cpp io/orc/orc_reader_options.cpp) diff --git a/cpp/benchmarks/io/cuio_common.hpp b/cpp/benchmarks/io/cuio_common.hpp index 3d5be41e25f..6e0b32219ce 100644 --- a/cpp/benchmarks/io/cuio_common.hpp +++ b/cpp/benchmarks/io/cuio_common.hpp @@ -39,6 +39,10 @@ class cuio_source_sink_pair { // delete the temporary file std::remove(file_name.c_str()); } + // move constructor + cuio_source_sink_pair(cuio_source_sink_pair&& ss) = default; + cuio_source_sink_pair& operator=(cuio_source_sink_pair&& ss) = default; + /** * @brief Created a source info of the set type * diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp new file mode 100644 index 00000000000..fbdcfb0ade9 --- /dev/null +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#include + +// TODO: remove this once pinned/pooled is enabled by default in cuIO +void set_cuio_host_pinned_pool() +{ + using host_pooled_mr = rmm::mr::pool_memory_resource; + static std::shared_ptr mr = std::make_shared( + std::make_shared().get(), 256ul * 1024 * 1024); + cudf::io::set_host_memory_resource(*mr); +} + +size_t get_num_reads(nvbench::state const& state) { return state.get_int64("num_threads"); } + +size_t get_read_size(nvbench::state const& state) +{ + auto const num_reads = get_num_reads(state); + return state.get_int64("total_data_size") / num_reads; +} + +std::string get_label(std::string const& test_name, nvbench::state const& state) +{ + auto const num_cols = state.get_int64("num_cols"); + size_t const read_size_mb = get_read_size(state) / (1024 * 1024); + return {test_name + ", " + std::to_string(num_cols) + " columns, " + + std::to_string(state.get_int64("num_threads")) + " threads " + " (" + + std::to_string(read_size_mb) + " MB each)"}; +} + +std::tuple, size_t, size_t> write_file_data( + nvbench::state& state, std::vector const& d_types) +{ + cudf::size_type const cardinality = state.get_int64("cardinality"); + cudf::size_type const run_length = state.get_int64("run_length"); + cudf::size_type const num_cols = state.get_int64("num_cols"); + size_t const num_files = get_num_reads(state); + size_t const per_file_data_size = get_read_size(state); + + std::vector source_sink_vector; + + size_t total_file_size = 0; + + for (size_t i = 0; i < num_files; ++i) { + cuio_source_sink_pair source_sink{cudf::io::io_type::HOST_BUFFER}; + + auto const tbl = create_random_table( + cycle_dtypes(d_types, num_cols), + table_size_bytes{per_file_data_size}, + data_profile_builder().cardinality(cardinality).avg_run_length(run_length)); + auto const view = tbl->view(); + + cudf::io::parquet_writer_options write_opts = + cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view) + .compression(cudf::io::compression_type::SNAPPY) + .max_page_size_rows(50000) + .max_page_size_bytes(1024 * 1024); + + cudf::io::write_parquet(write_opts); + total_file_size += source_sink.size(); + + source_sink_vector.push_back(std::move(source_sink)); + } + + return {std::move(source_sink_vector), total_file_size, num_files}; +} + +void BM_parquet_multithreaded_read_common(nvbench::state& state, + std::vector const& d_types, + std::string const& label) +{ + size_t const data_size = state.get_int64("total_data_size"); + auto const num_threads = state.get_int64("num_threads"); + + set_cuio_host_pinned_pool(); + + auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); + cudf::detail::thread_pool threads(num_threads); + + auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); + + auto mem_stats_logger = cudf::memory_stats_logger(); + + nvtxRangePushA(("(read) " + label).c_str()); + state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer, + [&](nvbench::launch& launch, auto& timer) { + auto read_func = [&](int index) { + auto const stream = streams[index % num_threads]; + auto& source_sink = source_sink_vector[index]; + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(source_sink.make_source_info()); + cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource()); + }; + + threads.paused = true; + for (size_t i = 
0; i < num_files; ++i) { + threads.submit(read_func, i); + } + timer.start(); + threads.paused = false; + threads.wait_for_tasks(); + cudf::detail::join_streams(streams, cudf::get_default_stream()); + timer.stop(); + }); + nvtxRangePop(); + + auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value"); + state.add_element_count(static_cast(data_size) / time, "bytes_per_second"); + state.add_buffer_size( + mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage"); + state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size"); +} + +void BM_parquet_multithreaded_read_mixed(nvbench::state& state) +{ + auto label = get_label("mixed", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_common( + state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_fixed_width(nvbench::state& state) +{ + auto label = get_label("fixed width", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_string(nvbench::state& state) +{ + auto label = get_label("string", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_list(nvbench::state& state) +{ + auto label = get_label("list", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state, + std::vector const& d_types, + std::string const& label) +{ + size_t const data_size = state.get_int64("total_data_size"); + auto const num_threads = state.get_int64("num_threads"); + size_t const input_limit = state.get_int64("input_limit"); + size_t const output_limit = state.get_int64("output_limit"); + + set_cuio_host_pinned_pool(); + + auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); + cudf::detail::thread_pool threads(num_threads); + auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); + + auto mem_stats_logger = cudf::memory_stats_logger(); + + nvtxRangePushA(("(read) " + label).c_str()); + std::vector chunks; + state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer, + [&](nvbench::launch& launch, auto& timer) { + auto read_func = [&](int index) { + auto const stream = streams[index % num_threads]; + auto& source_sink = source_sink_vector[index]; + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(source_sink.make_source_info()); + // divide chunk limits by number of threads so the number of chunks produced is the + // same for all cases. this seems better than the alternative, which is to keep the + // limits the same. if we do that, as the number of threads goes up, the number of + // chunks goes down - so are actually benchmarking the same thing in that case? 
+ auto reader = cudf::io::chunked_parquet_reader( + output_limit / num_threads, input_limit / num_threads, read_opts, stream); + + // read all the chunks + do { + auto table = reader.read_chunk(); + } while (reader.has_next()); + }; + + threads.paused = true; + for (size_t i = 0; i < num_files; ++i) { + threads.submit(read_func, i); + } + timer.start(); + threads.paused = false; + threads.wait_for_tasks(); + cudf::detail::join_streams(streams, cudf::get_default_stream()); + timer.stop(); + }); + nvtxRangePop(); + + auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value"); + state.add_element_count(static_cast(data_size) / time, "bytes_per_second"); + state.add_buffer_size( + mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage"); + state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size"); +} + +void BM_parquet_multithreaded_read_chunked_mixed(nvbench::state& state) +{ + auto label = get_label("mixed", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_chunked_common( + state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state) +{ + auto label = get_label("mixed", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_chunked_string(nvbench::state& state) +{ + auto label = get_label("string", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label); + nvtxRangePop(); +} + +void BM_parquet_multithreaded_read_chunked_list(nvbench::state& state) +{ + auto label = get_label("list", state); + nvtxRangePushA(label.c_str()); + BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label); + nvtxRangePop(); +} + +// mixed data types: fixed width and strings +NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed) + .set_name("parquet_multithreaded_read_decode_mixed") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width) + .set_name("parquet_multithreaded_read_decode_fixed_width") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_string) + .set_name("parquet_multithreaded_read_decode_string") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_list) + .set_name("parquet_multithreaded_read_decode_list") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}); + +// mixed data types: fixed width, 
strings +NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed) + .set_name("parquet_multithreaded_read_decode_chunked_mixed") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width) + .set_name("parquet_multithreaded_read_decode_chunked_fixed_width") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string) + .set_name("parquet_multithreaded_read_decode_chunked_string") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list) + .set_name("parquet_multithreaded_read_decode_chunked_list") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024}) + .add_int64_axis("num_threads", {1, 2, 4, 8}) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); diff --git a/cpp/src/io/utilities/thread_pool.hpp b/cpp/include/cudf/utilities/thread_pool.hpp similarity index 100% rename from cpp/src/io/utilities/thread_pool.hpp rename to cpp/include/cudf/utilities/thread_pool.hpp diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index 74a2ae53961..91ef41fba6e 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -17,10 +17,10 @@ #pragma once #ifdef CUFILE_FOUND -#include "thread_pool.hpp" - #include +#include + #include #endif