Skip to content

Commit

Permalink
Merge pull request #6081 from jennan/fix_job_runner_newline
Browse files Browse the repository at this point in the history
Fix job runner stdout missing newline
  • Loading branch information
oliver-sanders authored Nov 25, 2024
2 parents ee20614 + 1a9876f commit c266fd8
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 15 deletions.
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ Utheri Wagura <[email protected]>
github-actions[bot] <[email protected]> <41898282+github-actions[bot]@users.noreply.github.com>
github-actions[bot] <[email protected]> GitHub Action
Diquan Jabbour <[email protected]>
Maxime Rio <[email protected]>
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ requests_).
- Diquan Jabbour
- Shixian Sheng
- Utheri Wagura
- Maxime Rio
<!-- end-shortlog -->

(All contributors are identifiable with email addresses in the git version
Expand Down
2 changes: 2 additions & 0 deletions changes.d/6081.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix job submission when a batch of jobs is submitted to a runner that does
not return a newline with the job ID (did not affect built-in job runners).
2 changes: 1 addition & 1 deletion cylc/flow/job_runner_handlers/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def submit(cls, job_file_path, submit_opts):
exc.filename = "nohup"
return (1, None, str(exc))
else:
return (0, "%d\n" % (proc.pid), None)
return (0, str(proc.pid), None)


JOB_RUNNER_HANDLER = BgCommandHandler()
3 changes: 1 addition & 2 deletions cylc/flow/job_runner_handlers/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ def submit(
ret_code:
Subprocess return code.
out:
Subprocess standard output, note this should be newline
terminated.
Subprocess standard output.
err:
Subprocess standard error.
Expand Down
22 changes: 10 additions & 12 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,10 @@ def jobs_kill(self, job_log_root, job_log_dirs):
self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code))
# Note: Print STDERR to STDOUT may look a bit strange, but it
# requires less logic for the workflow to parse the output.
if err.strip():
for line in err.splitlines(True):
if not line.endswith("\n"):
line += "\n"
sys.stdout.write("%s%s|%s|%s" % (
self.OUT_PREFIX_CMD_ERR, now, job_log_dir, line))
for line in err.strip().splitlines():
sys.stdout.write(
f"{self.OUT_PREFIX_CMD_ERR}{now}|{job_log_dir}|{line}\n"
)

def jobs_poll(self, job_log_root, job_log_dirs):
"""Poll multiple jobs.
Expand Down Expand Up @@ -303,13 +301,13 @@ def jobs_submit(self, job_log_root, job_log_dirs, remote_mode=False,
sys.stdout.write("%s%s|%s|%d|%s\n" % (
self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code, job_id))
for key, value in [("STDERR", err), ("STDOUT", out)]:
if value is None or not value.strip():
if value is None:
continue
for line in value.splitlines(True):
if not value.endswith("\n"):
value += "\n"
sys.stdout.write("%s%s|%s|[%s] %s" % (
self.OUT_PREFIX_COMMAND, now, job_log_dir, key, line))
for line in value.strip().splitlines():
sys.stdout.write(
f"{self.OUT_PREFIX_COMMAND}{now}"
f"|{job_log_dir}|[{key}] {line}\n"
)

def job_kill(self, st_file_path):
"""Ask job runner to terminate the job specified in "st_file_path".
Expand Down
85 changes: 85 additions & 0 deletions tests/integration/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import errno
import logging
from pathlib import Path
import re
from textwrap import dedent

from cylc.flow.job_runner_mgr import JobRunnerManager
from cylc.flow.pathutil import get_workflow_run_job_dir
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.subprocctx import SubProcContext


async def test_kill_error(one, start, test_dir, capsys, log_filter):
"""It should report the failure to kill a job."""
async with start(one) as log:
# make it look like the task is running
itask = one.pool.get_tasks()[0]
itask.submit_num += 1
itask.state_reset(TASK_STATUS_RUNNING)

# fake job details
workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)

# create job status file (give it a fake pid)
job_log_dir.mkdir(parents=True)
(job_log_dir / 'job.status').write_text(dedent('''
CYLC_JOB_RUNNER_NAME=background
CYLC_JOB_ID=99999999
CYLC_JOB_PID=99999999
'''))

# attempt to kill the job using the jobs-kill script
# (note this is normally run via a subprocess)
capsys.readouterr()
JobRunnerManager().jobs_kill(str(workflow_job_log_dir), [job_id])

# the kill should fail, the failure should be written to stdout
# (the jobs-kill callback will read this in and handle it)
out, err = capsys.readouterr()
assert re.search(
# # NOTE: ESRCH = no such process
rf'TASK JOB ERROR.*{job_id}.*Errno {errno.ESRCH}',
out,
)

# feed this jobs-kill output into the scheduler
# (as if we had run the jobs-kill script as a subprocess)
one.task_job_mgr._kill_task_jobs_callback(
# mock the subprocess
SubProcContext(
one.task_job_mgr.JOBS_KILL,
['mock-cmd'],
# provide it with the out/err the script produced
out=out,
err=err,
),
one.workflow,
[itask],
)

# a warning should be logged
assert log_filter(
log,
regex=r'1/one/01:running.*job kill failed',
level=logging.WARNING,
)
assert itask.state(TASK_STATUS_RUNNING)

0 comments on commit c266fd8

Please sign in to comment.