diff --git a/.github/workflows/2_auto_publish_release.yml b/.github/workflows/2_auto_publish_release.yml index 15b4a590a69..011eb090202 100644 --- a/.github/workflows/2_auto_publish_release.yml +++ b/.github/workflows/2_auto_publish_release.yml @@ -38,7 +38,7 @@ jobs: uses: cylc/release-actions/build-python-package@v1 - name: Publish distribution to PyPI - uses: pypa/gh-action-pypi-publish@v1.10.1 + uses: pypa/gh-action-pypi-publish@v1.12.2 with: user: __token__ # uses the API token feature of PyPI - least permissions possible password: ${{ secrets.PYPI_TOKEN }} diff --git a/CHANGES.md b/CHANGES.md index 1dd641991c0..ab3010892b1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -101,8 +101,6 @@ $ towncrier create ..md --content "Short description" ### 🔧 Fixes -[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. - [#6186](https://github.com/cylc/cylc-flow/pull/6186) - Fixed bug where using flow numbers with `cylc set` would not work correctly. [#6200](https://github.com/cylc/cylc-flow/pull/6200) - Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused @@ -119,6 +117,8 @@ $ towncrier create ..md --content "Short description" [#6176](https://github.com/cylc/cylc-flow/pull/6176) - Fix bug where jobs which fail to submit are not shown in GUI/TUI if submission retries are set. +[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. + ## __cylc-8.3.0 (Released 2024-06-18)__ ### ⚠ Breaking Changes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d215aaf3e82..b263449b76c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -97,6 +97,7 @@ requests_). - Shixian Sheng - Utheri Wagura - Paul Armstrong + - Paul Earnshaw (All contributors are identifiable with email addresses in the git version diff --git a/changes.d/6137.feat.md b/changes.d/6137.feat.md new file mode 100644 index 00000000000..d947999de99 --- /dev/null +++ b/changes.d/6137.feat.md @@ -0,0 +1 @@ +New Cylc lint rule: S014: Don't use job runner specific execution time limit directives, use execution time limit. \ No newline at end of file diff --git a/changes.d/6168.feat.md b/changes.d/6168.feat.md new file mode 100644 index 00000000000..84ccc73b236 --- /dev/null +++ b/changes.d/6168.feat.md @@ -0,0 +1 @@ +Allow symlinking log/job separately from log diff --git a/changes.d/6289.feat.md b/changes.d/6289.feat.md new file mode 100644 index 00000000000..9284a768f2f --- /dev/null +++ b/changes.d/6289.feat.md @@ -0,0 +1 @@ +Made the errors resulting from Jinja2 `raise` and `assert` statements more straight forward. diff --git a/changes.d/6444.feat.md b/changes.d/6444.feat.md new file mode 100644 index 00000000000..efa310dfa0b --- /dev/null +++ b/changes.d/6444.feat.md @@ -0,0 +1 @@ +The scheduler now traps the SIGINT, SIGTERM and SIGHUP signals and will respond by shutting down in --now mode. If the workflow is already shutting down in --now mode, it will escalate the shutdown to --now --now mode. diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 7662fc9469c..71a45f6bbf3 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -15,9 +15,8 @@ # along with this program. If not, see . 
"""Set up the cylc environment.""" -import os import logging - +import os CYLC_LOG = 'cylc' @@ -53,7 +52,7 @@ def environ_init(): environ_init() -__version__ = '8.3.7.dev' +__version__ = '8.4.0.dev' def iter_entry_points(entry_point_name): diff --git a/cylc/flow/broadcast_report.py b/cylc/flow/broadcast_report.py index 72fedb4cbef..cb44f2212f3 100644 --- a/cylc/flow/broadcast_report.py +++ b/cylc/flow/broadcast_report.py @@ -72,7 +72,7 @@ def get_broadcast_change_iter(modified_settings, is_cancel=False): value = setting keys_str = "" while isinstance(value, dict): - key, value = list(value.items())[0] + key, value = next(iter(value.items())) if isinstance(value, dict): keys_str += "[" + key + "]" else: diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index 7dc04c99a02..987d2dc29ac 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -30,6 +30,7 @@ from cylc.flow.exceptions import GlobalConfigError from cylc.flow.hostuserutil import get_user_home from cylc.flow.network.client_factory import CommsMeth +from cylc.flow.pathutil import SYMLINKABLE_LOCATIONS from cylc.flow.parsec.config import ( ConfigNode as Conf, ParsecConfig, @@ -1148,55 +1149,21 @@ def default_for( .. versionadded:: 8.0.0 """) - Conf('log', VDR.V_STRING, None, desc=""" - Alternative location for the log dir. - - If specified the workflow log directory will be created in - ``/cylc-run//log`` and a - symbolic link will be created from - ``$HOME/cylc-run//log``. If not specified - the workflow log directory will be created in - ``$HOME/cylc-run//log``. - - .. versionadded:: 8.0.0 - """) - Conf('share', VDR.V_STRING, None, desc=""" - Alternative location for the share dir. - - If specified the workflow share directory will be - created in ``/cylc-run//share`` - and a symbolic link will be created from - ``<$HOME/cylc-run//share``. If not specified - the workflow share directory will be created in - ``$HOME/cylc-run//share``. - - .. versionadded:: 8.0.0 - """) - Conf('share/cycle', VDR.V_STRING, None, desc=""" - Alternative directory for the share/cycle dir. - - If specified the workflow share/cycle directory - will be created in - ``/cylc-run//share/cycle`` - and a symbolic link will be created from - ``$HOME/cylc-run//share/cycle``. If not - specified the workflow share/cycle directory will be - created in ``$HOME/cylc-run//share/cycle``. - - .. versionadded:: 8.0.0 - """) - Conf('work', VDR.V_STRING, None, desc=""" - Alternative directory for the work dir. - - If specified the workflow work directory will be created in - ``/cylc-run//work`` and a - symbolic link will be created from - ``$HOME/cylc-run//work``. If not specified - the workflow work directory will be created in - ``$HOME/cylc-run//work``. - - .. versionadded:: 8.0.0 - """) + for folder, versionadded in SYMLINKABLE_LOCATIONS.items(): + Conf(folder, VDR.V_STRING, None, desc=f""" + Alternative location for the {folder} dir. + + If specified the workflow {folder} directory will + be created in + ``/cylc-run//{folder}`` + and a symbolic link will be created from + ``$HOME/cylc-run//{folder}``. + If not specified the workflow log directory will + be created in + ``$HOME/cylc-run//{folder}``. + + .. versionadded:: {versionadded} + """) with Conf('platforms', desc=''' Platforms allow you to define compute resources available at your site. @@ -1311,7 +1278,7 @@ def default_for( The means by which task progress messages are reported back to the running workflow. 
- Options: + ..rubric:: Options: zmq Direct client-server TCP communication via network ports @@ -1320,6 +1287,8 @@ def default_for( ssh Use non-interactive ssh for task communications + For more information, see :ref:`TaskComms`. + .. versionchanged:: 8.0.0 {REPLACES}``global.rc[hosts][]task communication diff --git a/cylc/flow/clean.py b/cylc/flow/clean.py index 72c23e1e3e2..d4a443b4900 100644 --- a/cylc/flow/clean.py +++ b/cylc/flow/clean.py @@ -126,7 +126,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None: except ContactFileExists as exc: raise ServiceFileError( f"Cannot clean running workflow {id_}.\n\n{exc}" - ) + ) from None def init_clean(id_: str, opts: 'Values') -> None: @@ -170,7 +170,7 @@ def init_clean(id_: str, opts: 'Values') -> None: try: platform_names = get_platforms_from_db(local_run_dir) except ServiceFileError as exc: - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from None except sqlite3.OperationalError as exc: # something went wrong with the query # e.g. the table/field we need isn't there @@ -183,7 +183,7 @@ def init_clean(id_: str, opts: 'Values') -> None: ' local files (you may need to remove files on other' ' platforms manually).' ) - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from exc if platform_names and platform_names != {'localhost'}: remote_clean( @@ -367,7 +367,8 @@ def remote_clean( except PlatformLookupError as exc: raise PlatformLookupError( f"Cannot clean {id_} on remote platforms as the workflow database " - f"is out of date/inconsistent with the global config - {exc}") + f"is out of date/inconsistent with the global config - {exc}" + ) from None queue: Deque[RemoteCleanQueueTuple] = deque() remote_clean_cmd = partial( diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index d87c0711a8d..eb5e45c4734 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -77,7 +77,7 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL) + raise InputError(ERR_OPT_FLOW_VAL) from None if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 73acda5a550..b8b777e0957 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -211,7 +211,7 @@ async def stop( try: mode = StopMode(mode) except ValueError: - raise CommandFailedError(f"Invalid stop mode: '{mode}'") + raise CommandFailedError(f"Invalid stop mode: '{mode}'") from None schd._set_stop(mode) if mode is StopMode.REQUEST_KILL: schd.time_next_kill = time() @@ -308,7 +308,7 @@ async def set_verbosity(schd: 'Scheduler', level: Union[int, str]): lvl = int(level) LOG.setLevel(lvl) except (TypeError, ValueError) as exc: - raise CommandFailedError(exc) + raise CommandFailedError(exc) from None cylc.flow.flags.verbosity = log_level_to_verbosity(lvl) yield diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 9cb19ac7d2c..786095a215d 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -199,11 +199,11 @@ def interpolate_template(tmpl, params_dict): try: return tmpl % params_dict except KeyError: - raise ParamExpandError('bad parameter') + raise ParamExpandError('bad parameter') from None except TypeError: - raise ParamExpandError('wrong data type for parameter') + raise ParamExpandError('wrong data type for parameter') from 
None except ValueError: - raise ParamExpandError('bad template syntax') + raise ParamExpandError('bad template syntax') from None class WorkflowConfig: @@ -480,8 +480,8 @@ def __init__( get_interval(offset_string).standardise()) except IntervalParsingError: raise WorkflowConfigError( - "Illegal %s spec: %s" % ( - s_type, offset_string)) + "Illegal %s spec: %s" % (s_type, offset_string) + ) from None extn = "(" + offset_string + ")" # Replace family names with members. @@ -709,7 +709,7 @@ def process_initial_cycle_point(self) -> None: try: icp = ingest_time(orig_icp, get_current_time_string()) except IsodatetimeError as exc: - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None self.evaluated_icp = None if icp != orig_icp: # now/next()/previous() was used, need to store @@ -762,7 +762,7 @@ def process_start_cycle_point(self) -> None: for taskid in self.options.starttask ] except ValueError as exc: - raise InputError(str(exc)) + raise InputError(str(exc)) from None self.start_point = min( get_point(cycle).standardise() for cycle in cycle_points if cycle @@ -1115,7 +1115,7 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' '\nThe "finished" output cannot be used in completion' ' expressions, use "succeeded or failed".' - ) + ) from None for alt_qualifier, qualifier in ALT_QUALIFIERS.items(): _alt_compvar = trigger_to_completion_variable(alt_qualifier) @@ -1126,21 +1126,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' f'\nUse "{_compvar}" not "{_alt_compvar}" ' 'in completion expressions.' - ) + ) from None raise WorkflowConfigError( # NOTE: str(exc) == "name 'x' is not defined" tested in # tests/integration/test_optional_outputs.py f'Error in [runtime][{task_name}]completion:' f'\n{error}' - ) + ) from None except Exception as exc: # includes InvalidCompletionExpression # expression contains non-whitelisted syntax or any other error in # the expression e.g. SyntaxError raise WorkflowConfigError( f'Error in [runtime][{task_name}]completion:' f'\n{str(exc)}' - ) + ) from None # ensure consistency between the graph and the completion expression for compvar in ( @@ -1416,11 +1416,12 @@ def compute_family_tree(self): c3_single.mro(name)) except RecursionError: raise WorkflowConfigError( - "circular [runtime] inheritance?") + "circular [runtime] inheritance?" 
+ ) from None except Exception as exc: # catch inheritance errors # TODO - specialise MRO exceptions - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None for name in self.cfg['runtime']: ancestors = self.runtime['linearized ancestors'][name] @@ -1772,7 +1773,7 @@ def _check_task_event_handlers(self): f' {taskdef.name}:' f' {handler_template}:' f' {repr(exc)}' - ) + ) from None def _check_special_tasks(self): """Check declared special tasks are valid, and detect special @@ -1879,7 +1880,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, try: expr_list = listify(lexpression) except SyntaxError: - raise WorkflowConfigError('Error in expression "%s"' % lexpression) + raise WorkflowConfigError( + 'Error in expression "%s"' % lexpression + ) from None triggers = {} xtrig_labels = set() @@ -1956,7 +1959,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': - raise WorkflowConfigError(f"xtrigger not defined: {label}") + raise WorkflowConfigError( + f"xtrigger not defined: {label}" + ) from None else: # Allow "@wall_clock" in graph as implicit zero-offset. xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {}) @@ -2290,7 +2295,7 @@ def load_graph(self): msg += ' (final cycle point=%s)' % fcp if isinstance(exc, CylcError): msg += ' %s' % exc.args[0] - raise WorkflowConfigError(msg) + raise WorkflowConfigError(msg) from None self.sequences.append(seq) parser = GraphParser( family_map, @@ -2445,7 +2450,7 @@ def get_taskdef( except TaskDefError as exc: if orig_expr: LOG.error(orig_expr) - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None else: # Record custom message outputs from [runtime]. messages = set(self.cfg['runtime'][name]['outputs'].values()) @@ -2459,7 +2464,7 @@ def get_taskdef( 'Duplicate task message in' f' "[runtime][{name}][outputs]' f'{output} = {message}" - messages must be unique' - ) + ) from None valid, msg = TaskOutputValidator.validate(output) if not valid: raise WorkflowConfigError( @@ -2485,7 +2490,7 @@ def _get_taskdef(self, name: str) -> TaskDef: try: rtcfg = self.cfg['runtime'][name] except KeyError: - raise WorkflowConfigError("Task not defined: %s" % name) + raise WorkflowConfigError("Task not defined: %s" % name) from None # We may want to put in some handling for cases of changing the # initial cycle via restart (accidentally or otherwise). @@ -2577,7 +2582,9 @@ def process_metadata_urls(self): 'workflow': self.workflow, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in [meta]URL.' 
@@ -2613,7 +2620,9 @@ def process_metadata_urls(self): 'task': name, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in' diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index c963804a7d5..ce81ba55a94 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -150,7 +150,7 @@ def standardise(self, allow_truncated=True): try: self.value = str(int(self)) except (TypeError, ValueError) as exc: - raise PointParsingError(type(self), self.value, exc) + raise PointParsingError(type(self), self.value, exc) from None return self def __int__(self): diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index 3d7cf42bc3e..9b93bc77e6e 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -109,7 +109,7 @@ def standardise(self, allow_truncated=True): WorkflowSpecifics.NUM_EXPANDED_YEAR_DIGITS) else: message = str(exc) - raise PointParsingError(type(self), self.value, message) + raise PointParsingError(type(self), self.value, message) from None return self def sub(self, other): @@ -183,7 +183,7 @@ def standardise(self): try: self.value = str(interval_parse(self.value)) except IsodatetimeError: - raise IntervalParsingError(type(self), self.value) + raise IntervalParsingError(type(self), self.value) from None return self def add(self, other): @@ -789,7 +789,7 @@ def prev_next( raise WorkflowConfigError( f'Invalid offset: {my_time}:' f' Offset lists are semicolon separated, try {suggest}' - ) + ) from None timepoints.append(parsed_point + now) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 0f4beb44b86..a4b28d44fdc 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -116,6 +116,7 @@ if TYPE_CHECKING: from cylc.flow.cycling import PointBase from cylc.flow.flow_mgr import FlowNums + from cylc.flow.prerequisite import Prerequisite from cylc.flow.scheduler import Scheduler EDGES = 'edges' @@ -1444,7 +1445,7 @@ def apply_task_proxy_db_history(self): prereq_ids.add(f'{relative_id}/{flow_nums_str}') # Batch load prerequisites of tasks according to flow. - prereqs_map = {} + prereqs_map: Dict[str, dict] = {} for ( cycle, name, prereq_name, prereq_cycle, prereq_output, satisfied @@ -1458,16 +1459,14 @@ def apply_task_proxy_db_history(self): ] = satisfied if satisfied != '0' else False for ikey, prereqs in prereqs_map.items(): + itask_prereq: Prerequisite for itask_prereq in ( - self.db_load_task_proxies[ikey][0].state.prerequisites + self.db_load_task_proxies[ikey][0].state.prerequisites ): - for key in itask_prereq.satisfied.keys(): - try: - itask_prereq.satisfied[key] = prereqs[key] - except KeyError: - # This prereq is not in the DB: new dependencies - # added to an already-spawned task before restart. - itask_prereq.satisfied[key] = False + for key in itask_prereq: + itask_prereq[key] = prereqs.get(key, False) + # (False if prereq is not in the DB: new dependencies + # added to an already-spawned task before restart.) # Extract info from itasks to data-store. for task_info in self.db_load_task_proxies.values(): @@ -2421,23 +2420,27 @@ def delta_task_output( objects from the workflow task pool. 
""" - tproxy: Optional[PbTaskProxy] - tp_id, tproxy = self.store_node_fetcher(itask.tokens) - if not tproxy: - return - outputs = itask.state.outputs - label = outputs.get_trigger(message) - # update task instance - update_time = time() - tp_delta = self.updated[TASK_PROXIES].setdefault( - tp_id, PbTaskProxy(id=tp_id)) - tp_delta.stamp = f'{tp_id}@{update_time}' - output = tp_delta.outputs[label] - output.label = label - output.message = message - output.satisfied = outputs.is_message_complete(message) - output.time = update_time - self.updates_pending = True + # TODO: Restore incremental update when we have a protocol to do so + # https://github.com/cylc/cylc-flow/issues/6307 + return self.delta_task_outputs(itask) + + # tproxy: Optional[PbTaskProxy] + # tp_id, tproxy = self.store_node_fetcher(itask.tokens) + # if not tproxy: + # return + # outputs = itask.state.outputs + # label = outputs.get_trigger(message) + # # update task instance + # update_time = time() + # tp_delta = self.updated[TASK_PROXIES].setdefault( + # tp_id, PbTaskProxy(id=tp_id)) + # tp_delta.stamp = f'{tp_id}@{update_time}' + # output = tp_delta.outputs[label] + # output.label = label + # output.message = message + # output.satisfied = outputs.is_message_complete(message) + # output.time = update_time + # self.updates_pending = True def delta_task_outputs(self, itask: TaskProxy) -> None: """Create delta for change in all task proxy outputs. diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index 2de4bd7b3b5..fc2d9cf0da3 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -90,7 +90,7 @@ def __init__(self, rund, workflow, db_path=None): except sqlite3.OperationalError: with suppress(Exception): self.conn.close() - raise exc # original error + raise exc from None # original error def __enter__(self): return self @@ -141,7 +141,7 @@ def adjust_point_to_db(self, cycle, offset): raise InputError( f'Cycle point "{cycle}" is not compatible' f' with DB point format "{self.db_point_fmt}"' - ) + ) from None return cycle @staticmethod @@ -378,7 +378,7 @@ def check_polling_config(selector, is_trigger, is_message): try: trigger = TASK_STATE_MAP[selector] except KeyError: - raise InputError(f'No such task state "{selector}"') + raise InputError(f'No such task state "{selector}"') from None else: if trigger is None: raise InputError( diff --git a/cylc/flow/etc/examples/cycle-over-irregular-dates/.validate b/cylc/flow/etc/examples/cycle-over-irregular-dates/.validate new file mode 100755 index 00000000000..af06f1c5a7e --- /dev/null +++ b/cylc/flow/etc/examples/cycle-over-irregular-dates/.validate @@ -0,0 +1,24 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +set -eux + +cylc lint ./simple +cylc validate --check-circular ./simple + +cylc lint ./inter-dependent +cylc validate --check-circular ./inter-dependent diff --git a/cylc/flow/etc/examples/cycle-over-irregular-dates/index.rst b/cylc/flow/etc/examples/cycle-over-irregular-dates/index.rst new file mode 100644 index 00000000000..181a57ebc22 --- /dev/null +++ b/cylc/flow/etc/examples/cycle-over-irregular-dates/index.rst @@ -0,0 +1,132 @@ +Irregular Cycling +----------------- + +We typically schedule tasks on regular intervals, e.g. ``P1D`` (every day) or +``PT1H`` (every hour), however, sometimes our intervals are irregular. + +:ref:`user_guide.scheduling.exclusions` can be used to "subtract" dates or +entire recurrences e.g: + +``PT1H!PT6H`` + Every hour, except every six hours. +``PT1H!(T00, T12)`` + Every hour, except at 00:00 and 12:00. + +However, sometimes we want to schedule tasks on completely irregular intervals +or at arbitrary dates. E.g, when working on case studies, you might have a list +or range of arbitrary dates to work with. + + +.. rubric:: Simple Example + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/cycle-over-irregular-dates/simple + +This example uses :ref:`Jinja` to define the list of dates and write out a +scheduling section for each. + +.. literalinclude:: simple/flow.cylc + :language: cylc + +.. tip:: + + You can see the result of this Jinja2 code by running the ``cylc view -p`` + command. + + +.. rubric:: Example with inter-cycle dependencies + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/cycle-over-irregular-dates/inter-dependent + +.. _Jinja2 loop variable: https://jinja.palletsprojects.com/en/3.0.x/templates/#list-of-control-structures + +If you have dependencies between the cycles, you can make this work by using the +`Jinja2 loop variable`_. + +For example, the previous iteration of the ``{% for date in DATES %}`` loop is +available as ``loop.previtem`` and the next as ``loop.nextitem``. + +If you need to make the tasks which cycle on *irregular* intervals dependent on +tasks which cycle on *regular* intervals, then you might find the +:py:func:`strftime ` function +helpful as a way of determining the nearest matching cycle. + +.. literalinclude:: inter-dependent/flow.cylc + :language: cylc + +You can see how the cycles are linked together using the ``cylc graph`` +command: + +.. NOTE: use "cylc graph . -o foo.dot --cycles 2000 2001" to generate this code + +.. 
digraph:: Example + :align: center + + size = "7,15" + + graph [fontname="sans" fontsize="25"] + node [fontname="sans"] + + subgraph "cluster_20000101T0000Z" { + label="20000101T0000Z" + style="dashed" + "20000101T0000Z/install" [label="install\n20000101T0000Z"] + "20000101T0000Z/prep" [label="prep\n20000101T0000Z"] + } + + subgraph "cluster_20000105T0600Z" { + label="20000105T0600Z" + style="dashed" + "20000105T0600Z/plot" [label="plot\n20000105T0600Z"] + "20000105T0600Z/run_model" [label="run_model\n20000105T0600Z"] + } + + subgraph "cluster_20000305T1200Z" { + label="20000305T1200Z" + style="dashed" + "20000305T1200Z/plot" [label="plot\n20000305T1200Z"] + "20000305T1200Z/run_model" [label="run_model\n20000305T1200Z"] + } + + subgraph "cluster_20000528T1336Z" { + label="20000528T1336Z" + style="dashed" + "20000528T1336Z/plot" [label="plot\n20000528T1336Z"] + "20000528T1336Z/run_model" [label="run_model\n20000528T1336Z"] + } + + subgraph "cluster_20010101T0000Z" { + label="20010101T0000Z" + style="dashed" + "20010101T0000Z/prep" [label="prep\n20010101T0000Z"] + } + + subgraph "cluster_20010105T2324Z" { + label="20010105T2324Z" + style="dashed" + "20010105T2324Z/plot" [label="plot\n20010105T2324Z"] + "20010105T2324Z/run_model" [label="run_model\n20010105T2324Z"] + } + + "20000101T0000Z/install" -> "20000101T0000Z/prep" + "20000101T0000Z/install" -> "20010101T0000Z/prep" + "20000101T0000Z/prep" -> "20000105T0600Z/run_model" + "20000101T0000Z/prep" -> "20000305T1200Z/run_model" + "20000101T0000Z/prep" -> "20000528T1336Z/run_model" + "20000105T0600Z/run_model" -> "20000105T0600Z/plot" + "20000105T0600Z/run_model" -> "20000305T1200Z/run_model" + "20000305T1200Z/run_model" -> "20000305T1200Z/plot" + "20000305T1200Z/run_model" -> "20000528T1336Z/run_model" + "20000528T1336Z/run_model" -> "20000528T1336Z/plot" + "20000528T1336Z/run_model" -> "20010105T2324Z/run_model" + "20010101T0000Z/prep" -> "20010105T2324Z/run_model" + "20010105T2324Z/run_model" -> "20010105T2324Z/plot" diff --git a/cylc/flow/etc/examples/cycle-over-irregular-dates/inter-dependent/flow.cylc b/cylc/flow/etc/examples/cycle-over-irregular-dates/inter-dependent/flow.cylc new file mode 100644 index 00000000000..f0c02a7d709 --- /dev/null +++ b/cylc/flow/etc/examples/cycle-over-irregular-dates/inter-dependent/flow.cylc @@ -0,0 +1,51 @@ +#!Jinja2 + +{% + set DATES = [ + '2000-01-01T00:00Z', + '2000-01-01T06:00Z', + '2000-03-05T12:00Z', + '2000-05-28T13:36Z', + '2001-01-05T23:24', + '2002-04-30T04:20', + ] +%} + +[meta] + title = Irregular cycling example + description = """ + A workflow that runs a group of tasks on arbitrary dates + with inter-cycle dependencies between those dates. 
+ """ + +[scheduling] + initial cycle point = 2000 + [[graph]] + # define the tasks you want to run on startup + R1 = install + + # run this graph every year + P1Y = """ + install[^] => prep + """ + +{# loop over the list of dates #} +{% for date in DATES %} + # schedule the tasks to run at each date + R1/{{ date }} = """ + # make "run_model" depend on the "prep" task from the same year + prep[{{ date | strftime('%Y') }}] => run_model => plot + + {# include this for all but the first date #} + {% if not loop.first %} + # make the run_model task depend on its previous instance + run_model[ {{ loop.previtem }} ] => run_model + {% endif %} + """ +{% endfor %} + +[runtime] + [[install]] + [[prep]] + [[run_model]] + [[plot]] diff --git a/cylc/flow/etc/examples/cycle-over-irregular-dates/simple/flow.cylc b/cylc/flow/etc/examples/cycle-over-irregular-dates/simple/flow.cylc new file mode 100644 index 00000000000..be39f88f0c3 --- /dev/null +++ b/cylc/flow/etc/examples/cycle-over-irregular-dates/simple/flow.cylc @@ -0,0 +1,40 @@ +#!Jinja2 + +{% + set DATES = [ + '2000-01-01T00:00Z', + '2000-01-01T06:00Z', + '2000-03-05T12:00Z', + '2000-05-28T13:36Z', + '2001-01-05T23:24', + '2002-04-30T04:20', + ] +%} + +[meta] + title = Irregular cycling example + description = """ + A workflow that runs a group of tasks on arbitrary dates. + """ + +[scheduling] + # start cycling at the first date + initial cycle point = {{ DATES[0] }} + [[graph]] + # define the tasks you want to run on startup + R1 = install + +{# loop over the list of dates #} +{% for date in DATES %} + # schedule the tasks to run at each date + R1/{{ date }} = """ + # NOTE: install[^] references the task "install" in the first cycle + install[^] => prep => run_model => plot + """ +{% endfor %} + +[runtime] + [[install]] + [[prep]] + [[run_model]] + [[plot]] diff --git a/cylc/flow/etc/examples/extending-workflow/.validate b/cylc/flow/etc/examples/extending-workflow/.validate new file mode 100755 index 00000000000..43c810372ce --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/.validate @@ -0,0 +1,80 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +set -eux + +test_simple () { + local ID + ID="$(< /dev/urandom tr -dc A-Za-z | head -c6)" + + # lint + cylc lint ./simple + + # copy into a temp directory + local SRC_DIR + SRC_DIR="$(mktemp -d)" + cp simple/flow.cylc "$SRC_DIR" + + # speed things up with simulation mode + cat >>"${SRC_DIR}/flow.cylc" <<__HERE__ + [runtime] + [[root]] + [[[simulation]]] + default run length = PT0S +__HERE__ + + # start the workflow + cylc vip \ + --check-circular \ + --no-run-name \ + --no-detach \ + --workflow-name "$ID" \ + --mode=simulation \ + "$SRC_DIR" + + # it should have reached the 2002 cycle + grep '2002/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2003/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # edit the "stop after cycle point" + sed -i \ + 's/stop after cycle point.*/stop after cycle point = 2004/' \ + "${SRC_DIR}/flow.cylc" + + # continue the run + cylc vr \ + --no-detach \ + --mode=simulation \ + --yes \ + "$ID" + + # it should have reached the 2004 cycle + grep '2004/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2005/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # clean up + cylc clean "$ID" + + rm -r "${SRC_DIR}" +} + + +test_simple diff --git a/cylc/flow/etc/examples/extending-workflow/index.rst b/cylc/flow/etc/examples/extending-workflow/index.rst new file mode 100644 index 00000000000..35c5b086aec --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/index.rst @@ -0,0 +1,105 @@ +Extending Workflow +------------------ + +.. cylc-scope:: flow.cylc[scheduling] + +Sometimes we may run a workflow to :term:`completion `, +but subsequently wish to run it for a few more cycles. + +With Cylc 7 this was often done by changing the `final cycle point` and +restarting the workflow. This approach worked, but was a little awkward. +It's possible with Cylc 8, but we would recommend moving away from this +pattern instead. + +The recommended approach to this problem (Cylc 6+) is to use the +`stop after cycle point` rather than the `final cycle point`. + +The `stop after cycle point` tells Cylc to **stop** after the workflow passes +the specified point, whereas the `final cycle point` tells Cylc that the +workflow **finishes** at the specified point. + +When a workflow **finishes**, it is a little awkward to restart as you have to +tell Cylc which tasks to continue on from. The `stop after cycle point` +solution avoids this issue. + + +Example +^^^^^^^ + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/extending-workflow/simple + +This workflow will stop at the end of the ``2002`` cycle: + +.. literalinclude:: simple/flow.cylc + :language: cylc + +After it has run and shut down, change the `stop after cycle point` to +the desired value and restart it. E.g: + +.. code-block:: bash + + # install and run the workflow: + cylc vip + + # then later edit "stop after cycle point" to "2004" + + # then reinstall and restart the workflow: + cylc vr + +The workflow will continue from where it left off and run until the end of the +``2004`` cycle. Because the workflow never hit the `final cycle point` it +never "finished" so no special steps are required to restart the workflow. + +You can also set the `stop after cycle point` when you start the workflow: + +.. code-block:: bash + + cylc play --stop-cycle-point=2020 myworkflow + +Or change it at any point whilst the workflow is running: + +.. 
code-block:: bash + + cylc stop myworkflow//2030 # change the stop after cycle point to 2030 + +.. note:: + + If you set the `stop after cycle point` on the command line, this value will + take precedence over the one in the workflow configuration. Use + ``cylc play --stop-cycle-point=reload`` to restart the workflow using the + `stop after cycle point` configured in the workflow configuration. + + +Running Tasks At The `stop after cycle point` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you have tasks that you want to run before the workflow shuts down at the +`stop after cycle point`, use the recurrence ``R1/`` to schedule +them, e.g: + +.. code-block:: cylc + + #!Jinja2 + + {% set stop_cycle = '3000' %} + + [scheduling] + initial cycle point = 2000 + stop after cycle point = {{ stop_cycle }} + [[graph]] + R1/{{ stop_cycle }} = """ + # these tasks will run *before* the workflow shuts down + z => run_me => and_me + """ + +When the workflow is subsequently restarted with a later +`stop after cycle point`, these tasks will be re-scheduled at the new +stop point. + + +.. cylc-scope:: diff --git a/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc new file mode 100644 index 00000000000..2de2f28f2ea --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc @@ -0,0 +1,24 @@ +[meta] + title = "Basic extendable workflow" + description = """ + Use the "stop after cycle point" rather than the "final cycle point" + to allow this workflow to be easily extended at a later date. + """ + +[scheduler] + # use the year for the cycle point + # (strip off the month, day, hour and minute) + cycle point format = CCYY + +[scheduling] + initial cycle point = 2000 + stop after cycle point = 2002 # stop after two years of simulated time + [[graph]] + P1Y = """ + z[-P1Y] => a + a => z + """ + +[runtime] + [[a]] + [[z]] diff --git a/cylc/flow/etc/examples/external-data-files/.validate b/cylc/flow/etc/examples/external-data-files/.validate new file mode 100755 index 00000000000..2ea8bbb7bd9 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/.validate @@ -0,0 +1,32 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +set -eux + +ID="$(< /dev/urandom tr -dc A-Za-z | head -c6)" + +# run the workflow +cylc vip --check-circular --no-run-name --no-detach --workflow-name "$ID" + +# check the station:ID mapping came out as expected +grep 'fetch data for heathrow, WMO ID: 03772' "${HOME}/cylc-run/${ID}/log/job/20000101T0000Z/fetch_heathrow/NN/job.out" + +# lint +cylc lint "$ID" + +# clean up +cylc clean "$ID" diff --git a/cylc/flow/etc/examples/external-data-files/flow.cylc b/cylc/flow/etc/examples/external-data-files/flow.cylc new file mode 100644 index 00000000000..232448935d4 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/flow.cylc @@ -0,0 +1,66 @@ +#!Jinja2 + +[meta] + title = Weather Station Workflow + description = """ + This workflow demonstrates how to read in a data file for use in + defining your workflow. + + We have a file called "stations.json" which contains a list of weather + stations with some data for each. This workflow reads the + "stations.json" file and creates a family for each weather station + with an environment variable for each data field. + + You can load data in other formats too. Try changing "load_json" to + "load_csv" and "stations.json" to "stations.csv" for a CSV example. + """ + + +{# Import a Python function to load our data. #} +{% from "load_data" import load_json %} + +{# Load data from the specified file. #} +{% set stations = load_json('stations.json') %} + +{# Extract a list of station names from the data file. #} +{% set station_names = stations | map(attribute="name") | list %} + + +{# Provide Cylc with a list of weather stations. #} +[task parameters] + station = {{ station_names | join(', ') }} + + +[scheduling] + initial cycle point = 2000-01-01 + final cycle point = 2000-01-02 + [[graph]] + P1D = fetch => process => collate + + +[runtime] +{# Define a family for each weather station #} +{% for station in stations %} + [[STATION]] + [[[environment]]] + {# Turn the parameter into an environment variable #} + {# NB: Just to show how, we could also have used `station["name"]`. #} + name = %(station)s + {# Turn the data for this station into environment variables. #} + wmo = {{ station["wmo"] }} + alt = {{ station["alt"] }} + lat = {{ station["lat"] }} + lon = {{ station["lon"] }} +{% endfor %} + + # a task that gets data + [[fetch]] + inherit = STATION + script = echo "fetch data for $name, WMO ID: $wmo" + + [[process]] + inherit = STATION + script = echo "process data for $name, location: $lat,$lon" + + [[collate]] + script = "echo collate data for stations: {{ station_names }}" diff --git a/cylc/flow/etc/examples/external-data-files/index.rst b/cylc/flow/etc/examples/external-data-files/index.rst new file mode 100644 index 00000000000..e68b34cfc50 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/index.rst @@ -0,0 +1,122 @@ +Using Data To Define Your Workflow +================================== + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/external-data-files + +We often want to read in a dataset for use in defining our workflow. + +The :ref:`Cylc tutorial ` is an +example of this where we want one ``get_observations`` task for each of a list +of weather stations. Each weather station has a name (e.g. "heathrow") and an +ID (e.g. 3772). + +.. 
code-block:: cylc + + [runtime] + [[get_observations_heathrow]] + script = get-observations + [[[environment]]] + SITE_ID = 3772 + [[get_observations_camborne]] + script = get-observations + [[[environment]]] + SITE_ID = 3808 + [[get_observations_shetland]] + script = get-observations + [[[environment]]] + SITE_ID = 3005 + [[get_observations_aldergrove]] + script = get-observations + [[[environment]]] + SITE_ID = 3917 + +It can be inconvenient to write out the name and ID of each station in your +workflow like this, however, you may already have this information in a more +convenient format (i.e. a data file of some form). + +With Cylc, we can use :ref:`Jinja2 ` to read in a data file and use that data to +define your workflow. + + +The Approach +------------ + +This example has three components: + +1. A JSON file containing a list of weather stations along with all the data + associated with them. + + .. literalinclude:: stations.json + :language: json + :caption: stations.json + +2. A Python function that reads the JSON file. + + .. code-block:: python + :caption: lib/python/load_data.py + + import json + + + def load_json(filename): + with open(filename, 'r') as json_file: + return json.load(json_file) + + We put this Python code in the workflow's ``lib/python`` directory which + allows us to import it from within our workflow. + +3. A ``flow.cylc`` file that uses the Python function to load the + data file. + + We can import Python functions with Jinja2 using the following syntax: + + .. code-block:: + + {% from "load_data" import load_json %} + + For more information, see :ref:`jinja2.importing_python_modules`. + + + +The Workflow +------------ + +The three files are arranged like so: + +.. code-block:: none + :caption: File Structure + + |-- flow.cylc + |-- lib + | `-- python + | `-- load_data.py + `-- stations.json + +The ``flow.cylc`` file: + +* Imports the Python function. +* Uses it to load the data. +* Then uses the data to define the workflow. + +.. literalinclude:: flow.cylc + :language: ini + :caption: flow.cylc + + +Data Types +---------- + +We can load other types of data file too. This example also includes the same +data in CSV format along with a Python function to load CSV data. To try it +out, open the ``flow.cylc`` file and replace ``stations.json`` with +``stations.csv`` and ``load_json`` with ``load_csv``. + +Any Python code that you import using Jinja2 will be executed using the Python +environment that Cylc is running in. So if you want to import Python code that +isn't in the standard library, you may need to get your system administrator to +install this dependency into the Cylc environment for you. 
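
.. note to reviewers: the index.rst above describes importing the ``load_json``/``load_csv`` helpers into the workflow via Jinja2. As a minimal sketch (not part of the example files themselves), the same helpers can be exercised directly in Python against the ``stations.json``/``stations.csv`` files added below, assuming they are run from the workflow source directory:

.. code-block:: python

   # Minimal sketch: run the data-loading helpers outside of Jinja2.
   # The functions are copied from lib/python/load_data.py in this example;
   # the file paths are assumed to be relative to the workflow source dir.
   import csv
   import json


   def load_json(filename):
       with open(filename, 'r') as json_file:
           return json.load(json_file)


   def load_csv(filename):
       with open(filename, 'r') as csv_file:
           return list(csv.DictReader(csv_file))


   if __name__ == '__main__':
       stations = load_json('stations.json')
       # Each entry is a dict, e.g. {"name": "heathrow", "wmo": "03772", ...},
       # so the Jinja2 expression ``stations | map(attribute="name") | list``
       # yields ["camborne", "heathrow", ...].
       print([station['name'] for station in stations])

       # The CSV loader returns the same structure, but every value is a string.
       print(load_csv('stations.csv')[0])
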
diff --git a/cylc/flow/etc/examples/external-data-files/lib/python/load_data.py b/cylc/flow/etc/examples/external-data-files/lib/python/load_data.py new file mode 100644 index 00000000000..7f9dd822884 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/lib/python/load_data.py @@ -0,0 +1,12 @@ +import csv +import json + + +def load_csv(filename): + with open(filename, 'r') as csv_file: + return list(csv.DictReader(csv_file)) + + +def load_json(filename): + with open(filename, 'r') as json_file: + return json.load(json_file) diff --git a/cylc/flow/etc/examples/external-data-files/stations.csv b/cylc/flow/etc/examples/external-data-files/stations.csv new file mode 100644 index 00000000000..81c13a53ad2 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/stations.csv @@ -0,0 +1,7 @@ +name,wmo,alt,lat,lon +camborne,03808,87,50.21841,-5.32753 +heathrow,03772,25,51.47922.-0.45061 +lerwick,03005,82,60.13893,-1.18491, +aldergrove,03917,63,54.66365,-6.22534, +exeter,03844,27,50.73717,-3.40579, +middle_wallop,03749,90,51.14987,-1.56994 diff --git a/cylc/flow/etc/examples/external-data-files/stations.json b/cylc/flow/etc/examples/external-data-files/stations.json new file mode 100644 index 00000000000..213d4282eb4 --- /dev/null +++ b/cylc/flow/etc/examples/external-data-files/stations.json @@ -0,0 +1,44 @@ +[ + { + "name": "camborne", + "wmo": "03808", + "alt": 87, + "lat": 50.21841, + "lon": -5.32753 + }, + { + "name": "heathrow", + "wmo": "03772", + "alt": 25, + "lat": 51.47922, + "lon": -0.45061 + }, + { + "name": "lerwick", + "wmo": "03005", + "alt": 82, + "lat": 60.13893, + "lon": -1.18491 + }, + { + "name": "aldergrove", + "wmo": "03917", + "alt": 63, + "lat": 54.66365, + "lon": -6.22534 + }, + { + "name": "exeter", + "wmo": "03844", + "alt": 27, + "lat": 50.73717, + "lon": -3.40579 + }, + { + "name": "middle_wallop", + "wmo": "03749", + "alt": 90, + "lat": 51.14987, + "lon": -1.56994 + } +] diff --git a/cylc/flow/etc/syntax/cylc.lang b/cylc/flow/etc/syntax/cylc.lang index 0270c7f3cfc..c3f43da2c95 100644 --- a/cylc/flow/etc/syntax/cylc.lang +++ b/cylc/flow/etc/syntax/cylc.lang @@ -219,7 +219,7 @@ #\} - \{\{.*\}\} + \{\{.*?\}\} \{% diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index 64665c96983..c751e6196b8 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -346,7 +346,7 @@ def parse_graph(self, graph_string: str) -> None: raise GraphParseError( f"Dangling {seq}:" f"{this_line}" - ) + ) from None part_lines.append(this_line) # Check that a continuation sequence doesn't end this line and @@ -638,7 +638,8 @@ def _proc_dep_pair( except KeyError: # "FAM:bad => foo" in LHS (includes "FAM => bar" too). raise GraphParseError( - f"Illegal family trigger in {expr}") + f"Illegal family trigger in {expr}" + ) from None else: # Not a family. if trig in self.__class__.fam_to_mem_trigger_map: @@ -911,7 +912,8 @@ def _compute_triggers( except KeyError: # Illegal family trigger on RHS of a pair. 
raise GraphParseError( - f"Illegal family trigger: {name}:{output}") + f"Illegal family trigger: {name}:{output}" + ) from None else: fam = False if not output: diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index 0eb34d088ca..69e32c68a71 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -373,7 +373,7 @@ def _filter_by_ranking(hosts, rankings, results, data=None): f'\n Expression: {item}' f'\n Configuration: {GLBL_CFG_STR}' f'\n Error: {exc}' - ) + ) from None if isinstance(result, bool): host_rankings[item] = result data[host][item] = result diff --git a/cylc/flow/id.py b/cylc/flow/id.py index 58fff7fa7bc..f2c8b05b4a1 100644 --- a/cylc/flow/id.py +++ b/cylc/flow/id.py @@ -128,7 +128,7 @@ def __getitem__(self, key): return dict.__getitem__(self, key) except KeyError: if key not in self._KEYS: - raise ValueError(f'Invalid token: {key}') + raise ValueError(f'Invalid token: {key}') from None return None def __str__(self): diff --git a/cylc/flow/id_cli.py b/cylc/flow/id_cli.py index 9c7493fd612..35afbf80d5a 100644 --- a/cylc/flow/id_cli.py +++ b/cylc/flow/id_cli.py @@ -167,9 +167,9 @@ def _parse_cli(*ids: str) -> List[Tokens]: # this ID is invalid with or without the trailing slash tokens = cli_tokenise(id_[:-1]) except ValueError: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None else: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None is_partial = tokens.get('workflow') and not tokens.get('cycle') is_relative = not tokens.get('workflow') @@ -347,7 +347,7 @@ async def parse_ids_async( if src: if not flow_file_path: # get the workflow file path from the run dir - flow_file_path = get_flow_file(list(workflows)[0]) + flow_file_path = get_flow_file(next(iter(workflows))) return workflows, flow_file_path return workflows, multi_mode @@ -375,7 +375,7 @@ async def parse_id_async( 'max_tasks': 1, }, ) - workflow_id = list(workflows)[0] + workflow_id = next(iter(workflows)) tokens_list = workflows[workflow_id] tokens: Optional[Tokens] if tokens_list: diff --git a/cylc/flow/install.py b/cylc/flow/install.py index 8ab619a323d..87b8b6b0565 100644 --- a/cylc/flow/install.py +++ b/cylc/flow/install.py @@ -330,7 +330,7 @@ def install_workflow( # This occurs when the file exists but is _not_ a directory. raise WorkflowFilesError( f"Cannot install as there is an existing file at {rundir}." - ) + ) from None if relink: link_runN(rundir) rsync_cmd = get_rsync_rund_cmd(source, rundir) @@ -531,7 +531,7 @@ def parse_cli_sym_dirs(symlink_dirs: str) -> Dict[str, Dict[str, Any]]: 'There is an error in --symlink-dirs option:' f' {pair}. 
Try entering option in the form ' '--symlink-dirs=\'log=$DIR, share=$DIR2, ...\'' - ) + ) from None if key not in possible_symlink_dirs: dirs = ', '.join(possible_symlink_dirs) raise InputError( diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index 29d861f7654..41479d6f1ed 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -253,7 +253,7 @@ def _run_cmd( except FileNotFoundError as exc: # This will only be raised if the VCS command is not installed, # otherwise Popen() will succeed with a non-zero return code - raise VCSNotInstalledError(vcs, exc) + raise VCSNotInstalledError(vcs, exc) from None if stdout == PIPE: out, err = pipe_poller(proc, proc.stdout, proc.stderr) else: diff --git a/cylc/flow/job_runner_handlers/loadleveler.py b/cylc/flow/job_runner_handlers/loadleveler.py index ee8203b2b47..d097ff85faf 100644 --- a/cylc/flow/job_runner_handlers/loadleveler.py +++ b/cylc/flow/job_runner_handlers/loadleveler.py @@ -83,6 +83,7 @@ class LoadlevelerHandler(): re.compile("^llsubmit: Processed command file through Submit Filter:")] SUBMIT_CMD_TMPL = "llsubmit '%(job)s'" VACATION_SIGNAL = "USR1" + TIME_LIMIT_DIRECTIVE = "wall_clock_limit" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -96,8 +97,8 @@ def format_directives(self, job_conf): directives["output"] = job_file_path + ".out" directives["error"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("wall_clock_limit") is None): - directives["wall_clock_limit"] = "%d,%d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d,%d" % ( job_conf["execution_time_limit"] + 60, job_conf["execution_time_limit"]) for key, value in list(job_conf["directives"].items()): diff --git a/cylc/flow/job_runner_handlers/lsf.py b/cylc/flow/job_runner_handlers/lsf.py index a465c9b7924..534d1205a9d 100644 --- a/cylc/flow/job_runner_handlers/lsf.py +++ b/cylc/flow/job_runner_handlers/lsf.py @@ -70,6 +70,7 @@ class LSFHandler(): POLL_CMD = "bjobs" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P\d+)>") SUBMIT_CMD_TMPL = "bsub" + TIME_LIMIT_DIRECTIVE = "-W" @classmethod def format_directives(cls, job_conf): @@ -82,8 +83,11 @@ def format_directives(cls, job_conf): ) directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if job_conf["execution_time_limit"] and directives.get("-W") is None: - directives["-W"] = str(math.ceil( + if ( + job_conf["execution_time_limit"] + and directives.get(cls.TIME_LIMIT_DIRECTIVE) is None + ): + directives[cls.TIME_LIMIT_DIRECTIVE] = str(math.ceil( job_conf["execution_time_limit"] / 60)) for key, value in list(job_conf["directives"].items()): directives[key] = value diff --git a/cylc/flow/job_runner_handlers/moab.py b/cylc/flow/job_runner_handlers/moab.py index 839d246ccbc..ec068d48420 100644 --- a/cylc/flow/job_runner_handlers/moab.py +++ b/cylc/flow/job_runner_handlers/moab.py @@ -78,6 +78,7 @@ class MoabHandler: POLL_CMD = "checkjob" REC_ID_FROM_SUBMIT_OUT = re.compile(r"""\A\s*(?P\S+)\s*\Z""") SUBMIT_CMD_TMPL = "msub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -91,8 +92,9 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - 
directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] # restartable? directives.update(job_conf["directives"]) lines = [] diff --git a/cylc/flow/job_runner_handlers/pbs.py b/cylc/flow/job_runner_handlers/pbs.py index aa264311fc4..ac0d6c47a00 100644 --- a/cylc/flow/job_runner_handlers/pbs.py +++ b/cylc/flow/job_runner_handlers/pbs.py @@ -84,6 +84,7 @@ class PBSHandler: POLL_CANT_CONNECT_ERR = "Connection refused" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^\s*(?P\d+)", re.M) SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -105,9 +106,12 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + if ( + job_conf["execution_time_limit"] + and directives.get(self.TIME_LIMIT_DIRECTIVE) is None + ): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] for key, value in list(job_conf["directives"].items()): directives[key] = value lines = [] diff --git a/cylc/flow/job_runner_handlers/sge.py b/cylc/flow/job_runner_handlers/sge.py index 33f7d5a26d7..c7c50956fb9 100644 --- a/cylc/flow/job_runner_handlers/sge.py +++ b/cylc/flow/job_runner_handlers/sge.py @@ -37,7 +37,6 @@ -cwd = -q = foo -l h_data = 1024M - -l h_rt = 24:00:00 These are written to the top of the job script like this: @@ -76,6 +75,7 @@ class SGEHandler: POLL_CMD = "qstat" REC_ID_FROM_SUBMIT_OUT = re.compile(r"\D+(?P\d+)\D+") SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l h_rt" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -88,8 +88,8 @@ def format_directives(self, job_conf): directives['-o'] = job_file_path + ".out" directives['-e'] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l h_rt") is None): - directives["-l h_rt"] = "%d:%02d:%02d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d:%02d:%02d" % ( job_conf["execution_time_limit"] / 3600, (job_conf["execution_time_limit"] / 60) % 60, job_conf["execution_time_limit"] % 60) diff --git a/cylc/flow/job_runner_handlers/slurm.py b/cylc/flow/job_runner_handlers/slurm.py index 4ec6be20471..33df2ebc926 100644 --- a/cylc/flow/job_runner_handlers/slurm.py +++ b/cylc/flow/job_runner_handlers/slurm.py @@ -135,6 +135,8 @@ class SLURMHandler(): # Separator between het job directive sections SEP_HETJOB = "#SBATCH hetjob" + TIME_LIMIT_DIRECTIVE = "--time" + @classmethod def filter_poll_many_output(cls, out): """Return list of job IDs extracted from job poll stdout. 
@@ -161,8 +163,8 @@ def format_directives(cls, job_conf): directives['--output'] = job_file_path.replace('%', '%%') + ".out" directives['--error'] = job_file_path.replace('%', '%%') + ".err" if (job_conf["execution_time_limit"] and - directives.get("--time") is None): - directives["--time"] = "%d:%02d" % ( + directives.get(cls.TIME_LIMIT_DIRECTIVE) is None): + directives[cls.TIME_LIMIT_DIRECTIVE] = "%d:%02d" % ( job_conf["execution_time_limit"] / 60, job_conf["execution_time_limit"] % 60) for key, value in list(job_conf['directives'].items()): diff --git a/cylc/flow/loggingutil.py b/cylc/flow/loggingutil.py index 3c6f63ee294..9d3060afade 100644 --- a/cylc/flow/loggingutil.py +++ b/cylc/flow/loggingutil.py @@ -211,7 +211,7 @@ def should_rollover(self, record: logging.LogRecord) -> bool: self.stream.seek(0, 2) except ValueError as exc: # intended to catch - ValueError: I/O operation on closed file - raise SystemExit(exc) + raise SystemExit(exc) from None return self.stream.tell() + len(msg.encode('utf8')) >= self.max_bytes @property diff --git a/cylc/flow/main_loop/__init__.py b/cylc/flow/main_loop/__init__.py index 2350153842c..e9f9f35f5da 100644 --- a/cylc/flow/main_loop/__init__.py +++ b/cylc/flow/main_loop/__init__.py @@ -329,14 +329,14 @@ def load(config, additional_plugins=None): f'No main-loop plugin: "{plugin_name}"\n' + ' Available plugins:\n' + indent('\n'.join(sorted(entry_points)), ' ') - ) + ) from None # load plugin try: module = entry_point.load() except Exception as exc: raise PluginError( 'cylc.main_loop', entry_point.name, exc - ) + ) from None # load coroutines log = [] for coro_name, coro in getmembers(module): diff --git a/cylc/flow/main_loop/health_check.py b/cylc/flow/main_loop/health_check.py index b488c878c0e..6e42724ba91 100644 --- a/cylc/flow/main_loop/health_check.py +++ b/cylc/flow/main_loop/health_check.py @@ -52,8 +52,8 @@ def _check_contact_file(scheduler): scheduler.workflow) if contact_data != scheduler.contact_data: raise CylcError('contact file modified') - except (AssertionError, IOError, ValueError, ServiceFileError): + except (AssertionError, IOError, ValueError, ServiceFileError) as exc: raise CylcError( '%s: contact file corrupted/modified and may be left' % workflow_files.get_contact_file_path(scheduler.workflow) - ) + ) from exc diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 916b129e244..42b79475ca5 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -78,7 +78,7 @@ def get_location(workflow: str) -> Tuple[str, int, int]: contact = load_contact_file(workflow) except (IOError, ValueError, ServiceFileError): # Contact file does not exist or corrupted, workflow should be dead - raise WorkflowStopped(workflow) + raise WorkflowStopped(workflow) from None host = contact[ContactFileFields.HOST] host = get_fqdn_by_host(host) @@ -176,16 +176,14 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): srv_prv_key_info.full_key_path) except ValueError: raise ServiceFileError( - f"Failed to find server's public " - f"key in " + "Failed to find server's public key in " f"{srv_prv_key_info.full_key_path}." - ) - except OSError: + ) from None + except OSError as exc: raise ServiceFileError( - f"IO error opening server's private " - f"key from " + "IO error opening server's private key from " f"{srv_prv_key_info.full_key_path}." 
- ) + ) from exc if server_private_key is None: # this can't be caught by exception raise ServiceFileError( f"Failed to find server's private " @@ -204,7 +202,9 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): self.port = self.socket.bind_to_random_port( 'tcp://*', min_port, max_port) except (zmq.error.ZMQError, zmq.error.ZMQBindError) as exc: - raise CylcError(f'could not start Cylc ZMQ server: {exc}') + raise CylcError( + f'could not start Cylc ZMQ server: {exc}' + ) from None # Keeping srv_public_key_loc as optional arg so as to not break interface def _socket_connect(self, host, port, srv_public_key_loc=None): @@ -236,8 +236,10 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): try: client_public_key, client_priv_key = zmq.auth.load_certificate( client_priv_key_info.full_key_path) - except (OSError, ValueError): - raise ClientError(error_msg) + except ValueError: + raise ClientError(error_msg) from None + except OSError as exc: + raise ClientError(error_msg) from exc if client_priv_key is None: # this can't be caught by exception raise ClientError(error_msg) self.socket.curve_publickey = client_public_key @@ -245,6 +247,9 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): # A client can only connect to the server if it knows its public key, # so we grab this from the location it was created on the filesystem: + error_msg = ( + "Failed to load the workflow's public key, so cannot connect." + ) try: # 'load_certificate' will try to load both public & private keys # from a provided file but will return None, not throw an error, @@ -254,9 +259,10 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): server_public_key = zmq.auth.load_certificate( srv_pub_key_info.full_key_path)[0] self.socket.curve_serverkey = server_public_key - except (OSError, ValueError): # ValueError raised w/ no public key - raise ClientError( - "Failed to load the workflow's public key, so cannot connect.") + except ValueError: # ValueError raised w/ no public key + raise ClientError(error_msg) from None + except OSError as exc: + raise ClientError(error_msg) from exc self.socket.connect(f'tcp://{host}:{port}') diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index e7e26954d56..099ef8bc0ff 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -326,7 +326,7 @@ async def async_request( raise ClientError( error.get('message'), # type: ignore error.get('traceback'), # type: ignore - ) + ) from None def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. 
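Most of the changes in loggingutil, main_loop, health_check and the network modules above are about making exception chaining explicit rather than implicit: raise ... from exc keeps the original error attached as the cause (useful when the low-level detail helps debugging), while raise ... from None suppresses the "During handling of the above exception, another exception occurred" context when that detail adds nothing for the user. A small self-contained illustration of the difference (not Cylc code; ContactFileError is a made-up stand-in for a user-facing error):

class ContactFileError(Exception):
    """Hypothetical user-facing error, for illustration only."""


def load_with_cause(path):
    try:
        return open(path).read()
    except OSError as exc:
        # Chained: the OSError is reported as the direct cause, so its
        # traceback is preserved alongside the friendlier message.
        raise ContactFileError(f"cannot read {path}") from exc


def load_without_cause(path):
    try:
        return open(path).read()
    except OSError:
        # Suppressed: only the user-facing error is shown; the noisy
        # low-level traceback is dropped from the report.
        raise ContactFileError(f"cannot read {path}") from None

The split in _socket_connect above follows the same logic: a ValueError (bad or missing key content) is suppressed with from None, whereas an OSError (an I/O problem worth seeing) is chained with from exc.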
diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index 2b9ea418976..9c190f68799 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -235,7 +235,7 @@ def _report( """ try: ret: List[Tuple[Optional[str], Optional[str], bool]] = [] - for _mutation_name, mutation_response in response.items(): + for mutation_response in response.values(): # extract the result of each mutation result in the response success, msg = mutation_response['result'][0]['response'] out = None diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 11eafd8bea5..fc9b67eeef5 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -750,7 +750,7 @@ async def _mutation_mapper( try: meth = COMMANDS[command] except KeyError: - raise ValueError(f"Command '{command}' not found") + raise ValueError(f"Command '{command}' not found") from None try: # Initiate the command. Validation may be performed at this point, diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 5fc277fb607..ab34def7f75 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -271,7 +271,7 @@ def field_name_from_type( try: return NODE_MAP[named_type.name] except KeyError: - raise ValueError(f"'{named_type.name}' is not a node type") + raise ValueError(f"'{named_type.name}' is not a node type") from None def get_resolvers(info: 'ResolveInfo') -> 'BaseResolvers': @@ -577,9 +577,10 @@ def resolve_mapping_to_list(root, info, **args): # Types: class NodeMeta(ObjectType): class Meta: - description = """ -Meta data fields, -including custom fields in a generic user-defined dump""" + description = sstrip(""" + Meta data fields, including custom fields in a generic user-defined + dump. + """) title = String(default_value=None) description = String(default_value=None) URL = String(default_value=None) @@ -588,7 +589,7 @@ class Meta: class TimeZone(ObjectType): class Meta: - description = """Time zone info.""" + description = 'Time zone info.' hours = Int() minutes = Int() string_basic = String() @@ -597,7 +598,7 @@ class Meta: class Workflow(ObjectType): class Meta: - description = """Global workflow info.""" + description = 'Global workflow info.' id = ID() # noqa: A003 (required for definition) name = String( description='The workflow ID with the ~user/ prefix removed.', @@ -730,7 +731,7 @@ class Meta: latest_state_tasks = GenericScalar( states=graphene.List( String, - description="List of task states to show", + description="List of task states to show.", default_value=TASK_STATUSES_ORDERED), resolver=resolve_state_tasks, description='The latest tasks to have entered each task state.', @@ -769,8 +770,9 @@ class Meta: ids=graphene.List( ID, description=sstrip(''' - Node IDs, cycle point and/or-just family/task namespace: - ["1234/foo", "1234/FAM", "*/FAM"] + Node IDs, cycle point and/or-just family/task namespace. + + E.g: `["1234/foo", "1234/FAM", "*/FAM"]` '''), default_value=[] ), @@ -858,7 +860,7 @@ class Meta: ) task_proxy = Field( lambda: TaskProxy, - description="The TaskProxy of the task which submitted this job", + description="The TaskProxy of the task which submitted this job.", strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -919,7 +921,7 @@ class Meta: ) runtime = Field( Runtime, - description="The task's `[runtime`] section.", + description="The task's `[runtime]` section.", ) mean_elapsed_time = Float( description="The task's average runtime." 
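The long run of description tweaks that starts here is not just cosmetic: graphene serves these strings verbatim through GraphQL introspection, so they appear in GraphiQL tooltips and generated schema docs. That is why multi-line descriptions are passed through sstrip and single-line ones gain consistent terminating full stops. A minimal sketch of the effect, mirroring the graphene API already used in this file (textwrap.dedent stands in here for Cylc's own sstrip helper):

from textwrap import dedent

import graphene


class ExampleNode(graphene.ObjectType):
    class Meta:
        # Without dedenting, the source-code indentation would be embedded
        # in the schema documentation shown to API users.
        description = dedent('''
            Meta data fields, including custom fields in a generic
            user-defined dump.
        ''').strip()

    title = graphene.String(description='Title of the node.')

Single-quoted one-liners with a trailing full stop render cleanly in the same tooling, which is what the bulk of the remaining schema.py hunks standardise on.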
@@ -955,7 +957,7 @@ class Meta: class PollTask(ObjectType): class Meta: - description = """Polling task edge""" + description = 'Polling task edge.' local_proxy = ID() workflow = String() remote_proxy = ID() @@ -965,11 +967,11 @@ class Meta: class Condition(ObjectType): class Meta: - description = """Prerequisite conditions.""" + description = 'Prerequisite conditions.' task_id = String() task_proxy = Field( lambda: TaskProxy, - description="""Associated Task Proxy""", + description='Associated Task Proxy.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -985,18 +987,19 @@ def resolve_task_id(root, info, **args): class Prerequisite(ObjectType): class Meta: - description = """Task prerequisite.""" + description = 'Task prerequisite.' expression = String() conditions = graphene.List( Condition, - description="""Condition monomers of a task prerequisites.""") + description='Condition monomers of a task prerequisites.' + ) cycle_points = graphene.List(String) satisfied = Boolean() class Output(ObjectType): class Meta: - description = """Task output""" + description = 'Task output.' label = String() message = String() satisfied = Boolean() @@ -1004,16 +1007,16 @@ class Meta: class OutputLabel(String): - """Task output, e.g. "succeeded".""" + """Task output, e.g. `succeeded`.""" class PrerequisiteString(String): - """A task prerequisite, e.g. "2040/foo:succeeded".""" + """A task prerequisite, e.g. `2040/foo:succeeded`.""" class XTrigger(ObjectType): class Meta: - description = """Task trigger""" + description = 'Task trigger.' id = String() # noqa: A003 (required for definition) label = String() message = String() @@ -1023,7 +1026,7 @@ class Meta: class TaskProxy(ObjectType): class Meta: - description = """Task cycle instance.""" + description = 'Task cycle instance.' id = ID() # noqa: A003 (required for schema definition) task = Field( Task, @@ -1073,7 +1076,7 @@ class Meta: '''), ) depth = Int( - description='The family inheritance depth', + description='The family inheritance depth.', ) graph_depth = Int( description=sstrip(''' @@ -1158,7 +1161,7 @@ class Meta: class Family(ObjectType): class Meta: - description = """Task definition, static fields""" + description = 'Task definition, static fields.' 
id = ID() # noqa: A003 (required for schema definition) name = String() meta = Field(NodeMeta) @@ -1166,7 +1169,7 @@ class Meta: depth = Int() proxies = graphene.List( lambda: FamilyProxy, - description="""Associated cycle point proxies""", + description='Associated cycle point proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1174,7 +1177,7 @@ class Meta: resolver=get_nodes_by_ids) parents = graphene.List( lambda: Family, - description="""Family definition parent.""", + description='Family definition parent.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1182,7 +1185,7 @@ class Meta: resolver=get_nodes_by_ids) child_tasks = graphene.List( Task, - description="""Descendant definition tasks.""", + description='Descendant definition tasks.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1190,7 +1193,7 @@ class Meta: resolver=get_nodes_by_ids) child_families = graphene.List( lambda: Family, - description="""Descendant desc families.""", + description='Descendant desc families.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1198,7 +1201,7 @@ class Meta: resolver=get_nodes_by_ids) first_parent = Field( lambda: Family, - description="""Family first parent.""", + description='Family first parent.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -1207,14 +1210,14 @@ class Meta: class FamilyProxy(ObjectType): class Meta: - description = """Family composite.""" + description = 'Family composite.' id = ID() # noqa: A003 (required for schema definition) cycle_point = String() # name & namespace for filtering/sorting name = String() family = Field( Family, - description="""Family definition""", + description='Family definition.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -1238,7 +1241,7 @@ class Meta: ) child_tasks = graphene.List( TaskProxy, - description="""Descendant task proxies.""", + description='Descendant task proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1246,7 +1249,7 @@ class Meta: resolver=get_nodes_by_ids) child_families = graphene.List( lambda: FamilyProxy, - description="""Descendant family proxies.""", + description='Descendant family proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1254,7 +1257,7 @@ class Meta: resolver=get_nodes_by_ids) first_parent = Field( lambda: FamilyProxy, - description="""Task first parent.""", + description='Task first parent.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1262,7 +1265,7 @@ class Meta: resolver=get_node_by_id) ancestors = graphene.List( lambda: FamilyProxy, - description="""First parent ancestors.""", + description='First parent ancestors.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1283,7 +1286,7 @@ def resolve_type(cls, instance, info): class Edge(ObjectType): class Meta: - description = """Dependency edge task/family proxies""" + description = 'Dependency edge task/family proxies.' id = ID() # noqa: A003 (required for schema definition) source = ID() source_node = Field( @@ -1305,7 +1308,7 @@ class Meta: class Edges(ObjectType): class Meta: - description = """Dependency edge""" + description = 'Dependency edge.' 
edges = graphene.List( Edge, args=EDGE_ARGS, @@ -1320,19 +1323,19 @@ class Meta: class NodesEdges(ObjectType): class Meta: - description = """Related Nodes & Edges.""" + description = 'Related Nodes & Edges.' nodes = graphene.List( TaskProxy, - description="""Task nodes from and including root.""") + description='Task nodes from and including root.') edges = graphene.List( Edge, - description="""Edges associated with the nodes.""") + description='Edges associated with the nodes.') # Query declaration class Queries(ObjectType): class Meta: - description = """Multi-Workflow root level queries.""" + description = 'Multi-Workflow root level queries.' workflows = graphene.List( Workflow, description=Workflow._meta.description, @@ -1420,7 +1423,7 @@ class Meta: # Generic containers class GenericResponse(ObjectType): class Meta: - description = """Container for command queued response""" + description = 'Container for command queued response.' result = GenericScalar() @@ -1455,6 +1458,7 @@ async def mutator( method). If None, uses mutation class name converted to snake_case. workflows: List of workflow IDs. exworkflows: List of workflow IDs. + """ if command is None: command = to_snake_case(info.field_name) @@ -1567,13 +1571,13 @@ class TaskState(InputObjectType): status = TaskStatus() is_held = Boolean(description=sstrip(''' - If a task is held no new job submissions will be made + If a task is held no new job submissions will be made. ''')) is_queued = Boolean(description=sstrip(''' - Task is queued for job submission + Task is queued for job submission. ''')) is_runahead = Boolean(description=sstrip(''' - Task is runahead limited + Task is runahead limited. ''')) @@ -1792,7 +1796,7 @@ class Arguments: event_time = String(default_value=None) messages = graphene.List( graphene.List(String), - description="""List in the form `[[severity, message], ...]`.""", + description='List in the form `[[severity, message], ...]`.', default_value=None ) @@ -1820,6 +1824,7 @@ class Resume(Mutation): class Meta: description = sstrip(''' Resume a paused workflow. + See also the opposite command `pause`. Valid for: paused workflows. @@ -1952,8 +1957,8 @@ class Meta: trigger event. When an incoming message satisfies a task's external trigger the message ID is broadcast to all downstream tasks in the cycle point as - ``$CYLC_EXT_TRIGGER_ID``. Tasks can use - ``$CYLC_EXT_TRIGGER_ID``, for example, to + `$CYLC_EXT_TRIGGER_ID`. Tasks can use + `$CYLC_EXT_TRIGGER_ID`, for example, to identify a new data file that the external triggering system is responding to. @@ -2116,9 +2121,9 @@ class Meta: Setting outputs contributes to the task's completion, sets the corresponding prerequisites of child tasks, and sets any implied outputs: - - ``started`` implies ``submitted``. - - ``succeeded`` and ``failed`` imply ``started``. - - custom outputs and ``expired`` do not imply any other outputs. + - `started` implies `submitted`. + - `succeeded` and `failed` imply `started`. + - custom outputs and `expired` do not imply any other outputs. Valid for: paused, running, stopping workflows. """) @@ -2227,7 +2232,10 @@ def delta_subs(root, info: 'ResolveInfo', **args) -> AsyncGenerator[Any, None]: class Pruned(ObjectType): class Meta: - description = """WFS Nodes/Edges that have been removed.""" + description = sstrip(''' + Objects (e.g. workflows, tasks, jobs) which have been removed from + the store. 
+ ''') workflow = String() families = graphene.List(String, default_value=[]) family_proxies = graphene.List(String, default_value=[]) @@ -2249,7 +2257,7 @@ class Delta(Interface): families = graphene.List( Family, - description="""Family definitions.""", + description='Family definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2258,7 +2266,7 @@ class Delta(Interface): ) family_proxies = graphene.List( FamilyProxy, - description="""Family cycle instances.""", + description='Family cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2267,7 +2275,7 @@ class Delta(Interface): ) jobs = graphene.List( Job, - description="""Jobs.""", + description='Jobs.', args=JOB_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2276,7 +2284,7 @@ class Delta(Interface): ) tasks = graphene.List( Task, - description="""Task definitions.""", + description='Task definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2285,7 +2293,7 @@ class Delta(Interface): ) task_proxies = graphene.List( TaskProxy, - description="""Task cycle instances.""", + description='Task cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2294,7 +2302,7 @@ class Delta(Interface): ) edges = graphene.List( Edge, - description="""Graph edges""", + description='Graph edges', args=EDGE_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2313,18 +2321,18 @@ class Delta(Interface): class Added(ObjectType): class Meta: - description = """Added node/edge deltas.""" + description = 'Added node/edge deltas.' interfaces = (Delta,) class Updated(ObjectType): class Meta: - description = """Updated node/edge deltas.""" + description = 'Updated node/edge deltas.' interfaces = (Delta,) families = graphene.List( Family, - description="""Family definitions.""", + description='Family definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2333,7 +2341,7 @@ class Meta: ) family_proxies = graphene.List( FamilyProxy, - description="""Family cycle instances.""", + description='Family cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2342,7 +2350,7 @@ class Meta: ) jobs = graphene.List( Job, - description="""Jobs.""", + description='Jobs.', args=JOB_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2351,7 +2359,7 @@ class Meta: ) tasks = graphene.List( Task, - description="""Task definitions.""", + description='Task definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2360,7 +2368,7 @@ class Meta: ) task_proxies = graphene.List( TaskProxy, - description="""Task cycle instances.""", + description='Task cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2369,7 +2377,7 @@ class Meta: ) edges = graphene.List( Edge, - description="""Graph edges""", + description='Graph edges.', args=EDGE_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2388,7 +2396,7 @@ class Meta: class Deltas(ObjectType): class Meta: - description = """Grouped deltas of the WFS publish""" + description = 'Grouped deltas of the WFS publish.' 
id = ID() # noqa: A003 (required for schema definition) shutdown = Boolean(default_value=False) added = Field( @@ -2411,13 +2419,13 @@ class Meta: class Subscriptions(ObjectType): """Defines the subscriptions available in the schema.""" class Meta: - description = """Multi-Workflow root level subscriptions.""" + description = 'Multi-Workflow root level subscriptions.' deltas = Field( Deltas, description=Deltas._meta.description, workflows=graphene.List( - ID, description="List of full ID, i.e. `~user/workflow_id`" + ID, description="List of full ID, i.e. `~user/workflow_id`." ), strip_null=Boolean(default_value=False), initial_burst=Boolean(default_value=True), diff --git a/cylc/flow/network/ssh_client.py b/cylc/flow/network/ssh_client.py index d1dd4fb6da4..d2ed0dd33e9 100644 --- a/cylc/flow/network/ssh_client.py +++ b/cylc/flow/network/ssh_client.py @@ -90,7 +90,7 @@ async def async_request( f"Command exceeded the timeout {timeout}s. " "This could be due to network problems. " "Check the workflow log." - ) + ) from None def prepare_command( self, command: str, args: Optional[dict], timeout: Union[float, str] diff --git a/cylc/flow/param_expand.py b/cylc/flow/param_expand.py index 0707a46e1a3..1c9dec2598e 100644 --- a/cylc/flow/param_expand.py +++ b/cylc/flow/param_expand.py @@ -62,7 +62,6 @@ def expand(template, params, results, values=None): from cylc.flow.exceptions import ParamExpandError from cylc.flow.task_id import TaskID -from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults # To split runtime heading name lists. REC_NAMES = re.compile(r'(?:[^,<]|<[^>]*>)+') @@ -195,8 +194,9 @@ def _expand_name(self, results, tmpl, params, spec_vals=None): try: results.append((tmpl % current_values, current_values)) except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not defined.' % str(exc.args[0]) + ) from None else: for param_val in params[0][1]: spec_vals[params[0][0]] = param_val @@ -306,8 +306,8 @@ def expand_parent_params(self, parent, param_values, origin): used[item] = param_values[item] except KeyError: raise ParamExpandError( - "parameter '%s' undefined in '%s'" % ( - item, origin)) + "parameter '%s' undefined in '%s'" % (item, origin) + ) from None # For each parameter substitute the param_tmpl_cfg. tmpl = tmpl.format(**self.param_tmpl_cfg) @@ -397,13 +397,12 @@ def _expand_graph(self, line, all_params, # Inner loop. for p_group in set(REC_P_GROUP.findall(line)): # Parameters must be expanded in the order found. - param_values = OrderedDictWithDefaults() - tmpl = '' + param_values = {} for item in p_group.split(','): pname, offs = REC_P_OFFS.match(item).groups() if offs is None: param_values[pname] = values[pname] - elif offs.startswith('='): + elif offs[0] == '=': # Specific value. try: # Template may require an integer @@ -420,13 +419,16 @@ def _expand_graph(self, line, all_params, else: offval = self._REMOVE param_values[pname] = offval - for pname in param_values: - tmpl += self.param_tmpl_cfg[pname] + tmpl = ''.join( + self.param_tmpl_cfg[pname] + for pname in param_values + ) try: repl = tmpl % param_values except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not defined.' 
% str(exc.args[0]) + ) from None line = line.replace('<' + p_group + '>', repl) if line: line_set.add(line) diff --git a/cylc/flow/parsec/config.py b/cylc/flow/parsec/config.py index 19f937d8e5b..29944c03b30 100644 --- a/cylc/flow/parsec/config.py +++ b/cylc/flow/parsec/config.py @@ -150,10 +150,12 @@ def get(self, keys: Optional[Iterable[str]] = None, sparse: bool = False): # setting not present in __MANY__ section: key in self.spec.get(*parents) ): - raise ItemNotFoundError(itemstr(parents, key)) + raise ItemNotFoundError( + itemstr(parents, key) + ) from None raise InvalidConfigError( itemstr(parents, key), self.spec.name - ) + ) from None else: parents.append(key) diff --git a/cylc/flow/parsec/empysupport.py b/cylc/flow/parsec/empysupport.py index b4164894e0f..e3dc5e28df4 100644 --- a/cylc/flow/parsec/empysupport.py +++ b/cylc/flow/parsec/empysupport.py @@ -66,7 +66,7 @@ def empyprocess( raise EmPyError( str(exc), lines={'