diff --git a/src/lsf.jl b/src/lsf.jl index ca5dce3..d2aa738 100755 --- a/src/lsf.jl +++ b/src/lsf.jl @@ -15,54 +15,61 @@ struct LSFException <: Exception msg end +function parse_host_port(stream, port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)") + bytestr = readline(stream) + conn_info_match = match(port_host_regex, bytestr) + if !isnothing(conn_info_match) + host = conn_info_match.captures[2] + port = parse(Int, conn_info_match.captures[1]) + @debug("lsf worker listening", connect_info=bytestr, host, port) + + return true, bytestr, host, port + end + return false, bytestr, nothing, nothing +end + function lsf_bpeek(manager::LSFManager, jobid, iarray) - port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)" stream = Base.BufferStream() mark(stream) # so that we can reset to beginning after ensuring process started streamer_cmd = pipeline(`$(manager.ssh_cmd) $(manager.bpeek_cmd) $(manager.bpeek_flags) $(jobid)\[$iarray\]`; stdout=stream, stderr=stream) - backoff = manager.retry_delays - delay, backoff_state = iterate(backoff) + retry_delays = manager.retry_delays streamer_proc = run(streamer_cmd; wait=false) - worker_started = false - host = nothing - port = nothing - - while !worker_started - bytestr = readline(stream) - conn_info_match = match(port_host_regex, bytestr) - if !isnothing(conn_info_match) - host = conn_info_match.captures[2] - port = parse(Int, conn_info_match.captures[1]) - @debug("lsf worker listening", connect_info=bytestr, host, port) - # process started, reset to marked position and hand over to Distributed module - reset(stream) - worker_started = true + + # Try once before retry loop in case user supplied an empty retry_delays iterator + worker_started, bytestr, host, port = parse_host_port(stream) + worker_started && return stream, host, port + + for retry_delay in retry_delays + if occursin("Not yet started", bytestr) + # reset to marked position, bpeek process would have stopped + wait(streamer_proc) + mark(stream) + + # Try bpeeking again after the retry delay + sleep(retry_delay) + streamer_proc = run(streamer_cmd; wait=false) + elseif occursin("<< output from stdout >>", bytestr) || occursin("<< output from stderr >>", bytestr) + # ignore this bpeek output decoration and continue to read the next line + mark(stream) else - if occursin("Not yet started", bytestr) - # reset to marked position, bpeek process would have stopped - wait(streamer_proc) - mark(stream) - - # retry with backoff if within retry limit - if backoff_state[1] == 0 - close(stream) - throw(LSFException(bytestr)) - end - sleep(delay) - delay, backoff_state = iterate(backoff, backoff_state) - streamer_proc = run(streamer_cmd; wait=false) - elseif occursin("<< output from stdout >>", bytestr) || occursin("<< output from stderr >>", bytestr) - # ignore this bpeek output decoration and continue to read the next line - mark(stream) - else - # unknown response from worker process - close(stream) - throw(LSFException(bytestr)) - end + # unknown response from worker process + close(stream) + throw(LSFException(bytestr)) end + + worker_started, bytestr, host, port = parse_host_port(stream) + worker_started && break end + if !worker_started + close(stream) + throw(LSFException(bytestr)) + end + + # process started, reset to marked position and hand over to Distributed module + reset(stream) + return stream, host, port end