Enable get_token_stream to include LineEnd tokens with optional parameter. #15605

Closed
cpp/include/cudf/io/detail/tokenize_json.hpp (+24 −0)

@@ -119,6 +119,30 @@ enum token_t : PdaTokenT {

namespace detail {

/**
* @brief Decision to keep or discard LineEnd tokens in the output token stream
*/
enum class LineEndTokenOption { Keep, Discard };

/**
* @brief Parses the given JSON string and emits a sequence of tokens that demarcate relevant
* sections from the input.
*
* @param json_in The JSON input
* @param options Parsing options specifying the parsing behaviour
 * @param line_end_option Whether to keep or discard LineEnd tokens in the output token stream
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Optional, resource with which to allocate
* @return Pair of device vectors, where the first vector represents the token types and the second
* vector represents the index within the input corresponding to each token
*/
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> get_token_stream(
device_span<SymbolT const> json_in,
cudf::io::json_reader_options const& options,
cudf::io::json::detail::LineEndTokenOption line_end_option,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
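A minimal usage sketch of the new overload (assumed setup, not part of this diff; `d_json` stands in for a `cudf::device_span<cudf::io::json::SymbolT const>` over JSON-lines input):

```cpp
// Sketch only: d_json is assumed to be a device_span over JSON-lines input.
cudf::io::json_reader_options options{};
options.enable_lines(true);

auto const stream = cudf::get_default_stream();

// Ask for LineEnd tokens to be kept so callers can recover record boundaries.
auto const [tokens, token_indices] = cudf::io::json::detail::get_token_stream(
  d_json,
  options,
  cudf::io::json::detail::LineEndTokenOption::Keep,
  stream,
  rmm::mr::get_current_device_resource());
```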

/**
* @brief Parses the given JSON string and emits a sequence of tokens that demarcate relevant
* sections from the input.
cpp/src/io/json/json_column.cu (+5 −1)

@@ -1058,7 +1058,11 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
auto gpu_tree = [&]() {
// Parse the JSON and get the token stream
const auto [tokens_gpu, token_indices_gpu] =
-      get_token_stream(d_input, options, stream, rmm::mr::get_current_device_resource());
+      get_token_stream(d_input,
+                       options,
+                       LineEndTokenOption::Discard,
+                       stream,
+                       rmm::mr::get_current_device_resource());
// gpu tree generation
return get_tree_representation(tokens_gpu,
token_indices_gpu,
cpp/src/io/json/nested_json.hpp (+2 −0)

@@ -211,12 +211,14 @@ void get_stack_context(device_span<SymbolT const> json_in,
*
* @param tokens The tokens to be post-processed
* @param token_indices The tokens' corresponding indices that are post-processed
 * @param line_end_option Whether to keep or discard LineEnd tokens in the post-processed stream
* @param stream The cuda stream to dispatch GPU kernels to
* @return Returns the post-processed token stream
*/
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
LineEndTokenOption line_end_option,
rmm::cuda_stream_view stream);
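A hedged sketch of how this entry point would be driven with the new parameter (`tokens` and `token_indices` are assumed to be the raw outputs of the tokenizer FST, as in `get_token_stream`):

```cpp
// Sketch only: Keep retains LineEnd tokens in the filtered stream, while
// Discard reproduces the previous behaviour.
auto [filtered_tokens, filtered_indices] = cudf::io::json::detail::process_token_stream(
  tokens, token_indices, cudf::io::json::detail::LineEndTokenOption::Keep, stream);
```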

/**
cpp/src/io/json/nested_json_gpu.cu (+39 −16)

@@ -238,6 +238,7 @@ struct UnwrapTokenFromSymbolOp {
* invalid lines.
*/
struct TransduceToken {
detail::LineEndTokenOption line_end_option;
template <typename RelativeOffsetT, typename SymbolT>
constexpr CUDF_HOST_DEVICE SymbolT operator()(StateT const state_id,
SymbolGroupT const match_id,
@@ -249,8 +250,9 @@
match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER));

if (is_end_of_invalid_line) {
-      return relative_offset == 0 ? SymbolT{token_t::StructEnd, 0}
-                                  : SymbolT{token_t::StructBegin, 0};
+      return relative_offset == 0   ? SymbolT{token_t::StructEnd, 0}
+             : relative_offset == 1 ? SymbolT{token_t::StructBegin, 0}
+                                    : SymbolT{token_t::LineEnd, 0};
} else {
return read_symbol;
}
@@ -262,20 +264,25 @@
SymbolT const read_symbol) const
{
// Number of tokens emitted on invalid lines
-    constexpr int32_t num_inv_tokens = 2;
+    int32_t num_inv_tokens = line_end_option == detail::LineEndTokenOption::Discard ? 2 : 3;

const bool is_delimiter = match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER);

// If state is either invalid or we're entering an invalid state, we discard tokens
-    const bool is_part_of_invalid_line =
+    const bool is_part_of_valid_line =
(match_id != static_cast<SymbolGroupT>(dfa_symbol_group_id::ERROR) &&
state_id == static_cast<StateT>(TT_VLD));

// Indicates whether we transition from an invalid line to a potentially valid line
const bool is_end_of_invalid_line = (state_id == static_cast<StateT>(TT_INV) && is_delimiter);

int32_t const emit_count =
-      is_end_of_invalid_line ? num_inv_tokens : (is_part_of_invalid_line && !is_delimiter ? 1 : 0);
+      is_end_of_invalid_line
+        ? num_inv_tokens
+        : (is_part_of_valid_line &&
+               (is_delimiter ? (line_end_option == detail::LineEndTokenOption::Keep) : true)
+             ? 1
+             : 0);
return emit_count;
}
};
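To make the emit logic above concrete, here is one reading of the transducer's forward-order output for a three-line input whose middle line is invalid (illustration only; line contents are made up, not taken from this diff):

```cpp
// Input lines:   {"a":1}      {oops        {"b":2}
//
// LineEndTokenOption::Discard (previous behaviour): delimiters emit nothing and
// each invalid line collapses to num_inv_tokens == 2 tokens:
//   <line-0 tokens>  StructBegin StructEnd  <line-2 tokens>
//
// LineEndTokenOption::Keep: delimiters on valid lines emit a LineEnd, and an
// invalid line collapses to num_inv_tokens == 3 tokens, so every line still
// ends with its LineEnd:
//   <line-0 tokens> LineEnd  StructBegin StructEnd LineEnd  <line-2 tokens> LineEnd
```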
@@ -1479,16 +1486,12 @@ void get_stack_context(device_span<SymbolT const> json_in,
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
LineEndTokenOption line_end_option,
rmm::cuda_stream_view stream)
{
// Instantiate FST for post-processing the token stream to remove all tokens that belong to an
// invalid JSON line
token_filter::UnwrapTokenFromSymbolOp sgid_op{};
-  auto filter_fst =
-    fst::detail::make_fst(fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
-                          fst::detail::make_transition_table(token_filter::transition_table),
-                          fst::detail::make_translation_functor(token_filter::TransduceToken{}),
-                          stream);

auto const mr = rmm::mr::get_current_device_resource();
rmm::device_scalar<SymbolOffsetT> d_num_selected_tokens(stream, mr);
@@ -1497,10 +1500,19 @@

// The FST is run on the reverse token stream, discarding all tokens between ErrorBegin and the
// next LineEnd (LineEnd, inv_token_0, inv_token_1, ..., inv_token_n, ErrorBegin, LineEnd, ...),
-  // emitting a [StructBegin, StructEnd] pair on the end of such an invalid line. In that example,
-  // inv_token_i for i in [0, n] together with the ErrorBegin are removed and replaced with
-  // StructBegin, StructEnd. Also, all LineEnd are removed as well, as these are not relevant after
-  // this stage anymore
+  // emitting a [StructBegin, StructEnd] pair at the end of such an invalid line when
+  // line_end_option is LineEndTokenOption::Discard. In that example, inv_token_i for i in [0, n]
+  // together with the ErrorBegin are removed and replaced with StructBegin, StructEnd, and all
+  // LineEnd tokens are removed as well. However, when line_end_option is LineEndTokenOption::Keep,
+  // the FST replaces inv_token_i for i in [0, n] and ErrorBegin with
+  // [LineEnd, StructBegin, StructEnd] on invalid lines, and all LineEnd tokens in valid lines are
+  // retained.

+  auto filter_fst = fst::detail::make_fst(
+    fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
+    fst::detail::make_transition_table(token_filter::transition_table),
+    fst::detail::make_translation_functor(token_filter::TransduceToken{line_end_option}),
+    stream);

filter_fst.Transduce(
thrust::make_reverse_iterator(thrust::make_zip_iterator(tokens.data(), token_indices.data()) +
tokens.size()),
@@ -1531,6 +1543,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> get_token_stream(
device_span<SymbolT const> json_in,
cudf::io::json_reader_options const& options,
LineEndTokenOption line_end_option,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
@@ -1632,7 +1645,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> get_token_stream(
if (delimiter_offset == 1) {
tokens.set_element(0, token_t::LineEnd, stream);
auto [filtered_tokens, filtered_tokens_indices] =
-      process_token_stream(tokens, tokens_indices, stream);
+      process_token_stream(tokens, tokens_indices, line_end_option, stream);
tokens = std::move(filtered_tokens);
tokens_indices = std::move(filtered_tokens_indices);
}
@@ -1643,6 +1656,15 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> get_token_stream(
return std::make_pair(std::move(tokens), std::move(tokens_indices));
}

std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> get_token_stream(
device_span<SymbolT const> json_in,
cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
return get_token_stream(json_in, options, LineEndTokenOption::Discard, stream, mr);
}
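This delegating overload keeps the original four-argument signature intact, so existing call sites compile unchanged; a sketch of the equivalence (caller-side names assumed):

```cpp
// The legacy overload forwards with LineEndTokenOption::Discard, so these two
// calls produce the same token stream.
auto a = cudf::io::json::detail::get_token_stream(
  d_json, options, stream, rmm::mr::get_current_device_resource());
auto b = cudf::io::json::detail::get_token_stream(
  d_json, options, cudf::io::json::detail::LineEndTokenOption::Discard,
  stream, rmm::mr::get_current_device_resource());
```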

/**
* @brief Parses the given JSON string and generates a tree representation of the given input.
*
@@ -1671,7 +1693,8 @@ void make_json_column(json_column& root_column,
CUDF_FUNC_RANGE();

// Parse the JSON and get the token stream
-  auto const [d_tokens_gpu, d_token_indices_gpu] = get_token_stream(d_input, options, stream, mr);
+  auto const [d_tokens_gpu, d_token_indices_gpu] =
+    get_token_stream(d_input, options, LineEndTokenOption::Discard, stream, mr);

// Copy the JSON tokens to the host
thrust::host_vector<PdaTokenT> tokens =
cpp/tests/io/json_tree.cpp (+30 −10)

@@ -591,8 +591,12 @@ TEST_F(JsonTest, TreeRepresentation)
cudf::io::json_reader_options const options{};

// Parse the JSON and get the token stream
-  auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream(
-    d_input, options, stream, rmm::mr::get_current_device_resource());
+  auto const [tokens_gpu, token_indices_gpu] =
+    cudf::io::json::detail::get_token_stream(d_input,
+                                             options,
+                                             cudf::io::json::detail::LineEndTokenOption::Discard,
+                                             stream,
+                                             rmm::mr::get_current_device_resource());

// Get the JSON's tree representation
auto gpu_tree = cuio_json::detail::get_tree_representation(
@@ -679,8 +683,12 @@ TEST_F(JsonTest, TreeRepresentation2)
cudf::io::json_reader_options const options{};

// Parse the JSON and get the token stream
-  auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream(
-    d_input, options, stream, rmm::mr::get_current_device_resource());
+  auto const [tokens_gpu, token_indices_gpu] =
+    cudf::io::json::detail::get_token_stream(d_input,
+                                             options,
+                                             cudf::io::json::detail::LineEndTokenOption::Discard,
+                                             stream,
+                                             rmm::mr::get_current_device_resource());

// Get the JSON's tree representation
auto gpu_tree = cuio_json::detail::get_tree_representation(
@@ -754,8 +762,12 @@ TEST_F(JsonTest, TreeRepresentation3)
options.enable_lines(true);

// Parse the JSON and get the token stream
-  auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream(
-    d_input, options, stream, rmm::mr::get_current_device_resource());
+  auto const [tokens_gpu, token_indices_gpu] =
+    cudf::io::json::detail::get_token_stream(d_input,
+                                             options,
+                                             cudf::io::json::detail::LineEndTokenOption::Discard,
+                                             stream,
+                                             rmm::mr::get_current_device_resource());

// Get the JSON's tree representation
auto gpu_tree = cuio_json::detail::get_tree_representation(
@@ -780,8 +792,12 @@ TEST_F(JsonTest, TreeRepresentationError)
cudf::io::json_reader_options const options{};

// Parse the JSON and get the token stream
-  auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream(
-    d_input, options, stream, rmm::mr::get_current_device_resource());
+  auto const [tokens_gpu, token_indices_gpu] =
+    cudf::io::json::detail::get_token_stream(d_input,
+                                             options,
+                                             cudf::io::json::detail::LineEndTokenOption::Discard,
+                                             stream,
+                                             rmm::mr::get_current_device_resource());

// Get the JSON's tree representation
// This JSON is invalid and will raise an exception.
@@ -863,8 +879,12 @@ TEST_P(JsonTreeTraversalTest, CPUvsGPUTraversal)
static_cast<size_t>(d_scalar.size())};

// Parse the JSON and get the token stream
-  auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream(
-    d_input, options, stream, rmm::mr::get_current_device_resource());
+  auto const [tokens_gpu, token_indices_gpu] =
+    cudf::io::json::detail::get_token_stream(d_input,
+                                             options,
+                                             cudf::io::json::detail::LineEndTokenOption::Discard,
+                                             stream,
+                                             rmm::mr::get_current_device_resource());
// host tree generation
auto cpu_tree = get_tree_representation_cpu(tokens_gpu, token_indices_gpu, options, stream);
bool const is_array_of_arrays =
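All of the updated tests pass `Discard`, preserving prior behaviour. For completeness, a hypothetical sketch of a Keep-mode test (test name and expectations are assumptions, not part of this diff; setup mirrors the existing tests above, including the file's `cuio_json` namespace alias):

```cpp
// Hypothetical: verify LineEnd tokens survive when LineEndTokenOption::Keep is
// passed -- one LineEnd per input line.
TEST_F(JsonTest, TokenStreamKeepsLineEnds)
{
  std::string const input = "{\"a\": 1}\n{\"a\": 2}\n";
  auto stream   = cudf::get_default_stream();
  auto d_scalar = cudf::string_scalar(input, true, stream);
  auto d_input  = cudf::device_span<cuio_json::SymbolT const>{
    d_scalar.data(), static_cast<size_t>(d_scalar.size())};

  cudf::io::json_reader_options options{};
  options.enable_lines(true);

  auto const [tokens_gpu, token_indices_gpu] =
    cudf::io::json::detail::get_token_stream(d_input,
                                             options,
                                             cudf::io::json::detail::LineEndTokenOption::Keep,
                                             stream,
                                             rmm::mr::get_current_device_resource());

  // Copy tokens back to the host and count LineEnd occurrences.
  auto const tokens = cudf::detail::make_std_vector_sync(
    cudf::device_span<cuio_json::PdaTokenT const>{tokens_gpu.data(), tokens_gpu.size()}, stream);
  EXPECT_EQ(std::count(tokens.begin(), tokens.end(), cuio_json::token_t::LineEnd), 2);
}
```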