From cd3e352be06795b825828156da10ba83e1e8939f Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:38:35 -0500 Subject: [PATCH] Migrate `cudf::io::merge_row_group_metadata` to pylibcudf (#17491) Part of #15162 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/17491 --- python/cudf/cudf/_lib/parquet.pyx | 22 ++++---------- python/pylibcudf/pylibcudf/io/parquet.pxd | 2 ++ python/pylibcudf/pylibcudf/io/parquet.pyi | 1 + python/pylibcudf/pylibcudf/io/parquet.pyx | 36 +++++++++++++++++++++-- 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d4bd0cd306c..6c80120ad6e 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -14,8 +14,6 @@ except ImportError: import numpy as np -from cython.operator cimport dereference - from cudf.api.types import is_list_like from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io @@ -25,7 +23,7 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.map cimport map -from libcpp.memory cimport make_unique, unique_ptr +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector @@ -35,7 +33,6 @@ from pylibcudf.io.parquet cimport ChunkedParquetReader from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_writer_options, - merge_row_group_metadata as parquet_merge_metadata, parquet_chunked_writer as cpp_parquet_chunked_writer, parquet_writer_options, write_parquet as parquet_writer, @@ -64,6 +61,7 @@ import pylibcudf as plc from pylibcudf cimport Table from cudf.utils.ioutils import 
_ROW_GROUP_SIZE_BYTES_DEFAULT +from cython.operator cimport dereference cdef class BufferArrayFromVector: @@ -808,19 +806,9 @@ cpdef merge_filemetadata(object filemetadata_list): -------- cudf.io.parquet.merge_row_group_metadata """ - cdef vector[unique_ptr[vector[uint8_t]]] list_c - cdef vector[uint8_t] blob_c - cdef unique_ptr[vector[uint8_t]] output_c - - for blob_py in filemetadata_list: - blob_c = blob_py - list_c.push_back(move(make_unique[vector[uint8_t]](blob_c))) - - with nogil: - output_c = move(parquet_merge_metadata(list_c)) - - out_metadata_py = BufferArrayFromVector.from_unique_ptr(move(output_c)) - return np.asarray(out_metadata_py) + return np.asarray( + plc.io.parquet.merge_row_group_metadata(filemetadata_list).obj + ) cdef statistics_freq _get_stat_freq(str statistics): diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 1a61c20d783..79080fa7243 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -91,3 +91,5 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) cpdef memoryview write_parquet(ParquetWriterOptions options) + +cpdef memoryview merge_row_group_metadata(list metdata_list) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index eb2ca68109b..3eb3d7c3a92 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -78,3 +78,4 @@ class ParquetWriterOptionsBuilder: def build(self) -> ParquetWriterOptions: ... def write_parquet(options: ParquetWriterOptions) -> memoryview: ... +def merge_row_group_metadata(metdata_list: list) -> memoryview: ... 
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index b95b1f39de1..93843c932ad 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -2,7 +2,7 @@ from cython.operator cimport dereference from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport unique_ptr, make_unique from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.parquet cimport ( read_parquet as cpp_read_parquet, write_parquet as cpp_write_parquet, parquet_writer_options, + merge_row_group_metadata as cpp_merge_row_group_metadata, ) from pylibcudf.libcudf.io.types cimport ( compression_type, @@ -38,10 +39,10 @@ __all__ = [ "ParquetWriterOptions", "ParquetWriterOptionsBuilder", "read_parquet", - "write_parquet" + "write_parquet", + "merge_row_group_metadata", ] - cdef parquet_reader_options _setup_parquet_reader_options( SourceInfo source_info, list columns = None, @@ -577,3 +578,32 @@ cpdef memoryview write_parquet(ParquetWriterOptions options): c_result = cpp_write_parquet(c_options) return memoryview(HostBuffer.from_unique_ptr(move(c_result))) + + +cpdef memoryview merge_row_group_metadata(list metdata_list): + """ + Merges multiple raw metadata blobs that were previously + created by write_parquet into a single metadata blob. + + For details, see :cpp:func:`merge_row_group_metadata`. 
+ + Parameters + ---------- + metdata_list : list + List of input file metadata + + Returns + ------- + memoryview + A parquet-compatible blob that contains the data for all row groups in the list + """ + cdef vector[unique_ptr[vector[uint8_t]]] list_c + cdef unique_ptr[vector[uint8_t]] output_c + + for blob in metdata_list: + list_c.push_back(move(make_unique[vector[uint8_t]]( blob))) + + with nogil: + output_c = move(cpp_merge_row_group_metadata(list_c)) + + return memoryview(HostBuffer.from_unique_ptr(move(output_c)))