Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Mar 18, 2024
1 parent f553a4d commit 45fe0bf
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 182 deletions.
22 changes: 4 additions & 18 deletions fugue/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,6 @@
rename_dataframe_column_names = rename


def _pa_type_eq(t1: pa.DataType, t2: pa.DataType) -> bool:
# should ignore the name difference of list
# e.g. list<item: string> == list<l: string>
if pa.types.is_list(t1) and pa.types.is_list(t2): # pragma: no cover
return _pa_type_eq(t1.value_type, t2.value_type)
return t1 == t2


def _schema_eq(s1: Schema, s2: Schema) -> bool:
if s1 == s2:
return True
return s1.names == s2.names and all(
_pa_type_eq(f1.type, f2.type) for f1, f2 in zip(s1.fields, s2.fields)
)


def _df_eq(
df: DataFrame,
data: Any,
Expand All @@ -46,6 +30,7 @@ def _df_eq(
check_schema: bool = True,
check_content: bool = True,
no_pandas: bool = False,
equal_type_groups: Optional[List[List[Any]]] = None,
throw=False,
) -> bool:
"""Compare if two dataframes are equal. Is for internal, unit test
Expand All @@ -66,6 +51,7 @@ def _df_eq(
:param no_pandas: if true, it will compare the string representations of the
dataframes, otherwise, it will convert both to pandas dataframe to compare,
defaults to False
:param equal_type_groups: the groups to treat as equal types, defaults to None.
:param throw: if to throw error if not equal, defaults to False
:return: if they equal
"""
Expand All @@ -78,8 +64,8 @@ def _df_eq(
assert (
df1.count() == df2.count()
), f"count mismatch {df1.count()}, {df2.count()}"
assert not check_schema or _schema_eq(
df.schema, df2.schema
assert not check_schema or df.schema.is_like(
df2.schema, equal_groups=equal_type_groups
), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
if not check_content:
return True
Expand Down
12 changes: 11 additions & 1 deletion fugue/test/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type

from fugue.dataframe.utils import _df_eq
from triad import assert_or_throw, run_once
from triad.utils.entry_points import load_entry_point

Expand Down Expand Up @@ -160,6 +160,7 @@ def test_spark(self):

backend: Any
tmp_path: Path
equal_type_groups: Any = None

__test__ = False
_test_context: Any = None
Expand All @@ -180,6 +181,15 @@ def engine(self) -> Any:
"""The engine object inside the ``FugueTestContext``"""
return self.context.engine

def get_equal_type_groups(self) -> Optional[List[List[Any]]]:
return None

def df_eq(self, *args: Any, **kwargs: Any) -> bool:
"""A wrapper of :func:`~fugue.dataframe.utils.df_eq`"""
if "equal_type_groups" not in kwargs:
kwargs["equal_type_groups"] = self.equal_type_groups
return _df_eq(*args, **kwargs)


def fugue_test_suite(backend: Any, mark_test: Optional[bool] = None) -> Any:
def deco(cls: Type["FugueTestSuite"]) -> Type["FugueTestSuite"]:
Expand Down
29 changes: 14 additions & 15 deletions fugue_test/builtin_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
from fugue.column import col
from fugue.column import functions as ff
from fugue.column import lit
from fugue.dataframe.utils import _df_eq as df_eq
from fugue.exceptions import (
FugueInterfacelessError,
FugueWorkflowCompileError,
Expand All @@ -81,7 +80,7 @@ class BuiltInTests(object):
class Tests(ft.FugueTestSuite):
def test_workflows(self):
a = FugueWorkflow().df([[0]], "a:int")
df_eq(a.compute(self.engine), [[0]], "a:int")
self.df_eq(a.compute(self.engine), [[0]], "a:int")

def test_create_show(self):
with FugueWorkflow() as dag:
Expand Down Expand Up @@ -1690,7 +1689,7 @@ def tr(df: pd.DataFrame, n=1) -> pd.DataFrame:
""",
x=sdf3,
).run()
df_eq(
self.df_eq(
res["res"],
[[3, 4, 13]],
schema="a:long,b:int,c:long",
Expand Down Expand Up @@ -1723,9 +1722,9 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
df1 = pd.DataFrame([[0, 1], [2, 3]], columns=["a b", " "])
df2 = pd.DataFrame([[0, 10], [20, 3]], columns=["a b", "d"])
r = fa.inner_join(df1, df2, as_fugue=True)
df_eq(r, [[0, 1, 10]], "`a b`:long,` `:long,d:long", throw=True)
self.df_eq(r, [[0, 1, 10]], "`a b`:long,` `:long,d:long", throw=True)
r = fa.transform(r, tr)
df_eq(
self.df_eq(
r,
[[0, 1, 10, 2]],
"`a b`:long,` `:long,d:long,`c *`:long",
Expand All @@ -1739,7 +1738,7 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
col("d"),
col("c *").cast(int),
)
df_eq(
self.df_eq(
r,
[[0, 1, 10, 2]],
"`a b `:long,`x y`:long,d:long,`c *`:long",
Expand All @@ -1748,13 +1747,13 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
r = fa.rename(r, {"a b ": "a b"})
fa.save(r, f_csv, header=True, force_single=True)
fa.save(r, f_parquet)
df_eq(
self.df_eq(
fa.load(f_parquet, columns=["x y", "d", "c *"], as_fugue=True),
[[1, 10, 2]],
"`x y`:long,d:long,`c *`:long",
throw=True,
)
df_eq(
self.df_eq(
fa.load(
f_csv,
header=True,
Expand All @@ -1766,7 +1765,7 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
"d:str,`c *`:str",
throw=True,
)
df_eq(
self.df_eq(
fa.load(
f_csv,
header=True,
Expand All @@ -1786,14 +1785,14 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
""",
as_fugue=True,
)
df_eq(r, [[0, 1, 10]], "`a b`:long,` `:long,d:long", throw=True)
self.df_eq(r, [[0, 1, 10]], "`a b`:long,` `:long,d:long", throw=True)
r = fa.fugue_sql(
"""
TRANSFORM r USING tr SCHEMA *,`c *`:long
""",
as_fugue=True,
)
df_eq(
self.df_eq(
r,
[[0, 1, 10, 2]],
"`a b`:long,` `:long,d:long,`c *`:long",
Expand All @@ -1805,7 +1804,7 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
""",
as_fugue=True,
)
df_eq(
self.df_eq(
r,
[[0, 1, 10, 2]],
"`a b`:long,` `:long,d:long,`c *`:long",
Expand All @@ -1826,19 +1825,19 @@ def tr(df: pd.DataFrame) -> pd.DataFrame:
f_parquet=f_parquet,
f_csv=f_csv,
).run()
df_eq(
self.df_eq(
res["r1"],
[[1, 10, 2]],
"`x y`:long,d:long,`c *`:long",
throw=True,
)
df_eq(
self.df_eq(
res["r2"],
[["1", "10", "2"]],
"`x y`:str,d:str,`c *`:str",
throw=True,
)
df_eq(
self.df_eq(
res["r3"],
[[0, 1, 10, 2]],
"`a b`:long,`x y`:long,d:long,`c *`:long",
Expand Down
7 changes: 3 additions & 4 deletions fugue_test/dataframe_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import fugue.api as fi
import fugue.test as ft
from fugue.dataframe import ArrowDataFrame, DataFrame
from fugue.dataframe.utils import _df_eq as df_eq
from fugue.exceptions import FugueDataFrameOperationError, FugueDatasetEmptyError


Expand Down Expand Up @@ -121,7 +120,7 @@ def test_select(self):
assert [[1]] == fi.as_array(df, type_safe=True)

df = self.df([["a", 1, 2]], "a:str,b:int,c:int")
df_eq(
self.df_eq(
fi.as_fugue_df(fi.select_columns(df, ["c", "a"])),
[[2, "a"]],
"a:str,c:int",
Expand All @@ -132,13 +131,13 @@ def test_rename(self):
df = self.df(data, "a:str,b:int")
df2 = fi.rename(df, columns=dict(a="aa"))
assert fi.get_schema(df) == "a:str,b:int"
df_eq(fi.as_fugue_df(df2), data, "aa:str,b:int", throw=True)
self.df_eq(fi.as_fugue_df(df2), data, "aa:str,b:int", throw=True)

for data in [[["a", 1]], []]:
df = self.df(data, "a:str,b:int")
df3 = fi.rename(df, columns={})
assert fi.get_schema(df3) == "a:str,b:int"
df_eq(fi.as_fugue_df(df3), data, "a:str,b:int", throw=True)
self.df_eq(fi.as_fugue_df(df3), data, "a:str,b:int", throw=True)

def test_rename_invalid(self):
df = self.df([["a", 1]], "a:str,b:int")
Expand Down
Loading

0 comments on commit 45fe0bf

Please sign in to comment.