Skip to content

Commit

Permalink
Integrate with dask>=2023.7.1
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan committed Aug 10, 2023
1 parent dbeeb1b commit c1ba28c
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 141 deletions.
49 changes: 27 additions & 22 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
{
"name": "Fugue Development Environment",
"image": "fugueproject/devenv:0.7.7",
"settings": {
"terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
"python.formatting.blackPath": "/usr/local/py-utils/bin/black",
"python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
"python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
"python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
"python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
"python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
"python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
"python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
},
"extensions": [
"ms-python.python",
"ms-python.isort",
"GitHub.copilot",
"njpwerner.autodocstring"
],
"_image": "fugueproject/devenv:0.7.7",
"image": "mcr.microsoft.com/vscode/devcontainers/python:3.10",
"customizations": {
"vscode": {
"settings": {
"terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
"python.formatting.blackPath": "/usr/local/py-utils/bin/black",
"python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
"python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
"python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
"python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
"python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
"python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
"python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
},
"extensions": [
"ms-python.python",
"ms-python.isort",
"GitHub.copilot",
"njpwerner.autodocstring"
]
}},

"forwardPorts": [
8888
],
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[MESSAGES CONTROL]
disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0613,W0622,W0631,W0640,W0703,W0707,W1116
disable = C0103,C0114,C0115,C0116,C0122,C0200,C0201,C0302,C0411,C0415,E0401,E0712,E1130,E1136,E5110,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R1705,R1710,R1714,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0237,W0511,W0613,W0622,W0631,W0640,W0703,W0707,W1116
# TODO: R0205: inherits from object, can be safely removed
7 changes: 6 additions & 1 deletion fugue/dataframe/arrow_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ def count(self) -> int:
return self.native.shape[0]

def as_pandas(self) -> pd.DataFrame:
return self.native.to_pandas(use_threads=False, date_as_object=False)
mapper: Any = None
if hasattr(pd, "ArrowDtype"):
mapper = pd.ArrowDtype
return self.native.to_pandas(
use_threads=False, date_as_object=False, types_mapper=mapper
)

def head(
self, n: int, columns: Optional[List[str]] = None
Expand Down
20 changes: 11 additions & 9 deletions fugue/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pandas_like import PD_UTILS

from .._utils.display import PrettyTable
from ..collections.yielded import Yielded
Expand Down Expand Up @@ -113,14 +112,17 @@ def peek_dict(self) -> Dict[str, Any]:
def as_pandas(self) -> pd.DataFrame:
"""Convert to pandas DataFrame"""
pdf = pd.DataFrame(self.as_array(), columns=self.columns)
if len(pdf) == 0: # TODO: move to triad
return pd.DataFrame(
{
k: pd.Series(dtype=v.type.to_pandas_dtype())
for k, v in self.schema.items()
}
)
return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)
return pdf.astype(
self.schema.to_pandas_dtype(use_extension_types=True, use_arrow_dtype=True)
)
# if len(pdf) == 0: # TODO: move to triad
# return pd.DataFrame(
# {
# k: pd.Series(dtype=v.type.to_pandas_dtype())
# for k, v in self.schema.items()
# }
# )
# return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)

def as_arrow(self, type_safe: bool = False) -> pa.Table:
"""Convert to pyArrow DataFrame"""
Expand Down
3 changes: 3 additions & 0 deletions fugue_dask/_constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Any, Dict
import pandas as pd
import dask

FUGUE_DASK_CONF_DEFAULT_PARTITIONS = "fugue.dask.default.partitions"
FUGUE_DASK_DEFAULT_CONF: Dict[str, Any] = {FUGUE_DASK_CONF_DEFAULT_PARTITIONS: -1}
FUGUE_DASK_USE_ARROW = hasattr(pd, "ArrowDtype") and dask.__version__ >= "2023.7.1"
30 changes: 24 additions & 6 deletions fugue_dask/_io.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import pyarrow as pa
import fs as pfs
from dask import dataframe as dd
from fugue._utils.io import FileParser, _get_single_files
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw

import pandas as pd
from fugue_dask.dataframe import DaskDataFrame


Expand Down Expand Up @@ -58,22 +58,40 @@ def save_df(


def _save_parquet(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
df.native.to_parquet(p.uri, **{"schema": df.schema.pa_schema, **kwargs})
params = {
"schema": df.schema.pa_schema,
"write_index": False,
**kwargs,
}
if int(pa.__version__.split(".")[0]) >= 11:
has_nested = any(pa.types.is_nested(f.type) for f in df.schema.fields)
params["store_schema"] = not has_nested
df.native.to_parquet(p.uri, **params)


def _load_parquet(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[dd.DataFrame, Any]:
params = dict(kwargs)
dtype_backend: Any = params.pop("dtype_backend", None)
params["engine"] = "pyarrow"
params["arrow_to_pandas"] = {"ignore_metadata": True}
if pd.__version__ >= "1.5":
dtype_backend = "pyarrow"
if columns is None:
pdf = dd.read_parquet(p.uri, **kwargs)
pdf = dd.read_parquet(p.uri, dtype_backend=dtype_backend, **params)
schema = Schema(pdf.head(1))
return pdf, schema
if isinstance(columns, list): # column names
pdf = dd.read_parquet(p.uri, columns=columns, **kwargs)
pdf = dd.read_parquet(
p.uri, columns=columns, dtype_backend=dtype_backend, **params
)
schema = Schema(pdf.head(1))
return pdf, schema
schema = Schema(columns)
pdf = dd.read_parquet(p.uri, columns=schema.names, **kwargs)
pdf = dd.read_parquet(
p.uri, columns=schema.names, dtype_backend=dtype_backend, **params
)
return pdf, schema


Expand Down
20 changes: 8 additions & 12 deletions fugue_dask/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from dask.dataframe.core import DataFrame
from dask.distributed import Client, get_client
from triad.utils.pandas_like import PandasLikeUtils
from triad.utils.pyarrow import to_pandas_dtype, to_single_pandas_dtype
from triad.utils.pyarrow import to_pandas_dtype

import fugue.api as fa
from fugue.constants import FUGUE_CONF_DEFAULT_PARTITIONS

from ._constants import FUGUE_DASK_CONF_DEFAULT_PARTITIONS
from ._constants import FUGUE_DASK_CONF_DEFAULT_PARTITIONS, FUGUE_DASK_USE_ARROW

_FUGUE_DASK_TEMP_IDX_COLUMN = "_fugue_dask_temp_index"

Expand Down Expand Up @@ -241,13 +241,9 @@ def safe_to_pandas_dtype(self, schema: pa.Schema) -> Dict[str, np.dtype]:
This is a temporary solution, it will be removed when we use the Slide
package. Do not use this function directly.
"""
res: Dict[str, np.dtype] = {}
for f in schema:
if pa.types.is_nested(f.type):
res[f.name] = np.dtype(object)
else:
res[f.name] = to_single_pandas_dtype(f.type, use_extension_types=False)
return res
return to_pandas_dtype(
schema, use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
)

# TODO: merge this back to base class
def enforce_type( # noqa: C901
Expand Down Expand Up @@ -281,10 +277,10 @@ def enforce_type( # noqa: C901
try:
s = s.str.lower() == "true"
except AttributeError:
s = s.fillna(0).astype(bool)
s = s.fillna(0).astype(pd.BooleanDtype())
else:
s = s.fillna(0).astype(bool)
s = s.mask(ns, None).astype("boolean")
s = s.fillna(0).astype(pd.BooleanDtype())
s = s.mask(ns, None).astype(pd.BooleanDtype())
elif pa.types.is_integer(v.type) and not pd.api.types.is_integer_dtype(
s.dtype
):
Expand Down
46 changes: 30 additions & 16 deletions fugue_dask/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple

import dask.dataframe as dd
import pandas
import pyarrow as pa
import pandas as pd
from triad.collections.schema import Schema
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import to_pandas_dtype
Expand Down Expand Up @@ -31,6 +31,7 @@
)

from ._utils import DASK_UTILS, get_default_partitions
from ._constants import FUGUE_DASK_USE_ARROW


class DaskDataFrame(DataFrame):
Expand Down Expand Up @@ -71,15 +72,20 @@ def __init__( # noqa: C901
df = df.to_frame()
pdf = df
schema = None if schema is None else _input_schema(schema)
elif isinstance(df, (pandas.DataFrame, pandas.Series)):
if isinstance(df, pandas.Series):
elif isinstance(df, (pd.DataFrame, pd.Series)):
if isinstance(df, pd.Series):
df = df.to_frame()
pdf = dd.from_pandas(df, npartitions=num_partitions, sort=False)
schema = None if schema is None else _input_schema(schema)
elif isinstance(df, Iterable):
schema = _input_schema(schema).assert_not_empty()
t = PandasDataFrame(df, schema)
pdf = dd.from_pandas(t.native, npartitions=num_partitions, sort=False)
tdf = t.native.astype(
schema.to_pandas_dtype(
use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
)
)
pdf = dd.from_pandas(tdf, npartitions=num_partitions, sort=False)
type_safe = False
else:
raise ValueError(f"{df} is incompatible with DaskDataFrame")
Expand Down Expand Up @@ -137,7 +143,7 @@ def persist(self, **kwargs: Any) -> "DaskDataFrame":
def count(self) -> int:
return self.native.shape[0].compute()

def as_pandas(self) -> pandas.DataFrame:
def as_pandas(self) -> pd.DataFrame:
return self.native.compute().reset_index(drop=True)

def rename(self, columns: Dict[str, str]) -> DataFrame:
Expand All @@ -153,7 +159,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
if new_schema == self.schema:
return self
new_pdf = self.native.assign()
pd_types = to_pandas_dtype(new_schema.pa_schema)
pd_types = to_pandas_dtype(new_schema.pa_schema, use_extension_types=True)
for k, v in new_schema.items():
if not v.type.equals(self.schema[k].type):
old_type = self.schema[k].type
Expand All @@ -163,33 +169,35 @@ def alter_columns(self, columns: Any) -> DataFrame:
series = new_pdf[k]
ns = series.isnull()
series = series.fillna(0).astype(int).astype(str)
new_pdf[k] = series.mask(ns, None)
new_pdf[k] = series.mask(ns, pd.NA)
# bool -> str
elif pa.types.is_boolean(old_type) and pa.types.is_string(new_type):
series = new_pdf[k]
ns = series.isnull()
positive = series != 0
new_pdf[k] = "False"
new_pdf[k] = new_pdf[k].mask(positive, "True").mask(ns, None)
new_pdf[k] = series.astype(str).mask(ns, pd.NA)
# str -> bool
elif pa.types.is_string(old_type) and pa.types.is_boolean(new_type):
series = new_pdf[k]
ns = series.isnull()
new_pdf[k] = (
series.fillna("true")
.apply(lambda x: None if x is None else x.lower())
(series.fillna("true").str.lower() == "true")
.mask(ns, None)
.astype("boolean")
)
elif pa.types.is_integer(new_type):
series = new_pdf[k]
ns = series.isnull()
series = series.fillna(0).astype(pd_types[k])
new_pdf[k] = series.mask(ns, None)
if pa.types.is_string(old_type):
series = series.fillna("0")
else:
series = series.fillna(0)
series = series.astype(pd_types[k])
new_pdf[k] = series.mask(ns, pd.NA)
else:
series = new_pdf[k]
ns = series.isnull()
series = series.astype(pd_types[k])
new_pdf[k] = series.mask(ns, None)
new_pdf[k] = series.mask(ns, pd.NA)
return DaskDataFrame(new_pdf, new_schema, type_safe=True)

def as_array(
Expand Down Expand Up @@ -221,6 +229,12 @@ def _apply_schema(
assert_arg_not_none(pdf, "pdf")
assert_arg_not_none(schema, "schema")
return pdf, schema
if schema is not None:
pdf = pdf.astype(
schema.to_pandas_dtype(
use_extension_types=True, use_arrow_dtype=FUGUE_DASK_USE_ARROW
)
)
DASK_UTILS.ensure_compatible(pdf)
if pdf.columns.dtype == "object": # pdf has named schema
pschema = Schema(DASK_UTILS.to_schema(pdf))
Expand Down Expand Up @@ -315,7 +329,7 @@ def _dd_head(
n: int,
columns: Optional[List[str]] = None,
as_fugue: bool = False,
) -> pandas.DataFrame:
) -> pd.DataFrame:
if columns is not None:
df = df[columns]
res = df.head(n, compute=True, npartitions=-1)
Expand Down
Loading

0 comments on commit c1ba28c

Please sign in to comment.