
Launching dask-mpi clusters #44

Open
rcthomas opened this issue Nov 24, 2019 · 24 comments

@rcthomas

Is this supported? We have the ability (through Slurm) to run srun ... dask-mpi ..., which launches the cluster on a queue set aside for interactive work; from the command line it's a single command. Other facilities may not have this capability (it may require a batch script).

We're willing to look into how to make this work if it's needed. cc @shreddd

@mrocklin transferred this issue from dask/dask-labextension Nov 24, 2019
@mrocklin
Member

Moving this to dask/dask-mpi.

This isn't currently supported, but it could be. In principle we need to create a class that implements some of the interface in https://github.com/dask/distributed/blob/master/distributed/deploy/cluster.py

If you wanted to get a start on this, I would recommend subclassing Cluster and implementing _start and _close. I suspect that will get you most of the way there.

@mrocklin
Member

I also think that @andersy005 and @kmpaul may have had something like this in the past. I'm not seeing it now, though, so I'm curious whether I removed it when we redid things for 2.0 (although the old version probably wouldn't have met the current-day constraints of the dask.distributed.Cluster interface).

@kmpaul
Collaborator

kmpaul commented Nov 25, 2019

I feel like some context was lost in the transfer of this issue to dask-mpi... I'm not sure exactly what is being asked.

@rcthomas I hate to ask you to repeat yourself, but can you describe what kind of functionality (exactly) you are asking about?

@kmpaul
Collaborator

kmpaul commented Nov 25, 2019

Is this related to #9 in any way?

@mrocklin
Member

I think that the ask here is for a Python class that follows the Cluster interface so that it can be integrated into the JupyterLab "new cluster" button, as is depicted here: https://youtu.be/EX_voquHdk0

@kmpaul
Collaborator

kmpaul commented Nov 25, 2019

Ah! Yes. This would be very cool. ...Let's talk about what this would look like, then, because I'm actually wondering if this is more of a fit for Dask-Jobqueue.

@rcthomas
Author

Thanks for moving this to the right place. We're looking to enable both dask-jobqueue and dask-mpi style interactive HPC. We have a configuration that allows interactive use via either salloc or srun. It looks like dask-jobqueue sets up a job and submits it; here we'd want to do the same, but with no batch script involved. I'm sure this can be accommodated, it's just a question of where...
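
(For reference, the dask-jobqueue style mentioned here looks roughly like the sketch below: the cluster object runs the scheduler locally and submits workers as batch jobs. The parameter values are illustrative, not taken from this thread.)

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Sketch only: illustrative resource settings, not a working NERSC config
cluster = SLURMCluster(
    queue="interactive",   # illustrative queue name
    cores=32,
    memory="128GB",
    walltime="01:00:00",
)
cluster.scale(10)          # request 10 workers, submitted as batch jobs
client = Client(cluster)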

@kmpaul
Collaborator

kmpaul commented Nov 25, 2019

@rcthomas Is there a difference between the two approaches, other than that one uses a batch script and the other doesn't? In other words, would a user notice a difference depending on which approach was implemented behind the "new cluster" button?

@rcthomas
Author

That is one difference, and it's not clear to me whether it needs to be exposed or whether it can be handled via abstraction. Another difference is that with vanilla dask-jobqueue a scheduler is already running somewhere and workers are added or come and go, whereas with vanilla dask-mpi the scheduler and workers all start up at once. This latter difference may not matter, but I'm not familiar enough with dask-labextension to know what matters.

@mrocklin
Member

A minimal implementation will probably look like this ...

class MPICluster(dask.distributed.Cluster):
    async def _start(self):
        # launch dask-mpi in a subprocess (arguments elided)
        self.proc = subprocess.Popen(["dask-mpi", ...])
        await scheduler_has_started_up()  # placeholder: wait for the scheduler

    async def _close(self):
        self.proc.terminate()  # Popen has terminate()/kill(), not close()

And then you'll add a config file like the following:

labextension:
  factory:
    module: 'dask_mpi'
    class: 'MPICluster'
    args: []
    kwargs: {}

I'm sure that it will be more complex than that, but in theory that should work.
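
(As one way to fill in the scheduler_has_started_up placeholder above, you could poll for the scheduler file that dask-mpi writes when launched with --scheduler-file. This is only a sketch; the helper name, default path, and timeout are assumptions, not part of the dask-mpi API.)

import asyncio
import json
import os
import time

async def scheduler_has_started_up(scheduler_file="scheduler.json", timeout=120):
    # Hypothetical helper: wait until dask-mpi has written its scheduler file,
    # then return the scheduler address recorded inside it.
    deadline = time.monotonic() + timeout
    while not os.path.exists(scheduler_file):
        if time.monotonic() > deadline:
            raise TimeoutError(f"{scheduler_file!r} did not appear within {timeout}s")
        await asyncio.sleep(1)
    with open(scheduler_file) as f:
        return json.load(f)["address"]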

@rcthomas
Author

rcthomas commented Dec 6, 2019

I've been studying the code a bit and I have a few questions and comments. To make things a bit more concrete, I'm looking to encapsulate something like the command below. Many examples will be less complicated, but we want a more user-friendly way to accomplish something like this real (working!) example:

salloc \
    -N 40 \
    -n 1280 \
    -c 2 \
    -t 60 \
    -C haswell \
    -q interactive \
    -A m3384 \
    --image=registry.services.nersc.gov/rthomas/nersc-dask-example:v0.1.0 \
    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

So @kmpaul, if dask-jobqueue can be made to do that, then OK (there is just no concept of a batch script here).

The next thing I wondered, @mrocklin, was whether you really meant SpecCluster as the base class rather than Cluster?

Taking a step back, though, all I want to do is run something like that command line in a subprocess to start a cluster; it could be started with straight-up srun if, say, Jupyter is already running on a compute node. Starting a cluster via subprocess is probably happening in quite a few different Cluster implementations, so I wonder whether that ought to be a base class itself. My plan was to make something that does that first, and then move on to getting the MPI and job launcher details right.

@mrocklin
Member

mrocklin commented Dec 6, 2019

I meant Cluster. SpecCluster is based around running many individual worker objects, which you won't have. You have one big job, rather than a configuration of many jobs.

@kmpaul
Collaborator

kmpaul commented Dec 9, 2019

@rcthomas Yes. I think you are correct that this doesn't fit the Dask-Jobqueue paradigm. This cluster object should probably be defined in Dask-MPI, using a subprocess approach like the one @mrocklin suggests.

@kmpaul
Collaborator

kmpaul commented Dec 19, 2019

@rcthomas OK, a question for you about what you actually need.

Regarding the command you quoted above: is this what you want to run when you click the Dask labextension button?

salloc \
    -N 40 \
    -n 1280 \
    -c 2 \
    -t 60 \
    -C haswell \
    -q interactive \
    -A m3384 \
    --image=registry.services.nersc.gov/rthomas/nersc-dask-example:v0.1.0 \
    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

Or is this what you want to run when you click the Dask Labextension button?

    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

If you want to run the former command, then it falls within the purview of Dask-Jobqueue. And if you really need to avoid creating a batch script and submitting it via Slurm (i.e., you really want to run the single command without a batch script), then I'm sure Dask-Jobqueue's SLURMCluster could be modified to accommodate that.

However, if you want to run the latter command, then it falls within the purview of Dask-MPI, and all we would need to do is create the MPICluster class as @mrocklin recommends.

What makes the most sense to you?

@rcthomas
Author

rcthomas commented Feb 27, 2020

@kmpaul I am so sorry it's taken me 2 months to come back to this.

You've got a point. Backing up just a bit: we use salloc here because -q interactive doesn't accept submission via sbatch. I'm not sure how typical this is for everyone using Slurm. If it's OK, then let's go with putting it into Dask-Jobqueue.

I'll note that in principle the two versions of the command (salloc ... srun ... and srun ...) will do the same thing from our login nodes (one difference is that you have to put --image on the srun in the second case). But the Shifter Slurm integration (as you get with salloc) ensures the images are in place before the command runs.

I'm in the same room as @lesteve at the Dask Developer workshop, so we'll talk about this tomorrow.

@rcthomas
Author

rcthomas commented Mar 2, 2020

We took a look at adapting dask-jobqueue for salloc, and it won't work as-is. We're able to submit the job, but dask-jobqueue looks for the job ID when the submit command returns, and of course here the command doesn't return until the job times out. I'm looking at options, which now look like either the original course I wanted to follow or an internal conversation about queue policy here.

@kmpaul
Collaborator

kmpaul commented Mar 2, 2020

@rcthomas No worries about getting busy with other things! I completely understand.

The fact that salloc does not return immediately is definitely a complication, but I think there's a way of making it work. It's just a little more complicated.

In standard Slurm Dask-Jobqueue, the sbatch command is run with Dask-Jobqueue's Job._call(...) function. The Job._call function uses subprocess.Popen to run the sbatch command and then calls process.communicate() to get the stdout and stderr of the process. My guess is that this is where everything hangs when you run salloc, because salloc doesn't complete!

So, instead, I tried to see if you could get the Slurm job ID from stderr before the salloc process completes. I was able to do it, and here's the sample script that demonstrates the process in Python (running on NCAR's Casper, which uses Slurm):

from subprocess import Popen, PIPE
from shlex import split

cmd = "salloc --account=NIOW0001 --ntasks=1 --time=00:02:00 --partition=dav sleep 10"

print(f'Running command: "{cmd}"')
p = Popen(split(cmd), stdout=PIPE, stderr=PIPE)

print('Looping until first output to stderr (or command completes)...')
while True:
    err = p.stderr.readline().decode().strip()
    if err or p.poll() is not None:
        break

if err:
    print(f'First output to stderr: "{err}"')
else:
    print('No output found on stderr.')

print('Waiting until command finishes...')
out, err = p.communicate()
print('Command finished.')
print()
print('STDOUT:')
print(out.decode())
print()
print('STDERR:')
print(err.decode())

On NCAR's Casper system, the output I get from this script looks like this:

Running command: "salloc --account=NIOW0001 --ntasks=1 --time=00:02:00 --partition=dav sleep 10"
Looping until first output to stderr (or command completes)...
First output to stderr: "salloc: Granted job allocation 4375428"
Waiting until command finishes...
Command finished.

STDOUT:


STDERR:
salloc: Waiting for resource configuration
salloc: Nodes casper21 are ready for job
salloc: Relinquishing job allocation 4375428

Would something like this work? You'd have to create a new Dask-Jobqueue class that held on to the subprocess.Popen object instead of discarding it like the current Job._call() method does. But it seems to me that something like the above would let you get the Slurm job ID by parsing the first line returned on stderr from the salloc command. Then most of the rest of the Dask-Jobqueue code should work...?
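
(A minimal sketch of that parsing step might look like the following; the helper name is hypothetical, and the regular expression assumes the exact "Granted job allocation" wording shown in the output above.)

import re

def parse_salloc_job_id(stderr_line):
    # Hypothetical helper: pull the job ID out of a line like
    # "salloc: Granted job allocation 4375428"
    match = re.search(r"Granted job allocation (\d+)", stderr_line)
    if match is None:
        raise ValueError(f"Could not find a job ID in: {stderr_line!r}")
    return match.group(1)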

@kmpaul
Collaborator

kmpaul commented Mar 2, 2020

@rcthomas That said, I still think that getting the srun ... dask-mpi ... command to work with something like an MPICluster object is a great idea, too. I think both options are worth doing. Unfortunately, I've got some pressing issues taking up my free cycles right now, and I don't think I can look at this until mid-April.

@jshleap

jshleap commented Jun 8, 2020

Hello all! Any progress on this issue?

@kmpaul
Collaborator

kmpaul commented Jun 8, 2020

No progress yet. Is this a priority for you? I can try to accelerate this, but many things have come up over the last few months that have gotten in the way.

@rcthomas
Author

rcthomas commented Jun 8, 2020

We're heading in a different direction from salloc in the long run, FWIW.

@kmpaul
Collaborator

kmpaul commented Jun 8, 2020

@rcthomas Because of the issues above? Or because of another reason?

@jshleap

jshleap commented Jun 8, 2020

@kmpaul I guess I can use mpirun in the meantime, but srun is the recommended command for Slurm. @rcthomas do you mean away from running srun inside salloc? If so, why? Would that also apply to sbatch?

@rcthomas
Author

rcthomas commented Jun 8, 2020

@kmpaul the queue we're putting these clusters into isn't really the right place for them. We've got another plan that's more in line with vanilla dask-jobqueue. Some kind of MPICluster object would still be something we'd want to work on.

@jshleap Yes, away from the salloc + srun setup (see above) and towards just sbatch.
