Custom client/scheduler MPI rank placement #110

Merged · 35 commits into main from rank-placement · Jul 3, 2024

Conversation

@kmpaul (Collaborator) commented on Oct 12, 2023

Update - Ready for Review

I have added a dask_mpi.execute() function, though it looks a little different from the sketch in the "Previous Header" below. The idea is that execute() uses MPI to launch the client code/function, the Dask Scheduler, and any needed Dask Workers, as specified. It is general purpose: it can run with or without a client function, and with or without launching a Scheduler. This makes (or should make) execute() completely general, so it can (and should) work for all use cases, whether through the CLI or in batch mode.

The execute() function takes a (synchronous) function as input (along with its args/kwargs) and runs that function on a single MPI rank in its own thread. A Dask Scheduler or Worker (or both) can be run on the same MPI rank, if desired. Thus, the entire Dask-MPI utility can launch a Dask cluster with a wide variety of customizable options:

  • with/without a client function: If the client function is absent, execute() works like the existing Dask-MPI CLI, creating a scheduler and workers.
  • with/without a scheduler: If scheduler=False is specified, no Scheduler is launched; instead, the user is expected to supply a scheduler_address pointing at a Scheduler that is already running. This lets you "add" to an existing Dask cluster, and you can still supply a client function to run in batch mode (which also shuts down the cluster when finished).
  • with exclusive/inclusive workers: Workers can be run on every MPI rank, or only on "unused" MPI ranks (where no client or scheduler is running).

All of the options encoded into the CLI/interactive mode of using Dask-MPI are available with execute(), as are all of the options available to the initialize() batch mode of using Dask-MPI.
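As a rough sketch of how this might look in a batch job (illustrative only; the keyword names scheduler and scheduler_address come from the description above and may differ slightly from the merged signature):

# batch_job.py -- illustrative sketch, launched with e.g. "mpirun -np 4 python batch_job.py"
from dask.distributed import Client

import dask_mpi


def client_code():
    # Runs on a single MPI rank; execute() starts the Scheduler and Workers
    # on the other ranks.
    with Client() as client:
        return client.submit(sum, range(100)).result()


# Launch a Scheduler, Workers, and the client function across the MPI ranks.
dask_mpi.execute(client_code)

# Or attach Workers (and the client function) to a Scheduler that is already
# running elsewhere:
# dask_mpi.execute(client_code, scheduler=False,
#                  scheduler_address="tcp://scheduler-host:8786")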

NOTE: This PR only implements the execute() method. The old CLI has not been modified to use execute(), but that can easily be done. This PR also does not modify, deprecate, or remove the initialize() method. So, all existing functionality still works; this new function simply adds new capabilities, and in a future PR we can modify the CLI to use execute() and deprecate initialize().

Previous Header

This is a mock-up of a new way of using Dask-MPI that allows custom placement of the client code and the scheduler on the available MPI ranks. The approach is described in #29 (comment), the salient portion of which is quoted below:

  1. The client code would need to be "wrapped" in a function. It could be an asynchronous function, but it would still need to be wrapped in a function regardless.
  2. The dask_mpi.execute(...) function would take the client function as input (or the coroutine) and execute it on the specified client_rank with all of the additional options (e.g., scheduler_rank, exclusive_workers, ...).
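For instance (purely illustrative, using the option names from the quoted comment rather than a finalized API), the envisioned call might look like:

# Hypothetical invocation; client_rank, scheduler_rank, and exclusive_workers
# are the option names from the quoted proposal, not necessarily the final API.
dask_mpi.execute(client_func, client_rank=1, scheduler_rank=0, exclusive_workers=True)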

To achieve the current Dask-MPI behavior, where workers, scheduler, and client are all in separate MPI ranks, the execute() function might look like:

import asyncio

import mpi4py.MPI
from distributed import Client, Scheduler, Worker


def execute(func, *args, **kwargs):
    comm = mpi4py.MPI.COMM_WORLD
    this_rank = comm.Get_rank()

    if this_rank == 0:
        # Run the Scheduler on rank 0 and block until it is shut down.
        async def run_scheduler():
            async with Scheduler(...) as scheduler:
                comm.Barrier()
                await scheduler.finished()

        asyncio.run(run_scheduler())

    elif this_rank == 1:
        # Wait for the Scheduler to start, then run the client function.
        comm.Barrier()

        ret = func(*args, **kwargs)

        # Shut down the cluster once the client function has returned.
        with Client() as client:
            client.shutdown()

        return ret

    else:
        # All remaining ranks wait for the Scheduler and then run Workers.
        comm.Barrier()

        async def run_worker():
            async with Worker(...) as worker:
                await worker.finished()

        asyncio.run(run_worker())

There would be an obvious modification for the case where func is a coroutine, so that func is run asynchronously.
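A minimal sketch of that modification (assuming the dispatch is done with inspect.iscoroutinefunction, which is just one way to do it, not necessarily how the final code handles it):

import asyncio
import inspect


def run_client(func, *args, **kwargs):
    # Call func directly if it is a plain function, or drive it to completion
    # on a fresh event loop if it is a coroutine function.
    if inspect.iscoroutinefunction(func):
        return asyncio.run(func(*args, **kwargs))
    return func(*args, **kwargs)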

The further modifications needed to make this more customizable, and to make MPI rank placement easier to control, would also be straightforward to implement.

I particularly like this approach for a number of reasons:

  1. It is much more explicit, and it removes a lot of the previous Dask-MPI "magic" which, as @jacobtomlinson has already pointed out, makes it hard for new users to adopt Dask-MPI and to diagnose problems when they have them.
  2. The dask_mpi.execute() function is essentially a decorator, which is an appropriate paradigm for the function of Dask-MPI (as opposed to a context manager).
  3. It makes it easy to run the client code in a thread to prevent collision with a scheduler or worker event loop (a minimal sketch of this follows the list).
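That threading idea might look roughly like this (a hypothetical helper, not the merged implementation): the client function runs in its own thread while the main thread stays free to drive the Scheduler's or Worker's event loop.

import threading


def start_client_thread(func, *args, **kwargs):
    # Run the client function in a daemon thread so the main thread remains
    # free to run a Scheduler or Worker event loop on the same MPI rank.
    result = {}

    def target():
        result["value"] = func(*args, **kwargs)

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    return thread, result


# Usage sketch:
# thread, result = start_client_thread(client_code)
# ...run the Scheduler/Worker event loop on this rank...
# thread.join()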

This is a draft for the purposes of discussion.

@kmpaul marked this pull request as ready for review on October 13, 2023 10:21
@kmpaul (Collaborator, Author) commented on Oct 13, 2023

NOTE: The execute() tests are not complete. All of the CLI and initialize() tests should be ported to use the new execute() method, but once the CLI is modified to use execute(), the existing CLI tests will "just work." So, I'm holding off on debugging anything related to adding those tests until then. (I'm happy, of course, to fix any obvious bugs that anyone spots that the current tests didn't catch.)

@jacobtomlinson (Member) left a comment


This has sat for a long time. I think we should go ahead and merge it, and if there is any feedback we can iterate in follow-up PRs.

@jacobtomlinson merged commit 04aecf5 into main on Jul 3, 2024
12 checks passed
@jacobtomlinson deleted the rank-placement branch on July 3, 2024 08:44