Skip to content

Commit

Permalink
Fix clustermanager for 0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
barche committed Aug 2, 2018
1 parent c7ee281 commit 0c47e18
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 51 deletions.
7 changes: 2 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ os:
- linux
- osx
julia:
- 0.6
- 0.7
- nightly
notifications:
Expand All @@ -21,8 +20,6 @@ before_install:
# Work around OpenMPI attempting to create overly long temporary
# file names - and erroring as a result
- export TMPDIR=/tmp
script:
- julia -e 'if VERSION < v"0.7.0-DEV.5183"; Pkg.clone(pwd()) ; else; using Pkg; Pkg.up(); end; Pkg.build("MPI"); Pkg.test("MPI"; coverage=true)'
after_success:
- julia -e 'VERSION < v"0.7.0-DEV.5183" || using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())'
- julia -e 'VERSION < v"0.7.0-DEV.5183" || using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())'
- julia -e 'using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())'
- julia -e 'using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())'
3 changes: 3 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ BinDeps = "9e28174c-4ba2-5203-b857-d8d62c4213ee"
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ The julia master process is NOT part of the MPI cluster. The main script should
launched directly, MPIManager internally calls `mpirun` to launch julia/mpi workers.
All the workers started via MPIManager will be part of the MPI cluster.

`MPIManager(;np=Sys.CPU_CORES, mpi_cmd=false, launch_timeout=60.0)`
`MPIManager(;np=Sys.CPU_THREADS, mpi_cmd=false, launch_timeout=60.0)`

If not specified, `mpi_cmd` defaults to `mpirun -np $np`
STDOUT from the launched workers is redirected back to the julia session calling `addprocs` via a TCP connection.
`stdout` from the launched workers is redirected back to the julia session calling `addprocs` via a TCP connection.
Thus the workers must be able to freely connect via TCP to the host session.
The following lines will be typically required on the julia master process to support both julia and mpi

Expand Down
2 changes: 1 addition & 1 deletion REQUIRE
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
julia 0.6
julia 0.7.0-beta
BinDeps
Compat 0.66
14 changes: 4 additions & 10 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
environment:
matrix:
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.6/julia-0.6-latest-win64.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.7/julia-0.7.0-alpha-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.7/julia-0.7.0-alpha-win64.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.7/julia-0.7.0-beta2-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.7/julia-0.7.0-beta2-win64.exe"

branches:
only:
Expand Down Expand Up @@ -33,11 +31,7 @@ install:
- set PATH=C:\Program Files\Microsoft MPI\Bin;%PATH%

build_script:
# Need to convert from shallow to complete for Pkg.clone to work
- IF EXIST .git\shallow (git fetch --unshallow)
- C:\projects\julia\bin\julia -e "VERSION < v\"0.7.0-DEV\" || (using InteractiveUtils);
versioninfo(); pkg=\"MPI\";
if VERSION < v\"0.7.0-DEV.5183\"; Pkg.clone(pwd(), pkg); else; using Pkg; Pkg.up(); end; Pkg.build(\"MPI\")"
- C:\projects\julia\bin\julia -e "using Pkg; pkg\"activate .\"; pkg\"build\""

test_script:
- C:\projects\julia\bin\julia --check-bounds=yes -e "VERSION < v\"0.7.0-DEV.5183\" || using Pkg; Pkg.test(\"MPI\")"
- C:\projects\julia\bin\julia --check-bounds=yes -e "using Pkg; pkg\"activate .\"; pkg\"test\""
4 changes: 2 additions & 2 deletions examples/05-juliacman.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ println("Running 01-hello as part of a Julia cluster")
@mpi_do manager (include("01-hello-impl.jl"); do_hello())

# Interspersed julia parallel call
nheads = @parallel (+) for i=1:10^8
nheads = @distributed (+) for i=1:10^8
Int(rand(Bool))
end
println("@parallel nheads $nheads")
println("@distributed nheads $nheads")

println("Running 02-broadcast as part of a Julia cluster")
@mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast())
Expand Down
2 changes: 1 addition & 1 deletion src/MPI.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function __init__()

# look up all symbols ahead of time
for (jname, fname) in _mpi_functions
eval(:(const $jname = Libdl.dlsym(libmpi_handle, $fname)))
Core.eval(MPI, :(const $jname = Libdl.dlsym(libmpi_handle, $fname)))
end
end

Expand Down
42 changes: 26 additions & 16 deletions src/cman.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
using Compat
using Compat.Distributed
import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4
import Compat.Sockets: connect, listenany, accept, IPv4, getsockname



Expand Down Expand Up @@ -42,6 +42,7 @@ mutable struct MPIManager <: ClusterManager

# TCP Transport
port::UInt16
ip::UInt32
stdout_ios::Array

# MPI transport
Expand All @@ -54,7 +55,7 @@ mutable struct MPIManager <: ClusterManager
sending_done::Channel{Nothing}
receiving_done::Channel{Nothing}

function MPIManager(; np::Integer = Sys.CPU_CORES,
function MPIManager(; np::Integer = Sys.CPU_THREADS,
mpirun_cmd::Cmd = `mpiexec -n $np`,
launch_timeout::Real = 60.0,
mode::TransportMode = MPI_ON_WORKERS)
Expand Down Expand Up @@ -86,13 +87,15 @@ mutable struct MPIManager <: ClusterManager
if mode != MPI_TRANSPORT_ALL
# Start a listener for capturing stdout from the workers
port, server = listenany(11000)
ip = getsockname(server)[1].host
@async begin
while true
sock = accept(server)
push!(mgr.stdout_ios, sock)
end
end
mgr.port = port
mgr.ip = ip
mgr.stdout_ios = IO[]
else
mgr.rank2streams = Dict{Int,Tuple{IO,IO}}()
Expand Down Expand Up @@ -133,7 +136,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
throw(ErrorException("Reuse of MPIManager is not allowed."))
end
cookie = string(":cookie_",Distributed.cluster_cookie())
setup_cmds = `using MPI\;MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'`
setup_cmds = `using MPI\;MPI.setup_worker'('$(mgr.ip),$(mgr.port),$cookie')'`
mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))`
open(detach(mpi_cmd))
mgr.launched = true
Expand All @@ -151,7 +154,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
end

# Traverse all worker I/O streams and receive their MPI rank
configs = Array{WorkerConfig}(mgr.np)
configs = Array{WorkerConfig}(undef, mgr.np)
@sync begin
for io in mgr.stdout_ios
@async let io=io
Expand Down Expand Up @@ -199,12 +202,12 @@ function setup_worker(host, port, cookie)

# Hand over control to Base
if cookie == nothing
Base.start_worker(io)
Distributed.start_worker(io)
else
if isa(cookie, Symbol)
cookie = string(cookie)[8:end] # strip the leading "cookie_"
end
Base.start_worker(io, cookie)
Distributed.start_worker(io, cookie)
end
end

Expand Down Expand Up @@ -279,8 +282,8 @@ end
# case
function start_send_event_loop(mgr::MPIManager, rank::Int)
try
r_s = BufferStream()
w_s = BufferStream()
r_s = Base.BufferStream()
w_s = Base.BufferStream()
mgr.rank2streams[rank] = (r_s, w_s)

# TODO: There is one task per communication partner -- this can be
Expand All @@ -292,7 +295,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
reqs = MPI.Request[]
while !isready(mgr.initiate_shutdown)
# When data are available, send them
while nb_available(w_s) > 0
while bytesavailable(w_s) > 0
data = take!(w_s.buffer)
push!(reqs, MPI.Isend(data, rank, 0, mgr.comm))
end
Expand All @@ -307,7 +310,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
end
(r_s, w_s)
catch e
Base.show_backtrace(STDOUT, catch_backtrace())
Base.show_backtrace(stdout, catch_backtrace())
println(e)
rethrow(e)
end
Expand All @@ -334,11 +337,15 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
# Create manager object
mgr = MPIManager(np=size-1, mode=mode)
mgr.comm = comm
# Needed because of Julia commit https://github.com/JuliaLang/julia/commit/299300a409c35153a1fa235a05c3929726716600
if isdefined(Distributed, :init_multi)
Distributed.init_multi()
end
# Send connection information to all workers
# TODO: Use Bcast
for j in 1:size-1
cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing
MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm)
MPI.send((mgr.ip, mgr.port, cookie), j, 0, comm)
end
# Tell Base about the workers
addprocs(mgr)
Expand All @@ -363,6 +370,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;

# Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI
# transport, but need it to satisfy the changed protocol.
if isdefined(Distributed, :init_multi)
Distributed.init_multi()
end
MPI.bcast(Distributed.cluster_cookie(), 0, comm)
# Start event loop for the workers
@async receive_event_loop(mgr)
Expand All @@ -376,7 +386,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
mgr.comm = comm
# Recv the cookie
cookie = MPI.bcast(nothing, 0, comm)
Base.init_worker(cookie, mgr)
Distributed.init_worker(cookie, mgr)
# Start a worker event loop
receive_event_loop(mgr)
MPI.Finalize()
Expand All @@ -394,7 +404,7 @@ function receive_event_loop(mgr::MPIManager)
(hasdata, stat) = MPI.Iprobe(MPI.ANY_SOURCE, 0, mgr.comm)
if hasdata
count = Get_count(stat, UInt8)
buf = Array{UInt8}(count)
buf = Array{UInt8}(undef, count)
from_rank = Get_source(stat)
MPI.Recv!(buf, from_rank, 0, mgr.comm)

Expand All @@ -403,7 +413,7 @@ function receive_event_loop(mgr::MPIManager)
# This is the first time we communicate with this rank.
# Set up a new connection.
(r_s, w_s) = start_send_event_loop(mgr, from_rank)
Base.process_messages(r_s, w_s)
Distributed.process_messages(r_s, w_s)
num_send_loops += 1
else
(r_s, w_s) = streams
Expand Down Expand Up @@ -459,7 +469,7 @@ end
function mpi_do(mgr::MPIManager, expr)
!mgr.initialized && wait(mgr.cond_initialized)
jpids = keys(mgr.j2mpi)
refs = Array{Any}(length(jpids))
refs = Array{Any}(undef, length(jpids))
for (i,p) in enumerate(Iterators.filter(x -> x != myid(), jpids))
refs[i] = remotecall(expr, p)
end
Expand Down Expand Up @@ -490,7 +500,7 @@ end
macro mpi_do(mgr, expr)
quote
# Evaluate expression in Main module
thunk = () -> (eval(Main, $(Expr(:quote, expr))); nothing)
thunk = () -> (Core.eval(Main, $(Expr(:quote, expr))); nothing)
mpi_do($(esc(mgr)), thunk)
end
end
Expand Down
7 changes: 1 addition & 6 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ if Compat.Sys.iswindows()
end
end

if VERSION > v"0.7.0-DEV.2005"
push!(excludedfiles, "test_cman_julia.jl")
push!(excludedfiles, "test_cman_mpi.jl")
push!(excludedfiles, "test_cman_tcp.jl")
end
function runtests()
nprocs = clamp(Sys.CPU_CORES, 2, 4)
nprocs = clamp(Sys.CPU_THREADS, 2, 4)
exename = joinpath(BINDIR, Base.julia_exename())
testdir = dirname(@__FILE__)
istest(f) = endswith(f, ".jl") && startswith(f, "test_")
Expand Down
6 changes: 3 additions & 3 deletions test/test_bcast.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Compat.Test
using Compat.Random
using Test
using Random
using MPI

MPI.Init()
Expand All @@ -19,7 +19,7 @@ end

root = 0

srand(17)
Random.seed!(17)

matsize = (17,17)
for typ in Base.uniontypes(MPI.MPIDatatype)
Expand Down
2 changes: 1 addition & 1 deletion test/test_cman_julia.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ for id in ids
@test id
end

s = @parallel (+) for i in 1:10
s = @distributed (+) for i in 1:10
i^2
end
@test s == 385
Expand Down
3 changes: 2 additions & 1 deletion test/test_cman_mpi.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Compat.Test
using MPI
using Distributed

# This uses MPI to communicate with the workers
mgr = MPI.start_main_loop(MPI.MPI_TRANSPORT_ALL)
Expand All @@ -24,7 +25,7 @@ for id in ids
@test id
end

s = @parallel (+) for i in 1:10
s = @distributed (+) for i in 1:10
i^2
end
@test s == 385
Expand Down
3 changes: 2 additions & 1 deletion test/test_cman_tcp.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Compat.Test
using MPI
using Distributed

# This uses TCP to communicate with the workers
mgr = MPI.start_main_loop(MPI.TCP_TRANSPORT_ALL)
Expand All @@ -24,7 +25,7 @@ for id in ids
@test id
end

s = @parallel (+) for i in 1:10
s = @distributed (+) for i in 1:10
i^2
end
@test s == 385
Expand Down
2 changes: 1 addition & 1 deletion test/test_finalize_atexit.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Base.Test
using Test
using MPI

@test !MPI.Initialized()
Expand Down
2 changes: 1 addition & 1 deletion test/test_spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ using MPI

MPI.Init()

const N = clamp(Sys.CPU_CORES, 2, 4)
const N = clamp(Sys.CPU_THREADS, 2, 4)

exename = joinpath(Compat.Sys.BINDIR, Base.julia_exename())
@test isfile(exename)
Expand Down

0 comments on commit 0c47e18

Please sign in to comment.