Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into fea/plc/io/streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 authored Dec 18, 2024
2 parents 0fd96db + a95fbc8 commit 8758baa
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 222 deletions.
46 changes: 26 additions & 20 deletions python/cudf/cudf/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,15 @@ def read_json(
if cudf.get_option("io.json.low_memory") and lines:
res_cols, res_col_names, res_child_names = (
plc.io.json.chunked_read_json(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
keep_quotes=keep_quotes,
mixed_types_as_string=mixed_types_as_string,
prune_columns=prune_columns,
recovery_mode=c_on_bad_lines,
plc.io.json._setup_json_reader_options(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
keep_quotes=keep_quotes,
mixed_types_as_string=mixed_types_as_string,
prune_columns=prune_columns,
recovery_mode=c_on_bad_lines,
)
)
)
df = cudf.DataFrame._from_data(
Expand All @@ -181,19 +183,23 @@ def read_json(
return df
else:
table_w_meta = plc.io.json.read_json(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
lines,
byte_range_offset=byte_range[0]
if byte_range is not None
else 0,
byte_range_size=byte_range[1] if byte_range is not None else 0,
keep_quotes=keep_quotes,
mixed_types_as_string=mixed_types_as_string,
prune_columns=prune_columns,
recovery_mode=c_on_bad_lines,
extra_parameters=kwargs,
plc.io.json._setup_json_reader_options(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
lines,
byte_range_offset=byte_range[0]
if byte_range is not None
else 0,
byte_range_size=byte_range[1]
if byte_range is not None
else 0,
keep_quotes=keep_quotes,
mixed_types_as_string=mixed_types_as_string,
prune_columns=prune_columns,
recovery_mode=c_on_bad_lines,
extra_parameters=kwargs,
)
)

df = cudf.DataFrame._from_data(
Expand Down
10 changes: 6 additions & 4 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,12 @@ def slice_skip(tbl: plc.Table):
(name, typ, []) for name, typ in schema.items()
]
plc_tbl_w_meta = plc.io.json.read_json(
plc.io.SourceInfo(paths),
lines=True,
dtypes=json_schema,
prune_columns=True,
plc.io.json._setup_json_reader_options(
plc.io.SourceInfo(paths),
lines=True,
dtypes=json_schema,
prune_columns=True,
)
)
# TODO: I don't think cudf-polars supports nested types in general right now
# (but when it does, we should pass child column names from nested columns in)
Expand Down
61 changes: 40 additions & 21 deletions python/pylibcudf/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ from pylibcudf.io.types cimport (
)
from pylibcudf.libcudf.io.json cimport (
json_recovery_mode_t,
json_reader_options,
json_reader_options_builder,
json_writer_options,
json_writer_options_builder,
)
Expand All @@ -16,20 +18,43 @@ from rmm._cuda.stream cimport Stream
from pylibcudf.table cimport Table


cpdef TableWithMetadata read_json(
SourceInfo source_info,
list dtypes = *,
compression_type compression = *,
bool lines = *,
size_t byte_range_offset = *,
size_t byte_range_size = *,
bool keep_quotes = *,
bool mixed_types_as_string = *,
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
dict extra_parameters = *,
Stream stream = *,
)
cdef class JsonReaderOptions:
cdef json_reader_options c_obj
cdef SourceInfo source
cpdef void set_dtypes(self, list types)
cpdef void enable_keep_quotes(self, bool keep_quotes)
cpdef void enable_mixed_types_as_string(self, bool mixed_types_as_string)
cpdef void enable_prune_columns(self, bool prune_columns)
cpdef void set_byte_range_offset(self, size_t offset)
cpdef void set_byte_range_size(self, size_t size)
cpdef void enable_lines(self, bool val)
# These hidden options are subjected to change without deprecation cycle.
# These are used to test libcudf JSON reader features, not used in cuDF.
cpdef void set_delimiter(self, str val)
cpdef void enable_dayfirst(self, bool val)
cpdef void enable_experimental(self, bool val)
cpdef void enable_normalize_single_quotes(self, bool val)
cpdef void enable_normalize_whitespace(self, bool val)
cpdef void set_strict_validation(self, bool val)
cpdef void allow_unquoted_control_chars(self, bool val)
cpdef void allow_numeric_leading_zeros(self, bool val)
cpdef void allow_nonnumeric_numbers(self, bool val)
cpdef void set_na_values(self, list vals)

cdef class JsonReaderOptionsBuilder:
cdef json_reader_options_builder c_obj
cdef SourceInfo source
cpdef JsonReaderOptionsBuilder compression(self, compression_type compression)
cpdef JsonReaderOptionsBuilder lines(self, bool val)
cpdef JsonReaderOptionsBuilder keep_quotes(self, bool val)
cpdef JsonReaderOptionsBuilder byte_range_offset(self, size_t byte_range_offset)
cpdef JsonReaderOptionsBuilder byte_range_size(self, size_t byte_range_size)
cpdef JsonReaderOptionsBuilder recovery_mode(
self, json_recovery_mode_t recovery_mode
)
cpdef build(self)

cpdef TableWithMetadata read_json(JsonReaderOptions options)

cdef class JsonWriterOptions:
cdef json_writer_options c_obj
Expand All @@ -52,13 +77,7 @@ cdef class JsonWriterOptionsBuilder:
cpdef void write_json(JsonWriterOptions options, Stream stream = *)

cpdef tuple chunked_read_json(
SourceInfo source_info,
list dtypes = *,
compression_type compression = *,
bool keep_quotes = *,
bool mixed_types_as_string = *,
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
JsonReaderOptions options,
int chunk_size= *,
Stream stream = *,
)
55 changes: 35 additions & 20 deletions python/pylibcudf/pylibcudf/io/json.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,40 @@ ChildNameToTypeMap: TypeAlias = Mapping[str, ChildNameToTypeMap]

NameAndType: TypeAlias = tuple[str, DataType, list[NameAndType]]

def read_json(
source_info: SourceInfo,
dtypes: list[NameAndType] | None = None,
compression: CompressionType = CompressionType.AUTO,
lines: bool = False,
byte_range_offset: int = 0,
byte_range_size: int = 0,
keep_quotes: bool = False,
mixed_types_as_string: bool = False,
prune_columns: bool = False,
recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL,
stream: Stream = None,
) -> TableWithMetadata: ...
class JsonReaderOptions:
def set_dtypes(
self, types: list[DataType] | list[NameAndType]
) -> None: ...
def enable_keep_quotes(self, keep_quotes: bool) -> None: ...
def enable_mixed_types_as_string(
self, mixed_types_as_string: bool
) -> None: ...
def enable_prune_columns(self, prune_columns: bool) -> None: ...
def set_byte_range_offset(self, offset: int) -> None: ...
def set_byte_range_size(self, size: int) -> None: ...
def enable_lines(self, val: bool) -> None: ...
def set_delimiter(self, val: str) -> None: ...
def enable_dayfirst(self, val: bool) -> None: ...
def enable_experimental(self, val: bool) -> None: ...
def enable_normalize_single_quotes(self, val: bool) -> None: ...
def enable_normalize_whitespace(self, val: bool) -> None: ...
def set_strict_validation(self, val: bool) -> None: ...
def allow_unquoted_control_chars(self, val: bool) -> None: ...
def allow_numeric_leading_zeros(self, val: bool) -> None: ...
def allow_nonnumeric_numbers(self, val: bool) -> None: ...
def set_na_values(self, vals: list[str]) -> None: ...
@staticmethod
def builder(source: SourceInfo) -> JsonReaderOptionsBuilder: ...

class JsonReaderOptionsBuilder:
def compression(self, compression: CompressionType) -> Self: ...
def lines(self, lines: bool) -> Self: ...
def byte_range_offset(self, byte_range_offset: int) -> Self: ...
def byte_range_size(self, byte_range_size: int) -> Self: ...
def recovery_mode(self, recovery_mode: JSONRecoveryMode) -> Self: ...
def build(self) -> JsonReaderOptions: ...

def read_json(options: JsonReaderOptions) -> TableWithMetadata: ...

class JsonWriterOptions:
@staticmethod
Expand All @@ -51,13 +72,7 @@ class JsonWriterOptionsBuilder:

def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ...
def chunked_read_json(
source_info: SourceInfo,
dtypes: list[NameAndType] | None = None,
compression: CompressionType = CompressionType.AUTO,
keep_quotes: bool = False,
mixed_types_as_string: bool = False,
prune_columns: bool = False,
recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL,
options: JsonReaderOptions,
chunk_size: int = 100_000_000,
stream: Stream = None,
) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ...
Loading

0 comments on commit 8758baa

Please sign in to comment.