diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index f3332a23992..bfd89200786 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -31,12 +31,8 @@ constexpr int rolling_buf_size = decode_block_size * 2; constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); template -static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_value_count, - page_state_s* s, - state_buf* sb, - level_t const* const def, - int t, - bool nullable_with_nulls) +static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat( + int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t) { constexpr int num_warps = decode_block_size / cudf::detail::warp_size; constexpr int max_batch_size = num_warps * cudf::detail::warp_size; @@ -63,13 +59,9 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_v // definition level. only need to process for nullable columns int d = 0; if constexpr (nullable) { - if (nullable_with_nulls) { - d = t < batch_size - ? static_cast(def[rolling_index(value_count + t)]) - : -1; - } else { - d = t < batch_size ? 1 : -1; - } + d = t < batch_size + ? static_cast(def[rolling_index(value_count + t)]) + : -1; } int const thread_value_count = t + 1; @@ -426,17 +418,13 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -444,7 +432,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); @@ -547,18 +535,14 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); // count of valid items in this batch - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -566,7 +550,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); @@ -671,17 +655,13 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -689,7 +669,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 55633b97cf4..a5cd7d06536 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1498,8 +1498,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // if we haven't already processed this column because it is part of a struct hierarchy else if (out_buf.size == 0) { // add 1 for the offset if this is a list column - out_buf.create( + // we're going to start null mask as all valid and then turn bits off if necessary + out_buf.create_with_mask( out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows, + cudf::mask_state::ALL_VALID, _stream, _mr); } @@ -1577,7 +1579,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } // allocate - out_buf.create(size, _stream, _mr); + // we're going to start null mask as all valid and then turn bits off if necessary + out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr); } } } diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 5ef43599838..e5d4e1a360f 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -91,9 +91,10 @@ void copy_buffer_data(string_policy const& buff, string_policy& new_buff) } // namespace template -void column_buffer_base::create(size_type _size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +void column_buffer_base::create_with_mask(size_type _size, + cudf::mask_state null_mask_state, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { size = _size; _mr = mr; @@ -111,11 +112,19 @@ void column_buffer_base::create(size_type _size, default: _data = create_data(type, size, stream, _mr); break; } if (is_nullable) { - _null_mask = cudf::detail::create_null_mask( - size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), _mr); + _null_mask = + cudf::detail::create_null_mask(size, null_mask_state, rmm::cuda_stream_view(stream), _mr); } } +template +void column_buffer_base::create(size_type _size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + create_with_mask(_size, mask_state::ALL_NULL, stream, mr); +} + template string_policy column_buffer_base::empty_like(string_policy const& input) { diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index ace1396bc09..e6bfae0681a 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -115,6 +115,13 @@ class column_buffer_base { // preprocessing steps such as in the Parquet reader void create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); + // like create(), but also takes a `cudf::mask_state` to allow initializing the null mask as + // something other than `ALL_NULL` + void create_with_mask(size_type _size, + cudf::mask_state null_mask_state, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + // Create a new column_buffer that has empty data but with the same basic information as the // input column, including same type, nullability, name, and user_data. static string_policy empty_like(string_policy const& input);