Skip to content

Commit

Permalink
Migrate cudf::io::merge_row_group_metadata to pylibcudf (#17491)
Browse files Browse the repository at this point in the history
Apart of #15162

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #17491
  • Loading branch information
Matt711 authored Dec 4, 2024
1 parent 351ece5 commit cd3e352
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
22 changes: 5 additions & 17 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ except ImportError:

import numpy as np

from cython.operator cimport dereference

from cudf.api.types import is_list_like

from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
Expand All @@ -25,7 +23,7 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
Expand All @@ -35,7 +33,6 @@ from pylibcudf.io.parquet cimport ChunkedParquetReader
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
parquet_writer_options,
write_parquet as parquet_writer,
Expand Down Expand Up @@ -64,6 +61,7 @@ import pylibcudf as plc
from pylibcudf cimport Table

from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
from cython.operator cimport dereference


cdef class BufferArrayFromVector:
Expand Down Expand Up @@ -808,19 +806,9 @@ cpdef merge_filemetadata(object filemetadata_list):
--------
cudf.io.parquet.merge_row_group_metadata
"""
cdef vector[unique_ptr[vector[uint8_t]]] list_c
cdef vector[uint8_t] blob_c
cdef unique_ptr[vector[uint8_t]] output_c

for blob_py in filemetadata_list:
blob_c = blob_py
list_c.push_back(move(make_unique[vector[uint8_t]](blob_c)))

with nogil:
output_c = move(parquet_merge_metadata(list_c))

out_metadata_py = BufferArrayFromVector.from_unique_ptr(move(output_c))
return np.asarray(out_metadata_py)
return np.asarray(
plc.io.parquet.merge_row_group_metadata(filemetadata_list).obj
)


cdef statistics_freq _get_stat_freq(str statistics):
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ cdef class ParquetWriterOptionsBuilder:
cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)

cpdef memoryview merge_row_group_metadata(list metdata_list)
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ class ParquetWriterOptionsBuilder:
def build(self) -> ParquetWriterOptions: ...

def write_parquet(options: ParquetWriterOptions) -> memoryview: ...
def merge_row_group_metadata(metdata_list: list) -> memoryview: ...
36 changes: 33 additions & 3 deletions python/pylibcudf/pylibcudf/io/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from cython.operator cimport dereference
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.memory cimport unique_ptr, make_unique
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
Expand All @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.parquet cimport (
read_parquet as cpp_read_parquet,
write_parquet as cpp_write_parquet,
parquet_writer_options,
merge_row_group_metadata as cpp_merge_row_group_metadata,
)
from pylibcudf.libcudf.io.types cimport (
compression_type,
Expand All @@ -38,10 +39,10 @@ __all__ = [
"ParquetWriterOptions",
"ParquetWriterOptionsBuilder",
"read_parquet",
"write_parquet"
"write_parquet",
"merge_row_group_metadata",
]


cdef parquet_reader_options _setup_parquet_reader_options(
SourceInfo source_info,
list columns = None,
Expand Down Expand Up @@ -577,3 +578,32 @@ cpdef memoryview write_parquet(ParquetWriterOptions options):
c_result = cpp_write_parquet(c_options)

return memoryview(HostBuffer.from_unique_ptr(move(c_result)))


cpdef memoryview merge_row_group_metadata(list metdata_list):
"""
Merges multiple raw metadata blobs that were previously
created by write_parquet into a single metadata blob.
For details, see :cpp:func:`merge_row_group_metadata`.
Parameters
----------
metdata_list : list
List of input file metadata
Returns
-------
memoryview
A parquet-compatible blob that contains the data for all row groups in the list
"""
cdef vector[unique_ptr[vector[uint8_t]]] list_c
cdef unique_ptr[vector[uint8_t]] output_c

for blob in metdata_list:
list_c.push_back(move(make_unique[vector[uint8_t]](<vector[uint8_t]> blob)))

with nogil:
output_c = move(cpp_merge_row_group_metadata(list_c))

return memoryview(HostBuffer.from_unique_ptr(move(output_c)))

0 comments on commit cd3e352

Please sign in to comment.