diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9cbacee8e8d..2a8a6084822 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -799,6 +799,21 @@ set_source_files_properties( PROPERTIES COMPILE_DEFINITIONS "_FILE_OFFSET_BITS=64" ) +# set_property( +# SOURCE +# src/io/orc/dict_enc.cu +# src/io/orc/reader_impl.cu +# src/io/orc/reader_impl_chunking.cu +# src/io/orc/reader_impl_decode.cu +# src/io/orc/stats_enc.cu +# src/io/orc/stripe_data.cu +# src/io/orc/stripe_enc.cu +# src/io/orc/stripe_init.cu +# src/io/orc/writer_impl.cu +# APPEND +# PROPERTY COMPILE_OPTIONS "-g;-G" +# ) + set_property( SOURCE src/io/parquet/writer_impl.cu APPEND diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index 25984df1b60..d061121d813 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -7,7 +7,7 @@ set -euo pipefail # Parallelism control -PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} +PARALLEL_LEVEL=${PARALLEL_LEVEL:-16} # Installation disabled by default INSTALL_EXAMPLES=false @@ -47,7 +47,7 @@ build_example() { build_dir="${example_dir}/build" # Configure - cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" + cmake -S ${example_dir} -B ${build_dir} -Dcudf_ROOT="${LIB_BUILD_DIR}" -DCMAKE_BUILD_TYPE=Debug -DCMAKE_CUDA_ARCHITECTURES=native # Build cmake --build ${build_dir} -j${PARALLEL_LEVEL} # Install if needed @@ -56,9 +56,10 @@ build_example() { fi } -build_example basic -build_example strings -build_example nested_types -build_example parquet_io -build_example billion_rows -build_example interop +# build_example basic +# build_example strings +# build_example nested_types +# build_example parquet_io +build_example orc_io +# build_example billion_rows +# build_example interop diff --git a/cpp/examples/fetch_dependencies.cmake b/cpp/examples/fetch_dependencies.cmake index 851405caf55..01e508a1791 100644 --- a/cpp/examples/fetch_dependencies.cmake +++ b/cpp/examples/fetch_dependencies.cmake @@ -25,11 +25,5 @@ 
include(${CMAKE_BINARY_DIR}/cmake/get_cpm.cmake) # find or build it via CPM CPMFindPackage( NAME cudf - FIND_PACKAGE_ARGUMENTS "PATHS ${cudf_ROOT} ${cudf_ROOT}/latest" GIT_REPOSITORY - https://github.com/rapidsai/cudf - GIT_TAG ${CUDF_TAG} - GIT_SHALLOW - TRUE - SOURCE_SUBDIR - cpp + FIND_PACKAGE_ARGUMENTS "REQUIRED PATHS ${cudf_ROOT}/latest" ) diff --git a/cpp/examples/orc_io/CMakeLists.txt b/cpp/examples/orc_io/CMakeLists.txt new file mode 100644 index 00000000000..2e87b76da93 --- /dev/null +++ b/cpp/examples/orc_io/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +cmake_minimum_required(VERSION 3.26.4) + +include(../set_cuda_architecture.cmake) + +# initialize cuda architecture +rapids_cuda_init_architectures(orc_io) + +project( + orc_io + VERSION 0.0.1 + LANGUAGES CXX CUDA +) + +include(../fetch_dependencies.cmake) + +add_library(orc_io_utils OBJECT common_utils.cpp io_source.cpp) +target_compile_features(orc_io_utils PRIVATE cxx_std_17) +target_link_libraries(orc_io_utils PRIVATE cudf::cudf) + +# Build and install orc_io +add_executable(orc_io orc_io.cpp) +target_link_libraries(orc_io PRIVATE cudf::cudf $) +target_compile_features(orc_io PRIVATE cxx_std_17) +install(TARGETS orc_io DESTINATION bin/examples/libcudf) \ No newline at end of file diff --git a/cpp/examples/orc_io/col_b_only.orc b/cpp/examples/orc_io/col_b_only.orc new file mode 100644 index 00000000000..085e30c4fe7 Binary files /dev/null and b/cpp/examples/orc_io/col_b_only.orc differ diff --git a/cpp/examples/orc_io/common_utils.cpp b/cpp/examples/orc_io/common_utils.cpp new file mode 100644 index 00000000000..a79ca48af86 --- /dev/null +++ b/cpp/examples/orc_io/common_utils.cpp @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common_utils.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +/** + * @file common_utils.cpp + * @brief Definitions for common utilities for `parquet_io` examples + * + */ + +std::shared_ptr create_memory_resource(bool is_pool_used) +{ + auto cuda_mr = std::make_shared(); + if (is_pool_used) { + return rmm::mr::make_owning_wrapper( + cuda_mr, rmm::percent_of_free_device_memory(50)); + } + return cuda_mr; +} + +cudf::io::column_encoding get_encoding_type(std::string name) +{ + using encoding_type = cudf::io::column_encoding; + + static std::unordered_map const map = { + {"DEFAULT", encoding_type::USE_DEFAULT}, + {"DICTIONARY", encoding_type::DICTIONARY}, + {"PLAIN", encoding_type::PLAIN}, + {"DELTA_BINARY_PACKED", encoding_type::DELTA_BINARY_PACKED}, + {"DELTA_LENGTH_BYTE_ARRAY", encoding_type::DELTA_LENGTH_BYTE_ARRAY}, + {"DELTA_BYTE_ARRAY", encoding_type::DELTA_BYTE_ARRAY}, + }; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { return map.at(name); } + throw std::invalid_argument(name + + " is not a valid encoding type.\n\n" + "Available encoding types: DEFAULT, DICTIONARY, PLAIN,\n" + "DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n" + "DELTA_BYTE_ARRAY\n\n"); +} + +cudf::io::compression_type get_compression_type(std::string name) +{ + using compression_type = cudf::io::compression_type; + + static std::unordered_map const map = { + {"NONE", compression_type::NONE}, + {"AUTO", compression_type::AUTO}, + 
{"SNAPPY", compression_type::SNAPPY}, + {"LZ4", compression_type::LZ4}, + {"ZSTD", compression_type::ZSTD}}; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { return map.at(name); } + throw std::invalid_argument(name + + " is not a valid compression type.\n\n" + "Available compression types: NONE, AUTO, SNAPPY,\n" + "LZ4, ZSTD\n\n"); +} + +bool get_boolean(std::string input) +{ + std::transform(input.begin(), input.end(), input.begin(), ::toupper); + + // Check if the input string matches to any of the following + return input == "ON" or input == "TRUE" or input == "YES" or input == "Y" or input == "T"; +} + +void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table) +{ + try { + // Left anti-join the original and transcoded tables + // identical tables should not throw an exception and + // return an empty indices vector + auto const indices = cudf::left_anti_join(lhs_table, rhs_table, cudf::null_equality::EQUAL); + + // No exception thrown, check indices + auto const valid = indices->size() == 0; + std::cout << "Tables identical: " << valid << "\n\n"; + } catch (std::exception& e) { + std::cerr << e.what() << std::endl << std::endl; + throw std::runtime_error("Tables identical: false\n\n"); + } +} + +std::unique_ptr concatenate_tables(std::vector> tables, + rmm::cuda_stream_view stream) +{ + if (tables.size() == 1) { return std::move(tables[0]); } + + std::vector table_views; + table_views.reserve(tables.size()); + std::transform( + tables.begin(), tables.end(), std::back_inserter(table_views), [&](auto const& tbl) { + return tbl->view(); + }); + // Construct the final table + return cudf::concatenate(table_views, stream); +} + +std::string current_date_and_time() +{ + auto const time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + auto const local_time = *std::localtime(&time); + // Stringstream to format the date and time + std::stringstream 
ss; + ss << std::put_time(&local_time, "%Y-%m-%d-%H-%M-%S"); + return ss.str(); +} diff --git a/cpp/examples/orc_io/common_utils.hpp b/cpp/examples/orc_io/common_utils.hpp new file mode 100644 index 00000000000..12896e61a0d --- /dev/null +++ b/cpp/examples/orc_io/common_utils.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +#include +#include + +/** + * @file common_utils.hpp + * @brief Common utilities for `parquet_io` examples + * + */ + +/** + * @brief Create memory resource for libcudf functions + * + * @param pool Whether to use a pool memory resource. 
+ * @return Memory resource instance + */ +std::shared_ptr create_memory_resource(bool is_pool_used); + +/** + * @brief Get encoding type from the keyword + * + * @param name encoding keyword name + * @return corresponding column encoding type + */ +[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name); + +/** + * @brief Get compression type from the keyword + * + * @param name compression keyword name + * @return corresponding compression type + */ +[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name); + +/** + * @brief Get boolean from they keyword + * + * @param input keyword affirmation string such as: Y, T, YES, TRUE, ON + * @return true or false + */ +[[nodiscard]] bool get_boolean(std::string input); + +/** + * @brief Check if two tables are identical, throw an error otherwise + * + * @param lhs_table View to lhs table + * @param rhs_table View to rhs table + */ +void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table); + +/** + * @brief Concatenate a vector of tables and return the resultant table + * + * @param tables Vector of tables to concatenate + * @param stream CUDA stream to use + * + * @return Unique pointer to the resultant concatenated table. 
+ */ +std::unique_ptr concatenate_tables(std::vector> tables, + rmm::cuda_stream_view stream); + +/** + * @brief Returns a string containing current date and time + * + */ +std::string current_date_and_time(); diff --git a/cpp/examples/orc_io/debug/breakpoints.txt b/cpp/examples/orc_io/debug/breakpoints.txt new file mode 100644 index 00000000000..6675e943850 --- /dev/null +++ b/cpp/examples/orc_io/debug/breakpoints.txt @@ -0,0 +1,26 @@ +break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:210 + +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1398 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:646 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:700 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1493 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1605 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1444 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1451 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1827 +# break /home/coder/cudf/cpp/examples/orc_io/orc_io.cpp:114 +# break /home/coder/cudf/cpp/src/io/orc/reader_impl.cu:85 +# break /home/coder/cudf/cpp/src/io/orc/reader_impl.cu:31 +# break /home/coder/cudf/cpp/src/io/orc/reader_impl_decode.cu:875 +# break /home/coder/cudf/cpp/src/io/orc/reader_impl_decode.cu:933 +# break /home/coder/cudf/cpp/src/io/orc/reader_impl_decode.cu:982 +# break /home/coder/cudf/cpp/src/io/orc/stripe_init.cu:459 +# break /home/coder/cudf/cpp/src/io/orc/stripe_data.cu:1891 + +define biu62 + cuda block (0,62,0) thread (0,0,0) +end + +define biu63 + cuda block (0,63,0) thread (0,0,0) +end \ No newline at end of file diff --git a/cpp/examples/orc_io/debug/build_example.sh b/cpp/examples/orc_io/debug/build_example.sh new file mode 100755 index 00000000000..e868fce8671 --- /dev/null +++ b/cpp/examples/orc_io/debug/build_example.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +pushd $(pwd) + +# Build cuDF after source changes +cd ~/cudf/cpp/build/pip/cuda-12.5/debug +ninja -j16 cudf + 
+popd + + +# Build the example after source changes +cmake --build ~/cudf/cpp/examples/orc_io/build -j 16 diff --git a/cpp/examples/orc_io/debug/clean.sh b/cpp/examples/orc_io/debug/clean.sh new file mode 100755 index 00000000000..dcd7cb5b0aa --- /dev/null +++ b/cpp/examples/orc_io/debug/clean.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +clean-all +rm -rf ~/cudf/cpp/examples/orc_io/build diff --git a/cpp/examples/orc_io/debug/compare_data.py b/cpp/examples/orc_io/debug/compare_data.py new file mode 100755 index 00000000000..f020ac68889 --- /dev/null +++ b/cpp/examples/orc_io/debug/compare_data.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +# References: +# https://github.com/rapidsai/cudf/issues/17155 +# +# In the shell, export KVIKIO_COMPAT_MODE=ON +# +# Col 'a' is the same for pandas and cudf. +# Col 'b' starts to differ at row 630107. + +import pandas as pd +import cudf +import numpy as np +import datetime as dt + + +class CompareManager: + def __init__(self, orcPath): + self.orcPath = orcPath + self.dfFull = None + print("--> orcPath: {}".format(orcPath)) + + # Reference: https://pandas.pydata.org/docs/reference/api/pandas.Timestamp.timestamp.html#pandas.Timestamp.timestamp + def getTime(self, timeStamp): + if isinstance(timeStamp, pd.Timestamp): + npTimeStamp = timeStamp.to_numpy() + elif isinstance(timeStamp, np.datetime64): + npTimeStamp = timeStamp + + tmp1 = npTimeStamp.astype("datetime64[ns]").astype("int") + epochTimeElapsedSec = tmp1 // 1e9 + epochTimeElapsedNano = tmp1 / 1e9 - tmp1 // 1e9 + + tmp2 = npTimeStamp - np.datetime64("2015-01-01") + orcTimeElapsedSec = tmp2 // np.timedelta64(1, "s") + orcTimeElapsedNano = tmp2 / np.timedelta64(1, "s") - orcTimeElapsedSec + + return ( + epochTimeElapsedSec, + epochTimeElapsedNano, + orcTimeElapsedSec, + orcTimeElapsedNano, + ) + + def doIt(self): + pdDf = pd.read_orc(self.orcPath) + # cfDf = cudf.read_orc(self.orcPath, engine="pyarrow") + cfDf = cudf.read_orc(self.orcPath, engine="cudf") + + print(pdDf.shape) + 
print(cfDf.shape) + + # Convert to numpy array for speed + b1 = pdDf["b"].to_numpy() + b2 = cfDf["b"].to_numpy() + + targetIdx = -1 + for rowIdx in range(len(b1)): + if b1[rowIdx] != b2[rowIdx]: + print("Column b: {}".format(rowIdx)) + targetIdx = rowIdx + break + + print("targetIdx: {}".format(targetIdx)) + + if targetIdx == -1: + print("Same data") + return + + for k in range(-3, 10): + # Now that the target index has been found, + # use pandas/cudf's dataframe instead of numpy + # for better timestamp handling + ts1 = pdDf["b"][targetIdx + k] + ts2 = cfDf["b"][targetIdx + k] + if k == 0: + print("-------------------------------------------------") + + res1 = self.getTime(ts1) + res2 = self.getTime(ts2) + print( + "{}: {} ({}+{}) vs {} ({}+{})".format( + targetIdx + k, ts1, res1[2], res1[3], ts2, res2[2], res2[3] + ) + ) + if k == 0: + print("-------------------------------------------------") + + # lastRowGroupFirstRowIdx = 630000 + # if len(b1) <= lastRowGroupFirstRowIdx: + # print("--> Small data frame. 
diff.txt is not calcualted.") + # return + + # with open("diff.txt", "w") as f: + # secDiffCount = 0 + # nanoDiffCount = 0 + # for i in range(lastRowGroupFirstRowIdx, len(b1)): + # ts1 = pdDf["b"][i] + # ts2 = cfDf["b"][i] + # res1 = self.getTime(ts1) + # res2 = self.getTime(ts2) + + # if abs(res1[2] - res2[2]) > 1: + # secDiffCount += 1 + + # if abs(res1[3] - res2[3]) > 1e-7: + # nanoDiffCount += 1 + + # f.write( + # "{}: {} ({}+{}) vs {} ({}+{})\n".format( + # i, ts1, res1[2], res1[3], ts2, res2[2], res2[3] + # ) + # ) + + # print("secDiffCount: {}".format(secDiffCount)) + # print("nanoDiffCount: {}".format(nanoDiffCount)) + + +if __name__ == "__main__": + # orcPath = "data/timestamp_bug.snappy.orc" + orcPath = "../col_b_only.orc" + + cmObj = CompareManager(orcPath) + cmObj.doIt() diff --git a/cpp/examples/orc_io/debug/debug.sh b/cpp/examples/orc_io/debug/debug.sh new file mode 100755 index 00000000000..14038ec61fe --- /dev/null +++ b/cpp/examples/orc_io/debug/debug.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# my_debugger=gdb +my_debugger=cuda-gdb + +root_dir=/home/coder/cudf/cpp/examples/orc_io + +example_bin=$root_dir/build/orc_io +# input_file=$root_dir/first_27000_rows.orc +input_file=$root_dir/col_b_only.orc +output_file=$root_dir/debug/test_output.orc + +# DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY +encoding_type=DEFAULT + +# NONE, AUTO, SNAPPY, LZ4, ZSTD +compression_type=NONE + +write_page_stats=yes + +export LIBCUDF_LOGGING_LEVEL=INFO + +$my_debugger -ex start --ex 'source breakpoints.txt' --args $example_bin $input_file $output_file $encoding_type $compression_type + +# $example_bin $input_file $output_file $encoding_type $compression_type $write_page_stats + diff --git a/cpp/examples/orc_io/debug/rebuild_example.sh b/cpp/examples/orc_io/debug/rebuild_example.sh new file mode 100755 index 00000000000..ed05cb7a92e --- /dev/null +++ b/cpp/examples/orc_io/debug/rebuild_example.sh @@ -0,0 +1,32 @@ 
+#!/usr/bin/env bash + +pushd $(pwd) + +# Reconfigure libcudf to use debug build +cd ~ +clean-all +configure-cudf-cpp -DCMAKE_BUILD_TYPE=Debug -DCMAKE_CUDA_ARCHITECTURES=native -j 16 + +# Rebuild libcudf +# In nvcc, compiler options -E and -MD cannot be used together. However: +# - Adding the device debug symbol (-G) for a selected set of .cu files +# appears to cause -MD to be added as a compiler option. +# - sccache always forces -E to be appended even when export CCACHE_DISABLE=1 +# is used. +# Given this conundrum, the hacky solution is to remove the use of sccache +# from the build.ninja file, before building libcudf with the device +# debug symbol added. +cd ~/cudf/cpp/build/pip/cuda-12.5/debug +sed -i 's|LAUNCHER = /usr/bin/sccache|LAUNCHER =|g' build.ninja +ninja -j16 cudf + +# Rebuild the example +# The example will always use the locally built libcudf +cd ~/cudf/cpp/examples +rm -rf orc_io/build +./build.sh +popd + +# Build Python package +# build-pylibcudf-python -j 16 +# build-cudf-python -j 16 \ No newline at end of file diff --git a/cpp/examples/orc_io/debug/use_other_tools.py b/cpp/examples/orc_io/debug/use_other_tools.py new file mode 100755 index 00000000000..1c63adc441d --- /dev/null +++ b/cpp/examples/orc_io/debug/use_other_tools.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 + +# References: +# https://github.com/rapidsai/cudf/issues/17155 + +import pandas as pd + + +class UsePandas: + def __init__(self, orcPath): + self.orcPath = orcPath + self.newOrcPath = "timestamp_ok.snappy.orc" + self.dfFull = None + + def read(self): + self.dfFull = pd.read_orc(self.orcPath) + + # df = self.dfFull[self.dfFull.a == 10] + # print(df) + # print(df.shape) + + print(self.dfFull) + print(self.dfFull.shape) + + def write(self): + self.dfFull.to_orc(self.newOrcPath) + + +if __name__ == '__main__': + orcPath = "../timestamp_bug.snappy.orc" + + pdObj = UsePandas(orcPath) + pdObj.read() + pdObj.write() diff --git a/cpp/examples/orc_io/io_source.cpp 
b/cpp/examples/orc_io/io_source.cpp new file mode 100644 index 00000000000..019b3f96474 --- /dev/null +++ b/cpp/examples/orc_io/io_source.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io_source.hpp" + +#include +#include + +#include +#include + +#include + +#include +#include +#include + +rmm::host_async_resource_ref pinned_memory_resource() +{ + static auto mr = rmm::mr::pinned_host_memory_resource{}; + return mr; +} + +io_source_type get_io_source_type(std::string name) +{ + static std::unordered_map const map = { + {"FILEPATH", io_source_type::FILEPATH}, + {"HOST_BUFFER", io_source_type::HOST_BUFFER}, + {"PINNED_BUFFER", io_source_type::PINNED_BUFFER}, + {"DEVICE_BUFFER", io_source_type::DEVICE_BUFFER}}; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { + return map.at(name); + } else { + throw std::invalid_argument(name + + " is not a valid io source type. 
Available: FILEPATH,\n" + "HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER.\n\n"); + } +} + +io_source::io_source(std::string_view file_path, io_source_type type, rmm::cuda_stream_view stream) + : pinned_buffer({pinned_memory_resource(), stream}), d_buffer{0, stream} +{ + std::string const file_name{file_path}; + auto const file_size = std::filesystem::file_size(file_name); + + // For filepath make a quick source_info and return early + if (type == io_source_type::FILEPATH) { + source_info = cudf::io::source_info(file_name); + return; + } + + std::ifstream file{file_name, std::ifstream::binary}; + + // Copy file contents to the specified io source buffer + switch (type) { + case io_source_type::HOST_BUFFER: { + h_buffer.resize(file_size); + file.read(h_buffer.data(), file_size); + source_info = cudf::io::source_info(h_buffer.data(), file_size); + break; + } + case io_source_type::PINNED_BUFFER: { + pinned_buffer.resize(file_size); + file.read(pinned_buffer.data(), file_size); + source_info = cudf::io::source_info(pinned_buffer.data(), file_size); + break; + } + case io_source_type::DEVICE_BUFFER: { + h_buffer.resize(file_size); + file.read(h_buffer.data(), file_size); + d_buffer.resize(file_size, stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + d_buffer.data(), h_buffer.data(), file_size, cudaMemcpyDefault, stream.value())); + + source_info = cudf::io::source_info(d_buffer); + break; + } + default: { + throw std::runtime_error("Encountered unexpected source type\n\n"); + } + } +} diff --git a/cpp/examples/orc_io/io_source.hpp b/cpp/examples/orc_io/io_source.hpp new file mode 100644 index 00000000000..a614d348fae --- /dev/null +++ b/cpp/examples/orc_io/io_source.hpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +#include + +#include + +/** + * @file io_source.hpp + * @brief Utilities for constructing the specified IO sources from the input parquet files. + * + */ + +/** + * @brief Available IO source types + */ +enum class io_source_type { FILEPATH, HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER }; + +/** + * @brief Get io source type from the string keyword argument + * + * @param name io source type keyword name + * @return io source type + */ +[[nodiscard]] io_source_type get_io_source_type(std::string name); + +/** + * @brief Create and return a reference to a static pinned memory pool + * + * @return Reference to a static pinned memory pool + */ +rmm::host_async_resource_ref pinned_memory_resource(); + +/** + * @brief Custom allocator for pinned_buffer via RMM. 
+ */ +template +struct pinned_allocator : public std::allocator { + pinned_allocator(rmm::host_async_resource_ref _mr, rmm::cuda_stream_view _stream) + : mr{_mr}, stream{_stream} + { + } + + T* allocate(std::size_t n) + { + auto ptr = mr.allocate_async(n * sizeof(T), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + stream.synchronize(); + return static_cast(ptr); + } + + void deallocate(T* ptr, std::size_t n) + { + mr.deallocate_async(ptr, n * sizeof(T), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + private: + rmm::host_async_resource_ref mr; + rmm::cuda_stream_view stream; +}; + +/** + * @brief Class to create a cudf::io::source_info of given type from the input parquet file + * + */ +class io_source { + public: + io_source(std::string_view file_path, io_source_type io_type, rmm::cuda_stream_view stream); + + // Get the internal source info + [[nodiscard]] cudf::io::source_info get_source_info() const { return source_info; } + + private: + // alias for pinned vector + template + using pinned_vector = thrust::host_vector>; + cudf::io::source_info source_info; + std::vector h_buffer; + pinned_vector pinned_buffer; + rmm::device_uvector d_buffer; +}; diff --git a/cpp/examples/orc_io/orc_io.cpp b/cpp/examples/orc_io/orc_io.cpp new file mode 100644 index 00000000000..52d282f40b5 --- /dev/null +++ b/cpp/examples/orc_io/orc_io.cpp @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../utilities/timer.hpp" +#include "common_utils.hpp" +#include "io_source.hpp" + +#include +#include +#include + +#include + +/** + * @file orc_io.cpp + * @brief Demonstrates usage of the libcudf APIs to read and write + * orc file format with different encodings and compression types + * + * The following encoding and compression ztypes are demonstrated: + * Encoding Types: DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED, + * DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY + * + * Compression Types: NONE, AUTO, SNAPPY, LZ4, ZSTD + * + */ + +/** + * @brief Read orc input from file + * + * @param filepath path to input orc file + * @return cudf::io::table_with_metadata + */ +cudf::io::table_with_metadata read_orc(std::string filepath) +{ + auto source_info = cudf::io::source_info(filepath); + auto builder = cudf::io::orc_reader_options::builder(source_info).columns({"b"}); + auto options = builder.build(); + return cudf::io::read_orc(options); +} + +/** + * @brief Function to print example usage and argument information. + */ +void print_usage() +{ + std::cout << "\nUsage: orc \n" + " \n\n" + "Available encoding types: DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED,\n" + " DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY\n\n" + "Available compression types: NONE, AUTO, SNAPPY, LZ4, ZSTD\n\n"; +} + +/** + * @brief Main for nested_types examples + * + * Command line parameters: + * 1. orc input file name/path (default: "example.orc") + * 2. orc output file name/path (default: "output.orc") + * 3. encoding type for columns (default: "DELTA_BINARY_PACKED") + * 4. compression type (default: "ZSTD") + * 5. 
optional: use page size stats metadata (default: "NO") + * + * Example invocation from directory `cudf/cpp/examples/orc_io`: + * ./build/orc_io example.orc output.orc DELTA_BINARY_PACKED ZSTD + * + */ +int main(int argc, char const** argv) +{ + std::string input_filepath = "example.orc"; + std::string output_filepath = "output.orc"; + cudf::io::column_encoding encoding = get_encoding_type("DELTA_BINARY_PACKED"); + cudf::io::compression_type compression = get_compression_type("ZSTD"); + std::optional page_stats = std::nullopt; + + switch (argc) { + case 6: + page_stats = get_boolean(argv[5]) + ? std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN) + : std::nullopt; + [[fallthrough]]; + case 5: compression = get_compression_type(argv[4]); [[fallthrough]]; + case 4: encoding = get_encoding_type(argv[3]); [[fallthrough]]; + case 3: output_filepath = argv[2]; [[fallthrough]]; + case 2: // Check if instead of input_paths, the first argument is `-h` or `--help` + if (auto arg = std::string{argv[1]}; arg != "-h" and arg != "--help") { + input_filepath = std::move(arg); + break; + } + [[fallthrough]]; + default: print_usage(); throw std::runtime_error(""); + } + + // Create and use a memory pool + bool is_pool_used = true; + auto resource = create_memory_resource(is_pool_used); + cudf::set_current_device_resource(resource.get()); + + // Read input orc file + cudf::examples::timer timer; + auto [input, metadata] = read_orc(input_filepath); + timer.print_elapsed_millis(); + + return 0; +} diff --git a/cpp/examples/orc_io/timestamp_bug.snappy.orc b/cpp/examples/orc_io/timestamp_bug.snappy.orc new file mode 100644 index 00000000000..df09710e381 Binary files /dev/null and b/cpp/examples/orc_io/timestamp_bug.snappy.orc differ diff --git a/cpp/examples/parquet_io/CMakeLists.txt b/cpp/examples/parquet_io/CMakeLists.txt index b7d8fc14b6c..f152bc22533 100644 --- a/cpp/examples/parquet_io/CMakeLists.txt +++ b/cpp/examples/parquet_io/CMakeLists.txt @@ -6,7 +6,6 @@ 
include(../set_cuda_architecture.cmake) # initialize cuda architecture rapids_cuda_init_architectures(parquet_io) -rapids_cuda_set_architectures(RAPIDS) project( parquet_io @@ -25,14 +24,14 @@ target_link_libraries(parquet_io_utils PRIVATE cudf::cudf) # Build and install parquet_io add_executable(parquet_io parquet_io.cpp) -target_link_libraries(parquet_io PRIVATE cudf::cudf nvToolsExt $) +target_link_libraries(parquet_io PRIVATE cudf::cudf $) target_compile_features(parquet_io PRIVATE cxx_std_17) install(TARGETS parquet_io DESTINATION bin/examples/libcudf) # Build and install parquet_io_multithreaded add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp) target_link_libraries( - parquet_io_multithreaded PRIVATE cudf::cudf nvToolsExt $ + parquet_io_multithreaded PRIVATE cudf::cudf $ ) target_compile_features(parquet_io_multithreaded PRIVATE cxx_std_17) install(TARGETS parquet_io_multithreaded DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/parquet_io/debug/debug.sh b/cpp/examples/parquet_io/debug/debug.sh new file mode 100755 index 00000000000..e32e4c6d1af --- /dev/null +++ b/cpp/examples/parquet_io/debug/debug.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +root_dir=/home/coder/cudf/cpp/examples/parquet_io + +example_bin=$root_dir/build/parquet_io +input_file=$root_dir/example.parquet +output_file=$root_dir/debug/test_output.parquet + +# DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY +encoding_type=DEFAULT + +# NONE, AUTO, SNAPPY, LZ4, ZSTD +compression_type=NONE + +write_page_stats=yes + +export LIBCUDF_LOGGING_LEVEL=INFO + +# cuda-gdb -ex start --args $example_bin $input_file $output_file $encoding_type $compression_type + +$example_bin $input_file $output_file $encoding_type $compression_type $write_page_stats + diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 1572b7246c0..3499c7ad89b 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu 
@@ -132,6 +132,146 @@ struct orcdec_state_s { } vals; };

+/**
+ * @brief Manage caching of the first run of TIMESTAMP's DATA stream for a row group.
+ *
+ * This class is used to address a special case, where the first run spans two adjacent row groups
+ * and its length is greater than the maximum length allowed to be consumed. This limit is imposed
+ * by the decoder when processing the SECONDARY stream. This class shall be instantiated in the
+ * shared memory, and be used to cache the DATA stream with a decoded data type of `int64_t`. As an
+ * optimization, the actual cache is a local variable and does not reside in the shared memory.
+ */
+class run_cache_manager {
+ private:
+  enum class status : uint8_t {
+    DISABLED,            ///< Run cache manager is disabled. No caching will be performed.
+    CAN_WRITE_TO_CACHE,  ///< Run cache manager is ready for write. This is expected to happen in
+                         ///< the first iteration of the top-level while-loop in
+                         ///< gpuDecodeOrcColumnData().
+    CAN_READ_FROM_CACHE, ///< Run cache manager is ready for read. This is expected to happen in
+                         ///< the second iteration of the while-loop.
+  };
+
+ public:
+  /**
+   * @brief Initialize the run cache manager.
+   *
+   * @param[in] s ORC decoder state.
+   */
+  __device__ void initialize(orcdec_state_s* s)
+  {
+    // Caching applies only to TIMESTAMP columns whose row group begins inside a run of
+    // the SECONDARY stream (run_pos[CI_DATA2] > 0); everything else disables the cache.
+    _status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP)
+                ? status::CAN_WRITE_TO_CACHE
+                : status::DISABLED;
+    _reusable_length = 0;
+    _run_length      = 0;
+  }
+
+  /**
+   * @brief Set the reusable length object.
+   *
+   * @param[in] run_length The length of the first run (spanning two adjacent row groups) of the
+   * DATA stream.
+   * @param[in] max_length The maximum length allowed to be consumed. This limit is imposed
+   * by the decoder when processing the SECONDARY stream.
+   */
+  __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
+  {
+    if (_status == status::CAN_WRITE_TO_CACHE) {
+      _run_length      = run_length;
+      // The portion of the run beyond max_length cannot be consumed now; cache it for reuse.
+      _reusable_length = (_run_length > max_length) ? (_run_length - max_length) : 0;
+    }
+  }
+
+  /**
+   * @brief Adjust the maximum length allowed to be consumed when the length of the first run is
+   * greater than it.
+   *
+   * @param[in] max_length The maximum length allowed to be consumed for the DATA stream.
+   * @return A new maximum length.
+   */
+  __device__ uint32_t adjust_max_length(uint32_t max_length)
+  {
+    auto new_max_length{max_length};
+    if (_status == status::CAN_READ_FROM_CACHE and _reusable_length > 0) {
+      // NOTE(review): unsigned subtraction — assumes max_length >= _reusable_length whenever
+      // the cache is readable; confirm the caller guarantees this.
+      new_max_length -= _reusable_length;
+    }
+    return new_max_length;
+  }
+
+  /**
+   * @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache.
+   *
+   * @param[in] src Intermediate buffer for the DATA stream.
+   * @param[out] cache Local variable serving as the cache for the DATA stream.
+   */
+  __device__ void write_to_cache(int64_t* src, int64_t& cache)
+  {
+    if (_status != status::CAN_WRITE_TO_CACHE) { return; }
+
+    const auto tid = threadIdx.x;
+
+    // All threads in the block always take a uniform code path for the following branches.
+    // _reusable_length ranges between [0, 512].
+    if (_reusable_length > 0) {
+      const auto length_to_skip = _run_length - _reusable_length;
+      if (tid < _reusable_length) {
+        const auto src_idx = tid + length_to_skip;
+        cache              = src[src_idx];
+      }
+      // Block until all writes are done to safely change _status.
+      __syncthreads();
+      if (tid == 0) { _status = status::CAN_READ_FROM_CACHE; }
+    } else {
+      __syncthreads();
+      if (tid == 0) { _status = status::DISABLED; }
+    }
+
+    __syncthreads();
+  }
+
+  /**
+   * @brief Copy the cached data to the intermediate buffer for the DATA stream.
+   *
+   * @param[in,out] dst Intermediate buffer for the DATA stream.
+   * @param[in,out] rle Run length decoder state object.
+   * @param[in] cache Local variable serving as the cache for the DATA stream.
+   */
+  __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle, int64_t cache)
+  {
+    if (_status != status::CAN_READ_FROM_CACHE) { return; }
+
+    const auto tid = threadIdx.x;
+
+    // First, shift the data up
+    // NOTE(review): this writes indices up to rle->num_vals + _reusable_length - 1; assumes
+    // dst has capacity for the extra _reusable_length entries — confirm the buffer size.
+    const auto dst_idx = tid + _reusable_length;
+    const auto v       = (dst_idx < rle->num_vals + _reusable_length) ? dst[tid] : 0;
+    __syncthreads();
+
+    if (dst_idx < rle->num_vals + _reusable_length) { dst[dst_idx] = v; }
+    __syncthreads();
+
+    // Second, insert the cached data
+    if (tid < _reusable_length) { dst[tid] = cache; }
+    __syncthreads();
+
+    if (tid == 0) {
+      _status = status::DISABLED;
+      rle->num_vals += _reusable_length;
+    }
+
+    __syncthreads();
+  }
+
+ private:
+  status _status;  ///< The status of the run cache manager.
+  uint32_t
+    _reusable_length;  ///< The number of data to be cached and reused later. For example, if a run
+                       ///< has a length of 512 but the maximum length allowed to be consumed is
+                       ///< capped at 162, then 350 (512-162) data will be cached.
+  uint32_t _run_length;  ///< The length of the run, 512 in the above example.
+};
+
 /**
  * @brief Initializes byte stream, modifying length and start position to keep the read pointer
  * 8-byte aligned.
@@ -631,6 +771,9 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
  * @param[in] maxvals maximum number of values to decode
  * @param[in] t thread id
  * @param[in] has_buffered_values If true, means there are already buffered values
+ * @param[in] run_cache_manager_inst If non-null, the run cache manager will be used to manage
+ * caching of the first run of the DATA stream.
+ * @param[in] cache Local variable serving as the cache.
 *
 * @return number of values decoded
 */
@@ -640,9 +783,14 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
                                          T* vals,
                                          uint32_t maxvals,
                                          int t,
-                                         bool has_buffered_values = false)
+                                         bool has_buffered_values = false,
+                                         run_cache_manager* run_cache_manager_inst = nullptr,
+                                         int64_t* cache = nullptr)
 {
   if (t == 0) {
+    if (run_cache_manager_inst != nullptr) {
+      maxvals = run_cache_manager_inst->adjust_max_length(maxvals);
+    }
     uint32_t maxpos  = min(bs->len, bs->pos + (bytestream_buffer_size - 8u));
     uint32_t lastpos = bs->pos;
     auto numvals     = 0;
@@ -685,6 +833,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
          l += deltapos;
        }
      }
+
+      if (run_cache_manager_inst != nullptr) {
+        run_cache_manager_inst->set_reusable_length(n, maxvals);
+      }
+
      if ((numvals != 0) and (numvals + n > maxvals)) break;
      // case where there are buffered values and can't consume a whole chunk
      // from decoded values, so skip adding any more to buffer, work on buffered values and then
@@ -866,6 +1019,15 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
     __syncwarp();
   }
   __syncthreads();
+  if constexpr (cuda::std::is_same_v<T, int64_t>) {
+    if (run_cache_manager_inst != nullptr) {
+      // Run cache is read from during the 2nd iteration of the top-level while-loop in
+      // gpuDecodeOrcColumnData().
+      run_cache_manager_inst->read_from_cache(vals, rle, *cache);
+ run_cache_manager_inst->write_to_cache(vals, *cache); + } + } return rle->num_vals; } @@ -1401,6 +1563,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) // Struct doesn't have any data in itself, so skip bool const is_valid = s->chunk.type_kind != STRUCT; size_t const max_num_rows = s->chunk.column_num_rows; + __shared__ run_cache_manager run_cache_manager_inst; + int64_t cache{}; if (t == 0 and is_valid) { // If we have an index, seek to the initial run and update row positions if (num_rowgroups > 0) { @@ -1443,6 +1607,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]); bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]); + + run_cache_manager_inst.initialize(s); } __syncthreads(); @@ -1602,7 +1768,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (is_rlev1(s->chunk.encoding_kind)) { numvals = Integer_RLEv1(bs, &s->u.rlev1, s->vals.i64, numvals, t); } else { - numvals = Integer_RLEv2(bs, &s->u.rlev2, s->vals.i64, numvals, t); + numvals = Integer_RLEv2( + bs, &s->u.rlev2, s->vals.i64, numvals, t, false, &run_cache_manager_inst, &cache); } if (s->chunk.type_kind == DECIMAL) { // If we're using an index, we may have to drop values from the initial run