Skip to content

Commit

Permalink
[BACKPORT] Enable running on GPU for oscar (#2284) (#2306)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored Aug 6, 2021
1 parent 4808c24 commit 4f52138
Show file tree
Hide file tree
Showing 51 changed files with 1,018 additions and 468 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Pre-commit Checks

on: [push, pull_request_target]
on: [push, pull_request]

jobs:
checks:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/upload-packages.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ else
cp *.whl dist/

if [[ "$UNAME" == "darwin" ]]; then
pip install delocate
pip install delocate==0.8.2
delocate-wheel dist/*.whl
delocate-addplat --rm-orig -x 10_9 -x 10_10 dist/*.whl
fi
Expand Down
1 change: 1 addition & 0 deletions docs/source/development/oscar/batch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ group requests by certain keys and resend them to different handlers in
batches. Oscar supports creating a batch version of the method:

.. code-block:: python
class ExampleActor(mo.Actor):
@mo.extensible
async def batch_method(self, a, b=None):
Expand Down
23 changes: 23 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,23 @@ def _new_integrated_test_session(_stop_isolation):
sess.stop_server(isolation=False)


@pytest.fixture(scope='module')
def _new_gpu_test_session(_stop_isolation):  # pragma: no cover
    """
    Module-scoped fixture that starts a local test session with CUDA devices.

    At most two CUDA devices (as reported by ``cuda_count``) are handed to the
    session. The session is yielded with the ``show_progress`` option disabled
    and its server is stopped once the fixture is torn down.
    """
    from .deploy.oscar.tests.session import new_test_session
    from .resource import cuda_count

    # cap the number of devices at two
    n_devices = min(cuda_count(), 2)

    session = new_test_session(
        address='127.0.0.1', init_local=True, n_worker=1, n_cpu=1,
        cuda_devices=list(range(n_devices)), default=True, timeout=300)
    with option_context({'show_progress': False}):
        try:
            yield session
        finally:
            # always stop the server, even when a test using the session failed
            session.stop_server(isolation=False)


@pytest.fixture
def setup(_new_test_session):
_new_test_session.as_default()
Expand All @@ -117,3 +134,9 @@ def setup(_new_test_session):
def setup_cluster(_new_integrated_test_session):
_new_integrated_test_session.as_default()
yield _new_integrated_test_session


@pytest.fixture
def setup_gpu(_new_gpu_test_session):  # pragma: no cover
    """
    Make the GPU test session the default one and provide it to the test.

    Yields
    ------
    The session object received from the ``_new_gpu_test_session`` fixture.
    """
    _new_gpu_test_session.as_default()
    # Yield the session that was just made default. ``_new_test_session`` is
    # not a parameter of this fixture, so yielding it would not hand the GPU
    # session to the test.
    yield _new_gpu_test_session
7 changes: 5 additions & 2 deletions mars/core/operand/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SchedulingHint(Serializable):
# need to be executed not later than the later ones,
# because the range index of later chunk should be accumulated from
# indexes of previous ones
# `gpu` indicates whether the operand should be executed on the GPU.
gpu = BoolField('gpu', default=None)
priority = Int32Field('priority', default=0)

@classproperty
Expand Down Expand Up @@ -108,7 +110,6 @@ class Operand(Base, metaclass=OperandMetaclass):
"""
Operand base class. All operands should have a type, which can be Add, Subtract etc.
`sparse` indicates whether the operand is applied on a sparse tensor/chunk.
`gpu` indicates that if the operand should be executed on the GPU.
`device`, 0 means the CPU, otherwise means the GPU device.
Operand can have inputs and outputs
which should be the :class:`mars.tensor.core.TensorData`, :class:`mars.tensor.core.ChunkData` etc.
Expand All @@ -119,7 +120,6 @@ class Operand(Base, metaclass=OperandMetaclass):
_output_type_ = None

sparse = BoolField('sparse', default=False)
gpu = BoolField('gpu', default=None)
device = Int32Field('device', default=None)
# will this operand create a view of input data or not
create_view = BoolField('create_view', default=False)
Expand Down Expand Up @@ -250,6 +250,9 @@ def _get_output_type(self, output_idx):
def copy(self: OperandType) -> OperandType:
    """
    Return a copy of this operand.

    The copy starts with an empty ``outputs`` list, gets a freshly built
    ``SchedulingHint`` carrying the same hint values as this operand, and
    receives a deep copy of ``extra_params``.
    """
    copied = super().copy()
    copied.outputs = []
    # rebuild the scheduling hint field by field so the copy owns its own instance
    hint_values = {name: getattr(self.scheduling_hint, name)
                   for name in SchedulingHint.all_hint_names}
    copied.scheduling_hint = SchedulingHint(**hint_values)
    copied.extra_params = deepcopy(self.extra_params)
    return copied

Expand Down
4 changes: 2 additions & 2 deletions mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


@require_cudf
def test_to_gpu_execution(setup):
def test_to_gpu_execution(setup_gpu):
pdf = pd.DataFrame(np.random.rand(20, 30), index=np.arange(20, 0, -1))
df = from_pandas_df(pdf, chunk_size=(13, 21))
cdf = to_gpu(df)
Expand All @@ -57,7 +57,7 @@ def test_to_gpu_execution(setup):


@require_cudf
def test_to_cpu_execution(setup):
def test_to_cpu_execution(setup_gpu):
pdf = pd.DataFrame(np.random.rand(20, 30), index=np.arange(20, 0, -1))
df = from_pandas_df(pdf, chunk_size=(13, 21))
cdf = to_gpu(df)
Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/datasource/read_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ def execute(cls, ctx, op):
df = df[op.usecols]
else:
df = cls._cudf_read_csv(op) if op.gpu else cls._pandas_read_csv(f, op)

ctx[out_df.key] = df

def estimate_size(cls, ctx, op):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def test_read_csv_use_arrow_dtype(setup):


@require_cudf
def test_read_csvgpu_execution(setup):
def test_read_csv_gpu_execution(setup_gpu):
with tempfile.TemporaryDirectory() as tempdir:
file_path = os.path.join(tempdir, 'test.csv')

Expand Down
9 changes: 7 additions & 2 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from ..reduction.core import ReductionCompiler, ReductionSteps, ReductionPreStep, \
ReductionAggStep, ReductionPostStep
from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs
from ..utils import parse_index, build_concatenated_rows_frame
from ..utils import parse_index, build_concatenated_rows_frame, is_cudf
from .core import DataFrameGroupByOperand

cp = lazy_import('cupy', globals=globals(), rename='cp')
Expand Down Expand Up @@ -84,7 +84,10 @@ def _patch_groupby_kurt():
from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
if not hasattr(DataFrameGroupBy, 'kurt'): # pragma: no branch
def _kurt_by_frame(a, *args, **kwargs):
return a.to_frame().kurt(*args, **kwargs).iloc[0]
data = a.to_frame().kurt(*args, **kwargs).iloc[0]
if is_cudf(data): # pragma: no cover
data = data.copy()
return data

def _group_kurt(x, *args, **kwargs):
if kwargs.get('numeric_only') is not None:
Expand Down Expand Up @@ -802,6 +805,8 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"):
result = xdf.concat(aggs)
if result.ndim == 2:
result = result.iloc[:, 0]
if is_cudf(result): # pragma: no cover
result = result.copy()
result.name = out_chunk.name

ctx[out_chunk.key] = result
Expand Down
11 changes: 9 additions & 2 deletions mars/dataframe/groupby/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@
from ...core.operand import OperandStage, MapReduceOperand
from ...lib.groupby_wrapper import wrapped_groupby
from ...serialization.serializables import BoolField, Int32Field, AnyField
from ...utils import lazy_import
from ..align import align_dataframe_series, align_series_series
from ..initializer import Series as asseries
from ..core import SERIES_TYPE, SERIES_CHUNK_TYPE
from ..utils import build_concatenated_rows_frame, hash_dataframe_on, \
build_df, build_series, parse_index
build_df, build_series, parse_index, is_cudf
from ..operands import DataFrameOperandMixin, DataFrameShuffleProxy


cudf = lazy_import('cudf', globals=globals())


class DataFrameGroupByOperand(MapReduceOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.GROUPBY

Expand Down Expand Up @@ -302,6 +306,8 @@ def _take_index(src, f):
result = src.iloc[f]
if src.index.names:
result.index.names = src.index.names
if is_cudf(result): # pragma: no cover
result = result.copy()
return result

for index_idx, index_filter in enumerate(filters):
Expand Down Expand Up @@ -330,6 +336,7 @@ def _take_index(src, f):

@classmethod
def execute_reduce(cls, ctx, op: "DataFrameGroupByOperand"):
xdf = cudf if op.gpu else pd
chunk = op.outputs[0]
input_idx_to_df = dict(op.iter_mapper_data_with_index(ctx))
row_idxes = sorted(input_idx_to_df.keys())
Expand All @@ -347,7 +354,7 @@ def execute_reduce(cls, ctx, op: "DataFrameGroupByOperand"):
part_len = len(res[0])
part_len -= 1 if not deliver_by else 2
for n in range(part_len):
r.append(pd.concat([it[n] for it in res], axis=0))
r.append(xdf.concat([it[n] for it in res], axis=0))
r = tuple(r)

if deliver_by:
Expand Down
2 changes: 1 addition & 1 deletion mars/dataframe/groupby/tests/test_groupby_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def test_groupby_agg_str_cat(setup):


@require_cudf
def test_gpu_groupby_agg(setup):
def test_gpu_groupby_agg(setup_gpu):
rs = np.random.RandomState(0)
df1 = pd.DataFrame({'a': rs.choice([2, 3, 4], size=(100,)),
'b': rs.choice([2, 3, 4], size=(100,))})
Expand Down
4 changes: 3 additions & 1 deletion mars/dataframe/indexing/iloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ...tensor.indexing.core import calc_shape
from ...utils import ceildiv
from ..operands import DataFrameOperand, DataFrameOperandMixin, DATAFRAME_TYPE
from ..utils import indexing_index_value
from ..utils import indexing_index_value, is_cudf
from .index_lib import DataFrameIlocIndexesHandler


Expand Down Expand Up @@ -358,6 +358,8 @@ def execute(cls, ctx, op):
r = df.iloc[indexes]
if isinstance(r, pd.Series) and r.dtype != chunk.dtype:
r = r.astype(chunk.dtype)
if is_cudf(r): # pragma: no cover
r = r.copy()
ctx[chunk.key] = r


Expand Down
2 changes: 1 addition & 1 deletion mars/dataframe/reduction/tests/test_reduction_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def compute(data, **kwargs):

@require_cudf
@require_cupy
def test_gpu_execution(setup, check_ref_counts):
def test_gpu_execution(setup_gpu, check_ref_counts):
df_raw = pd.DataFrame(np.random.rand(30, 3), columns=list('abc'))
df = to_gpu(md.DataFrame(df_raw, chunk_size=6))

Expand Down
4 changes: 3 additions & 1 deletion mars/dataframe/sort/psrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ...serialization.serializables import Int32Field, ListField, StringField, BoolField
from ...tensor.base.psrs import PSRSOperandMixin
from ..core import IndexValue, OutputType
from ..utils import standardize_range_index, parse_index
from ..utils import standardize_range_index, parse_index, is_cudf
from ..operands import DataFrameOperandMixin, DataFrameOperand, \
DataFrameShuffleProxy

Expand Down Expand Up @@ -544,6 +544,8 @@ def _execute_dataframe_map(cls, ctx, op):
poses = (None,) + tuple(poses) + (None,)
for i in range(op.n_partition):
values = a.iloc[poses[i]: poses[i + 1]]
if is_cudf(values): # pragma: no cover
values = values.copy()
ctx[out.key, (i,)] = values

@classmethod
Expand Down
7 changes: 4 additions & 3 deletions mars/dataframe/sort/tests/test_sort_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,22 +329,23 @@ def test_arrow_string_sort_values(setup):


@require_cudf
def test_gpu_execution(setup):
def test_gpu_execution(setup_gpu):
# test sort_values
rs = np.random.RandomState(0)
distinct_opts = ['0'] if sys.platform.lower().startswith('win') else ['0', '1']
for add_distinct in distinct_opts:
os.environ['PSRS_DISTINCT_COL'] = add_distinct

# test dataframe
raw = pd.DataFrame(np.random.rand(100, 10), columns=['a' + str(i) for i in range(10)])
raw = pd.DataFrame(rs.rand(100, 10), columns=['a' + str(i) for i in range(10)])
mdf = DataFrame(raw, chunk_size=30).to_gpu()

result = mdf.sort_values(by='a0').execute().fetch()
expected = raw.sort_values(by='a0')
pd.testing.assert_frame_equal(result.to_pandas(), expected)

# test series
raw = pd.Series(np.random.rand(10))
raw = pd.Series(rs.rand(10))
series = Series(raw).to_gpu()

result = series.sort_values().execute().fetch()
Expand Down
45 changes: 44 additions & 1 deletion mars/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
from ..core import Entity, ExecutableTuple
from ..lib.mmh3 import hash as mmh_hash
from ..tensor.utils import dictify_chunk_size, normalize_chunk_sizes
from ..utils import tokenize, sbytes
from ..utils import tokenize, sbytes, lazy_import

cudf = lazy_import('cudf', globals=globals(), rename='cudf')


def hash_index(index, size):
Expand All @@ -48,6 +50,8 @@ def hash_dataframe_on(df, on, size, level=None):
idx = df.index
if level is not None:
idx = idx.to_frame(False)[level]
if cudf and isinstance(idx, cudf.Index): # pragma: no cover
idx = idx.to_pandas()
hashed_label = pd.util.hash_pandas_object(idx, categorize=False)
elif callable(on):
# TODO: optimization can be added if ``on`` is a numpy ufunc or something else that can be vectorized
Expand Down Expand Up @@ -292,6 +296,10 @@ def _serialize_multi_index(index):
_max_val_close=True,
_key=key or tokenize(*args),
))
if hasattr(index_value, 'to_pandas'): # pragma: no cover
# convert cudf.Index to pandas
index_value = index_value.to_pandas()

if isinstance(index_value, pd.RangeIndex):
return IndexValue(_index_value=_serialize_range_index(index_value))
elif isinstance(index_value, pd.MultiIndex):
Expand Down Expand Up @@ -1116,3 +1124,38 @@ def make_dtypes(dtypes):
else:
dtypes = pd.Series(dtypes)
return dtypes.apply(make_dtype)


def is_dataframe(x):
    """
    Tell whether ``x`` is a DataFrame.

    ``x`` qualifies when it is a ``pd.DataFrame`` or, if the module-level
    ``cudf`` handle is not None, a ``cudf.DataFrame``.
    """
    frame_types = [pd.DataFrame]
    if cudf is not None:  # pragma: no cover
        frame_types.append(cudf.DataFrame)
    return isinstance(x, tuple(frame_types))


def is_series(x):
    """
    Tell whether ``x`` is a Series.

    ``x`` qualifies when it is a ``pd.Series`` or, if the module-level
    ``cudf`` handle is not None, a ``cudf.Series``.
    """
    series_types = [pd.Series]
    if cudf is not None:  # pragma: no cover
        series_types.append(cudf.Series)
    return isinstance(x, tuple(series_types))


def is_index(x):
    """
    Tell whether ``x`` is an Index.

    ``x`` qualifies when it is a ``pd.Index`` or, if the module-level
    ``cudf`` handle is not None, a ``cudf.Index``.
    """
    index_types = [pd.Index]
    if cudf is not None:  # pragma: no cover
        index_types.append(cudf.Index)
    return isinstance(x, tuple(index_types))


def get_xdf(x):
    """
    Pick the dataframe module matching ``x``.

    Returns ``cudf`` when the module-level ``cudf`` handle is not None and
    ``x`` is a cudf DataFrame, Series or Index; returns ``pd`` otherwise.
    """
    if cudf is None:
        return pd
    cudf_types = (cudf.DataFrame, cudf.Series, cudf.Index)  # pragma: no cover
    return cudf if isinstance(x, cudf_types) else pd  # pragma: no cover


def is_cudf(x):
    """
    Tell whether ``x`` is a cudf object.

    Returns True only when the module-level ``cudf`` handle is not None and
    ``x`` is a cudf DataFrame, Series or Index; False in every other case.
    """
    if cudf is None:
        return False
    return isinstance(x, (cudf.DataFrame, cudf.Series, cudf.Index))  # pragma: no cover
Loading

0 comments on commit 4f52138

Please sign in to comment.