diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 0406d6e3e4c..8bfcacdb47f 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -564,7 +564,7 @@ class parquet_writer_options {
   // Maximum size of min or max values in column index
   int32_t _column_index_truncate_length = default_column_index_truncate_length;
   // When to use dictionary encoding for data
-  dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
+  dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE;
   // Maximum size of column chunk dictionary (in bytes)
   size_t _max_dictionary_size = default_max_dictionary_size;
   // Maximum number of rows in a page fragment
@@ -1095,7 +1095,7 @@ class parquet_writer_options_builder {
    * dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
    * the disabling of compression for columns that would otherwise be compressed.
    *
-   * The default value is dictionary_policy::ALWAYS.
+   * The default value is dictionary_policy::ADAPTIVE.
    *
    * @param val policy for dictionary use
    * @return this for chaining
@@ -1258,7 +1258,7 @@ class chunked_parquet_writer_options {
   // Maximum size of min or max values in column index
   int32_t _column_index_truncate_length = default_column_index_truncate_length;
   // When to use dictionary encoding for data
-  dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
+  dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE;
   // Maximum size of column chunk dictionary (in bytes)
   size_t _max_dictionary_size = default_max_dictionary_size;
   // Maximum number of rows in a page fragment
@@ -1751,7 +1751,7 @@ class chunked_parquet_writer_options_builder {
    * dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
    * the disabling of compression for columns that would otherwise be compressed.
    *
-   * The default value is dictionary_policy::ALWAYS.
+   * The default value is dictionary_policy::ADAPTIVE.
    *
    * @param val policy for dictionary use
    * @return this for chaining
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 8de16d06a9d..1680eb43700 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -74,6 +74,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         size_type get_row_group_size_rows() except +
         size_t get_max_page_size_bytes() except +
         size_type get_max_page_size_rows() except +
+        size_t get_max_dictionary_size() except +

         void set_partitions(
             vector[cudf_io_types.partition_info] partitions
@@ -103,8 +104,9 @@
         void set_row_group_size_rows(size_type val) except +
         void set_max_page_size_bytes(size_t val) except +
         void set_max_page_size_rows(size_type val) except +
+        void set_max_dictionary_size(size_t val) except +
         void enable_write_v2_headers(bool val) except +
-        void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +
+        void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +

         @staticmethod
         parquet_writer_options_builder builder(
@@ -155,6 +157,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_writer_options_builder& max_page_size_rows(
             size_type val
         ) except +
+        parquet_writer_options_builder& max_dictionary_size(
+            size_t val
+        ) except +
         parquet_writer_options_builder& write_v2_headers(
             bool val
         ) except +
@@ -179,6 +184,7 @@
         size_type get_row_group_size_rows() except +
         size_t get_max_page_size_bytes() except +
         size_type get_max_page_size_rows() except +
+        size_t get_max_dictionary_size() except +

         void set_metadata(
             cudf_io_types.table_input_metadata m
@@ -202,8 +208,9 @@
         void set_row_group_size_rows(size_type val) except +
         void set_max_page_size_bytes(size_t val) except +
         void set_max_page_size_rows(size_type val) except +
+        void set_max_dictionary_size(size_t val) except +
         void enable_write_v2_headers(bool val) except +
-        void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +
+        void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +

         @staticmethod
         chunked_parquet_writer_options_builder builder(
@@ -245,6 +252,9 @@
         chunked_parquet_writer_options_builder& max_page_size_rows(
             size_type val
         ) except +
+        chunked_parquet_writer_options_builder& max_dictionary_size(
+            size_t val
+        ) except +
         parquet_writer_options_builder& write_v2_headers(
             bool val
         ) except +
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 9ce9aad18f7..dcfa087a1fa 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -399,6 +399,7 @@ def write_parquet(
     object row_group_size_rows=None,
     object max_page_size_bytes=None,
     object max_page_size_rows=None,
+    object max_dictionary_size=None,
     object partitions_info=None,
     object force_nullable_schema=False,
     header_version="1.0",
@@ -478,7 +479,7 @@
     )

     dict_policy = (
-        cudf_io_types.dictionary_policy.ALWAYS
+        cudf_io_types.dictionary_policy.ADAPTIVE
         if use_dictionary
         else cudf_io_types.dictionary_policy.NEVER
     )
@@ -528,6 +529,8 @@
         args.set_max_page_size_bytes(max_page_size_bytes)
     if max_page_size_rows is not None:
         args.set_max_page_size_rows(max_page_size_rows)
+    if max_dictionary_size is not None:
+        args.set_max_dictionary_size(max_dictionary_size)

     with nogil:
         out_metadata_c = move(parquet_writer(args))
@@ -571,7 +574,14 @@ cdef class ParquetWriter:
     max_page_size_rows: int, default 20000
         Maximum number of rows of each page of the output.
         By default, 20000 will be used.
-
+    max_dictionary_size: int, default 1048576
+        Maximum size of the dictionary page for each output column chunk.
+        Dictionary encoding for column chunks that exceed this limit will
+        be disabled. By default, 1048576 (1 MB) will be used.
+    use_dictionary : bool, default True
+        If ``True``, enable dictionary encoding for Parquet page data
+        subject to ``max_dictionary_size`` constraints.
+        If ``False``, disable dictionary encoding for Parquet page data.
     See Also
     --------
     cudf.io.parquet.write_parquet
@@ -588,13 +598,17 @@ cdef class ParquetWriter:
     cdef size_type row_group_size_rows
     cdef size_t max_page_size_bytes
     cdef size_type max_page_size_rows
+    cdef size_t max_dictionary_size
+    cdef cudf_io_types.dictionary_policy dict_policy

     def __cinit__(self, object filepath_or_buffer, object index=None,
                   object compression="snappy", str statistics="ROWGROUP",
                   int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
                   int row_group_size_rows=1000000,
                   int max_page_size_bytes=524288,
-                  int max_page_size_rows=20000):
+                  int max_page_size_rows=20000,
+                  int max_dictionary_size=1048576,
+                  bool use_dictionary=True):
         filepaths_or_buffers = (
             list(filepath_or_buffer)
             if is_list_like(filepath_or_buffer)
@@ -609,6 +623,12 @@ cdef class ParquetWriter:
         self.row_group_size_rows = row_group_size_rows
         self.max_page_size_bytes = max_page_size_bytes
         self.max_page_size_rows = max_page_size_rows
+        self.max_dictionary_size = max_dictionary_size
+        self.dict_policy = (
+            cudf_io_types.dictionary_policy.ADAPTIVE
+            if use_dictionary
+            else cudf_io_types.dictionary_policy.NEVER
+        )

     def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
@@ -726,8 +746,10 @@ cdef class ParquetWriter:
                 .row_group_size_rows(self.row_group_size_rows)
                 .max_page_size_bytes(self.max_page_size_bytes)
                 .max_page_size_rows(self.max_page_size_rows)
+                .max_dictionary_size(self.max_dictionary_size)
                 .build()
             )
+            args.set_dictionary_policy(self.dict_policy)
             self.writer.reset(new cpp_parquet_chunked_writer(args))
             self.initialized = True
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index dd1e59acaaa..a6c67d22af7 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -63,6 +63,7 @@ def _write_parquet(
     row_group_size_rows=None,
     max_page_size_bytes=None,
     max_page_size_rows=None,
+    max_dictionary_size=None,
     partitions_info=None,
     storage_options=None,
     force_nullable_schema=False,
@@ -96,6 +97,7 @@
         "row_group_size_rows": row_group_size_rows,
         "max_page_size_bytes": max_page_size_bytes,
         "max_page_size_rows": max_page_size_rows,
+        "max_dictionary_size": max_dictionary_size,
         "partitions_info": partitions_info,
         "force_nullable_schema": force_nullable_schema,
         "header_version": header_version,
@@ -898,6 +900,7 @@ def to_parquet(
     row_group_size_rows=None,
     max_page_size_bytes=None,
     max_page_size_rows=None,
+    max_dictionary_size=None,
     storage_options=None,
     return_metadata=False,
     force_nullable_schema=False,
@@ -974,6 +977,7 @@
             row_group_size_rows=row_group_size_rows,
             max_page_size_bytes=max_page_size_bytes,
             max_page_size_rows=max_page_size_rows,
+            max_dictionary_size=max_dictionary_size,
             partitions_info=partition_info,
             storage_options=storage_options,
             force_nullable_schema=force_nullable_schema,
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index cf3c0e7f7a0..3680c1e0c62 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -1890,6 +1890,43 @@ def test_parquet_writer_max_page_size(tmpdir, max_page_size_kwargs):
     assert s1 > s2


+@pytest.mark.parametrize("use_dict", [False, True])
+@pytest.mark.parametrize("max_dict_size", [0, 1048576])
+def test_parquet_writer_dictionary_setting(use_dict, max_dict_size):
+    # Simple test to check the validity of the dictionary encoding settings
+    # and the behavior of ParquetWriter in cudf.
+    # Write a table of repetitive data with varying dictionary settings
+    # and make sure the written columns are dictionary-encoded accordingly.
+
+    # Table with repetitive data
+    table = cudf.DataFrame(
+        {
+            "int64": cudf.Series([1024] * 1024, dtype="int64"),
+        }
+    )
+
+    # Write to Parquet using ParquetWriter
+    buffer = BytesIO()
+    writer = ParquetWriter(
+        buffer,
+        use_dictionary=use_dict,
+        max_dictionary_size=max_dict_size,
+    )
+    writer.write_table(table)
+    writer.close()
+
+    # Read the encodings from the written parquet file
+    got = pq.ParquetFile(buffer)
+    encodings = got.metadata.row_group(0).column(0).encodings
+
+    # Expect `PLAIN_DICTIONARY` encoding only if dictionary encoding is
+    # enabled and the dictionary page size limit is greater than 0
+    if use_dict and max_dict_size > 0:
+        assert "PLAIN_DICTIONARY" in encodings
+    else:
+        assert "PLAIN_DICTIONARY" not in encodings
+
+
 @pytest.mark.parametrize("filename", ["myfile.parquet", None])
 @pytest.mark.parametrize("cols", [["b"], ["c", "b"]])
 def test_parquet_partitioned(tmpdir_factory, cols, filename):
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 9c7c687a6ed..18e81078587 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -278,6 +278,10 @@
 max_page_size_rows: integer or None, default None
     Maximum number of rows of each page of the output.
     If None, 20000 will be used.
+max_dictionary_size: integer or None, default None
+    Maximum size of the dictionary page for each output column chunk.
+    Dictionary encoding for column chunks that exceed this limit will be
+    disabled. If None, 1048576 (1 MB) will be used.
 storage_options : dict, optional, default None
     Extra options that make sense for a particular storage connection, e.g.
     host, port, username, password, etc. For HTTP(S) URLs the key-value
@@ -292,8 +296,8 @@
     ``return_metadata=True`` instead of specifying ``metadata_file_path``
 use_dictionary : bool, default True
     When ``False``, prevents the use of dictionary encoding for Parquet page
-    data. When ``True``, dictionary encoding is preferred when not disabled due
-    to dictionary size constraints.
+    data. When ``True``, dictionary encoding is preferred subject to
+    ``max_dictionary_size`` constraints.
 header_version : {{'1.0', '2.0'}}, default "1.0"
     Controls whether to use version 1.0 or version 2.0 page headers when
     encoding. Version 1.0 is more portable, but version 2.0 enables the
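
The user-facing surface this patch adds boils down to one new keyword on `to_parquet` and two on `ParquetWriter`. Below is a minimal usage sketch for reviewers, not part of the patch itself; the buffer names and the 64 KiB cap are illustrative only, and it assumes a cudf build that includes this change:

```python
from io import BytesIO

import pyarrow.parquet as pq

import cudf
from cudf.io.parquet import ParquetWriter

# Low-cardinality data, a good candidate for dictionary encoding.
df = cudf.DataFrame({"vals": [10] * 10_000})

# One-shot write: cap each column chunk's dictionary page at 64 KiB.
plain_buf = BytesIO()
df.to_parquet(plain_buf, max_dictionary_size=64 * 1024)

# Chunked write: opt out of dictionary encoding entirely.
chunked_buf = BytesIO()
writer = ParquetWriter(chunked_buf, use_dictionary=False)
writer.write_table(df)
writer.close()

# Inspect the encodings pyarrow reports for the first column chunk:
# PLAIN_DICTIONARY should appear only for the first buffer.
for buf in (plain_buf, chunked_buf):
    print(pq.ParquetFile(buf).metadata.row_group(0).column(0).encodings)
```

With the default policy now `ADAPTIVE`, callers that never touch these knobs still get dictionary encoding, but only for column chunks whose dictionaries fit under the default `max_dictionary_size` cap of 1048576 bytes (1 MB).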