diff --git a/Project.toml b/Project.toml index 22b1cff..4c554d7 100644 --- a/Project.toml +++ b/Project.toml @@ -4,13 +4,14 @@ authors = ["Sergio Sánchez Ramírez and contrib version = "0.1.0" [deps] -Cassette = "7057c7e9-c182-5462-911a-8362d720325c" -Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Extrae_jll = "2b2c4be0-e38c-5918-b8b4-9a308845a1e9" -Requires = "ae029012-a4dd-5104-9daa-d747884805df" + +[weakdeps] +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" + +[extensions] +ExtraeDistributedExt = "Distributed" [compat] -Cassette = "0.3" Extrae_jll = "4.0.3" -Requires = "1.3" -julia = "1" +julia = "1.9" diff --git a/ext/ExtraeDistributedExt.jl b/ext/ExtraeDistributedExt.jl new file mode 100644 index 0000000..67d1ff8 --- /dev/null +++ b/ext/ExtraeDistributedExt.jl @@ -0,0 +1,134 @@ +using Distributed + +""" + ExtraeLocalManager.jl + +Implements a copy of the default `LocalClusterManager` that starts +workers with an overdubbed event loop. +""" + +struct ExtraeLocalManager <: ClusterManager + np::Int + restrict::Bool # Restrict binding to 127.0.0.1 only +end + +Base.show(io::IO, manager::ExtraeLocalManager) = print(io, "ExtraeLocalManager(#procs=$(manager.np), restrict=$(manager.restrict))") + +function Distributed.launch(manager::ExtraeLocalManager, params::Dict, launched::Array, c::Condition) + dir = params[:dir] + exename = params[:exename] + exeflags = params[:exeflags] + bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)` + cookie = cluster_cookie() + + active_project = Base.active_project() + hookline = """ + using Distributed + using Extrae + using Cassette + + Cassette.overdub(Extrae.ExtraeCtx(), start_worker, $(repr(cookie))) + """ + + # Bug: Instead of using julia_cmd(exename), I directly use exename because idk howto access Base.julia_cmd + for i in 1:manager.np + cmd = `env EXTRAE_SKIP_AUTO_LIBRARY_INITIALIZE=1 $exename $exeflags --project=$active_project --bind-to $bind_to -e "$hookline"` + io = open(detach(setenv(cmd, dir=dir)), "r+") + + # Cluster cookie is not passed through IO. Instead, we set it + # as a parameter when starting worker, through the hookline + #Distributed.write_cookie(io) + + wconfig = WorkerConfig() + wconfig.process = io + wconfig.io = io.out + wconfig.enable_threaded_blas = params[:enable_threaded_blas] + push!(launched, wconfig) + end + + notify(c) +end + +function Distributed.manage(manager::ExtraeLocalManager, id::Integer, config::WorkerConfig, op::Symbol) + if op === :interrupt + kill(config.process, 2) + end +end + +export ExtraeLocalManager + +const DistributedEvent{ValueCode} = Event{400002,ValueCode} +const DistributedUsefulWorkEvent{ValueCode} = Event{400001,ValueCode} +const DistributedMessageHandlingEvent{ValueCode} = Event{400004,ValueCode} + +description(::Type{DistributedEvent}) = "Distributed runtime calls" +description(::Type{DistributedUsefulWorkEvent}) = "Distributed workload execution" +description(::Type{DistributedMessageHandlingEvent}) = "Distributed message handling functions" + +const DistributedEnd = DistributedEvent{0}() +const DistributedAddProcs = DistributedEvent{1}() +const DistributedRmProcs = DistributedEvent{2}() +const DistributedInitWorker = DistributedEvent{3}() +const DistributedStartWorker = DistributedEvent{4}() +const DistributedRemoteCall = DistributedEvent{5}() +const DistributedRemoteCallFetch = DistributedEvent{6}() +const DistributedRemoteCallWait = DistributedEvent{7}() +const DistributedProcessMessages = DistributedEvent{8}() +const DistributedInterrupt = DistributedEvent{9}() + +const DistributedUsefulWork = DistributedUsefulWorkEvent{1}() +const DistributedNotUsefulWork = DistributedUsefulWorkEvent{0}() + +const DistributedHandleEnd = DistributedMessageHandlingEvent{0}() +const DistributedHandleCall = DistributedMessageHandlingEvent{1}() +const DistributedHandleCallFetch = DistributedMessageHandlingEvent{2}() +const DistributedHandleCallWait = DistributedMessageHandlingEvent{3}() +const DistributedHandleRemoteDo = DistributedMessageHandlingEvent{4}() +const DistributedHandleResult = DistributedMessageHandlingEvent{5}() +const DistributedHandleIdentifySocket = DistributedMessageHandlingEvent{6}() +const DistributedHandleIdentifySocketAck = DistributedMessageHandlingEvent{7}() +const DistributedHandleJoinPGRP = DistributedMessageHandlingEvent{8}() +const DistributedHandleJoinComplete = DistributedMessageHandlingEvent{9}() + +description(::typeof(DistributedEnd)) = "end" +description(::typeof(DistributedAddProcs)) = "addprocs" +description(::typeof(DistributedRmProcs)) = "rmprocs" +description(::typeof(DistributedInitWorker)) = "init_worker" +description(::typeof(DistributedStartWorker)) = "start_worker" +description(::typeof(DistributedRemoteCall)) = "remotecall" +description(::typeof(DistributedRemoteCallFetch)) = "remotecall_fetch" +description(::typeof(DistributedRemoteCallWait)) = "remotecall_wait" +description(::typeof(DistributedProcessMessages)) = "process_messages" +description(::typeof(DistributedInterrupt)) = "interrupt" + +description(::typeof(DistributedHandleEnd)) = "End" +description(::typeof(DistributedHandleCall)) = "CallMsg{:call}" +description(::typeof(DistributedHandleCallFetch)) = "CallMsg{:call_fetch}" +description(::typeof(DistributedHandleCallWait)) = "CallWaitMsg" +description(::typeof(DistributedHandleRemoteDo)) = "RemoteDoMsg" +description(::typeof(DistributedHandleResult)) = "ResultMsg" +description(::typeof(DistributedHandleIdentifySocket)) = "IdentifySocketMsg" +description(::typeof(DistributedHandleIdentifySocketAck)) = "IdentifySocketAckMsg" +description(::typeof(DistributedHandleJoinPGRP)) = "JoinPGRPMsg" +description(::typeof(DistributedHandleJoinComplete)) = "JoinCompletesg" + +description(::typeof(DistributedUsefulWork)) = "Useful" +description(::typeof(DistributedNotUsefulWork)) = "Not Useful" + +# resource identification +dist_taskid()::Cuint = Distributed.myid() - 1 +dist_numtasks()::Cuint = Distributed.nworkers() + 1 + +# cluster manager addprocs +function addprocs_extrae(np::Integer; restrict=true, kwargs...) + manager = Extrae.ExtraeLocalManager(np, restrict) + #check_addprocs_args(manager, kwargs) + new_workers = addprocs(manager; kwargs...) + + Extrae.init() + for pid in new_workers + @fetchfrom pid Extrae.init() + end + return new_workers +end +export addprocs_extrae diff --git a/src/Extrae.jl b/src/Extrae.jl index 1cfa697..d6ff0fa 100644 --- a/src/Extrae.jl +++ b/src/Extrae.jl @@ -5,15 +5,6 @@ include("API.jl") export Event, typecode, valuecode, description export version, init, isinit, finish, flush, instrumentation, emit, register, previous_hwc_set, next_hwc_set, set_tracing_tasks, setoption, network_counters, network_routes, user_function -using Cassette -Cassette.@context ExtraeCtx - -using Requires - -function __init__() - @require Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" include("Instrumentation/Distributed.jl") -end - include("Instrumentation/Threads.jl") end diff --git a/src/Instrumentation/Distributed.jl b/src/Instrumentation/Distributed.jl deleted file mode 100644 index f2b9e09..0000000 --- a/src/Instrumentation/Distributed.jl +++ /dev/null @@ -1,131 +0,0 @@ -using Distributed - -include("ExtraeLocalManager.jl") - -const DistributedEvent{ValueCode} = Event{400002,ValueCode} -const DistributedUsefulWorkEvent{ValueCode} = Event{400001,ValueCode} -const DistributedMessageHandlingEvent{ValueCode} = Event{400004,ValueCode} - -description(::Type{DistributedEvent}) = "Distributed runtime calls" -description(::Type{DistributedUsefulWorkEvent}) = "Distributed workload execution" -description(::Type{DistributedMessageHandlingEvent}) = "Distributed message handling functions" - -const DistributedEnd = DistributedEvent{0}() -const DistributedAddProcs = DistributedEvent{1}() -const DistributedRmProcs = DistributedEvent{2}() -const DistributedInitWorker = DistributedEvent{3}() -const DistributedStartWorker = DistributedEvent{4}() -const DistributedRemoteCall = DistributedEvent{5}() -const DistributedRemoteCallFetch = DistributedEvent{6}() -const DistributedRemoteCallWait = DistributedEvent{7}() -const DistributedProcessMessages = DistributedEvent{8}() -const DistributedInterrupt = DistributedEvent{9}() - -const DistributedUsefulWork = DistributedUsefulWorkEvent{1}() -const DistributedNotUsefulWork = DistributedUsefulWorkEvent{0}() - -const DistributedHandleEnd = DistributedMessageHandlingEvent{0}() -const DistributedHandleCall = DistributedMessageHandlingEvent{1}() -const DistributedHandleCallFetch = DistributedMessageHandlingEvent{2}() -const DistributedHandleCallWait = DistributedMessageHandlingEvent{3}() -const DistributedHandleRemoteDo = DistributedMessageHandlingEvent{4}() -const DistributedHandleResult = DistributedMessageHandlingEvent{5}() -const DistributedHandleIdentifySocket = DistributedMessageHandlingEvent{6}() -const DistributedHandleIdentifySocketAck = DistributedMessageHandlingEvent{7}() -const DistributedHandleJoinPGRP = DistributedMessageHandlingEvent{8}() -const DistributedHandleJoinComplete = DistributedMessageHandlingEvent{9}() - -description(::typeof(DistributedEnd)) = "end" -description(::typeof(DistributedAddProcs)) = "addprocs" -description(::typeof(DistributedRmProcs)) = "rmprocs" -description(::typeof(DistributedInitWorker)) = "init_worker" -description(::typeof(DistributedStartWorker)) = "start_worker" -description(::typeof(DistributedRemoteCall)) = "remotecall" -description(::typeof(DistributedRemoteCallFetch)) = "remotecall_fetch" -description(::typeof(DistributedRemoteCallWait)) = "remotecall_wait" -description(::typeof(DistributedProcessMessages)) = "process_messages" -description(::typeof(DistributedInterrupt)) = "interrupt" - -description(::typeof(DistributedHandleEnd)) = "End" -description(::typeof(DistributedHandleCall)) = "CallMsg{:call}" -description(::typeof(DistributedHandleCallFetch)) = "CallMsg{:call_fetch}" -description(::typeof(DistributedHandleCallWait)) = "CallWaitMsg" -description(::typeof(DistributedHandleRemoteDo)) = "RemoteDoMsg" -description(::typeof(DistributedHandleResult)) = "ResultMsg" -description(::typeof(DistributedHandleIdentifySocket)) = "IdentifySocketMsg" -description(::typeof(DistributedHandleIdentifySocketAck)) = "IdentifySocketAckMsg" -description(::typeof(DistributedHandleJoinPGRP)) = "JoinPGRPMsg" -description(::typeof(DistributedHandleJoinComplete)) = "JoinCompletesg" - -description(::typeof(DistributedUsefulWork)) = "Useful" -description(::typeof(DistributedNotUsefulWork)) = "Not Useful" - -##### WORKAROUND TO NOT ESCAPE ON TASKS. See: https://github.com/JuliaLabs/Cassette.jl/issues/120 -Cassette.overdub(ctx::ExtraeCtx, ::typeof(Base.Core._Task), @nospecialize(f), stack::Int, future) = Base.Core._Task(()->Cassette.overdub(ctx, f), stack, future) - - -# worker management -Cassette.prehook(::ExtraeCtx, ::typeof(addprocs), args...) = emit(DistributedAddProcs) -Cassette.posthook(::ExtraeCtx, _, ::typeof(addprocs), args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(rmprocs), args...) = emit(DistributedRmProcs) -Cassette.posthook(::ExtraeCtx, _, ::typeof(rmprocs), args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(init_worker), args...) = emit(DistributedInitWorker) -Cassette.posthook(::ExtraeCtx, _, ::typeof(init_worker), args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(start_worker), args...) = emit(DistributedStartWorker) -Cassette.posthook(::ExtraeCtx, _, ::typeof(start_worker), args...) = emit(DistributedEnd) - -# task creation -Cassette.prehook(::ExtraeCtx, ::typeof(remotecall), f, ::Distributed.Worker, args...) = emit(DistributedRemoteCall) -Cassette.posthook(::ExtraeCtx, _, ::typeof(remotecall), f, ::Distributed.Worker, args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(remotecall_fetch), ::typeof(Distributed.fetch_ref), ::Distributed.Worker, args...) = emit(DistributedRemoteCallFetch) -Cassette.posthook(::ExtraeCtx, _, ::typeof(remotecall_fetch), ::typeof(Distributed.fetch_ref), ::Distributed.Worker, args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(remotecall_wait), args...) = emit(DistributedRemoteCallWait) -Cassette.posthook(::ExtraeCtx, _, ::typeof(remotecall_wait), args...) = emit(DistributedEnd) - -# task management -Cassette.prehook(::ExtraeCtx, ::typeof(process_messages), args...) = emit(DistributedProcessMessages) -Cassette.posthook(::ExtraeCtx, _, ::typeof(process_messages), args...) = emit(DistributedEnd) - -Cassette.prehook(::ExtraeCtx, ::typeof(interrupt), args...) = emit(DistributedInterrupt) -Cassette.posthook(::ExtraeCtx, _, ::typeof(interrupt), args...) = emit(DistributedEnd) - -# workload execution -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.run_work_thunk), f::Function, args...) = emit(DistributedUsefulWork) -Cassette.posthook(::ExtraeCtx, _, ::typeof(Distributed.run_work_thunk), f::Function, args...) = emit(DistributedNotUsefulWork) - -# message handle -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.CallMsg{:call}, args...) = emit(DistributedHandleCall) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.CallMsg{:call_fetch}, args...) = emit(DistributedHandleCallFetch) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.CallWaitMsg, args...) = emit(DistributedHandleCallWait) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.RemoteDoMsg, args...) = emit(DistributedHandleRemoteDo) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.ResultMsg, args...) = emit(DistributedHandleResult) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.IdentifySocketMsg, args...) = emit(DistributedHandleIdentifySocket) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.IdentifySocketAckMsg, args...) = emit(DistributedHandleIdentifySocketAck) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.JoinPGRPMsg, args...) = emit(DistributedHandleJoinPGRP) -Cassette.prehook(::ExtraeCtx, ::typeof(Distributed.handle_msg), ::Distributed.JoinCompleteMsg, args...) = emit(DistributedHandleJoinComplete) - -Cassette.posthook(::ExtraeCtx, _, ::typeof(Distributed.handle_msg), args...) = emit(DistributedHandleEnd) - - -# resource identification -dist_taskid()::Cuint = Distributed.myid() - 1 -dist_numtasks()::Cuint = Distributed.nworkers() + 1 - -# cluster manager addprocs -function addprocs_extrae(np::Integer; restrict=true, kwargs...) - manager = Extrae.ExtraeLocalManager(np, restrict) - #check_addprocs_args(manager, kwargs) - new_workers = addprocs(manager; kwargs...) - - Extrae.init() - for pid in new_workers - @fetchfrom pid Extrae.init() - end - return new_workers -end -export addprocs_extrae diff --git a/src/Instrumentation/ExtraeLocalManager.jl b/src/Instrumentation/ExtraeLocalManager.jl deleted file mode 100644 index 596b590..0000000 --- a/src/Instrumentation/ExtraeLocalManager.jl +++ /dev/null @@ -1,56 +0,0 @@ -""" - ExtraeLocalManager.jl - -Implements a copy of the default `LocalClusterManager` that starts -workers with an overdubbed event loop. -""" - -struct ExtraeLocalManager <: ClusterManager - np::Int - restrict::Bool # Restrict binding to 127.0.0.1 only -end - -Base.show(io::IO, manager::ExtraeLocalManager) = print(io, "ExtraeLocalManager(#procs=$(manager.np), restrict=$(manager.restrict))") - -function Distributed.launch(manager::ExtraeLocalManager, params::Dict, launched::Array, c::Condition) - dir = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)` - cookie = cluster_cookie() - - active_project = Base.active_project() - hookline = """ - using Distributed - using Extrae - using Cassette - - Cassette.overdub(Extrae.ExtraeCtx(), start_worker, $(repr(cookie))) - """ - - # Bug: Instead of using julia_cmd(exename), I directly use exename because idk howto access Base.julia_cmd - for i in 1:manager.np - cmd = `env EXTRAE_SKIP_AUTO_LIBRARY_INITIALIZE=1 $exename $exeflags --project=$active_project --bind-to $bind_to -e "$hookline"` - io = open(detach(setenv(cmd, dir=dir)), "r+") - - # Cluster cookie is not passed through IO. Instead, we set it - # as a parameter when starting worker, through the hookline - #Distributed.write_cookie(io) - - wconfig = WorkerConfig() - wconfig.process = io - wconfig.io = io.out - wconfig.enable_threaded_blas = params[:enable_threaded_blas] - push!(launched, wconfig) - end - - notify(c) -end - -function Distributed.manage(manager::ExtraeLocalManager, id::Integer, config::WorkerConfig, op::Symbol) - if op === :interrupt - kill(config.process, 2) - end -end - -export ExtraeLocalManager \ No newline at end of file