diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 296f8685f6a..5b9fa83b33c 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,8 +12,8 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx stream_compaction.pyx - string_casting.pyx strings_udf.pyx types.pyx utils.pyx +set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx string_casting.pyx strings_udf.pyx + types.pyx utils.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 78b92025deb..63090ef86c8 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -4,7 +4,6 @@ from . import ( groupby, interop, - stream_compaction, string_casting, strings_udf, ) diff --git a/python/cudf/cudf/_lib/stream_compaction.pyx b/python/cudf/cudf/_lib/stream_compaction.pyx deleted file mode 100644 index 1b8831940e3..00000000000 --- a/python/cudf/cudf/_lib/stream_compaction.pyx +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from libcpp cimport bool - -from cudf._lib.column cimport Column -from cudf._lib.utils cimport columns_from_pylibcudf_table - -import pylibcudf - - -@acquire_spill_lock() -def drop_nulls(list columns, how="any", keys=None, thresh=None): - """ - Drops null rows from cols depending on key columns. - - Parameters - ---------- - columns : list of columns - how : "any" or "all". If thresh is None, drops rows of cols that have any - nulls or all nulls (respectively) in subset (default: "any") - keys : List of column indices. If set, then these columns are checked for - nulls rather than all of columns (optional) - thresh : Minimum number of non-nulls required to keep a row (optional) - - Returns - ------- - columns with null rows dropped - """ - if how not in {"any", "all"}: - raise ValueError("how must be 'any' or 'all'") - - keys = list(keys if keys is not None else range(len(columns))) - - # Note: If how == "all" and thresh is specified this prioritizes thresh - if thresh is not None: - keep_threshold = thresh - elif how == "all": - keep_threshold = 1 - else: - keep_threshold = len(keys) - - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.drop_nulls( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - keys, - keep_threshold, - ) - ) - - -@acquire_spill_lock() -def apply_boolean_mask(list columns, Column boolean_mask): - """ - Drops the rows which correspond to False in boolean_mask. - - Parameters - ---------- - columns : list of columns whose rows are dropped as per boolean_mask - boolean_mask : a boolean column of same size as source_table - - Returns - ------- - columns obtained from applying mask - """ - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.apply_boolean_mask( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - boolean_mask.to_pylibcudf(mode="read"), - ) - ) - - -_keep_options = { - "first": pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_FIRST, - "last": pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_LAST, - False: pylibcudf.stream_compaction.DuplicateKeepOption.KEEP_NONE, -} - - -@acquire_spill_lock() -def drop_duplicates(list columns, - object keys=None, - object keep='first', - bool nulls_are_equal=True): - """ - Drops rows in source_table as per duplicate rows in keys. - - Parameters - ---------- - columns : List of columns - keys : List of column indices. If set, then these columns are checked for - duplicates rather than all of columns (optional) - keep : keep 'first' or 'last' or none of the duplicate rows - nulls_are_equal : if True, nulls are treated equal else not. - - Returns - ------- - columns with duplicate dropped - """ - if (keep_option := _keep_options.get(keep)) is None: - raise ValueError('keep must be either "first", "last" or False') - - return columns_from_pylibcudf_table( - pylibcudf.stream_compaction.stable_distinct( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - list(keys if keys is not None else range(len(columns))), - keep_option, - pylibcudf.types.NullEquality.EQUAL - if nulls_are_equal else pylibcudf.types.NullEquality.UNEQUAL, - pylibcudf.types.NanEquality.ALL_EQUAL, - ) - ) - - -@acquire_spill_lock() -def distinct_indices( - list columns, - object keep="first", - bool nulls_equal=True, - bool nans_equal=True, -): - """ - Return indices of the distinct rows in a table. - - Parameters - ---------- - columns : list of columns to check for duplicates - keep : treat "first", "last", or (False) none of any duplicate - rows as distinct - nulls_equal : Should nulls compare equal - nans_equal: Should nans compare equal - - Returns - ------- - Column of indices - - See Also - -------- - drop_duplicates - """ - if (keep_option := _keep_options.get(keep)) is None: - raise ValueError('keep must be either "first", "last" or False') - - return Column.from_pylibcudf( - pylibcudf.stream_compaction.distinct_indices( - pylibcudf.Table([c.to_pylibcudf(mode="read") for c in columns]), - keep_option, - pylibcudf.types.NullEquality.EQUAL - if nulls_equal else pylibcudf.types.NullEquality.UNEQUAL, - pylibcudf.types.NanEquality.ALL_EQUAL - if nans_equal else pylibcudf.types.NanEquality.UNEQUAL, - ) - ) - - -@acquire_spill_lock() -def distinct_count(Column source_column, ignore_nulls=True, nan_as_null=False): - """ - Finds number of unique rows in `source_column` - - Parameters - ---------- - source_column : source table checked for unique rows - ignore_nulls : If True nulls are ignored, - else counted as one more distinct value - nan_as_null : If True, NAN is considered NULL, - else counted as one more distinct value - - Returns - ------- - Count of number of unique rows in `source_column` - """ - return pylibcudf.stream_compaction.distinct_count( - source_column.to_pylibcudf(mode="read"), - pylibcudf.types.NullPolicy.EXCLUDE - if ignore_nulls else pylibcudf.types.NullPolicy.INCLUDE, - pylibcudf.types.NanPolicy.NAN_IS_NULL - if nan_as_null else pylibcudf.types.NanPolicy.NAN_IS_VALID, - ) diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index e97f63db17a..f4543bc6156 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -10,15 +10,15 @@ from typing_extensions import Self import cudf -from cudf._lib.stream_compaction import ( - apply_boolean_mask, - drop_duplicates, - drop_nulls, -) from cudf._lib.types import size_type_dtype from cudf.api.extensions import no_default from cudf.api.types import is_integer, is_list_like, is_scalar from cudf.core._internals import copying +from cudf.core._internals.stream_compaction import ( + apply_boolean_mask, + drop_duplicates, + drop_nulls, +) from cudf.core.abc import Serializable from cudf.core.column import ColumnBase, column from cudf.core.copy_types import GatherMap @@ -414,7 +414,7 @@ def hasnans(self): raise NotImplementedError @property - def nlevels(self): + def nlevels(self) -> int: """ Number of levels. """ @@ -1944,7 +1944,6 @@ def drop_duplicates( return self._from_columns_like_self( drop_duplicates( list(self._columns), - keys=range(len(self._columns)), keep=keep, nulls_are_equal=nulls_are_equal, ), @@ -2033,7 +2032,6 @@ def dropna(self, how="any"): drop_nulls( data_columns, how=how, - keys=range(len(data_columns)), ), self._column_names, ) diff --git a/python/cudf/cudf/core/_internals/stream_compaction.py b/python/cudf/cudf/core/_internals/stream_compaction.py new file mode 100644 index 00000000000..4ccc26c2a1c --- /dev/null +++ b/python/cudf/cudf/core/_internals/stream_compaction.py @@ -0,0 +1,121 @@ +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +import pylibcudf as plc + +from cudf._lib.column import Column +from cudf.core.buffer import acquire_spill_lock + +if TYPE_CHECKING: + from cudf.core.column import ColumnBase + + +@acquire_spill_lock() +def drop_nulls( + columns: list[ColumnBase], + how: Literal["any", "all"] = "any", + keys: list[int] | None = None, + thresh: int | None = None, +) -> list[ColumnBase]: + """ + Drops null rows from cols depending on key columns. + + Parameters + ---------- + columns : list of columns + how : "any" or "all". If thresh is None, drops rows of cols that have any + nulls or all nulls (respectively) in subset (default: "any") + keys : List of column indices. If set, then these columns are checked for + nulls rather than all of columns (optional) + thresh : Minimum number of non-nulls required to keep a row (optional) + + Returns + ------- + columns with null rows dropped + """ + if how not in {"any", "all"}: + raise ValueError("how must be 'any' or 'all'") + + keys = keys if keys is not None else list(range(len(columns))) + + # Note: If how == "all" and thresh is specified this prioritizes thresh + if thresh is not None: + keep_threshold = thresh + elif how == "all": + keep_threshold = 1 + else: + keep_threshold = len(keys) + + plc_table = plc.stream_compaction.drop_nulls( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keys, + keep_threshold, + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] + + +@acquire_spill_lock() +def apply_boolean_mask( + columns: list[ColumnBase], boolean_mask: ColumnBase +) -> list[ColumnBase]: + """ + Drops the rows which correspond to False in boolean_mask. + + Parameters + ---------- + columns : list of columns whose rows are dropped as per boolean_mask + boolean_mask : a boolean column of same size as source_table + + Returns + ------- + columns obtained from applying mask + """ + plc_table = plc.stream_compaction.apply_boolean_mask( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + boolean_mask.to_pylibcudf(mode="read"), + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] + + +@acquire_spill_lock() +def drop_duplicates( + columns: list[ColumnBase], + keys: list[int] | None = None, + keep: Literal["first", "last", False] = "first", + nulls_are_equal: bool = True, +) -> list[ColumnBase]: + """ + Drops rows in source_table as per duplicate rows in keys. + + Parameters + ---------- + columns : List of columns + keys : List of column indices. If set, then these columns are checked for + duplicates rather than all of columns (optional) + keep : keep 'first' or 'last' or none of the duplicate rows + nulls_are_equal : if True, nulls are treated equal else not. + + Returns + ------- + columns with duplicate dropped + """ + _keep_options = { + "first": plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST, + "last": plc.stream_compaction.DuplicateKeepOption.KEEP_LAST, + False: plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, + } + if (keep_option := _keep_options.get(keep)) is None: + raise ValueError('keep must be either "first", "last" or False') + + plc_table = plc.stream_compaction.stable_distinct( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keys if keys is not None else list(range(len(columns))), + keep_option, + plc.types.NullEquality.EQUAL + if nulls_are_equal + else plc.types.NullEquality.UNEQUAL, + plc.types.NanEquality.ALL_EQUAL, + ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 2ae7c3f6503..2515157253c 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -26,12 +26,6 @@ from cudf import _lib as libcudf from cudf._lib.column import Column from cudf._lib.scalar import as_device_scalar -from cudf._lib.stream_compaction import ( - apply_boolean_mask, - distinct_count as cpp_distinct_count, - drop_duplicates, - drop_nulls, -) from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype from cudf.api.types import ( _is_non_decimal_numeric_dtype, @@ -43,6 +37,11 @@ ) from cudf.core._compat import PANDAS_GE_210 from cudf.core._internals import aggregation, copying, sorting, unary +from cudf.core._internals.stream_compaction import ( + apply_boolean_mask, + drop_duplicates, + drop_nulls, +) from cudf.core._internals.timezones import get_compatible_timezone from cudf.core.abc import Serializable from cudf.core.buffer import ( @@ -276,7 +275,7 @@ def any(self, skipna: bool = True) -> bool: def dropna(self) -> Self: if self.has_nulls(): - return drop_nulls([self])[0]._with_type_metadata(self.dtype) + return drop_nulls([self])[0]._with_type_metadata(self.dtype) # type: ignore[return-value] else: return self.copy() @@ -849,7 +848,7 @@ def indices_of( else: value = as_column(value, dtype=self.dtype, length=1) mask = value.contains(self) - return apply_boolean_mask( + return apply_boolean_mask( # type: ignore[return-value] [as_column(range(0, len(self)), dtype=size_type_dtype)], mask )[0] @@ -1084,9 +1083,15 @@ def distinct_count(self, dropna: bool = True) -> int: try: return self._distinct_count[dropna] except KeyError: - self._distinct_count[dropna] = cpp_distinct_count( - self, ignore_nulls=dropna - ) + with acquire_spill_lock(): + result = plc.stream_compaction.distinct_count( + self.to_pylibcudf(mode="read"), + plc.types.NullPolicy.EXCLUDE + if dropna + else plc.types.NullPolicy.INCLUDE, + plc.types.NanPolicy.NAN_IS_VALID, + ) + self._distinct_count[dropna] = result return self._distinct_count[dropna] def can_cast_safely(self, to_dtype: Dtype) -> bool: @@ -1315,7 +1320,7 @@ def unique(self) -> Self: if self.is_unique: return self.copy() else: - return drop_duplicates([self], keep="first")[ + return drop_duplicates([self], keep="first")[ # type: ignore[return-value] 0 ]._with_type_metadata(self.dtype) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 2c92069f26e..e66e4f41642 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -7878,7 +7878,8 @@ def interleave_columns(self): return self._constructor_sliced._from_column(result_col) @acquire_spill_lock() - def _compute_columns(self, expr: str) -> ColumnBase: + def _compute_column(self, expr: str) -> ColumnBase: + """Helper function for eval""" plc_column = plc.transform.compute_column( plc.Table( [col.to_pylibcudf(mode="read") for col in self._columns] @@ -8014,7 +8015,7 @@ def eval(self, expr: str, inplace: bool = False, **kwargs): raise ValueError( "Cannot operate inplace if there is no assignment" ) - return Series._from_column(self._compute_columns(statements[0])) + return Series._from_column(self._compute_column(statements[0])) targets = [] exprs = [] @@ -8032,7 +8033,7 @@ def eval(self, expr: str, inplace: bool = False, **kwargs): ret = self if inplace else self.copy(deep=False) for name, expr in zip(targets, exprs): - ret._data[name] = self._compute_columns(expr) + ret._data[name] = self._compute_column(expr) if not inplace: return ret diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 2412d6e9c4f..ba9b15667f1 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1058,19 +1058,6 @@ def to_arrow(self): } ) - @_performance_tracking - def _positions_from_column_names(self, column_names) -> list[int]: - """Map each column name into their positions in the frame. - - The order of indices returned corresponds to the column order in this - Frame. - """ - return [ - i - for i, name in enumerate(self._column_names) - if name in set(column_names) - ] - @_performance_tracking def _copy_type_metadata(self: Self, other: Self) -> Self: """ diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index b772d35846d..6cd8e11695f 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -16,6 +16,7 @@ import pylibcudf as plc import cudf +import cudf.core._internals from cudf import _lib as libcudf from cudf._lib import groupby as libgroupby from cudf._lib.types import size_type_dtype @@ -430,7 +431,9 @@ def indices(self) -> dict[ScalarLike, cp.ndarray]: ] ) - group_keys = libcudf.stream_compaction.drop_duplicates(group_keys) + group_keys = cudf.core._internals.stream_compaction.drop_duplicates( + group_keys + ) if len(group_keys) > 1: index = cudf.MultiIndex.from_arrays(group_keys) else: diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 8302cd72aa8..72bb85821fa 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -27,6 +27,7 @@ import cudf import cudf._lib as libcudf import cudf.core +import cudf.core._internals import cudf.core.algorithms from cudf.api.extensions import no_default from cudf.api.types import ( @@ -3063,21 +3064,21 @@ def _slice(self, arg: slice, keep_index: bool = True) -> Self: return result def _positions_from_column_names( - self, column_names, offset_by_index_columns=False - ): + self, + column_names: set[abc.Hashable], + offset_by_index_columns: bool = True, + ) -> list[int]: """Map each column name into their positions in the frame. Return positions of the provided column names, offset by the number of index columns if `offset_by_index_columns` is True. The order of indices returned corresponds to the column order in this Frame. """ - num_index_columns = ( - len(self.index._data) if offset_by_index_columns else 0 - ) + start = self.index.nlevels if offset_by_index_columns else 0 return [ - i + num_index_columns - for i, name in enumerate(self._column_names) - if name in set(column_names) + i + for i, name in enumerate(self._column_names, start=start) + if name in column_names ] def drop_duplicates( @@ -3114,7 +3115,7 @@ def drop_duplicates( subset, offset_by_index_columns=not ignore_index ) return self._from_columns_like_self( - libcudf.stream_compaction.drop_duplicates( + cudf.core._internals.stream_compaction.drop_duplicates( list(self._columns) if ignore_index else list(self.index._columns + self._columns), @@ -3127,7 +3128,9 @@ def drop_duplicates( ) @_performance_tracking - def duplicated(self, subset=None, keep="first"): + def duplicated( + self, subset=None, keep: Literal["first", "last", False] = "first" + ) -> cudf.Series: """ Return boolean Series denoting duplicate rows. @@ -3227,9 +3230,24 @@ def duplicated(self, subset=None, keep="first"): name = self.name else: columns = [self._data[n] for n in subset] - distinct = libcudf.stream_compaction.distinct_indices( - columns, keep=keep - ) + + _keep_options = { + "first": plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST, + "last": plc.stream_compaction.DuplicateKeepOption.KEEP_LAST, + False: plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, + } + + if (keep_option := _keep_options.get(keep)) is None: + raise ValueError('keep must be either "first", "last" or False') + + with acquire_spill_lock(): + plc_column = plc.stream_compaction.distinct_indices( + plc.Table([col.to_pylibcudf(mode="read") for col in columns]), + keep_option, + plc.types.NullEquality.EQUAL, + plc.types.NanEquality.ALL_EQUAL, + ) + distinct = libcudf.column.Column.from_pylibcudf(plc_column) result = copying.scatter( [cudf.Scalar(False, dtype=bool)], distinct, @@ -4353,12 +4371,10 @@ def _drop_na_rows(self, how="any", subset=None, thresh=None): data_columns = [col.nans_to_nulls() for col in self._columns] return self._from_columns_like_self( - libcudf.stream_compaction.drop_nulls( + cudf.core._internals.stream_compaction.drop_nulls( [*self.index._columns, *data_columns], how=how, - keys=self._positions_from_column_names( - subset, offset_by_index_columns=True - ), + keys=self._positions_from_column_names(subset), thresh=thresh, ), self._column_names, @@ -4378,7 +4394,7 @@ def _apply_boolean_mask(self, boolean_mask: BooleanMask, keep_index=True): f"{len(boolean_mask.column)} not {len(self)}" ) return self._from_columns_like_self( - libcudf.stream_compaction.apply_boolean_mask( + cudf.core._internals.stream_compaction.apply_boolean_mask( list(self.index._columns + self._columns) if keep_index else list(self._columns), @@ -6289,17 +6305,16 @@ def ge(self, other, axis="columns", level=None, fill_value=None): other=other, op="__ge__", fill_value=fill_value, can_reindex=True ) - def _preprocess_subset(self, subset): + def _preprocess_subset(self, subset) -> set[abc.Hashable]: if subset is None: subset = self._column_names elif ( - not np.iterable(subset) - or isinstance(subset, str) + is_scalar(subset) or isinstance(subset, tuple) and subset in self._column_names ): subset = (subset,) - diff = set(subset) - set(self._data) + diff = set(subset) - set(self._column_names) if len(diff) != 0: raise KeyError(f"columns {diff} do not exist") return subset