From a5ac4bf3681f8433d3c9a2e96f4287a0daa30088 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 16 Dec 2024 12:32:15 -0800 Subject: [PATCH 01/12] Replace direct `cudaMemcpyAsync` calls with utility functions (within `/src`) (#17550) Replaced the calls to `cudaMemcpyAsync` with the new `cuda_memcpy`/`cuda_memcpy_async` utility, which optionally avoids using the copy engine. Also took the opportunity to use cudf::detail::host_vector and its factories to enable wider pinned memory use. Remaining instances are either not viable (e.g. copying `h_needs_fallback`, interop) or D2D copies. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - David Wendt (https://github.com/davidwendt) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/17550 --- cpp/include/cudf/detail/device_scalar.hpp | 2 +- cpp/src/bitmask/is_element_valid.cpp | 14 ++-- cpp/src/column/column_device_view.cu | 16 ++-- cpp/src/copying/contiguous_split.cu | 95 ++++++++++------------- cpp/src/io/csv/reader_impl.cu | 2 +- cpp/src/io/orc/writer_impl.cu | 2 +- cpp/src/reductions/minmax.cu | 11 +-- cpp/src/scalar/scalar.cpp | 9 +-- cpp/src/strings/regex/regexec.cpp | 14 ++-- cpp/src/text/subword/load_hash_file.cu | 37 ++++----- 10 files changed, 93 insertions(+), 109 deletions(-) diff --git a/cpp/include/cudf/detail/device_scalar.hpp b/cpp/include/cudf/detail/device_scalar.hpp index 16ca06c6561..090dc8b62b6 100644 --- a/cpp/include/cudf/detail/device_scalar.hpp +++ b/cpp/include/cudf/detail/device_scalar.hpp @@ -78,7 +78,7 @@ class device_scalar : public rmm::device_scalar { [[nodiscard]] T value(rmm::cuda_stream_view stream) const { cuda_memcpy(bounce_buffer, device_span{this->data(), 1}, stream); - return bounce_buffer[0]; + return std::move(bounce_buffer[0]); } void set_value_async(T const& value, rmm::cuda_stream_view stream) diff --git a/cpp/src/bitmask/is_element_valid.cpp b/cpp/src/bitmask/is_element_valid.cpp index 7eb80c4249e..d36dacca739 
100644 --- a/cpp/src/bitmask/is_element_valid.cpp +++ b/cpp/src/bitmask/is_element_valid.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include @@ -30,15 +31,14 @@ bool is_element_valid_sync(column_view const& col_view, CUDF_EXPECTS(element_index >= 0 and element_index < col_view.size(), "invalid index."); if (!col_view.nullable()) { return true; } - bitmask_type word = 0; // null_mask() returns device ptr to bitmask without offset size_type const index = element_index + col_view.offset(); - CUDF_CUDA_TRY(cudaMemcpyAsync(&word, - col_view.null_mask() + word_index(index), - sizeof(bitmask_type), - cudaMemcpyDefault, - stream.value())); - stream.synchronize(); + + auto const word = + cudf::detail::make_host_vector_sync( + device_span{col_view.null_mask() + word_index(index), 1}, stream) + .front(); + return static_cast(word & (bitmask_type{1} << intra_word_index(index))); } diff --git a/cpp/src/column/column_device_view.cu b/cpp/src/column/column_device_view.cu index fc244521617..9dc39f01ab3 100644 --- a/cpp/src/column/column_device_view.cu +++ b/cpp/src/column/column_device_view.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -60,13 +61,12 @@ create_device_view_from_view(ColumnView const& source, rmm::cuda_stream_view str // A buffer of CPU memory is allocated to hold the ColumnDeviceView // objects. Once filled, the CPU memory is copied to device memory // and then set into the d_children member pointer. 
- std::vector staging_buffer(descendant_storage_bytes); + auto staging_buffer = detail::make_host_vector(descendant_storage_bytes, stream); // Each ColumnDeviceView instance may have child objects that // require setting some internal device pointers before being copied // from CPU to device. - rmm::device_buffer* const descendant_storage = - new rmm::device_buffer(descendant_storage_bytes, stream); + auto const descendant_storage = new rmm::device_uvector(descendant_storage_bytes, stream); auto deleter = [descendant_storage](ColumnDeviceView* v) { v->destroy(); @@ -77,13 +77,7 @@ create_device_view_from_view(ColumnView const& source, rmm::cuda_stream_view str new ColumnDeviceView(source, staging_buffer.data(), descendant_storage->data()), deleter}; // copy the CPU memory with all the children into device memory - CUDF_CUDA_TRY(cudaMemcpyAsync(descendant_storage->data(), - staging_buffer.data(), - descendant_storage->size(), - cudaMemcpyDefault, - stream.value())); - - stream.synchronize(); + detail::cuda_memcpy(*descendant_storage, staging_buffer, stream); return result; } diff --git a/cpp/src/copying/contiguous_split.cu b/cpp/src/copying/contiguous_split.cu index e3ed5b55415..3413f75357b 100644 --- a/cpp/src/copying/contiguous_split.cu +++ b/cpp/src/copying/contiguous_split.cu @@ -998,7 +998,8 @@ struct packed_split_indices_and_src_buf_info { src_buf_info_size( cudf::util::round_up_safe(num_src_bufs * sizeof(src_buf_info), split_align)), // host-side - h_indices_and_source_info(indices_size + src_buf_info_size), + h_indices_and_source_info{ + detail::make_host_vector(indices_size + src_buf_info_size, stream)}, h_indices{reinterpret_cast(h_indices_and_source_info.data())}, h_src_buf_info{ reinterpret_cast(h_indices_and_source_info.data() + indices_size)} @@ -1025,15 +1026,18 @@ struct packed_split_indices_and_src_buf_info { reinterpret_cast(reinterpret_cast(d_indices_and_source_info.data()) + indices_size + src_buf_info_size); - CUDF_CUDA_TRY(cudaMemcpyAsync( - 
d_indices, h_indices, indices_size + src_buf_info_size, cudaMemcpyDefault, stream.value())); + detail::cuda_memcpy_async( + device_span{static_cast(d_indices_and_source_info.data()), + h_indices_and_source_info.size()}, + h_indices_and_source_info, + stream); } size_type const indices_size; std::size_t const src_buf_info_size; std::size_t offset_stack_size; - std::vector h_indices_and_source_info; + detail::host_vector h_indices_and_source_info; rmm::device_buffer d_indices_and_source_info; size_type* const h_indices; @@ -1055,27 +1059,26 @@ struct packed_partition_buf_size_and_dst_buf_info { buf_sizes_size{cudf::util::round_up_safe(num_partitions * sizeof(std::size_t), split_align)}, dst_buf_info_size{cudf::util::round_up_safe(num_bufs * sizeof(dst_buf_info), split_align)}, // host-side - h_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size), + h_buf_sizes_and_dst_info{ + detail::make_host_vector(buf_sizes_size + dst_buf_info_size, stream)}, h_buf_sizes{reinterpret_cast(h_buf_sizes_and_dst_info.data())}, h_dst_buf_info{ - reinterpret_cast(h_buf_sizes_and_dst_info.data() + buf_sizes_size)}, + reinterpret_cast(h_buf_sizes_and_dst_info.data() + buf_sizes_size), + num_bufs, + h_buf_sizes_and_dst_info.get_allocator().is_device_accessible()}, // device-side - d_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size, stream, temp_mr), + d_buf_sizes_and_dst_info(h_buf_sizes_and_dst_info.size(), stream, temp_mr), d_buf_sizes{reinterpret_cast(d_buf_sizes_and_dst_info.data())}, // destination buffer info - d_dst_buf_info{reinterpret_cast( - static_cast(d_buf_sizes_and_dst_info.data()) + buf_sizes_size)} + d_dst_buf_info{ + reinterpret_cast(d_buf_sizes_and_dst_info.data() + buf_sizes_size), num_bufs} { } void copy_to_host() { // DtoH buf sizes and col info back to the host - CUDF_CUDA_TRY(cudaMemcpyAsync(h_buf_sizes, - d_buf_sizes, - buf_sizes_size + dst_buf_info_size, - cudaMemcpyDefault, - stream.value())); + detail::cuda_memcpy_async(h_buf_sizes_and_dst_info, 
d_buf_sizes_and_dst_info, stream); } rmm::cuda_stream_view const stream; @@ -1084,13 +1087,13 @@ struct packed_partition_buf_size_and_dst_buf_info { std::size_t const buf_sizes_size; std::size_t const dst_buf_info_size; - std::vector h_buf_sizes_and_dst_info; + detail::host_vector h_buf_sizes_and_dst_info; std::size_t* const h_buf_sizes; - dst_buf_info* const h_dst_buf_info; + host_span const h_dst_buf_info; - rmm::device_buffer d_buf_sizes_and_dst_info; + rmm::device_uvector d_buf_sizes_and_dst_info; std::size_t* const d_buf_sizes; - dst_buf_info* const d_dst_buf_info; + device_span const d_dst_buf_info; }; // Packed block of memory 3: @@ -1106,11 +1109,12 @@ struct packed_src_and_dst_pointers { src_bufs_size{cudf::util::round_up_safe(num_src_bufs * sizeof(uint8_t*), split_align)}, dst_bufs_size{cudf::util::round_up_safe(num_partitions * sizeof(uint8_t*), split_align)}, // host-side - h_src_and_dst_buffers(src_bufs_size + dst_bufs_size), + h_src_and_dst_buffers{ + detail::make_host_vector(src_bufs_size + dst_bufs_size, stream)}, h_src_bufs{reinterpret_cast(h_src_and_dst_buffers.data())}, h_dst_bufs{reinterpret_cast(h_src_and_dst_buffers.data() + src_bufs_size)}, // device-side - d_src_and_dst_buffers{rmm::device_buffer(src_bufs_size + dst_bufs_size, stream, temp_mr)}, + d_src_and_dst_buffers{h_src_and_dst_buffers.size(), stream, temp_mr}, d_src_bufs{reinterpret_cast(d_src_and_dst_buffers.data())}, d_dst_bufs{reinterpret_cast( reinterpret_cast(d_src_and_dst_buffers.data()) + src_bufs_size)} @@ -1121,18 +1125,18 @@ struct packed_src_and_dst_pointers { void copy_to_device() { - CUDF_CUDA_TRY(cudaMemcpyAsync(d_src_and_dst_buffers.data(), - h_src_and_dst_buffers.data(), - src_bufs_size + dst_bufs_size, - cudaMemcpyDefault, - stream.value())); + detail::cuda_memcpy_async( + device_span{static_cast(d_src_and_dst_buffers.data()), + d_src_and_dst_buffers.size()}, + h_src_and_dst_buffers, + stream); } rmm::cuda_stream_view const stream; std::size_t const src_bufs_size; 
std::size_t const dst_bufs_size; - std::vector h_src_and_dst_buffers; + detail::host_vector h_src_and_dst_buffers; uint8_t const** const h_src_bufs; uint8_t** const h_dst_bufs; @@ -1205,7 +1209,7 @@ std::unique_ptr compute_splits( std::make_unique( num_partitions, num_bufs, stream, temp_mr); - auto const d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info; + auto const d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info.begin(); auto const d_buf_sizes = partition_buf_size_and_dst_buf_info->d_buf_sizes; auto const split_indices_and_src_buf_info = packed_split_indices_and_src_buf_info( @@ -1518,26 +1522,19 @@ std::unique_ptr chunk_iteration_state::create( */ if (user_buffer_size != 0) { // copy the batch offsets back to host - std::vector h_offsets(num_batches + 1); - { - rmm::device_uvector offsets(h_offsets.size(), stream, temp_mr); + auto const h_offsets = [&] { + rmm::device_uvector offsets(num_batches + 1, stream, temp_mr); auto const batch_byte_size_iter = cudf::detail::make_counting_transform_iterator( 0, batch_byte_size_function{num_batches, d_batched_dst_buf_info.begin()}); - thrust::exclusive_scan(rmm::exec_policy(stream, temp_mr), + thrust::exclusive_scan(rmm::exec_policy_nosync(stream, temp_mr), batch_byte_size_iter, - batch_byte_size_iter + num_batches + 1, + batch_byte_size_iter + offsets.size(), offsets.begin()); - CUDF_CUDA_TRY(cudaMemcpyAsync(h_offsets.data(), - offsets.data(), - sizeof(std::size_t) * offsets.size(), - cudaMemcpyDefault, - stream.value())); - // the next part is working on the CPU, so we want to synchronize here - stream.synchronize(); - } + return detail::make_host_vector_sync(offsets, stream); + }(); std::vector num_batches_per_iteration; std::vector size_of_batches_per_iteration; @@ -1699,7 +1696,7 @@ void copy_data(int num_batches_to_copy, int starting_batch, uint8_t const** d_src_bufs, uint8_t** d_dst_bufs, - rmm::device_uvector& d_dst_buf_info, + device_span d_dst_buf_info, uint8_t* 
user_buffer, rmm::cuda_stream_view stream) { @@ -1833,15 +1830,9 @@ struct contiguous_split_state { keys + num_batches_total, values, thrust::make_discard_iterator(), - dst_valid_count_output_iterator{d_orig_dst_buf_info}); - - CUDF_CUDA_TRY(cudaMemcpyAsync(h_orig_dst_buf_info, - d_orig_dst_buf_info, - partition_buf_size_and_dst_buf_info->dst_buf_info_size, - cudaMemcpyDefault, - stream.value())); + dst_valid_count_output_iterator{d_orig_dst_buf_info.begin()}); - stream.synchronize(); + detail::cuda_memcpy(h_orig_dst_buf_info, d_orig_dst_buf_info, stream); // not necessary for the non-chunked case, but it makes it so further calls to has_next // return false, just in case @@ -1889,7 +1880,7 @@ struct contiguous_split_state { } auto& h_dst_buf_info = partition_buf_size_and_dst_buf_info->h_dst_buf_info; - auto cur_dst_buf_info = h_dst_buf_info; + auto cur_dst_buf_info = h_dst_buf_info.data(); detail::metadata_builder mb{input.num_columns()}; populate_metadata(input.begin(), input.end(), cur_dst_buf_info, mb); @@ -1927,7 +1918,7 @@ struct contiguous_split_state { // Second pass: uses `dst_buf_info` to break down the work into 1MB batches. 
chunk_iter_state = compute_batches(num_bufs, - partition_buf_size_and_dst_buf_info->d_dst_buf_info, + partition_buf_size_and_dst_buf_info->d_dst_buf_info.data(), partition_buf_size_and_dst_buf_info->h_buf_sizes, num_partitions, user_buffer_size, @@ -1963,7 +1954,7 @@ struct contiguous_split_state { auto& h_dst_buf_info = partition_buf_size_and_dst_buf_info->h_dst_buf_info; auto& h_dst_bufs = src_and_dst_pointers->h_dst_bufs; - auto cur_dst_buf_info = h_dst_buf_info; + auto cur_dst_buf_info = h_dst_buf_info.data(); detail::metadata_builder mb(input.num_columns()); for (std::size_t idx = 0; idx < num_partitions; idx++) { diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 7f0b5e07b09..e05353ee822 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -21,13 +21,13 @@ #include "csv_common.hpp" #include "csv_gpu.hpp" -#include "cudf/detail/utilities/cuda_memcpy.hpp" #include "io/comp/io_uncomp.hpp" #include "io/utilities/column_buffer.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "io/utilities/parsing_utils.cuh" #include +#include #include #include #include diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 8e532b01788..6b9c19368dc 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -19,7 +19,6 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "cudf/detail/utilities/cuda_memcpy.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" @@ -30,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/cpp/src/reductions/minmax.cu b/cpp/src/reductions/minmax.cu index 98fd9f679c8..21d8c95e199 100644 --- a/cpp/src/reductions/minmax.cu +++ b/cpp/src/reductions/minmax.cu @@ -218,9 +218,8 @@ struct minmax_functor { auto dev_result = reduce(col, stream); // copy the minmax_pair to the host; does not copy the strings using OutputType = 
minmax_pair; - OutputType host_result; - CUDF_CUDA_TRY(cudaMemcpyAsync( - &host_result, dev_result.data(), sizeof(OutputType), cudaMemcpyDefault, stream.value())); + + auto const host_result = dev_result.value(stream); // strings are copied to create the scalars here return {std::make_unique(host_result.min_val, true, stream, mr), std::make_unique(host_result.max_val, true, stream, mr)}; @@ -236,10 +235,8 @@ struct minmax_functor { // compute minimum and maximum values auto dev_result = reduce(col, stream); // copy the minmax_pair to the host to call get_element - using OutputType = minmax_pair; - OutputType host_result; - CUDF_CUDA_TRY(cudaMemcpyAsync( - &host_result, dev_result.data(), sizeof(OutputType), cudaMemcpyDefault, stream.value())); + using OutputType = minmax_pair; + OutputType host_result = dev_result.value(stream); // get the keys for those indexes auto const keys = dictionary_column_view(col).keys(); return {detail::get_element(keys, static_cast(host_result.min_val), stream, mr), diff --git a/cpp/src/scalar/scalar.cpp b/cpp/src/scalar/scalar.cpp index 4ec2174a96f..4b0b08fe251 100644 --- a/cpp/src/scalar/scalar.cpp +++ b/cpp/src/scalar/scalar.cpp @@ -114,11 +114,10 @@ string_scalar::operator std::string() const { return this->to_string(cudf::get_d std::string string_scalar::to_string(rmm::cuda_stream_view stream) const { - std::string result; - result.resize(_data.size()); - CUDF_CUDA_TRY( - cudaMemcpyAsync(&result[0], _data.data(), _data.size(), cudaMemcpyDefault, stream.value())); - stream.synchronize(); + std::string result(size(), '\0'); + detail::cuda_memcpy(host_span{result.data(), result.size()}, + device_span{data(), _data.size()}, + stream); return result; } diff --git a/cpp/src/strings/regex/regexec.cpp b/cpp/src/strings/regex/regexec.cpp index 3d11b641b3f..902e13fe75e 100644 --- a/cpp/src/strings/regex/regexec.cpp +++ b/cpp/src/strings/regex/regexec.cpp @@ -17,7 +17,9 @@ #include "strings/regex/regcomp.h" #include "strings/regex/regex.cuh" 
+#include #include +#include #include #include @@ -66,10 +68,11 @@ std::unique_ptr> reprog_devic cudf::util::round_up_safe(classes_size, sizeof(char32_t)); // allocate memory to store all the prog data in a flat contiguous buffer - std::vector h_buffer(memsize); // copy everything into here; - auto h_ptr = h_buffer.data(); // this is our running host ptr; - auto d_buffer = new rmm::device_buffer(memsize, stream); // output device memory; - auto d_ptr = reinterpret_cast(d_buffer->data()); // running device pointer + auto h_buffer = + cudf::detail::make_host_vector(memsize, stream); // copy everything into here; + auto h_ptr = h_buffer.data(); // this is our running host ptr; + auto d_buffer = new rmm::device_uvector(memsize, stream); // output device memory; + auto d_ptr = d_buffer->data(); // running device pointer // create our device object; this is managed separately and returned to the caller auto* d_prog = new reprog_device(h_prog); @@ -113,8 +116,7 @@ std::unique_ptr> reprog_devic d_prog->_prog_size = memsize + sizeof(reprog_device); // copy flat prog to device memory - CUDF_CUDA_TRY( - cudaMemcpyAsync(d_buffer->data(), h_buffer.data(), memsize, cudaMemcpyDefault, stream.value())); + cudf::detail::cuda_memcpy_async(*d_buffer, h_buffer, stream); // build deleter to cleanup device memory auto deleter = [d_buffer](reprog_device* t) { diff --git a/cpp/src/text/subword/load_hash_file.cu b/cpp/src/text/subword/load_hash_file.cu index b13ad0a7de8..ee51a426eac 100644 --- a/cpp/src/text/subword/load_hash_file.cu +++ b/cpp/src/text/subword/load_hash_file.cu @@ -19,6 +19,8 @@ #include #include +#include +#include #include #include #include @@ -198,8 +200,8 @@ std::unique_ptr load_vocabulary_file( std::getline(hash_file, line); result.num_bins = str_to_uint32(line, line_no++); - std::vector bin_coefficients(result.num_bins); - std::vector bin_offsets(result.num_bins); + auto bin_coefficients = cudf::detail::make_host_vector(result.num_bins, stream); + auto bin_offsets = 
cudf::detail::make_host_vector(result.num_bins, stream); for (int i = 0; i < result.num_bins; ++i) { std::getline(hash_file, line); @@ -216,7 +218,7 @@ std::unique_ptr load_vocabulary_file( std::getline(hash_file, line); uint64_t hash_table_length = str_to_uint64(line, line_no++); - std::vector table(hash_table_length); + auto table = cudf::detail::make_host_vector(hash_table_length, stream); std::generate(table.begin(), table.end(), [&hash_file, &line_no]() { std::string line; @@ -239,33 +241,32 @@ std::unique_ptr load_vocabulary_file( cudf::mask_state::UNALLOCATED, stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(result.table->mutable_view().data(), - table.data(), - table.size() * sizeof(uint64_t), - cudaMemcpyDefault, - stream.value())); + cudf::detail::cuda_memcpy_async( + cudf::device_span(result.table->mutable_view().data(), table.size()), + table, + stream); result.bin_coefficients = cudf::make_numeric_column(cudf::data_type{cudf::type_id::UINT64}, bin_coefficients.size(), cudf::mask_state::UNALLOCATED, stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(result.bin_coefficients->mutable_view().data(), - bin_coefficients.data(), - bin_coefficients.size() * sizeof(uint64_t), - cudaMemcpyDefault, - stream.value())); + cudf::detail::cuda_memcpy_async( + cudf::device_span(result.bin_coefficients->mutable_view().data(), + bin_coefficients.size()), + bin_coefficients, + stream); result.bin_offsets = cudf::make_numeric_column(cudf::data_type{cudf::type_id::UINT16}, bin_offsets.size(), cudf::mask_state::UNALLOCATED, stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(result.bin_offsets->mutable_view().data(), - bin_offsets.data(), - bin_offsets.size() * sizeof(uint16_t), - cudaMemcpyDefault, - stream.value())); + cudf::detail::cuda_memcpy_async( + cudf::device_span(result.bin_offsets->mutable_view().data(), + bin_offsets.size()), + bin_offsets, + stream); auto cp_metadata = detail::get_codepoint_metadata(stream); auto const cp_metadata_size = static_cast(cp_metadata.size()); From 
e9e34e631adc650adc230b788e03ac0489b097c1 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 16 Dec 2024 17:11:27 -0800 Subject: [PATCH 02/12] Stop memory_resource.hpp from including itself (#17603) Resolves #17595 Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Nghia Truong (https://github.com/ttnghia) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17603 --- cpp/include/cudf/utilities/memory_resource.hpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/include/cudf/utilities/memory_resource.hpp b/cpp/include/cudf/utilities/memory_resource.hpp index b562574fd79..eaba466557b 100644 --- a/cpp/include/cudf/utilities/memory_resource.hpp +++ b/cpp/include/cudf/utilities/memory_resource.hpp @@ -16,8 +16,6 @@ #pragma once -#include - #include #include #include From 5802d343f3bb8aaad1c2ebe440535769c3455e66 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 16 Dec 2024 17:38:28 -0800 Subject: [PATCH 03/12] Correctly accept a `pandas.CategoricalDtype(pandas.IntervalDtype(...), ...)` type (#17604) From an offline discussion, a pandas object with a `category[interval[...]]` type would incorrectly be interpreted as a `category[struct[...]]` type. This can cause further problems with `cudf.pandas` as a `category[struct[...]]` type cannot be properly interpreted by pandas. 
Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17604 --- python/cudf/cudf/core/column/categorical.py | 27 ++++++++++------- python/cudf/cudf/core/column/column.py | 32 +++++++++++++-------- python/cudf/cudf/tests/test_categorical.py | 10 +++++++ 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/python/cudf/cudf/core/column/categorical.py b/python/cudf/cudf/core/column/categorical.py index a0cf38c6f51..d9b54008e85 100644 --- a/python/cudf/cudf/core/column/categorical.py +++ b/python/cudf/cudf/core/column/categorical.py @@ -1095,17 +1095,22 @@ def as_categorical_column(self, dtype: Dtype) -> Self: raise ValueError("dtype must be CategoricalDtype") if not isinstance(self.categories, type(dtype.categories._column)): - # If both categories are of different Column types, - # return a column full of Nulls. - codes = cast( - cudf.core.column.numerical.NumericalColumn, - column.as_column( - _DEFAULT_CATEGORICAL_VALUE, - length=self.size, - dtype=self.codes.dtype, - ), - ) - codes = as_unsigned_codes(len(dtype.categories), codes) + if isinstance( + self.categories.dtype, cudf.StructDtype + ) and isinstance(dtype.categories.dtype, cudf.IntervalDtype): + codes = self.codes + else: + # Otherwise if both categories are of different Column types, + # return a column full of nulls. 
+ codes = cast( + cudf.core.column.numerical.NumericalColumn, + column.as_column( + _DEFAULT_CATEGORICAL_VALUE, + length=self.size, + dtype=self.codes.dtype, + ), + ) + codes = as_unsigned_codes(len(dtype.categories), codes) return type(self)( data=self.data, # type: ignore[arg-type] size=self.size, diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 1445124bbc3..2ae7c3f6503 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -2076,18 +2076,26 @@ def as_column( if isinstance(arbitrary.dtype, pd.DatetimeTZDtype): new_tz = get_compatible_timezone(arbitrary.dtype) arbitrary = arbitrary.astype(new_tz) - if isinstance(arbitrary.dtype, pd.CategoricalDtype) and isinstance( - arbitrary.dtype.categories.dtype, pd.DatetimeTZDtype - ): - new_tz = get_compatible_timezone( - arbitrary.dtype.categories.dtype - ) - new_cats = arbitrary.dtype.categories.astype(new_tz) - new_dtype = pd.CategoricalDtype( - categories=new_cats, ordered=arbitrary.dtype.ordered - ) - arbitrary = arbitrary.astype(new_dtype) - + if isinstance(arbitrary.dtype, pd.CategoricalDtype): + if isinstance( + arbitrary.dtype.categories.dtype, pd.DatetimeTZDtype + ): + new_tz = get_compatible_timezone( + arbitrary.dtype.categories.dtype + ) + new_cats = arbitrary.dtype.categories.astype(new_tz) + new_dtype = pd.CategoricalDtype( + categories=new_cats, ordered=arbitrary.dtype.ordered + ) + arbitrary = arbitrary.astype(new_dtype) + elif ( + isinstance( + arbitrary.dtype.categories.dtype, pd.IntervalDtype + ) + and dtype is None + ): + # Conversion to arrow converts IntervalDtype to StructDtype + dtype = cudf.CategoricalDtype.from_pandas(arbitrary.dtype) return as_column( pa.array(arbitrary, from_pandas=True), nan_as_null=nan_as_null, diff --git a/python/cudf/cudf/tests/test_categorical.py b/python/cudf/cudf/tests/test_categorical.py index db24fdd2a29..8e1dba858c3 100644 --- a/python/cudf/cudf/tests/test_categorical.py +++ 
b/python/cudf/cudf/tests/test_categorical.py @@ -950,3 +950,13 @@ def test_index_set_categories(ordered): expected = pd_ci.set_categories([1, 2, 3, 4], ordered=ordered) result = cudf_ci.set_categories([1, 2, 3, 4], ordered=ordered) assert_eq(result, expected) + + +def test_categorical_interval_pandas_roundtrip(): + expected = cudf.Series(cudf.interval_range(0, 5)).astype("category") + result = cudf.Series.from_pandas(expected.to_pandas()) + assert_eq(result, expected) + + expected = pd.Series(pd.interval_range(0, 5)).astype("category") + result = cudf.Series.from_pandas(expected).to_pandas() + assert_eq(result, expected) From c650bf7a86a986f80fd8f1270139a459e4cae7ab Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 16 Dec 2024 17:52:39 -0800 Subject: [PATCH 04/12] Move cudf._lib.stream_compaction to cudf.core._internals (#17456) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17456 --- python/cudf/cudf/_lib/CMakeLists.txt | 4 +- python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/stream_compaction.pyx | 181 ------------------ python/cudf/cudf/core/_base_index.py | 14 +- .../cudf/core/_internals/stream_compaction.py | 121 ++++++++++++ python/cudf/cudf/core/column/column.py | 29 +-- python/cudf/cudf/core/dataframe.py | 7 +- python/cudf/cudf/core/frame.py | 13 -- python/cudf/cudf/core/groupby/groupby.py | 5 +- python/cudf/cudf/core/indexed_frame.py | 59 +++--- 10 files changed, 191 insertions(+), 243 deletions(-) delete mode 100644 python/cudf/cudf/_lib/stream_compaction.pyx create mode 100644 python/cudf/cudf/core/_internals/stream_compaction.py diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 296f8685f6a..5b9fa83b33c 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ 
b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,8 +12,8 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx stream_compaction.pyx - string_casting.pyx strings_udf.pyx types.pyx utils.pyx +set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx string_casting.pyx strings_udf.pyx + types.pyx utils.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 78b92025deb..63090ef86c8 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -4,7 +4,6 @@ from . import ( groupby, interop, - stream_compaction, string_casting, strings_udf, ) diff --git a/python/cudf/cudf/_lib/stream_compaction.pyx b/python/cudf/cudf/_lib/stream_compaction.pyx deleted file mode 100644 index 1b8831940e3..00000000000 --- a/python/cudf/cudf/_lib/stream_compaction.pyx +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from libcpp cimport bool - -from cudf._lib.column cimport Column -from cudf._lib.utils cimport columns_from_pylibcudf_table - -import pylibcudf - - -@acquire_spill_lock() -def drop_nulls(list columns, how="any", keys=None, thresh=None): - """ - Drops null rows from cols depending on key columns. - - Parameters - ---------- - columns : list of columns - how : "any" or "all". If thresh is None, drops rows of cols that have any - nulls or all nulls (respectively) in subset (default: "any") - keys : List of column indices. 
If set, then these columns are checked for - nulls rather than all of columns (optional) - thresh : Minimum number of non-nulls required to keep a row (optional) - - Returns - ------- - columns with null rows dropped - """ - if how not in {"any", "all"}: - raise ValueError("how must be 'any' or 'all'") - - keys = list(keys if keys is not None else range(len(columns))) - - # Note: If how == "all" and thresh is specified this prioritizes thresh - if thresh is not None: - keep_threshold = thresh - elif how == "all": - keep_threshold = 1 - else: - keep_threshold = len(keys) - - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.drop_nulls( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - keys, - keep_threshold, - ) - ) - - -@acquire_spill_lock() -def apply_boolean_mask(list columns, Column boolean_mask): - """ - Drops the rows which correspond to False in boolean_mask. - - Parameters - ---------- - columns : list of columns whose rows are dropped as per boolean_mask - boolean_mask : a boolean column of same size as source_table - - Returns - ------- - columns obtained from applying mask - """ - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.apply_boolean_mask( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - boolean_mask.to_pylibcudf(mode="read"), - ) - ) - - -_keep_options = { - "first": pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_FIRST, - "last": pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_LAST, - False: pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_NONE, -} - - -@acquire_spill_lock() -def drop_duplicates(list columns, - object keys=None, - object keep='first', - bool nulls_are_equal=True): - """ - Drops rows in source_table as per duplicate rows in keys. - - Parameters - ---------- - columns : List of columns - keys : List of column indices. 
If set, then these columns are checked for - duplicates rather than all of columns (optional) - keep : keep 'first' or 'last' or none of the duplicate rows - nulls_are_equal : if True, nulls are treated equal else not. - - Returns - ------- - columns with duplicate dropped - """ - if (keep_option := _keep_options.get(keep)) is None: - raise ValueError('keep must be either "first", "last" or False') - - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.stable_distinct( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - list(keys if keys is not None else range(len(columns))), - keep_option, - pylibcudf.types.NullEquality.EQUAL - if nulls_are_equal else pylibcudf.types.NullEquality.UNEQUAL, - pylibcudf.types.NanEquality.ALL_EQUAL, - ) - ) - - -@acquire_spill_lock() -def distinct_indices( - list columns, - object keep="first", - bool nulls_equal=True, - bool nans_equal=True, -): - """ - Return indices of the distinct rows in a table. - - Parameters - ---------- - columns : list of columns to check for duplicates - keep : treat "first", "last", or (False) none of any duplicate - rows as distinct - nulls_equal : Should nulls compare equal - nans_equal: Should nans compare equal - - Returns - ------- - Column of indices - - See Also - -------- - drop_duplicates - """ - if (keep_option := _keep_options.get(keep)) is None: - raise ValueError('keep must be either "first", "last" or False') - - return Column.from_pylibcudf( - pylibcudf.stream_compaction.distinct_indices( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - keep_option, - pylibcudf.types.NullEquality.EQUAL - if nulls_equal else pylibcudf.types.NullEquality.UNEQUAL, - pylibcudf.types.NanEquality.ALL_EQUAL - if nans_equal else pylibcudf.types.NanEquality.UNEQUAL, - ) - ) - - -@acquire_spill_lock() -def distinct_count(Column source_column, ignore_nulls=True, nan_as_null=False): - """ - Finds number of unique rows in `source_column` - - Parameters - ---------- - 
source_column : source table checked for unique rows - ignore_nulls : If True nulls are ignored, - else counted as one more distinct value - nan_as_null : If True, NAN is considered NULL, - else counted as one more distinct value - - Returns - ------- - Count of number of unique rows in `source_column` - """ - return pylibcudf.stream_compaction.distinct_count( - source_column.to_pylibcudf(mode="read"), - pylibcudf.types.NullPolicy.EXCLUDE - if ignore_nulls else pylibcudf.types.NullPolicy.INCLUDE, - pylibcudf.types.NanPolicy.NAN_IS_NULL - if nan_as_null else pylibcudf.types.NanPolicy.NAN_IS_VALID, - ) diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index e97f63db17a..f4543bc6156 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -10,15 +10,15 @@ from typing_extensions import Self import cudf -from cudf._lib.stream_compaction import ( - apply_boolean_mask, - drop_duplicates, - drop_nulls, -) from cudf._lib.types import size_type_dtype from cudf.api.extensions import no_default from cudf.api.types import is_integer, is_list_like, is_scalar from cudf.core._internals import copying +from cudf.core._internals.stream_compaction import ( + apply_boolean_mask, + drop_duplicates, + drop_nulls, +) from cudf.core.abc import Serializable from cudf.core.column import ColumnBase, column from cudf.core.copy_types import GatherMap @@ -414,7 +414,7 @@ def hasnans(self): raise NotImplementedError @property - def nlevels(self): + def nlevels(self) -> int: """ Number of levels. 
""" @@ -1944,7 +1944,6 @@ def drop_duplicates( return self._from_columns_like_self( drop_duplicates( list(self._columns), - keys=range(len(self._columns)), keep=keep, nulls_are_equal=nulls_are_equal, ), @@ -2033,7 +2032,6 @@ def dropna(self, how="any"): drop_nulls( data_columns, how=how, - keys=range(len(data_columns)), ), self._column_names, ) diff --git a/python/cudf/cudf/core/_internals/stream_compaction.py b/python/cudf/cudf/core/_internals/stream_compaction.py new file mode 100644 index 00000000000..4ccc26c2a1c --- /dev/null +++ b/python/cudf/cudf/core/_internals/stream_compaction.py @@ -0,0 +1,121 @@ +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +import pylibcudf as plc + +from cudf._lib.column import Column +from cudf.core.buffer import acquire_spill_lock + +if TYPE_CHECKING: + from cudf.core.column import ColumnBase + + +@acquire_spill_lock() +def drop_nulls( + columns: list[ColumnBase], + how: Literal["any", "all"] = "any", + keys: list[int] | None = None, + thresh: int | None = None, +) -> list[ColumnBase]: + """ + Drops null rows from cols depending on key columns. + + Parameters + ---------- + columns : list of columns + how : "any" or "all". If thresh is None, drops rows of cols that have any + nulls or all nulls (respectively) in subset (default: "any") + keys : List of column indices. 
If set, then these columns are checked for + nulls rather than all of columns (optional) + thresh : Minimum number of non-nulls required to keep a row (optional) + + Returns + ------- + columns with null rows dropped + """ + if how not in {"any", "all"}: + raise ValueError("how must be 'any' or 'all'") + + keys = keys if keys is not None else list(range(len(columns))) + + # Note: If how == "all" and thresh is specified this prioritizes thresh + if thresh is not None: + keep_threshold = thresh + elif how == "all": + keep_threshold = 1 + else: + keep_threshold = len(keys) + + plc_table = plc.stream_compaction.drop_nulls( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keys, + keep_threshold, + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] + + +@acquire_spill_lock() +def apply_boolean_mask( + columns: list[ColumnBase], boolean_mask: ColumnBase +) -> list[ColumnBase]: + """ + Drops the rows which correspond to False in boolean_mask. + + Parameters + ---------- + columns : list of columns whose rows are dropped as per boolean_mask + boolean_mask : a boolean column of same size as source_table + + Returns + ------- + columns obtained from applying mask + """ + plc_table = plc.stream_compaction.apply_boolean_mask( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + boolean_mask.to_pylibcudf(mode="read"), + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] + + +@acquire_spill_lock() +def drop_duplicates( + columns: list[ColumnBase], + keys: list[int] | None = None, + keep: Literal["first", "last", False] = "first", + nulls_are_equal: bool = True, +) -> list[ColumnBase]: + """ + Drops rows in source_table as per duplicate rows in keys. + + Parameters + ---------- + columns : List of columns + keys : List of column indices. 
If set, then these columns are checked for + duplicates rather than all of columns (optional) + keep : keep 'first' or 'last' or none of the duplicate rows + nulls_are_equal : if True, nulls are treated equal else not. + + Returns + ------- + columns with duplicate dropped + """ + _keep_options = { + "first": plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST, + "last": plc.stream_compaction.DuplicateKeepOption.KEEP_LAST, + False: plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, + } + if (keep_option := _keep_options.get(keep)) is None: + raise ValueError('keep must be either "first", "last" or False') + + plc_table = plc.stream_compaction.stable_distinct( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keys if keys is not None else list(range(len(columns))), + keep_option, + plc.types.NullEquality.EQUAL + if nulls_are_equal + else plc.types.NullEquality.UNEQUAL, + plc.types.NanEquality.ALL_EQUAL, + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 2ae7c3f6503..2515157253c 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -26,12 +26,6 @@ from cudf import _lib as libcudf from cudf._lib.column import Column from cudf._lib.scalar import as_device_scalar -from cudf._lib.stream_compaction import ( - apply_boolean_mask, - distinct_count as cpp_distinct_count, - drop_duplicates, - drop_nulls, -) from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype from cudf.api.types import ( _is_non_decimal_numeric_dtype, @@ -43,6 +37,11 @@ ) from cudf.core._compat import PANDAS_GE_210 from cudf.core._internals import aggregation, copying, sorting, unary +from cudf.core._internals.stream_compaction import ( + apply_boolean_mask, + drop_duplicates, + drop_nulls, +) from cudf.core._internals.timezones import get_compatible_timezone from cudf.core.abc import Serializable from 
cudf.core.buffer import ( @@ -276,7 +275,7 @@ def any(self, skipna: bool = True) -> bool: def dropna(self) -> Self: if self.has_nulls(): - return drop_nulls([self])[0]._with_type_metadata(self.dtype) + return drop_nulls([self])[0]._with_type_metadata(self.dtype) # type: ignore[return-value] else: return self.copy() @@ -849,7 +848,7 @@ def indices_of( else: value = as_column(value, dtype=self.dtype, length=1) mask = value.contains(self) - return apply_boolean_mask( + return apply_boolean_mask( # type: ignore[return-value] [as_column(range(0, len(self)), dtype=size_type_dtype)], mask )[0] @@ -1084,9 +1083,15 @@ def distinct_count(self, dropna: bool = True) -> int: try: return self._distinct_count[dropna] except KeyError: - self._distinct_count[dropna] = cpp_distinct_count( - self, ignore_nulls=dropna - ) + with acquire_spill_lock(): + result = plc.stream_compaction.distinct_count( + self.to_pylibcudf(mode="read"), + plc.types.NullPolicy.EXCLUDE + if dropna + else plc.types.NullPolicy.INCLUDE, + plc.types.NanPolicy.NAN_IS_VALID, + ) + self._distinct_count[dropna] = result return self._distinct_count[dropna] def can_cast_safely(self, to_dtype: Dtype) -> bool: @@ -1315,7 +1320,7 @@ def unique(self) -> Self: if self.is_unique: return self.copy() else: - return drop_duplicates([self], keep="first")[ + return drop_duplicates([self], keep="first")[ # type: ignore[return-value] 0 ]._with_type_metadata(self.dtype) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 2c92069f26e..e66e4f41642 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -7878,7 +7878,8 @@ def interleave_columns(self): return self._constructor_sliced._from_column(result_col) @acquire_spill_lock() - def _compute_columns(self, expr: str) -> ColumnBase: + def _compute_column(self, expr: str) -> ColumnBase: + """Helper function for eval""" plc_column = plc.transform.compute_column( plc.Table( [col.to_pylibcudf(mode="read") for col 
in self._columns] @@ -8014,7 +8015,7 @@ def eval(self, expr: str, inplace: bool = False, **kwargs): raise ValueError( "Cannot operate inplace if there is no assignment" ) - return Series._from_column(self._compute_columns(statements[0])) + return Series._from_column(self._compute_column(statements[0])) targets = [] exprs = [] @@ -8032,7 +8033,7 @@ def eval(self, expr: str, inplace: bool = False, **kwargs): ret = self if inplace else self.copy(deep=False) for name, expr in zip(targets, exprs): - ret._data[name] = self._compute_columns(expr) + ret._data[name] = self._compute_column(expr) if not inplace: return ret diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 2412d6e9c4f..ba9b15667f1 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1058,19 +1058,6 @@ def to_arrow(self): } ) - @_performance_tracking - def _positions_from_column_names(self, column_names) -> list[int]: - """Map each column name into their positions in the frame. - - The order of indices returned corresponds to the column order in this - Frame. 
- """ - return [ - i - for i, name in enumerate(self._column_names) - if name in set(column_names) - ] - @_performance_tracking def _copy_type_metadata(self: Self, other: Self) -> Self: """ diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index b772d35846d..6cd8e11695f 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -16,6 +16,7 @@ import pylibcudf as plc import cudf +import cudf.core._internals from cudf import _lib as libcudf from cudf._lib import groupby as libgroupby from cudf._lib.types import size_type_dtype @@ -430,7 +431,9 @@ def indices(self) -> dict[ScalarLike, cp.ndarray]: ] ) - group_keys = libcudf.stream_compaction.drop_duplicates(group_keys) + group_keys = cudf.core._internals.stream_compaction.drop_duplicates( + group_keys + ) if len(group_keys) > 1: index = cudf.MultiIndex.from_arrays(group_keys) else: diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 8302cd72aa8..72bb85821fa 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -27,6 +27,7 @@ import cudf import cudf._lib as libcudf import cudf.core +import cudf.core._internals import cudf.core.algorithms from cudf.api.extensions import no_default from cudf.api.types import ( @@ -3063,21 +3064,21 @@ def _slice(self, arg: slice, keep_index: bool = True) -> Self: return result def _positions_from_column_names( - self, column_names, offset_by_index_columns=False - ): + self, + column_names: set[abc.Hashable], + offset_by_index_columns: bool = True, + ) -> list[int]: """Map each column name into their positions in the frame. Return positions of the provided column names, offset by the number of index columns if `offset_by_index_columns` is True. The order of indices returned corresponds to the column order in this Frame. 
""" - num_index_columns = ( - len(self.index._data) if offset_by_index_columns else 0 - ) + start = self.index.nlevels if offset_by_index_columns else 0 return [ - i + num_index_columns - for i, name in enumerate(self._column_names) - if name in set(column_names) + i + for i, name in enumerate(self._column_names, start=start) + if name in column_names ] def drop_duplicates( @@ -3114,7 +3115,7 @@ def drop_duplicates( subset, offset_by_index_columns=not ignore_index ) return self._from_columns_like_self( - libcudf.stream_compaction.drop_duplicates( + cudf.core._internals.stream_compaction.drop_duplicates( list(self._columns) if ignore_index else list(self.index._columns + self._columns), @@ -3127,7 +3128,9 @@ def drop_duplicates( ) @_performance_tracking - def duplicated(self, subset=None, keep="first"): + def duplicated( + self, subset=None, keep: Literal["first", "last", False] = "first" + ) -> cudf.Series: """ Return boolean Series denoting duplicate rows. @@ -3227,9 +3230,24 @@ def duplicated(self, subset=None, keep="first"): name = self.name else: columns = [self._data[n] for n in subset] - distinct = libcudf.stream_compaction.distinct_indices( - columns, keep=keep - ) + + _keep_options = { + "first": plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST, + "last": plc.stream_compaction.DuplicateKeepOption.KEEP_LAST, + False: plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, + } + + if (keep_option := _keep_options.get(keep)) is None: + raise ValueError('keep must be either "first", "last" or False') + + with acquire_spill_lock(): + plc_column = plc.stream_compaction.distinct_indices( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keep_option, + plc.types.NullEquality.EQUAL, + plc.types.NanEquality.ALL_EQUAL, + ) + distinct = libcudf.column.Column.from_pylibcudf(plc_column) result = copying.scatter( [cudf.Scalar(False, dtype=bool)], distinct, @@ -4353,12 +4371,10 @@ def _drop_na_rows(self, how="any", subset=None, thresh=None): data_columns 
= [col.nans_to_nulls() for col in self._columns] return self._from_columns_like_self( - libcudf.stream_compaction.drop_nulls( + cudf.core._internals.stream_compaction.drop_nulls( [*self.index._columns, *data_columns], how=how, - keys=self._positions_from_column_names( - subset, offset_by_index_columns=True - ), + keys=self._positions_from_column_names(subset), thresh=thresh, ), self._column_names, @@ -4378,7 +4394,7 @@ def _apply_boolean_mask(self, boolean_mask: BooleanMask, keep_index=True): f"{len(boolean_mask.column)} not {len(self)}" ) return self._from_columns_like_self( - libcudf.stream_compaction.apply_boolean_mask( + cudf.core._internals.stream_compaction.apply_boolean_mask( list(self.index._columns + self._columns) if keep_index else list(self._columns), @@ -6289,17 +6305,16 @@ def ge(self, other, axis="columns", level=None, fill_value=None): other=other, op="__ge__", fill_value=fill_value, can_reindex=True ) - def _preprocess_subset(self, subset): + def _preprocess_subset(self, subset) -> set[abc.Hashable]: if subset is None: subset = self._column_names elif ( - not np.iterable(subset) - or isinstance(subset, str) + is_scalar(subset) or isinstance(subset, tuple) and subset in self._column_names ): subset = (subset,) - diff = set(subset) - set(self._data) + diff = set(subset) - set(self._column_names) if len(diff) != 0: raise KeyError(f"columns {diff} do not exist") return subset From 187053abc4b3941ab1fa26828d396042e91c2b10 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 16 Dec 2024 21:18:46 -0800 Subject: [PATCH 05/12] Remove cudf._lib.string_casting in favor of inlining pylibcudf (#17460) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17460 --- python/cudf/cudf/_lib/CMakeLists.txt | 4 +- python/cudf/cudf/_lib/__init__.py | 1 
- python/cudf/cudf/_lib/string_casting.pyx | 598 ---------------------- python/cudf/cudf/core/column/datetime.py | 12 +- python/cudf/cudf/core/column/numerical.py | 40 +- python/cudf/cudf/core/column/string.py | 170 +++--- python/cudf/cudf/core/column/timedelta.py | 12 +- python/cudf/cudf/core/tools/numeric.py | 3 +- 8 files changed, 129 insertions(+), 711 deletions(-) delete mode 100644 python/cudf/cudf/_lib/string_casting.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 5b9fa83b33c..bfbfbfed333 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,8 +12,8 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx string_casting.pyx strings_udf.pyx - types.pyx utils.pyx +set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx strings_udf.pyx types.pyx + utils.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 63090ef86c8..e18e05cc43e 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -4,7 +4,6 @@ from . import ( groupby, interop, - string_casting, strings_udf, ) diff --git a/python/cudf/cudf/_lib/string_casting.pyx b/python/cudf/cudf/_lib/string_casting.pyx deleted file mode 100644 index 06ee07d8e2b..00000000000 --- a/python/cudf/cudf/_lib/string_casting.pyx +++ /dev/null @@ -1,598 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
- -from cudf._lib.column cimport Column - -import pylibcudf as plc -from pylibcudf.types cimport DataType - -from cudf._lib.scalar import as_device_scalar - -from cudf._lib.types cimport dtype_to_pylibcudf_type - - -def floating_to_string(Column input_col): - plc_column = plc.strings.convert.convert_floats.from_floats( - input_col.to_pylibcudf(mode="read"), - ) - return Column.from_pylibcudf(plc_column) - - -def string_to_floating(Column input_col, DataType out_type): - plc_column = plc.strings.convert.convert_floats.to_floats( - input_col.to_pylibcudf(mode="read"), - out_type - ) - return Column.from_pylibcudf(plc_column) - - -def dtos(Column input_col): - """ - Converting/Casting input column of type double to string column - - Parameters - ---------- - input_col : input column of type double - - Returns - ------- - A Column with double values cast to string - """ - - return floating_to_string(input_col) - - -def stod(Column input_col): - """ - Converting/Casting input column of type string to double - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to double - """ - - return string_to_floating(input_col, plc.DataType(plc.TypeId.FLOAT64)) - - -def ftos(Column input_col): - """ - Converting/Casting input column of type float to string column - - Parameters - ---------- - input_col : input column of type double - - Returns - ------- - A Column with float values cast to string - """ - - return floating_to_string(input_col) - - -def stof(Column input_col): - """ - Converting/Casting input column of type string to float - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to float - """ - - return string_to_floating(input_col, plc.DataType(plc.TypeId.FLOAT32)) - - -def integer_to_string(Column input_col): - plc_column = plc.strings.convert.convert_integers.from_integers( - input_col.to_pylibcudf(mode="read"), - ) - return 
Column.from_pylibcudf(plc_column) - - -def string_to_integer(Column input_col, DataType out_type): - plc_column = plc.strings.convert.convert_integers.to_integers( - input_col.to_pylibcudf(mode="read"), - out_type - ) - return Column.from_pylibcudf(plc_column) - - -def i8tos(Column input_col): - """ - Converting/Casting input column of type int8 to string column - - Parameters - ---------- - input_col : input column of type int8 - - Returns - ------- - A Column with int8 values cast to string - """ - - return integer_to_string(input_col) - - -def stoi8(Column input_col): - """ - Converting/Casting input column of type string to int8 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to int8 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.INT8)) - - -def i16tos(Column input_col): - """ - Converting/Casting input column of type int16 to string column - - Parameters - ---------- - input_col : input column of type int16 - - Returns - ------- - A Column with int16 values cast to string - """ - - return integer_to_string(input_col) - - -def stoi16(Column input_col): - """ - Converting/Casting input column of type string to int16 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to int16 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.INT16)) - - -def itos(Column input_col): - """ - Converting/Casting input column of type int32 to string column - - Parameters - ---------- - input_col : input column of type int32 - - Returns - ------- - A Column with int32 values cast to string - """ - - return integer_to_string(input_col) - - -def stoi(Column input_col): - """ - Converting/Casting input column of type string to int32 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to int32 - """ - - return string_to_integer(input_col, 
plc.DataType(plc.TypeId.INT32)) - - -def ltos(Column input_col): - """ - Converting/Casting input column of type int64 to string column - - Parameters - ---------- - input_col : input column of type int64 - - Returns - ------- - A Column with int64 values cast to string - """ - - return integer_to_string(input_col) - - -def stol(Column input_col): - """ - Converting/Casting input column of type string to int64 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to int64 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.INT64)) - - -def ui8tos(Column input_col): - """ - Converting/Casting input column of type uint8 to string column - - Parameters - ---------- - input_col : input column of type uint8 - - Returns - ------- - A Column with uint8 values cast to string - """ - - return integer_to_string(input_col) - - -def stoui8(Column input_col): - """ - Converting/Casting input column of type string to uint8 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to uint8 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.UINT8)) - - -def ui16tos(Column input_col): - """ - Converting/Casting input column of type uint16 to string column - - Parameters - ---------- - input_col : input column of type uint16 - - Returns - ------- - A Column with uint16 values cast to string - """ - - return integer_to_string(input_col) - - -def stoui16(Column input_col): - """ - Converting/Casting input column of type string to uint16 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to uint16 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.UINT16)) - - -def uitos(Column input_col): - """ - Converting/Casting input column of type uint32 to string column - - Parameters - ---------- - input_col : input column of type uint32 - - 
Returns - ------- - A Column with uint32 values cast to string - """ - - return integer_to_string(input_col) - - -def stoui(Column input_col): - """ - Converting/Casting input column of type string to uint32 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to uint32 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.UINT32)) - - -def ultos(Column input_col): - """ - Converting/Casting input column of type uint64 to string column - - Parameters - ---------- - input_col : input column of type uint64 - - Returns - ------- - A Column with uint64 values cast to string - """ - - return integer_to_string(input_col) - - -def stoul(Column input_col): - """ - Converting/Casting input column of type string to uint64 - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with strings cast to uint64 - """ - - return string_to_integer(input_col, plc.DataType(plc.TypeId.UINT64)) - - -def to_booleans(Column input_col): - plc_column = plc.strings.convert.convert_booleans.to_booleans( - input_col.to_pylibcudf(mode="read"), - as_device_scalar("True").c_value, - ) - return Column.from_pylibcudf(plc_column) - - -def from_booleans(Column input_col): - plc_column = plc.strings.convert.convert_booleans.from_booleans( - input_col.to_pylibcudf(mode="read"), - as_device_scalar("True").c_value, - as_device_scalar("False").c_value, - ) - return Column.from_pylibcudf(plc_column) - - -def int2timestamp( - Column input_col, - str format, - Column names): - """ - Converting/Casting input date-time column to string - column with specified format - - Parameters - ---------- - input_col : input column of type timestamp in integer format - format : The string specifying output format - names : The string names to use for weekdays ("%a", "%A") and - months ("%b", "%B") - - Returns - ------- - A Column with date-time represented in string format - - """ - return 
Column.from_pylibcudf( - plc.strings.convert.convert_datetime.from_timestamps( - input_col.to_pylibcudf(mode="read"), - format, - names.to_pylibcudf(mode="read") - ) - ) - - -def timestamp2int(Column input_col, dtype, format): - """ - Converting/Casting input string column to date-time column with specified - timestamp_format - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with string represented in date-time format - - """ - dtype = dtype_to_pylibcudf_type(dtype) - return Column.from_pylibcudf( - plc.strings.convert.convert_datetime.to_timestamps( - input_col.to_pylibcudf(mode="read"), - dtype, - format - ) - ) - - -def istimestamp(Column input_col, str format): - """ - Check input string column matches the specified timestamp format - - Parameters - ---------- - input_col : input column of type string - - format : format string of timestamp specifiers - - Returns - ------- - A Column of boolean values identifying strings that matched the format. 
- - """ - plc_column = plc.strings.convert.convert_datetime.is_timestamp( - input_col.to_pylibcudf(mode="read"), - format - ) - return Column.from_pylibcudf(plc_column) - - -def timedelta2int(Column input_col, dtype, format): - """ - Converting/Casting input string column to TimeDelta column with specified - format - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column with string represented in TimeDelta format - - """ - dtype = dtype_to_pylibcudf_type(dtype) - return Column.from_pylibcudf( - plc.strings.convert.convert_durations.to_durations( - input_col.to_pylibcudf(mode="read"), - dtype, - format - ) - ) - - -def int2timedelta(Column input_col, str format): - """ - Converting/Casting input Timedelta column to string - column with specified format - - Parameters - ---------- - input_col : input column of type Timedelta in integer format - - Returns - ------- - A Column with Timedelta represented in string format - - """ - return Column.from_pylibcudf( - plc.strings.convert.convert_durations.from_durations( - input_col.to_pylibcudf(mode="read"), - format - ) - ) - - -def int2ip(Column input_col): - """ - Converting/Casting integer column to string column in ipv4 format - - Parameters - ---------- - input_col : input integer column - - Returns - ------- - A Column with integer represented in string ipv4 format - - """ - plc_column = plc.strings.convert.convert_ipv4.integers_to_ipv4( - input_col.to_pylibcudf(mode="read") - ) - return Column.from_pylibcudf(plc_column) - - -def ip2int(Column input_col): - """ - Converting string ipv4 column to integer column - - Parameters - ---------- - input_col : input string column - - Returns - ------- - A Column with ipv4 represented as integer - - """ - plc_column = plc.strings.convert.convert_ipv4.ipv4_to_integers( - input_col.to_pylibcudf(mode="read") - ) - return Column.from_pylibcudf(plc_column) - - -def is_ipv4(Column source_strings): - """ - Returns a Column of boolean 
values with True for `source_strings` - that have strings in IPv4 format. This format is nnn.nnn.nnn.nnn - where nnn is integer digits in [0,255]. - """ - plc_column = plc.strings.convert.convert_ipv4.is_ipv4( - source_strings.to_pylibcudf(mode="read") - ) - return Column.from_pylibcudf(plc_column) - - -def htoi(Column input_col): - """ - Converting input column of type string having hex values - to integer of out_type - - Parameters - ---------- - input_col : input column of type string - - Returns - ------- - A Column of integers parsed from hexadecimal string values. - """ - plc_column = plc.strings.convert.convert_integers.hex_to_integers( - input_col.to_pylibcudf(mode="read"), - plc.DataType(plc.TypeId.INT64) - ) - return Column.from_pylibcudf(plc_column) - - -def is_hex(Column source_strings): - """ - Returns a Column of boolean values with True for `source_strings` - that have hex characters. - """ - plc_column = plc.strings.convert.convert_integers.is_hex( - source_strings.to_pylibcudf(mode="read"), - ) - return Column.from_pylibcudf(plc_column) - - -def itoh(Column input_col): - """ - Converting input column of type integer to a string - column with hexadecimal character digits. - - Parameters - ---------- - input_col : input column of type integer - - Returns - ------- - A Column of strings with hexadecimal characters. 
- """ - plc_column = plc.strings.convert.convert_integers.integers_to_hex( - input_col.to_pylibcudf(mode="read"), - ) - return Column.from_pylibcudf(plc_column) diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index c991f291eec..1a820da3c62 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -19,7 +19,6 @@ import cudf import cudf.core.column.column as column -import cudf.core.column.string as string from cudf import _lib as libcudf from cudf.core._compat import PANDAS_GE_220 from cudf.core._internals import binaryop, unary @@ -602,9 +601,14 @@ def strftime(self, format: str) -> cudf.core.column.StringColumn: names = as_column(_DATETIME_NAMES) else: names = column.column_empty(0, dtype="object") - return string._datetime_to_str_typecast_functions[self.dtype]( - self, format, names - ) + with acquire_spill_lock(): + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_datetime.from_timestamps( + self.to_pylibcudf(mode="read"), + format, + names.to_pylibcudf(mode="read"), + ) + ) def as_string_column(self) -> cudf.core.column.StringColumn: format = _dtype_to_format_conversion.get( diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index f099cef3331..4405e153b0c 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -14,8 +14,6 @@ import cudf import cudf.core.column.column as column -import cudf.core.column.string as string -from cudf import _lib as libcudf from cudf.api.types import is_integer, is_scalar from cudf.core._internals import binaryop, unary from cudf.core.buffer import acquire_spill_lock, as_buffer @@ -366,22 +364,42 @@ def normalize_binop_value(self, other: ScalarLike) -> Self | cudf.Scalar: else: return NotImplemented - def int2ip(self) -> "cudf.core.column.StringColumn": - if self.dtype != cudf.dtype("uint32"): + 
@acquire_spill_lock() + def int2ip(self) -> cudf.core.column.StringColumn: + if self.dtype != np.dtype(np.uint32): raise TypeError("Only uint32 type can be converted to ip") - - return libcudf.string_casting.int2ip(self) + plc_column = plc.strings.convert.convert_ipv4.integers_to_ipv4( + self.to_pylibcudf(mode="read") + ) + return type(self).from_pylibcudf(plc_column) # type: ignore[return-value] def as_string_column(self) -> cudf.core.column.StringColumn: - if len(self) > 0: - return string._numeric_to_str_typecast_functions[ - cudf.dtype(self.dtype) - ](self) - else: + if len(self) == 0: return cast( cudf.core.column.StringColumn, column.column_empty(0, dtype="object"), ) + elif self.dtype.kind == "b": + conv_func = functools.partial( + plc.strings.convert.convert_booleans.from_booleans, + true_string=cudf.Scalar( + "True", dtype="str" + ).device_value.c_value, + false_string=cudf.Scalar( + "False", dtype="str" + ).device_value.c_value, + ) + elif self.dtype.kind in {"i", "u"}: + conv_func = plc.strings.convert.convert_integers.from_integers + elif self.dtype.kind == "f": + conv_func = plc.strings.convert.convert_floats.from_floats + else: + raise ValueError(f"No string conversion from type {self.dtype}") + + with acquire_spill_lock(): + return type(self).from_pylibcudf( # type: ignore[return-value] + conv_func(self.to_pylibcudf(mode="read")) + ) def as_datetime_column( self, dtype: Dtype diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index 0c93f60eab2..fcdcb789f23 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -20,9 +20,8 @@ import cudf.core.column.column as column import cudf.core.column.datetime as datetime from cudf import _lib as libcudf -from cudf._lib import string_casting as str_cast from cudf._lib.column import Column -from cudf._lib.types import size_type_dtype +from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype from cudf.api.types import 
is_integer, is_scalar, is_string_dtype from cudf.core._internals import binaryop from cudf.core.buffer import acquire_spill_lock @@ -49,62 +48,7 @@ from cudf.core.column.numerical import NumericalColumn -def str_to_boolean(column: StringColumn): - """Takes in string column and returns boolean column""" - with acquire_spill_lock(): - plc_column = plc.strings.attributes.count_characters( - column.to_pylibcudf(mode="read") - ) - result = Column.from_pylibcudf(plc_column) - return (result > cudf.Scalar(0, dtype="int8")).fillna(False) - - -_str_to_numeric_typecast_functions = { - cudf.api.types.dtype("int8"): str_cast.stoi8, - cudf.api.types.dtype("int16"): str_cast.stoi16, - cudf.api.types.dtype("int32"): str_cast.stoi, - cudf.api.types.dtype("int64"): str_cast.stol, - cudf.api.types.dtype("uint8"): str_cast.stoui8, - cudf.api.types.dtype("uint16"): str_cast.stoui16, - cudf.api.types.dtype("uint32"): str_cast.stoui, - cudf.api.types.dtype("uint64"): str_cast.stoul, - cudf.api.types.dtype("float32"): str_cast.stof, - cudf.api.types.dtype("float64"): str_cast.stod, - cudf.api.types.dtype("bool"): str_to_boolean, -} - -_numeric_to_str_typecast_functions = { - cudf.api.types.dtype("int8"): str_cast.i8tos, - cudf.api.types.dtype("int16"): str_cast.i16tos, - cudf.api.types.dtype("int32"): str_cast.itos, - cudf.api.types.dtype("int64"): str_cast.ltos, - cudf.api.types.dtype("uint8"): str_cast.ui8tos, - cudf.api.types.dtype("uint16"): str_cast.ui16tos, - cudf.api.types.dtype("uint32"): str_cast.uitos, - cudf.api.types.dtype("uint64"): str_cast.ultos, - cudf.api.types.dtype("float32"): str_cast.ftos, - cudf.api.types.dtype("float64"): str_cast.dtos, - cudf.api.types.dtype("bool"): str_cast.from_booleans, -} - -_datetime_to_str_typecast_functions = { - # TODO: support Date32 UNIX days - # cudf.api.types.dtype("datetime64[D]"): str_cast.int2timestamp, - cudf.api.types.dtype("datetime64[s]"): str_cast.int2timestamp, - cudf.api.types.dtype("datetime64[ms]"): str_cast.int2timestamp, 
- cudf.api.types.dtype("datetime64[us]"): str_cast.int2timestamp, - cudf.api.types.dtype("datetime64[ns]"): str_cast.int2timestamp, -} - -_timedelta_to_str_typecast_functions = { - cudf.api.types.dtype("timedelta64[s]"): str_cast.int2timedelta, - cudf.api.types.dtype("timedelta64[ms]"): str_cast.int2timedelta, - cudf.api.types.dtype("timedelta64[us]"): str_cast.int2timedelta, - cudf.api.types.dtype("timedelta64[ns]"): str_cast.int2timedelta, -} - - -def _is_supported_regex_flags(flags): +def _is_supported_regex_flags(flags: int) -> bool: return flags == 0 or ( (flags & (re.MULTILINE | re.DOTALL) != 0) and (flags & ~(re.MULTILINE | re.DOTALL) == 0) @@ -155,10 +99,7 @@ def htoi(self) -> SeriesOrIndex: 3 51966 dtype: int64 """ - - out = str_cast.htoi(self._column) - - return self._return_or_inplace(out, inplace=False) + return self._return_or_inplace(self._column.hex_to_integers()) hex_to_int = htoi @@ -188,10 +129,7 @@ def ip2int(self) -> SeriesOrIndex: 2 0 dtype: int64 """ - - out = str_cast.ip2int(self._column) - - return self._return_or_inplace(out, inplace=False) + return self._return_or_inplace(self._column.ipv4_to_integers()) ip_to_int = ip2int @@ -1380,7 +1318,7 @@ def ishex(self) -> SeriesOrIndex: 4 True dtype: bool """ - return self._return_or_inplace(str_cast.is_hex(self._column)) + return self._return_or_inplace(self._column.is_hex()) def istimestamp(self, format: str) -> SeriesOrIndex: """ @@ -1404,9 +1342,7 @@ def istimestamp(self, format: str) -> SeriesOrIndex: 3 False dtype: bool """ - return self._return_or_inplace( - str_cast.istimestamp(self._column, format) - ) + return self._return_or_inplace(self._column.is_timestamp(format)) def isfloat(self) -> SeriesOrIndex: r""" @@ -1957,7 +1893,7 @@ def isipv4(self) -> SeriesOrIndex: 3 False dtype: bool """ - return self._return_or_inplace(str_cast.is_ipv4(self._column)) + return self._return_or_inplace(self._column.is_ipv4()) def lower(self) -> SeriesOrIndex: """ @@ -5822,26 +5758,38 @@ def 
__contains__(self, item: ScalarLike) -> bool: other = [item] if is_scalar(item) else item return self.contains(column.as_column(other, dtype=self.dtype)).any() - def as_numerical_column( - self, dtype: Dtype - ) -> "cudf.core.column.NumericalColumn": + def as_numerical_column(self, dtype: Dtype) -> NumericalColumn: out_dtype = cudf.api.types.dtype(dtype) - string_col = self - if out_dtype.kind in {"i", "u"}: - if not string_col.is_integer().all(): + if out_dtype.kind == "b": + with acquire_spill_lock(): + plc_column = plc.strings.attributes.count_characters( + self.to_pylibcudf(mode="read") + ) + result = Column.from_pylibcudf(plc_column) + return (result > cudf.Scalar(0, dtype="int8")).fillna(False) + elif out_dtype.kind in {"i", "u"}: + if not self.is_integer().all(): raise ValueError( "Could not convert strings to integer " "type due to presence of non-integer values." ) + cast_func = plc.strings.convert.convert_integers.to_integers elif out_dtype.kind == "f": - if not string_col.is_float().all(): + if not self.is_float().all(): raise ValueError( "Could not convert strings to float " "type due to presence of non-floating values." 
) - - result_col = _str_to_numeric_typecast_functions[out_dtype](string_col) - return result_col + cast_func = plc.strings.convert.convert_floats.to_floats + else: + raise ValueError( + f"dtype must be a numerical type, not {out_dtype}" + ) + plc_dtype = dtype_to_pylibcudf_type(out_dtype) + with acquire_spill_lock(): + return type(self).from_pylibcudf( # type: ignore[return-value] + cast_func(self.to_pylibcudf(mode="read"), plc_dtype) + ) def strptime( self, dtype: Dtype, format: str @@ -5876,23 +5824,27 @@ def strptime( raise NotImplementedError( "Cannot parse date-like strings with different formats" ) - valid_ts = str_cast.istimestamp(self, format) + valid_ts = self.is_timestamp(format) valid = valid_ts | is_nat if not valid.all(): raise ValueError(f"Column contains invalid data for {format=}") - casting_func = str_cast.timestamp2int + casting_func = plc.strings.convert.convert_datetime.to_timestamps add_back_nat = is_nat.any() elif dtype.kind == "m": # type: ignore[union-attr] - casting_func = str_cast.timedelta2int + casting_func = plc.strings.convert.convert_durations.to_durations add_back_nat = False - result_col = casting_func(self, dtype, format) + with acquire_spill_lock(): + plc_dtype = dtype_to_pylibcudf_type(dtype) + result_col = type(self).from_pylibcudf( + casting_func(self.to_pylibcudf(mode="read"), plc_dtype, format) + ) if add_back_nat: result_col[is_nat] = None - return result_col + return result_col # type: ignore[return-value] def as_datetime_column( self, dtype: Dtype @@ -6394,15 +6346,15 @@ def detokenize(self, indices: ColumnBase, separator: cudf.Scalar) -> Self: ) ) + @acquire_spill_lock() def _modify_characters( self, method: Callable[[plc.Column], plc.Column] ) -> Self: """ Helper function for methods that modify characters e.g. 
to_lower """ - with acquire_spill_lock(): - plc_column = method(self.to_pylibcudf(mode="read")) - return cast(Self, Column.from_pylibcudf(plc_column)) + plc_column = method(self.to_pylibcudf(mode="read")) + return cast(Self, Column.from_pylibcudf(plc_column)) def to_lower(self) -> Self: return self._modify_characters(plc.strings.case.to_lower) @@ -6431,6 +6383,46 @@ def replace_multiple(self, pattern: Self, replacements: Self) -> Self: ) return cast(Self, Column.from_pylibcudf(plc_result)) + @acquire_spill_lock() + def is_hex(self) -> NumericalColumn: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_integers.is_hex( + self.to_pylibcudf(mode="read"), + ) + ) + + @acquire_spill_lock() + def hex_to_integers(self) -> NumericalColumn: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_integers.hex_to_integers( + self.to_pylibcudf(mode="read"), plc.DataType(plc.TypeId.INT64) + ) + ) + + @acquire_spill_lock() + def is_ipv4(self) -> NumericalColumn: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_ipv4.is_ipv4( + self.to_pylibcudf(mode="read"), + ) + ) + + @acquire_spill_lock() + def ipv4_to_integers(self) -> NumericalColumn: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_ipv4.ipv4_to_integers( + self.to_pylibcudf(mode="read"), + ) + ) + + @acquire_spill_lock() + def is_timestamp(self, format: str) -> NumericalColumn: + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_datetime.is_timestamp( + self.to_pylibcudf(mode="read"), format + ) + ) + @acquire_spill_lock() def _split_record_re( self, diff --git a/python/cudf/cudf/core/column/timedelta.py b/python/cudf/cudf/core/column/timedelta.py index 8b1515acae2..417fa99dac0 100644 --- a/python/cudf/cudf/core/column/timedelta.py +++ b/python/cudf/cudf/core/column/timedelta.py @@ -10,9 +10,10 @@ import 
pandas as pd import pyarrow as pa +import pylibcudf as plc + import cudf import cudf.core.column.column as column -import cudf.core.column.string as string from cudf.api.types import is_scalar from cudf.core._internals import binaryop, unary from cudf.core.buffer import Buffer, acquire_spill_lock @@ -297,9 +298,12 @@ def strftime(self, format: str) -> cudf.core.column.StringColumn: column.column_empty(0, dtype="object"), ) else: - return string._timedelta_to_str_typecast_functions[self.dtype]( - self, format=format - ) + with acquire_spill_lock(): + return type(self).from_pylibcudf( # type: ignore[return-value] + plc.strings.convert.convert_durations.from_durations( + self.to_pylibcudf(mode="read"), format + ) + ) def as_string_column(self) -> cudf.core.column.StringColumn: return self.strftime("%D days %H:%M:%S") diff --git a/python/cudf/cudf/core/tools/numeric.py b/python/cudf/cudf/core/tools/numeric.py index 40348461f8c..6d3dc2dc7d9 100644 --- a/python/cudf/cudf/core/tools/numeric.py +++ b/python/cudf/cudf/core/tools/numeric.py @@ -8,7 +8,6 @@ import pandas as pd import cudf -from cudf import _lib as libcudf from cudf.api.types import _is_non_decimal_numeric_dtype, is_string_dtype from cudf.core._internals import unary from cudf.core.column import as_column @@ -251,9 +250,9 @@ def _convert_str_col( return converted_col.astype(dtype=cudf.dtype("float64")) # type: ignore[return-value] else: if errors == "coerce": - converted_col = libcudf.string_casting.stod(converted_col) non_numerics = is_float.unary_operator("not") converted_col[non_numerics] = None + converted_col = converted_col.astype(np.dtype(np.float64)) # type: ignore[assignment] return converted_col # type: ignore[return-value] else: raise ValueError("Unable to convert some strings to numerics.") From becfacc029393c591654553828990aeca3d242c4 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 16 Dec 2024 22:32:56 -0800 Subject: [PATCH 06/12] Use `[[nodiscard]]` attribute before `__device__` 
(#17608) Clang-tidy does not like `[[nodiscard]]` after `__device__` and I don't like red squigly lines. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Yunsong Wang (https://github.com/PointKernel) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17608 --- .../cudf/column/column_device_view.cuh | 12 +++++----- cpp/include/cudf/strings/string_view.hpp | 22 +++++++++---------- cpp/src/strings/regex/regex.cuh | 12 +++++----- cpp/src/strings/regex/regex.inl | 6 ++--- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/cpp/include/cudf/column/column_device_view.cuh b/cpp/include/cudf/column/column_device_view.cuh index ea480b133dc..aacb5ccfede 100644 --- a/cpp/include/cudf/column/column_device_view.cuh +++ b/cpp/include/cudf/column/column_device_view.cuh @@ -444,7 +444,7 @@ class alignas(16) column_device_view : public detail::column_device_view_base { * @return string_view instance representing this element at this index */ template )> - __device__ [[nodiscard]] T element(size_type element_index) const noexcept + [[nodiscard]] __device__ T element(size_type element_index) const noexcept { size_type index = element_index + offset(); // account for this view's _offset char const* d_strings = static_cast(_data); @@ -503,7 +503,7 @@ class alignas(16) column_device_view : public detail::column_device_view_base { * @return dictionary32 instance representing this element at this index */ template )> - __device__ [[nodiscard]] T element(size_type element_index) const noexcept + [[nodiscard]] __device__ T element(size_type element_index) const noexcept { size_type index = element_index + offset(); // account for this view's _offset auto const indices = d_children[0]; @@ -521,7 +521,7 @@ class alignas(16) column_device_view : public detail::column_device_view_base { * @return numeric::fixed_point representing the element at this index */ template ())> - __device__ [[nodiscard]] T element(size_type 
element_index) const noexcept + [[nodiscard]] __device__ T element(size_type element_index) const noexcept { using namespace numeric; using rep = typename T::rep; @@ -1034,7 +1034,7 @@ class alignas(16) mutable_column_device_view : public detail::column_device_view * @return Reference to the element at the specified index */ template ())> - __device__ [[nodiscard]] T& element(size_type element_index) const noexcept + [[nodiscard]] __device__ T& element(size_type element_index) const noexcept { return data()[element_index]; } @@ -1427,13 +1427,13 @@ struct pair_rep_accessor { private: template , void>* = nullptr> - __device__ [[nodiscard]] inline auto get_rep(cudf::size_type i) const + [[nodiscard]] __device__ inline auto get_rep(cudf::size_type i) const { return col.element(i); } template , void>* = nullptr> - __device__ [[nodiscard]] inline auto get_rep(cudf::size_type i) const + [[nodiscard]] __device__ inline auto get_rep(cudf::size_type i) const { return col.element(i).value(); } diff --git a/cpp/include/cudf/strings/string_view.hpp b/cpp/include/cudf/strings/string_view.hpp index 504c31057ae..33f3176d2c6 100644 --- a/cpp/include/cudf/strings/string_view.hpp +++ b/cpp/include/cudf/strings/string_view.hpp @@ -54,7 +54,7 @@ class string_view { * * @return The number of characters in this string */ - __device__ [[nodiscard]] inline size_type length() const; + [[nodiscard]] __device__ inline size_type length() const; /** * @brief Return a pointer to the internal device array * @@ -119,13 +119,13 @@ class string_view { * * @return new iterator pointing to the beginning of this string */ - __device__ [[nodiscard]] inline const_iterator begin() const; + [[nodiscard]] __device__ inline const_iterator begin() const; /** * @brief Return new iterator pointing past the end of this string * * @return new iterator pointing past the end of this string */ - __device__ [[nodiscard]] inline const_iterator end() const; + [[nodiscard]] __device__ inline const_iterator end() const; 
/** * @brief Return single UTF-8 character at the given character position @@ -140,7 +140,7 @@ class string_view { * @param pos Character position * @return Byte offset from data() for a given character position */ - __device__ [[nodiscard]] inline size_type byte_offset(size_type pos) const; + [[nodiscard]] __device__ inline size_type byte_offset(size_type pos) const; /** * @brief Comparing target string with this string. Each character is compared @@ -155,7 +155,7 @@ class string_view { * not match is greater in the arg string, or all compared characters * match but the arg string is longer. */ - __device__ [[nodiscard]] inline int compare(string_view const& str) const; + [[nodiscard]] __device__ inline int compare(string_view const& str) const; /** * @brief Comparing target string with this string. Each character is compared * as a UTF-8 code-point value. @@ -225,7 +225,7 @@ class string_view { * Specify -1 to indicate to the end of the string. * @return npos if str is not found in this string. */ - __device__ [[nodiscard]] inline size_type find(string_view const& str, + [[nodiscard]] __device__ inline size_type find(string_view const& str, size_type pos = 0, size_type count = -1) const; /** @@ -253,7 +253,7 @@ class string_view { * Specify -1 to indicate to the end of the string. * @return npos if arg string is not found in this string. */ - __device__ [[nodiscard]] inline size_type find(char_utf8 character, + [[nodiscard]] __device__ inline size_type find(char_utf8 character, size_type pos = 0, size_type count = -1) const; /** @@ -266,7 +266,7 @@ class string_view { * Specify -1 to indicate to the end of the string. * @return npos if arg string is not found in this string. */ - __device__ [[nodiscard]] inline size_type rfind(string_view const& str, + [[nodiscard]] __device__ inline size_type rfind(string_view const& str, size_type pos = 0, size_type count = -1) const; /** @@ -294,7 +294,7 @@ class string_view { * Specify -1 to indicate to the end of the string. 
* @return npos if arg string is not found in this string. */ - __device__ [[nodiscard]] inline size_type rfind(char_utf8 character, + [[nodiscard]] __device__ inline size_type rfind(char_utf8 character, size_type pos = 0, size_type count = -1) const; @@ -306,7 +306,7 @@ class string_view { * @param length Number of characters from start to include in the sub-string. * @return New instance pointing to a subset of the characters within this instance. */ - __device__ [[nodiscard]] inline string_view substr(size_type start, size_type length) const; + [[nodiscard]] __device__ inline string_view substr(size_type start, size_type length) const; /** * @brief Return minimum value associated with the string type @@ -386,7 +386,7 @@ class string_view { * @param bytepos Byte position from start of _data. * @return The character position for the specified byte. */ - __device__ [[nodiscard]] inline size_type character_offset(size_type bytepos) const; + [[nodiscard]] __device__ inline size_type character_offset(size_type bytepos) const; /** * @brief Common internal implementation for string_view::find and string_view::rfind. diff --git a/cpp/src/strings/regex/regex.cuh b/cpp/src/strings/regex/regex.cuh index 2df404048f7..d22fb04696c 100644 --- a/cpp/src/strings/regex/regex.cuh +++ b/cpp/src/strings/regex/regex.cuh @@ -186,7 +186,7 @@ class reprog_device { * Specify -1 to match any virtual positions past the end of the string. * @return If match found, returns character positions of the matches. */ - __device__ [[nodiscard]] inline match_result find(int32_t const thread_idx, + [[nodiscard]] __device__ inline match_result find(int32_t const thread_idx, string_view const d_str, string_view::const_iterator begin, cudf::size_type end = -1) const; @@ -205,7 +205,7 @@ class reprog_device { * @param group_id The specific group to return its matching position values. 
* @return If valid, returns the character position of the matched group in the given string, */ - __device__ [[nodiscard]] inline match_result extract(int32_t const thread_idx, + [[nodiscard]] __device__ inline match_result extract(int32_t const thread_idx, string_view const d_str, string_view::const_iterator begin, cudf::size_type end, @@ -225,17 +225,17 @@ class reprog_device { /** * @brief Returns the regex instruction object for a given id. */ - __device__ [[nodiscard]] inline reinst get_inst(int32_t id) const; + [[nodiscard]] __device__ inline reinst get_inst(int32_t id) const; /** * @brief Returns the regex class object for a given id. */ - __device__ [[nodiscard]] inline reclass_device get_class(int32_t id) const; + [[nodiscard]] __device__ inline reclass_device get_class(int32_t id) const; /** * @brief Executes the regex pattern on the given string. */ - __device__ [[nodiscard]] inline match_result regexec(string_view const d_str, + [[nodiscard]] __device__ inline match_result regexec(string_view const d_str, reljunk jnk, string_view::const_iterator begin, cudf::size_type end, @@ -244,7 +244,7 @@ class reprog_device { /** * @brief Utility wrapper to setup state memory structures for calling regexec */ - __device__ [[nodiscard]] inline match_result call_regexec( + [[nodiscard]] __device__ inline match_result call_regexec( int32_t const thread_idx, string_view const d_str, string_view::const_iterator begin, diff --git a/cpp/src/strings/regex/regex.inl b/cpp/src/strings/regex/regex.inl index e34a1e12015..906f09e4d82 100644 --- a/cpp/src/strings/regex/regex.inl +++ b/cpp/src/strings/regex/regex.inl @@ -81,11 +81,11 @@ struct alignas(8) relist { return true; } - __device__ [[nodiscard]] __forceinline__ restate get_state(int16_t idx) const + [[nodiscard]] __device__ __forceinline__ restate get_state(int16_t idx) const { return restate{ranges[idx * stride], inst_ids[idx * stride]}; } - __device__ [[nodiscard]] __forceinline__ int16_t get_size() const { return 
size; } + [[nodiscard]] __device__ __forceinline__ int16_t get_size() const { return size; } private: int16_t size{}; @@ -101,7 +101,7 @@ struct alignas(8) relist { mask[pos >> 3] |= uc; } - __device__ [[nodiscard]] __forceinline__ bool readMask(int32_t pos) const + [[nodiscard]] __device__ __forceinline__ bool readMask(int32_t pos) const { u_char const uc = mask[pos >> 3]; return static_cast((uc >> (pos & 7)) & 1); From 0058b52ed7882d29267264c6205978427227a44d Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 17 Dec 2024 12:24:45 -0600 Subject: [PATCH 07/12] Fix ``dask_cudf.read_csv`` (#17612) Recent changes in dask and dask-expr have broken `dask_cudf.read_csv` (https://github.com/dask/dask-expr/pull/1178, https://github.com/dask/dask/pull/11603). Fortunately, the breaking changes help us avoid legacy CSV code in the long run. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17612 --- python/dask_cudf/dask_cudf/backends.py | 36 +++- python/dask_cudf/dask_cudf/io/csv.py | 195 +++++++++++++++++- .../dask_cudf/dask_cudf/io/tests/test_csv.py | 9 - 3 files changed, 215 insertions(+), 25 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 962a229a839..fceaaf185e8 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -714,21 +714,35 @@ def read_csv( storage_options=None, **kwargs, ): - import dask_expr as dx - from fsspec.utils import stringify_path + try: + # TODO: Remove when cudf is pinned to dask>2024.12.0 + import dask_expr as dx + from dask_expr.io.csv import ReadCSV + from fsspec.utils import stringify_path + + if not isinstance(path, str): + path = stringify_path(path) + return dx.new_collection( + ReadCSV( + path, + dtype_backend=dtype_backend, + 
storage_options=storage_options, + kwargs=kwargs, + header=header, + dataframe_backend="cudf", + ) + ) + except ImportError: + # Requires dask>2024.12.0 + from dask_cudf.io.csv import read_csv - if not isinstance(path, str): - path = stringify_path(path) - return dx.new_collection( - dx.io.csv.ReadCSV( + return read_csv( path, - dtype_backend=dtype_backend, - storage_options=storage_options, - kwargs=kwargs, + *args, header=header, - dataframe_backend="cudf", + storage_options=storage_options, + **kwargs, ) - ) @staticmethod def read_json(*args, **kwargs): diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py index b22b31a591f..29f98b14511 100644 --- a/python/dask_cudf/dask_cudf/io/csv.py +++ b/python/dask_cudf/dask_cudf/io/csv.py @@ -1,8 +1,193 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from dask_cudf import _deprecated_api +import os +from glob import glob +from warnings import warn -read_csv = _deprecated_api( - "dask_cudf.io.csv.read_csv", - new_api="dask_cudf.read_csv", -) +from fsspec.utils import infer_compression + +from dask import dataframe as dd +from dask.dataframe.io.csv import make_reader +from dask.utils import parse_bytes + +import cudf + + +def read_csv(path, blocksize="default", **kwargs): + """ + Read CSV files into a :class:`.DataFrame`. + + This API parallelizes the :func:`cudf:cudf.read_csv` function in + the following ways: + + It supports loading many files at once using globstrings: + + >>> import dask_cudf + >>> df = dask_cudf.read_csv("myfiles.*.csv") + + In some cases it can break up large files: + + >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB") + + It can read CSV files from external resources (e.g. 
S3, HTTP, FTP) + + >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv") + >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv") + + Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and + supports many of the same keyword arguments with the same + performance guarantees. See the docstring for + :func:`cudf:cudf.read_csv` for more information on available + keyword arguments. + + Parameters + ---------- + path : str, path object, or file-like object + Either a path to a file (a str, :py:class:`pathlib.Path`, or + py._path.local.LocalPath), URL (including http, ftp, and S3 + locations), or any object with a read() method (such as + builtin :py:func:`open` file handler function or + :py:class:`~io.StringIO`). + blocksize : int or str, default "256 MiB" + The target task partition size. If ``None``, a single block + is used for each file. + **kwargs : dict + Passthrough key-word arguments that are sent to + :func:`cudf:cudf.read_csv`. + + Notes + ----- + If any of `skipfooter`/`skiprows`/`nrows` are passed, + `blocksize` will default to None. + + Examples + -------- + >>> import dask_cudf + >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"]) + >>> ddf.compute() + a b + 0 1 hi + 1 2 hello + 2 3 ai + + """ + # Set default `blocksize` + if blocksize == "default": + if ( + kwargs.get("skipfooter", 0) != 0 + or kwargs.get("skiprows", 0) != 0 + or kwargs.get("nrows", None) is not None + ): + # Cannot read in blocks if skipfooter, + # skiprows or nrows is passed. 
+ blocksize = None + else: + blocksize = "256 MiB" + + if "://" in str(path): + func = make_reader(cudf.read_csv, "read_csv", "CSV") + return func(path, blocksize=blocksize, **kwargs) + else: + return _internal_read_csv(path=path, blocksize=blocksize, **kwargs) + + +def _internal_read_csv(path, blocksize="256 MiB", **kwargs): + if isinstance(blocksize, str): + blocksize = parse_bytes(blocksize) + + if isinstance(path, list): + filenames = path + elif isinstance(path, str): + filenames = sorted(glob(path)) + elif hasattr(path, "__fspath__"): + filenames = sorted(glob(path.__fspath__())) + else: + raise TypeError(f"Path type not understood:{type(path)}") + + if not filenames: + msg = f"A file in: {filenames} does not exist." + raise FileNotFoundError(msg) + + compression = kwargs.get("compression", "infer") + + if compression == "infer": + # Infer compression from first path by default + compression = infer_compression(filenames[0]) + + if compression and blocksize: + # compressed CSVs reading must read the entire file + kwargs.pop("byte_range", None) + warn( + "Warning %s compression does not support breaking apart files\n" + "Please ensure that each individual file can fit in memory and\n" + "use the keyword ``blocksize=None to remove this message``\n" + "Setting ``blocksize=(size of file)``" % compression + ) + blocksize = None + + if blocksize is None: + return read_csv_without_blocksize(path, **kwargs) + + # Let dask.dataframe generate meta + dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV") + kwargs1 = kwargs.copy() + usecols = kwargs1.pop("usecols", None) + dtype = kwargs1.pop("dtype", None) + meta = dask_reader(filenames[0], **kwargs1)._meta + names = meta.columns + if usecols or dtype: + # Regenerate meta with original kwargs if + # `usecols` or `dtype` was specified + meta = dask_reader(filenames[0], **kwargs)._meta + + i = 0 + path_list = [] + kwargs_list = [] + for fn in filenames: + size = os.path.getsize(fn) + for start in range(0, size, 
blocksize): + kwargs2 = kwargs.copy() + kwargs2["byte_range"] = ( + start, + blocksize, + ) # specify which chunk of the file we care about + if start != 0: + kwargs2["names"] = names # no header in the middle of the file + kwargs2["header"] = None + path_list.append(fn) + kwargs_list.append(kwargs2) + i += 1 + + return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta) + + +def _read_csv(fn, kwargs): + return cudf.read_csv(fn, **kwargs) + + +def read_csv_without_blocksize(path, **kwargs): + """Read entire CSV with optional compression (gzip/zip) + + Parameters + ---------- + path : str + path to files (support for glob) + """ + if isinstance(path, list): + filenames = path + elif isinstance(path, str): + filenames = sorted(glob(path)) + elif hasattr(path, "__fspath__"): + filenames = sorted(glob(path.__fspath__())) + else: + raise TypeError(f"Path type not understood:{type(path)}") + + meta_kwargs = kwargs.copy() + if "skipfooter" in meta_kwargs: + meta_kwargs.pop("skipfooter") + if "nrows" in meta_kwargs: + meta_kwargs.pop("nrows") + # Read "head" of first file (first 5 rows). + # Convert to empty df for metadata. 
+ meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0] + return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py index a0acb86f5a9..ddfd1c1adac 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py @@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size): df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ) dd.assert_eq(df, df2) - # Test chunksize deprecation - with pytest.warns(FutureWarning, match="deprecated"): - df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ) - dd.assert_eq(df, df3) - @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None]) def test_csv_reader_usecols(tmp_path, dtype): @@ -275,7 +270,3 @@ def test_deprecated_api_paths(tmp_path): with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"): df2 = dask_cudf.io.read_csv(csv_path) dd.assert_eq(df, df2, check_divisions=False) - - with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"): - df2 = dask_cudf.io.csv.read_csv(csv_path) - dd.assert_eq(df, df2, check_divisions=False) From e5753e3a0c2d161477de5edabe91b3f013246187 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Tue, 17 Dec 2024 14:02:57 -0500 Subject: [PATCH 08/12] Add Avro Reader options classes to pylibcudf (#17599) Apart of #17565 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/17599 --- python/cudf/cudf/io/avro.py | 17 +- python/pylibcudf/pylibcudf/io/avro.pxd | 25 ++- python/pylibcudf/pylibcudf/io/avro.pyi | 21 ++- python/pylibcudf/pylibcudf/io/avro.pyx | 156 ++++++++++++++---- .../pylibcudf/pylibcudf/tests/io/test_avro.py | 13 +- 5 files changed, 173 insertions(+), 59 deletions(-) diff --git a/python/cudf/cudf/io/avro.py 
b/python/cudf/cudf/io/avro.py index 11730e98c95..4966cdb86e1 100644 --- a/python/cudf/cudf/io/avro.py +++ b/python/cudf/cudf/io/avro.py @@ -33,11 +33,18 @@ def read_avro( if not isinstance(skip_rows, int) or skip_rows < 0: raise TypeError("skip_rows must be an int >= 0") - plc_result = plc.io.avro.read_avro( - plc.io.types.SourceInfo([filepath_or_buffer]), - columns, - skip_rows, - num_rows, + options = ( + plc.io.avro.AvroReaderOptions.builder( + plc.io.types.SourceInfo([filepath_or_buffer]) + ) + .skip_rows(skip_rows) + .num_rows(num_rows) + .build() ) + if columns is not None and len(columns) > 0: + options.set_columns(columns) + + plc_result = plc.io.avro.read_avro(options) + return cudf.DataFrame._from_data(*data_from_pylibcudf_io(plc_result)) diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index 8696fcb3c15..a0fca95d459 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -1,12 +1,23 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata -from pylibcudf.libcudf.io.avro cimport avro_reader_options +from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder from pylibcudf.libcudf.types cimport size_type -cpdef TableWithMetadata read_avro( - SourceInfo source_info, - list columns = *, - size_type skip_rows = *, - size_type num_rows = * -) +from pylibcudf.libcudf.types cimport size_type + +cdef class AvroReaderOptions: + cdef avro_reader_options c_obj + cdef SourceInfo source + cpdef void set_columns(self, list col_names) + + +cdef class AvroReaderOptionsBuilder: + cdef avro_reader_options_builder c_obj + cdef SourceInfo source + cpdef AvroReaderOptionsBuilder columns(self, list col_names) + cpdef AvroReaderOptionsBuilder skip_rows(self, size_type skip_rows) + cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows) + cpdef AvroReaderOptions build(self) + +cpdef TableWithMetadata read_avro(AvroReaderOptions options) diff --git a/python/pylibcudf/pylibcudf/io/avro.pyi b/python/pylibcudf/pylibcudf/io/avro.pyi index 49c2f083702..8cafc9a6573 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyi +++ b/python/pylibcudf/pylibcudf/io/avro.pyi @@ -1,11 +1,16 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from pylibcudf.io.types import SourceInfo, TableWithMetadata -__all__ = ["read_avro"] - -def read_avro( - source_info: SourceInfo, - columns: list[str] | None = None, - skip_rows: int = 0, - num_rows: int = -1, -) -> TableWithMetadata: ... +__all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"] + +class AvroReaderOptions: + @staticmethod + def builder(source: SourceInfo) -> AvroReaderOptionsBuilder: ... + +class AvroReaderOptionsBuilder: + def columns(col_names: list[str]) -> AvroReaderOptionsBuilder: ... + def skip_rows(skip_rows: int) -> AvroReaderOptionsBuilder: ... + def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ... + def build(self) -> AvroReaderOptions: ... 
+ +def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ... diff --git a/python/pylibcudf/pylibcudf/io/avro.pyx b/python/pylibcudf/pylibcudf/io/avro.pyx index 4271333511a..c378fca0415 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyx +++ b/python/pylibcudf/pylibcudf/io/avro.pyx @@ -10,52 +10,138 @@ from pylibcudf.libcudf.io.avro cimport ( ) from pylibcudf.libcudf.types cimport size_type -__all__ = ["read_avro"] +__all__ = ["read_avro", "AvroReaderOptions", "AvroReaderOptionsBuilder"] + + +cdef class AvroReaderOptions: + """ + The settings to use for ``read_avro`` + For details, see :cpp:class:`cudf::io::avro_reader_options` + """ + @staticmethod + def builder(SourceInfo source): + """ + Create a AvroWriterOptionsBuilder object + + For details, see :cpp:func:`cudf::io::avro_reader_options::builder` + + Parameters + ---------- + sink : SourceInfo + The source to read the Avro file from. + + Returns + ------- + AvroReaderOptionsBuilder + Builder to build AvroReaderOptions + """ + cdef AvroReaderOptionsBuilder avro_builder = AvroReaderOptionsBuilder.__new__( + AvroReaderOptionsBuilder + ) + avro_builder.c_obj = avro_reader_options.builder(source.c_obj) + avro_builder.source = source + return avro_builder + + cpdef void set_columns(self, list col_names): + """ + Set names of the column to be read. + + Parameters + ---------- + col_names : list[str] + List of column names + + Returns + ------- + None + """ + cdef vector[string] vec + vec.reserve(len(col_names)) + for name in col_names: + vec.push_back(str(name).encode()) + self.c_obj.set_columns(vec) + + +cdef class AvroReaderOptionsBuilder: + cpdef AvroReaderOptionsBuilder columns(self, list col_names): + """ + Set names of the column to be read. 
+ + Parameters + ---------- + col_names : list + List of column names + + Returns + ------- + AvroReaderOptionsBuilder + """ + cdef vector[string] vec + vec.reserve(len(col_names)) + for name in col_names: + vec.push_back(str(name).encode()) + self.c_obj.columns(vec) + return self + + cpdef AvroReaderOptionsBuilder skip_rows(self, size_type skip_rows): + """ + Sets number of rows to skip. + + Parameters + ---------- + skip_rows : size_type + Number of rows to skip from start + + Returns + ------- + AvroReaderOptionsBuilder + """ + self.c_obj.skip_rows(skip_rows) + return self + + cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows): + """ + Sets number of rows to read. + + Parameters + ---------- + num_rows : size_type + Number of rows to read after skip + + Returns + ------- + AvroReaderOptionsBuilder + """ + self.c_obj.num_rows(num_rows) + return self + + cpdef AvroReaderOptions build(self): + """Create a AvroReaderOptions object""" + cdef AvroReaderOptions avro_options = AvroReaderOptions.__new__( + AvroReaderOptions + ) + avro_options.c_obj = move(self.c_obj.build()) + avro_options.source = self.source + return avro_options cpdef TableWithMetadata read_avro( - SourceInfo source_info, - list columns = None, - size_type skip_rows = 0, - size_type num_rows = -1 + AvroReaderOptions options ): """ - Reads an Avro dataset into a :py:class:`~.types.TableWithMetadata`. + Read from Avro format. + + The source to read from and options are encapsulated + by the `options` object. For details, see :cpp:func:`read_avro`. Parameters ---------- - source_info: SourceInfo - The SourceInfo object to read the avro dataset from. - columns: list, default None - Optional columns to read, if not provided, reads all columns in the file. - skip_rows: size_type, default 0 - The number of rows to skip. - num_rows: size_type, default -1 - The number of rows to read, after skipping rows. - If -1 is passed, all rows will be read. 
- - Returns - ------- - TableWithMetadata - The Table and its corresponding metadata (column names) that were read in. + options: AvroReaderOptions + Settings for controlling reading behavior """ - cdef vector[string] c_columns - if columns is not None and len(columns) > 0: - c_columns.reserve(len(columns)) - for col in columns: - c_columns.push_back(str(col).encode()) - - cdef avro_reader_options avro_opts = ( - avro_reader_options.builder(source_info.c_obj) - .columns(c_columns) - .skip_rows(skip_rows) - .num_rows(num_rows) - .build() - ) - with nogil: - c_result = move(cpp_read_avro(avro_opts)) + c_result = move(cpp_read_avro(options.c_obj)) return TableWithMetadata.from_libcudf(c_result) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_avro.py b/python/pylibcudf/pylibcudf/tests/io/test_avro.py index 3d9d99ffa61..bda8921b62a 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_avro.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_avro.py @@ -98,10 +98,15 @@ def test_read_avro(avro_dtypes, avro_dtype_data, row_opts, columns, nullable): buffer.seek(0) res = plc.io.avro.read_avro( - plc.io.types.SourceInfo([buffer]), - columns=columns, - skip_rows=skip_rows, - num_rows=num_rows, + ( + plc.io.avro.AvroReaderOptions.builder( + plc.io.types.SourceInfo([buffer]) + ) + .columns(columns) + .skip_rows(skip_rows) + .num_rows(num_rows) + .build() + ) ) expected = pa.Table.from_arrays( From d7425993d86b92a586ec600ec2ed8a0984a9699a Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Wed, 18 Dec 2024 01:46:24 +0530 Subject: [PATCH 09/12] Bump the oldest `pyarrow` version to `14.0.2` in test matrix (#17611) A recent nightly failure discovered by @davidwendt here: https://github.com/rapidsai/cudf/actions/runs/12367794950/job/34543121050 indicates an environment cannot be created with `pytorch>=2.4.0` and `pyarrow==14.0.0 & 14.0.1`. Thus this bump to `14.0.2`. 
Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17611 --- dependencies.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dependencies.yaml b/dependencies.yaml index 44767f1e9d3..7a83efc6e3d 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -878,7 +878,7 @@ dependencies: - matrix: {dependencies: "oldest"} packages: - numpy==1.23.* - - pyarrow==14.0.0 + - pyarrow==14.* - matrix: packages: - output_types: conda @@ -903,7 +903,7 @@ dependencies: - matrix: {dependencies: "oldest"} packages: - numpy==1.24.* - - pyarrow==14.0.1 + - pyarrow==14.* - matrix: packages: test_python_cudf_polars: From 24aacb22a3cfce1562f2e92d2fcbdd17eccf7888 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 17 Dec 2024 13:37:19 -0800 Subject: [PATCH 10/12] A couple of fixes in rapids-logger usage (#17588) This PR has two fixes: - Since we're pinning to a commit, a shallow clone will start failing as soon as HEAD gets bumped on the main branch (which will happen next when cuml/raft logging features are merged). We need to stop using shallow clones. - The CMake code for setting the default logging levels was setting the wrong macro name. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17588 --- cpp/CMakeLists.txt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 78f529a44d3..9cbacee8e8d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -276,7 +276,7 @@ rapids_cpm_init() # Not using rapids-cmake since we never want to find, always download. 
CPMAddPackage( - NAME rapids_logger GITHUB_REPOSITORY rapidsai/rapids-logger GIT_SHALLOW TRUE GIT_TAG + NAME rapids_logger GITHUB_REPOSITORY rapidsai/rapids-logger GIT_SHALLOW FALSE GIT_TAG c510947ae9d3a67530cfe3e5eaccb5a3b8ea0e55 VERSION c510947ae9d3a67530cfe3e5eaccb5a3b8ea0e55 ) rapids_make_logger(cudf EXPORT_SET cudf-exports) @@ -916,7 +916,9 @@ if(CUDF_LARGE_STRINGS_DISABLED) endif() # Define logging level -target_compile_definitions(cudf PRIVATE "CUDF_LOG_ACTIVE_LEVEL=${LIBCUDF_LOGGING_LEVEL}") +target_compile_definitions( + cudf PRIVATE "CUDF_LOG_ACTIVE_LEVEL=CUDF_LOG_LEVEL_${LIBCUDF_LOGGING_LEVEL}" +) # Enable remote IO through KvikIO target_compile_definitions(cudf PRIVATE $<$:CUDF_KVIKIO_REMOTE_IO>) From 267c7f236a9996dbd2e45cd6355bfeac1a9220d3 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:49:11 -0500 Subject: [PATCH 11/12] Fix memcheck error in ReplaceTest.NormalizeNansAndZerosMutable gtest (#17610) Fixes memcheck error found in nightly build checks in the STREAM_REPLACE_TEST's `ReplaceTest.NormalizeNansAndZerosMutable` gtest. The mutable-view passed to the `cudf::normalize_nans_and_zeros` API was pointing to invalidated data. The following line created the invalid view ``` cudf::mutable_column_view mutable_view = cudf::column(input, cudf::test::get_default_stream()); ``` The temporary `cudf::column` is destroyed once the `mutable_view` is created so this view would now point to a freed column. The view must be created from a non-temporary column and also must be non-temporary itself so that it is not implicitly converted to a `column_view`. 
Error introduced by #17436 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/17610 --- cpp/tests/streams/replace_test.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/tests/streams/replace_test.cpp b/cpp/tests/streams/replace_test.cpp index 89f76237de6..e3fdc177b50 100644 --- a/cpp/tests/streams/replace_test.cpp +++ b/cpp/tests/streams/replace_test.cpp @@ -104,9 +104,9 @@ TEST_F(ReplaceTest, NormalizeNansAndZeros) TEST_F(ReplaceTest, NormalizeNansAndZerosMutable) { - auto nan = std::numeric_limits::quiet_NaN(); - auto input_column = cudf::test::make_type_param_vector({-0.0, 0.0, -nan, nan, nan}); - cudf::test::fixed_width_column_wrapper input(input_column.begin(), input_column.end()); - cudf::mutable_column_view mutable_view = cudf::column(input, cudf::test::get_default_stream()); - cudf::normalize_nans_and_zeros(mutable_view, cudf::test::get_default_stream()); + auto nan = std::numeric_limits::quiet_NaN(); + auto data = cudf::test::make_type_param_vector({-0.0, 0.0, -nan, nan, nan}); + auto input = cudf::test::fixed_width_column_wrapper(data.begin(), data.end()).release(); + auto view = input->mutable_view(); + cudf::normalize_nans_and_zeros(view, cudf::test::get_default_stream()); } From b9760ac12b593521b7afb803f0d40d5e7996e01a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:01:45 -0800 Subject: [PATCH 12/12] Remove cudf._lib.interop in favor of inlining pylibcudf (#17555) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17555 --- python/cudf/cudf/_lib/CMakeLists.txt | 9 +- python/cudf/cudf/_lib/__init__.py | 1 - 
python/cudf/cudf/_lib/interop.pyx | 111 ----------------------- python/cudf/cudf/core/column/column.py | 48 ++++++---- python/cudf/cudf/core/column/datetime.py | 2 +- python/cudf/cudf/core/column/decimal.py | 10 +- python/cudf/cudf/core/column/lists.py | 4 +- python/cudf/cudf/core/frame.py | 15 +-- python/cudf/cudf/io/dlpack.py | 27 +++--- 9 files changed, 62 insertions(+), 165 deletions(-) delete mode 100644 python/cudf/cudf/_lib/interop.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index bfbfbfed333..410fd57691e 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,9 +12,7 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx strings_udf.pyx types.pyx - utils.pyx -) +set(cython_sources column.pyx groupby.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -24,8 +22,3 @@ rapids_cython_create_modules( ) target_link_libraries(strings_udf PUBLIC cudf_strings_udf) -target_include_directories(interop PUBLIC "$") - -include(${rapids-cmake-dir}/export/find_package_root.cmake) -include(../../../../cpp/cmake/thirdparty/get_nanoarrow.cmake) -target_link_libraries(interop PUBLIC nanoarrow) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index e18e05cc43e..6b5a7814e48 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -3,7 +3,6 @@ from . import ( groupby, - interop, strings_udf, ) diff --git a/python/cudf/cudf/_lib/interop.pyx b/python/cudf/cudf/_lib/interop.pyx deleted file mode 100644 index 1c9d3a01b80..00000000000 --- a/python/cudf/cudf/_lib/interop.pyx +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
- -import pylibcudf - -from cudf._lib.utils cimport columns_from_pylibcudf_table - -from cudf.core.buffer import acquire_spill_lock -from cudf.core.dtypes import ListDtype, StructDtype - - -def from_dlpack(object dlpack_capsule): - """ - Converts a DLPack Tensor PyCapsule into a list of columns. - - DLPack Tensor PyCapsule is expected to have the name "dltensor". - """ - return columns_from_pylibcudf_table( - pylibcudf.interop.from_dlpack(dlpack_capsule) - ) - - -def to_dlpack(list source_columns): - """ - Converts a list of columns into a DLPack Tensor PyCapsule. - - DLPack Tensor PyCapsule will have the name "dltensor". - """ - return pylibcudf.interop.to_dlpack( - pylibcudf.Table( - [col.to_pylibcudf(mode="read") for col in source_columns] - ) - ) - - -def gather_metadata(object cols_dtypes): - """ - Generates a ColumnMetadata vector for each column. - - Parameters - ---------- - cols_dtypes : iterable - An iterable of ``(column_name, dtype)`` pairs. - """ - cpp_metadata = [] - if cols_dtypes is not None: - for idx, (col_name, col_dtype) in enumerate(cols_dtypes): - cpp_metadata.append(pylibcudf.interop.ColumnMetadata(col_name)) - if isinstance(col_dtype, (ListDtype, StructDtype)): - _set_col_children_metadata(col_dtype, cpp_metadata[idx]) - else: - raise TypeError( - "An iterable of (column_name, dtype) pairs is required to " - "construct column_metadata" - ) - return cpp_metadata - - -def _set_col_children_metadata(dtype, col_meta): - if isinstance(dtype, StructDtype): - for name, value in dtype.fields.items(): - element_metadata = pylibcudf.interop.ColumnMetadata(name) - _set_col_children_metadata(value, element_metadata) - col_meta.children_meta.append(element_metadata) - elif isinstance(dtype, ListDtype): - # Offsets - child 0 - col_meta.children_meta.append(pylibcudf.interop.ColumnMetadata()) - - # Element column - child 1 - element_metadata = pylibcudf.interop.ColumnMetadata() - _set_col_children_metadata(dtype.element_type, element_metadata) - 
col_meta.children_meta.append(element_metadata) - else: - col_meta.children_meta.append(pylibcudf.interop.ColumnMetadata()) - - -@acquire_spill_lock() -def to_arrow(list source_columns, object column_dtypes): - """Convert a list of columns from - cudf Frame to a PyArrow Table. - - Parameters - ---------- - source_columns : a list of columns to convert - column_dtypes : Iterable of ``(column_name, column_dtype)`` pairs - - Returns - ------- - pyarrow table - """ - cpp_metadata = gather_metadata(column_dtypes) - return pylibcudf.interop.to_arrow( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in source_columns]), - cpp_metadata, - ) - - -@acquire_spill_lock() -def from_arrow(object input_table): - """Convert from PyArrow Table to a list of columns. - - Parameters - ---------- - input_table : PyArrow table - - Returns - ------- - A list of columns to construct Frame object - """ - return columns_from_pylibcudf_table( - pylibcudf.interop.from_arrow(input_table) - ) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 2515157253c..cccafaeba88 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -279,6 +279,7 @@ def dropna(self) -> Self: else: return self.copy() + @acquire_spill_lock() def to_arrow(self) -> pa.Array: """Convert to PyArrow Array @@ -295,9 +296,7 @@ def to_arrow(self) -> pa.Array: 4 ] """ - return libcudf.interop.to_arrow([self], [("None", self.dtype)])[ - "None" - ].chunk(0) + return plc.interop.to_arrow(self.to_pylibcudf(mode="read")).chunk(0) @classmethod def from_arrow(cls, array: pa.Array) -> ColumnBase: @@ -334,26 +333,33 @@ def from_arrow(cls, array: pa.Array) -> ColumnBase: if isinstance(array.type, pa.DictionaryType): indices_table = pa.table( - { - "None": pa.chunked_array( - [chunk.indices for chunk in data["None"].chunks], + [ + pa.chunked_array( + [chunk.indices for chunk in data.column(0).chunks], type=array.type.index_type, ) - } + ], + [None], 
) dictionaries_table = pa.table( - { - "None": pa.chunked_array( - [chunk.dictionary for chunk in data["None"].chunks], + [ + pa.chunked_array( + [chunk.dictionary for chunk in data.column(0).chunks], type=array.type.value_type, ) - } + ], + [None], ) - - codes = libcudf.interop.from_arrow(indices_table)[0] - categories = libcudf.interop.from_arrow(dictionaries_table)[0] + with acquire_spill_lock(): + codes = cls.from_pylibcudf( + plc.interop.from_arrow(indices_table).columns()[0] + ) + categories = cls.from_pylibcudf( + plc.interop.from_arrow(dictionaries_table).columns()[0] + ) codes = cudf.core.column.categorical.as_unsigned_codes( - len(categories), codes + len(categories), + codes, # type: ignore[arg-type] ) return cudf.core.column.CategoricalColumn( data=None, @@ -364,10 +370,14 @@ def from_arrow(cls, array: pa.Array) -> ColumnBase: mask=codes.base_mask, children=(codes,), ) - - result = libcudf.interop.from_arrow(data)[0] - - return result._with_type_metadata(cudf_dtype_from_pa_type(array.type)) + else: + result = cls.from_pylibcudf( + plc.interop.from_arrow(data).columns()[0] + ) + # TODO: cudf_dtype_from_pa_type may be less necessary for some types + return result._with_type_metadata( + cudf_dtype_from_pa_type(array.type) + ) @acquire_spill_lock() def _get_mask_as_column(self) -> ColumnBase: diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index 1a820da3c62..b6a4122ebb9 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -1016,7 +1016,7 @@ def to_pandas( self.dtype.tz, ambiguous="NaT", nonexistent="NaT" ) - def to_arrow(self): + def to_arrow(self) -> pa.Array: return pa.compute.assume_timezone( self._local_time.to_arrow(), str(self.dtype.tz) ) diff --git a/python/cudf/cudf/core/column/decimal.py b/python/cudf/cudf/core/column/decimal.py index 9e6a73f1a9c..09941665ba2 100644 --- a/python/cudf/cudf/core/column/decimal.py +++ 
b/python/cudf/cudf/core/column/decimal.py @@ -269,8 +269,8 @@ def from_arrow(cls, data: pa.Array): mask=mask, ) - def to_arrow(self): - data_buf_32 = np.array(self.base_data.memoryview()).view("int32") + def to_arrow(self) -> pa.Array: + data_buf_32 = np.array(self.base_data.memoryview()).view("int32") # type: ignore[union-attr] data_buf_128 = np.empty(len(data_buf_32) * 4, dtype="int32") # use striding to set the first 32 bits of each 128-bit chunk: @@ -337,7 +337,7 @@ def from_arrow(cls, data: pa.Array): result.dtype.precision = data.type.precision return result - def to_arrow(self): + def to_arrow(self) -> pa.Array: return super().to_arrow().cast(self.dtype.to_arrow()) def _with_type_metadata( @@ -396,8 +396,8 @@ def from_arrow(cls, data: pa.Array): mask=mask, ) - def to_arrow(self): - data_buf_64 = np.array(self.base_data.memoryview()).view("int64") + def to_arrow(self) -> pa.Array: + data_buf_64 = np.array(self.base_data.memoryview()).view("int64") # type: ignore[union-attr] data_buf_128 = np.empty(len(data_buf_64) * 2, dtype="int64") # use striding to set the first 64 bits of each 128-bit chunk: diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index ba98e28f6a2..3d9440cdf21 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -150,7 +150,7 @@ def offsets(self) -> NumericalColumn: """ return cast(NumericalColumn, self.children[0]) - def to_arrow(self): + def to_arrow(self) -> pa.Array: offsets = self.offsets.to_arrow() elements = ( pa.nulls(len(self.elements)) @@ -160,7 +160,7 @@ def to_arrow(self): pa_type = pa.list_(elements.type) if self.nullable: - nbuf = pa.py_buffer(self.mask.memoryview()) + nbuf = pa.py_buffer(self.mask.memoryview()) # type: ignore[union-attr] buffers = (nbuf, offsets.buffers()[1]) else: buffers = offsets.buffers() diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index ba9b15667f1..9aadbf8f47a 100644 --- 
a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -946,16 +946,17 @@ def from_arrow(cls, data: pa.Table) -> Self: if len(dict_indices): dict_indices_table = pa.table(dict_indices) data = data.drop(dict_indices_table.column_names) - indices_columns = libcudf.interop.from_arrow(dict_indices_table) + plc_indices = plc.interop.from_arrow(dict_indices_table) # as dictionary size can vary, it can't be a single table cudf_dictionaries_columns = { name: ColumnBase.from_arrow(dict_dictionaries[name]) for name in dict_dictionaries.keys() } - for name, codes in zip( - dict_indices_table.column_names, indices_columns + for name, plc_codes in zip( + dict_indices_table.column_names, plc_indices.columns() ): + codes = libcudf.column.Column.from_pylibcudf(plc_codes) categories = cudf_dictionaries_columns[name] codes = as_unsigned_codes(len(categories), codes) cudf_category_frame[name] = CategoricalColumn( @@ -971,9 +972,9 @@ def from_arrow(cls, data: pa.Table) -> Self: # Handle non-dict arrays cudf_non_category_frame = { - name: col - for name, col in zip( - data.column_names, libcudf.interop.from_arrow(data) + name: libcudf.column.Column.from_pylibcudf(plc_col) + for name, plc_col in zip( + data.column_names, plc.interop.from_arrow(data).columns() ) } @@ -1032,7 +1033,7 @@ def from_arrow(cls, data: pa.Table) -> Self: return cls._from_data({name: result[name] for name in column_names}) @_performance_tracking - def to_arrow(self): + def to_arrow(self) -> pa.Table: """ Convert to arrow Table diff --git a/python/cudf/cudf/io/dlpack.py b/python/cudf/cudf/io/dlpack.py index fe8e446f9c0..3b3fd5f7c56 100644 --- a/python/cudf/cudf/io/dlpack.py +++ b/python/cudf/cudf/io/dlpack.py @@ -1,13 +1,14 @@ # Copyright (c) 2019-2024, NVIDIA CORPORATION. 
+from __future__ import annotations +import pylibcudf as plc import cudf -from cudf._lib import interop as libdlpack from cudf.core.column import ColumnBase from cudf.utils import ioutils -def from_dlpack(pycapsule_obj): +def from_dlpack(pycapsule_obj) -> cudf.Series | cudf.DataFrame: """Converts from a DLPack tensor to a cuDF object. DLPack is an open-source memory tensor structure: @@ -33,18 +34,21 @@ def from_dlpack(pycapsule_obj): cuDF from_dlpack() assumes column-major (Fortran order) input. If the input tensor is row-major, transpose it before passing it to this function. """ + plc_table = plc.interop.from_dlpack(pycapsule_obj) + data = dict( + enumerate( + (ColumnBase.from_pylibcudf(col) for col in plc_table.columns()) + ) + ) - columns = libdlpack.from_dlpack(pycapsule_obj) - data = dict(enumerate(columns)) - - if len(columns) == 1: + if len(data) == 1: return cudf.Series._from_data(data) else: return cudf.DataFrame._from_data(data) @ioutils.doc_to_dlpack() -def to_dlpack(cudf_obj): +def to_dlpack(cudf_obj: cudf.Series | cudf.DataFrame | cudf.BaseIndex): """Converts a cuDF object to a DLPack tensor. DLPack is an open-source memory tensor structure: @@ -80,13 +84,14 @@ def to_dlpack(cudf_obj): if any( not cudf.api.types._is_non_decimal_numeric_dtype(dtype) - for _, dtype in gdf._dtypes + for _, dtype in gdf._dtypes # type: ignore[union-attr] ): raise TypeError("non-numeric data not yet supported") dtype = cudf.utils.dtypes.find_common_type( - [dtype for _, dtype in gdf._dtypes] + [dtype for _, dtype in gdf._dtypes] # type: ignore[union-attr] ) gdf = gdf.astype(dtype) - - return libdlpack.to_dlpack([*gdf._columns]) + return plc.interop.to_dlpack( + plc.Table([col.to_pylibcudf(mode="read") for col in gdf._columns]) + )