From d930f8925720672cb6120ff73826b662f88bc2a0 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 27 Sep 2021 19:35:34 +0900 Subject: [PATCH 1/9] [SPARK-36438] Support list-like Python objects for Series comparison --- python/pyspark/pandas/base.py | 12 +++- python/pyspark/pandas/series.py | 18 ++++- .../pandas/tests/test_ops_on_diff_frames.py | 69 +++++++++++++++++++ python/pyspark/pandas/tests/test_series.py | 36 ++++++++++ 4 files changed, 133 insertions(+), 2 deletions(-) diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index 58f6c1918d67a..ab06cbec7c69f 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -394,7 +394,17 @@ def __abs__(self: IndexOpsLike) -> IndexOpsLike: # comparison operators def __eq__(self, other: Any) -> SeriesOrIndex: # type: ignore[override] - return self._dtype_op.eq(self, other) + if isinstance(other, (list, tuple)): + with ps.option_context("compute.ordered_head", True): + pindex_ops = self.head(len(other) + 1)._to_internal_pandas() # type: ignore + if len(pindex_ops) != len(other): + raise ValueError("Lengths must be equal") + return ps.from_pandas(pindex_ops == other) # type: ignore + # pandas always returns False for all items with dict and set. + elif isinstance(other, (dict, set)): + return self != self + else: + return column_op(Column.__eq__)(self, other) def __ne__(self, other: Any) -> SeriesOrIndex: # type: ignore[override] return self._dtype_op.ne(self, other) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 7e45566cfbfdb..b58acbfae01d0 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -684,7 +684,7 @@ def rfloordiv(self, other: Any) -> "Series": koalas = CachedAccessor("koalas", PandasOnSparkSeriesMethods) # Comparison Operators - def eq(self, other: Any) -> bool: + def eq(self, other: Any) -> "Series": """ Compare if the current value is equal to the other. 
@@ -705,6 +705,22 @@ def eq(self, other: Any) -> bool:
         c     True
         d    False
         Name: b, dtype: bool
+
+        Support for list-like Python objects with the same length
+
+        >>> df.a == [1, 3, 2, 4]
+        a     True
+        b    False
+        c    False
+        d     True
+        Name: a, dtype: bool
+
+        >>> df.a.eq([1, 3, 2, 4])
+        a     True
+        b    False
+        c    False
+        d     True
+        Name: a, dtype: bool
         """
         return self == other
 
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
index 3e3bb0d231ec6..ba155ce855a0e 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
@@ -1830,6 +1830,61 @@ def _test_cov(self, pser1, pser2):
         pscov = psser1.cov(psser2, min_periods=3)
         self.assert_eq(pcov, pscov, almost=True)
 
+    def test_series_eq(self):
+        pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
+        psser = ps.from_pandas(pser)
+
+        # other = Series
+        pandas_other = pd.Series([np.nan, 1, 3, 4, np.nan, 6], name="x")
+        pandas_on_spark_other = ps.from_pandas(pandas_other)
+        self.assert_eq(pser.eq(pandas_other), psser.eq(pandas_on_spark_other).sort_index())
+        self.assert_eq(pser == pandas_other, (psser == pandas_on_spark_other).sort_index())
+
+        # other = Series with different Index
+        pandas_other = pd.Series(
+            [np.nan, 1, 3, 4, np.nan, 6], index=[10, 20, 30, 40, 50, 60], name="x"
+        )
+        pandas_on_spark_other = ps.from_pandas(pandas_other)
+        self.assert_eq(pser.eq(pandas_other), psser.eq(pandas_on_spark_other).sort_index())
+
+        # other = Index
+        pandas_other = pd.Index([np.nan, 1, 3, 4, np.nan, 6], name="x")
+        pandas_on_spark_other = ps.from_pandas(pandas_other)
+        self.assert_eq(pser.eq(pandas_other), psser.eq(pandas_on_spark_other).sort_index())
+        self.assert_eq(pser == pandas_other, (psser == pandas_on_spark_other).sort_index())
+
+        # other = list
+        other = [np.nan, 1, 3, 4, np.nan, 6]
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
+            self.assert_eq(pser.eq(other), psser.eq(other).sort_index())
+            self.assert_eq(pser == other, (psser == other).sort_index())
+        else:
+            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other).sort_index())
+            self.assert_eq((pser == other).rename("x"), (psser == other).sort_index())
+
+        # other = tuple
+        other = (np.nan, 1, 3, 4, np.nan, 6)
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
+            self.assert_eq(pser.eq(other), psser.eq(other).sort_index())
+            self.assert_eq(pser == other, (psser == other).sort_index())
+        else:
+            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other).sort_index())
+            self.assert_eq((pser == other).rename("x"), (psser == other).sort_index())
+
+        # other = list with a different length
+        other = [np.nan, 1, 3, 4, np.nan]
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
+        # other = tuple with a different length
+        other = (np.nan, 1, 3, 4, np.nan)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
 
 class OpsOnDiffFramesDisabledTest(PandasOnSparkTestCase, SQLTestUtils):
     @classmethod
@@ -2017,6 +2072,20 @@ def test_combine_first(self):
         with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"):
             psdf1.combine_first(psdf2)
 
+    def test_series_eq(self):
+        pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
+        psser = ps.from_pandas(pser)
+
+        others = (
+            ps.Series([np.nan, 1, 3, 4, np.nan, 6], name="x"),
+            
ps.Index([np.nan, 1, 3, 4, np.nan, 6], name="x"), + ) + for other in others: + with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"): + psser.eq(other) + with self.assertRaisesRegex(ValueError, "Cannot combine the series or dataframe"): + psser == other + if __name__ == "__main__": from pyspark.pandas.tests.test_ops_on_diff_frames import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py index e36885b0cd7fa..629df1934cfcd 100644 --- a/python/pyspark/pandas/tests/test_series.py +++ b/python/pyspark/pandas/tests/test_series.py @@ -2950,6 +2950,42 @@ def _test_cov(self, pdf): pscov = psdf["s1"].cov(psdf["s2"], min_periods=4) self.assert_eq(pcov, pscov, almost=True) + def test_eq(self): + pser = pd.Series([1, 2, 3, 4, 5, 6], name="x") + psser = ps.from_pandas(pser) + + # other = Series + self.assert_eq(pser.eq(pser), psser.eq(psser)) + self.assert_eq(pser == pser, psser == psser) + + # other = list + other = [np.nan, 1, 3, 4, np.nan, 6] + if LooseVersion(pd.__version__) >= LooseVersion("1.2"): + self.assert_eq(pser.eq(other), psser.eq(other)) + self.assert_eq(pser == other, psser == other) + else: + self.assert_eq(pser.eq(other).rename("x"), psser.eq(other)) + self.assert_eq((pser == other).rename("x"), psser == other) + + # other = tuple + other = (np.nan, 1, 3, 4, np.nan, 6) + if LooseVersion(pd.__version__) >= LooseVersion("1.2"): + self.assert_eq(pser.eq(other), psser.eq(other)) + self.assert_eq(pser == other, psser == other) + else: + self.assert_eq(pser.eq(other).rename("x"), psser.eq(other)) + self.assert_eq((pser == other).rename("x"), psser == other) + + # other = dict + other = {1: None, 2: None, 3: None, 4: None, np.nan: None, 6: None} + self.assert_eq(pser.eq(other), psser.eq(other)) + self.assert_eq(pser == other, psser == other) + + # other = set + other = {1, 2, 3, 4, np.nan, 6} + self.assert_eq(pser.eq(other), psser.eq(other)) + self.assert_eq(pser == other, psser == other) + if __name__ == "__main__": from pyspark.pandas.tests.test_series import * # noqa: F401 From cbe62da250ff001b4f49c718772d7826a18a66c8 Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 29 Sep 2021 13:10:53 +0900 Subject: [PATCH 2/9] Resolved comments & test failure --- python/pyspark/pandas/base.py | 12 +++---- python/pyspark/pandas/series.py | 16 ---------- .../data_type_ops/test_categorical_ops.py | 2 +- .../pandas/tests/test_ops_on_diff_frames.py | 14 -------- python/pyspark/pandas/tests/test_series.py | 32 ++++++++----------- 5 files changed, 21 insertions(+), 55 deletions(-) diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index ab06cbec7c69f..fa4038041a4f4 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -395,16 +395,16 @@ def __abs__(self: IndexOpsLike) -> IndexOpsLike: # comparison operators def __eq__(self, other: Any) -> SeriesOrIndex: # type: ignore[override] if isinstance(other, (list, tuple)): - with ps.option_context("compute.ordered_head", True): - pindex_ops = self.head(len(other) + 1)._to_internal_pandas() # type: ignore - if len(pindex_ops) != len(other): - raise ValueError("Lengths must be equal") - return ps.from_pandas(pindex_ops == other) # type: ignore + if len(self) != len(other): + raise ValueError("Lengths must be equal") + name = self._internal.spark_column_name_for(self.spark.column) + other = ps.Series(other, name=name) + return self == other # pandas always returns False for all items with dict and set. 
elif isinstance(other, (dict, set)):
             return self != self
         else:
-            return column_op(Column.__eq__)(self, other)
+            return self._dtype_op.eq(self, other)
 
     def __ne__(self, other: Any) -> SeriesOrIndex:  # type: ignore[override]
         return self._dtype_op.ne(self, other)
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index b58acbfae01d0..a875a1c61e4e0 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -705,22 +705,6 @@ def eq(self, other: Any) -> "Series":
         c     True
         d    False
         Name: b, dtype: bool
-
-        Support for list-like Python objects with the same length
-
-        >>> df.a == [1, 3, 2, 4]
-        a     True
-        b    False
-        c    False
-        d     True
-        Name: a, dtype: bool
-
-        >>> df.a.eq([1, 3, 2, 4])
-        a     True
-        b    False
-        c    False
-        d     True
-        Name: a, dtype: bool
         """
         return self == other
 
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
index 0aa2e108d799a..ad8144f30f756 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
@@ -259,7 +259,7 @@ def test_eq(self):
         )
         self.assertRaisesRegex(
             TypeError,
-            "The operation can not be applied to list",
+            "Cannot compare a Categorical with the given type",
             lambda: ordered_psser == [1, 2, 3],
         )
 
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
index ba155ce855a0e..16b323655da88 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py
@@ -1871,20 +1871,6 @@ def test_series_eq(self):
         self.assert_eq(pser.eq(pandas_other), psser.eq(pandas_on_spark_other).sort_index())
         self.assert_eq(pser == pandas_other, (psser == pandas_on_spark_other).sort_index())
 
-        # other = list with a different length
-        other = [np.nan, 1, 3, 4, np.nan]
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser.eq(other)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser == other
-
-        # other = tuple with a different length
-        other = (np.nan, 1, 3, 4, np.nan)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser.eq(other)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser == other
-
 
 class OpsOnDiffFramesDisabledTest(PandasOnSparkTestCase, SQLTestUtils):
     @classmethod
diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py
index 629df1934cfcd..8a27f5e0ec94a 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2958,24 +2958,6 @@ def test_eq(self):
         self.assert_eq(pser.eq(pser), psser.eq(psser))
         self.assert_eq(pser == pser, psser == psser)
 
-        # other = list
-        other = [np.nan, 1, 3, 4, np.nan, 6]
-        if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
-            self.assert_eq(pser.eq(other), psser.eq(other))
-            self.assert_eq(pser == other, psser == other)
-        else:
-            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other))
-            self.assert_eq((pser == other).rename("x"), psser == other)
-
-        # other = tuple
-        other = (np.nan, 1, 3, 4, np.nan, 6)
-        if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
-            self.assert_eq(pser.eq(other), psser.eq(other))
-            self.assert_eq(pser == other, psser == other)
-        else:
-            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other))
-            self.assert_eq((pser == other).rename("x"), psser == other)
-
         # other = dict
         other = {1: None, 2: None, 3: None, 4: None, np.nan: None, 6: None}
         self.assert_eq(pser.eq(other), psser.eq(other))
@@ -2986,6 +2968,20 @@ def test_eq(self):
         self.assert_eq(pser.eq(other), psser.eq(other))
         self.assert_eq(pser == other, psser == other)
 
+        # other = list with a different length
+        other = [np.nan, 1, 3, 4, np.nan]
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
+        # other = tuple with a different length
+        other = (np.nan, 1, 3, 4, np.nan)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_series import *  # noqa: F401
 
From fb555b4cc55a5347dabf59c0ce7c406cfbc11ba8 Mon Sep 17 00:00:00 2001
From: itholic
Date: Thu, 30 Sep 2021 16:04:39 +0900
Subject: [PATCH 3/9] Use collect_list, zip_with, and explode

---
 python/pyspark/pandas/base.py              | 78 ++++++++++++++++++++--
 python/pyspark/pandas/tests/test_series.py | 14 ----
 2 files changed, 73 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index fa4038041a4f4..eb712d7df9d55 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -395,11 +395,79 @@ def __abs__(self: IndexOpsLike) -> IndexOpsLike:
     # comparison operators
     def __eq__(self, other: Any) -> SeriesOrIndex:  # type: ignore[override]
         if isinstance(other, (list, tuple)):
-            if len(self) != len(other):
-                raise ValueError("Lengths must be equal")
-            name = self._internal.spark_column_name_for(self.spark.column)
-            other = ps.Series(other, name=name)
-            return self == other
+            from pyspark.pandas.series import first_series
+
+            sdf = self._internal.spark_frame
+            structed_scol = F.struct(
+                sdf[NATURAL_ORDER_COLUMN_NAME],
+                *self._internal.index_spark_columns,
+                self.spark.column
+            )
+            # The size of the list is expected to be small.
+            collected_structed_scol = F.collect_list(structed_scol)
+            # Sort the array by NATURAL_ORDER_COLUMN so that we can guarantee the order.
+            collected_structed_scol = F.array_sort(collected_structed_scol)
+            other_values_scol = F.array([F.lit(x) for x in other])  # type: ignore
+            index_scol_names = self._internal.index_spark_column_names
+            scol_name = self._internal.spark_column_name_for(self._internal.column_labels[0])
+            # Compare the values of self and other by using zip_with function.
+            cond = F.zip_with(
+                collected_structed_scol,
+                other_values_scol,
+                lambda x, y: F.struct(
+                    *[
+                        x[index_scol_name].alias(index_scol_name)
+                        for index_scol_name in index_scol_names
+                    ],
+                    F.when(
+                        F.assert_true(
+                            # If the comparing result is null,
+                            # that means the length of `self` and `other` is not the same.
+                            (x[scol_name] == y).isNotNull(),
+                            "Lengths must be equal",
+                        ).isNull(),
+                        x[scol_name] == y,
+                    ).alias(scol_name)
+                ),
+            ).alias(scol_name)
+            # 1. `sdf_new` here looks like the below (the first field of each set is Index):
+            # +----------------------------------------------------------+
+            # |0                                                         |
+            # +----------------------------------------------------------+
+            # |[{0, false}, {1, true}, {2, false}, {3, true}, {4, false}]|
+            # +----------------------------------------------------------+
+            sdf_new = sdf.select(cond)
+            # 2. `sdf_new` after the explode looks like the below:
+            # +----------+
+            # |       col|
+            # +----------+
+            # |{0, false}|
+            # | {1, true}|
+            # |{2, false}|
+            # | {3, true}|
+            # |{4, false}|
+            # +----------+
+            sdf_new = sdf_new.select(F.explode(scol_name))
+            # 3. Here, the final `sdf_new` looks like the below:
+            # +-----------------+-----+
+            # |__index_level_0__|    0|
+            # +-----------------+-----+
+            # |                0|false|
+            # |                1| true|
+            # |                2|false|
+            # |                3| true|
+            # |                4|false|
+            # +-----------------+-----+
+            sdf_new = sdf_new.select("col.*")
+            internal = self._internal.copy(
+                spark_frame=sdf_new,
+                index_spark_columns=[
+                    scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names
+                ],
+                data_spark_columns=[scol_for(sdf_new, scol_name)],
+                data_fields=[InternalField.from_struct_field(sdf_new.select(scol_name).schema[0])],
+            )
+            return first_series(ps.DataFrame(internal))
         # pandas always returns False for all items with dict and set.
         elif isinstance(other, (dict, set)):
             return self != self
diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py
index 8a27f5e0ec94a..a294775474f6b 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2968,24 +2968,6 @@ def test_eq(self):
         self.assert_eq(pser.eq(other), psser.eq(other))
         self.assert_eq(pser == other, psser == other)
 
-        # other = list with a different length
-        other = [np.nan, 1, 3, 4, np.nan]
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser.eq(other)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser == other
-
-        # other = tuple with a different length
-        other = (np.nan, 1, 3, 4, np.nan)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser.eq(other)
-        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
-            psser == other
-
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_series import *  # noqa: F401
 
From e890e81f1222d0e33576ed9e6397a9d919a23369 Mon Sep 17 00:00:00 2001
From: itholic
Date: Fri, 1 Oct 2021 13:12:47 +0900
Subject: [PATCH 4/9] move code to the proper place

---
 python/pyspark/pandas/base.py                 | 76 +---------------
 python/pyspark/pandas/data_type_ops/base.py   | 83 ++++++++++++++++++-
 .../data_type_ops/test_categorical_ops.py     |  2 +-
 .../pandas/tests/test_ops_on_diff_frames.py   | 18 ----
 python/pyspark/pandas/tests/test_series.py    | 18 ++++
 5 files changed, 99 insertions(+), 98 deletions(-)

diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index eb712d7df9d55..415d02ea6a9c8 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -394,82 +394,8 @@ def __abs__(self: IndexOpsLike) -> IndexOpsLike:
 
     # comparison operators
     def __eq__(self, other: Any) -> SeriesOrIndex:  # type: ignore[override]
-        if isinstance(other, (list, tuple)):
-            from pyspark.pandas.series import first_series
-
-            sdf = self._internal.spark_frame
-            structed_scol = F.struct(
-                sdf[NATURAL_ORDER_COLUMN_NAME],
-                *self._internal.index_spark_columns,
-                self.spark.column
-            )
-            # The size of the list is expected to be small.
-            collected_structed_scol = F.collect_list(structed_scol)
-            # Sort the array by NATURAL_ORDER_COLUMN so that we can guarantee the order.
- collected_structed_scol = F.array_sort(collected_structed_scol) - other_values_scol = F.array([F.lit(x) for x in other]) # type: ignore - index_scol_names = self._internal.index_spark_column_names - scol_name = self._internal.spark_column_name_for(self._internal.column_labels[0]) - # Compare the values of self and other by using zip_with function. - cond = F.zip_with( - collected_structed_scol, - other_values_scol, - lambda x, y: F.struct( - *[ - x[index_scol_name].alias(index_scol_name) - for index_scol_name in index_scol_names - ], - F.when( - F.assert_true( - # If the comparing result is null, - # that means the length of `self` and `other` is not the same. - (x[scol_name] == y).isNotNull(), - "Lengths must be equal", - ).isNull(), - x[scol_name] == y, - ).alias(scol_name) - ), - ).alias(scol_name) - # 1. `sdf_new` here looks like the below (the first field of each set is Index): - # +----------------------------------------------------------+ - # |0 | - # +----------------------------------------------------------+ - # |[{0, false}, {1, true}, {2, false}, {3, true}, {4, false}]| - # +----------------------------------------------------------+ - sdf_new = sdf.select(cond) - # 2. `sdf_new` after the explode looks like the below: - # +----------+ - # | col| - # +----------+ - # |{0, false}| - # | {1, true}| - # |{2, false}| - # | {3, true}| - # |{4, false}| - # +----------+ - sdf_new = sdf_new.select(F.explode(scol_name)) - # 3. Here, the final `sdf_new` looks like the below: - # +-----------------+-----+ - # |__index_level_0__| 0| - # +-----------------+-----+ - # | 0|false| - # | 1| true| - # | 2|false| - # | 3| true| - # | 4|false| - # +-----------------+-----+ - sdf_new = sdf_new.select("col.*") - internal = self._internal.copy( - spark_frame=sdf_new, - index_spark_columns=[ - scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names - ], - data_spark_columns=[scol_for(sdf_new, scol_name)], - data_fields=[InternalField.from_struct_field(sdf_new.select(scol_name).schema[0])], - ) - return first_series(ps.DataFrame(internal)) # pandas always returns False for all items with dict and set. - elif isinstance(other, (dict, set)): + if isinstance(other, (dict, set)): return self != self else: return self._dtype_op.eq(self, other) diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py index e6261c3ea00b0..7c295b273dd74 100644 --- a/python/pyspark/pandas/data_type_ops/base.py +++ b/python/pyspark/pandas/data_type_ops/base.py @@ -349,11 +349,86 @@ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: raise TypeError(">= can not be applied to %s." % self.pretty_name) def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: - from pyspark.pandas.base import column_op - - _sanitize_list_like(right) + if isinstance(right, (list, tuple)): + from pyspark.pandas.series import first_series, scol_for + from pyspark.pandas.frame import DataFrame + from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, InternalField + + sdf = left._internal.spark_frame + structed_scol = F.struct( + sdf[NATURAL_ORDER_COLUMN_NAME], + *left._internal.index_spark_columns, + left.spark.column + ) + # The size of the list is expected to be small. + collected_structed_scol = F.collect_list(structed_scol) + # Sort the array by NATURAL_ORDER_COLUMN so that we can guarantee the order. 
+ collected_structed_scol = F.array_sort(collected_structed_scol) + right_values_scol = F.array([F.lit(x) for x in right]) # type: ignore + index_scol_names = left._internal.index_spark_column_names + scol_name = left._internal.spark_column_name_for(left._internal.column_labels[0]) + # Compare the values of left and right by using zip_with function. + cond = F.zip_with( + collected_structed_scol, + right_values_scol, + lambda x, y: F.struct( + *[ + x[index_scol_name].alias(index_scol_name) + for index_scol_name in index_scol_names + ], + F.when( + F.assert_true( + # If the comparing result is null, + # that means the length of `left` and `right` is not the same. + (x[scol_name] == y).isNotNull(), + "Lengths must be equal", + ).isNull(), + x[scol_name] == y, + ).alias(scol_name) + ), + ).alias(scol_name) + # 1. `sdf_new` here looks like the below (the first field of each set is Index): + # +----------------------------------------------------------+ + # |0 | + # +----------------------------------------------------------+ + # |[{0, false}, {1, true}, {2, false}, {3, true}, {4, false}]| + # +----------------------------------------------------------+ + sdf_new = sdf.select(cond) + # 2. `sdf_new` after the explode looks like the below: + # +----------+ + # | col| + # +----------+ + # |{0, false}| + # | {1, true}| + # |{2, false}| + # | {3, true}| + # |{4, false}| + # +----------+ + sdf_new = sdf_new.select(F.explode(scol_name)) + # 3. Here, the final `sdf_new` looks like the below: + # +-----------------+-----+ + # |__index_level_0__| 0| + # +-----------------+-----+ + # | 0|false| + # | 1| true| + # | 2|false| + # | 3| true| + # | 4|false| + # +-----------------+-----+ + sdf_new = sdf_new.select("col.*") + internal = left._internal.copy( + spark_frame=sdf_new, + index_spark_columns=[ + scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names + ], + data_spark_columns=[scol_for(sdf_new, scol_name)], + data_fields=[InternalField.from_struct_field(sdf_new.select(scol_name).schema[0])], + ) + return first_series(DataFrame(internal)) + else: + from pyspark.pandas.base import column_op - return column_op(Column.__eq__)(left, right) + return column_op(Column.__eq__)(left, right) def ne(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex: from pyspark.pandas.base import column_op diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py index ad8144f30f756..e1b84b1eba25d 100644 --- a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py +++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py @@ -259,7 +259,7 @@ def test_eq(self): ) self.assertRaisesRegex( TypeError, - "Cannot compare a Categorical with the given type", + "The operation can not be applied to list.", lambda: ordered_psser == [1, 2, 3], ) diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py index 16b323655da88..6835acacd31fd 100644 --- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py +++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py @@ -1853,24 +1853,6 @@ def test_series_eq(self): self.assert_eq(pser.eq(pandas_other), psser.eq(pandas_on_spark_other).sort_index()) self.assert_eq(pser == pandas_other, (psser == pandas_on_spark_other).sort_index()) - # other = list - other = [np.nan, 1, 3, 4, np.nan, 6] - if LooseVersion(pd.__version__) >= LooseVersion("1.2"): - self.assert_eq(pser.eq(other), 
psser.eq(other).sort_index())
+            self.assert_eq(pser == other, (psser == other).sort_index())
+        else:
+            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other).sort_index())
+            self.assert_eq((pser == other).rename("x"), (psser == other).sort_index())
+
+        # other = tuple
+        other = (np.nan, 1, 3, 4, np.nan, 6)
+        if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
+            self.assert_eq(pser.eq(other), psser.eq(other).sort_index())
+            self.assert_eq(pser == other, (psser == other).sort_index())
+        else:
+            self.assert_eq(pser.eq(other).rename("x"), psser.eq(other).sort_index())
+            self.assert_eq((pser == other).rename("x"), (psser == other).sort_index())
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_series import *  # noqa: F401
 
From 5199684bcf59761ea3cdca1a2dc77df6f3db33a9 Mon Sep 17 00:00:00 2001
From: itholic
Date: Fri, 1 Oct 2021 13:14:29 +0900
Subject: [PATCH 5/9] rollback meaningless change

---
 .../pyspark/pandas/tests/data_type_ops/test_categorical_ops.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
index e1b84b1eba25d..0aa2e108d799a 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
@@ -259,7 +259,7 @@ def test_eq(self):
         )
         self.assertRaisesRegex(
             TypeError,
-            "The operation can not be applied to list.",
+            "The operation can not be applied to list",
             lambda: ordered_psser == [1, 2, 3],
         )
 
From 5c9b1682a528daffde759726369051adee1ab723 Mon Sep 17 00:00:00 2001
From: itholic
Date: Fri, 1 Oct 2021 14:49:06 +0900
Subject: [PATCH 6/9] Handling None

---
 python/pyspark/pandas/data_type_ops/base.py | 22 ++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index 7c295b273dd74..cca0bdfb8abdf 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -376,15 +376,19 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
                         x[index_scol_name].alias(index_scol_name)
                         for index_scol_name in index_scol_names
                     ],
-                    F.when(
-                        F.assert_true(
-                            # If the comparing result is null,
-                            # that means the length of `left` and `right` is not the same.
-                            (x[scol_name] == y).isNotNull(),
-                            "Lengths must be equal",
-                        ).isNull(),
-                        x[scol_name] == y,
-                    ).alias(scol_name)
+                    F.when(x[scol_name].isNull() | y.isNull(), False)
+                    .otherwise(
+                        F.when(
+                            F.assert_true(
+                                # If the comparing result is null,
+                                # that means the length of `left` and `right` is not the same.
+                                (x[scol_name] == y).isNotNull(),
+                                "Lengths must be equal",
+                            ).isNull(),
+                            x[scol_name] == y,
+                        )
+                    )
+                    .alias(scol_name)
                 ),
             ).alias(scol_name)
 
From 57b3c7389c804bf48deea9bb8805e56a2c5b0aa9 Mon Sep 17 00:00:00 2001
From: itholic
Date: Tue, 5 Oct 2021 14:41:54 +0900
Subject: [PATCH 7/9] Compare the length

---
 python/pyspark/pandas/data_type_ops/base.py | 13 ++++---------
 python/pyspark/pandas/tests/test_series.py  | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index cca0bdfb8abdf..18c6e9a4b584c 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -354,6 +354,9 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         from pyspark.pandas.frame import DataFrame
         from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, InternalField
 
+        if len(left) != len(right):
+            raise ValueError("Lengths must be equal")
+
         sdf = left._internal.spark_frame
         structed_scol = F.struct(
             sdf[NATURAL_ORDER_COLUMN_NAME],
             *left._internal.index_spark_columns,
             left.spark.column
         )
@@ -378,15 +381,7 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
                     F.when(x[scol_name].isNull() | y.isNull(), False)
                     .otherwise(
-                        F.when(
-                            F.assert_true(
-                                # If the comparing result is null,
-                                # that means the length of `left` and `right` is not the same.
-                                (x[scol_name] == y).isNotNull(),
-                                "Lengths must be equal",
-                            ).isNull(),
-                            x[scol_name] == y,
-                        )
+                        x[scol_name] == y,
                     )
                     .alias(scol_name)
diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py
index d2e95d4a4f8fd..4e3438fafe2e6 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2986,6 +2986,20 @@ def test_eq(self):
             self.assert_eq(pser.eq(other).rename("x"), psser.eq(other).sort_index())
             self.assert_eq((pser == other).rename("x"), (psser == other).sort_index())
 
+        # other = list with a different length
+        other = [np.nan, 1, 3, 4, np.nan]
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
+        # other = tuple with a different length
+        other = (np.nan, 1, 3, 4, np.nan)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser.eq(other)
+        with self.assertRaisesRegex(ValueError, "Lengths must be equal"):
+            psser == other
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_series import *  # noqa: F401
 
From 15a6c20060b7e085e42cfef3db33a15d5991bbef Mon Sep 17 00:00:00 2001
From: itholic
Date: Tue, 5 Oct 2021 20:39:36 +0900
Subject: [PATCH 8/9] fix nullability

---
 python/pyspark/pandas/data_type_ops/base.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index 18c6e9a4b584c..d927561ddd9c6 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -39,6 +39,7 @@
     NumericType,
     StringType,
     StructType,
+    StructField,
     TimestampType,
     TimestampNTZType,
     UserDefinedType,
@@ -354,6 +355,7 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         from pyspark.pandas.frame import DataFrame
         from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, InternalField
 
+        len_right = len(right)
         if len(left) != len(right):
             raise ValueError("Lengths must be equal")
 
@@ -421,7 +423,15 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
                 scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names
             ],
             data_spark_columns=[scol_for(sdf_new, scol_name)],
-            data_fields=[InternalField.from_struct_field(sdf_new.select(scol_name).schema[0])],
+            data_fields=[
+                InternalField.from_struct_field(
+                    StructField(
+                        scol_name,
+                        BooleanType(),
+                        nullable=left._internal.data_fields[0].nullable,
+                    )
+                )
+            ],
         )
 
From 92753a2b89fda8be6f4e603a4c337d520cb51933 Mon Sep 17 00:00:00 2001
From: itholic
Date: Wed, 6 Oct 2021 11:50:33 +0900
Subject: [PATCH 9/9] fix test failure

---
 python/pyspark/pandas/data_type_ops/base.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index d927561ddd9c6..4ef352f693dce 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -39,7 +39,6 @@
     NumericType,
     StringType,
     StructType,
-    StructField,
     TimestampType,
     TimestampNTZType,
     UserDefinedType,
@@ -417,19 +416,23 @@ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
             # |                4|false|
             # +-----------------+-----+
             sdf_new = sdf_new.select("col.*")
+
+            index_spark_columns = [
+                scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names
+            ]
+            data_spark_columns = 
[scol_for(sdf_new, scol_name)] + internal = left._internal.copy( spark_frame=sdf_new, - index_spark_columns=[ - scol_for(sdf_new, index_scol_name) for index_scol_name in index_scol_names + index_spark_columns=index_spark_columns, + data_spark_columns=data_spark_columns, + index_fields=[ + InternalField.from_struct_field(index_field) + for index_field in sdf_new.select(index_spark_columns).schema.fields ], - data_spark_columns=[scol_for(sdf_new, scol_name)], data_fields=[ InternalField.from_struct_field( - StructField( - scol_name, - BooleanType(), - nullable=left._internal.data_fields[0].nullable, - ) + sdf_new.select(data_spark_columns).schema.fields[0] ) ], )
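
Taken together, the series makes `Series == other` and `Series.eq(other)` accept list-like operands of matching length, while dict and set operands follow pandas and compare unequal everywhere. A minimal doctest-style sketch of the end-state behavior (the values are illustrative; assumes a running pandas-on-Spark session):

    >>> import pyspark.pandas as ps
    >>> psser = ps.Series([1, 2, 3, 4])
    >>> psser == [1, 3, 2, 4]
    0     True
    1    False
    2    False
    3     True
    dtype: bool
    >>> psser == [1, 2]
    Traceback (most recent call last):
      ...
    ValueError: Lengths must be equal
    >>> psser == {1, 2}
    0    False
    1    False
    2    False
    3    False
    dtype: bool

From patch 3 onward the comparison stays inside Spark instead of collecting rows to the driver: collect_list gathers the column into a single array of (order key, index, value) structs, array_sort restores row order, zip_with pairs each element with the literal array, and explode fans the result back out into rows. A standalone PySpark sketch of that pattern (the `idx`/`val` column names and the four-row data are illustrative stand-ins, not taken from the patches):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.range(4).select(F.col("id").alias("idx"), (F.col("id") + 1).alias("val"))
    # Gather the whole column into one array of (order key, value) structs, then
    # sort by the leading struct field so the element order is deterministic.
    collected = F.array_sort(F.collect_list(F.struct("idx", "val")))
    other = F.array([F.lit(v) for v in [1, 3, 2, 4]])
    # zip_with pairs the i-th struct with the i-th literal and compares the values.
    zipped = F.zip_with(
        collected,
        other,
        lambda x, y: F.struct(x["idx"].alias("idx"), (x["val"] == y).alias("eq")),
    )
    # explode turns the single zipped-array row back into one row per element.
    result = (
        sdf.select(zipped.alias("z"))
        .select(F.explode("z").alias("col"))
        .select("col.*")
    )
    result.show()
    # +---+-----+
    # |idx|   eq|
    # +---+-----+
    # |  0| true|
    # |  1|false|
    # |  2|false|
    # |  3| true|
    # +---+-----+

When the two arrays differ in length, zip_with pads the shorter side with nulls; that is the condition the F.assert_true guard in patches 3 through 6 detected, before patch 7 replaced it with an up-front length check on the driver.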