From 9fa247ff7db104517f4e9dab0fc3c321e76cccdf Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Mon, 22 Apr 2024 08:28:42 -0400 Subject: [PATCH] Add to_arrow_device() functions that accept views (#15465) Adds the following new interop functions ``` unique_device_array_t to_arrow_device(cudf::table_view const& table, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); unique_device_array_t to_arrow_device(cudf::column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); ``` Also refactors some common code with the ownership transfer version of these APIs. And moves the `to_arrow_schema()` functions to a separate .cpp file. Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Matt Topol (https://github.com/zeroshade) - Nghia Truong (https://github.com/ttnghia) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/15465 --- cpp/CMakeLists.txt | 2 + cpp/include/cudf/interop.hpp | 66 +- cpp/src/interop/to_arrow_device.cu | 745 ++++++++++----------- cpp/src/interop/to_arrow_schema.cpp | 231 +++++++ cpp/src/interop/to_arrow_utilities.cpp | 44 ++ cpp/src/interop/to_arrow_utilities.hpp | 34 + cpp/tests/interop/to_arrow_device_test.cpp | 78 ++- 7 files changed, 801 insertions(+), 399 deletions(-) create mode 100644 cpp/src/interop/to_arrow_schema.cpp create mode 100644 cpp/src/interop/to_arrow_utilities.cpp create mode 100644 cpp/src/interop/to_arrow_utilities.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7d62e0acb10..3c7e10c9bc4 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -359,6 +359,8 @@ add_library( src/interop/from_arrow.cu src/interop/to_arrow.cu src/interop/to_arrow_device.cu + src/interop/to_arrow_schema.cpp + src/interop/to_arrow_utilities.cpp src/interop/detail/arrow_allocator.cpp src/io/avro/avro.cpp src/io/avro/avro_gpu.cu diff --git a/cpp/include/cudf/interop.hpp 
b/cpp/include/cudf/interop.hpp index dc4d66a8f6e..defc1fc834c 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -258,6 +258,70 @@ unique_device_array_t to_arrow_device( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); +/** + * @brief Create `ArrowDeviceArray` from a table view + * + * Populates the C struct ArrowDeviceArray performing copies only if necessary. + * This wraps the data on the GPU device and gives a view of the table data + * to the ArrowDeviceArray struct. If the caller frees the data referenced by + * the table_view, using the returned object results in undefined behavior. + * + * After calling this function, the release callback on the returned ArrowDeviceArray + * must be called to clean up any memory created during conversion. + * + * @note For decimals, since the precision is not stored for them in libcudf + * it will be converted to an Arrow decimal128 with the widest-precision the cudf decimal type + * supports. For example, numeric::decimal32 will be converted to Arrow decimal128 of the precision + * 9 which is the maximum precision for 32-bit types. Similarly, numeric::decimal128 will be + * converted to Arrow decimal128 of the precision 38. 
+ * + * Copies will be performed in the cases where cudf differs from Arrow: + * - BOOL8: Arrow uses a bitmap and cudf uses 1 byte per value + * - DECIMAL32 and DECIMAL64: Converted to Arrow decimal128 + * - STRING: Arrow expects a single value int32 offset child array for empty strings columns + * + * @param table Input table + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used for any allocations during conversion + * @return ArrowDeviceArray which will have ownership of any copied data + */ +unique_device_array_t to_arrow_device( + cudf::table_view const& table, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Create `ArrowDeviceArray` from a column view + * + * Populates the C struct ArrowDeviceArray performing copies only if necessary. + * This wraps the data on the GPU device and gives a view of the column data + * to the ArrowDeviceArray struct. If the caller frees the data referenced by + * the column_view, using the returned object results in undefined behavior. + * + * After calling this function, the release callback on the returned ArrowDeviceArray + * must be called to clean up any memory created during conversion. + * + * @note For decimals, since the precision is not stored for them in libcudf + * it will be converted to an Arrow decimal128 with the widest-precision the cudf decimal type + * supports. For example, numeric::decimal32 will be converted to Arrow decimal128 of the precision + * 9 which is the maximum precision for 32-bit types. Similarly, numeric::decimal128 will be + * converted to Arrow decimal128 of the precision 38. 
+ * + * Copies will be performed in the cases where cudf differs from Arrow: + * - BOOL8: Arrow uses a bitmap and cudf uses 1 byte per value + * - DECIMAL32 and DECIMAL64: Converted to Arrow decimal128 + * - STRING: Arrow expects a single value int32 offset child array for empty strings columns + * + * @param col Input column + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used for any allocations during conversion + * @return ArrowDeviceArray which will have ownership of any copied data + */ +unique_device_array_t to_arrow_device( + cudf::column_view const& col, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + /** * @brief Create `cudf::table` from given arrow Table input * @@ -266,7 +330,6 @@ unique_device_array_t to_arrow_device( * @param mr Device memory resource used to allocate `cudf::table` * @return cudf table generated from given arrow Table */ - std::unique_ptr from_arrow( arrow::Table const& input, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -280,7 +343,6 @@ std::unique_ptr
from_arrow( * @param mr Device memory resource used to allocate `cudf::scalar` * @return cudf scalar generated from given arrow Scalar */ - std::unique_ptr from_arrow( arrow::Scalar const& input, rmm::cuda_stream_view stream = cudf::get_default_stream(), diff --git a/cpp/src/interop/to_arrow_device.cu b/cpp/src/interop/to_arrow_device.cu index 1754d1493bd..737f8c7f625 100644 --- a/cpp/src/interop/to_arrow_device.cu +++ b/cpp/src/interop/to_arrow_device.cu @@ -14,11 +14,14 @@ * limitations under the License. */ +#include "to_arrow_utilities.hpp" + #include #include #include #include #include +#include #include #include #include @@ -45,198 +48,10 @@ namespace cudf { namespace detail { namespace { + static constexpr int validity_buffer_idx = 0; static constexpr int fixed_width_data_buffer_idx = 1; -ArrowType id_to_arrow_type(cudf::type_id id) -{ - switch (id) { - case cudf::type_id::BOOL8: return NANOARROW_TYPE_BOOL; - case cudf::type_id::INT8: return NANOARROW_TYPE_INT8; - case cudf::type_id::INT16: return NANOARROW_TYPE_INT16; - case cudf::type_id::INT32: return NANOARROW_TYPE_INT32; - case cudf::type_id::INT64: return NANOARROW_TYPE_INT64; - case cudf::type_id::UINT8: return NANOARROW_TYPE_UINT8; - case cudf::type_id::UINT16: return NANOARROW_TYPE_UINT16; - case cudf::type_id::UINT32: return NANOARROW_TYPE_UINT32; - case cudf::type_id::UINT64: return NANOARROW_TYPE_UINT64; - case cudf::type_id::FLOAT32: return NANOARROW_TYPE_FLOAT; - case cudf::type_id::FLOAT64: return NANOARROW_TYPE_DOUBLE; - case cudf::type_id::TIMESTAMP_DAYS: return NANOARROW_TYPE_DATE32; - default: CUDF_FAIL("Unsupported type_id conversion to arrow type"); - } -} - -struct dispatch_to_arrow_type { - template ())> - int operator()(column_view, column_metadata const&, ArrowSchema*) - { - CUDF_FAIL("Unsupported type for to_arrow_schema"); - } - - template ())> - int operator()(column_view input_view, column_metadata const&, ArrowSchema* out) - { - cudf::type_id id = input_view.type().id(); - 
switch (id) { - case cudf::type_id::TIMESTAMP_SECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_SECOND, nullptr); - case cudf::type_id::TIMESTAMP_MILLISECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI, nullptr); - case cudf::type_id::TIMESTAMP_MICROSECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, nullptr); - case cudf::type_id::TIMESTAMP_NANOSECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_NANO, nullptr); - case cudf::type_id::DURATION_SECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_SECOND, nullptr); - case cudf::type_id::DURATION_MILLISECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MILLI, nullptr); - case cudf::type_id::DURATION_MICROSECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MICRO, nullptr); - case cudf::type_id::DURATION_NANOSECONDS: - return ArrowSchemaSetTypeDateTime( - out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_NANO, nullptr); - default: return ArrowSchemaSetType(out, id_to_arrow_type(id)); - } - } -}; - -template -int decimals_to_arrow(column_view input, ArrowSchema* out) -{ - // Arrow doesn't support decimal32/decimal64 currently. decimal128 - // is the smallest that arrow supports besides float32/float64 so we - // upcast to decimal128. 
- return ArrowSchemaSetTypeDecimal(out, - NANOARROW_TYPE_DECIMAL128, - cudf::detail::max_precision(), - -input.type().scale()); -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const&, - ArrowSchema* out) -{ - using DeviceType = int32_t; - return decimals_to_arrow(input, out); -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const&, - ArrowSchema* out) -{ - using DeviceType = int64_t; - return decimals_to_arrow(input, out); -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const&, - ArrowSchema* out) -{ - using DeviceType = __int128_t; - return decimals_to_arrow(input, out); -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const&, - ArrowSchema* out) -{ - return ArrowSchemaSetType(out, NANOARROW_TYPE_STRING); -} - -// these forward declarations are needed due to the recursive calls to them -// inside their definitions and in struct_vew for handling children -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const& metadata, - ArrowSchema* out); - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const& metadata, - ArrowSchema* out); - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const& metadata, - ArrowSchema* out) -{ - CUDF_EXPECTS(metadata.children_meta.size() == static_cast(input.num_children()), - "Number of field names and number of children doesn't match\n"); - - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(out, input.num_children())); - for (int i = 0; i < input.num_children(); ++i) { - auto child = out->children[i]; - auto col = input.child(i); - ArrowSchemaInit(child); - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(child, metadata.children_meta[i].name.c_str())); - - child->flags = col.has_nulls() ? 
ARROW_FLAG_NULLABLE : 0; - - if (col.type().id() == cudf::type_id::EMPTY) { - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(child, NANOARROW_TYPE_NA)); - continue; - } - - NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( - col.type(), detail::dispatch_to_arrow_type{}, col, metadata.children_meta[i], child)); - } - - return NANOARROW_OK; -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const& metadata, - ArrowSchema* out) -{ - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, NANOARROW_TYPE_LIST)); - auto child = input.child(cudf::lists_column_view::child_column_index); - ArrowSchemaInit(out->children[0]); - if (child.type().id() == cudf::type_id::EMPTY) { - return ArrowSchemaSetType(out->children[0], NANOARROW_TYPE_NA); - } - auto child_meta = - metadata.children_meta.empty() ? column_metadata{"element"} : metadata.children_meta[0]; - - out->flags = input.has_nulls() ? ARROW_FLAG_NULLABLE : 0; - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(out->children[0], child_meta.name.c_str())); - out->children[0]->flags = child.has_nulls() ? ARROW_FLAG_NULLABLE : 0; - return cudf::type_dispatcher( - child.type(), detail::dispatch_to_arrow_type{}, child, child_meta, out->children[0]); -} - -template <> -int dispatch_to_arrow_type::operator()(column_view input, - column_metadata const& metadata, - ArrowSchema* out) -{ - cudf::dictionary_column_view dview{input}; - - NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, id_to_arrow_type(dview.indices().type().id()))); - NANOARROW_RETURN_NOT_OK(ArrowSchemaAllocateDictionary(out)); - ArrowSchemaInit(out->dictionary); - - auto dict_keys = dview.keys(); - return cudf::type_dispatcher( - dict_keys.type(), - detail::dispatch_to_arrow_type{}, - dict_keys, - metadata.children_meta.empty() ? 
column_metadata{"keys"} : metadata.children_meta[0], - out->dictionary); -} - template void device_buffer_finalize(ArrowBufferAllocator* allocator, uint8_t*, int64_t) { @@ -244,6 +59,14 @@ void device_buffer_finalize(ArrowBufferAllocator* allocator, uint8_t*, int64_t) delete unique_buffer; } +int initialize_array(ArrowArray* arr, ArrowType storage_type, cudf::column_view column) +{ + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(arr, storage_type)); + arr->length = column.size(); + arr->null_count = column.null_count(); + return NANOARROW_OK; +} + template struct is_device_scalar : public std::false_type {}; @@ -279,19 +102,26 @@ int set_buffer(std::unique_ptr device_buf, int64_t i, ArrowArray* out) return NANOARROW_OK; } -int initialize_array(ArrowArray* arr, ArrowType storage_type, cudf::column const& column) +ArrowType id_to_arrow_storage_type(cudf::type_id id) { - NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(arr, storage_type)); - arr->length = column.size(); - arr->null_count = column.null_count(); - return NANOARROW_OK; + switch (id) { + case cudf::type_id::TIMESTAMP_SECONDS: + case cudf::type_id::TIMESTAMP_MILLISECONDS: + case cudf::type_id::TIMESTAMP_MICROSECONDS: + case cudf::type_id::TIMESTAMP_NANOSECONDS: return NANOARROW_TYPE_INT64; + case cudf::type_id::DURATION_SECONDS: + case cudf::type_id::DURATION_MILLISECONDS: + case cudf::type_id::DURATION_MICROSECONDS: + case cudf::type_id::DURATION_NANOSECONDS: return NANOARROW_TYPE_INT64; + default: return id_to_arrow_type(id); + } } struct dispatch_to_arrow_device { template ())> int operator()(cudf::column&&, rmm::cuda_stream_view, rmm::device_async_resource_ref, ArrowArray*) { - CUDF_FAIL("Unsupported type for to_arrow_device"); + CUDF_FAIL("Unsupported type for to_arrow_device", cudf::data_type_error); } template ())> @@ -302,38 +132,34 @@ struct dispatch_to_arrow_device { { nanoarrow::UniqueArray tmp; - const ArrowType storage_type = [&] { - switch (column.type().id()) { - case 
cudf::type_id::TIMESTAMP_SECONDS: - case cudf::type_id::TIMESTAMP_MILLISECONDS: - case cudf::type_id::TIMESTAMP_MICROSECONDS: - case cudf::type_id::TIMESTAMP_NANOSECONDS: return NANOARROW_TYPE_INT64; - case cudf::type_id::DURATION_SECONDS: - case cudf::type_id::DURATION_MILLISECONDS: - case cudf::type_id::DURATION_MICROSECONDS: - case cudf::type_id::DURATION_NANOSECONDS: return NANOARROW_TYPE_INT64; - default: return id_to_arrow_type(column.type().id()); - } - }(); + auto const storage_type = id_to_arrow_storage_type(column.type().id()); NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), storage_type, column)); auto contents = column.release(); + NANOARROW_RETURN_NOT_OK(set_contents(contents, tmp.get())); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; + } + + int set_null_mask(column::contents& contents, ArrowArray* out) + { if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); + NANOARROW_RETURN_NOT_OK(set_buffer(std::move(contents.null_mask), validity_buffer_idx, out)); } + return NANOARROW_OK; + } - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.data), fixed_width_data_buffer_idx, tmp.get())); - - ArrowArrayMove(tmp.get(), out); + int set_contents(column::contents& contents, ArrowArray* out) + { + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, out)); + NANOARROW_RETURN_NOT_OK(set_buffer(std::move(contents.data), fixed_width_data_buffer_idx, out)); return NANOARROW_OK; } }; template -int decimals_to_arrow(cudf::column&& input, - int32_t precision, +int decimals_to_arrow(cudf::column_view input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr, ArrowArray* out) @@ -341,42 +167,28 @@ int decimals_to_arrow(cudf::column&& input, nanoarrow::UniqueArray tmp; NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, input)); - if constexpr (!std::is_same_v) { - constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / 
sizeof(DeviceType); - auto buf = - std::make_unique>(input.size() * BIT_WIDTH_RATIO, stream, mr); - - auto count = thrust::make_counting_iterator(0); - - thrust::for_each(rmm::exec_policy(stream, mr), - count, - count + input.size(), - [in = input.view().begin(), - out = buf->data(), - BIT_WIDTH_RATIO] __device__(auto in_idx) { - auto const out_idx = in_idx * BIT_WIDTH_RATIO; - // the lowest order bits are the value, the remainder - // simply matches the sign bit to satisfy the two's - // complement integer representation of negative numbers. - out[out_idx] = in[in_idx]; + constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DeviceType); + auto buf = + std::make_unique>(input.size() * BIT_WIDTH_RATIO, stream, mr); + + auto count = thrust::counting_iterator(0); + + thrust::for_each( + rmm::exec_policy(stream, mr), + count, + count + input.size(), + [in = input.begin(), out = buf->data(), BIT_WIDTH_RATIO] __device__(auto in_idx) { + auto const out_idx = in_idx * BIT_WIDTH_RATIO; + // the lowest order bits are the value, the remainder + // simply matches the sign bit to satisfy the two's + // complement integer representation of negative numbers. + out[out_idx] = in[in_idx]; #pragma unroll BIT_WIDTH_RATIO - 1 - for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) { - out[out_idx + i] = in[in_idx] < 0 ? -1 : 0; - } - }); - NANOARROW_RETURN_NOT_OK(set_buffer(std::move(buf), fixed_width_data_buffer_idx, tmp.get())); - } - - auto contents = input.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } - - if constexpr (std::is_same_v) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.data), fixed_width_data_buffer_idx, tmp.get())); - } + for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) { + out[out_idx + i] = in[in_idx] < 0 ? 
-1 : 0; + } + }); + NANOARROW_RETURN_NOT_OK(set_buffer(std::move(buf), fixed_width_data_buffer_idx, tmp.get())); ArrowArrayMove(tmp.get(), out); return NANOARROW_OK; @@ -389,8 +201,10 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colu ArrowArray* out) { using DeviceType = int32_t; - return decimals_to_arrow( - std::move(column), cudf::detail::max_precision(), stream, mr, out); + NANOARROW_RETURN_NOT_OK(decimals_to_arrow(column.view(), stream, mr, out)); + auto contents = column.release(); + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, out)); + return NANOARROW_OK; } template <> @@ -400,8 +214,10 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colu ArrowArray* out) { using DeviceType = int64_t; - return decimals_to_arrow( - std::move(column), cudf::detail::max_precision(), stream, mr, out); + NANOARROW_RETURN_NOT_OK(decimals_to_arrow(column.view(), stream, mr, out)); + auto contents = column.release(); + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, out)); + return NANOARROW_OK; } template <> @@ -410,9 +226,12 @@ int dispatch_to_arrow_device::operator()(cudf::column&& col rmm::device_async_resource_ref mr, ArrowArray* out) { - using DeviceType = __int128_t; - return decimals_to_arrow( - std::move(column), cudf::detail::max_precision(), stream, mr, out); + nanoarrow::UniqueArray tmp; + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, column)); + auto contents = column.release(); + NANOARROW_RETURN_NOT_OK(set_contents(contents, tmp.get())); + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; } template <> @@ -426,10 +245,7 @@ int dispatch_to_arrow_device::operator()(cudf::column&& column, auto bitmask = bools_to_mask(column.view(), stream, mr); auto contents = column.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, tmp.get())); NANOARROW_RETURN_NOT_OK( 
set_buffer(std::move(bitmask.first), fixed_width_data_buffer_idx, tmp.get())); @@ -459,10 +275,7 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colum } auto contents = column.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, tmp.get())); auto offsets_contents = contents.children[cudf::strings_column_view::offsets_column_index]->release(); @@ -496,22 +309,13 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colum NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(tmp.get(), column.num_children())); auto contents = column.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, tmp.get())); for (size_t i = 0; i < size_t(tmp->n_children); ++i) { ArrowArray* child_ptr = tmp->children[i]; auto& child = contents.children[i]; - if (child->type().id() == cudf::type_id::EMPTY) { - NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(child_ptr, NANOARROW_TYPE_NA)); - child_ptr->length = child->size(); - child_ptr->null_count = child->size(); - } else { - NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( - child->type(), dispatch_to_arrow_device{}, std::move(*child), stream, mr, child_ptr)); - } + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + child->type(), dispatch_to_arrow_device{}, std::move(*child), stream, mr, child_ptr)); } ArrowArrayMove(tmp.get(), out); @@ -529,24 +333,15 @@ int dispatch_to_arrow_device::operator()(cudf::column&& column, NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(tmp.get(), 1)); auto contents = column.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, tmp.get())); auto offsets_contents = 
contents.children[cudf::lists_column_view::offsets_column_index]->release(); NANOARROW_RETURN_NOT_OK(set_buffer(std::move(offsets_contents.data), 1, tmp.get())); auto& child = contents.children[cudf::lists_column_view::child_column_index]; - if (child->type().id() == cudf::type_id::EMPTY) { - NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(tmp->children[0], NANOARROW_TYPE_NA)); - tmp->children[0]->length = 0; - tmp->children[0]->null_count = 0; - } else { - NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( - child->type(), dispatch_to_arrow_device{}, std::move(*child), stream, mr, tmp->children[0])); - } + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + child->type(), dispatch_to_arrow_device{}, std::move(*child), stream, mr, tmp->children[0])); ArrowArrayMove(tmp.get(), out); return NANOARROW_OK; @@ -566,10 +361,7 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colu NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(tmp.get())); auto contents = column.release(); - if (contents.null_mask) { - NANOARROW_RETURN_NOT_OK( - set_buffer(std::move(contents.null_mask), validity_buffer_idx, tmp.get())); - } + NANOARROW_RETURN_NOT_OK(set_null_mask(contents, tmp.get())); auto indices_contents = contents.children[cudf::dictionary_column_view::indices_column_index]->release(); @@ -584,6 +376,205 @@ int dispatch_to_arrow_device::operator()(cudf::column&& colu return NANOARROW_OK; } +struct dispatch_to_arrow_device_view { + cudf::column_view column; + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + + template ())> + int operator()(ArrowArray*) const + { + CUDF_FAIL("Unsupported type for to_arrow_device", cudf::data_type_error); + } + + template ())> + int operator()(ArrowArray* out) const + { + nanoarrow::UniqueArray tmp; + + auto const storage_type = id_to_arrow_storage_type(column.type().id()); + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), storage_type, column)); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + 
NANOARROW_RETURN_NOT_OK(set_view_to_buffer(column, tmp.get())); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; + } + + int set_buffer_view(void const* in_ptr, size_t size, int64_t i, ArrowArray* out) const + { + ArrowBuffer* buf = ArrowArrayBuffer(out, i); + buf->size_bytes = size; + + // reset the deallocator to do nothing since this is a non-owning view + NANOARROW_RETURN_NOT_OK(ArrowBufferSetAllocator( + buf, ArrowBufferDeallocator([](ArrowBufferAllocator*, uint8_t*, int64_t) {}, nullptr))); + + buf->data = const_cast(reinterpret_cast(in_ptr)); + return NANOARROW_OK; + } + + int set_null_mask(column_view column, ArrowArray* out) const + { + if (column.nullable()) { + NANOARROW_RETURN_NOT_OK(set_buffer_view(column.null_mask(), + bitmask_allocation_size_bytes(column.size()), + validity_buffer_idx, + out)); + } + return NANOARROW_OK; + } + + int set_view_to_buffer(column_view column, ArrowArray* out) const + { + auto const type_size = cudf::size_of(column.type()); + return set_buffer_view(column.head() + (type_size * column.offset()), + column.size() * type_size, + fixed_width_data_buffer_idx, + out); + } +}; + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + using DeviceType = int32_t; + NANOARROW_RETURN_NOT_OK(decimals_to_arrow(column, stream, mr, out)); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, out)); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + using DeviceType = int64_t; + NANOARROW_RETURN_NOT_OK(decimals_to_arrow(column, stream, mr, out)); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, out)); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, column)); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + 
NANOARROW_RETURN_NOT_OK(set_view_to_buffer(column, tmp.get())); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_BOOL, column)); + + auto bitmask = bools_to_mask(column, stream, mr); + NANOARROW_RETURN_NOT_OK( + set_buffer(std::move(bitmask.first), fixed_width_data_buffer_idx, tmp.get())); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_STRING, column)); + + if (column.size() == 0) { + // https://github.com/rapidsai/cudf/pull/15047#discussion_r1546528552 + auto zero = std::make_unique>(0, stream, mr); + NANOARROW_RETURN_NOT_OK(set_buffer(std::move(zero), fixed_width_data_buffer_idx, tmp.get())); + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; + } + + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + + auto const scv = cudf::strings_column_view(column); + NANOARROW_RETURN_NOT_OK(set_view_to_buffer(scv.offsets(), tmp.get())); + NANOARROW_RETURN_NOT_OK( + set_buffer_view(scv.chars_begin(stream), scv.chars_size(stream), 2, tmp.get())); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const; + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const; + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_STRUCT, column)); + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(tmp.get(), column.num_children())); + 
NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + + for (size_t i = 0; i < size_t(tmp->n_children); ++i) { + ArrowArray* child_ptr = tmp->children[i]; + auto const child = column.child(i); + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + child.type(), dispatch_to_arrow_device_view{child, stream, mr}, child_ptr)); + } + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + + NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_LIST, column)); + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(tmp.get(), 1)); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + + auto const lcv = cudf::lists_column_view(column); + NANOARROW_RETURN_NOT_OK(set_view_to_buffer(lcv.offsets(), tmp.get())); + + auto child = lcv.child(); + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + child.type(), dispatch_to_arrow_device_view{child, stream, mr}, tmp->children[0])); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_device_view::operator()(ArrowArray* out) const +{ + nanoarrow::UniqueArray tmp; + + NANOARROW_RETURN_NOT_OK(initialize_array( + tmp.get(), + id_to_arrow_type(column.child(cudf::dictionary_column_view::indices_column_index).type().id()), + column)); + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateDictionary(tmp.get())); + NANOARROW_RETURN_NOT_OK(set_null_mask(column, tmp.get())); + + auto const dcv = cudf::dictionary_column_view(column); + NANOARROW_RETURN_NOT_OK(set_view_to_buffer(dcv.indices(), tmp.get())); + + auto keys = dcv.keys(); + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + keys.type(), dispatch_to_arrow_device_view{keys, stream, mr}, tmp->dictionary)); + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + struct ArrowDeviceArrayPrivateData { ArrowArray parent; cudaEvent_t sync_event; @@ -592,49 +583,38 @@ struct ArrowDeviceArrayPrivateData { void 
ArrowDeviceArrayRelease(ArrowArray* array) { auto private_data = reinterpret_cast(array->private_data); - cudaEventDestroy(private_data->sync_event); + RMM_ASSERT_CUDA_SUCCESS(cudaEventDestroy(private_data->sync_event)); ArrowArrayRelease(&private_data->parent); delete private_data; array->release = nullptr; } -} // namespace -} // namespace detail - -unique_schema_t to_arrow_schema(cudf::table_view const& input, - cudf::host_span metadata) +unique_device_array_t create_device_array(nanoarrow::UniqueArray&& out, + rmm::cuda_stream_view stream) { - CUDF_EXPECTS((metadata.size() == static_cast(input.num_columns())), - "columns' metadata should be equal to the number of columns in table"); - - nanoarrow::UniqueSchema result; - ArrowSchemaInit(result.get()); - NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(result.get(), input.num_columns())); - - for (int i = 0; i < input.num_columns(); ++i) { - auto child = result->children[i]; - auto col = input.column(i); - ArrowSchemaInit(child); - NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child, metadata[i].name.c_str())); - child->flags = col.has_nulls() ? 
ARROW_FLAG_NULLABLE : 0; - - if (col.type().id() == cudf::type_id::EMPTY) { - NANOARROW_THROW_NOT_OK(ArrowSchemaSetType(child, NANOARROW_TYPE_NA)); - continue; - } + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(out.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr)); - NANOARROW_THROW_NOT_OK( - cudf::type_dispatcher(col.type(), detail::dispatch_to_arrow_type{}, col, metadata[i], child)); - } + auto private_data = std::make_unique(); + CUDF_CUDA_TRY(cudaEventCreate(&private_data->sync_event)); + CUDF_CUDA_TRY(cudaEventRecord(private_data->sync_event, stream.value())); - unique_schema_t out(new ArrowSchema, [](ArrowSchema* schema) { - if (schema->release != nullptr) { ArrowSchemaRelease(schema); } - delete schema; + ArrowArrayMove(out.get(), &private_data->parent); + unique_device_array_t result(new ArrowDeviceArray, [](ArrowDeviceArray* arr) { + if (arr->array.release != nullptr) { ArrowArrayRelease(&arr->array); } + delete arr; }); - result.move(out.get()); - return out; + result->device_id = rmm::get_current_cuda_device().value(); + result->device_type = ARROW_DEVICE_CUDA; + result->sync_event = private_data->sync_event; + result->array = private_data->parent; // makes a shallow copy + result->array.private_data = private_data.release(); + result->array.release = &detail::ArrowDeviceArrayRelease; + return result; } +} // namespace + unique_device_array_t to_arrow_device(cudf::table&& table, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -650,76 +630,89 @@ unique_device_array_t to_arrow_device(cudf::table&& table, for (size_t i = 0; i < cols.size(); ++i) { auto child = tmp->children[i]; auto col = cols[i].get(); - - if (col->type().id() == cudf::type_id::EMPTY) { - NANOARROW_THROW_NOT_OK(ArrowArrayInitFromType(child, NANOARROW_TYPE_NA)); - child->length = col->size(); - child->null_count = col->size(); - continue; - } - NANOARROW_THROW_NOT_OK(cudf::type_dispatcher( col->type(), detail::dispatch_to_arrow_device{}, std::move(*col), stream, 
mr, child)); } - NANOARROW_THROW_NOT_OK( - ArrowArrayFinishBuilding(tmp.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr)); + return create_device_array(std::move(tmp), stream); +} - auto private_data = std::make_unique(); - cudaEventCreate(&private_data->sync_event); +unique_device_array_t to_arrow_device(cudf::column&& col, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + nanoarrow::UniqueArray tmp; - auto status = cudaEventRecord(private_data->sync_event, stream); - if (status != cudaSuccess) { CUDF_FAIL("could not create event to sync on"); } + NANOARROW_THROW_NOT_OK(cudf::type_dispatcher( + col.type(), detail::dispatch_to_arrow_device{}, std::move(col), stream, mr, tmp.get())); - ArrowArrayMove(tmp.get(), &private_data->parent); - unique_device_array_t result(new ArrowDeviceArray, [](ArrowDeviceArray* arr) { - if (arr->array.release != nullptr) { ArrowArrayRelease(&arr->array); } - delete arr; - }); - result->device_id = rmm::get_current_cuda_device().value(); - result->device_type = ARROW_DEVICE_CUDA; - result->sync_event = &private_data->sync_event; - result->array = private_data->parent; - result->array.private_data = private_data.release(); - result->array.release = &detail::ArrowDeviceArrayRelease; - return result; + return create_device_array(std::move(tmp), stream); } -unique_device_array_t to_arrow_device(cudf::column&& col, +unique_device_array_t to_arrow_device(cudf::table_view const& table, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { nanoarrow::UniqueArray tmp; - if (col.type().id() == cudf::type_id::EMPTY) { - NANOARROW_THROW_NOT_OK(ArrowArrayInitFromType(tmp.get(), NANOARROW_TYPE_NA)); - tmp->length = col.size(); - tmp->null_count = col.size(); + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromType(tmp.get(), NANOARROW_TYPE_STRUCT)); + + NANOARROW_THROW_NOT_OK(ArrowArrayAllocateChildren(tmp.get(), table.num_columns())); + tmp->length = table.num_rows(); + tmp->null_count = 0; + + for (cudf::size_type i = 
0; i < table.num_columns(); ++i) { + auto child = tmp->children[i]; + auto col = table.column(i); + NANOARROW_THROW_NOT_OK(cudf::type_dispatcher( + col.type(), detail::dispatch_to_arrow_device_view{col, stream, mr}, child)); } + return create_device_array(std::move(tmp), stream); +} + +unique_device_array_t to_arrow_device(cudf::column_view const& col, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + nanoarrow::UniqueArray tmp; + NANOARROW_THROW_NOT_OK(cudf::type_dispatcher( - col.type(), detail::dispatch_to_arrow_device{}, std::move(col), stream, mr, tmp.get())); + col.type(), detail::dispatch_to_arrow_device_view{col, stream, mr}, tmp.get())); - NANOARROW_THROW_NOT_OK( - ArrowArrayFinishBuilding(tmp.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr)); + return create_device_array(std::move(tmp), stream); +} - auto private_data = std::make_unique(); - cudaEventCreate(&private_data->sync_event); +} // namespace detail - auto status = cudaEventRecord(private_data->sync_event, stream); - if (status != cudaSuccess) { CUDF_FAIL("could not create event to sync on"); } +unique_device_array_t to_arrow_device(cudf::table&& table, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::to_arrow_device(std::move(table), stream, mr); +} - ArrowArrayMove(tmp.get(), &private_data->parent); - unique_device_array_t result(new ArrowDeviceArray, [](ArrowDeviceArray* arr) { - if (arr->array.release != nullptr) { ArrowArrayRelease(&arr->array); } - delete arr; - }); - result->device_id = rmm::get_current_cuda_device().value(); - result->device_type = ARROW_DEVICE_CUDA; - result->sync_event = &private_data->sync_event; - result->array = private_data->parent; - result->array.private_data = private_data.release(); - result->array.release = &detail::ArrowDeviceArrayRelease; - return result; +unique_device_array_t to_arrow_device(cudf::column&& col, + rmm::cuda_stream_view stream, + 
rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::to_arrow_device(std::move(col), stream, mr); +} + +unique_device_array_t to_arrow_device(cudf::table_view const& table, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::to_arrow_device(table, stream, mr); } +unique_device_array_t to_arrow_device(cudf::column_view const& col, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::to_arrow_device(col, stream, mr); +} } // namespace cudf diff --git a/cpp/src/interop/to_arrow_schema.cpp b/cpp/src/interop/to_arrow_schema.cpp new file mode 100644 index 00000000000..6f943593dce --- /dev/null +++ b/cpp/src/interop/to_arrow_schema.cpp @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "to_arrow_utilities.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace cudf { +namespace detail { +namespace { + +struct dispatch_to_arrow_type { + template ())> + int operator()(column_view, column_metadata const&, ArrowSchema*) + { + CUDF_FAIL("Unsupported type for to_arrow_schema", cudf::data_type_error); + } + + template ())> + int operator()(column_view input_view, column_metadata const&, ArrowSchema* out) + { + cudf::type_id id = input_view.type().id(); + switch (id) { + case cudf::type_id::TIMESTAMP_SECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_SECOND, nullptr); + case cudf::type_id::TIMESTAMP_MILLISECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI, nullptr); + case cudf::type_id::TIMESTAMP_MICROSECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, nullptr); + case cudf::type_id::TIMESTAMP_NANOSECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_NANO, nullptr); + case cudf::type_id::DURATION_SECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_SECOND, nullptr); + case cudf::type_id::DURATION_MILLISECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MILLI, nullptr); + case cudf::type_id::DURATION_MICROSECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MICRO, nullptr); + case cudf::type_id::DURATION_NANOSECONDS: + return ArrowSchemaSetTypeDateTime( + out, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_NANO, nullptr); + default: return ArrowSchemaSetType(out, id_to_arrow_type(id)); + } + } +}; + +template +int decimals_to_arrow(column_view input, ArrowSchema* out) +{ + // Arrow doesn't support 
decimal32/decimal64 currently. decimal128 + // is the smallest that arrow supports besides float32/float64 so we + // upcast to decimal128. + return ArrowSchemaSetTypeDecimal(out, + NANOARROW_TYPE_DECIMAL128, + cudf::detail::max_precision(), + -input.type().scale()); +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const&, + ArrowSchema* out) +{ + using DeviceType = int32_t; + return decimals_to_arrow(input, out); +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const&, + ArrowSchema* out) +{ + using DeviceType = int64_t; + return decimals_to_arrow(input, out); +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const&, + ArrowSchema* out) +{ + using DeviceType = __int128_t; + return decimals_to_arrow(input, out); +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const&, + ArrowSchema* out) +{ + return ArrowSchemaSetType(out, NANOARROW_TYPE_STRING); +} + +// these forward declarations are needed due to the recursive calls to them +// inside their definitions and in struct_view for handling children +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const& metadata, + ArrowSchema* out); + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const& metadata, + ArrowSchema* out); + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const& metadata, + ArrowSchema* out) +{ + CUDF_EXPECTS(metadata.children_meta.size() == static_cast(input.num_children()), + "Number of field names and number of children doesn't match\n"); + + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(out, input.num_children())); + for (int i = 0; i < input.num_children(); ++i) { + auto child = out->children[i]; + auto col = input.child(i); + ArrowSchemaInit(child); + 
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(child, metadata.children_meta[i].name.c_str())); + + child->flags = col.has_nulls() ? ARROW_FLAG_NULLABLE : 0; + + NANOARROW_RETURN_NOT_OK(cudf::type_dispatcher( + col.type(), detail::dispatch_to_arrow_type{}, col, metadata.children_meta[i], child)); + } + + return NANOARROW_OK; +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const& metadata, + ArrowSchema* out) +{ + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, NANOARROW_TYPE_LIST)); + auto child = input.child(cudf::lists_column_view::child_column_index); + ArrowSchemaInit(out->children[0]); + auto child_meta = + metadata.children_meta.empty() ? column_metadata{"element"} : metadata.children_meta[0]; + + out->flags = input.has_nulls() ? ARROW_FLAG_NULLABLE : 0; + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(out->children[0], child_meta.name.c_str())); + out->children[0]->flags = child.has_nulls() ? ARROW_FLAG_NULLABLE : 0; + return cudf::type_dispatcher( + child.type(), detail::dispatch_to_arrow_type{}, child, child_meta, out->children[0]); +} + +template <> +int dispatch_to_arrow_type::operator()(column_view input, + column_metadata const& metadata, + ArrowSchema* out) +{ + cudf::dictionary_column_view dview{input}; + + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, id_to_arrow_type(dview.indices().type().id()))); + NANOARROW_RETURN_NOT_OK(ArrowSchemaAllocateDictionary(out)); + ArrowSchemaInit(out->dictionary); + + auto dict_keys = dview.keys(); + return cudf::type_dispatcher( + dict_keys.type(), + detail::dispatch_to_arrow_type{}, + dict_keys, + metadata.children_meta.empty() ? 
column_metadata{"keys"} : metadata.children_meta[0], + out->dictionary); +} +} // namespace +} // namespace detail + +unique_schema_t to_arrow_schema(cudf::table_view const& input, + cudf::host_span metadata) +{ + CUDF_EXPECTS((metadata.size() == static_cast(input.num_columns())), + "columns' metadata should be equal to the number of columns in table"); + + nanoarrow::UniqueSchema result; + ArrowSchemaInit(result.get()); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(result.get(), input.num_columns())); + + for (int i = 0; i < input.num_columns(); ++i) { + auto child = result->children[i]; + auto col = input.column(i); + ArrowSchemaInit(child); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child, metadata[i].name.c_str())); + child->flags = col.has_nulls() ? ARROW_FLAG_NULLABLE : 0; + + NANOARROW_THROW_NOT_OK( + cudf::type_dispatcher(col.type(), detail::dispatch_to_arrow_type{}, col, metadata[i], child)); + } + + unique_schema_t out(new ArrowSchema, [](ArrowSchema* schema) { + if (schema->release != nullptr) { ArrowSchemaRelease(schema); } + delete schema; + }); + result.move(out.get()); + return out; +} + +} // namespace cudf diff --git a/cpp/src/interop/to_arrow_utilities.cpp b/cpp/src/interop/to_arrow_utilities.cpp new file mode 100644 index 00000000000..04d17847273 --- /dev/null +++ b/cpp/src/interop/to_arrow_utilities.cpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "to_arrow_utilities.hpp" + +#include + +namespace cudf { +namespace detail { + +ArrowType id_to_arrow_type(cudf::type_id id) +{ + switch (id) { + case cudf::type_id::BOOL8: return NANOARROW_TYPE_BOOL; + case cudf::type_id::INT8: return NANOARROW_TYPE_INT8; + case cudf::type_id::INT16: return NANOARROW_TYPE_INT16; + case cudf::type_id::INT32: return NANOARROW_TYPE_INT32; + case cudf::type_id::INT64: return NANOARROW_TYPE_INT64; + case cudf::type_id::UINT8: return NANOARROW_TYPE_UINT8; + case cudf::type_id::UINT16: return NANOARROW_TYPE_UINT16; + case cudf::type_id::UINT32: return NANOARROW_TYPE_UINT32; + case cudf::type_id::UINT64: return NANOARROW_TYPE_UINT64; + case cudf::type_id::FLOAT32: return NANOARROW_TYPE_FLOAT; + case cudf::type_id::FLOAT64: return NANOARROW_TYPE_DOUBLE; + case cudf::type_id::TIMESTAMP_DAYS: return NANOARROW_TYPE_DATE32; + default: CUDF_FAIL("Unsupported type_id conversion to arrow type", cudf::data_type_error); + } +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/interop/to_arrow_utilities.hpp b/cpp/src/interop/to_arrow_utilities.hpp new file mode 100644 index 00000000000..3c01c726a7b --- /dev/null +++ b/cpp/src/interop/to_arrow_utilities.hpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include + +#include + +namespace cudf { +namespace detail { + +/** + * @brief Map cudf column type id to ArrowType id + * + * @param id Column type id + * @return ArrowType id + */ +ArrowType id_to_arrow_type(cudf::type_id id); + +} // namespace detail +} // namespace cudf diff --git a/cpp/tests/interop/to_arrow_device_test.cpp b/cpp/tests/interop/to_arrow_device_test.cpp index 16aab53a249..d6eae8dece1 100644 --- a/cpp/tests/interop/to_arrow_device_test.cpp +++ b/cpp/tests/interop/to_arrow_device_test.cpp @@ -327,14 +327,16 @@ TEST_F(ToArrowDeviceTest, EmptyTable) auto got_arrow_schema = cudf::to_arrow_schema(table->view(), meta); compare_schemas(schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); - auto got_arrow_device = cudf::to_arrow_device(std::move(*table)); + auto got_arrow_device = cudf::to_arrow_device(table->view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_device->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_device->device_type); + compare_arrays(schema.get(), arr.get(), &got_arrow_device->array); + got_arrow_device = cudf::to_arrow_device(std::move(*table)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_device->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_device->device_type); compare_arrays(schema.get(), arr.get(), &got_arrow_device->array); - ArrowArrayRelease(&got_arrow_device->array); } TEST_F(ToArrowDeviceTest, DateTimeTable) @@ -358,10 +360,9 @@ TEST_F(ToArrowDeviceTest, DateTimeTable) expected_schema->children[0]->flags = 0; compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); auto data_ptr = input.get_column(0).view().data(); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, 
got_arrow_array->device_type); @@ -377,7 +378,21 @@ TEST_F(ToArrowDeviceTest, DateTimeTable) EXPECT_EQ(nullptr, got_arrow_array->array.children[0]->buffers[0]); EXPECT_EQ(data_ptr, got_arrow_array->array.children[0]->buffers[1]); - ArrowArrayRelease(&got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + + EXPECT_EQ(data.size(), got_arrow_array->array.length); + EXPECT_EQ(0, got_arrow_array->array.null_count); + EXPECT_EQ(0, got_arrow_array->array.offset); + EXPECT_EQ(1, got_arrow_array->array.n_children); + EXPECT_EQ(nullptr, got_arrow_array->array.buffers[0]); + + EXPECT_EQ(data.size(), got_arrow_array->array.children[0]->length); + EXPECT_EQ(0, got_arrow_array->array.children[0]->null_count); + EXPECT_EQ(0, got_arrow_array->array.children[0]->offset); + EXPECT_EQ(nullptr, got_arrow_array->array.children[0]->buffers[0]); + EXPECT_EQ(data_ptr, got_arrow_array->array.children[0]->buffers[1]); } TYPED_TEST(ToArrowDeviceTestDurationsTest, DurationTable) @@ -415,10 +430,9 @@ TYPED_TEST(ToArrowDeviceTestDurationsTest, DurationTable) auto got_arrow_schema = cudf::to_arrow_schema(input.view(), std::vector{{"a"}}); BaseArrowFixture::compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); auto data_ptr = input.get_column(0).view().data(); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); @@ -434,7 +448,21 @@ TYPED_TEST(ToArrowDeviceTestDurationsTest, DurationTable) EXPECT_EQ(nullptr, got_arrow_array->array.children[0]->buffers[0]); EXPECT_EQ(data_ptr, got_arrow_array->array.children[0]->buffers[1]); - 
ArrowArrayRelease(&got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + + EXPECT_EQ(data.size(), got_arrow_array->array.length); + EXPECT_EQ(0, got_arrow_array->array.null_count); + EXPECT_EQ(0, got_arrow_array->array.offset); + EXPECT_EQ(1, got_arrow_array->array.n_children); + EXPECT_EQ(nullptr, got_arrow_array->array.buffers[0]); + + EXPECT_EQ(data.size(), got_arrow_array->array.children[0]->length); + EXPECT_EQ(0, got_arrow_array->array.children[0]->null_count); + EXPECT_EQ(0, got_arrow_array->array.children[0]->offset); + EXPECT_EQ(nullptr, got_arrow_array->array.children[0]->buffers[0]); + EXPECT_EQ(data_ptr, got_arrow_array->array.children[0]->buffers[1]); } TEST_F(ToArrowDeviceTest, NestedList) @@ -471,7 +499,6 @@ TEST_F(ToArrowDeviceTest, NestedList) auto got_arrow_schema = cudf::to_arrow_schema(input.view(), std::vector{{"a"}}); compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); nanoarrow::UniqueArray expected_array; EXPECT_EQ(NANOARROW_OK, @@ -487,12 +514,15 @@ TEST_F(ToArrowDeviceTest, NestedList) NANOARROW_THROW_NOT_OK( ArrowArrayFinishBuilding(expected_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); compare_arrays(expected_schema.get(), expected_array.get(), 
&got_arrow_array->array); - ArrowArrayRelease(&got_arrow_array->array); } TEST_F(ToArrowDeviceTest, StructColumn) @@ -588,7 +618,6 @@ TEST_F(ToArrowDeviceTest, StructColumn) auto got_arrow_schema = cudf::to_arrow_schema(input.view(), std::vector{metadata}); compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); nanoarrow::UniqueArray expected_array; NANOARROW_THROW_NOT_OK( @@ -629,12 +658,15 @@ TEST_F(ToArrowDeviceTest, StructColumn) NANOARROW_THROW_NOT_OK( ArrowArrayFinishBuilding(expected_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); - ArrowArrayRelease(&got_arrow_array->array); } template @@ -665,7 +697,6 @@ TEST_F(ToArrowDeviceTest, FixedPoint64Table) auto got_arrow_schema = cudf::to_arrow_schema(input.view(), std::vector{{"a"}}); compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); auto result_dev_data = std::make_unique>( expect_data.size(), cudf::get_default_stream()); @@ -700,12 +731,15 @@ TEST_F(ToArrowDeviceTest, FixedPoint64Table) NANOARROW_THROW_NOT_OK( ArrowArrayFinishBuilding(expected_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); 
ASSERT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); ASSERT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + ASSERT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + ASSERT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); - ArrowArrayRelease(&got_arrow_array->array); } } @@ -734,7 +768,6 @@ TEST_F(ToArrowDeviceTest, FixedPoint128Table) auto got_arrow_schema = cudf::to_arrow_schema(input.view(), std::vector{{"a"}}); compare_schemas(expected_schema.get(), got_arrow_schema.get()); - ArrowSchemaRelease(got_arrow_schema.get()); nanoarrow::UniqueArray expected_array; NANOARROW_THROW_NOT_OK( @@ -745,11 +778,14 @@ TEST_F(ToArrowDeviceTest, FixedPoint128Table) NANOARROW_THROW_NOT_OK( ArrowArrayFinishBuilding(expected_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); - auto got_arrow_array = cudf::to_arrow_device(std::move(input)); + auto got_arrow_array = cudf::to_arrow_device(input.view()); EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); + compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); + got_arrow_array = cudf::to_arrow_device(std::move(input)); + EXPECT_EQ(rmm::get_current_cuda_device().value(), got_arrow_array->device_id); + EXPECT_EQ(ARROW_DEVICE_CUDA, got_arrow_array->device_type); compare_arrays(expected_schema.get(), expected_array.get(), &got_arrow_array->array); - ArrowArrayRelease(&got_arrow_array->array); } }