Skip to content

Commit

Permalink
Add some missing optional fields to the Parquet RowGroup metadata (#15421)
Browse files Browse the repository at this point in the history

This PR adds the `sorting_columns`, `file_offset`, `total_compressed_size`, and `ordinal` optional fields to the Parquet `RowGroup` metadata object.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #15421
  • Loading branch information
etseidl authored Apr 23, 2024
1 parent e6d9b9f commit 6780e59
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 9 deletions.
64 changes: 64 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,15 @@ class chunked_parquet_reader {
* @file
*/

/**
 * @brief Struct used to describe column sorting metadata
 *
 * Mirrors the Parquet SortingColumn metadata; attach a list of these to the
 * writer options to record the table's sort order in the file footer.
 */
struct sorting_column {
  int column_idx{};          //!< leaf column index within the row group
  bool is_descending{false};  //!< true if sort order is descending
  bool is_nulls_first{true};  //!< true if nulls come before non-null values
};

class parquet_writer_options_builder;

/**
Expand Down Expand Up @@ -564,6 +573,8 @@ class parquet_writer_options {
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;
// Which columns in _table are used for sorting
std::optional<std::vector<sorting_column>> _sorting_columns;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -762,6 +773,13 @@ class parquet_writer_options {
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Returns the sorting_columns.
*
* @return Column sort order metadata
*/
[[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; }

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -893,6 +911,16 @@ class parquet_writer_options {
* @param val Boolean value to enable/disable writing of V2 page headers.
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }

/**
 * @brief Sets the column sort order metadata.
 *
 * @param sorting_columns Column sort order metadata
 */
void set_sorting_columns(std::vector<sorting_column> sorting_columns)
{
  // Construct the stored optional's value in place from the moved-in vector.
  _sorting_columns.emplace(std::move(sorting_columns));
}
};

/**
Expand Down Expand Up @@ -1144,6 +1172,14 @@ class parquet_writer_options_builder {
*/
parquet_writer_options_builder& write_v2_headers(bool enabled);

/**
* @brief Sets column sorting metadata to chunked_parquet_writer_options.
*
* @param sorting_columns Column sort order metadata
* @return this for chaining
*/
parquet_writer_options_builder& sorting_columns(std::vector<sorting_column> sorting_columns);

/**
* @brief move parquet_writer_options member once it's built.
*/
Expand Down Expand Up @@ -1231,6 +1267,8 @@ class chunked_parquet_writer_options {
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;
// Which columns in _table are used for sorting
std::optional<std::vector<sorting_column>> _sorting_columns;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -1385,6 +1423,13 @@ class chunked_parquet_writer_options {
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Returns the sorting_columns.
*
* @return Column sort order metadata
*/
[[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1502,6 +1547,16 @@ class chunked_parquet_writer_options {
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }

/**
 * @brief Sets the column sort order metadata.
 *
 * @param sorting_columns Column sort order metadata
 */
void set_sorting_columns(std::vector<sorting_column> sorting_columns)
{
  // Construct the stored optional's value in place from the moved-in vector.
  _sorting_columns.emplace(std::move(sorting_columns));
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1741,6 +1796,15 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets column sorting metadata to chunked_parquet_writer_options.
*
* @param sorting_columns Column sort order metadata
* @return this for chaining
*/
chunked_parquet_writer_options_builder& sorting_columns(
std::vector<sorting_column> sorting_columns);

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,13 @@ parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers
return *this;
}

parquet_writer_options_builder& parquet_writer_options_builder::sorting_columns(
  std::vector<sorting_column> sorting_columns)
{
  // Delegate to the options setter so the assignment logic lives in one place.
  options.set_sorting_columns(std::move(sorting_columns));
  return *this;
}

void chunked_parquet_writer_options::set_key_value_metadata(
std::vector<std::map<std::string, std::string>> metadata)
{
Expand Down Expand Up @@ -889,6 +896,13 @@ chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::
return *this;
}

chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::sorting_columns(
  std::vector<sorting_column> sorting_columns)
{
  // Delegate to the options setter so the assignment logic lives in one place.
  options.set_sorting_columns(std::move(sorting_columns));
  return *this;
}

chunked_parquet_writer_options_builder&
chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val)
{
Expand Down
22 changes: 21 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "compact_protocol_reader.hpp"

#include "parquet.hpp"

#include <cudf/utilities/error.hpp>

#include <algorithm>
Expand Down Expand Up @@ -171,6 +173,7 @@ class parquet_field_int : public parquet_field {
};

using parquet_field_int8 = parquet_field_int<int8_t, FieldType::I8>;
using parquet_field_int16 = parquet_field_int<int16_t, FieldType::I16>;
using parquet_field_int32 = parquet_field_int<int32_t, FieldType::I32>;
using parquet_field_int64 = parquet_field_int<int64_t, FieldType::I64>;

Expand Down Expand Up @@ -618,9 +621,18 @@ void CompactProtocolReader::read(IntType* i)

// Reads a Thrift-encoded RowGroup struct, including the optional
// sorting_columns (4), file_offset (5), total_compressed_size (6), and
// ordinal (7) fields from the Parquet spec.
// NOTE(review): the diff rendering had left the stale pre-change line
// `parquet_field_int64(3, r->num_rows));` in place next to its replacement,
// which terminated the tuple early and made the code invalid C++; removed.
void CompactProtocolReader::read(RowGroup* r)
{
  using optional_i16 = parquet_field_optional<int16_t, parquet_field_int16>;
  using optional_i64 = parquet_field_optional<int64_t, parquet_field_int64>;
  using optional_list_sorting_column =
    parquet_field_optional<std::vector<SortingColumn>, parquet_field_struct_list<SortingColumn>>;

  // Field ids match the RowGroup definition in parquet.thrift.
  auto op = std::make_tuple(parquet_field_struct_list(1, r->columns),
                            parquet_field_int64(2, r->total_byte_size),
                            parquet_field_int64(3, r->num_rows),
                            optional_list_sorting_column(4, r->sorting_columns),
                            optional_i64(5, r->file_offset),
                            optional_i64(6, r->total_compressed_size),
                            optional_i16(7, r->ordinal));
  function_builder(this, op);
}

Expand Down Expand Up @@ -762,6 +774,14 @@ void CompactProtocolReader::read(ColumnOrder* c)
function_builder(this, op);
}

// Reads a Thrift-encoded SortingColumn struct (field ids 1-3 per the Parquet spec).
void CompactProtocolReader::read(SortingColumn* s)
{
  auto field_readers = std::make_tuple(parquet_field_int32(1, s->column_idx),
                                       parquet_field_bool(2, s->descending),
                                       parquet_field_bool(3, s->nulls_first));
  function_builder(this, field_readers);
}

/**
* @brief Constructs the schema from the file-level metadata
*
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -120,6 +120,7 @@ class CompactProtocolReader {
void read(ColumnIndex* c);
void read(Statistics* s);
void read(ColumnOrder* c);
void read(SortingColumn* s);

public:
static int NumRequiredBits(uint32_t max_level) noexcept
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "compact_protocol_writer.hpp"

#include "parquet.hpp"

#include <cudf/utilities/error.hpp>

namespace cudf::io::parquet::detail {
Expand Down Expand Up @@ -140,6 +142,10 @@ size_t CompactProtocolWriter::write(RowGroup const& r)
c.field_struct_list(1, r.columns);
c.field_int(2, r.total_byte_size);
c.field_int(3, r.num_rows);
if (r.sorting_columns.has_value()) { c.field_struct_list(4, r.sorting_columns.value()); }
if (r.file_offset.has_value()) { c.field_int(5, r.file_offset.value()); }
if (r.total_compressed_size.has_value()) { c.field_int(6, r.total_compressed_size.value()); }
if (r.ordinal.has_value()) { c.field_int16(7, r.ordinal.value()); }
return c.value();
}

Expand Down Expand Up @@ -242,6 +248,15 @@ size_t CompactProtocolWriter::write(ColumnOrder const& co)
return c.value();
}

// Serializes a SortingColumn struct; fields are emitted in field-id order
// (1: column_idx, 2: descending, 3: nulls_first) per the Parquet spec.
size_t CompactProtocolWriter::write(SortingColumn const& sc)
{
  CompactProtocolFieldWriter field_writer(*this);
  field_writer.field_int(1, sc.column_idx);
  field_writer.field_bool(2, sc.descending);
  field_writer.field_bool(3, sc.nulls_first);
  return field_writer.value();
}

// Appends a single raw byte to the owning writer's output buffer.
void CompactProtocolFieldWriter::put_byte(uint8_t v)
{
  writer.m_buf.push_back(v);
}

void CompactProtocolFieldWriter::put_byte(uint8_t const* raw, uint32_t len)
Expand Down Expand Up @@ -292,6 +307,13 @@ inline void CompactProtocolFieldWriter::field_int8(int field, int8_t val)
current_field_value = field;
}

// Writes an I16 field: header first (field id + I16 type tag), then the value
// via put_int. Order matters for the Thrift compact protocol encoding.
inline void CompactProtocolFieldWriter::field_int16(int field, int16_t val)
{
  put_field_header(field, current_field_value, FieldType::I16);
  put_int(val);
  // Record the last-written field id; the next header is encoded relative to it.
  current_field_value = field;
}

inline void CompactProtocolFieldWriter::field_int(int field, int32_t val)
{
put_field_header(field, current_field_value, FieldType::I32);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CompactProtocolWriter {
size_t write(OffsetIndex const&);
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);
size_t write(SortingColumn const&);

protected:
std::vector<uint8_t>& m_buf;
Expand Down Expand Up @@ -91,6 +92,8 @@ class CompactProtocolFieldWriter {

inline void field_int8(int field, int8_t val);

inline void field_int16(int field, int16_t val);

inline void field_int(int field, int32_t val);

inline void field_int(int field, int64_t val);
Expand Down
25 changes: 23 additions & 2 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -322,6 +322,15 @@ struct ColumnIndex {
thrust::optional<std::vector<int64_t>> definition_level_histogram;
};

/**
 * @brief Thrift-derived struct describing column sort order
 *
 * Referenced by RowGroup::sorting_columns to record the ordering of rows
 * within a row group.
 */
struct SortingColumn {
  int32_t column_idx;  // The leaf column index (in this row group)
  bool descending;     // If true, indicates this column is sorted in descending order
  bool nulls_first;    // If true, nulls will come before non-null values
};

/**
* @brief Thrift-derived struct describing a column chunk
*/
Expand Down Expand Up @@ -374,9 +383,21 @@ struct ColumnChunk {
* consisting of a column chunk for each column.
*/
// Thrift-derived struct describing a row group: a logical horizontal partition
// of the table, with one column chunk per column.
// NOTE(review): the diff rendering had left a stale duplicate declaration of
// `total_byte_size` at the top of the struct (its pre-change position), which
// would fail to compile; removed the duplicate.
struct RowGroup {
  // Metadata for each column chunk in this row group.
  std::vector<ColumnChunk> columns;
  // Total byte size of all the uncompressed column data in this row group
  int64_t total_byte_size = 0;
  // Number of rows in this row group
  int64_t num_rows = 0;
  // If set, specifies a sort ordering of the rows in this RowGroup.
  // The sorting columns can be a subset of all the columns.
  thrust::optional<std::vector<SortingColumn>> sorting_columns;
  // Byte offset from beginning of file to first page (data or dictionary) in this row group
  thrust::optional<int64_t> file_offset;
  // Total byte size of all compressed (and potentially encrypted) column data in this row group
  thrust::optional<int64_t> total_compressed_size;
  // Row group ordinal in the file
  thrust::optional<int16_t> ordinal;
};

/**
Expand Down
Loading

0 comments on commit 6780e59

Please sign in to comment.