diff --git a/README.md b/README.md
index 94d710f..68555f4 100755
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ Support for different job queue systems commonly used on compute clusters.
 
 | Job queue system | Command to add processors |
 | ---------------- | ------------------------- |
-| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer, flags=``)` or `addprocs(LSFManager(np, flags))` |
+| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` |
 | Sun Grid Engine | `addprocs_sge(np::Integer, queue="")` or `addprocs(SGEManager(np, queue))` |
 | SGE via qrsh | `addprocs_qrsh(np::Integer, queue="")` or `addprocs(QRSHManager(np, queue))` |
 | PBS | `addprocs_pbs(np::Integer, queue="")` or `addprocs(PBSManager(np, queue))` |
@@ -111,8 +111,8 @@ command to bypass the filesystem and captures STDOUT directly.
 
 ### Load Sharing Facility (LSF)
 
-`LSFManager` supports IBM's scheduler. Similar to `QRSHManager` in that it
-uses the `-I` (i.e. interactive) flag to `bsub`.
+`LSFManager` supports IBM's scheduler. See the `addprocs_lsf` docstring
+for more information.
 
 ### Using `LocalAffinityManager` (for pinning local workers to specific cores)
diff --git a/src/lsf.jl b/src/lsf.jl
index 4e2c6cd..b5833a1 100755
--- a/src/lsf.jl
+++ b/src/lsf.jl
@@ -3,6 +3,45 @@ export LSFManager, addprocs_lsf
 struct LSFManager <: ClusterManager
     np::Integer
     bsub_flags::Cmd
+    ssh_cmd::Cmd
+    retry_delays
+    throttle::Integer
+end
+
+struct LSFException <: Exception
+    msg
+end
+
+function bpeek(manager, jobid, iarray)
+    old_stderr = stderr
+    rd,_ = redirect_stderr()
+    try
+        io = open(`$(manager.ssh_cmd) bpeek $(jobid)\[$iarray\]`)
+        success(io) || throw(LSFException(String(readavailable(rd))))
+        return io
+    finally
+        redirect_stderr(old_stderr)
+    end
+end
+
+function _launch(manager, launched, c, jobid, iarray)
+    config = WorkerConfig()
+
+    io = retry(()->bpeek(manager, jobid, iarray),
+               delays=manager.retry_delays,
+               check=(s,e)->occursin("Not yet started", e.msg))()
+    port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)"
+    for line in eachline(io)
+        mm = match(port_host_regex, line)
+        isnothing(mm) && continue
+        config.host = mm.captures[2]
+        config.port = parse(Int, mm.captures[1])
+        break
+    end
+    config.userdata = `$jobid\[$iarray\]`
+
+    push!(launched, config)
+    notify(c)
 end
 
 function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
@@ -16,16 +55,15 @@ function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition
         jobname = `julia-$(getpid())`
 
         cmd = `$exename $exeflags $(worker_arg())`
-        bsub_cmd = `bsub -I $(manager.bsub_flags) -cwd $dir -J $jobname "$cmd"`
+        bsub_cmd = `$(manager.ssh_cmd) bsub $(manager.bsub_flags) -cwd $dir -J $(jobname)\[1-$np\] "$cmd"`
 
-        stream_proc = [open(bsub_cmd) for i in 1:np]
+        line = open(readline, bsub_cmd)
+        m = match(r"Job <([0-9]+)> is submitted", line)
+        jobid = m.captures[1]
 
-        for i in 1:np
-            config = WorkerConfig()
-            config.io = stream_proc[i]
-            push!(launched, config)
-            notify(c)
-        end
+        asyncmap((i)->_launch(manager, launched, c, jobid, i),
+                 1:np;
+                 ntasks=manager.throttle)
 
     catch e
         println("Error launching workers")
@@ -35,7 +73,38 @@ end
 
 manage(manager::LSFManager, id::Int64, config::WorkerConfig, op::Symbol) = nothing
 
-kill(manager::LSFManager, id::Int64, config::WorkerConfig) = kill(config.io)
+kill(manager::LSFManager, id::Int64, config::WorkerConfig) = remote_do(exit, id)
+
+"""
+    addprocs_lsf(np::Integer;
+                 bsub_flags::Cmd=``,
+                 ssh_cmd::Cmd=``,
+                 retry_delays=ExponentialBackOff(n=10,
+                                                 first_delay=1, max_delay=512,
+                                                 factor=2),
+                 throttle::Integer=np,
+                 params...) =
+
+Launch `np` workers on a cluster managed by IBM's Platform Load Sharing
+Facility. `bsub_flags` can be used to pass flags to `bsub` that are specific
+to your cluster or workflow needs. `ssh_cmd` can be used to launch workers
+from a machine other than the cluster head node (e.g. your personal workstation).
+`retry_delays` is a vector of numbers specifying, in seconds, how long to
+repeatedly wait for a worker to start. `throttle` specifies how many workers
+to launch at once.
+
+# Examples
 
-addprocs_lsf(np::Integer, bsub_flags::Cmd=``; params...) =
-    addprocs(LSFManager(np, bsub_flags); params...)
+```
+addprocs_lsf(1000; ssh_cmd=`ssh login`, throttle=10)
+```
+"""
+addprocs_lsf(np::Integer;
+             bsub_flags::Cmd=``,
+             ssh_cmd::Cmd=``,
+             retry_delays=ExponentialBackOff(n=10,
+                                             first_delay=1, max_delay=512,
+                                             factor=2),
+             throttle::Integer=np,
+             params...) =
+    addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle); params...)
diff --git a/test/runtests.jl b/test/runtests.jl
index 1aaa803..e4ccfbd 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -1,9 +1,9 @@
 using Test
 using ClusterManagers
-
-TIMEOUT = 10.
+using Distributed
 
 @testset "ElasticManager" begin
+    TIMEOUT = 10.
 
     em = ElasticManager(addr=:auto, port=0)
@@ -15,4 +15,28 @@ TIMEOUT = 10.
         length(em.active) == 1
     end
 
+    wait(rmprocs(workers()))
+end
+
+
+@static if Sys.iswindows()
+    windows_which(command) = `powershell.exe -Command Get-Command $command`
+    is_lsf_installed() = success(windows_which("bsub.exe"))
+else
+    is_lsf_installed() = success(`which bsub`)
+end
+
+if is_lsf_installed()
+
+@testset "LSFManager" begin
+    p = addprocs_lsf(1, bsub_flags=`-P scicompsoft`)
+    @test nprocs() == 2
+    @test workers() == p
+    @test fetch(@spawnat :any myid()) == p[1]
+    @test remotecall_fetch(+,p[1],1,1) == 2
+    rmprocs(p)
+    @test nprocs() == 1
+    @test workers() == [1]
+end
+
+end
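
Usage note (not part of the patch above): a minimal sketch of how the new keyword interface could be called once this change is applied. The `bsub` project/queue flags and the `headnode` host are placeholders for illustration, and the `ExponentialBackOff` settings simply restate the defaults shown in the docstring; an actual LSF cluster is of course required.

```
using Distributed, ClusterManagers

# Launch 4 workers via an LSF job array, submitting `bsub` over SSH from a
# workstation; the flags and hostname below are illustrative only.
p = addprocs_lsf(4;
                 bsub_flags=`-P myproject -q short`,   # cluster-specific bsub options
                 ssh_cmd=`ssh headnode`,               # run bsub/bpeek on the head node
                 retry_delays=ExponentialBackOff(n=10, first_delay=1,
                                                 max_delay=512, factor=2),
                 throttle=2)                           # start at most 2 workers at a time

# Verify the workers respond, then release them.
@assert remotecall_fetch(+, p[1], 1, 1) == 2
rmprocs(p)
```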