From 8a28af6a609161f2b4454c42c765a5c30e2defbe Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 5 Apr 2023 13:52:21 +1200 Subject: [PATCH] Xtrigger function arg validation. --- cylc/flow/config.py | 37 ++++++++++++++++++------------- cylc/flow/subprocpool.py | 33 ++++++++++++++++----------- cylc/flow/xtrigger_mgr.py | 4 ++-- cylc/flow/xtriggers/wall_clock.py | 32 ++++++++++++++++++++++++++ tests/unit/test_subprocpool.py | 13 ++++++----- 5 files changed, 83 insertions(+), 36 deletions(-) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 818fa6c62a5..771b8cb8187 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -57,6 +57,7 @@ from cylc.flow.id import Tokens from cylc.flow.cycling.integer import IntegerInterval from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval + from cylc.flow.exceptions import ( CylcError, WorkflowConfigError, @@ -84,6 +85,7 @@ from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM from cylc.flow.print_tree import print_tree from cylc.flow.subprocctx import SubFuncContext +from cylc.flow.subprocpool import get_func from cylc.flow.task_events_mgr import ( EventData, get_event_handler_data @@ -1749,28 +1751,31 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, if label != 'wall_clock': raise WorkflowConfigError(f"xtrigger not defined: {label}") else: - # Allow "@wall_clock" in the graph as an undeclared - # zero-offset clock xtrigger. - xtrig = SubFuncContext( - 'wall_clock', 'wall_clock', [], {}) + # Allow "@wall_clock" in graph as implicit zero-offset. + xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {}) - if xtrig.func_name == 'wall_clock': - if self.cycling_type == INTEGER_CYCLING_TYPE: - raise WorkflowConfigError( - "Clock xtriggers require datetime cycling:" - f" {label} = {xtrig.get_signature()}" - ) - else: - # Convert offset arg to kwarg for certainty later. - if "offset" not in xtrig.func_kwargs: - xtrig.func_kwargs["offset"] = None - with suppress(IndexError): - xtrig.func_kwargs["offset"] = xtrig.func_args[0] + if ( + xtrig.func_name == 'wall_clock' and + self.cycling_type == INTEGER_CYCLING_TYPE + ): + raise WorkflowConfigError( + "Clock xtriggers require datetime cycling:" + f" {label} = {xtrig.get_signature()}" + ) + + # Call the xtrigger's validate_config function if it has one. + with suppress(AttributeError): + get_func(xtrig.func_name, "validate_config", self.fdir)( + xtrig.func_args, + xtrig.func_kwargs, + xtrig.get_signature() + ) if self.xtrigger_mgr is None: XtriggerManager.validate_xtrigger(label, xtrig, self.fdir) else: self.xtrigger_mgr.add_trig(label, xtrig, self.fdir) + self.taskdefs[right].add_xtrig_label(label, seq) def get_actual_first_point(self, start_point): diff --git a/cylc/flow/subprocpool.py b/cylc/flow/subprocpool.py index e4e64755fc4..70cd20b0c6d 100644 --- a/cylc/flow/subprocpool.py +++ b/cylc/flow/subprocpool.py @@ -64,33 +64,38 @@ def _killpg(proc, signal): return True -def get_func(func_name, src_dir): - """Find and return an xtrigger function from a module of the same name. +def get_func(mod_name, func_name, src_dir): + """Find and return a name function from a named module. + + Can be in /lib/python, cylc.flow.xtriggers, or in Python path. - Can be in /lib/python, CYLC_MOD_LOC, or in Python path. Workflow source directory passed in as this is executed in an independent process in the command pool and therefore doesn't know about the workflow. + Raises: + ImportError, if the module is not found + AtributeError, if the function is not found in the module + """ if func_name in _XTRIG_FUNCS: + # Found and cached already. return _XTRIG_FUNCS[func_name] - # First look in /lib/python. + + # 1. look in /lib/python. sys.path.insert(0, os.path.join(src_dir, 'lib', 'python')) - mod_name = func_name try: mod_by_name = __import__(mod_name, fromlist=[mod_name]) except ImportError: - # Then look in built-in xtriggers. - mod_name = "%s.%s" % ("cylc.flow.xtriggers", func_name) + # 2. look in built-in xtriggers. + mod_name = "%s.%s" % ("cylc.flow.xtriggers", mod_name) try: mod_by_name = __import__(mod_name, fromlist=[mod_name]) except ImportError: raise - try: - _XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name) - except AttributeError: - # Module func_name has no function func_name. - raise + + # Module found and imported, return the named function. + + _XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name) return _XTRIG_FUNCS[func_name] @@ -99,6 +104,8 @@ def run_function(func_name, json_args, json_kwargs, src_dir): func_name(*func_args, **func_kwargs) + The function is presumed to be in a module of the same name. + Redirect any function stdout to stderr (and workflow log in debug mode). Return value printed to stdout as a JSON string - allows use of the existing process pool machinery as-is. src_dir is for local modules. @@ -107,7 +114,7 @@ def run_function(func_name, json_args, json_kwargs, src_dir): func_args = json.loads(json_args) func_kwargs = json.loads(json_kwargs) # Find and import then function. - func = get_func(func_name, src_dir) + func = get_func(func_name, func_name, src_dir) # Redirect stdout to stderr. orig_stdout = sys.stdout sys.stdout = sys.stderr diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..99dfa292546 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -27,7 +27,6 @@ import cylc.flow.flags from cylc.flow.hostuserutil import get_user from cylc.flow.xtriggers.wall_clock import wall_clock - from cylc.flow.subprocctx import SubFuncContext from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.data_store_mgr import DataStoreMgr @@ -252,8 +251,9 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: """ fname: str = fctx.func_name + try: - func = get_func(fname, fdir) + func = get_func(fname, fname, fdir) except ImportError: raise XtriggerConfigError( label, diff --git a/cylc/flow/xtriggers/wall_clock.py b/cylc/flow/xtriggers/wall_clock.py index 3c50d956f0a..9d615da115a 100644 --- a/cylc/flow/xtriggers/wall_clock.py +++ b/cylc/flow/xtriggers/wall_clock.py @@ -17,6 +17,8 @@ """xtrigger function to trigger off of a wall clock time.""" from time import time +from cylc.flow.cycling.iso8601 import interval_parse +from cylc.flow.exceptions import WorkflowConfigError def wall_clock(trigger_time=None): @@ -27,3 +29,33 @@ def wall_clock(trigger_time=None): Trigger time as seconds since Unix epoch. """ return time() > trigger_time + + +def validate_config(f_args, f_kwargs, f_signature): + """Validate and manipulate args parsed from the workflow config. + + wall_clock() # zero offset + wall_clock(PT1H) + wall_clock(offset=PT1H) + + And offset must be a valid ISO 8601 interval. + + If f_args used, convert to f_kwargs for clarity. + + """ + + n_args = len(f_args) + n_kwargs = len(f_kwargs) + + if n_args + n_kwargs > 1: + raise WorkflowConfigError(f"xtrigger: too many args: {f_signature}") + + if n_args: + f_kwargs["offset"] = f_args[0] + elif not n_kwargs: + f_kwargs["offset"] = "P0Y" + + try: + interval_parse(f_kwargs["offset"]) + except ValueError: + raise WorkflowConfigError(f"xtrigger: invalid offset: {f_signature}") diff --git a/tests/unit/test_subprocpool.py b/tests/unit/test_subprocpool.py index dc55fb00fd6..94bce09b197 100644 --- a/tests/unit/test_subprocpool.py +++ b/tests/unit/test_subprocpool.py @@ -152,7 +152,8 @@ def test_xfunction(self): with the_answer_file.open(mode="w") as f: f.write("""the_answer = lambda: 42""") f.flush() - fn = get_func("the_answer", temp_dir) + f_name = "the_answer" + fn = get_func(f_name, f_name, temp_dir) result = fn() self.assertEqual(42, result) @@ -165,14 +166,15 @@ def test_xfunction_cache(self): with amandita_file.open(mode="w") as f: f.write("""amandita = lambda: 'chocolate'""") f.flush() - fn = get_func("amandita", temp_dir) + f_name = "amandita" + fn = get_func(f_name, f_name, temp_dir) result = fn() self.assertEqual('chocolate', result) # is in the cache self.assertTrue('amandita' in _XTRIG_FUNCS) # returned from cache - self.assertEqual(fn, get_func("amandita", temp_dir)) + self.assertEqual(fn, get_func(f_name, f_name, temp_dir)) del _XTRIG_FUNCS['amandita'] # is not in the cache self.assertFalse('amandita' in _XTRIG_FUNCS) @@ -186,7 +188,7 @@ def test_xfunction_import_error(self): """ with TemporaryDirectory() as temp_dir: with self.assertRaises(ModuleNotFoundError): - get_func("invalid-module-name", temp_dir) + get_func("invalid-module-name", "func-name", temp_dir) def test_xfunction_attribute_error(self): """Test for error on looking for an attribute in a xtrigger script.""" @@ -197,8 +199,9 @@ def test_xfunction_attribute_error(self): with the_answer_file.open(mode="w") as f: f.write("""the_droid = lambda: 'excalibur'""") f.flush() + f_name = "the_sword" with self.assertRaises(AttributeError): - get_func("the_sword", temp_dir) + get_func(f_name, f_name, temp_dir) @pytest.fixture