From 91bc9807ac01ec22c5f8c2b60c397e58c7d520cf Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 27 Sep 2023 11:20:25 +0200 Subject: [PATCH 01/32] Generalizing starting coroutines in CLI --- dask_mpi/cli.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 039dd10..1538d4b 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -23,6 +23,12 @@ type=int, help="Specify scheduler port number. Defaults to random.", ) +@click.option( + "--scheduler-rank", + default=0, + type=int, + help="The MPI rank on which the scheduler will launch. Defaults to 0.", +) @click.option( "--interface", type=str, default=None, help="Network interface like 'eth0' or 'ib0'" ) @@ -56,6 +62,14 @@ default=True, help="Start workers in nanny process for management (deprecated use --worker-class instead)", ) +@click.option( + "--exclusive-workers", + default=True, + help=( + "Whether to force workers to run on unoccupied MPI ranks. If false, " + "then a worker will be launched on the same rank as the scheduler." + ), +) @click.option( "--worker-class", type=str, @@ -83,6 +97,7 @@ def main( scheduler_address, scheduler_file, + scheduler_rank, interface, nthreads, local_directory, @@ -90,6 +105,7 @@ def main( scheduler, dashboard_address, nanny, + exclusive_workers, worker_class, worker_options, scheduler_port, @@ -112,9 +128,9 @@ def main( except TypeError: worker_options = {} - if rank == 0 and scheduler: + if rank == scheduler_rank and scheduler: - async def run_scheduler(): + async def run_func(): async with Scheduler( interface=interface, protocol=protocol, @@ -125,18 +141,16 @@ async def run_scheduler(): comm.Barrier() await s.finished() - asyncio.get_event_loop().run_until_complete(run_scheduler()) - else: - comm.Barrier() - async def run_worker(): + async def run_func(): + comm.Barrier() + WorkerType = import_term(worker_class) if not nanny: raise DeprecationWarning( "Option --no-nanny is deprectaed, use --worker-class instead" ) - WorkerType = Worker opts = { "interface": interface, "protocol": protocol, @@ -149,10 +163,11 @@ async def run_worker(): } if scheduler_address: opts["scheduler_ip"] = scheduler_address + async with WorkerType(**opts) as worker: await worker.finished() - asyncio.get_event_loop().run_until_complete(run_worker()) + asyncio.get_event_loop().run_until_complete(run_func()) if __name__ == "__main__": From a449b7c27c72420dcb320538a4a7e50c8fe67970 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 27 Sep 2023 11:39:13 +0200 Subject: [PATCH 02/32] Ignore scratch space --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 5fc18e6..3227925 100644 --- a/.gitignore +++ b/.gitignore @@ -115,6 +115,7 @@ global.lock purge.lock /temp/ /dask-worker-space/ +/dask-scratch-space/ # VSCode files .vscode/ From fcee51e57c0a9c582e64776b7c1a81233ff6bdac Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 27 Sep 2023 13:27:25 +0200 Subject: [PATCH 03/32] scheduler_rank and exclusive_workers options with tests --- dask_mpi/cli.py | 79 ++++++++++++++++++++------------------ dask_mpi/tests/test_cli.py | 67 ++++++++++++++++++++++++-------- 2 files changed, 92 insertions(+), 54 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 1538d4b..f9817dc 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -63,7 +63,7 @@ help="Start workers in nanny process for management (deprecated use --worker-class instead)", ) @click.option( - "--exclusive-workers", + 
"--exclusive-workers/--inclusive-workers", default=True, help=( "Whether to force workers to run on unoccupied MPI ranks. If false, " @@ -128,46 +128,51 @@ def main( except TypeError: worker_options = {} - if rank == scheduler_rank and scheduler: + async def run_worker(): + WorkerType = import_term(worker_class) + if not nanny: + raise DeprecationWarning( + "Option --no-nanny is deprectaed, use --worker-class instead" + ) + opts = { + "interface": interface, + "protocol": protocol, + "nthreads": nthreads, + "memory_limit": memory_limit, + "local_directory": local_directory, + "name": f"{name}-{rank}", + "scheduler_file": scheduler_file, + **worker_options, + } + if scheduler_address: + opts["scheduler_ip"] = scheduler_address + + async with WorkerType(**opts) as worker: + await worker.finished() + + async def run_scheduler(launch_worker=False): + async with Scheduler( + interface=interface, + protocol=protocol, + dashboard_address=dashboard_address, + scheduler_file=scheduler_file, + port=scheduler_port, + ) as scheduler: + comm.Barrier() - async def run_func(): - async with Scheduler( - interface=interface, - protocol=protocol, - dashboard_address=dashboard_address, - scheduler_file=scheduler_file, - port=scheduler_port, - ) as s: - comm.Barrier() - await s.finished() + if launch_worker: + asyncio.get_event_loop().create_task(run_worker()) - else: + await scheduler.finished() - async def run_func(): - comm.Barrier() + if rank == scheduler_rank and scheduler: + asyncio.get_event_loop().run_until_complete( + run_scheduler(launch_worker=not exclusive_workers) + ) + else: + comm.Barrier() - WorkerType = import_term(worker_class) - if not nanny: - raise DeprecationWarning( - "Option --no-nanny is deprectaed, use --worker-class instead" - ) - opts = { - "interface": interface, - "protocol": protocol, - "nthreads": nthreads, - "memory_limit": memory_limit, - "local_directory": local_directory, - "name": f"{name}-{rank}", - "scheduler_file": scheduler_file, - **worker_options, - } - if scheduler_address: - opts["scheduler_ip"] = scheduler_address - - async with WorkerType(**opts) as worker: - await worker.finished() - - asyncio.get_event_loop().run_until_complete(run_func()) + asyncio.get_event_loop().run_until_complete(run_worker()) if __name__ == "__main__": diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index e10e370..0a35960 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -51,6 +51,27 @@ def test_basic(loop, worker_class, mpirun): assert c.submit(lambda x: x + 1, 10).result() == 11 +def test_inclusive_workers(loop, mpirun): + with tmpfile(extension="json") as fn: + cmd = mpirun + [ + "-np", + "4", + "dask-mpi", + "--scheduler-file", + fn, + "--inclusive-workers", + ] + + with popen(cmd): + with Client(scheduler_file=fn) as client: + start = time() + while len(client.scheduler_info()["workers"]) < 4: + assert time() < start + 10 + sleep(0.1) + + assert client.submit(lambda x: x + 1, 10).result() == 11 + + def test_small_world(mpirun): with tmpfile(extension="json") as fn: # Set too few processes to start cluster @@ -98,6 +119,35 @@ def test_no_scheduler(loop, mpirun): sleep(0.2) +def test_scheduler_rank(loop, mpirun): + with tmpfile(extension="json") as fn: + cmd = mpirun + [ + "-np", + "2", + "dask-mpi", + "--scheduler-file", + fn, + "--exclusive-workers", + "--scheduler-rank", + "1", + ] + + with popen(cmd, stdin=FNULL): + with Client(scheduler_file=fn) as client: + start = time() + while len(client.scheduler_info()["workers"]) < 1: + assert 
time() < start + 10 + sleep(0.2) + + worker_infos = client.scheduler_info()["workers"] + assert len(worker_infos) == 1 + + worker_info = next(iter(worker_infos.values())) + assert worker_info["name"].rsplit("-")[-1] == "0" + + assert client.submit(lambda x: x + 1, 10).result() == 11 + + @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_non_default_ports(loop, nanny, mpirun): with tmpfile(extension="json") as fn: @@ -150,23 +200,6 @@ def test_dashboard(loop, mpirun): requests.get("http://localhost:59583/status/") -@pytest.mark.skip(reason="Should we expose this option?") -def test_bokeh_worker(loop, mpirun): - with tmpfile(extension="json") as fn: - cmd = mpirun + [ - "-np", - "2", - "dask-mpi", - "--scheduler-file", - fn, - "--bokeh-worker-port", - "59584", - ] - - with popen(cmd, stdin=FNULL): - check_port_okay(59584) - - def tmpfile_static(extension="", dir=None): """ utility function for test_stale_sched test From 055a95d6fd5b1cb386c8a5d758f13f5479b84e7c Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 27 Sep 2023 13:49:22 +0200 Subject: [PATCH 04/32] allow 1-rank clusters --- dask_mpi/cli.py | 7 ++++--- dask_mpi/tests/test_cli.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index f9817dc..46204d9 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -115,10 +115,11 @@ def main( comm = MPI.COMM_WORLD world_size = comm.Get_size() - if scheduler and world_size < 2: + if scheduler and exclusive_workers and world_size < 2: raise WorldTooSmallException( - f"Not enough MPI ranks to start cluster, found {world_size}, " - "needs at least 2, one each for the scheduler and a worker." + "Not enough MPI ranks to start cluster with exclusive workers, " + f"found {world_size} MPI ranks, needs at least 2, " + "one each for the scheduler and a worker." ) rank = comm.Get_rank() diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index 0a35960..b1d023d 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -90,6 +90,27 @@ def test_small_world(mpirun): assert p.returncode != 0 +def test_inclusive_small_world(mpirun): + with tmpfile(extension="json") as fn: + cmd = mpirun + [ + "-np", + "1", + "dask-mpi", + "--scheduler-file", + fn, + "--inclusive-workers", + ] + + with popen(cmd): + with Client(scheduler_file=fn) as client: + start = time() + while len(client.scheduler_info()["workers"]) < 1: + assert time() < start + 10 + sleep(0.1) + + assert client.submit(lambda x: x + 1, 10).result() == 11 + + def test_no_scheduler(loop, mpirun): with tmpfile(extension="json") as fn: cmd = mpirun + ["-np", "2", "dask-mpi", "--scheduler-file", fn] From ff320009b7fa6f7da2640840bd63076c48da1101 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 29 Sep 2023 14:42:24 +0200 Subject: [PATCH 05/32] Correction to min world size calculation --- dask_mpi/cli.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 46204d9..315e4e2 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -115,11 +115,11 @@ def main( comm = MPI.COMM_WORLD world_size = comm.Get_size() - if scheduler and exclusive_workers and world_size < 2: + min_world_size = 1 + scheduler * max(scheduler_rank, exclusive_workers) + if world_size < min_world_size: raise WorldTooSmallException( - "Not enough MPI ranks to start cluster with exclusive workers, " - f"found {world_size} MPI ranks, needs at least 2, " - "one each for the scheduler and a worker." 
+ f"Not enough MPI ranks to start cluster with exclusive_workers={exclusive_workers} and " + f"scheduler_rank={scheduler_rank}, found {world_size} MPI ranks but needs {min_world_size}." ) rank = comm.Get_rank() From 10d54c01f4b43ee6c289e59767b59587bc00f5f6 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 29 Sep 2023 14:43:30 +0200 Subject: [PATCH 06/32] Rename module for clarity --- dask_mpi/{core.py => initialize.py} | 0 dask_mpi/tests/{core_basic.py => initialize_basic.py} | 0 dask_mpi/tests/{core_no_exit.py => initialize_no_exit.py} | 0 dask_mpi/tests/{test_core.py => test_initialize_basic.py} | 4 ++-- .../tests/{test_no_exit.py => test_initialize_no_exit.py} | 2 +- 5 files changed, 3 insertions(+), 3 deletions(-) rename dask_mpi/{core.py => initialize.py} (100%) rename dask_mpi/tests/{core_basic.py => initialize_basic.py} (100%) rename dask_mpi/tests/{core_no_exit.py => initialize_no_exit.py} (100%) rename dask_mpi/tests/{test_core.py => test_initialize_basic.py} (79%) rename dask_mpi/tests/{test_no_exit.py => test_initialize_no_exit.py} (81%) diff --git a/dask_mpi/core.py b/dask_mpi/initialize.py similarity index 100% rename from dask_mpi/core.py rename to dask_mpi/initialize.py diff --git a/dask_mpi/tests/core_basic.py b/dask_mpi/tests/initialize_basic.py similarity index 100% rename from dask_mpi/tests/core_basic.py rename to dask_mpi/tests/initialize_basic.py diff --git a/dask_mpi/tests/core_no_exit.py b/dask_mpi/tests/initialize_no_exit.py similarity index 100% rename from dask_mpi/tests/core_no_exit.py rename to dask_mpi/tests/initialize_no_exit.py diff --git a/dask_mpi/tests/test_core.py b/dask_mpi/tests/test_initialize_basic.py similarity index 79% rename from dask_mpi/tests/test_core.py rename to dask_mpi/tests/test_initialize_basic.py index abe4a60..c247385 100644 --- a/dask_mpi/tests/test_core.py +++ b/dask_mpi/tests/test_initialize_basic.py @@ -11,7 +11,7 @@ def test_basic(mpirun): script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "core_basic.py" + os.path.dirname(os.path.realpath(__file__)), "initialize_basic.py" ) p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) @@ -22,7 +22,7 @@ def test_basic(mpirun): def test_small_world(mpirun): script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "core_basic.py" + os.path.dirname(os.path.realpath(__file__)), "initialize_basic.py" ) # Set too few processes to start cluster diff --git a/dask_mpi/tests/test_no_exit.py b/dask_mpi/tests/test_initialize_no_exit.py similarity index 81% rename from dask_mpi/tests/test_no_exit.py rename to dask_mpi/tests/test_initialize_no_exit.py index 65c27d5..b7c57f2 100644 --- a/dask_mpi/tests/test_no_exit.py +++ b/dask_mpi/tests/test_initialize_no_exit.py @@ -11,7 +11,7 @@ def test_no_exit(mpirun): script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "core_no_exit.py" + os.path.dirname(os.path.realpath(__file__)), "initialize_no_exit.py" ) p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) From aa00b9864ed7d2495241bdb9e0aeeb0f0472438e Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 29 Sep 2023 14:43:53 +0200 Subject: [PATCH 07/32] Create execute function --- dask_mpi/__init__.py | 3 +- dask_mpi/execute.py | 151 +++++++++++++++++++++++++++ dask_mpi/tests/execute_basic.py | 21 ++++ dask_mpi/tests/test_execute_basic.py | 32 ++++++ 4 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 dask_mpi/execute.py create mode 100644 dask_mpi/tests/execute_basic.py create mode 
100644 dask_mpi/tests/test_execute_basic.py diff --git a/dask_mpi/__init__.py b/dask_mpi/__init__.py index 9084c91..ef5215f 100644 --- a/dask_mpi/__init__.py +++ b/dask_mpi/__init__.py @@ -1,5 +1,6 @@ from ._version import get_versions -from .core import initialize, send_close_signal +from .execute import execute +from .initialize import initialize, send_close_signal from .exceptions import WorldTooSmallException __version__ = get_versions()["version"] diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py new file mode 100644 index 0000000..58f9853 --- /dev/null +++ b/dask_mpi/execute.py @@ -0,0 +1,151 @@ +import asyncio + +import dask +from distributed import Nanny, Scheduler +from distributed.utils import import_term + +from .initialize import send_close_signal +from .exceptions import WorldTooSmallException + + +def execute( + func, + *args, + client_rank=1, + scheduler_rank=0, + interface=None, + nthreads=1, + local_directory="", + memory_limit="auto", + nanny=False, + dashboard=True, + dashboard_address=":8787", + protocol=None, + exclusive_workers=True, + worker_class="distributed.Worker", + worker_options=None, + comm=None, + **kwargs, +): + """ + Execute a function on a given MPI rank with a Dask cluster launched using mpi4py + + Using mpi4py, MPI rank 0 launches the Scheduler, MPI rank 1 passes through to the + client script, and all other MPI ranks launch workers. All MPI ranks other than + MPI rank 1 block while their event loops run. + + In normal operation these ranks exit once rank 1 ends. If exit=False is set they + instead return an bool indicating whether they are the client and should execute + more client code, or a worker/scheduler who should not. In this case the user is + responsible for the client calling send_close_signal when work is complete, and + checking the returned value to choose further actions. + + Parameters + ---------- + func : callable + A function containing Dask client code to execute with a Dask cluster + args : list + Arguments to func + client_rank : int + The MPI rank on which to run func + scheduler_rank : int + The MPI rank on which to run the Dask scheduler + interface : str + Network interface like 'eth0' or 'ib0' + nthreads : int + Number of threads per worker + local_directory : str + Directory to place worker files + memory_limit : int, float, or 'auto' + Number of bytes before spilling data to disk. This can be an + integer (nbytes), float (fraction of total memory), or 'auto'. + nanny : bool + Start workers in nanny process for management (deprecated, use worker_class instead) + dashboard : bool + Enable Bokeh visual diagnostics + dashboard_address : str + Bokeh port for visual diagnostics + protocol : str + Protocol like 'inproc' or 'tcp' + exclusive_workers : bool + Whether to only run Dask workers on their own MPI ranks + worker_class : str + Class to use when creating workers + worker_options : dict + Options to pass to workers + comm: mpi4py.MPI.Intracomm + Optional MPI communicator to use instead of COMM_WORLD + + Returns + ------- + ret : Any + If the MPI rank equals client_rank, then the return value of the executed function. + Otherwise, returns None. 
+ """ + if comm is None: + from mpi4py import MPI + + comm = MPI.COMM_WORLD + + world_size = comm.Get_size() + min_world_size = 1 + max(client_rank, scheduler_rank, exclusive_workers) + if world_size < min_world_size: + raise WorldTooSmallException( + f"Not enough MPI ranks to start cluster with exclusive_workers={exclusive_workers} and " + f"scheduler_rank={scheduler_rank}, found {world_size} MPI ranks but needs {min_world_size}." + ) + + rank = comm.Get_rank() + + if not worker_options: + worker_options = {} + + async def run_worker(): + WorkerType = import_term(worker_class) + if nanny: + raise DeprecationWarning( + "Option nanny=True is deprectaed, use worker_class='distributed.Nanny' instead" + ) + WorkerType = Nanny + opts = { + "interface": interface, + "protocol": protocol, + "nthreads": nthreads, + "memory_limit": memory_limit, + "local_directory": local_directory, + "name": rank, + **worker_options, + } + async with WorkerType(**opts) as worker: + await worker.finished() + + async def run_scheduler(launch_worker=False): + async with Scheduler( + interface=interface, + protocol=protocol, + dashboard=dashboard, + dashboard_address=dashboard_address, + ) as scheduler: + comm.bcast(scheduler.address, root=0) + comm.Barrier() + + if launch_worker: + asyncio.create_task(run_worker()) + + await scheduler.finished() + + if rank == scheduler_rank: + asyncio.get_event_loop().run_until_complete(run_scheduler()) + + else: + scheduler_address = comm.bcast(None, root=scheduler_rank) + dask.config.set(scheduler_address=scheduler_address) + comm.Barrier() + + if rank == client_rank: + ret = func(*args, **kwargs) + send_close_signal() + return ret + + else: + asyncio.get_event_loop().run_until_complete(run_worker()) diff --git a/dask_mpi/tests/execute_basic.py b/dask_mpi/tests/execute_basic.py new file mode 100644 index 0000000..e79b66b --- /dev/null +++ b/dask_mpi/tests/execute_basic.py @@ -0,0 +1,21 @@ +from time import sleep + +from distributed import Client +from distributed.metrics import time + +from dask_mpi import execute + + +def client_func(): + with Client() as c: + start = time() + while len(c.scheduler_info()["workers"]) != 2: + assert time() < start + 10 + sleep(0.2) + + assert c.submit(lambda x: x + 1, 10).result() == 11 + assert c.submit(lambda x: x + 1, 20, workers=2).result() == 21 + + +if __name__ == "__main__": + execute(client_func) diff --git a/dask_mpi/tests/test_execute_basic.py b/dask_mpi/tests/test_execute_basic.py new file mode 100644 index 0000000..163f1de --- /dev/null +++ b/dask_mpi/tests/test_execute_basic.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import, division, print_function + +import os +import subprocess +import sys + +import pytest + +pytest.importorskip("mpi4py") + + +def test_basic(mpirun): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" + ) + + p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) + + p.communicate() + assert p.returncode == 0 + + +def test_small_world(mpirun): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" + ) + + # Set too few processes to start cluster + p = subprocess.Popen(mpirun + ["-np", "1", sys.executable, script_file]) + + p.communicate() + assert p.returncode != 0 From 483d8495781a3f06a43bbe3a2f84a9eec66443f3 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 13:12:08 +0200 Subject: [PATCH 08/32] NOQA on unused imports --- dask_mpi/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 
deletions(-) diff --git a/dask_mpi/__init__.py b/dask_mpi/__init__.py index ef5215f..09779b4 100644 --- a/dask_mpi/__init__.py +++ b/dask_mpi/__init__.py @@ -1,7 +1,7 @@ from ._version import get_versions -from .execute import execute -from .initialize import initialize, send_close_signal -from .exceptions import WorldTooSmallException +from .exceptions import WorldTooSmallException # noqa +from .execute import execute # noqa +from .initialize import initialize, send_close_signal # noqa __version__ = get_versions()["version"] del get_versions From b6f28415659314685b4b1f1360cad31937edb317 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 13:12:35 +0200 Subject: [PATCH 09/32] Set worker type if deprecated "--no-nanny" option set --- dask_mpi/cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 315e4e2..7b4214b 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -132,6 +132,7 @@ def main( async def run_worker(): WorkerType = import_term(worker_class) if not nanny: + WorkerType = Worker raise DeprecationWarning( "Option --no-nanny is deprectaed, use --worker-class instead" ) From c850dd99fe77b5ca1759d4d844bffc26b9ab0d27 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 13:13:15 +0200 Subject: [PATCH 10/32] Set worker type before raise --- dask_mpi/execute.py | 4 ++-- dask_mpi/initialize.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 58f9853..09d409d 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -4,8 +4,8 @@ from distributed import Nanny, Scheduler from distributed.utils import import_term -from .initialize import send_close_signal from .exceptions import WorldTooSmallException +from .initialize import send_close_signal def execute( @@ -103,10 +103,10 @@ def execute( async def run_worker(): WorkerType = import_term(worker_class) if nanny: + WorkerType = Nanny raise DeprecationWarning( "Option nanny=True is deprectaed, use worker_class='distributed.Nanny' instead" ) - WorkerType = Nanny opts = { "interface": interface, "protocol": protocol, diff --git a/dask_mpi/initialize.py b/dask_mpi/initialize.py index 05a0d39..bd7318a 100644 --- a/dask_mpi/initialize.py +++ b/dask_mpi/initialize.py @@ -121,10 +121,10 @@ async def run_scheduler(): async def run_worker(): WorkerType = import_term(worker_class) if nanny: + WorkerType = Nanny raise DeprecationWarning( "Option nanny=True is deprectaed, use worker_class='distributed.Nanny' instead" ) - WorkerType = Nanny opts = { "interface": interface, "protocol": protocol, From 376852c2587a8727bc52865546079895dd5edfec Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 13:29:21 +0200 Subject: [PATCH 11/32] Import from dask not distributed --- dask_mpi/tests/test_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index b1d023d..4a2f754 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -8,10 +8,11 @@ import pytest import requests +from dask.utils import tmpfile from distributed import Client from distributed.comm.addressing import get_address_host_port from distributed.metrics import time -from distributed.utils import import_term, tmpfile +from distributed.utils import import_term from distributed.utils_test import cleanup, loop, loop_in_thread, popen # noqa: F401 pytest.importorskip("mpi4py") From 74cf544e23a55e47865832c4e856c4f56f81cbf7 Mon Sep 17 00:00:00 2001 From: Kevin Paul 
Date: Wed, 11 Oct 2023 13:29:37 +0200 Subject: [PATCH 12/32] Update versioneer script to new Python --- versioneer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/versioneer.py b/versioneer.py index 9882041..c3b09fa 100644 --- a/versioneer.py +++ b/versioneer.py @@ -339,9 +339,14 @@ def get_config_from_root(root): # configparser.NoOptionError (if it lacks "VCS="). See the docstring at # the top of versioneer.py for instructions on writing your setup.cfg . setup_cfg = os.path.join(root, "setup.cfg") - parser = configparser.SafeConfigParser() - with open(setup_cfg, "r") as f: - parser.readfp(f) + try: + parser = configparser.SafeConfigParser() + with open(setup_cfg, "r") as f: + parser.readfp(f) + except Exception: + parser = configparser.ConfigParser() + with open(setup_cfg, "r") as f: + parser.read_file(f) VCS = parser.get("versioneer", "VCS") # mandatory def get(parser, name): From 6592545000239840455a550c1e36c05b2f4767d2 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 13:49:10 +0200 Subject: [PATCH 13/32] Temporary fix for python 3.12 changes --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8a1aedf..d197b71 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ def environment_dependencies(obj, dependencies=None): license="BSD 3-Clause", include_package_data=True, install_requires=install_requires, - python_requires=">=3.6", + python_requires=">=3.6,<3.12", packages=["dask_mpi"], long_description=long_description, entry_points=""" From 6219830fe84474eecaa68161c8bd5f5ac693dacf Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Wed, 11 Oct 2023 16:53:59 +0200 Subject: [PATCH 14/32] Custom rank placement logic --- dask_mpi/execute.py | 47 +++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 09d409d..1aee5fa 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -1,4 +1,5 @@ import asyncio +import threading import dask from distributed import Nanny, Scheduler @@ -75,12 +76,6 @@ def execute( Options to pass to workers comm: mpi4py.MPI.Intracomm Optional MPI communicator to use instead of COMM_WORLD - - Returns - ------- - ret : Any - If the MPI rank equals client_rank, then the return value of the executed function. - Otherwise, returns None. 
""" if comm is None: from mpi4py import MPI @@ -100,7 +95,14 @@ def execute( if not worker_options: worker_options = {} - async def run_worker(): + async def run_client(): + def wrapped_func(*args, **kwargs): + func(*args, **kwargs) + send_close_signal() + + threading.Thread(target=wrapped_func, args=args, kwargs=kwargs).start() + + async def run_worker(launch_client=False): WorkerType = import_term(worker_class) if nanny: WorkerType = Nanny @@ -117,9 +119,12 @@ async def run_worker(): **worker_options, } async with WorkerType(**opts) as worker: + if launch_client: + asyncio.get_event_loop().create_task(run_client()) + await worker.finished() - async def run_scheduler(launch_worker=False): + async def run_scheduler(launch_worker=False, launch_client=False): async with Scheduler( interface=interface, protocol=protocol, @@ -130,22 +135,30 @@ async def run_scheduler(launch_worker=False): comm.Barrier() if launch_worker: - asyncio.create_task(run_worker()) + asyncio.get_event_loop().create_task(run_worker(launch_client=launch_client)) + + elif launch_client: + asyncio.get_event_loop().create_task(run_client()) await scheduler.finished() - if rank == scheduler_rank: - asyncio.get_event_loop().run_until_complete(run_scheduler()) + launch_scheduler = rank == scheduler_rank + launch_client = rank == client_rank + + if launch_scheduler: + run_coro = run_scheduler( + launch_worker=not exclusive_workers, + launch_client=launch_client, + ) else: scheduler_address = comm.bcast(None, root=scheduler_rank) dask.config.set(scheduler_address=scheduler_address) comm.Barrier() - if rank == client_rank: - ret = func(*args, **kwargs) - send_close_signal() - return ret - + if launch_client and exclusive_workers: + run_coro = run_client() else: - asyncio.get_event_loop().run_until_complete(run_worker()) + run_coro = run_worker(launch_client=launch_client) + + asyncio.get_event_loop().run_until_complete(run_coro) From 3cdd7ba63298976d198bd6c20b242c553bf6102a Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 11:09:22 +0200 Subject: [PATCH 15/32] Move no_exit test into main initialize test and rename --- ...initialize_basic.py => test_initialize.py} | 11 ++++++++++ dask_mpi/tests/test_initialize_no_exit.py | 20 ------------------- 2 files changed, 11 insertions(+), 20 deletions(-) rename dask_mpi/tests/{test_initialize_basic.py => test_initialize.py} (73%) delete mode 100644 dask_mpi/tests/test_initialize_no_exit.py diff --git a/dask_mpi/tests/test_initialize_basic.py b/dask_mpi/tests/test_initialize.py similarity index 73% rename from dask_mpi/tests/test_initialize_basic.py rename to dask_mpi/tests/test_initialize.py index c247385..56757c4 100644 --- a/dask_mpi/tests/test_initialize_basic.py +++ b/dask_mpi/tests/test_initialize.py @@ -30,3 +30,14 @@ def test_small_world(mpirun): p.communicate() assert p.returncode != 0 + + +def test_no_exit(mpirun): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "initialize_no_exit.py" + ) + + p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) + + p.communicate() + assert p.returncode == 0 diff --git a/dask_mpi/tests/test_initialize_no_exit.py b/dask_mpi/tests/test_initialize_no_exit.py deleted file mode 100644 index b7c57f2..0000000 --- a/dask_mpi/tests/test_initialize_no_exit.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import os -import subprocess -import sys - -import pytest - -pytest.importorskip("mpi4py") - - -def test_no_exit(mpirun): - script_file = 
os.path.join( - os.path.dirname(os.path.realpath(__file__)), "initialize_no_exit.py" - ) - - p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) - - p.communicate() - assert p.returncode == 0 From 93419612583de86f0001944bbe5f054393033ed7 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 11:10:02 +0200 Subject: [PATCH 16/32] Renaming for better clarity --- dask_mpi/execute.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 1aee5fa..3435d33 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -102,7 +102,7 @@ def wrapped_func(*args, **kwargs): threading.Thread(target=wrapped_func, args=args, kwargs=kwargs).start() - async def run_worker(launch_client=False): + async def run_worker(with_client=False): WorkerType = import_term(worker_class) if nanny: WorkerType = Nanny @@ -119,25 +119,26 @@ async def run_worker(launch_client=False): **worker_options, } async with WorkerType(**opts) as worker: - if launch_client: + if with_client: asyncio.get_event_loop().create_task(run_client()) await worker.finished() - async def run_scheduler(launch_worker=False, launch_client=False): + async def run_scheduler(with_worker=False, with_client=False): async with Scheduler( interface=interface, protocol=protocol, dashboard=dashboard, dashboard_address=dashboard_address, ) as scheduler: - comm.bcast(scheduler.address, root=0) + dask.config.set(scheduler_address=scheduler.address) + comm.bcast(scheduler.address, root=scheduler_rank) comm.Barrier() - if launch_worker: - asyncio.get_event_loop().create_task(run_worker(launch_client=launch_client)) + if with_worker: + asyncio.get_event_loop().create_task(run_worker(with_client=with_client)) - elif launch_client: + elif with_client: asyncio.get_event_loop().create_task(run_client()) await scheduler.finished() @@ -147,8 +148,8 @@ async def run_scheduler(launch_worker=False, launch_client=False): if launch_scheduler: run_coro = run_scheduler( - launch_worker=not exclusive_workers, - launch_client=launch_client, + with_worker=not exclusive_workers, + with_client=launch_client, ) else: @@ -159,6 +160,6 @@ async def run_scheduler(launch_worker=False, launch_client=False): if launch_client and exclusive_workers: run_coro = run_client() else: - run_coro = run_worker(launch_client=launch_client) + run_coro = run_worker(with_client=launch_client) asyncio.get_event_loop().run_until_complete(run_coro) From c61ec656bde49477238d90a781c53933d3a2d32e Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 11:10:27 +0200 Subject: [PATCH 17/32] Add execute tests --- dask_mpi/tests/execute_basic.py | 21 -------------- dask_mpi/tests/execute_script.py | 43 ++++++++++++++++++++++++++++ dask_mpi/tests/test_execute.py | 34 ++++++++++++++++++++++ dask_mpi/tests/test_execute_basic.py | 32 --------------------- 4 files changed, 77 insertions(+), 53 deletions(-) delete mode 100644 dask_mpi/tests/execute_basic.py create mode 100644 dask_mpi/tests/execute_script.py create mode 100644 dask_mpi/tests/test_execute.py delete mode 100644 dask_mpi/tests/test_execute_basic.py diff --git a/dask_mpi/tests/execute_basic.py b/dask_mpi/tests/execute_basic.py deleted file mode 100644 index e79b66b..0000000 --- a/dask_mpi/tests/execute_basic.py +++ /dev/null @@ -1,21 +0,0 @@ -from time import sleep - -from distributed import Client -from distributed.metrics import time - -from dask_mpi import execute - - -def client_func(): - with Client() as c: - start = time() - 
while len(c.scheduler_info()["workers"]) != 2: - assert time() < start + 10 - sleep(0.2) - - assert c.submit(lambda x: x + 1, 10).result() == 11 - assert c.submit(lambda x: x + 1, 20, workers=2).result() == 21 - - -if __name__ == "__main__": - execute(client_func) diff --git a/dask_mpi/tests/execute_script.py b/dask_mpi/tests/execute_script.py new file mode 100644 index 0000000..b887bd3 --- /dev/null +++ b/dask_mpi/tests/execute_script.py @@ -0,0 +1,43 @@ +from argparse import ArgumentParser +from time import sleep + +from distributed import Client +from distributed.metrics import time + +from dask_mpi import execute + + +def client_func(m=4, c=1, s=0, x=True): + xranks = {c, s} if x else set() + worker_ranks = set(i for i in range(m) if i not in xranks) + + with Client() as c: + start = time() + while len(c.scheduler_info()["workers"]) != len(worker_ranks): + assert time() < start + 10 + sleep(0.2) + + actual_worker_ranks = set(v["name"] for k,v in c.scheduler_info()["workers"].items()) + assert actual_worker_ranks == worker_ranks + + for i in actual_worker_ranks: + assert c.submit(lambda x: x + 1, 10, workers=i).result() == 11 + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("-m", type=int, default=None) + parser.add_argument("-c", type=int, default=None) + parser.add_argument("-s", type=int, default=None) + parser.add_argument("-x", type=lambda v: v.lower() != "false", default=None) + kwargs = vars(parser.parse_args()) + + execute_kwargs = {k:v for k,v in kwargs.items() if v is not None} + if "c" in execute_kwargs: + execute_kwargs["client_rank"] = execute_kwargs["c"] + if "s" in execute_kwargs: + execute_kwargs["scheduler_rank"] = execute_kwargs["s"] + if "x" in execute_kwargs: + execute_kwargs["exclusive_workers"] = execute_kwargs["x"] + + execute(client_func, **execute_kwargs) diff --git a/dask_mpi/tests/test_execute.py b/dask_mpi/tests/test_execute.py new file mode 100644 index 0000000..99fd500 --- /dev/null +++ b/dask_mpi/tests/test_execute.py @@ -0,0 +1,34 @@ +from __future__ import absolute_import, division, print_function + +import os +import subprocess +import sys + +import pytest + +pytest.importorskip("mpi4py") + + +@pytest.mark.parametrize( + "mpisize,execute_args,retcode", + [ + (4, [], 0), + (1, [], 1), # Set too few processes to start cluster + (4, ["-c", "2", "-s", "3"], 0), + (5, ["-s", "3"], 0), + (3, ["-c", "2", "-s", "2"], 0), + (2, ["-c", "0", "-s", "0", "-x", "False"], 0), + (1, ["-c", "0", "-s", "0", "-x", "False"], 0), + (1, ["-c", "0", "-s", "0", "-x", "True"], 1), + ] +) +def test_execute(mpisize, execute_args, retcode, mpirun): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "execute_script.py" + ) + + execute_args += ["-m", str(mpisize)] + p = subprocess.Popen(mpirun + ["-n", str(mpisize), sys.executable, script_file] + execute_args) + + p.communicate() + assert p.returncode == retcode diff --git a/dask_mpi/tests/test_execute_basic.py b/dask_mpi/tests/test_execute_basic.py deleted file mode 100644 index 163f1de..0000000 --- a/dask_mpi/tests/test_execute_basic.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import os -import subprocess -import sys - -import pytest - -pytest.importorskip("mpi4py") - - -def test_basic(mpirun): - script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" - ) - - p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) - - p.communicate() - assert p.returncode == 0 
- - -def test_small_world(mpirun): - script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" - ) - - # Set too few processes to start cluster - p = subprocess.Popen(mpirun + ["-np", "1", sys.executable, script_file]) - - p.communicate() - assert p.returncode != 0 From 51f21e17646246ae420f5183ce5bc35f5868ccbc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Oct 2023 09:39:45 +0000 Subject: [PATCH 18/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- dask_mpi/execute.py | 4 +++- dask_mpi/tests/execute_script.py | 6 ++++-- dask_mpi/tests/test_execute.py | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 3435d33..6e8b557 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -136,7 +136,9 @@ async def run_scheduler(with_worker=False, with_client=False): comm.Barrier() if with_worker: - asyncio.get_event_loop().create_task(run_worker(with_client=with_client)) + asyncio.get_event_loop().create_task( + run_worker(with_client=with_client) + ) elif with_client: asyncio.get_event_loop().create_task(run_client()) diff --git a/dask_mpi/tests/execute_script.py b/dask_mpi/tests/execute_script.py index b887bd3..a3158d3 100644 --- a/dask_mpi/tests/execute_script.py +++ b/dask_mpi/tests/execute_script.py @@ -17,7 +17,9 @@ def client_func(m=4, c=1, s=0, x=True): assert time() < start + 10 sleep(0.2) - actual_worker_ranks = set(v["name"] for k,v in c.scheduler_info()["workers"].items()) + actual_worker_ranks = set( + v["name"] for k, v in c.scheduler_info()["workers"].items() + ) assert actual_worker_ranks == worker_ranks for i in actual_worker_ranks: @@ -32,7 +34,7 @@ def client_func(m=4, c=1, s=0, x=True): parser.add_argument("-x", type=lambda v: v.lower() != "false", default=None) kwargs = vars(parser.parse_args()) - execute_kwargs = {k:v for k,v in kwargs.items() if v is not None} + execute_kwargs = {k: v for k, v in kwargs.items() if v is not None} if "c" in execute_kwargs: execute_kwargs["client_rank"] = execute_kwargs["c"] if "s" in execute_kwargs: diff --git a/dask_mpi/tests/test_execute.py b/dask_mpi/tests/test_execute.py index 99fd500..ab2ff08 100644 --- a/dask_mpi/tests/test_execute.py +++ b/dask_mpi/tests/test_execute.py @@ -20,7 +20,7 @@ (2, ["-c", "0", "-s", "0", "-x", "False"], 0), (1, ["-c", "0", "-s", "0", "-x", "False"], 0), (1, ["-c", "0", "-s", "0", "-x", "True"], 1), - ] + ], ) def test_execute(mpisize, execute_args, retcode, mpirun): script_file = os.path.join( @@ -28,7 +28,9 @@ def test_execute(mpisize, execute_args, retcode, mpirun): ) execute_args += ["-m", str(mpisize)] - p = subprocess.Popen(mpirun + ["-n", str(mpisize), sys.executable, script_file] + execute_args) + p = subprocess.Popen( + mpirun + ["-n", str(mpisize), sys.executable, script_file] + execute_args + ) p.communicate() assert p.returncode == retcode From b1fbbf55a670ff58b328e85a7a1dcee436590927 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 11:56:15 +0200 Subject: [PATCH 19/32] Set python version in readthedocs env --- docs/environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/environment.yml b/docs/environment.yml index b25df22..745f5a9 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -3,6 +3,7 @@ channels: - conda-forge - nodefaults dependencies: + - python<3.12 - dask>=2.19 - distributed>=2.19 - mpich From 
82038c1a2db2a4a34d63b2e46876f73f72299577 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 12:19:33 +0200 Subject: [PATCH 20/32] Try fixing python version for readthedocs build --- readthedocs.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/readthedocs.yml b/readthedocs.yml index 1b6bc16..0a0370f 100644 --- a/readthedocs.yml +++ b/readthedocs.yml @@ -3,7 +3,8 @@ version: 2 build: os: "ubuntu-22.04" tools: - python: "mambaforge-22.9" + # python: "mambaforge-22.9" + python: "3.10" conda: environment: docs/environment.yml python: From 2a05ccac20344661d27a18b67e84c647340898aa Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 12:22:15 +0200 Subject: [PATCH 21/32] Revert --- readthedocs.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/readthedocs.yml b/readthedocs.yml index 0a0370f..1b6bc16 100644 --- a/readthedocs.yml +++ b/readthedocs.yml @@ -3,8 +3,7 @@ version: 2 build: os: "ubuntu-22.04" tools: - # python: "mambaforge-22.9" - python: "3.10" + python: "mambaforge-22.9" conda: environment: docs/environment.yml python: From b838307a17a6ecb0354ceb6a54192cdcd62dd18c Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 15:51:08 +0200 Subject: [PATCH 22/32] Rename to match test name --- dask_mpi/tests/{execute_script.py => execute_basic.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename dask_mpi/tests/{execute_script.py => execute_basic.py} (100%) diff --git a/dask_mpi/tests/execute_script.py b/dask_mpi/tests/execute_basic.py similarity index 100% rename from dask_mpi/tests/execute_script.py rename to dask_mpi/tests/execute_basic.py From 1c43320cd902f5caa72c77370840f56a32933f20 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Thu, 12 Oct 2023 15:51:46 +0200 Subject: [PATCH 23/32] Rename test / add no_exit test for execute --- dask_mpi/tests/execute_no_exit.py | 30 ++++++++++++++++++++++++++++++ dask_mpi/tests/test_execute.py | 15 +++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 dask_mpi/tests/execute_no_exit.py diff --git a/dask_mpi/tests/execute_no_exit.py b/dask_mpi/tests/execute_no_exit.py new file mode 100644 index 0000000..548f227 --- /dev/null +++ b/dask_mpi/tests/execute_no_exit.py @@ -0,0 +1,30 @@ +from time import time, sleep + +from distributed import Client +from mpi4py.MPI import COMM_WORLD as world + +from dask_mpi import execute + +# Split our MPI world into two pieces, one consisting just of +# the old rank 3 process and the other with everything else +new_comm_assignment = 1 if world.rank == 3 else 0 +comm = world.Split(new_comm_assignment) + +if world.rank != 3: + def client_code(): + with Client() as c: + start = time() + while len(c.scheduler_info()["workers"]) != 1: + assert time() < start + 10 + sleep(0.2) + + c.submit(lambda x: x + 1, 10).result() == 11 + c.submit(lambda x: x + 1, 20).result() == 21 + + execute(client_code, comm=comm) + +# check that our original comm is intact +world.Barrier() +x = 100 if world.rank == 0 else 200 +x = world.bcast(x) +assert x == 100 diff --git a/dask_mpi/tests/test_execute.py b/dask_mpi/tests/test_execute.py index ab2ff08..318d10e 100644 --- a/dask_mpi/tests/test_execute.py +++ b/dask_mpi/tests/test_execute.py @@ -22,9 +22,9 @@ (1, ["-c", "0", "-s", "0", "-x", "True"], 1), ], ) -def test_execute(mpisize, execute_args, retcode, mpirun): +def test_basic(mpisize, execute_args, retcode, mpirun): script_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "execute_script.py" + 
os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" ) execute_args += ["-m", str(mpisize)] @@ -34,3 +34,14 @@ def test_execute(mpisize, execute_args, retcode, mpirun): p.communicate() assert p.returncode == retcode + + +def test_no_exit(mpirun): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "execute_no_exit.py" + ) + + p = subprocess.Popen(mpirun + ["-np", "4", sys.executable, script_file]) + + p.communicate() + assert p.returncode == 0 From b67d5a1135e8172e24a4b56854b6eaab4d77ce20 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Oct 2023 13:53:13 +0000 Subject: [PATCH 24/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- dask_mpi/tests/execute_no_exit.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_mpi/tests/execute_no_exit.py b/dask_mpi/tests/execute_no_exit.py index 548f227..0735684 100644 --- a/dask_mpi/tests/execute_no_exit.py +++ b/dask_mpi/tests/execute_no_exit.py @@ -1,4 +1,4 @@ -from time import time, sleep +from time import sleep, time from distributed import Client from mpi4py.MPI import COMM_WORLD as world @@ -11,6 +11,7 @@ comm = world.Split(new_comm_assignment) if world.rank != 3: + def client_code(): with Client() as c: start = time() From 5ef8ca42b9ce7564be89d180735992920d9479b5 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 09:20:44 +0200 Subject: [PATCH 25/32] Possible to not supply a function --- dask_mpi/execute.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 6e8b557..65178ef 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -12,7 +12,7 @@ def execute( func, *args, - client_rank=1, + client_rank=None, scheduler_rank=0, interface=None, nthreads=1, @@ -25,6 +25,7 @@ def execute( exclusive_workers=True, worker_class="distributed.Worker", worker_options=None, + worker_name=None, comm=None, **kwargs, ): @@ -44,11 +45,12 @@ def execute( Parameters ---------- func : callable - A function containing Dask client code to execute with a Dask cluster + A function containing Dask client code to execute with a Dask cluster. If + func it not callable, then no client code will be executed. args : list - Arguments to func + Arguments to the client function client_rank : int - The MPI rank on which to run func + The MPI rank on which to run func. scheduler_rank : int The MPI rank on which to run the Dask scheduler interface : str @@ -74,8 +76,13 @@ def execute( Class to use when creating workers worker_options : dict Options to pass to workers - comm: mpi4py.MPI.Intracomm + worker_name : str + Prefix for name given to workers. If defined, each worker will be named + '{worker_name}-{rank}'. Otherwise, the name of each worker is just '{rank}'. 
+ comm : mpi4py.MPI.Intracomm Optional MPI communicator to use instead of COMM_WORLD + kwargs : dict + Keyword arguments to the client function """ if comm is None: from mpi4py import MPI @@ -115,7 +122,7 @@ async def run_worker(with_client=False): "nthreads": nthreads, "memory_limit": memory_limit, "local_directory": local_directory, - "name": rank, + "name": rank if worker_name else f"{worker_name}-{rank}", **worker_options, } async with WorkerType(**opts) as worker: @@ -145,13 +152,13 @@ async def run_scheduler(with_worker=False, with_client=False): await scheduler.finished() - launch_scheduler = rank == scheduler_rank - launch_client = rank == client_rank + with_scheduler = rank == scheduler_rank + with_client = callable(func) and (rank == client_rank) - if launch_scheduler: + if with_scheduler: run_coro = run_scheduler( with_worker=not exclusive_workers, - with_client=launch_client, + with_client=with_client, ) else: @@ -159,9 +166,9 @@ async def run_scheduler(with_worker=False, with_client=False): dask.config.set(scheduler_address=scheduler_address) comm.Barrier() - if launch_client and exclusive_workers: + if with_client and exclusive_workers: run_coro = run_client() else: - run_coro = run_worker(with_client=launch_client) + run_coro = run_worker(with_client=with_client) asyncio.get_event_loop().run_until_complete(run_coro) From 422280178052a0d6da820dfdcf1f47a58a26b9d3 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:29:18 +0200 Subject: [PATCH 26/32] Reorder cli options --- dask_mpi/cli.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 7b4214b..696b292 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -97,19 +97,19 @@ def main( scheduler_address, scheduler_file, + scheduler_port, scheduler_rank, interface, + protocol, nthreads, - local_directory, memory_limit, + local_directory, scheduler, - dashboard_address, nanny, exclusive_workers, worker_class, worker_options, - scheduler_port, - protocol, + dashboard_address, name, ): comm = MPI.COMM_WORLD From 573476422ba6d45c433e7c1410d61a5d27b1abe2 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:29:35 +0200 Subject: [PATCH 27/32] Add more execute options --- dask_mpi/execute.py | 43 ++++++++++++++++++++++++--------- dask_mpi/tests/execute_basic.py | 31 ++++++++++-------------- dask_mpi/tests/test_execute.py | 24 +++++++++--------- 3 files changed, 57 insertions(+), 41 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index 65178ef..d12116b 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -10,10 +10,15 @@ def execute( - func, - *args, - client_rank=None, + client_function=None, + client_args=(), + client_kwargs=None, + client_rank=1, + scheduler=True, scheduler_rank=0, + scheduler_address=None, + scheduler_port=None, + scheduler_file=None, interface=None, nthreads=1, local_directory="", @@ -27,7 +32,6 @@ def execute( worker_options=None, worker_name=None, comm=None, - **kwargs, ): """ Execute a function on a given MPI rank with a Dask cluster launched using mpi4py @@ -53,6 +57,12 @@ def execute( The MPI rank on which to run func. scheduler_rank : int The MPI rank on which to run the Dask scheduler + scheduler_address : str + IP Address of the scheduler, used if scheduler is not launched + scheduler_port : int + Specify scheduler port number. Defaults to random. + scheduler_file : str + Filename to JSON encoded scheduler information. 
interface : str Network interface like 'eth0' or 'ib0' nthreads : int @@ -103,11 +113,13 @@ def execute( worker_options = {} async def run_client(): - def wrapped_func(*args, **kwargs): - func(*args, **kwargs) + def wrapped_function(*args, **kwargs): + client_function(*args, **kwargs) send_close_signal() - threading.Thread(target=wrapped_func, args=args, kwargs=kwargs).start() + threading.Thread( + target=wrapped_function, args=client_args, kwargs=client_kwargs + ).start() async def run_worker(with_client=False): WorkerType = import_term(worker_class) @@ -122,9 +134,11 @@ async def run_worker(with_client=False): "nthreads": nthreads, "memory_limit": memory_limit, "local_directory": local_directory, - "name": rank if worker_name else f"{worker_name}-{rank}", + "name": rank if not worker_name else f"{worker_name}-{rank}", **worker_options, } + if not scheduler and scheduler_address: + opts["scheduler_ip"] = scheduler_address async with WorkerType(**opts) as worker: if with_client: asyncio.get_event_loop().create_task(run_client()) @@ -137,6 +151,8 @@ async def run_scheduler(with_worker=False, with_client=False): protocol=protocol, dashboard=dashboard, dashboard_address=dashboard_address, + scheduler_file=scheduler_file, + port=scheduler_port, ) as scheduler: dask.config.set(scheduler_address=scheduler.address) comm.bcast(scheduler.address, root=scheduler_rank) @@ -152,8 +168,8 @@ async def run_scheduler(with_worker=False, with_client=False): await scheduler.finished() - with_scheduler = rank == scheduler_rank - with_client = callable(func) and (rank == client_rank) + with_scheduler = scheduler and (rank == scheduler_rank) + with_client = callable(client_function) and (rank == client_rank) if with_scheduler: run_coro = run_scheduler( @@ -162,7 +178,12 @@ async def run_scheduler(with_worker=False, with_client=False): ) else: - scheduler_address = comm.bcast(None, root=scheduler_rank) + if scheduler: + scheduler_address = comm.bcast(None, root=scheduler_rank) + elif scheduler_address is None: + raise ValueError( + "Must provide scheduler_address if executing with scheduler=False" + ) dask.config.set(scheduler_address=scheduler_address) comm.Barrier() diff --git a/dask_mpi/tests/execute_basic.py b/dask_mpi/tests/execute_basic.py index a3158d3..7af4e9f 100644 --- a/dask_mpi/tests/execute_basic.py +++ b/dask_mpi/tests/execute_basic.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +import sys from time import sleep from distributed import Client @@ -7,7 +7,7 @@ from dask_mpi import execute -def client_func(m=4, c=1, s=0, x=True): +def client_func(m, c, s, x): xranks = {c, s} if x else set() worker_ranks = set(i for i in range(m) if i not in xranks) @@ -27,19 +27,14 @@ def client_func(m=4, c=1, s=0, x=True): if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument("-m", type=int, default=None) - parser.add_argument("-c", type=int, default=None) - parser.add_argument("-s", type=int, default=None) - parser.add_argument("-x", type=lambda v: v.lower() != "false", default=None) - kwargs = vars(parser.parse_args()) - - execute_kwargs = {k: v for k, v in kwargs.items() if v is not None} - if "c" in execute_kwargs: - execute_kwargs["client_rank"] = execute_kwargs["c"] - if "s" in execute_kwargs: - execute_kwargs["scheduler_rank"] = execute_kwargs["s"] - if "x" in execute_kwargs: - execute_kwargs["exclusive_workers"] = execute_kwargs["x"] - - execute(client_func, **execute_kwargs) + vmap = {"True": True, "False": False, "None": None} + int_or_bool = lambda s: vmap[s] if s in vmap 
else int(s) + args = [int_or_bool(i) for i in sys.argv[1:]] + + execute( + client_function=client_func, + client_args=args, + client_rank=args[1], + scheduler_rank=args[2], + exclusive_workers=args[3], + ) diff --git a/dask_mpi/tests/test_execute.py b/dask_mpi/tests/test_execute.py index 318d10e..99b74fa 100644 --- a/dask_mpi/tests/test_execute.py +++ b/dask_mpi/tests/test_execute.py @@ -10,26 +10,26 @@ @pytest.mark.parametrize( - "mpisize,execute_args,retcode", + "mpisize,crank,srank,xworkers,retcode", [ - (4, [], 0), - (1, [], 1), # Set too few processes to start cluster - (4, ["-c", "2", "-s", "3"], 0), - (5, ["-s", "3"], 0), - (3, ["-c", "2", "-s", "2"], 0), - (2, ["-c", "0", "-s", "0", "-x", "False"], 0), - (1, ["-c", "0", "-s", "0", "-x", "False"], 0), - (1, ["-c", "0", "-s", "0", "-x", "True"], 1), + (4, 1, 0, True, 0), # DEFAULTS + (1, 1, 0, True, 1), # Set too few processes to start cluster + (4, 2, 3, True, 0), + (5, 1, 3, True, 0), + (3, 2, 2, True, 0), + (2, 0, 0, False, 0), + (1, 0, 0, False, 0), + (1, 0, 0, True, 1), ], ) -def test_basic(mpisize, execute_args, retcode, mpirun): +def test_basic(mpisize, crank, srank, xworkers, retcode, mpirun): script_file = os.path.join( os.path.dirname(os.path.realpath(__file__)), "execute_basic.py" ) - execute_args += ["-m", str(mpisize)] + script_args = [str(v) for v in (mpisize, crank, srank, xworkers)] p = subprocess.Popen( - mpirun + ["-n", str(mpisize), sys.executable, script_file] + execute_args + mpirun + ["-n", script_args[0], sys.executable, script_file] + script_args ) p.communicate() From 3ebc32105ec1150dbada5e94af831d7a336e75d1 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:32:37 +0200 Subject: [PATCH 28/32] move send_close_signal to execute --- dask_mpi/execute.py | 19 +++++++++++++++++-- dask_mpi/initialize.py | 19 ++----------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dask_mpi/execute.py b/dask_mpi/execute.py index d12116b..fb6df43 100644 --- a/dask_mpi/execute.py +++ b/dask_mpi/execute.py @@ -2,11 +2,10 @@ import threading import dask -from distributed import Nanny, Scheduler +from distributed import Client, Nanny, Scheduler from distributed.utils import import_term from .exceptions import WorldTooSmallException -from .initialize import send_close_signal def execute( @@ -193,3 +192,19 @@ async def run_scheduler(with_worker=False, with_client=False): run_coro = run_worker(with_client=with_client) asyncio.get_event_loop().run_until_complete(run_coro) + + +def send_close_signal(): + """ + The client can call this function to explicitly stop + the event loop. + + This is not needed in normal usage, where it is run + automatically when the client code exits python. + + You only need to call this manually when using exit=False + in initialize. + """ + + with Client() as c: + c.shutdown() diff --git a/dask_mpi/initialize.py b/dask_mpi/initialize.py index bd7318a..7170d63 100644 --- a/dask_mpi/initialize.py +++ b/dask_mpi/initialize.py @@ -3,10 +3,11 @@ import sys import dask -from distributed import Client, Nanny, Scheduler +from distributed import Nanny, Scheduler from distributed.utils import import_term from .exceptions import WorldTooSmallException +from .execute import send_close_signal def initialize( @@ -142,19 +143,3 @@ async def run_worker(): sys.exit() else: return False - - -def send_close_signal(): - """ - The client can call this function to explicitly stop - the event loop. 
- - This is not needed in normal usage, where it is run - automatically when the client code exits python. - - You only need to call this manually when using exit=False - in initialize. - """ - - with Client() as c: - c.shutdown() From a35c26179792bf1534d166e5db43a997d6c2d0d0 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:34:19 +0200 Subject: [PATCH 29/32] send_close_signal has moved --- dask_mpi/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_mpi/__init__.py b/dask_mpi/__init__.py index 09779b4..6201984 100644 --- a/dask_mpi/__init__.py +++ b/dask_mpi/__init__.py @@ -1,7 +1,7 @@ from ._version import get_versions from .exceptions import WorldTooSmallException # noqa -from .execute import execute # noqa -from .initialize import initialize, send_close_signal # noqa +from .execute import execute, send_close_signal # noqa +from .initialize import initialize # noqa __version__ = get_versions()["version"] del get_versions From d5ee1e8be08c0e0c5036677d2e416090e34ef328 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:34:48 +0200 Subject: [PATCH 30/32] Deprecate initialize, now that execute does everything --- dask_mpi/initialize.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask_mpi/initialize.py b/dask_mpi/initialize.py index 7170d63..df78ce7 100644 --- a/dask_mpi/initialize.py +++ b/dask_mpi/initialize.py @@ -72,6 +72,10 @@ def initialize( Only returned if exit=False. Inidcates whether this rank should continue to run client code (True), or if it acts as a scheduler or worker (False). """ + raise DeprecationWarning( + "The initialize function is deprectaed, use the execute function instead" + ) + if comm is None: from mpi4py import MPI From bb352ca41e81bcf1cbe4519eb293cde9593571ed Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Fri, 13 Oct 2023 11:36:11 +0200 Subject: [PATCH 31/32] Revert initialize deprecation warning for now --- dask_mpi/initialize.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dask_mpi/initialize.py b/dask_mpi/initialize.py index df78ce7..7170d63 100644 --- a/dask_mpi/initialize.py +++ b/dask_mpi/initialize.py @@ -72,10 +72,6 @@ def initialize( Only returned if exit=False. Inidcates whether this rank should continue to run client code (True), or if it acts as a scheduler or worker (False). """ - raise DeprecationWarning( - "The initialize function is deprectaed, use the execute function instead" - ) - if comm is None: from mpi4py import MPI From 274f1958f4061c762bd312a59ce88adea3cd8b43 Mon Sep 17 00:00:00 2001 From: Kevin Paul Date: Tue, 2 Jul 2024 12:49:44 -0600 Subject: [PATCH 32/32] Attempt RTF fix --- docs/environment.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/environment.yml b/docs/environment.yml index 745f5a9..3c70b3d 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -13,6 +13,13 @@ dependencies: - pygments - pip - pip: + #>>>> See: https://github.com/dask/dask-sphinx-theme/issues/68 + - sphinxcontrib-applehelp<1.0.5 + - sphinxcontrib-devhelp<1.0.6 + - sphinxcontrib-htmlhelp<2.0.5 + - sphinxcontrib-serializinghtml<1.1.10 + - sphinxcontrib-qthelp<1.0.7 + #<<<< - dask-sphinx-theme>=3.0.5 - numpydoc - sphinx-click
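For reference, below is a minimal usage sketch of the execute() API added by this series, assuming the post-series keyword signature (client_function, client_args, client_rank, scheduler_rank, exclusive_workers); the script name example.py and the exact mpirun invocation are illustrative only. The client code connects with a bare Client() because execute() publishes the scheduler address through the dask config on every rank.

from distributed import Client

from dask_mpi import execute


def client_code(x):
    # Runs only on client_rank; Client() picks up the scheduler address
    # that execute() stored in the dask config.
    with Client() as client:
        assert client.submit(lambda v: v + 1, x).result() == x + 1


if __name__ == "__main__":
    execute(
        client_function=client_code,
        client_args=(10,),
        client_rank=1,           # MPI rank that runs client_code
        scheduler_rank=0,        # MPI rank that runs the Dask scheduler
        exclusive_workers=True,  # workers occupy only the remaining ranks
    )

Launched with something like: mpirun -np 4 python example.py. With exclusive_workers=True and distinct client and scheduler ranks, at least two MPI ranks are required; the remaining ranks become workers.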