diff --git a/CHANGES.rst b/CHANGES.rst index 3df02af5f..88b4d901d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -19,6 +19,8 @@ Fixes: - Fix default `XML` format resolution for `WPS` endpoint when no ``Accept`` header or ``format``/``f`` query parameter is provided and that the request is submitted from a Web Browser, which involves additional control logic to select the applicable ``Content-Type`` for the response. +- Fix pre-forked ``celery`` worker process inconsistently resolving the ``pyramid`` registry applied + by ``pyramid_celery`` after worker restart. .. _changes_4.36.0: diff --git a/tests/functional/test_celery.py b/tests/functional/test_celery.py new file mode 100644 index 000000000..db7c42de9 --- /dev/null +++ b/tests/functional/test_celery.py @@ -0,0 +1,187 @@ +""" +Tests to validate that :mod:`celery` execution behaves as intended. +""" +import contextlib +import inspect +import json +import os +import subprocess +import sys +import tempfile +from typing import TYPE_CHECKING + +from tests.utils import get_settings_from_testapp, get_test_weaver_app, setup_config_with_mongodb +from weaver.config import WeaverConfiguration +from weaver.database import get_db +from weaver.database.mongodb import get_mongodb_connection +from weaver.utils import retry_on_condition +from weaver.wps.utils import get_wps_url + +if TYPE_CHECKING: + from pymongo.collection import Collection + + +def is_attribute_none(exception): + # type: (Exception) -> bool + return isinstance(exception, AttributeError) and "None" in str(exception) + + +def get_taskmeta_output(taskmeta_collection, output): + # type: (Collection, str) -> str + taskmeta = taskmeta_collection.find_one({"_id": output.strip()}) + return taskmeta.get("traceback", "") + taskmeta.get("result", "") + + +def test_celery_registry_resolution(): + python_bin = sys.executable + python_dir = os.path.dirname(python_bin) + debug_path = os.path.expandvars(os.environ["PATH"]) + celery_bin = os.path.join(python_dir, "celery") + + config = setup_config_with_mongodb(settings={ + "weaver.configuration": WeaverConfiguration.HYBRID, + "weaver.wps_output_url": "http://localhost/wps-outputs", + "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", # nosec: B108 # don't care hardcoded for test + }) + webapp = get_test_weaver_app(config=config) + settings = get_settings_from_testapp(webapp) + wps_url = get_wps_url(settings) + job_store = get_db(settings).get_store("jobs") + job1 = job_store.save_job( + task_id="tmp", + process="jsonarray2netcdf", + inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}, + ) + job2 = job_store.save_job( + task_id="tmp", + process="jsonarray2netcdf", + inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}, + ) + + with contextlib.ExitStack() as stack: + celery_mongo_broker = f"""mongodb://{settings["mongodb.host"]}:{settings["mongodb.port"]}/celery-test""" + cfg_ini = stack.enter_context(tempfile.NamedTemporaryFile(suffix=".ini", mode="w", encoding="utf-8")) + cfg_ini.write( + inspect.cleandoc(f""" + [app:main] + use = egg:weaver + [celery] + broker_url = {celery_mongo_broker} + result_backend = {celery_mongo_broker} + """) + ) + cfg_ini.flush() + cfg_ini.seek(0) + + celery_process = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-A", + "pyramid_celery.celery_app", + "worker", + "-B", + "-E", + "--ini", cfg_ini.name, + "--loglevel=DEBUG", + "--time-limit", "10", + "--soft-time-limit", "10", + "--detach", + # following will cause an error on any subsequent task + # if registry is not properly retrieved across processes/threads + "--concurrency", "1", + "--max-tasks-per-child", "1", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + celery_stdout, celery_stderr = celery_process.communicate() + celery_output = celery_stdout + celery_stderr + assert "Traceback" not in celery_output, "Unhandled error at Weaver/Celery startup. Cannot resume test." + assert all([ + msg in celery_output + for msg in + [ + "Initiating weaver application", + "Celery runner detected.", + ] + ]) + + celery_task_cmd1 = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-b", celery_mongo_broker, + "call", + "-a", json.dumps([str(job1.uuid), wps_url]), + "weaver.processes.execution.execute_process", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + celery_task_cmd2 = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-b", celery_mongo_broker, + "call", + "-a", json.dumps([str(job2.uuid), wps_url]), + "weaver.processes.execution.execute_process", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + + task1_output, _ = retry_on_condition( + lambda: celery_task_cmd1.communicate(), + condition=is_attribute_none, retries=5, interval=1, + ) + task2_output, _ = retry_on_condition( + lambda: celery_task_cmd2.communicate(), + condition=is_attribute_none, retries=5, interval=1, + ) + + celery_mongo_db = get_mongodb_connection({ + "mongodb.host": settings["mongodb.host"], + "mongodb.port": settings["mongodb.port"], + "mongodb.db_name": "celery-test", + }) + celery_taskmeta = celery_mongo_db.celery_taskmeta + task1_result = retry_on_condition( + get_taskmeta_output, celery_taskmeta, task1_output, + condition=is_attribute_none, retries=5, interval=1, + ) + task2_result = retry_on_condition( + get_taskmeta_output, celery_taskmeta, task2_output, + condition=is_attribute_none, retries=5, interval=1, + ) + + # following errors are not necessarily linked directly to celery failing + # however, if all other tests pass except this one, there's a big chance + # it is caused by a celery concurrency/processes/threading issue with the pyramid registry + potential_errors = [ + "AttributeError: 'NoneType' object", + "if settings.get(setting, None) is None", + "get_registry()", + "get_settings()", + "get_db()", + "get_registry(app)", + "get_settings(app)", + "get_db(app)", + "get_registry(celery_app)", + "get_settings(celery_app)", + "get_db(celery_app)", + "get_registry(None)", + "get_settings(None)", + "get_db(None)", + ] + task1_found_errors = [err_msg for err_msg in potential_errors if err_msg in task1_result] + task2_found_errors = [err_msg for err_msg in potential_errors if err_msg in task2_result] + assert not task1_found_errors, "potential error detected with celery and pyramid registry utilities" + assert not task2_found_errors, "potential error detected with celery and pyramid registry utilities" diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index f82e52a44..bdd4f6512 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -28,6 +28,7 @@ get_settings_from_config_ini, get_settings_from_testapp, get_test_weaver_app, + mocked_dismiss_process, mocked_execute_celery, mocked_file_server, mocked_sub_requests, @@ -451,10 +452,13 @@ def clean_test_processes(cls, allowed_codes=frozenset([HTTPOk.code, HTTPNotFound headers=cls.headers, cookies=cls.cookies, ignore_errors=True, log_enabled=False) cls.assert_response(resp, allowed_codes, message="Failed cleanup of test processes jobs!") - for job in resp.json.get("jobs", []): - cls.request("DELETE", f"{path}/{job}", - headers=cls.headers, cookies=cls.cookies, - ignore_errors=True, log_enabled=False) + with contextlib.ExitStack() as stack: + if cls.is_webtest(): + stack.enter_context(mocked_dismiss_process()) + for job in resp.json.get("jobs", []): + cls.request("DELETE", f"{path}/{job}", + headers=cls.headers, cookies=cls.cookies, + ignore_errors=True, log_enabled=False) # then clean the actual process path = f"/processes/{process_info.id}" @@ -818,7 +822,7 @@ def workflow_runner(self, stack_exec.enter_context(mock.patch(data_source_use, side_effect=self.mock_get_data_source_from_url)) if self.is_webtest(): # mock execution when running on local Web Test app since no Celery runner is available - for mock_exec in mocked_execute_celery(): + for mock_exec in mocked_execute_celery(web_test_app=self.app): stack_exec.enter_context(mock_exec) # mock HTTP HEAD request to validate WPS output access (see 'setUpClass' details) mock_req = stack_exec.enter_context(mocked_wps_output(self.settings, mock_head=True, mock_get=False)) diff --git a/tests/utils.py b/tests/utils.py index f9eb4c7b8..8e98f098a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, overload # Note: do NOT import 'boto3' here otherwise 'moto' will not be able to mock it effectively +import celery import mock import moto import pkg_resources @@ -58,7 +59,7 @@ from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, load_pywps_config if TYPE_CHECKING: - from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union + from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TypeVar, Union from typing_extensions import Literal from mypy_boto3_s3.client import S3Client @@ -1047,8 +1048,12 @@ def mocked_wps_output(settings, # type: SettingsType ) -def mocked_execute_celery(celery_task="weaver.processes.execution.execute_process", func_execute_task=None): - # type: (str, Optional[Callable[[...], Any]]) -> Iterable[MockPatch] +def mocked_execute_celery( + celery_task="weaver.processes.execution.execute_process", # type: str + func_execute_task=None, # type: Optional[Callable[[...], Any]] + web_test_app=None, # type: Optional[TestApp] + celery_app_name="app", # type: str +): # type: (...) -> Iterable[MockPatch] """ Contextual mock of a task execution to run locally instead of dispatched :mod:`celery` worker. @@ -1072,11 +1077,17 @@ def mocked_execute_celery(celery_task="weaver.processes.execution.execute_proces should omit the input argument for the :class:`celery.task.Task` that will not be automatically inserted. The return value is ignored, as the mocked :class:`celery.task.Task` is always returned instead. If not provided, the function referred by :paramref:`celery_task` is imported and called directly. + :param web_test_app: + Test web application employed to execute tasks that would normally be dispatched to a :mod:`celery` worker. + If provided, ensures that references to :mod:`celery` that would normally look for the :mod:`pyramid` registry + will find the one provided by this test application, and all relevant settings set for it. + :param celery_app_name: + Name of the :mod:`celery` application or imported alias within the module referenced by :paramref:`celery_task`. """ - class MockTask(object): + class MockTaskContext(object): """ - Mocks the Celery Task for testing. + Mocks the :class:`celery.app.task.Context` for testing. Mocks call ``self.request.id`` in :func:`weaver.processes.execution.execute_process` and call ``result.id`` in :func:`weaver.processes.execution.submit_job_handler`. @@ -1094,15 +1105,17 @@ def id(self): # all following methods return what would be returned normally in sync mode def wait(self, *_, **__): + # type: (*Any, **Any) -> Type[CeleryTaskTimeoutError] raise CeleryTaskTimeoutError def ready(self, *_, **__): + # type: (*Any, **Any) -> bool return True - task = MockTask() + mocked_task_context = MockTaskContext() def mock_execute_task(*args, **kwargs): - # type: (*Any, **Any) -> MockTask + # type: (*Any, **Any) -> MockTaskContext if func_execute_task is None: mod, func = celery_task.rsplit(".", 1) module = importlib.import_module(mod) @@ -1110,17 +1123,37 @@ def mock_execute_task(*args, **kwargs): task_func(*args, **kwargs) else: func_execute_task(*args, **kwargs) # noqa - return task + return mocked_task_context - return ( + celery_mocks = [ mock.patch(f"{celery_task}.delay", side_effect=mock_execute_task), - mock.patch("celery.app.task.Context", return_value=task) - ) + mock.patch("celery.app.task.Context", return_value=mocked_task_context), + ] + + class MockCeleryApp(celery.Celery): + @property + def conf(self): + return {"PYRAMID_REGISTRY": web_test_app.app.registry} + + if isinstance(web_test_app, TestApp): + # WARNING: + # It is very critical that the mock referencing to the above test app registry is applied as context manager. + # Since the Celery app is created globally by 'pyramid_celery', omitting to reset the mock could cause the + # Pyramid registry and the underlying settings to be make its way across tests references. This can cause + # unexpected side-effects if configurations vary between tests to evaluate distinct use cases. Using the + # context manager, we make sure that the mock should live only during the corresponding 'with' block lifetime, + # which should be limited to more controlled duration than globally. + celery_task_module = celery_task.rsplit(".", 1)[0] + celery_mocks.append( + mock.patch(f"{celery_task_module}.{celery_app_name}", new_callable=MockCeleryApp, spec=celery.Celery) + ) + + return celery_mocks @contextlib.contextmanager def mocked_dismiss_process(): - # type: () -> mock.MagicMock + # type: () -> contextlib.AbstractContextManager[mock.MagicMock] """ Mock operations called to terminate :mod:`Celery` tasks. diff --git a/weaver/__init__.py b/weaver/__init__.py index 58b90eb07..dcdf81a58 100644 --- a/weaver/__init__.py +++ b/weaver/__init__.py @@ -28,9 +28,10 @@ def main(global_config, **settings): # type: (SettingsType, **Any) -> Router import weaver.app + from weaver.utils import is_celery # add flag to disable some unnecessary operations when runner is celery (worker) - settings["weaver.celery"] = sys.argv[0].rsplit("/", 1)[-1] == "celery" + settings["weaver.celery"] = is_celery() return weaver.app.main(global_config, **settings) diff --git a/weaver/database/__init__.py b/weaver/database/__init__.py index 3700557af..ad6339686 100644 --- a/weaver/database/__init__.py +++ b/weaver/database/__init__.py @@ -37,7 +37,7 @@ def get_db(container=None, reset_connection=False): if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase): return registry.db database = MongoDatabase(container) - if reset_connection: + if reset_connection and registry: registry.db = database return database diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 6177eaa3a..9bf8c9dd1 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -118,7 +118,7 @@ def execute_process(task, job_id, wps_url, headers=None): task_process = get_celery_process() rss_start = task_process.memory_info().rss - registry = get_registry(None) # local thread, whether locally or dispatched celery + registry = get_registry(app) # local thread, whether locally or dispatched celery settings = get_settings(registry) db = get_db(registry, reset_connection=True) # reset the connection because we are in a forked celery process store = db.get_store(StoreJobs) diff --git a/weaver/utils.py b/weaver/utils.py index e9b7aec0b..842928f53 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -49,6 +49,7 @@ from pyramid.settings import asbool, aslist from pyramid.threadlocal import get_current_registry from pyramid_beaker import set_cache_regions_from_settings +from pyramid_celery import celery_app as app from requests import HTTPError as RequestsHTTPError, Response from requests.structures import CaseInsensitiveDict from requests_file import FileAdapter @@ -64,7 +65,7 @@ from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import ContentType, get_content_type, get_extension, repr_json from weaver.status import map_status -from weaver.warning import TimeZoneInfoAlreadySetWarning +from weaver.warning import TimeZoneInfoAlreadySetWarning, UndefinedContainerWarning from weaver.xml_util import HTML_TREE_BUILDER, XML try: # refactor in jsonschema==4.18.0 @@ -457,6 +458,14 @@ def get_any_message(info, default=""): return (info.get("message") or info.get("description") or info.get("detail") or default).strip() +def is_celery(): + # type: () -> bool + """ + Detect if the current application was executed as a :mod:`celery` command. + """ + return sys.argv[0].rsplit("/", 1)[-1] == "celery" + + def get_registry(container=None, nothrow=False): # type: (Optional[AnyRegistryContainer], bool) -> Optional[Registry] """ @@ -468,6 +477,17 @@ def get_registry(container=None, nothrow=False): return container.registry if isinstance(container, Registry): return container + if container is None: + # find 2 parents since 'get_settings' calls 'get_registry' to provide better context + warnings.warn( + f"Function [{get_caller_name()}] called from [{get_caller_name(skip=2)}] " + "did not provide a settings container. Consider providing it explicitly.", + UndefinedContainerWarning, + ) + # preemptively check registry in celery if applicable + # avoids error related to forked processes when restarting workers + if container is None and is_celery(): + return app.conf.get("PYRAMID_REGISTRY", {}) if isinstance(container, WerkzeugRequest) or container is None: return get_current_registry() if nothrow: @@ -1508,14 +1528,23 @@ def make_dirs(path, mode=0o755, exist_ok=False): raise -def get_caller_name(skip=2, base_class=False): +def get_caller_name(skip=0, base_class=False): # type: (int, bool) -> str """ Find the name of a parent caller function or method. The name is returned with respective formats ``module.class.method`` or ``module.function``. - :param skip: specifies how many levels of stack to skip while getting the caller. + Supposing the following call stack ``main -> func1 -> func2 -> func3 -> get_caller_name``. + + Calling ``get_caller_name()`` or ``get_caller_name(skip=1)`` would return the full package location of ``func2`` + because it is 1-level higher than were ``get_caller_name`` is called from (inside ``func3``). + Calling ``get_caller_name(skip=0)`` would return ``func3`` directly, and ``func1`` for ``get_caller_name(skip=2)``. + + :param skip: + Specifies how many levels of stack to skip for getting the caller. + By default, uses ``skip=1`` to obtain the immediate parent function that called :func:`get_caller_name`, + were ``skip=0`` would be the function itself that called :func:`get_caller_name`. :param base_class: Specified if the base class should be returned or the top-most class in case of inheritance If the caller is not a class, this doesn't do anything. @@ -1732,6 +1761,7 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs *args, # type: Params.args condition=Exception, # type: RetryCondition retries=1, # type: int + interval=0, # type: Number **kwargs, # type: Params.kwargs ): # type: (...) -> Return """ @@ -1743,6 +1773,7 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs In case of a callable, success/failure result should be returned to indicate if retry is needed. If retry is not requested by the handler for the specified exception, it is raised directly. :param retries: Amount of retries to perform. If retries are exhausted, the final exception is re-raised. + :param interval: wait time interval (seconds) between retries. :return: Expected normal operation return value if it was handled within the specified amount of retries. """ if ( @@ -1781,6 +1812,8 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs last_exc = exc LOGGER.warning("Operation '%s' failed but matched handler condition for retry. Retrying (%s/%s)...", name, attempt, retries) + if interval and remain: + time.sleep(interval) LOGGER.error("Operation '%s' still failing. Maximum retry attempts reached (%s).", name, retries) raise last_exc diff --git a/weaver/warning.py b/weaver/warning.py index 85f91e48c..f0ee58674 100644 --- a/weaver/warning.py +++ b/weaver/warning.py @@ -3,31 +3,43 @@ """ -class TimeZoneInfoAlreadySetWarning(Warning): +class WeaverWarning(Warning): + """ + Base class of :class:`Warning` defined by :mod:`weaver` package. + """ + + +class UndefinedContainerWarning(WeaverWarning): + """ + Warn when settings or the registry could not be resolved from an explicit container reference. + """ + + +class TimeZoneInfoAlreadySetWarning(WeaverWarning): """ Warn when trying to obtain a localized time with already defined time-zone info. """ -class DisabledSSLCertificateVerificationWarning(Warning): +class DisabledSSLCertificateVerificationWarning(WeaverWarning): """ Warn when an option to disable SSL certificate verification is employed for some operations. """ -class UnsupportedOperationWarning(Warning): +class UnsupportedOperationWarning(WeaverWarning): """ Warn about an operation not yet implemented or unsupported according to context. """ -class NonBreakingExceptionWarning(Warning): +class NonBreakingExceptionWarning(WeaverWarning): """ Warn about an exception that is handled (ex: caught in try/except block) but still unexpected. """ -class MissingParameterWarning(Warning): +class MissingParameterWarning(WeaverWarning): """ Warn about an expected but missing parameter. """ diff --git a/weaver/wps/utils.py b/weaver/wps/utils.py index 552302e96..315ab88f6 100644 --- a/weaver/wps/utils.py +++ b/weaver/wps/utils.py @@ -156,9 +156,9 @@ def get_wps_output_context(request): def get_wps_local_status_location(url_status_location, container, must_exist=True): # type: (str, AnySettingsContainer, bool) -> Optional[str] """ - Attempts to retrieve the local XML file path corresponding to the WPS status location as URL. + Attempts to retrieve the local :term:`XML` file path corresponding to the :term:`WPS` status location as URL. - :param url_status_location: URL reference pointing to some WPS status location XML. + :param url_status_location: URL reference pointing to some WPS status location :term:`XML`. :param container: any settings container to map configured local paths. :param must_exist: return only existing path if enabled, otherwise return the parsed value without validation. :returns: found local file path if it exists, ``None`` otherwise.