Skip to content

Commit

Permalink
Refactor Fugue test
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Nov 22, 2023
1 parent 846cca9 commit ad14b10
Show file tree
Hide file tree
Showing 58 changed files with 1,315 additions and 731 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[MESSAGES CONTROL]
disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E1136,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0613,W0622,W0631,W0640,W0703,W0707,W1116
disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E1136,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0603,W0613,W0621,W0622,W0631,W0640,W0703,W0707,W1116
# TODO: R0205: inherits from object, can be safely removed
11 changes: 11 additions & 0 deletions fugue/test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# flake8: noqa
from .pandas_backend import NativeTestBackend, PandasTestBackend
from .plugins import (
FugueTestBackend,
FugueTestContext,
FugueTestSuite,
extract_conf,
fugue_test_backend,
fugue_test_suite,
with_backend,
)
24 changes: 24 additions & 0 deletions fugue/test/pandas_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from contextlib import contextmanager
from typing import Any, Dict, Iterator

from .plugins import FugueTestBackend, fugue_test_backend


@fugue_test_backend
class PandasTestBackend(FugueTestBackend):
    # Test backend registered under the name "pandas".
    name = "pandas"

    @classmethod
    @contextmanager
    def session_context(cls, session_conf: Dict[str, Any]) -> Iterator[Any]:
        # ``session_conf`` is not used here; the "session" handed to the
        # caller is simply the string "pandas".
        yield "pandas"


@fugue_test_backend
class NativeTestBackend(FugueTestBackend):
    # Test backend registered under the name "native".
    name = "native"

    @classmethod
    @contextmanager
    def session_context(cls, session_conf: Dict[str, Any]) -> Iterator[Any]:
        # ``session_conf`` is not used here; the "session" handed to the
        # caller is simply the string "native".
        yield "native"
235 changes: 235 additions & 0 deletions fugue/test/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type

from triad import assert_or_throw
from triad.utils.entry_points import load_entry_point

# pytest is optional at import time: record whether it is available so that
# pytest-only definitions later in this module can be skipped without it.
try:
    import pytest

    _HAS_PYTEST = True
except ImportError: # pragma: no cover
    _HAS_PYTEST = False


# Registered backend classes keyed by normalized name; filled by the
# ``fugue_test_backend`` decorator.
_FUGUE_TEST_BACKENDS: Dict[str, Type["FugueTestBackend"]] = {}
# The complete configuration dict last passed to ``_set_global_conf``.
_FUGUE_TEST_ALL_INI_CONF: Dict[str, Any] = {}
# The subset of the configuration above whose keys start with "fugue.".
_FUGUE_TEST_INI_FUGUE_CONF: Dict[str, Any] = {}
# Exposed through ``_get_all_backend_session_fixtures``; nothing in this part
# of the module populates it -- presumably filled elsewhere (TODO confirm).
_FUGUE_BACKEND_SESSION_FIXTURES: Dict[str, Any] = {}


def _set_global_conf(conf: Dict[str, Any]) -> None:
    """Replace the module-level test configuration.

    Stores ``conf`` as the full configuration and, next to it, the subset of
    ``conf`` whose keys start with ``"fugue."``.

    :param conf: the full configuration dictionary
    """
    global _FUGUE_TEST_ALL_INI_CONF, _FUGUE_TEST_INI_FUGUE_CONF
    # The globals are rebound (not mutated in place), so ``conf`` itself
    # becomes the stored object.
    _FUGUE_TEST_ALL_INI_CONF = conf
    _FUGUE_TEST_INI_FUGUE_CONF = _extract_fugue_conf(conf)


def _get_all_ini_conf() -> Dict[str, Any]:
    """Return the full configuration last stored by ``_set_global_conf``.

    The stored dictionary itself is returned, not a copy.
    """
    return _FUGUE_TEST_ALL_INI_CONF


def _get_all_backend_session_fixtures() -> Dict[str, Any]:
    """Return the module-level backend session fixture dictionary.

    The stored dictionary itself is returned, not a copy.
    """
    return _FUGUE_BACKEND_SESSION_FIXTURES


def _load_all_backends() -> None:
    """Load everything registered under the Fugue entry point.

    NOTE(review): presumably importing those plugins is what triggers their
    ``fugue_test_backend`` registrations -- confirm against the plugins.
    """
    # Imported inside the function rather than at module import time.
    from fugue.constants import FUGUE_ENTRYPOINT

    load_entry_point(FUGUE_ENTRYPOINT)


def with_backend(ctx: str, *other: str, skip_missing: bool = False) -> Any:
    """Build a decorator that runs a test once per requested backend.

    The decorated object is marked to use the ``backend_context`` fixture and
    is parametrized (indirectly) over every backend given.

    :param ctx: the first backend to run against
    :param other: any further backends
    :param skip_missing: when True, a backend that is not registered yields a
        non-running xfail parameter instead of raising ``ValueError``
    :return: a decorator to apply to the test
    """
    import pytest

    _load_all_backends()
    params = _construct_parameterized_fixture([ctx, *other], skip_missing)

    def _decorate(f: Any) -> Any:
        # Attach the fixture first, then parametrize it.
        marked = pytest.mark.usefixtures("backend_context")(f)
        return pytest.mark.parametrize("backend_context", params, indirect=True)(
            marked
        )

    return _decorate


def fugue_test_backend(cls: Type["FugueTestBackend"]) -> Type["FugueTestBackend"]:
    """Class decorator that registers a Fugue test backend.

    The backend is stored in the module-level registry under its ``name``
    after stripping surrounding whitespace and lower-casing it.

    :param cls: the backend class, a subclass of ``FugueTestBackend``
    :raises ValueError: if ``cls`` is not a ``FugueTestBackend`` subclass, if
        its normalized name is empty or ``"fugue"``, or if a backend with the
        same normalized name is already registered
    :return: ``cls`` unchanged, so the function can be used as a decorator
    """
    # ``issubclass`` raises TypeError for non-class arguments; check for a
    # class first so every invalid input produces the same ValueError.
    assert_or_throw(
        isinstance(cls, type) and issubclass(cls, FugueTestBackend),
        ValueError(f"{cls} is not a FugueTestBackend"),
    )
    name = cls.name.strip().lower()
    assert_or_throw(
        name != "" and name != "fugue",
        ValueError(f"Fugue test backend name cannot be empty or fugue: {cls}"),
    )
    assert_or_throw(
        name not in _FUGUE_TEST_BACKENDS,
        ValueError(f"Duplicate Fugue test backend name: {name}"),
    )
    # Registration is the only side effect. ``generate_session_fixture`` is a
    # context manager: calling it without entering it runs none of its body,
    # so it is not invoked here.
    _FUGUE_TEST_BACKENDS[name] = cls
    return cls


class FugueTestSuite:
    """Base class that test suites must extend to be accepted by
    ``fugue_test_suite``.
    """

    # Declared for type checkers only; no value is assigned in this class body.
    backend: Any
    # Set for every test function by ``init_builtin_per_func_context`` when
    # pytest is installed.
    tmp_path: Path

    # Collection is off by default; ``fugue_test_suite(..., mark_test=...)``
    # can override it on the decorated class.
    __test__ = False
    # Returned by ``context``; this class never assigns it -- presumably a
    # fixture defined elsewhere does (TODO confirm).
    _test_context: Any = None

    # The fixture can only be defined when pytest could be imported.
    if _HAS_PYTEST:

        @pytest.fixture(autouse=True)
        def init_builtin_per_func_context(self, tmp_path):
            # Store the ``tmp_path`` fixture value on the instance.
            self.tmp_path = tmp_path

    @property
    def context(self) -> "FugueTestContext":
        """The current test context (``None`` unless something has set it)."""
        return self._test_context

    @property
    def engine(self) -> Any:
        """The ``engine`` attribute of the current test context."""
        return self.context.engine


def fugue_test_suite(backend: Any, mark_test: Optional[bool] = None) -> Any:
    """Build a class decorator that binds a test suite to one backend.

    :param backend: a backend name, or a ``(name, extra_conf)`` pair
    :param mark_test: when not None, assigned to the ``__test__`` attribute of
        the decorated class
    :return: a decorator for ``FugueTestSuite`` subclasses; it raises
        ``ValueError`` for any other class
    """

    def deco(cls: Type["FugueTestSuite"]) -> Type["FugueTestSuite"]:
        import pytest

        assert_or_throw(
            issubclass(cls, FugueTestSuite),
            ValueError(f"{cls} is not a FugueTestSuite"),
        )
        if mark_test is not None:
            cls.__test__ = mark_test
        backend_name, extra_conf = _parse_backend(backend)
        # Single parameter, identified by the backend name.
        param = pytest.param((backend_name, extra_conf), id=backend_name)
        parametrize = pytest.mark.parametrize(
            "backend_context", [param], indirect=True
        )
        with_fixture = pytest.mark.usefixtures("_class_backend_context")(cls)
        return parametrize(with_fixture)

    return deco


@dataclass
class FugueTestContext:
    # Value bundle produced by ``FugueTestBackend.generate_context_fixture``.

    # The engine yielded by ``fugue.api.engine_context``.
    engine: Any
    # The backend session the engine was created from.
    session: Any
    # The ``name`` of the backend class that built this context.
    name: str


class FugueTestBackend:
    """Base class for Fugue test backends.

    Subclasses set ``name`` and implement ``session_context``; the remaining
    class methods derive configuration and build sessions and test contexts.
    """

    # Backend name; the ``fugue_test_backend`` decorator rejects empty names.
    name = ""
    # Baseline session options; matching ini entries are merged over them.
    default_session_conf: Dict[str, Any] = {}
    # Baseline Fugue options; ini and per-test options are merged over them.
    default_fugue_conf: Dict[str, Any] = {}
    # Not read by any method of this class.
    session_conf: Dict[str, Any] = {}
    # Not read by any method of this class.
    fugue_conf: Dict[str, Any] = {}

    @classmethod
    def transform_session_conf(cls, conf: Dict[str, Any]) -> Dict[str, Any]:
        """Select the entries of ``conf`` prefixed with ``"<name>."``.

        :param conf: the configuration to filter
        :return: the matching entries with the prefix removed from their keys
        """
        prefix = cls.name + "."
        return extract_conf(conf, prefix, remove_prefix=True)

    @classmethod
    @contextmanager
    def session_context(cls, session_conf: Dict[str, Any]) -> Iterator[Any]:
        """Create the backend session; must be overridden by subclasses.

        :param session_conf: the merged session configuration
        """
        raise NotImplementedError  # pragma: no cover

    @classmethod
    @contextmanager
    def generate_session_fixture(cls) -> Iterator[Any]:
        """Open a session from the defaults overlaid with the ini settings.

        :yield: the session produced by ``session_context``
        """
        defaults = cls.default_session_conf
        from_ini = cls.transform_session_conf(_FUGUE_TEST_ALL_INI_CONF)
        merged = _merge_dicts(defaults, from_ini)
        with cls.session_context(merged) as session:
            yield session

    @classmethod
    @contextmanager
    def generate_context_fixture(
        cls, session: object, extra_fugue_conf: Dict[str, Any]
    ) -> Iterator[FugueTestContext]:
        """Open an engine context on ``session`` and wrap it for tests.

        :param session: the backend session to build the engine from
        :param extra_fugue_conf: per-test options; only keys starting with
            ``"fugue."`` are used
        :yield: a ``FugueTestContext`` holding the engine, the session and
            this backend's name
        """
        import fugue.api as fa

        # Later layers override earlier ones.
        layers = (
            cls.default_fugue_conf,
            _FUGUE_TEST_INI_FUGUE_CONF,
            _extract_fugue_conf(extra_fugue_conf),
        )
        fugue_conf = _merge_dicts(*layers)
        with fa.engine_context(session, fugue_conf) as engine:
            yield FugueTestContext(engine=engine, session=session, name=cls.name)


def extract_conf(
    conf: Dict[str, Any], prefix: str, remove_prefix: bool
) -> Dict[str, Any]:
    """Return the entries of ``conf`` whose keys start with ``prefix``.

    :param conf: the configuration to filter
    :param prefix: the key prefix to match
    :param remove_prefix: whether to drop ``prefix`` from the returned keys
    :return: a new dictionary; ``conf`` is left untouched
    """
    cut = len(prefix) if remove_prefix else 0
    return {
        key[cut:]: value for key, value in conf.items() if key.startswith(prefix)
    }


@contextmanager
def _make_backend_context(obj: Any, session: Any) -> Iterator[Any]:
    """Open the test context of the backend described by ``obj``.

    :param obj: a backend name, or a ``(name, extra_conf)`` pair
    :param session: the backend session to build the context on
    :raises ValueError: if the backend name is not registered
    :yield: the context created by the backend's ``generate_context_fixture``
    """
    _load_all_backends()
    extra_conf: Dict[str, Any]
    if not isinstance(obj, str):
        key, extra_conf = obj
    else:
        key, extra_conf = obj, {}
    # The message is built lazily, only when the lookup actually fails.
    assert_or_throw(
        key in _FUGUE_TEST_BACKENDS,
        lambda: ValueError(
            f"Undefined Fugue test backend: {key}, "
            f"available backends: {list(_FUGUE_TEST_BACKENDS.keys())}"
        ),
    )
    backend_cls = _FUGUE_TEST_BACKENDS[key]
    with backend_cls.generate_context_fixture(session, extra_conf) as ctx:
        yield ctx


def _extract_fugue_conf(conf: Dict[str, Any]) -> Dict[str, Any]:
    """Return the entries of ``conf`` whose keys start with ``"fugue."``.

    The keys keep their ``"fugue."`` prefix.
    """
    return extract_conf(conf, "fugue.", remove_prefix=False)


def _construct_parameterized_fixture(ctx: List[Any], skip_missing: bool) -> List[Any]:
    """Turn backend specifications into pytest parameters.

    :param ctx: backend names or ``(name, extra_conf)`` pairs
    :param skip_missing: when True, an unregistered backend becomes a
        non-running xfail parameter; when False it raises
    :raises ValueError: for an unregistered backend when ``skip_missing`` is
        False
    :return: one ``pytest.param`` per entry of ``ctx``, in the same order
    """
    import pytest

    params: List[Any] = []
    for spec in ctx:
        c, extra_conf = _parse_backend(spec)
        if c in _FUGUE_TEST_BACKENDS:
            params.append(pytest.param((c, extra_conf), id=c))
            continue
        if not skip_missing:
            raise ValueError(
                f"Undefined Fugue test backend: {c}, "
                f"available backends: {list(_FUGUE_TEST_BACKENDS.keys())}"
            )
        # Unknown backend but tolerated: keep a placeholder that is reported
        # as xfail without being executed.
        not_run = pytest.mark.xfail(reason="Undefined Fugue test backend", run=False)
        params.append(pytest.param(c, marks=not_run, id=c))
    return params


def _merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]:
    """Combine dictionaries into a new one; later arguments win on conflicts.

    :param dicts: the dictionaries to merge, lowest priority first
    :return: a new dictionary; none of the inputs is modified
    """
    return {key: value for layer in dicts for key, value in layer.items()}


def _parse_backend(ctx: Any) -> Tuple[str, Dict[str, Any]]:
    """Normalize a backend specification to a ``(name, extra_conf)`` pair.

    :param ctx: a backend name, or an indexable whose first two items are the
        name and the extra configuration
    :return: the name and its configuration; a bare name gets a new empty
        dictionary
    """
    if not isinstance(ctx, str):
        backend_name = ctx[0]
        extra_conf = ctx[1]
        return backend_name, extra_conf
    return ctx, {}
22 changes: 21 additions & 1 deletion fugue_dask/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import Any
from contextlib import contextmanager
from typing import Any, Dict, Iterator

import dask
import dask.dataframe as dd
from dask.distributed import Client

import fugue.test as ft
from fugue import DataFrame
from fugue.dev import (
DataFrameParam,
Expand Down Expand Up @@ -66,3 +69,20 @@ def to_output_df(self, output: Any, schema: Any, ctx: Any) -> DataFrame:

def count(self, df: DataFrame) -> int: # pragma: no cover
    """Not supported: always raises ``NotImplementedError("not allowed")``."""
    raise NotImplementedError("not allowed")


@ft.fugue_test_backend
class DaskTestBackend(ft.FugueTestBackend):
    """Fugue test backend named ``"dask"`` whose session is a Dask ``Client``."""

    name = "dask"

    @classmethod
    def transform_session_conf(cls, conf: Dict[str, Any]) -> Dict[str, Any]:
        """Return the ``"dask."``-prefixed entries of ``conf``, prefix removed."""
        return ft.extract_conf(conf, "dask.", remove_prefix=True)

    @classmethod
    @contextmanager
    def session_context(cls, session_conf: Dict[str, Any]) -> Iterator[Any]:
        """Yield a ``Client`` created from ``session_conf`` as keyword arguments.

        Two Dask dataframe options are set once the client is open; the client
        is closed when the context exits.
        """
        dataframe_options = (
            ("dataframe.shuffle.method", "tasks"),
            ("dataframe.convert-string", False),
        )
        with Client(**session_conf) as client:
            # Applied one at a time, in this order, after the client exists.
            for option, value in dataframe_options:
                dask.config.set({option: value})
            yield client
25 changes: 23 additions & 2 deletions fugue_duckdb/registry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Tuple

from duckdb import DuckDBPyConnection, DuckDBPyRelation
from triad import run_at_def
Expand All @@ -15,7 +15,7 @@
fugue_annotated_param,
is_pandas_or,
)
from fugue.plugins import infer_execution_engine
from fugue.plugins import infer_execution_engine, parse_execution_engine
from fugue_duckdb.dataframe import DuckDataFrame
from fugue_duckdb.execution_engine import DuckDBEngine, DuckExecutionEngine

Expand Down Expand Up @@ -67,6 +67,27 @@ def _register_engines() -> None:
register_sql_engine("duckdb", lambda engine: DuckDBEngine(engine))


# Optional integration: register a parser for DuckDB + Dask engine pairs only
# when the Dask pieces can be imported. Any failure while setting this up is
# deliberately ignored.
try:
    from fugue_duckdb.dask import DuckDaskExecutionEngine
    from dask.distributed import Client

    def _is_duck_dask_pair(engine: Any, conf: Any, **kwargs: Any) -> bool:
        # Matches a two-item list: a DuckDB connection followed by a Dask
        # client.
        # NOTE(review): only ``list`` is accepted here although the parser
        # below is annotated with ``Tuple`` -- confirm which one is intended.
        return (
            isinstance(engine, list)
            and len(engine) == 2
            and isinstance(engine[0], DuckDBPyConnection)
            and isinstance(engine[1], Client)
        )

    @parse_execution_engine.candidate(_is_duck_dask_pair)
    def _parse_duck_dask_client(
        engine: Tuple[DuckDBPyConnection, Client], conf: Any, **kwargs: Any
    ) -> DuckDaskExecutionEngine:
        # Build the combined engine from the connection/client pair.
        connection = engine[0]
        dask_client = engine[1]
        return DuckDaskExecutionEngine(
            connection=connection, dask_client=dask_client, conf=conf
        )

except Exception:  # pragma: no cover
    pass


@fugue_annotated_param(DuckExecutionEngine)
class _DuckExecutionEngineParam(ExecutionEngineParam):
    # Adds no behavior of its own; it exists to be registered by the decorator
    # for ``DuckExecutionEngine``.
    # NOTE(review): presumably this lets functions annotate a parameter with
    # DuckExecutionEngine -- confirm against ``fugue_annotated_param``.
    pass
Expand Down
Loading

0 comments on commit ad14b10

Please sign in to comment.