
Implementing MPI communication protocol for Dask #48

Open
aamirshafi opened this issue Apr 3, 2020 · 5 comments
@aamirshafi

I am currently experimenting with dask-mpi, aiming to replace the point-to-point communication between the scheduler and workers, and among workers, with MPI. The goal is to exploit high-performance interconnects. I am aware of the UCX communication protocol already implemented, but the MPI approach would be an alternative to it. I have also seen discussions about Dask starting MPI jobs (#25), but this post is obviously about the opposite.

When starting Dask jobs with dask-mpi, COMM_WORLD has already been initialized in core.py. After that, process 0 runs the scheduler code, process 1 runs the client code, and processes with rank >= 2 run worker processes. I can also see that the communication code is nicely abstracted out in the dask/distributed/comm/ folder (inside dask.distributed). Three implementations are already available there: tcp.py, ucx.py, and inproc.py. All three implement the abstract class Comm. So should I be looking at producing mpi.py as the first step? The idea would be to somehow pass the COMM_WORLD object to mpi.py and use this communicator for communication. My initial goal is to leave connection establishment as is and only replace data communication with MPI's point-to-point communication. Also, there must be some code that multiplexes between these communication protocols based on the user's parameters. I would appreciate being pointed to that code.

Also, once this is implemented, do we need to do anything special for communication between GPU devices? As I understand it, GPU devices will be able to communicate as long as the underlying MPI library is CUDA-aware, meaning that it supports direct communication between GPU devices. In other words, I am asking whether dask-cuda has its own communication mechanism or whether it also relies on distributed for its communication.

One last clarification, please: is there explicit communication between Dask workers? Typically, in a loosely coupled system, workers are only aware of the scheduler, but going through the code I got the impression that Dask workers also communicate with one another. The architecture diagram does not show connections between workers, hence the confusion. And if there is communication between workers, would the mpi.py envisaged above enable worker<-->worker communication, scheduler<-->worker communication, or both?


mrocklin commented Apr 3, 2020

I am currently experimenting with dask-mpi to replace point-to-point communication

Great! I'm excited to see what comes out of this work.

So should I be looking at producing mpi.py as the first step?

Yes.

My initial goal is to leave connection establishment as is and only replace data communication with MPI's point-to-point communication

I'm not sure that you get to make this choice. You'll have to implement the entire Comm interface. Fortunately, connection establishment is trivial in an mpi4py setting: everything is already connected, so you probably don't have to do anything other than store the rank of the other end.
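A minimal sketch of that idea, assuming mpi4py. The MPIComm name, the tag-based channel separation, and the blocking send/recv calls are all illustrative, not distributed's actual interface (the real Comm abstract class has additional members, such as address properties). The mpi4py import is deferred so the module can be loaded outside a running MPI job:

```python
class MPIComm:
    """One logical channel to a peer process, identified by its COMM_WORLD rank."""

    def __init__(self, peer_rank, tag=0):
        # Under MPI everything is already connected, so "establishing" a
        # connection reduces to remembering the peer's rank (plus a tag
        # to keep independent channels apart).
        self.peer_rank = peer_rank
        self.tag = tag
        self._closed = False

    async def write(self, msg, serializers=None, on_error=None):
        # Deferred import: only needed inside a running MPI job.
        from mpi4py import MPI
        # A real implementation would use non-blocking isend and yield to
        # the event loop while the transfer completes; this blocking call
        # is only a sketch.
        MPI.COMM_WORLD.send(msg, dest=self.peer_rank, tag=self.tag)

    async def read(self, deserializers=None):
        from mpi4py import MPI
        return MPI.COMM_WORLD.recv(source=self.peer_rank, tag=self.tag)

    async def close(self):
        self._closed = True

    def closed(self):
        return self._closed
```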

Also there must be some code that multiplexes between these communication protocols based on user's parameters. I would appreciate if I can be pointed to that code.

I'm not sure I understand this, but perhaps you're looking for the protocol= keyword.
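Roughly speaking, the dispatch works by mapping the scheme prefix of an address to a backend. This is a simplified, self-contained model of that lookup (the real one lives in distributed's comm registry; the backends dict and its string values here are placeholders), showing how an MPI transport would slot in under a new mpi:// scheme:

```python
# Placeholder backend objects; in distributed these are Backend instances.
backends = {
    "tcp": "tcp.py backend",
    "ucx": "ucx.py backend",
    "inproc": "inproc.py backend",
}

def get_backend(address, default_scheme="tcp"):
    # "ucx://10.0.0.1:8786" -> scheme "ucx";
    # a bare "10.0.0.1:8786" falls back to the default scheme.
    scheme, sep, _ = address.rpartition("://")
    scheme = scheme if sep else default_scheme
    try:
        return backends[scheme]
    except KeyError:
        raise ValueError(f"unknown comm scheme {scheme!r}")
```

A new transport registers itself by adding an entry, e.g. backends["mpi"] = ..., after which addresses like mpi://0 would select it.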

Also once this is implemented, do we need to do anything special for communication between GPU devices

I don't know. I would defer this question to the mpi4py developers. You might also want to ask the RAPIDS folks.

Is there explicit communication between dask workers?

Yes. Workers communicate peer to peer.

@aamirshafi

Thanks for your responses.


mrocklin commented Apr 3, 2020 via email

@aamirshafi

@mrocklin, apart from the initial connection establishment between workers and the scheduler, dynamic connections are also made during execution. Is it possible to allow at most one connection between any two peers? Thanks.
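One way to get that behavior would be a per-peer connection cache in the spirit of distributed's ConnectionPool. This PeerPool is a hypothetical sketch (names and the factory interface are illustrative, not distributed's API):

```python
class PeerPool:
    """Cache at most one connection per peer, dialing lazily on first use."""

    def __init__(self, connect):
        self._connect = connect   # factory: peer -> comm object
        self._comms = {}

    def get(self, peer):
        # Dial only on the first request for this peer; afterwards every
        # caller shares the single cached comm object.
        if peer not in self._comms:
            self._comms[peer] = self._connect(peer)
        return self._comms[peer]
```

With this in place, repeated pool.get(peer) calls for the same peer return the same comm object instead of opening new connections.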

@aamirshafi

We have an update to this on dask/distributed#4461
