diff --git a/.travis.yml b/.travis.yml index 6e685136b..09746cdb2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ os: - linux - osx julia: - - 0.6 - 0.7 - nightly notifications: @@ -21,8 +20,6 @@ before_install: # Work around OpenMPI attempting to create overly long temporary # file names - and erroring as a result - export TMPDIR=/tmp -script: - - julia -e 'if VERSION < v"0.7.0-DEV.5183"; Pkg.clone(pwd()) ; else; using Pkg; Pkg.up(); end; Pkg.build("MPI"); Pkg.test("MPI"; coverage=true)' after_success: - - julia -e 'VERSION < v"0.7.0-DEV.5183" || using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())' - - julia -e 'VERSION < v"0.7.0-DEV.5183" || using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())' + - julia -e 'using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())' + - julia -e 'using Pkg; cd(Pkg.dir("MPI")); Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())' diff --git a/Project.toml b/Project.toml index 893ba304e..24459a031 100644 --- a/Project.toml +++ b/Project.toml @@ -8,3 +8,6 @@ BinDeps = "9e28174c-4ba2-5203-b857-d8d62c4213ee" Compat = "34da2185-b29b-5c13-b0c7-acf172513d20" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb" +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/README.md b/README.md index a8c067910..95d7d55c0 100644 --- a/README.md +++ b/README.md @@ -118,10 +118,10 @@ The julia master process is NOT part of the MPI cluster. The main script should launched directly, MPIManager internally calls `mpirun` to launch julia/mpi workers. All the workers started via MPIManager will be part of the MPI cluster. -`MPIManager(;np=Sys.CPU_CORES, mpi_cmd=false, launch_timeout=60.0)` +`MPIManager(;np=Sys.CPU_THREADS, mpi_cmd=false, launch_timeout=60.0)` If not specified, `mpi_cmd` defaults to `mpirun -np $np` -STDOUT from the launched workers is redirected back to the julia session calling `addprocs` via a TCP connection. +`stdout` from the launched workers is redirected back to the julia session calling `addprocs` via a TCP connection. Thus the workers must be able to freely connect via TCP to the host session. The following lines will be typically required on the julia master process to support both julia and mpi diff --git a/REQUIRE b/REQUIRE index af8ddf0f0..7d399637d 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,3 +1,3 @@ -julia 0.6 +julia 0.7.0-beta BinDeps Compat 0.66 diff --git a/appveyor.yml b/appveyor.yml index 1d3cf071d..4e6271126 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,11 +1,9 @@ environment: matrix: - - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe" - - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.6/julia-0.6-latest-win64.exe" - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe" - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe" - - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.7/julia-0.7.0-alpha-win32.exe" - - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.7/julia-0.7.0-alpha-win64.exe" + - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.7/julia-0.7.0-beta2-win32.exe" + - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.7/julia-0.7.0-beta2-win64.exe" branches: only: @@ -33,11 +31,7 @@ install: - set PATH=C:\Program Files\Microsoft MPI\Bin;%PATH% build_script: -# Need to convert from shallow to complete for Pkg.clone to work - - IF EXIST .git\shallow (git fetch --unshallow) - - C:\projects\julia\bin\julia -e "VERSION < v\"0.7.0-DEV\" || (using InteractiveUtils); - versioninfo(); pkg=\"MPI\"; - if VERSION < v\"0.7.0-DEV.5183\"; Pkg.clone(pwd(), pkg); else; using Pkg; Pkg.up(); end; Pkg.build(\"MPI\")" + - C:\projects\julia\bin\julia -e "using Pkg; pkg\"activate .\"; pkg\"build\"" test_script: - - C:\projects\julia\bin\julia --check-bounds=yes -e "VERSION < v\"0.7.0-DEV.5183\" || using Pkg; Pkg.test(\"MPI\")" + - C:\projects\julia\bin\julia --check-bounds=yes -e "using Pkg; pkg\"activate .\"; pkg\"test\"" diff --git a/examples/05-juliacman.jl b/examples/05-juliacman.jl index 76e9ab858..989922356 100644 --- a/examples/05-juliacman.jl +++ b/examples/05-juliacman.jl @@ -11,10 +11,10 @@ println("Running 01-hello as part of a Julia cluster") @mpi_do manager (include("01-hello-impl.jl"); do_hello()) # Interspersed julia parallel call -nheads = @parallel (+) for i=1:10^8 +nheads = @distributed (+) for i=1:10^8 Int(rand(Bool)) end -println("@parallel nheads $nheads") +println("@distributed nheads $nheads") println("Running 02-broadcast as part of a Julia cluster") @mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast()) diff --git a/src/MPI.jl b/src/MPI.jl index 9e38f8892..83cdbfa7e 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -33,7 +33,7 @@ function __init__() # look up all symbols ahead of time for (jname, fname) in _mpi_functions - eval(:(const $jname = Libdl.dlsym(libmpi_handle, $fname))) + Core.eval(MPI, :(const $jname = Libdl.dlsym(libmpi_handle, $fname))) end end diff --git a/src/cman.jl b/src/cman.jl index 251c20810..fab95c4de 100644 --- a/src/cman.jl +++ b/src/cman.jl @@ -3,7 +3,7 @@ export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL using Compat using Compat.Distributed -import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4 +import Compat.Sockets: connect, listenany, accept, IPv4, getsockname @@ -42,6 +42,7 @@ mutable struct MPIManager <: ClusterManager # TCP Transport port::UInt16 + ip::UInt32 stdout_ios::Array # MPI transport @@ -54,7 +55,7 @@ mutable struct MPIManager <: ClusterManager sending_done::Channel{Nothing} receiving_done::Channel{Nothing} - function MPIManager(; np::Integer = Sys.CPU_CORES, + function MPIManager(; np::Integer = Sys.CPU_THREADS, mpirun_cmd::Cmd = `mpiexec -n $np`, launch_timeout::Real = 60.0, mode::TransportMode = MPI_ON_WORKERS) @@ -86,6 +87,7 @@ mutable struct MPIManager <: ClusterManager if mode != MPI_TRANSPORT_ALL # Start a listener for capturing stdout from the workers port, server = listenany(11000) + ip = getsockname(server)[1].host @async begin while true sock = accept(server) @@ -93,6 +95,7 @@ mutable struct MPIManager <: ClusterManager end end mgr.port = port + mgr.ip = ip mgr.stdout_ios = IO[] else mgr.rank2streams = Dict{Int,Tuple{IO,IO}}() @@ -133,7 +136,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict, throw(ErrorException("Reuse of MPIManager is not allowed.")) end cookie = string(":cookie_",Distributed.cluster_cookie()) - setup_cmds = `using MPI\;MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'` + setup_cmds = `using MPI\;MPI.setup_worker'('$(mgr.ip),$(mgr.port),$cookie')'` mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))` open(detach(mpi_cmd)) mgr.launched = true @@ -151,7 +154,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict, end # Traverse all worker I/O streams and receive their MPI rank - configs = Array{WorkerConfig}(mgr.np) + configs = Array{WorkerConfig}(undef, mgr.np) @sync begin for io in mgr.stdout_ios @async let io=io @@ -199,12 +202,12 @@ function setup_worker(host, port, cookie) # Hand over control to Base if cookie == nothing - Base.start_worker(io) + Distributed.start_worker(io) else if isa(cookie, Symbol) cookie = string(cookie)[8:end] # strip the leading "cookie_" end - Base.start_worker(io, cookie) + Distributed.start_worker(io, cookie) end end @@ -279,8 +282,8 @@ end # case function start_send_event_loop(mgr::MPIManager, rank::Int) try - r_s = BufferStream() - w_s = BufferStream() + r_s = Base.BufferStream() + w_s = Base.BufferStream() mgr.rank2streams[rank] = (r_s, w_s) # TODO: There is one task per communication partner -- this can be @@ -292,7 +295,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int) reqs = MPI.Request[] while !isready(mgr.initiate_shutdown) # When data are available, send them - while nb_available(w_s) > 0 + while bytesavailable(w_s) > 0 data = take!(w_s.buffer) push!(reqs, MPI.Isend(data, rank, 0, mgr.comm)) end @@ -307,7 +310,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int) end (r_s, w_s) catch e - Base.show_backtrace(STDOUT, catch_backtrace()) + Base.show_backtrace(stdout, catch_backtrace()) println(e) rethrow(e) end @@ -334,11 +337,15 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; # Create manager object mgr = MPIManager(np=size-1, mode=mode) mgr.comm = comm + # Needed because of Julia commit https://github.com/JuliaLang/julia/commit/299300a409c35153a1fa235a05c3929726716600 + if isdefined(Distributed, :init_multi) + Distributed.init_multi() + end # Send connection information to all workers # TODO: Use Bcast for j in 1:size-1 cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing - MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm) + MPI.send((mgr.ip, mgr.port, cookie), j, 0, comm) end # Tell Base about the workers addprocs(mgr) @@ -363,6 +370,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; # Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI # transport, but need it to satisfy the changed protocol. + if isdefined(Distributed, :init_multi) + Distributed.init_multi() + end MPI.bcast(Distributed.cluster_cookie(), 0, comm) # Start event loop for the workers @async receive_event_loop(mgr) @@ -376,7 +386,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL; mgr.comm = comm # Recv the cookie cookie = MPI.bcast(nothing, 0, comm) - Base.init_worker(cookie, mgr) + Distributed.init_worker(cookie, mgr) # Start a worker event loop receive_event_loop(mgr) MPI.Finalize() @@ -394,7 +404,7 @@ function receive_event_loop(mgr::MPIManager) (hasdata, stat) = MPI.Iprobe(MPI.ANY_SOURCE, 0, mgr.comm) if hasdata count = Get_count(stat, UInt8) - buf = Array{UInt8}(count) + buf = Array{UInt8}(undef, count) from_rank = Get_source(stat) MPI.Recv!(buf, from_rank, 0, mgr.comm) @@ -403,7 +413,7 @@ function receive_event_loop(mgr::MPIManager) # This is the first time we communicate with this rank. # Set up a new connection. (r_s, w_s) = start_send_event_loop(mgr, from_rank) - Base.process_messages(r_s, w_s) + Distributed.process_messages(r_s, w_s) num_send_loops += 1 else (r_s, w_s) = streams @@ -459,7 +469,7 @@ end function mpi_do(mgr::MPIManager, expr) !mgr.initialized && wait(mgr.cond_initialized) jpids = keys(mgr.j2mpi) - refs = Array{Any}(length(jpids)) + refs = Array{Any}(undef, length(jpids)) for (i,p) in enumerate(Iterators.filter(x -> x != myid(), jpids)) refs[i] = remotecall(expr, p) end @@ -490,7 +500,7 @@ end macro mpi_do(mgr, expr) quote # Evaluate expression in Main module - thunk = () -> (eval(Main, $(Expr(:quote, expr))); nothing) + thunk = () -> (Core.eval(Main, $(Expr(:quote, expr))); nothing) mpi_do($(esc(mgr)), thunk) end end diff --git a/test/runtests.jl b/test/runtests.jl index f868b868c..6b596d9ea 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -30,13 +30,8 @@ if Compat.Sys.iswindows() end end -if VERSION > v"0.7.0-DEV.2005" - push!(excludedfiles, "test_cman_julia.jl") - push!(excludedfiles, "test_cman_mpi.jl") - push!(excludedfiles, "test_cman_tcp.jl") -end function runtests() - nprocs = clamp(Sys.CPU_CORES, 2, 4) + nprocs = clamp(Sys.CPU_THREADS, 2, 4) exename = joinpath(BINDIR, Base.julia_exename()) testdir = dirname(@__FILE__) istest(f) = endswith(f, ".jl") && startswith(f, "test_") diff --git a/test/test_bcast.jl b/test/test_bcast.jl index 0b93ca95b..5d6014f90 100644 --- a/test/test_bcast.jl +++ b/test/test_bcast.jl @@ -1,5 +1,5 @@ -using Compat.Test -using Compat.Random +using Test +using Random using MPI MPI.Init() @@ -19,7 +19,7 @@ end root = 0 -srand(17) +Random.seed!(17) matsize = (17,17) for typ in Base.uniontypes(MPI.MPIDatatype) diff --git a/test/test_cman_julia.jl b/test/test_cman_julia.jl index c1eb1ebf9..37525328d 100644 --- a/test/test_cman_julia.jl +++ b/test/test_cman_julia.jl @@ -26,7 +26,7 @@ for id in ids @test id end -s = @parallel (+) for i in 1:10 +s = @distributed (+) for i in 1:10 i^2 end @test s == 385 diff --git a/test/test_cman_mpi.jl b/test/test_cman_mpi.jl index fa70abc3e..3b27b8cb3 100644 --- a/test/test_cman_mpi.jl +++ b/test/test_cman_mpi.jl @@ -1,5 +1,6 @@ using Compat.Test using MPI +using Distributed # This uses MPI to communicate with the workers mgr = MPI.start_main_loop(MPI.MPI_TRANSPORT_ALL) @@ -24,7 +25,7 @@ for id in ids @test id end -s = @parallel (+) for i in 1:10 +s = @distributed (+) for i in 1:10 i^2 end @test s == 385 diff --git a/test/test_cman_tcp.jl b/test/test_cman_tcp.jl index 38192120a..f45211960 100644 --- a/test/test_cman_tcp.jl +++ b/test/test_cman_tcp.jl @@ -1,5 +1,6 @@ using Compat.Test using MPI +using Distributed # This uses TCP to communicate with the workers mgr = MPI.start_main_loop(MPI.TCP_TRANSPORT_ALL) @@ -24,7 +25,7 @@ for id in ids @test id end -s = @parallel (+) for i in 1:10 +s = @distributed (+) for i in 1:10 i^2 end @test s == 385 diff --git a/test/test_finalize_atexit.jl b/test/test_finalize_atexit.jl index 8f8b0e6fb..125be2491 100644 --- a/test/test_finalize_atexit.jl +++ b/test/test_finalize_atexit.jl @@ -1,4 +1,4 @@ -using Base.Test +using Test using MPI @test !MPI.Initialized() diff --git a/test/test_spawn.jl b/test/test_spawn.jl index be3d3aea9..0195bcff2 100644 --- a/test/test_spawn.jl +++ b/test/test_spawn.jl @@ -4,7 +4,7 @@ using MPI MPI.Init() -const N = clamp(Sys.CPU_CORES, 2, 4) +const N = clamp(Sys.CPU_THREADS, 2, 4) exename = joinpath(Compat.Sys.BINDIR, Base.julia_exename()) @test isfile(exename)