Skip to content

Commit

Permalink
Merge pull request #6489 from cylc/8.3.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.3.x-sync into master
  • Loading branch information
MetRonnie authored Nov 25, 2024
2 parents 01bbca5 + b4e7924 commit 443112e
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 35 deletions.
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ Utheri Wagura <[email protected]>
github-actions[bot] <[email protected]> <41898282+github-actions[bot]@users.noreply.github.com>
github-actions[bot] <[email protected]> GitHub Action
Diquan Jabbour <[email protected]>
Maxime Rio <[email protected]>
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ requests_).
- Diquan Jabbour
- Shixian Sheng
- Utheri Wagura
- Maxime Rio
- Paul Armstrong
- Paul Earnshaw
<!-- end-shortlog -->
Expand Down
2 changes: 2 additions & 0 deletions changes.d/6081.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix job submission when a batch of jobs is submitted to a runner that does
not return a newline with the job ID (did not affect built-in job runners).
2 changes: 1 addition & 1 deletion cylc/flow/job_runner_handlers/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def submit(cls, job_file_path, submit_opts):
exc.filename = "nohup"
return (1, None, str(exc))
else:
return (0, "%d\n" % (proc.pid), None)
return (0, str(proc.pid), None)


JOB_RUNNER_HANDLER = BgCommandHandler()
3 changes: 1 addition & 2 deletions cylc/flow/job_runner_handlers/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ def submit(
ret_code:
Subprocess return code.
out:
Subprocess standard output, note this should be newline
terminated.
Subprocess standard output.
err:
Subprocess standard error.
Expand Down
22 changes: 10 additions & 12 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,10 @@ def jobs_kill(self, job_log_root, job_log_dirs):
self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code))
# Note: Print STDERR to STDOUT may look a bit strange, but it
# requires less logic for the workflow to parse the output.
if err.strip():
for line in err.splitlines(True):
if not line.endswith("\n"):
line += "\n"
sys.stdout.write("%s%s|%s|%s" % (
self.OUT_PREFIX_CMD_ERR, now, job_log_dir, line))
for line in err.strip().splitlines():
sys.stdout.write(
f"{self.OUT_PREFIX_CMD_ERR}{now}|{job_log_dir}|{line}\n"
)

def jobs_poll(self, job_log_root, job_log_dirs):
"""Poll multiple jobs.
Expand Down Expand Up @@ -303,13 +301,13 @@ def jobs_submit(self, job_log_root, job_log_dirs, remote_mode=False,
sys.stdout.write("%s%s|%s|%d|%s\n" % (
self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code, job_id))
for key, value in [("STDERR", err), ("STDOUT", out)]:
if value is None or not value.strip():
if value is None:
continue
for line in value.splitlines(True):
if not value.endswith("\n"):
value += "\n"
sys.stdout.write("%s%s|%s|[%s] %s" % (
self.OUT_PREFIX_COMMAND, now, job_log_dir, key, line))
for line in value.strip().splitlines():
sys.stdout.write(
f"{self.OUT_PREFIX_COMMAND}{now}"
f"|{job_log_dir}|[{key}] {line}\n"
)

def job_kill(self, st_file_path):
"""Ask job runner to terminate the job specified in "st_file_path".
Expand Down
29 changes: 14 additions & 15 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
Check or poll a workflow database for task statuses or completed outputs.
The ID argument can target a workflow, or a cycle point, or a specific
task, with an optional selector on cycle or task to match task status,
output trigger (if not a status, or with --trigger) or output message
(with --message). All matching results will be printed.
The ID argument can target a workflow, or a cycle point, or a specific task,
with an optional selector on cycle or task to match final task statuses,
output trigger (with --triggers), or output message (with --messages).
You cannot poll for transient states such as "submitted" and "running";
poll for the corresponding output triggers instead ("submitted", "started").
All matching results will be printed.
If no results match, the command will repeatedly check (poll) until a match
is found or polling is exhausted (see --max-polls and --interval). For a
Expand All @@ -35,14 +38,12 @@
Legacy (pre-8.3.0) options are supported, but deprecated, for existing scripts:
cylc workflow-state --task=NAME --point=CYCLE --status=STATUS
--output=MESSAGE --message=MESSAGE --task-point WORKFLOW
(Note from 8.0 until 8.3.0 --output and --message both match task messages).
(Note from 8.0 until 8.3.0 --output and --message both matched task messages).
In "cycle/task:selector" the selector will match task statuses, unless:
- if it is not a known status, it will match task output triggers
(Cylc 8 DB) or task ouput messages (Cylc 7 DB)
- with --triggers, it will only match task output triggers
- with --messages (deprecated), it will only match task output messages.
Triggers are more robust - they match manually and naturally set outputs.
- with --triggers, it will only match task output triggers.
- with --messages, it will only match task output messages. It is recommended
to use triggers instead - they match both naturally & manually set outputs.
Selector does not default to "succeeded". If omitted, any status will match.
Expand All @@ -64,8 +65,6 @@
Warnings:
- Typos in the workflow or task ID will result in fruitless polling.
- To avoid missing transient states ("submitted", "running") poll for the
corresponding output trigger instead ("submitted", "started").
- Cycle points are auto-converted to the DB point format (and UTC mode).
- Task outputs manually completed by "cylc set" have "(force-completed)"
recorded as the task message in the DB, so it is best to query trigger
Expand Down Expand Up @@ -296,8 +295,8 @@ def get_option_parser() -> COP:

parser.add_option(
"--triggers",
help="Task selector should match output triggers rather than status."
" (Note this is not needed for custom outputs).",
help="Task selector should match output trigger names rather than "
"status.",
action="store_true", dest="is_trigger", default=False)

parser.add_option(
Expand Down Expand Up @@ -370,7 +369,7 @@ def main(parser: COP, options: 'Values', *ids: str) -> None:
[
options.depr_task,
options.depr_status,
options.depr_msg, # --message and --trigger
options.depr_msg,
options.depr_point,
options.depr_env_point
]
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ def workflow_state(
e.g. PT1H (1 hour) or P1 (1 integer cycle)
flow_num:
Flow number of the target task.
is_message:
Interpret the task:selector as a task output message
(the default is a task status or trigger)
is_trigger:
Interpret the task:selector as a task trigger name
(only needed if it is also a valid status name)
Interpret the task:selector as a task trigger name rather than a
task status.
is_message:
Interpret the task:selector as a task output message rather than a
task status.
alt_cylc_run_dir:
Alternate cylc-run directory, e.g. for another user.
Expand Down
85 changes: 85 additions & 0 deletions tests/integration/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import errno
import logging
from pathlib import Path
import re
from textwrap import dedent

from cylc.flow.job_runner_mgr import JobRunnerManager
from cylc.flow.pathutil import get_workflow_run_job_dir
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.subprocctx import SubProcContext


async def test_kill_error(one, start, test_dir, capsys, log_filter):
"""It should report the failure to kill a job."""
async with start(one) as log:
# make it look like the task is running
itask = one.pool.get_tasks()[0]
itask.submit_num += 1
itask.state_reset(TASK_STATUS_RUNNING)

# fake job details
workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)

# create job status file (give it a fake pid)
job_log_dir.mkdir(parents=True)
(job_log_dir / 'job.status').write_text(dedent('''
CYLC_JOB_RUNNER_NAME=background
CYLC_JOB_ID=99999999
CYLC_JOB_PID=99999999
'''))

# attempt to kill the job using the jobs-kill script
# (note this is normally run via a subprocess)
capsys.readouterr()
JobRunnerManager().jobs_kill(str(workflow_job_log_dir), [job_id])

# the kill should fail, the failure should be written to stdout
# (the jobs-kill callback will read this in and handle it)
out, err = capsys.readouterr()
assert re.search(
# # NOTE: ESRCH = no such process
rf'TASK JOB ERROR.*{job_id}.*Errno {errno.ESRCH}',
out,
)

# feed this jobs-kill output into the scheduler
# (as if we had run the jobs-kill script as a subprocess)
one.task_job_mgr._kill_task_jobs_callback(
# mock the subprocess
SubProcContext(
one.task_job_mgr.JOBS_KILL,
['mock-cmd'],
# provide it with the out/err the script produced
out=out,
err=err,
),
one.workflow,
[itask],
)

# a warning should be logged
assert log_filter(
log,
regex=r'1/one/01:running.*job kill failed',
level=logging.WARNING,
)
assert itask.state(TASK_STATUS_RUNNING)

0 comments on commit 443112e

Please sign in to comment.