Run Scheduler and Client from the same MPI Rank #29
I believe that in some simple cases, we don't need to have a rank dedicated to the Scheduler, and a rank dedicated to the Client. We should provide a way to start both within the same MPI rank in https://github.com/dask/dask-mpi/blob/master/dask_mpi/core.py#L62-L71.

Thoughts?

Comments
Seems fine to me.
@guillaumeeb Sorry for the delay! I think this is entirely true, and it's an excellent idea! In fact, I would like to go a little further than this: I would like to make the launch algorithm work for any size of MPI communicator, not just sizes greater than 2! What I'm considering is to pick sensible defaults for the small-communicator cases and to add an optional parameter for controlling placement.

Is this reasonable?
If you're thinking about use cases where the MPI size would be less than 3, then yes. I assumed that there was not really such a use case... but maybe for algorithm testing or debugging purposes? Otherwise, I think the idea of an optional parameter is perfectly fine!
I'd like to reboot this issue :D Is there a reason not to place a worker on each MPI rank? For example, if the MPI size is 1: place the worker, scheduler, and client on rank 0. If the MPI size is >= 2: scheduler on rank 0, client on rank 1, workers on all ranks. If that isn't a fundamentally bad idea, I'd propose adding a flag (exact name to be decided) that turns this behavior on.
Thanks for rebooting this, @betatim! I have some cycles to devote to this today, so the timing is great! I've been thinking about how to give the user more explicit control over placement of the client, scheduler, and workers within the MPI cluster. To aid with this, I've been thinking about adding CLI options to set the MPI rank on which the scheduler runs and the MPI rank on which the client runs.

They could be the same, and they could be different from 0/1, but they would have sensible defaults (chosen for backwards compatibility). I think the client and scheduler are the easy bits, though, because there is only ever one of each of them. The workers are the harder bits. I think the default behavior (again, for backwards compatibility) is for workers to "fill" the remaining MPI cluster, one worker per MPI rank. But it's the idea of more than one client/scheduler/worker per MPI process that we are discussing, I think. And if we allow the client and scheduler to run on the same MPI rank, then why not multiple workers?

In terms of implementation, most MPI implementations that I am aware of will not allow multiple processes to be associated with the same MPI rank (and things like forking/spawning new processes from an MPI process can actually be blocked). I do not know if some MPI implementations will allow it, or if it is safe even if possible, but regardless, the general approach to MPI is that each rank should (ideally) be a single process. That means that getting multiple clients/schedulers/workers to run on a single MPI process would require running them asynchronously in a single process, which is very doable from an "I know how to code that up" perspective... but I don't know if it will be a stable solution.

Does anyone listening in on this conversation have thoughts on this? In this time zone, maybe @guillaumeeb or @jacobtomlinson? Of course, I don't have a problem with implementing something that gives the user the ability to create an unstable instantiation of a Dask-MPI cluster if it also gives the user more flexibility. However, I don't want to create general instability for other users who aren't aware of the complexity and subtlety.
In terms of a user CLI allowing complete control over worker placement, the complete solution would involve asking the user to specify a list with one worker count per MPI rank. That is, something like `[0, 0, 1, 1, 1, 2]` would describe launching 0 workers on ranks 0 and 1, 1 worker on each of ranks 2, 3, and 4, and 2 workers on rank 5. ...But for large clusters, this is untenable. One possible solution would be to assume that the last entry in the list applies to all remaining ranks, so that the default could be written as `[0, 0, 1]`, which would imply not launching a worker on ranks 0 and 1 and launching exactly one worker on every rank 2 or higher. That's the default behavior right now.
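A tiny sketch (the function name and list syntax are made up for illustration, not a settled Dask-MPI interface) of how the "repeat the last entry" rule could be interpreted:

```python
# Expand a per-rank worker-count list, repeating the last entry for any
# ranks beyond the end of the list (names here are illustrative only).
def workers_for_rank(worker_counts, rank):
    """Number of workers to launch on ``rank`` given a placement list."""
    if rank < len(worker_counts):
        return worker_counts[rank]
    return worker_counts[-1]  # the last entry applies to all remaining ranks

# [0, 0, 1, 1, 1, 2]: no workers on ranks 0-1, one on ranks 2-4, two on rank 5
assert workers_for_rank([0, 0, 1, 1, 1, 2], 5) == 2
# [0, 0, 1]: no workers on ranks 0-1, one worker on every rank >= 2
assert [workers_for_rank([0, 0, 1], r) for r in range(6)] == [0, 0, 1, 1, 1, 1]
```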
So, currently Dask-MPI in the stand-alone script mode only works if you have an MPI comm size of 3 or larger. This proposed change would make it possible to run Dask-MPI with MPI comm sizes 1 or 2, for which we would need to define sensible defaults. Here are my suggestions:

- MPI comm size 1: There is only 1 possible choice here, so it doesn't really need to be said. However, just to be pedantic: scheduler on rank 0, client on rank 0, 1 worker on rank 0.
- MPI comm size 2: I think the default arrangement should be client on rank 0, scheduler on rank 0, and worker on rank 1. But I'm open for comment here.
- MPI comm sizes 3+: The normal defaults should exist, with scheduler on rank 0, client on rank 1, and workers on all other ranks.
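For concreteness, a minimal sketch of these suggested defaults (the function name and return format are made up, not the eventual Dask-MPI API):

```python
# Suggested default placement as a function of MPI communicator size.
def default_placement(comm_size):
    """Return (scheduler_rank, client_rank, worker_ranks) for a given MPI size."""
    if comm_size == 1:
        return 0, 0, [0]                    # everything shares rank 0
    if comm_size == 2:
        return 0, 0, [1]                    # scheduler and client on 0, worker on 1
    return 0, 1, list(range(2, comm_size))  # current Dask-MPI behavior

print(default_placement(4))  # (0, 1, [2, 3])
```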
My desire to have a worker on ranks 0 and 1 is that I don't know enough about the scheduler/job-submission system I am using to request different types of nodes per rank. So I end up with a gigantic machine with eight GPUs for each rank. Given that I have to queue a while to get the allocation, I'd like to use all of it :D Of course, an alternative solution would be to figure out whether it is possible to receive a smaller node for ranks 0 and 1 (a bit more like you'd do in the cloud). This means for me it is all about "workers on all the ranks". I care less about (and know nothing about) how the scheduler and client are distributed amongst the available ranks. I also don't use the CLI but call `initialize()` from Python.

I hadn't considered the restrictions on starting new processes and such (can you tell I am an MPI newbie?). I had naively assumed that we'd just add more stuff to the event loop and that would be that. I am +1 on making it hard for people to configure things in a way that makes running unstable. My assumption is that most people "have no clue", so like me they'd just see a wasted node, see that they can configure it to do work, and do it. And then have a miserable life because sometimes things work and sometimes they don't. I think it would be better not to give people that rope to hang themselves with, and instead to write a short bit of documentation explaining why you can't configure a particular setup.

For "why not more than one worker": I learnt that the word "worker" is overloaded. For example, when I instantiate one worker on one of these GPU nodes, it can actually correspond to several worker processes (one per GPU).
Understood! Then I would lean towards making the "run workers everywhere" option easy to implement, at least.
Yes, I think so, too.
Yes. I think that being explicit here would benefit everyone. Then, at least, people will have a better idea of what to expect beforehand. That said, MPI is always confusing for many people. Dask-MPI was created out of a simple abstraction that attempted to make launching a Dask cluster in an MPI environment easy, but I think that just opened the trail to a steep climb for many people. Once you're in the door, MPI can be a labyrinth! (And that's not even going into the complications created by each implementation of MPI being a little different from the others... and from the MPI standard itself!)
Yes. You've hit on exactly the issue, I think. That is why the initial implementations of Dask-MPI have always worked in "units of 1". And I think (because of this overloaded "worker" word) it is safe to continue assuming only 1 worker per rank.
Changed my mind. I think it actually makes sense to just adopt your notion of whether to allow a worker on the same rank as the client/scheduler. This is the notion of worker exclusivity, or whether a worker has exclusive access to an MPI rank. Would it be sufficient to just have a boolean option that controls this exclusivity?
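A small sketch of how such a boolean could behave (names are illustrative, not a proposed API):

```python
# Decide which ranks get a worker, depending on whether workers are
# "exclusive" (i.e., never share a rank with the scheduler or client).
def worker_ranks(comm_size, scheduler_rank=0, client_rank=1, exclusive=True):
    if exclusive:
        return [r for r in range(comm_size) if r not in (scheduler_rank, client_rank)]
    return list(range(comm_size))  # workers everywhere, sharing ranks

print(worker_ranks(4))                   # [2, 3]
print(worker_ranks(4, exclusive=False))  # [0, 1, 2, 3]
```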
I think this makes a load of sense, especially on large nodes. Thinking purely from a GPU perspective, there are a bunch of different configurations I would love to have (assuming the rank 0 node has 8 GPUs), for example giving the scheduler and client one GPU each and dedicating the rest to workers.
I'm also playing around with https://github.com/jacobtomlinson/dask-hpc-runner, which is based on dask/distributed#4710 and #69. My main goal there is to make this kind of cluster bootstrapping work without depending on MPI at all. My motivating use case is SLURM, where we have environment variables to get the ranks and a shared filesystem to communicate the scheduler address. It would be nice to leave MPI out of things altogether, because there can be consequences to importing `mpi4py`.

There are other places, like Databricks, where it would also be useful to have this paradigm for side-loading a Dask cluster onto a Spark cluster.
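Not how dask-hpc-runner actually does it, but a rough sketch of the paradigm described here, assuming Slurm's `SLURM_PROCID` environment variable and a made-up shared-filesystem path:

```python
# MPI-free bootstrapping: the rank comes from an environment variable set by
# the resource manager, and the scheduler address is shared through a file
# on a shared filesystem.
import asyncio
import os
import time

from distributed import Scheduler, Worker

SCHED_FILE = "/shared/scheduler-address.txt"  # hypothetical shared path
rank = int(os.environ.get("SLURM_PROCID", "0"))

async def run_scheduler():
    async with Scheduler() as scheduler:
        with open(SCHED_FILE, "w") as f:
            f.write(scheduler.address)     # publish the address for the workers
        await scheduler.finished()

async def run_worker():
    while not os.path.exists(SCHED_FILE):  # wait for the scheduler to come up
        time.sleep(1)
    with open(SCHED_FILE) as f:
        address = f.read().strip()
    async with Worker(address) as worker:
        await worker.finished()

if rank == 0:
    asyncio.run(run_scheduler())
else:
    asyncio.run(run_worker())
```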
@jacobtomlinson: Oooh! Excited to hear about progress on the dask-hpc-runner work!

Thanks for chiming in on these suggestions. It sounds like the configurations you described would be satisfied by the options I suggested above. I'm working on a PR now.
Good point about assigning one of the GPUs (each) to the scheduler and client.
Ok. I've been playing around with this today, and there are solutions, but I don't think they are as pretty as anyone was hoping. So, I'm sharing my experiences in the hope that someone might have better ideas than I have.

First, it's easy to launch a worker on the same MPI rank as the scheduler. The scheduler and worker are both asynchronous tasks, and therefore they can both be run on the same asynchronous event loop, started with a single blocking call on that loop.

However, the complication comes from the fact that the client code is synchronous. And launching an asynchronous task inside synchronous code can only be done by starting an event loop with something like `asyncio.run()` or `loop.run_until_complete()`, both of which block until the task completes. The only way to resolve this issue is for the client process to either: (1) start an event loop in a separate thread, or (2) start an event loop in a separate process (i.e., fork or spawn).

For option 2, this violates the "1 process per MPI rank" rule that I mentioned above, so I think we should exclude it from consideration. For option 1, however, this means starting a thread pool in the client code, starting the event loop in a separate thread, continuing execution of the main thread (i.e., the client code), and then shutting down the thread when the event loop is complete.

Option 1 is technically doable, but I feel like it is messy, and running processes on MPI ranks that randomly grab extra threads without being explicit about it seems like a way of creating problems for users later on. ...But it's possible. What do people think about this?
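For reference, a rough illustration of option (1) — not the Dask-MPI implementation, just a sketch of running the Dask components on an event loop in a background thread while synchronous client code keeps the main thread:

```python
import asyncio
import threading

from distributed import Client, Scheduler, Worker

address_ready = threading.Event()
scheduler_address = None

async def run_dask():
    # Scheduler and worker share one event loop in the background thread.
    global scheduler_address
    async with Scheduler() as scheduler:
        async with Worker(scheduler.address):
            scheduler_address = scheduler.address
            address_ready.set()          # tell the main thread we are up
            await scheduler.finished()   # block until the scheduler shuts down

loop_thread = threading.Thread(target=lambda: asyncio.run(run_dask()), daemon=True)
loop_thread.start()
address_ready.wait()

# Synchronous client code runs on the main thread and connects over TCP.
with Client(scheduler_address) as client:
    print(client.submit(sum, [1, 2, 3]).result())
    client.shutdown()                    # closing the cluster unblocks the thread

loop_thread.join(timeout=10)
```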
This is a fun rabbit hole. I spent a ton of time thinking about async-in-sync, sync-in-async, sync-in-async-in-sync, etc. recently. To clarify: the client code can be either sync or async, but we have no control over how the user decides to implement things, so we have to support both.

If we are in a situation where we want to start more than one component per rank, it's probably because each node is big and a single rank refers to a substantial machine. Do you think it's fair to say that in these cases the "one process per rank" rule will not be enforced? It seems bonkers for an HPC admin to enforce a single process on a machine with 8 GPUs. How on earth do they expect to saturate that hardware?
I don't think it is true that one MPI rank typically equates to one machine/node. In mixed threading/processing jobs, one MPI rank typically equates to one machine, with the assumption that the cores on the machine will be saturated with the appropriate number of threads. In plain CPU multiprocessing jobs, multiple MPI ranks are assigned to a single machine, typically one MPI rank per core. Obviously, it depends on how the scheduler is configured, but all of my HPC experience has followed that rule.
To be clear about this, MPI typically doesn't decide where to place its ranks. Or rather, the user does not usually need to tell MPI where to place its ranks. This can be done with something called a machinesfile, which lists the names of the machines on which to launch MPI ranks. Such a machinesfile can look like this:
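(The node names below are made up, and the exact syntax differs between MPI implementations; this is the Open MPI hostfile style, where `slots` is the number of ranks to place on each node.)

```
node01 slots=4
node02 slots=4
node03 slots=4
node04 slots=4
```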
which would tell MPI to launch 16 ranks across the listed machines.

However, in practice this is almost never done, because this process is handled by the HPC resource manager, like Slurm, LSF, PBS, etc. Depending on what you request from the resource manager, it will essentially construct a machinesfile to place multiple MPI ranks across your hardware. Different queue policies can be used to enforce things like "nodes need to be full" (i.e., 1 MPI rank per core) or "nodes can be shared across multiple jobs" or similar.

Like I mentioned above, some MPI implementations will actually crash if you try to spawn or fork a process from the process assigned to an MPI rank. While some MPI implementations may allow it, it is generally considered best practice with MPI to assume 1 process per MPI rank.

And everything I've just described suggests the dilemma with using threads. The HPC user expects MPI ranks to be placed in a way that uses the entire requested resource (i.e., "fills the nodes"). This gets hard when some processes are creating threads that the user didn't know about when originally requesting the resources.

But you make an excellent point, @jacobtomlinson, about the fact that the client code can be sync or async. This problem goes away entirely if the client code is async, because then the client code could be run in the same async event loop as the scheduler and/or worker. However, to make it possible to run async client code, it seems to me that the Dask-MPI API would need to change. Today, a batch-mode script looks like this:

```python
from dask_mpi import initialize

initialize(...)  # <-- Start scheduler and workers for MPI ranks other than the client MPI rank

# Client code starts here...
```

where the client code is executed only on the client MPI rank, because the worker and scheduler MPI ranks block due to their running event loops. But if the client MPI rank is the same as the scheduler MPI rank, then the scheduler's event loop will block and prevent the execution of the client code. To make it possible to run this asynchronously, one would need to change this to something like:

```python
from dask_mpi import initialize

async def client_code():
    # Client code starts here
    ...

initialize(..., coroutine=client_code())
```

where the `client_code` coroutine would be scheduled on the same event loop as the scheduler and/or worker.

I don't want to spend my whole day just writing up my thoughts/notes, though. So, let me investigate a bit about how one might change the Dask-MPI API to make this kind of operation possible.
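As a side note, the "async client is easy" point can be seen in a small stand-alone sketch (plain Distributed, no Dask-MPI): the scheduler, a worker, and the client code all share one event loop, and nothing blocks.

```python
import asyncio

from distributed import Client, Scheduler, Worker

async def client_code(address):
    async with Client(address, asynchronous=True) as client:
        future = client.submit(sum, [1, 2, 3])
        print(await future)  # -> 6

async def main():
    async with Scheduler() as scheduler:
        async with Worker(scheduler.address):
            # The client coroutine shares the loop with the scheduler and worker.
            await client_code(scheduler.address)

asyncio.run(main())
```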
Thanks for the info. I've only ever used machines where node == rank, so this is good to know, and it definitely affects the design here. I'm very keen to move away from the amount of magic in the current `initialize()` approach.

My big concern about letting user code and Dask code share an event loop like this is that if a user accidentally runs any blocking code, they will very likely run into deadlocks that will be painful to debug. I think it would always be safer to start a new loop on a separate thread and run the Dask components there, to avoid them getting deadlocked by user code on the main event loop.
@jacobtomlinson: Thanks for the reply! I agree that there is enough "magic" going on here that I don't want to charge forward with anything that makes things worse. You are correct about the accidentally blocking user code scenario. I hadn't really thought about that, but it is a problem. And it would be a nightmare to diagnose. I think you are correct that always executing the Dask code in its own event loop is the safest bet. I'll put a crude template in place in the form of a PR and then we can discuss further.
One thought based on a conversation with @jacobtomlinson: maybe the solution isn't to try to run the scheduler and client on the same rank, but instead to run more than one rank per node. This would solve my problem of "wasting" a whole node on the scheduler and one more on the client. It might be differently complicated to achieve this, but I wanted to mention it. I think for the queue system/cluster I am using, I can set the number of MPI tasks per node.
Ok. I'm working on a solution to this, but I think it needs to change the fundamental way that Dask-MPI's batch mode works: instead of calling `initialize()` at the top of a script, the user would hand their client code to Dask-MPI as a function (or, eventually, via a decorator).

To achieve the current Dask-MPI behavior, where workers, scheduler, and client are all in separate MPI ranks, the new function might look something like this:

```python
import asyncio

import mpi4py.MPI
from distributed import Client, Scheduler, Worker


def execute(func, *args, **kwargs):
    comm = mpi4py.MPI.COMM_WORLD
    this_rank = comm.Get_rank()

    if this_rank == 0:
        # Rank 0 runs the scheduler until it is shut down.
        async def run_scheduler():
            async with Scheduler(...) as scheduler:
                comm.Barrier()
                await scheduler.finished()

        asyncio.get_event_loop().run_until_complete(run_scheduler())

    elif this_rank == 1:
        # Rank 1 runs the client code, then shuts the cluster down.
        comm.Barrier()
        ret = func(*args, **kwargs)
        with Client() as client:
            client.shutdown()
        return ret

    else:
        # All other ranks run a worker until the scheduler closes.
        comm.Barrier()

        async def run_worker():
            async with Worker(...) as worker:
                await worker.finished()

        asyncio.get_event_loop().run_until_complete(run_worker())
```

There would be obvious variations on this, and the modifications described above to make it more customizable and make MPI rank placement easier would be easy to implement, too. I particularly like this approach for a number of reasons.

What do people think?
Using a decorator is an interesting idea. I'm not sure the code above would behave quite right as one, though: a decorator is expected to return a function, not the return value of the function. Everything in that example would be called at the time of decoration, not when the user's function is called. I wonder if the user's function should expect to take the client as an argument, too?
Sorry for the delay. Yeah. It's not a decorator yet. But it could be easily transformed into one. As for taking the client in as input, I would assume that is the user's choice. The user could pass it in or create it inside the custom function.
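For illustration, a minimal sketch of that transformation (assuming the `execute()` function sketched in the earlier comment; the decorator name is made up):

```python
import functools

def mpi_execute(func):
    """Turn a client function into one that runs under Dask-MPI when called."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Nothing MPI-related happens at decoration time; the cluster is only
        # set up (and torn down) when the wrapped function is actually called.
        return execute(func, *args, **kwargs)  # `execute` as sketched above
    return wrapper

@mpi_execute
def my_client_code():
    ...  # client code here

my_client_code()  # runs scheduler/workers/client according to MPI rank
```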
Ok. I'm looking at this a bit more today. First, I have created a mock-up of the solution I described above in #110. I'd appreciate any comments on that solution in a review, if you have time. In #110, the client code is wrapped in a function and handed to the new `execute()` function, as sketched above.
Now that I've had some experience playing around with implementing #110, I've discovered a new way of accomplishing the same things, and I like this approach even more.

The original intent behind the `initialize()` function was to let users run an ordinary client script on an MPI-launched cluster with minimal changes. Now, with #110, we can do all of this with the new `execute()` function instead.

Now, this brings me to something new. If we exploit Python's built-in `runpy` module, we can go from wrapping the client code in a function:

```python
def my_client_code():
    # define client code here
    ...

dask_mpi.execute(my_client_code, **execute_kwargs)
```

to:

```python
import runpy

dask_mpi.execute(runpy.run_path, my_client_script, **execute_kwargs)
```

where `my_client_script` is the path to an unmodified client script. In other words, the use of `runpy` would change how Dask-MPI batch-mode scripts are executed, moving from the old way of running things to a new way that would unify the batch-mode Dask-MPI operation with the interactive (CLI-based) operation.
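Concretely, the change in how a batch job is launched might look something like the following; the old invocation is today's documented batch mode, while the new entry point shown here is only a hypothetical placeholder, not a settled interface:

```
# Old batch mode: the script itself calls dask_mpi.initialize() near the top.
mpirun -np 4 python my_client_script.py

# New runpy-based mode (hypothetical entry point): Dask-MPI owns the startup
# and runs an unmodified client script on the designated client rank.
mpirun -np 4 python -m dask_mpi my_client_script.py
```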
Today I can create a Dask cluster on demand from within my own code (for example with Dask-Jobqueue). I think it would be nice if Dask-MPI could be used in a similar way from within an external framework.
@betatim I agree with you entirely. My intention is to make the new `execute()` interface support that kind of use as much as possible.

This is really the tricky point, though, because if you want to use MPI, then you must execute your MPI program with `mpirun` (or `mpiexec`, or whatever launcher your resource manager provides). So, for your case, if you have an external framework that (I assume) is not an MPI-based framework, and you want to use Dask-MPI to launch a cluster "on demand" from within the framework, then the only way to do it is to execute a subprocess (e.g., spawn `mpirun` via `subprocess`). Even Dask-Jobqueue uses subprocess to submit a job to a resource manager (PBS, LSF, etc.). If anyone knows of a way to get around the need for `mpirun`, I'd love to hear it.
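For what it's worth, a rough sketch of that subprocess approach, using the existing `dask-mpi` CLI and a made-up shared path for the scheduler file:

```python
# Launch an MPI-backed Dask cluster from a non-MPI process by spawning mpirun
# in a subprocess, then connect to it like any other Dask client.
import os
import subprocess
import time

from distributed import Client

SCHED_FILE = "/shared/scheduler.json"  # hypothetical shared-filesystem path

proc = subprocess.Popen(
    ["mpirun", "-np", "8", "dask-mpi", "--scheduler-file", SCHED_FILE]
)

while not os.path.exists(SCHED_FILE):  # wait for the cluster to come up
    time.sleep(1)

client = Client(scheduler_file=SCHED_FILE)
print(client.submit(sum, [1, 2, 3]).result())

client.shutdown()  # shutting the cluster down lets the mpirun subprocess exit
proc.wait()
```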
Agreed that you need to use `mpirun`.
Great. Then I understand you. And that should be doable with the new `execute()` approach.