From c1ba28c6eb90530fe6acc7a93fa35be554e0cf78 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Thu, 10 Aug 2023 05:33:32 +0000
Subject: [PATCH] Integrate with dask>=2023.7.1

---
 .devcontainer/devcontainer.json           |  49 +++++-----
 .pylintrc                                 |   2 +-
 fugue/dataframe/arrow_dataframe.py        |   7 +-
 fugue/dataframe/dataframe.py              |  20 +++-
 fugue_dask/_constants.py                  |   3 +
 fugue_dask/_io.py                         |  30 +++++--
 fugue_dask/_utils.py                      |  20 ++---
 fugue_dask/dataframe.py                   |  46 ++++++----
 fugue_dask/execution_engine.py            | 104 +++++++++++++++-------
 fugue_dask/registry.py                    |  49 +++++-----
 fugue_test/execution_suite.py             |   4 +-
 requirements.txt                          |   1 +
 tests/fugue_dask/test_dataframe.py        |  20 ++---
 tests/fugue_dask/test_execution_engine.py |   2 +
 tests/fugue_dask/test_importless.py       |   1 +
 tests/fugue_dask/test_io.py               |   4 +-
 tests/fugue_dask/test_utils.py            |   2 +-
 17 files changed, 223 insertions(+), 141 deletions(-)

diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 7339cb5a..dc1fdead 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -1,27 +1,32 @@
 {
     "name": "Fugue Development Environment",
-    "image": "fugueproject/devenv:0.7.7",
-    "settings": {
-        "terminal.integrated.shell.linux": "/bin/bash",
-        "python.pythonPath": "/usr/local/bin/python",
-        "python.linting.enabled": true,
-        "python.linting.pylintEnabled": true,
-        "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
-        "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
-        "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
-        "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
-        "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
-        "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
-        "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
-        "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
-        "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
-    },
-    "extensions": [
-        "ms-python.python",
-        "ms-python.isort",
-        "GitHub.copilot",
-        "njpwerner.autodocstring"
-    ],
+    "_image": "fugueproject/devenv:0.7.7",
+    "image": "mcr.microsoft.com/vscode/devcontainers/python:3.10",
+    "customizations": {
+        "vscode": {
+            "settings": {
+                "terminal.integrated.shell.linux": "/bin/bash",
+                "python.pythonPath": "/usr/local/bin/python",
+                "python.linting.enabled": true,
+                "python.linting.pylintEnabled": true,
+                "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
+                "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
+                "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
+                "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
+                "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
+                "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
+                "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
+                "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
+                "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
+            },
+            "extensions": [
+                "ms-python.python",
+                "ms-python.isort",
+                "GitHub.copilot",
+                "njpwerner.autodocstring"
+            ]
+        }},
     "forwardPorts": [ 8888 ],
diff --git a/.pylintrc b/.pylintrc
index a5b7fefa..d26111fe 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,3 +1,3 @@
 [MESSAGES CONTROL]
-disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0613,W0622,W0631,W0640,W0703,W0707,W1116
+disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E1136,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0613,W0622,W0631,W0640,W0703,W0707,W1116
 # TODO: R0205: inherits from object, can be safely removed
diff --git a/fugue/dataframe/arrow_dataframe.py b/fugue/dataframe/arrow_dataframe.py
index 0990019c..2084ea56 100644
--- a/fugue/dataframe/arrow_dataframe.py
+++ b/fugue/dataframe/arrow_dataframe.py
@@ -141,7 +141,12 @@ def count(self) -> int:
         return self.native.shape[0]

     def as_pandas(self) -> pd.DataFrame:
-        return self.native.to_pandas(use_threads=False, date_as_object=False)
+        mapper: Any = None
+        if hasattr(pd, "ArrowDtype"):
+            mapper = pd.ArrowDtype
+        return self.native.to_pandas(
+            use_threads=False, date_as_object=False, types_mapper=mapper
+        )

     def head(
         self, n: int, columns: Optional[List[str]] = None
diff --git a/fugue/dataframe/dataframe.py b/fugue/dataframe/dataframe.py
index 4408f39e..9bc3d3f6 100644
--- a/fugue/dataframe/dataframe.py
+++ b/fugue/dataframe/dataframe.py
@@ -8,7 +8,6 @@
 from triad.collections.schema import Schema
 from triad.exceptions import InvalidOperationError
 from triad.utils.assertion import assert_or_throw
-from triad.utils.pandas_like import PD_UTILS

 from .._utils.display import PrettyTable
 from ..collections.yielded import Yielded
@@ -113,14 +112,17 @@ def peek_dict(self) -> Dict[str, Any]:
     def as_pandas(self) -> pd.DataFrame:
         """Convert to pandas DataFrame"""
         pdf = pd.DataFrame(self.as_array(), columns=self.columns)
-        if len(pdf) == 0:  # TODO: move to triad
-            return pd.DataFrame(
-                {
-                    k: pd.Series(dtype=v.type.to_pandas_dtype())
-                    for k, v in self.schema.items()
-                }
-            )
-        return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)
+        return pdf.astype(
+            self.schema.to_pandas_dtype(use_extension_types=True, use_arrow_dtype=True)
+        )
+        # if len(pdf) == 0:  # TODO: move to triad
+        #     return pd.DataFrame(
+        #         {
+        #             k: pd.Series(dtype=v.type.to_pandas_dtype())
+        #             for k, v in self.schema.items()
+        #         }
+        #     )
+        # return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)

     def as_arrow(self, type_safe: bool = False) -> pa.Table:
         """Convert to pyArrow DataFrame"""
diff --git a/fugue_dask/_constants.py b/fugue_dask/_constants.py
index ee2c43d7..6f1ed6b3 100644
--- a/fugue_dask/_constants.py
+++ b/fugue_dask/_constants.py
@@ -1,4 +1,7 @@
 from typing import Any, Dict
+import pandas as pd
+import dask

 FUGUE_DASK_CONF_DEFAULT_PARTITIONS = "fugue.dask.default.partitions"
 FUGUE_DASK_DEFAULT_CONF: Dict[str, Any] = {FUGUE_DASK_CONF_DEFAULT_PARTITIONS: -1}
+FUGUE_DASK_USE_ARROW = hasattr(pd, "ArrowDtype") and dask.__version__ >= "2023.7.1"
diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py
index 7de19e0a..a3a9b6c1 100644
--- a/fugue_dask/_io.py
+++ b/fugue_dask/_io.py
@@ -1,5 +1,5 @@
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
-
+import pyarrow as pa
 import fs as pfs
 from dask import dataframe as dd
 from fugue._utils.io import FileParser, _get_single_files
@@ -7,7 +7,7 @@
 from triad.collections.fs import FileSystem
 from triad.collections.schema import Schema
 from triad.utils.assertion import assert_or_throw
-
+import pandas as pd
 from fugue_dask.dataframe import DaskDataFrame

@@ -58,22 +58,40 @@ def save_df(


 def _save_parquet(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
-    df.native.to_parquet(p.uri, **{"schema": df.schema.pa_schema, **kwargs})
+    params = {
+        "schema": df.schema.pa_schema,
+        "write_index": False,
+        **kwargs,
+    }
+    if int(pa.__version__.split(".")[0]) >= 11:
+        has_nested = any(pa.types.is_nested(f.type) for f in df.schema.fields)
+        params["store_schema"] = not has_nested
+    df.native.to_parquet(p.uri, **params)


 def _load_parquet(
     p: FileParser, columns: Any = None, **kwargs: Any
 ) -> Tuple[dd.DataFrame, Any]:
+    params = dict(kwargs)
+    dtype_backend: Any = params.pop("dtype_backend", None)
+    params["engine"] = "pyarrow"
+    params["arrow_to_pandas"] = {"ignore_metadata": True}
+    if pd.__version__ >= "1.5":
+        dtype_backend = "pyarrow"
     if columns is None:
-        pdf = dd.read_parquet(p.uri, **kwargs)
+        pdf = dd.read_parquet(p.uri, dtype_backend=dtype_backend, **params)
         schema = Schema(pdf.head(1))
         return pdf, schema
     if isinstance(columns, list):  # column names
-        pdf = dd.read_parquet(p.uri, columns=columns, **kwargs)
+        pdf = dd.read_parquet(
+            p.uri, columns=columns, dtype_backend=dtype_backend, **params
+        )
         schema = Schema(pdf.head(1))
         return pdf, schema
     schema = Schema(columns)
-    pdf = dd.read_parquet(p.uri, columns=schema.names, **kwargs)
+    pdf = dd.read_parquet(
+        p.uri, columns=schema.names, dtype_backend=dtype_backend, **params
+    )
     return pdf, schema
diff --git a/fugue_dask/_utils.py b/fugue_dask/_utils.py
index c7d2ac1e..afde9a82 100644
--- a/fugue_dask/_utils.py
+++ b/fugue_dask/_utils.py
@@ -8,12 +8,12 @@
 from dask.dataframe.core import DataFrame
 from dask.distributed import Client, get_client
 from triad.utils.pandas_like import PandasLikeUtils
-from triad.utils.pyarrow import to_pandas_dtype, to_single_pandas_dtype
+from triad.utils.pyarrow import to_pandas_dtype

 import fugue.api as fa
 from fugue.constants import FUGUE_CONF_DEFAULT_PARTITIONS

-from ._constants import FUGUE_DASK_CONF_DEFAULT_PARTITIONS
+from ._constants import FUGUE_DASK_CONF_DEFAULT_PARTITIONS, FUGUE_DASK_USE_ARROW

 _FUGUE_DASK_TEMP_IDX_COLUMN = "_fugue_dask_temp_index"

@@ -241,13 +241,9 @@ def safe_to_pandas_dtype(self, schema: pa.Schema) -> Dict[str, np.dtype]:
         This is a temporary solution, it will be removed when we use the Slide
         package. Do not use this function directly.
         """
-        res: Dict[str, np.dtype] = {}
-        for f in schema:
-            if pa.types.is_nested(f.type):
-                res[f.name] = np.dtype(object)
-            else:
-                res[f.name] = to_single_pandas_dtype(f.type, use_extension_types=False)
-        return res
+        return to_pandas_dtype(
+            schema, use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
+        )

     # TODO: merge this back to base class
     def enforce_type(  # noqa: C901
@@ -281,10 +277,10 @@
                 try:
                     s = s.str.lower() == "true"
                 except AttributeError:
-                    s = s.fillna(0).astype(bool)
+                    s = s.fillna(0).astype(pd.BooleanDtype())
             else:
-                s = s.fillna(0).astype(bool)
-            s = s.mask(ns, None).astype("boolean")
+                s = s.fillna(0).astype(pd.BooleanDtype())
+            s = s.mask(ns, None).astype(pd.BooleanDtype())
         elif pa.types.is_integer(v.type) and not pd.api.types.is_integer_dtype(
             s.dtype
         ):
diff --git a/fugue_dask/dataframe.py b/fugue_dask/dataframe.py
index 487bcac0..e06d655e 100644
--- a/fugue_dask/dataframe.py
+++ b/fugue_dask/dataframe.py
@@ -1,8 +1,8 @@
 from typing import Any, Dict, Iterable, List, Optional, Tuple

 import dask.dataframe as dd
-import pandas
 import pyarrow as pa
+import pandas as pd
 from triad.collections.schema import Schema
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from triad.utils.pyarrow import to_pandas_dtype
@@ -31,6 +31,7 @@
 )

 from ._utils import DASK_UTILS, get_default_partitions
+from ._constants import FUGUE_DASK_USE_ARROW


 class DaskDataFrame(DataFrame):
@@ -71,15 +72,20 @@ def __init__(  # noqa: C901
                 df = df.to_frame()
             pdf = df
             schema = None if schema is None else _input_schema(schema)
-        elif isinstance(df, (pandas.DataFrame, pandas.Series)):
-            if isinstance(df, pandas.Series):
+        elif isinstance(df, (pd.DataFrame, pd.Series)):
+            if isinstance(df, pd.Series):
                 df = df.to_frame()
             pdf = dd.from_pandas(df, npartitions=num_partitions, sort=False)
             schema = None if schema is None else _input_schema(schema)
         elif isinstance(df, Iterable):
             schema = _input_schema(schema).assert_not_empty()
             t = PandasDataFrame(df, schema)
-            pdf = dd.from_pandas(t.native, npartitions=num_partitions, sort=False)
+            tdf = t.native.astype(
+                schema.to_pandas_dtype(
+                    use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
+                )
+            )
+            pdf = dd.from_pandas(tdf, npartitions=num_partitions, sort=False)
             type_safe = False
         else:
             raise ValueError(f"{df} is incompatible with DaskDataFrame")
@@ -137,7 +143,7 @@ def persist(self, **kwargs: Any) -> "DaskDataFrame":
     def count(self) -> int:
         return self.native.shape[0].compute()

-    def as_pandas(self) -> pandas.DataFrame:
+    def as_pandas(self) -> pd.DataFrame:
         return self.native.compute().reset_index(drop=True)

     def rename(self, columns: Dict[str, str]) -> DataFrame:
@@ -153,7 +159,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
         if new_schema == self.schema:
             return self
         new_pdf = self.native.assign()
-        pd_types = to_pandas_dtype(new_schema.pa_schema)
+        pd_types = to_pandas_dtype(new_schema.pa_schema, use_extension_types=True)
         for k, v in new_schema.items():
             if not v.type.equals(self.schema[k].type):
                 old_type = self.schema[k].type
@@ -163,33 +169,35 @@
                     series = new_pdf[k]
                     ns = series.isnull()
                     series = series.fillna(0).astype(int).astype(str)
-                    new_pdf[k] = series.mask(ns, None)
+                    new_pdf[k] = series.mask(ns, pd.NA)
                 # bool -> str
                 elif pa.types.is_boolean(old_type) and pa.types.is_string(new_type):
                     series = new_pdf[k]
                     ns = series.isnull()
-                    positive = series != 0
-                    new_pdf[k] = "False"
-                    new_pdf[k] = new_pdf[k].mask(positive, "True").mask(ns, None)
+                    new_pdf[k] = series.astype(str).mask(ns, pd.NA)
                 # str -> bool
                 elif pa.types.is_string(old_type) and pa.types.is_boolean(new_type):
                     series = new_pdf[k]
                     ns = series.isnull()
                     new_pdf[k] = (
-                        series.fillna("true")
-                        .apply(lambda x: None if x is None else x.lower())
+                        (series.fillna("true").str.lower() == "true")
                         .mask(ns, None)
+                        .astype("boolean")
                     )
                 elif pa.types.is_integer(new_type):
                     series = new_pdf[k]
                     ns = series.isnull()
-                    series = series.fillna(0).astype(pd_types[k])
-                    new_pdf[k] = series.mask(ns, None)
+                    if pa.types.is_string(old_type):
+                        series = series.fillna("0")
+                    else:
+                        series = series.fillna(0)
+                    series = series.astype(pd_types[k])
+                    new_pdf[k] = series.mask(ns, pd.NA)
                 else:
                     series = new_pdf[k]
                     ns = series.isnull()
                     series = series.astype(pd_types[k])
-                    new_pdf[k] = series.mask(ns, None)
+                    new_pdf[k] = series.mask(ns, pd.NA)
         return DaskDataFrame(new_pdf, new_schema, type_safe=True)

     def as_array(
@@ -221,6 +229,12 @@
             assert_arg_not_none(pdf, "pdf")
             assert_arg_not_none(schema, "schema")
             return pdf, schema
+        if schema is not None:
+            pdf = pdf.astype(
+                schema.to_pandas_dtype(
+                    use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
+                )
+            )
         DASK_UTILS.ensure_compatible(pdf)
         if pdf.columns.dtype == "object":  # pdf has named schema
             pschema = Schema(DASK_UTILS.to_schema(pdf))
@@ -315,7 +329,7 @@ def _dd_head(
     n: int,
     columns: Optional[List[str]] = None,
     as_fugue: bool = False,
-) -> pandas.DataFrame:
+) -> pd.DataFrame:
     if columns is not None:
         df = df[columns]
     res = df.head(n, compute=True, npartitions=-1)
diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py
index f8a57488..06e3c5f4 100644
--- a/fugue_dask/execution_engine.py
+++ b/fugue_dask/execution_engine.py
@@ -10,15 +10,15 @@
 from triad.collections.fs import FileSystem
 from triad.utils.assertion import assert_or_throw
 from triad.utils.hash import to_uuid
-from triad.utils.threading import RunOnce
 from triad.utils.pandas_like import PandasUtils
+from triad.utils.threading import RunOnce

 from fugue import StructuredRawSQL
-from fugue._utils.misc import import_fsql_dependency
 from fugue.collections.partition import (
     PartitionCursor,
     PartitionSpec,
     parse_presort_exp,
 )
+from fugue.exceptions import FugueBug
 from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT
 from fugue.dataframe import (
     AnyDataFrame,
@@ -41,15 +41,17 @@
 )

 from fugue_dask.dataframe import DaskDataFrame

+from ._constants import FUGUE_DASK_USE_ARROW
+
 _DASK_PARTITION_KEY = "__dask_partition_key__"


-class QPDDaskEngine(SQLEngine):
-    """QPD execution implementation."""
+class DaskSQLEngine(SQLEngine):
+    """Dask-sql implementation."""

     @property
     def dialect(self) -> Optional[str]:
-        return "spark"
+        return "trino"

     def to_df(self, df: AnyDataFrame, schema: Any = None) -> DataFrame:
         return to_dask_engine_df(df, schema)
@@ -59,12 +61,33 @@ def is_distributed(self) -> bool:
         return True

     def select(self, dfs: DataFrames, statement: StructuredRawSQL) -> DataFrame:
-        qpd_dask = import_fsql_dependency("qpd_dask")
+        try:
+            from dask_sql import Context
+        except ImportError:  # pragma: no cover
+            raise ImportError(
+                "dask-sql is not installed. "
+                "Please install it with `pip install dask-sql`"
+            )
+        ctx = Context()
+        _dfs: Dict[str, dd.DataFrame] = {k: self._to_safe_df(v) for k, v in dfs.items()}
+        sql = statement.construct(dialect=self.dialect, log=self.log)
+        res = ctx.sql(
+            sql,
+            return_futures=False,
+            dataframes=_dfs,
+            config_options={"sql.identifier.case_sensitive": True},
+        )
+        return DaskDataFrame(res)

-        _dfs, _sql = self.encode(dfs, statement)
-        dask_dfs = {k: self.to_df(v).native for k, v in _dfs.items()}  # type: ignore
-        df = qpd_dask.run_sql_on_dask(_sql, dask_dfs, ignore_case=True)
-        return DaskDataFrame(df)
+    def _to_safe_df(self, df: DataFrame) -> dd.DataFrame:
+        df = self.to_df(df)
+        ddf = df.native.astype(
+            df.schema.to_pandas_dtype(use_extension_types=True, use_arrow_dtype=False)
+        )
+        for k, v in ddf.dtypes.to_dict().items():
+            if v == pd.StringDtype("pyarrow"):
+                ddf[k] = ddf[k].astype(str)
+        return ddf


 class DaskMapEngine(MapEngine):
@@ -76,7 +99,7 @@ def execution_engine_constraint(self) -> Type[ExecutionEngine]:
     def is_distributed(self) -> bool:
         return True

-    def map_dataframe(
+    def map_dataframe(  # noqa: C901
         self,
         df: DataFrame,
         map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
@@ -91,6 +114,9 @@
         presort_keys = list(presort.keys())
         presort_asc = list(presort.values())
         output_schema = Schema(output_schema)
+        output_dtypes = output_schema.to_pandas_dtype(
+            use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
+        )
         input_schema = df.schema
         cursor = partition_spec.get_cursor(input_schema, 0)
         on_init_once: Any = (
@@ -101,9 +127,17 @@
             )
         )

-        def _map(pdf: pd.DataFrame) -> pd.DataFrame:
-            if pdf.shape[0] == 0:
-                return PandasDataFrame([], output_schema).as_pandas()
+        def _fix_dask_bug(pdf: pd.DataFrame) -> pd.DataFrame:
+            assert_or_throw(
+                pdf.shape[1] == len(input_schema),
+                FugueBug(
+                    "partitioned dataframe has different number of columns: "
+                    f"{pdf.columns} vs {input_schema}"
+                ),
+            )
+            return pdf
+
+        def _core_map(pdf: pd.DataFrame) -> pd.DataFrame:
             if len(partition_spec.presort) > 0:
                 pdf = pdf.sort_values(presort_keys, ascending=presort_asc)
             input_df = PandasDataFrame(
@@ -115,34 +149,43 @@
             output_df = map_func(cursor, input_df)
             return output_df.as_pandas()[output_schema.names]

+        def _map(pdf: pd.DataFrame, enforce_types: bool = True) -> pd.DataFrame:
+            if pdf.shape[0] == 0:
+                return PandasDataFrame([], output_schema).as_pandas()
+            pdf = pdf.reset_index(drop=True)
+            pdf = _fix_dask_bug(pdf)
+            res = _core_map(pdf)
+            return res.astype(output_dtypes)
+
         def _gp_map(pdf: pd.DataFrame) -> pd.DataFrame:
             if pdf.shape[0] == 0:  # pragma: no cover
                 return PandasDataFrame([], output_schema).as_pandas()
+            pdf = pdf.reset_index(drop=True)
+            pdf = _fix_dask_bug(pdf)
             pu = PandasUtils()
-            return pu.safe_groupby_apply(
-                pdf.reset_index(drop=True), partition_spec.partition_by, _map
-            )
+            res = pu.safe_groupby_apply(pdf, partition_spec.partition_by, _core_map)
+            return res.astype(output_dtypes)

         df = self.to_df(df)
-        meta = self.execution_engine.pl_utils.safe_to_pandas_dtype(  # type: ignore
-            output_schema.pa_schema
-        )
         pdf = self.execution_engine.repartition(df, partition_spec)
         if len(partition_spec.partition_by) == 0:
-            result = pdf.native.map_partitions(_map, meta=meta)  # type: ignore
+            result = pdf.native.map_partitions(_map, meta=output_dtypes)  # type: ignore
         else:
             if partition_spec.algo == "default":
-                engine = self.execution_engine
-                result = engine.pl_utils.safe_groupby_apply(  # type: ignore
-                    df.native,
-                    partition_spec.partition_by,
-                    _map,
-                    meta=meta,
-                )
+                result = df.native.groupby(
+                    partition_spec.partition_by,
+                    sort=False,
+                    group_keys=False,
+                    dropna=False,
+                ).apply(_map, meta=output_dtypes)
             elif partition_spec.algo == "coarse":
-                result = pdf.native.map_partitions(_map, meta=meta)  # type: ignore
+                result = pdf.native.map_partitions(  # type: ignore
+                    _map, meta=output_dtypes
+                )
             else:
-                result = pdf.native.map_partitions(_gp_map, meta=meta)  # type: ignore
+                result = pdf.native.map_partitions(  # type: ignore
+                    _gp_map, meta=output_dtypes
+                )

         return DaskDataFrame(result, output_schema)
@@ -194,7 +237,7 @@ def fs(self) -> FileSystem:
         return self._fs

     def create_default_sql_engine(self) -> SQLEngine:
-        return QPDDaskEngine(self)
+        return DaskSQLEngine(self)

     def create_default_map_engine(self) -> MapEngine:
         return DaskMapEngine(self)
@@ -238,6 +281,7 @@ def repartition(
             return df
         if len(partition_spec.partition_by) > 0 and partition_spec.algo == "default":
             return df
+
         p = partition_spec.get_num_partitions(
             **{
                 KEYWORD_ROWCOUNT: lambda: df.persist().count(),  # type: ignore
diff --git a/fugue_dask/registry.py b/fugue_dask/registry.py
index a279443c..6eb27761 100644
--- a/fugue_dask/registry.py
+++ b/fugue_dask/registry.py
@@ -2,16 +2,19 @@

 import dask.dataframe as dd
 from dask.distributed import Client
-from triad import run_at_def

-from fugue import DataFrame, register_execution_engine
+from fugue import DataFrame
 from fugue.dev import (
     DataFrameParam,
     ExecutionEngineParam,
     fugue_annotated_param,
     is_pandas_or,
 )
-from fugue.plugins import as_fugue_dataset, infer_execution_engine
+from fugue.plugins import (
+    as_fugue_dataset,
+    infer_execution_engine,
+    parse_execution_engine,
+)
 from fugue_dask._utils import DASK_UTILS
 from fugue_dask.dataframe import DaskDataFrame
 from fugue_dask.execution_engine import DaskExecutionEngine
@@ -29,19 +32,20 @@ def _dask_as_fugue_df(df: dd.DataFrame, **kwargs: Any) -> DaskDataFrame:
     return DaskDataFrame(df, **kwargs)


-def _register_engines() -> None:
-    register_execution_engine(
-        "dask",
-        lambda conf, **kwargs: DaskExecutionEngine(conf=conf),
-        on_dup="ignore",
-    )
-    register_execution_engine(
-        Client,
-        lambda engine, conf, **kwargs: DaskExecutionEngine(
-            dask_client=engine, conf=conf
-        ),
-        on_dup="ignore",
-    )
+@parse_execution_engine.candidate(
+    lambda engine, conf, **kwargs: isinstance(engine, Client),
+    priority=4,  # TODO: this is to overwrite dask-sql fugue integration
+)
+def _parse_dask_client(engine: Client, conf: Any, **kwargs: Any) -> DaskExecutionEngine:
+    return DaskExecutionEngine(dask_client=engine, conf=conf)
+
+
+@parse_execution_engine.candidate(
+    lambda engine, conf, **kwargs: isinstance(engine, str) and engine == "dask",
+    priority=4,  # TODO: this is to overwrite dask-sql fugue integration
+)
+def _parse_dask_str(engine: str, conf: Any, **kwargs: Any) -> DaskExecutionEngine:
+    return DaskExecutionEngine(conf=conf)


 @fugue_annotated_param(DaskExecutionEngine)
@@ -62,16 +66,3 @@ def to_output_df(self, output: Any, schema: Any, ctx: Any) -> DataFrame:

     def count(self, df: DataFrame) -> int:  # pragma: no cover
         raise NotImplementedError("not allowed")
-
-
-@run_at_def
-def _register() -> None:
-    """Register Dask Execution Engine
-
-    .. note::
-
-        This function is automatically called when you do
-
-        >>> import fugue_dask
-    """
-    _register_engines()
diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py
index deff66b6..0ef78256 100644
--- a/fugue_test/execution_suite.py
+++ b/fugue_test/execution_suite.py
@@ -286,14 +286,14 @@ def with_nat(cursor, data):
         e = self.engine
         # test with multiple key with null values
         o = ArrayDataFrame(
-            [[1, None, 1], [1, None, 0], [None, None, 1]], "a:double,b:double,c:int"
+            [[1, None, 1], [1, None, 0], [None, None, 2]], "a:double,b:double,c:int"
         )
         c = e.map_engine.map_dataframe(
             o, select_top, o.schema, PartitionSpec(by=["a", "b"], presort="c")
         )
         df_eq(
             c,
-            [[1, None, 0], [None, None, 1]],
+            [[1, None, 0], [None, None, 2]],
             "a:double,b:double,c:int",
             throw=True,
         )
diff --git a/requirements.txt b/requirements.txt
index 5a32e091..90cf4ad6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -31,6 +31,7 @@ duckdb!=0.8.1
 sqlalchemy==2.0.10 # 2.0.11 has a bug
 ray[data]>=2.5.0
 # pyarrow==7.0.0
+dask-sql

 # publish to pypi
 wheel
diff --git a/tests/fugue_dask/test_dataframe.py b/tests/fugue_dask/test_dataframe.py
index 03aa5f40..1e630e0e 100644
--- a/tests/fugue_dask/test_dataframe.py
+++ b/tests/fugue_dask/test_dataframe.py
@@ -63,8 +63,8 @@ def test_init():
     assert df.count() == 0
     assert df.schema == "a:str,b:int"

-    pdf = pandas.DataFrame([["a", 1], ["b", 2]])
-    raises(Exception, lambda: DaskDataFrame(pdf))
+    pdf = pandas.DataFrame([["a", 1], ["b", 2]], columns=["a", "b"])
+    # raises(Exception, lambda: DaskDataFrame(pdf))
     df = DaskDataFrame(pdf, "a:str,b:str")
     assert [["a", "1"], ["b", "2"]] == df.as_pandas().values.tolist()
     df = DaskDataFrame(pdf, "a:str,b:int")
@@ -88,7 +88,7 @@ def test_init():
     assert [["1", "a"], ["2", "b"]] == ddf.as_pandas().values.tolist()
     assert df.native is ddf.native  # no real copy happened

-    df = DaskDataFrame([["a", 1], ["b", "2"]], "x:str,y:double")
+    df = DaskDataFrame([["a", 1], ["b", 2]], "x:str,y:double")
     assert [["a", 1.0], ["b", 2.0]] == df.as_pandas().values.tolist()

     df = DaskDataFrame([], "x:str,y:double")
@@ -103,7 +103,7 @@ def test_simple_methods():
     assert 0 == df.count()
     assert not df.is_local

-    df = DaskDataFrame([["a", 1], ["b", "2"]], "x:str,y:double")
+    df = DaskDataFrame([["a", 1], ["b", 2]], "x:str,y:double")
     assert not df.empty
     assert 2 == df.count()
     assert ["a", 1.0] == df.peek_array()
@@ -143,37 +143,37 @@ def test_as_array():
     assert [[1, "a"]] == df.as_array(["b", "a"])

     # prevent pandas auto type casting
-    df = DaskDataFrame([[1.0, 1.1]], "a:double,b:int")
+    df = DaskDataFrame([[1.0, 1.0]], "a:double,b:int")
     assert [[1.0, 1]] == df.as_array()
     assert isinstance(df.as_array()[0][0], float)
     assert isinstance(df.as_array()[0][1], int)
     assert [[1.0, 1]] == df.as_array(["a", "b"])
     assert [[1, 1.0]] == df.as_array(["b", "a"])

-    df = DaskDataFrame([[np.float64(1.0), 1.1]], "a:double,b:int")
+    df = DaskDataFrame([[np.float64(1.0), 1.0]], "a:double,b:int")
     assert [[1.0, 1]] == df.as_array()
     assert isinstance(df.as_array()[0][0], float)
     assert isinstance(df.as_array()[0][1], int)

-    df = DaskDataFrame([[pandas.Timestamp("2020-01-01"), 1.1]], "a:datetime,b:int")
+    df = DaskDataFrame([[pandas.Timestamp("2020-01-01"), 1.0]], "a:datetime,b:int")
     df.native["a"] = pd.to_datetime(df.native["a"])
     assert [[datetime(2020, 1, 1), 1]] == df.as_array()
     assert isinstance(df.as_array()[0][0], datetime)
     assert isinstance(df.as_array()[0][1], int)

-    df = DaskDataFrame([[pandas.NaT, 1.1]], "a:datetime,b:int")
+    df = DaskDataFrame([[pandas.NaT, 1.0]], "a:datetime,b:int")
     df.native["a"] = pd.to_datetime(df.native["a"])
     assert df.as_array()[0][0] is None
     assert isinstance(df.as_array()[0][1], int)

-    df = DaskDataFrame([[1.0, 1.1]], "a:double,b:int")
+    df = DaskDataFrame([[1.0, 1.0]], "a:double,b:int")
     assert [[1.0, 1]] == df.as_array(type_safe=True)
     assert isinstance(df.as_array()[0][0], float)
     assert isinstance(df.as_array()[0][1], int)


 def test_as_dict_iterable():
-    df = DaskDataFrame([["2020-01-01", 1.1]], "a:datetime,b:int")
+    df = DaskDataFrame([["2020-01-01", 1.0]], "a:datetime,b:int")
     assert [dict(a=datetime(2020, 1, 1), b=1)] == list(df.as_dict_iterable())
diff --git a/tests/fugue_dask/test_execution_engine.py b/tests/fugue_dask/test_execution_engine.py
index cd398a57..828ad352 100644
--- a/tests/fugue_dask/test_execution_engine.py
+++ b/tests/fugue_dask/test_execution_engine.py
@@ -47,6 +47,7 @@ def make_engine(self):
         client = Client(processes=True, n_workers=3, threads_per_worker=1)
         # p2p (new default algo has bugs)
         dask.config.set({"dataframe.shuffle.method": "tasks"})
+        dask.config.set({"dataframe.convert-string": False})
         e = DaskExecutionEngine(client, conf=dict(test=True, **_CONF))
         return e

@@ -135,6 +136,7 @@ def tearDownClass(cls):

     def make_engine(self):
         e = DaskExecutionEngine(conf=dict(test=True, **_CONF))
+        dask.config.set({"dataframe.convert-string": False})
         return e

     def test_yield_table(self):
diff --git a/tests/fugue_dask/test_importless.py b/tests/fugue_dask/test_importless.py
index 2dc45e69..d14c163c 100644
--- a/tests/fugue_dask/test_importless.py
+++ b/tests/fugue_dask/test_importless.py
@@ -6,6 +6,7 @@

 def test_importless():
     pytest.importorskip("fugue_sql_antlr")
+    pytest.importorskip("ibis")
     client = Client()
     for engine in ["dask", client]:
         dag = FugueWorkflow()
diff --git a/tests/fugue_dask/test_io.py b/tests/fugue_dask/test_io.py
index 31ebfa24..ff3ce372 100644
--- a/tests/fugue_dask/test_io.py
+++ b/tests/fugue_dask/test_io.py
@@ -12,14 +12,14 @@

 def test_parquet_io(tmpdir):
     df1 = DaskDataFrame([["1", 2, 3]], "a:str,b:int,c:long")
-    df2 = DaskDataFrame([[[1, 2]]], "a:[int]")
+    df2 = DaskDataFrame([[[1, 2]]], "a:[long]")
     # {a:int} will become {a:long} because pyarrow lib has issue
     df3 = DaskDataFrame([[dict(a=1)]], "a:{a:long}")
     for df in [df1, df2, df3]:
         path = os.path.join(tmpdir, "a.parquet")
         save_df(df, path)
         actual = load_df(path)
-        df_eq(df, actual, throw=True)
+        df_eq(df, actual, throw=True, check_order=True)

     save_df(df1, path)
     actual = load_df(path, columns=["b", "a"])
diff --git a/tests/fugue_dask/test_utils.py b/tests/fugue_dask/test_utils.py
index caaa5af1..cbec0991 100644
--- a/tests/fugue_dask/test_utils.py
+++ b/tests/fugue_dask/test_utils.py
@@ -7,7 +7,7 @@
 from fugue_dask._utils import even_repartition, hash_repartition, rand_repartition


-@pytest.fixture
+@pytest.fixture(scope="module")
 def dask_client():
     from dask.distributed import Client