
Non-MPI backends (e.g. Slurm) #97

Open
lgarrison opened this issue Oct 28, 2022 · 22 comments

Comments

@lgarrison
Contributor

Since a "major makeover" was mentioned for dask-mpi in dask/distributed#7192, I thought I would ask if support for non-MPI backends was a possibility. That is, use something like Slurm's srun to launch the ranks even if MPI is not available.

The reason I think this may be possible is because as far as I can tell, MPI is only being used to broadcast the address of the scheduler to the workers. Afterwards, it seems to me that any communication is done via direct TCP. If this is correct, could the startup be achieved instead by, e.g., writing the address to a file in a shared location?
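
For illustration, the kind of handshake I have in mind is roughly the following (just a sketch; the file path, the helper name, and the assumed JSON layout are illustrative):

import json
import os
import time

SCHEDULER_FILE = "/shared/scratch/scheduler.json"  # assumed shared-filesystem path

def wait_for_scheduler_address(path, timeout=60):
    """Poll the shared file until the scheduler process has written its address."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.exists(path):
            with open(path) as f:
                # assuming the JSON blob written by the scheduler contains its address
                return json.load(f)["address"]
        time.sleep(0.5)
    raise TimeoutError(f"no scheduler file appeared at {path}")

# A worker or client process would then connect over plain TCP, e.g.:
#   distributed.Client(wait_for_scheduler_address(SCHEDULER_FILE))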

The benefit of this is the elimination of a fairly heavyweight MPI dependency. When users start mixing, e.g., a conda install of MPI with an Lmod install of MPI, their jobs often get confused and fail. It seems like dask-mpi has a chance to sidestep these issues completely.

@mrocklin
Member

mrocklin commented Oct 28, 2022 via email

@lgarrison
Contributor Author

I'm actually looking for something that can run a dask cluster within an existing Slurm allocation. My understanding is that jobqueue submits Slurm jobs itself; my use-case is that I already have an allocation, and I'd like to start up dask within it.

Taking a step back, I'd like something that can dynamically schedule generic Python tasks within a given allocation of resources. For the most part, I'm interested in completely independent jobs, although task graph capabilities might be useful in the future too. I think dask-mpi is the closest thing I've found so far, but I'm open to suggestions.

@kmpaul
Collaborator

kmpaul commented Oct 28, 2022

@lgarrison: Yes. Dask-MPI is just a convenience function. As you point out, MPI is not used for client-scheduler-worker communication. MPI is only used to communicate the scheduler address to the worker processes (and the client process, if used with the initialize function). Earlier implementations of Dask-MPI actually required a shared filesystem because the scheduler shared its address with the other processes via the --scheduler-file option. But we ran into problems with this because some users had MPI-enabled clusters without shared filesystems. It seems to me that we need to support both of those use cases going forward (and Dask-MPI currently does).

It seems to me that the feature you are looking for would only require that Dask-MPI not require mpi4py and, if mpi4py is not found, restrict the input parameters to what is possible with non-MPI communication mechanisms (i.e., require --scheduler-file or similar).
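
Very roughly, I'm picturing something like this (a sketch only; the names and signature are illustrative, not the current Dask-MPI API):

try:
    from mpi4py import MPI
    HAVE_MPI = True
except ImportError:
    MPI = None
    HAVE_MPI = False

def initialize(scheduler_file=None, **kwargs):
    """Hypothetical entry point that degrades gracefully without mpi4py."""
    if not HAVE_MPI and scheduler_file is None:
        raise ValueError(
            "mpi4py is not installed; pass scheduler_file (on a shared "
            "filesystem) so processes can coordinate without MPI broadcasts"
        )
    # ... MPI-based startup, or file-based startup, would go here ...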

Does that accurately describe what you are looking for?

@lgarrison
Contributor Author

It seems to me that the feature you are looking for would only require that Dask-MPI not require mpi4py

Yes, I think that's exactly right. I'll test out the --scheduler-file mechanism to confirm that this is the case. Thanks!

@jacobtomlinson
Member

I attempted to pull a base class out of dask-mpi a while ago to make it easier to build alternative implementations with different backends.

  • distributed #4710
  • dask-mpi #69

The core difference is that folks want to take an existing allocation and fill it with a Dask cluster like dask-mpi does, rather than create the whole thing from scratch like dask-jobqueue does.

However, that draft implementation met some friction from other maintainers and I didn't pursue it any further. But maybe this is a signal that there is demand for this and the distinction is valuable?

@lgarrison
Contributor Author

I did realize that one other piece is needed to get this to work besides the scheduler-file option: a way for processes to identify their rank. In a Slurm context, this means that processes need to read the SLURM_PROCID environment variable. Easy enough, but there will need to be an abstraction where different bootstrap contexts can define how they determine the rank.

@jacobtomlinson, it looks like you've already worked on related abstractions. Are we on the right track here? Is it okay to implement "Slurm-awareness" at the dask-mpi level, or is this something that belongs higher up in distributed?

@jacobtomlinson
Member

This sounds like a good track but I don't think it belongs in dask-mpi given that it isn't MPI related. It also isn't best for it to go in distributed. Maybe dask-jobqueue is the right location given that it is job queue specific functionality (cc @guillaumeeb)? If not then a separate package.

This is why I tried to abstract things out in distributed so that other projects could reuse things.

@lgarrison
Contributor Author

I agree it's a bit odd to have a package named dask-mpi support non-MPI bootstrapping, but the amount of Slurm stuff that would have to go in dask-mpi is so minimal that it seems overkill to loop in another package. The entire abstraction probably looks like:

import os
from abc import ABC, abstractmethod

class Bootstrapper(ABC):
    """How does a process discover its rank (and share data) at startup?"""
    @abstractmethod
    def get_rank(self, comm=None): ...
    @abstractmethod
    def supports_bcast(self): ...

class SlurmBootstrapper(Bootstrapper):
    def get_rank(self, comm=None):
        # Slurm exports each task's rank as an environment variable
        return int(os.environ['SLURM_PROCID'])
    def supports_bcast(self):
        '''Do we have MPI broadcast support, or do we have to use scheduler-file?'''
        return False

class MPIBootstrapper(Bootstrapper):
    def get_rank(self, comm=None):
        return comm.Get_rank()
    def supports_bcast(self):
        return True
    def bcast(self, comm, *args, **kwargs):
        return comm.bcast(*args, **kwargs)

This could also be an interesting use case for entry points, which would allow an external package to register a bootstrapping interface. But it still seems like a shame to split off another package since the packaging overhead would be so much larger than the useful code itself.

Plus, an important part of this (for my use case, at least) is to have the mpi4py dependency be optional, since it's not such a lightweight library and other mechanisms could do its job. I realize this needs more discussion, but if dask-mpi is willing to forgo a mandatory MPI dependency (as I think @kmpaul was suggesting), then natively supporting common job runners might not be so out-of-place.

@jacobtomlinson
Member

jacobtomlinson commented Nov 2, 2022

I agree it's a shame for such a small amount of code to cause this discussion, but I'm really not sure this proposal makes sense.

Given that this whole library is like ~200 lines of code I don't think there would be much pushback to making a PR into dask-jobqueue that implements a slurm runner and copy/pastes a chunk from here (although ideally that common chunk would be upstreamed into distributed). Both dask-jobqueue and distributed already exist and seem like the obvious places to put this logic, so there is no packaging overhead.

if dask-mpi is willing to forgo a mandatory MPI dependency

This seems like a pretty unusual ask. Making MPI optional in the dask-mpi package so that non-MPI things can be put here doesn't make sense to me.

@lgarrison
Contributor Author

dask-jobqueue seems to have minimal dependencies, so on that front I'd be happy enough if the dask-mpi framework were copy-pasted over there. It just seemed orthogonal to what dask-jobqueue wants to do, which is launch and manage its own Slurm jobs.

Perhaps more importantly, I can see many users upon being directed to the dask-jobqueue package getting confused and using the job-submission functionality, rather than the dask-mpi-like functionality. The pattern I'm trying to encourage is to request a single multi-node allocation, then do dynamic dispatch inside that. I don't think this is so crazy; many HPC schedulers won't even consider more than a handful of queued jobs at a time for scheduling (and have a jobs-per-user max), and dask-jobqueue only submits single-node jobs, so multi-node allocations can be far more efficient for both the user and the scheduler. Indeed, this has been discussed in dask-jobqueue, and it seems they are pretty much set on single-node submissions: dask/dask-jobqueue#364 (comment). In that same comment, they point users to dask-mpi for this multi-node allocation case.

This seems like a pretty unusual ask. Making MPI optional in the dask-mpi package so that non-MPI things can be put here doesn't make sense to me.

I know this seems unusual, but I think it's more because dask-mpi happens to have "MPI" in the name rather than anything fundamental about the package. And maybe that's a good enough reason---I agree it would cause some confusion. Hopefully nothing some informative error messages couldn't smooth over, though.

But if we are at the point of copy-pasting dask-mpi, then maybe it makes the most sense for me to do that in my own repo, rather than try to shoehorn it into dask-jobqueue. Happy to hear what others think.

@guillaumeeb
Member

Hi everyone,

Yes, things like that have been discussed in the past. In the HPC world, between the dask-jobqueue and dask-mpi functionalities, there are multi-node allocations (the subject of this discussion) and job-array possibilities. There are several other threads in dask-jobqueue: dask/dask-jobqueue#459, dask/dask-jobqueue#196, dask/dask-jobqueue#84 (comment). In dask-jobqueue, we've reached the conclusion that it would introduce too much complexity and specific code for each job scheduling system.

In any case, I think it would be really nice to have such a Dask cluster deployment method; a lot of HPC system admins (and users) would be glad to have it. The thing is, job schedulers are so specific in these matters that it seems complicated to me to find the correct abstraction and place for a new dask-hpc-or-multinode package. Slurm is really powerful in this multi-node domain; it would almost make sense to have a dask-slurm package that uses allocations and srun commands!

So in my opinion, you should start in a new repo. I would be happy to follow your development and see if things can be generalized in one way or another! Maybe in the end dask-mpi would become a specific implementation of a dask-multinode package?

@jacobtomlinson
Member

That all sounds reasonable. I think the key difference between dask-mpi and other deployment packages is that it runs inside an existing multi-node allocation and just fills it with a Dask cluster. I think this deployment paradigm is underserved currently. I'm going to continue referring to this paradigm as a "dask cluster runner" as opposed to the dask-jobqueue paradigm which we typically call a "dask cluster manager".

I have concerns about orthogonality in all the deployment packages we have when we talk about introducing more runners.

The SLURM runner you are proposing is similar to dask_jobqueue.SlurmCluster but different enough that the suggestion of putting it in a new package seems sensible. But I want to push back on that some more.

I also want to have a runner for various cloud environments, I think Azure ML is the example I used in my original PR about this. We already have the cluster manager dask_cloudprovider.azure.AzureMLCluster so this would be similar to that but the user would submit a multi-node job rather than the cluster manager. Is that different enough that maybe it shouldn't live in dask_cloudprovider?

In that case, does it make sense for a SLURM runner and Azure ML runner to live in one new package called dask-multinode? My feeling is no. The Azure ML runner will probably benefit more from being in dask_cloudprovider as it would allow us to share cloud-specific code and utilities.

I assume from @guillaumeeb's response that there may not be as much benefit of colocating the SLURM cluster manager and cluster runner in the same package. Maybe code reuse between them would be low? But I think there are other compelling cases for cloud, kubernetes, etc where this colocation does have benefits, especially around package dependencies.

I don't think putting this in dask-mpi makes sense, and I don't think lumping all the runners in a new package called dask-multinode or similar makes sense.

We also don't want to create a new package for every new runner, that would be a pain.

That leaves colocating them with existing cluster managers that use the same resource schedulers and dependencies, even if that is a shift for the package's remit currently.

@guillaumeeb
Member

I think the key difference between dask-mpi and other deployment packages is that it runs inside an existing multi-node allocation and just fills it with a Dask cluster

I assume from @guillaumeeb's response that there may not be as much benefit of colocating the SLURM cluster manager and cluster runner in the same package. Maybe code reuse between them would be low?

I hadn't thought about all that, given that I haven't looked much into dask-mpi internals, or even used it much. Most of the dask-jobqueue code is about generating and submitting job scripts. In dask-mpi, and in this proposal, the user is responsible for writing the job script, and they just want some wrapper code in it to start a Dask cluster. Indeed, that is a big difference.

This is also something that dask-mpi is lacking: being able to call cluster = MPICluster(...) inside a notebook would be a really nice feature, but this is another issue.

Back to the subject here, the proposal is really much closer to the dask-mpi paradigm. Maybe we should have a dask-hpc package that hosts dask-mpi and other runners?

@jacobtomlinson
Member

I agree the deployment paradigm is a big difference, but I think that's ok. If you're worried about maintaining a SLURM runner in dask-jobqueue falling on your shoulders, all I can do is offer to help maintain it.

Maybe we should have a dask-hpc package that host dask-mpi and other runners?

This is tempting, we could just rename this repo and then put the SLURM runner in here too. Packaging would get more complex because MPI would need to be made optional.

However I worry that users would get confused between dask-jobqueue and dask-hpc and what the differences are.

Also what about cloud/kubernetes/hadoop/etc? If we want runners for those it doesn't make sense for those to go in a dask-hpc package.

@guillaumeeb
Member

If you're worried about maintaining a SLURM runner in dask-jobqueue falling on your shoulders all I can do if offer to help maintain it.

Not at all! You've seen over the years that I'm not always available to maintain things, so I won't feel the responsibility for this 😄.

I just think I'm not yet seeing what a runner really is, or at least this one, and how it would integrate into dask-jobqueue.

Do you think we could share part of the runner code and use it in SlurmCluster internals at some point? This would make a lot of sense then. If not, I'm curious to see how you would organize things.

Maybe the development should still be started in another repository so that we can see how it can fit later on into an existing repo? Maybe you're afraid in that case that we would never merge it into an existing package...

@lgarrison
Contributor Author

Maybe we should have a dask-hpc package that host dask-mpi and other runners?

This is tempting, we could just rename this repo and then put the SLURM runner in here too. Packaging would get more complex because MPI would need to be made optional.

This would fit well with the use case I had in mind. I agree the dependency management would be more complex, but maybe specifying optional dependencies like pip install dask-hpc[slurm] or pip install dask-hpc[mpi] would help keep things organized.
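
As a rough packaging sketch (the package name and extras here are purely illustrative assumptions):

# setup.py for a hypothetical dask-hpc distribution with optional backends
from setuptools import setup, find_packages

setup(
    name="dask-hpc",
    packages=find_packages(),
    install_requires=["dask", "distributed"],
    extras_require={
        "mpi": ["mpi4py"],  # pip install dask-hpc[mpi]
        "slurm": [],        # Slurm support needs only the stdlib plus srun at runtime
    },
)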

Also what about cloud/kubernetes/hadoop/etc? If we want runners for those it doesn't make sense for those to go in a dask-hpc package.

If I understand correctly, the concern is that you want to be able to reuse, e.g., cloud-specific utilities that live in dask_cloudprovider. In that case, couldn't dask_cloudprovider be a dependency in the dask-hpc[cloud] optional dependencies set?

However I worry that users would get confused between dask-jobqueue and dask-hpc and what the differences are.

I think this is valid; understanding the difference between dask-jobqueue and dask-mpi was initially a hurdle for me. But the documentation was pretty good in both repos, so I got up to speed pretty fast.

Using cloud as an example, there's a tension between encouraging code reuse (keeping all cloud things in one repo) and keeping the cluster-manager and cluster-runner workflows separate (two repos with cloud things). But I would argue that the code reuse problem has a decent technical solution, which is to keep them in separate packages and use optional dependencies. The manager vs runner "educational" problem benefits from having two different repos that clearly serve two different workflows, I think.

@jacobtomlinson
Member

jacobtomlinson commented Nov 4, 2022

I just think I'm not seeing yet what really is a runner, or at least this one, and how it would integrate into dask-jobqueue.

The way I see it, the cluster managers that we have inside dask-jobqueue create jobs and tell them exactly what to do. They submit a scheduler, submit workers individually, and tell them how to coordinate. The cluster is actively created and managed from the outside, usually within the client code.

The runner in dask-mpi, as well as the SLURM runner being proposed here, is basically a function that gets run by every job in a multi-job allocation; the jobs coordinate between themselves to figure out who runs the client code, who runs the scheduler, and who runs workers. The MPI solution uses ranks and broadcasts to do this coordination. This SLURM proposal suggests using the SLURM_PROCID env var and a scheduler file on a shared filesystem to do the coordination. Here the cluster is managed from the inside.

It's sort of top-down cluster creation vs bottom-up.
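
As a very rough sketch of what every task in the allocation would run under the SLURM proposal (illustrative only, not dask-mpi's actual implementation; a real runner would also wait for the scheduler file to appear and handle teardown):

import os
from dask.distributed import Client

SCHEDULER_FILE = "scheduler.json"  # assumed to live on a shared filesystem

rank = int(os.environ["SLURM_PROCID"])

if rank == 0:
    # This task hosts the scheduler and advertises its address via the shared file
    os.execvp("dask-scheduler", ["dask-scheduler", "--scheduler-file", SCHEDULER_FILE])
elif rank == 1:
    # This task runs the user's client code against the cluster
    with Client(scheduler_file=SCHEDULER_FILE) as client:
        print(client.submit(sum, range(10)).result())
else:
    # Every other task becomes a worker
    os.execvp("dask-worker", ["dask-worker", "--scheduler-file", SCHEDULER_FILE])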

For integration, it's mostly just namespacing. I think from dask_jobqueue import SLURMRunner looks more reasonable than from dask_mpi import SLURMRunner.

Do you think we could share part of the runner code and use it in SlurmCluster internals at some point?

Yeah, I already proposed this in dask/distributed#4710, and we could revisit that again for sure. Part of the challenge is that a runner is a trivially small amount of code compared to a cluster manager. The runner in dask-mpi is less than 200 lines, so abstracting it out seemed like premature optimisation.

In that case, couldn't dask_cloudprovider be a dependency in the dask-hpc[cloud] optional dependencies set?

Dask cloudprovider is a gigantic package with many optional dependency sets already. Making that a dependency of dask-hpc feels wrong to me. Especially given that the cloud runner implementations in dask-hpc would be tiny. Plus the naming makes no sense.

If I understand correctly, the concern is that you want to be able to reuse, e.g., cloud-specific utilities that live in dask_cloudprovider

This is only part of my concern. My other concern is that I think users would expect to find cloud-related tools in the dask-cloudprovider package. Putting them in a dask-hpc package seems confusing and putting them in a new dask-cloudrunner package also seems confusing and adds more maintenance burden.

Maybe the development should still be started in another repository so that we can see how it can fit later on in a existing repo?

This makes the most sense to me as the next course of action. Maybe @lgarrison could create lgarrison/dask-slurm and just copy/modify dask-mpi to do what you need. Then we could discuss upstreaming it somewhere at a later date.

I think key candidates would be:

  • Keep it separate and move it to dask-contrib
  • Merge it into dask-jobqueue
  • Create a new dask-hpc and merge both dask-slurm and dask-mpi into that
  • Something else?

But we can kick that down the road and maybe discuss it at a Dask community meeting with other maintainers.

@kmpaul
Collaborator

kmpaul commented Nov 4, 2022

Okay. I was unavailable the last couple of days to respond, so I recognize that I'm coming back to this conversation a bit late. I wanted to add some thoughts to the discussion, at the risk of throwing a spanner in the works.

As has been pointed out, Dask-MPI only uses MPI to determine the "rank" of each running process and to broadcast the scheduler's address to the workers. Also as pointed out already, Dask-MPI originally didn't even broadcast the scheduler address! In those early days, I always saw Dask-MPI really as a way of automatically running/launching dask-scheduler and dask-worker on distributed processes with a single command. In essence, I've always believed that Dask-MPI's dependence on MPI was actually very weak, but that Dask-MPI's dependence on mpirun/mpiexec/srun was quite strong.

I vaguely remember (in those early days) a discussion around ways of reliably determining the "process rank" (using MPI terminology) without needing to rely on mpirun or mpiexec, but no reliable method was uncovered. That led to the stronger dependence on MPI, but I'm still interested in a way of abstracting the "process-rank discovery problem."

Now, even if the "process-rank discovery problem" can be abstracted well, that still leaves the need for a robust mechanism for sharing the scheduler address with the workers without MPI or a shared filesystem. In general, this is all part of "service discovery," but in some sense Dask-MPI exists because "service discovery" is usually discouraged (and sometimes banned) on HPC systems.

So, finally getting back to the discussion at hand, I see the direction of this project being consistent with @lgarrison's initial suggestion (above). But I would say that Dask-MPI was an MPI-specific, early attempt at solving the general problem. I agree that any attempts that take a big step in that direction should probably be created in a new repo.

As for the repo/project name... Since what we are trying to discuss is just a "single-command launcher" for both dask-scheduler and dask-worker... and because "1 scheduler + N workers" = "1 cluster", perhaps it should just be named dask-cluster? Though that starts to confuse the already existing terminology for a Cluster in Dask.

@jacobtomlinson
Member

Interesting points @kmpaul.

To throw another spanner in the works: the Dask team at NVIDIA is exploring a more powerful "single-command launcher" for workers. Today we have dask-worker for CPU and dask-cuda-worker for GPU, and we are looking to unite those somehow with a more flexible nanny implementation.

I played with this a while ago in dask-agent but we will likely start from scratch and everything is up for grabs including the name.

Maybe this would be an interesting place to also explore launching schedulers and handling this coordination.

@LTMeyer

LTMeyer commented Jul 16, 2024

Hello,
I was wondering if there has been any update on this matter. Is there now a recommendation on how to use Dask on Slurm-allocated resources?

@jacobtomlinson
Member

jacobtomlinson commented Jul 23, 2024

@LTMeyer things are still a work in progress. I recommend you track dask/dask-jobqueue#638 which I will be working on in the near future.

@LTMeyer

LTMeyer commented Jul 23, 2024

@LTMeyer things are still a work in progress. I recommend you track dask/dask-jobqueue#638 which I will be working on in the near future.

Thank you @jacobtomlinson. Actually, @lgarrison pointed me to the repo you both contributed to (https://github.com/jacobtomlinson/dask-runners), which is also the subject of the PR you mentioned. It perfectly answered my need, and I was able to use Dask on Slurm-allocated resources as I wanted.
