Skip to content

Commit

Permalink
Add arguments to enable cuDF spilling and set statistics (#1362)
Browse files Browse the repository at this point in the history
Add arguments to enable cuDF spilling and set the spilling statistics level in `dask cuda worker`/`LocalCUDACluster`. This is implemented as a Dask plugin, so users no longer need to rely on `client.run` to do that.

Closes #1280

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #1362
  • Loading branch information
pentschev authored Jul 24, 2024
1 parent fa226b1 commit d6cafc1
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 2 deletions.
18 changes: 18 additions & 0 deletions dask_cuda/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ def cuda():
total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"`` or 0 to
disable spilling to host (i.e. allow full device memory usage).""",
)
@click.option(
"--enable-cudf-spill/--disable-cudf-spill",
default=False,
show_default=True,
help="""Enable automatic cuDF spilling. WARNING: This should NOT be used with
JIT-Unspill.""",
)
@click.option(
"--cudf-spill-stats",
type=int,
default=0,
help="""Set the cuDF spilling statistics level. This option has no effect if
`--enable-cudf-spill` is not specified.""",
)
@click.option(
"--rmm-pool-size",
default=None,
Expand Down Expand Up @@ -330,6 +344,8 @@ def worker(
name,
memory_limit,
device_memory_limit,
enable_cudf_spill,
cudf_spill_stats,
rmm_pool_size,
rmm_maximum_pool_size,
rmm_managed_memory,
Expand Down Expand Up @@ -402,6 +418,8 @@ def worker(
name,
memory_limit,
device_memory_limit,
enable_cudf_spill,
cudf_spill_stats,
rmm_pool_size,
rmm_maximum_pool_size,
rmm_managed_memory,
Expand Down
11 changes: 10 additions & 1 deletion dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .plugins import CPUAffinity, PreImport, RMMSetup
from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
from .proxify_host_file import ProxifyHostFile
from .utils import (
cuda_visible_devices,
Expand All @@ -41,6 +41,8 @@ def __init__(
name=None,
memory_limit="auto",
device_memory_limit="auto",
enable_cudf_spill=False,
cudf_spill_stats=0,
rmm_pool_size=None,
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
Expand Down Expand Up @@ -166,6 +168,12 @@ def del_pid_file():
if device_memory_limit is None and memory_limit is None:
data = lambda _: {}
elif jit_unspill:
if enable_cudf_spill:
warnings.warn(
"Enabling cuDF spilling and JIT-Unspill together is not "
"safe, consider disabling JIT-Unspill."
)

data = lambda i: (
ProxifyHostFile,
{
Expand Down Expand Up @@ -217,6 +225,7 @@ def del_pid_file():
track_allocations=rmm_track_allocations,
),
PreImport(pre_import),
CUDFSetup(spill=enable_cudf_spill, spill_stats=cudf_spill_stats),
},
name=name if nprocs == 1 or name is None else str(name) + "-" + str(i),
local_directory=local_directory,
Expand Down
21 changes: 20 additions & 1 deletion dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .plugins import CPUAffinity, PreImport, RMMSetup
from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
from .proxify_host_file import ProxifyHostFile
from .utils import (
cuda_visible_devices,
Expand Down Expand Up @@ -73,6 +73,14 @@ class LocalCUDACluster(LocalCluster):
starts spilling to host memory. Can be an integer (bytes), float (fraction of
total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"``, 0,
or ``None`` to disable spilling to host (i.e. allow full device memory usage).
enable_cudf_spill : bool, default False
Enable automatic cuDF spilling.
.. warning::
This should NOT be used together with JIT-Unspill.
cudf_spill_stats : int, default 0
Set the cuDF spilling statistics level. This option has no effect if
``enable_cudf_spill=False``.
local_directory : str or None, default None
Path on local machine to store temporary files. Can be a string (like
``"path/to/files"``) or ``None`` to fall back on the value of
Expand Down Expand Up @@ -209,6 +217,8 @@ def __init__(
threads_per_worker=1,
memory_limit="auto",
device_memory_limit=0.8,
enable_cudf_spill=False,
cudf_spill_stats=0,
data=None,
local_directory=None,
shared_filesystem=None,
Expand Down Expand Up @@ -259,6 +269,8 @@ def __init__(
self.device_memory_limit = parse_device_memory_limit(
device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES)
)
self.enable_cudf_spill = enable_cudf_spill
self.cudf_spill_stats = cudf_spill_stats

self.rmm_pool_size = rmm_pool_size
self.rmm_maximum_pool_size = rmm_maximum_pool_size
Expand Down Expand Up @@ -302,6 +314,12 @@ def __init__(
if device_memory_limit is None and memory_limit is None:
data = {}
elif jit_unspill:
if enable_cudf_spill:
warnings.warn(
"Enabling cuDF spilling and JIT-Unspill together is not "
"safe, consider disabling JIT-Unspill."
)

data = (
ProxifyHostFile,
{
Expand Down Expand Up @@ -414,6 +432,7 @@ def new_worker_spec(self):
track_allocations=self.rmm_track_allocations,
),
PreImport(self.pre_import),
CUDFSetup(self.enable_cudf_spill, self.cudf_spill_stats),
},
}
)
Expand Down
15 changes: 15 additions & 0 deletions dask_cuda/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ def setup(self, worker=None):
os.sched_setaffinity(0, self.cores)


class CUDFSetup(WorkerPlugin):
    """Worker plugin that applies cuDF spilling options on a worker.

    Parameters
    ----------
    spill : bool
        Value assigned to the cuDF ``"spill"`` option.
    spill_stats : int
        Value assigned to the cuDF ``"spill_stats"`` option.
    """

    def __init__(self, spill, spill_stats):
        self.spill = spill
        self.spill_stats = spill_stats

    def setup(self, worker=None):
        """Set the cuDF ``"spill"`` and ``"spill_stats"`` options.

        cuDF is treated as an optional dependency: if it cannot be imported,
        no option is set and no exception is raised. When spilling was
        requested but cuDF is unavailable, a warning is emitted so that the
        request is not dropped silently.

        Parameters
        ----------
        worker : optional
            Accepted for compatibility with the plugin interface; not used.
        """
        # Keep the ``try`` limited to the import so that errors raised while
        # setting the options are never mistaken for a missing cuDF.
        try:
            import cudf
        except ImportError:
            if self.spill:
                import warnings

                warnings.warn(
                    "cuDF spilling was requested but cuDF could not be "
                    "imported, cuDF spilling options will not be set."
                )
            return

        cudf.set_option("spill", self.spill)
        cudf.set_option("spill_stats", self.spill_stats)


class RMMSetup(WorkerPlugin):
def __init__(
self,
Expand Down
58 changes: 58 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,64 @@ def test_rmm_logging(loop): # noqa: F811
assert v is rmm.mr.LoggingResourceAdaptor


def test_cudf_spill_disabled(loop):  # noqa: F811
    """Check cuDF spill options on workers launched without cuDF spill flags.

    Starts a scheduler and ``dask cuda worker`` with no cuDF spilling
    arguments, then asserts that every worker reports ``spill`` as ``False``
    and ``spill_stats`` as ``0``.
    """
    cudf = pytest.importorskip("cudf")

    scheduler_cmd = ["dask", "scheduler", "--port", "9369", "--no-dashboard"]
    worker_cmd = [
        "dask",
        "cuda",
        "worker",
        "127.0.0.1:9369",
        "--host",
        "127.0.0.1",
        "--no-dashboard",
    ]

    with popen(scheduler_cmd):
        with popen(worker_cmd):
            with Client("127.0.0.1:9369", loop=loop) as client:
                assert wait_workers(client, n_gpus=get_n_gpus())

                # ``client.run`` returns one result per worker.
                spill_by_worker = client.run(cudf.get_option, "spill")
                for enabled in spill_by_worker.values():
                    assert enabled is False

                stats_by_worker = client.run(cudf.get_option, "spill_stats")
                for level in stats_by_worker.values():
                    assert level == 0


def test_cudf_spill(loop):  # noqa: F811
    """Check cuDF spill options on workers launched with cuDF spill flags.

    Starts a scheduler and ``dask cuda worker`` with ``--enable-cudf-spill``
    and ``--cudf-spill-stats 2``, then asserts that every worker reports
    ``spill`` as ``True`` and ``spill_stats`` as ``2``.
    """
    cudf = pytest.importorskip("cudf")

    scheduler_cmd = ["dask", "scheduler", "--port", "9369", "--no-dashboard"]
    worker_cmd = [
        "dask",
        "cuda",
        "worker",
        "127.0.0.1:9369",
        "--host",
        "127.0.0.1",
        "--no-dashboard",
        "--enable-cudf-spill",
        "--cudf-spill-stats",
        "2",
    ]

    with popen(scheduler_cmd):
        with popen(worker_cmd):
            with Client("127.0.0.1:9369", loop=loop) as client:
                assert wait_workers(client, n_gpus=get_n_gpus())

                # ``client.run`` returns one result per worker.
                spill_by_worker = client.run(cudf.get_option, "spill")
                for enabled in spill_by_worker.values():
                    assert enabled is True

                stats_by_worker = client.run(cudf.get_option, "spill_stats")
                for level in stats_by_worker.values():
                    assert level == 2


@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"})
def test_dashboard_address(loop): # noqa: F811
with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
Expand Down
48 changes: 48 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,54 @@ async def test_worker_fraction_limits():
)


@gen_test(timeout=20)
async def test_cudf_spill_disabled():
    """Check cuDF spill options on a ``LocalCUDACluster`` with default arguments.

    Asserts that every worker reports ``spill`` as ``False`` and
    ``spill_stats`` as ``0`` when no cuDF spilling arguments are passed.
    """
    cudf = pytest.importorskip("cudf")

    async with LocalCUDACluster(asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            # ``client.run`` returns one result per worker.
            spill_by_worker = await client.run(cudf.get_option, "spill")
            for enabled in spill_by_worker.values():
                assert enabled is False

            stats_by_worker = await client.run(cudf.get_option, "spill_stats")
            for level in stats_by_worker.values():
                assert level == 0


@gen_test(timeout=20)
async def test_cudf_spill():
    """Check cuDF spill options on a ``LocalCUDACluster`` with spilling enabled.

    Creates the cluster with ``enable_cudf_spill=True`` and
    ``cudf_spill_stats=2`` and asserts that every worker reports ``spill`` as
    ``True`` and ``spill_stats`` as ``2``.
    """
    cudf = pytest.importorskip("cudf")

    cluster_kwargs = {
        "enable_cudf_spill": True,
        "cudf_spill_stats": 2,
        "asynchronous": True,
    }

    async with LocalCUDACluster(**cluster_kwargs) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            # ``client.run`` returns one result per worker.
            spill_by_worker = await client.run(cudf.get_option, "spill")
            for enabled in spill_by_worker.values():
                assert enabled is True

            stats_by_worker = await client.run(cudf.get_option, "spill_stats")
            for level in stats_by_worker.values():
                assert level == 2


@pytest.mark.parametrize(
"protocol",
["ucx", "ucxx"],
Expand Down

0 comments on commit d6cafc1

Please sign in to comment.