From d5ed04eb1582058e5d2aaf47a62ce27cc55b9d35 Mon Sep 17 00:00:00 2001 From: Louis-David Perron <100434291+perronld@users.noreply.github.com> Date: Wed, 15 Nov 2023 16:52:15 -0500 Subject: [PATCH 01/15] Workaround for registry resolving when restarting celery prefork workers --- weaver/utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/weaver/utils.py b/weaver/utils.py index e9b7aec0b..9018e3d62 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -35,6 +35,7 @@ from bs4 import BeautifulSoup from celery.app import Celery from mypy_boto3_s3.literals import RegionName +from pyramid_celery import celery_app as app from pyramid.config import Configurator from pyramid.exceptions import ConfigurationError from pyramid.httpexceptions import ( @@ -47,7 +48,7 @@ from pyramid.request import Request as PyramidRequest from pyramid.response import _guess_type as guess_file_contents # noqa: W0212 from pyramid.settings import asbool, aslist -from pyramid.threadlocal import get_current_registry +from pyramid.threadlocal import get_current_registry, get_current_request from pyramid_beaker import set_cache_regions_from_settings from requests import HTTPError as RequestsHTTPError, Response from requests.structures import CaseInsensitiveDict @@ -469,7 +470,10 @@ def get_registry(container=None, nothrow=False): if isinstance(container, Registry): return container if isinstance(container, WerkzeugRequest) or container is None: - return get_current_registry() + if get_current_request() is None: + return get_registry(app) + else: + return get_current_registry() if nothrow: return None raise TypeError(f"Could not retrieve registry from container object of type [{fully_qualified_name(container)}].") From d85cf99a034142e044891edd2f2250c96474e2a8 Mon Sep 17 00:00:00 2001 From: Louis-David Perron <100434291+perronld@users.noreply.github.com> Date: Wed, 15 Nov 2023 17:06:50 -0500 Subject: [PATCH 02/15] Fixed import sorting --- weaver/utils.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/utils.py b/weaver/utils.py index 9018e3d62..346ea97d9 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -35,7 +35,6 @@ from bs4 import BeautifulSoup from celery.app import Celery from mypy_boto3_s3.literals import RegionName -from pyramid_celery import celery_app as app from pyramid.config import Configurator from pyramid.exceptions import ConfigurationError from pyramid.httpexceptions import ( @@ -50,6 +49,7 @@ from pyramid.settings import asbool, aslist from pyramid.threadlocal import get_current_registry, get_current_request from pyramid_beaker import set_cache_regions_from_settings +from pyramid_celery import celery_app as app from requests import HTTPError as RequestsHTTPError, Response from requests.structures import CaseInsensitiveDict from requests_file import FileAdapter From d0446627947b1056554186be495d70b68883410c Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 16 Nov 2023 00:05:53 -0500 Subject: [PATCH 03/15] add test to detect celery/pyramid-registry problematic resolution --- tests/functional/test_celery.py | 172 ++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 tests/functional/test_celery.py diff --git a/tests/functional/test_celery.py b/tests/functional/test_celery.py new file mode 100644 index 000000000..786390109 --- /dev/null +++ b/tests/functional/test_celery.py @@ -0,0 +1,172 @@ +""" +Tests to validate that :mod:`celery` execution behaves as intended. 
+""" +import time + +import json + +import inspect + +import tempfile + +import contextlib + +import os +import subprocess +import sys + +from tests.utils import ( + get_settings_from_testapp, + get_test_weaver_app, + setup_config_with_mongodb +) +from weaver.config import WeaverConfiguration +from weaver.database import get_db +from weaver.database.mongodb import get_mongodb_connection +from weaver.wps.utils import get_wps_url + + +def test_celery_registry_resolution(): + python_bin = sys.executable + python_dir = os.path.dirname(python_bin) + debug_path = os.path.expandvars(os.environ["PATH"]) + celery_bin = os.path.join(python_dir, "celery") + + config = setup_config_with_mongodb(settings={ + "weaver.configuration": WeaverConfiguration.HYBRID, + "weaver.wps_output_url": "http://localhost/wps-outputs", + "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", # nosec: B108 # don't care hardcoded for test + }) + webapp = get_test_weaver_app(config=config) + settings = get_settings_from_testapp(webapp) + wps_url = get_wps_url(settings) + job_store = get_db(settings).get_store("jobs") + job1 = job_store.save_job( + task_id="tmp", + process="jsonarray2netcdf", + inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}, + ) + job2 = job_store.save_job( + task_id="tmp", + process="jsonarray2netcdf", + inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}, + ) + + with contextlib.ExitStack() as stack: + celery_mongo_broker = f"""mongodb://{settings["mongodb.host"]}:{settings["mongodb.port"]}/celery-test""" + cfg_ini = stack.enter_context(tempfile.NamedTemporaryFile(suffix=".ini", mode="w", encoding="utf-8")) + cfg_ini.write( + inspect.cleandoc(f""" + [app:main] + use = egg:weaver + [celery] + broker_url = {celery_mongo_broker} + result_backend = {celery_mongo_broker} + """) + ) + cfg_ini.flush() + cfg_ini.seek(0) + + celery_process = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-A", + "pyramid_celery.celery_app", + "worker", + 
"-B", + "-E", + "--ini", cfg_ini.name, + "--loglevel=DEBUG", + "--time-limit", "10", + "--soft-time-limit", "10", + "--detach", + # following will cause an error on any subsequent task + # if registry is not properly retrieved across processes/threads + "--concurrency", "1", + "--max-tasks-per-child", "1", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + celery_stdout, celery_stderr = celery_process.communicate() + celery_output = celery_stdout + celery_stderr + assert all([ + msg in celery_output + for msg in + [ + "Initiating weaver application", + "Celery runner detected.", + ] + ]) + + celery_task_cmd1 = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-b", celery_mongo_broker, + "call", + "-a", json.dumps([str(job1.uuid), wps_url]), + "weaver.processes.execution.execute_process", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + celery_task_cmd2 = stack.enter_context(subprocess.Popen( + [ + celery_bin, + "-b", celery_mongo_broker, + "call", + "-a", json.dumps([str(job2.uuid), wps_url]), + "weaver.processes.execution.execute_process", + ], + universal_newlines=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{python_dir}:{debug_path}"}, + )) # type: subprocess.Popen + + task1, _ = celery_task_cmd1.communicate() + time.sleep(.25) + task2, _ = celery_task_cmd2.communicate() + time.sleep(.25) + + celery_mongo_db = get_mongodb_connection({ + "mongodb.host": settings["mongodb.host"], + "mongodb.port": settings["mongodb.port"], + "mongodb.db_name": "celery-test", + }) + celery_taskmeta = celery_mongo_db.celery_taskmeta + task1_meta = celery_taskmeta.find_one({"_id": task1.strip()}) + task2_meta = celery_taskmeta.find_one({"_id": 
task2.strip()}) + task1_result = task1_meta.get("traceback", "") + task1_meta.get("result", "") + task2_result = task2_meta.get("traceback", "") + task2_meta.get("result", "") + + # following errors are not necessarily linked directly to celery failing + # however, if all other tests pass except this one, there's a big chance + # it is caused by a celery concurrency/processes/threading issue with the pyramid registry + potential_errors = [ + "AttributeError: 'NoneType' object", + "if settings.get(setting, None) is None", + "get_registry()", + "get_settings()", + "get_db()", + "get_registry(app)", + "get_settings(app)", + "get_db(app)", + "get_registry(celery_app)", + "get_settings(celery_app)", + "get_db(celery_app)", + "get_registry(None)", + "get_settings(None)", + "get_db(None)", + ] + task1_found_errors = [err_msg for err_msg in potential_errors if err_msg in task1_result] + task2_found_errors = [err_msg for err_msg in potential_errors if err_msg in task2_result] + assert not task1_found_errors, "potential error detected with celery and pyramid registry utilities" + assert not task2_found_errors, "potential error detected with celery and pyramid registry utilities" From 641278ab581f9b23f564571bb25c491c984b4c81 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 16 Nov 2023 00:11:01 -0500 Subject: [PATCH 04/15] fix imports linting --- tests/functional/test_celery.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/tests/functional/test_celery.py b/tests/functional/test_celery.py index 786390109..55770a084 100644 --- a/tests/functional/test_celery.py +++ b/tests/functional/test_celery.py @@ -1,25 +1,16 @@ """ Tests to validate that :mod:`celery` execution behaves as intended. 
""" -import time - -import json - -import inspect - -import tempfile - import contextlib - +import inspect +import json import os import subprocess import sys +import tempfile +import time -from tests.utils import ( - get_settings_from_testapp, - get_test_weaver_app, - setup_config_with_mongodb -) +from tests.utils import get_settings_from_testapp, get_test_weaver_app, setup_config_with_mongodb from weaver.config import WeaverConfiguration from weaver.database import get_db from weaver.database.mongodb import get_mongodb_connection From 2f28d0f00eb39feb5e508fc2ec122692a0175f84 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 16 Nov 2023 00:14:26 -0500 Subject: [PATCH 05/15] increase delay for python 3.11 'too fast' --- tests/functional/test_celery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/test_celery.py b/tests/functional/test_celery.py index 55770a084..7ff65ea1f 100644 --- a/tests/functional/test_celery.py +++ b/tests/functional/test_celery.py @@ -123,9 +123,9 @@ def test_celery_registry_resolution(): )) # type: subprocess.Popen task1, _ = celery_task_cmd1.communicate() - time.sleep(.25) + time.sleep(1) task2, _ = celery_task_cmd2.communicate() - time.sleep(.25) + time.sleep(1) celery_mongo_db = get_mongodb_connection({ "mongodb.host": settings["mongodb.host"], From ab1b834164d79d00935fdb2d902d146c951aa4d2 Mon Sep 17 00:00:00 2001 From: Louis-David Perron <100434291+perronld@users.noreply.github.com> Date: Mon, 20 Nov 2023 14:51:41 -0500 Subject: [PATCH 06/15] Replaced get_current_request by sys.argv to detect celery worker presence --- weaver/utils.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/weaver/utils.py b/weaver/utils.py index 346ea97d9..55308b71a 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -47,7 +47,7 @@ from pyramid.request import Request as PyramidRequest from pyramid.response import _guess_type as guess_file_contents # noqa: W0212 from 
pyramid.settings import asbool, aslist -from pyramid.threadlocal import get_current_registry, get_current_request +from pyramid.threadlocal import get_current_registry from pyramid_beaker import set_cache_regions_from_settings from pyramid_celery import celery_app as app from requests import HTTPError as RequestsHTTPError, Response @@ -469,11 +469,10 @@ def get_registry(container=None, nothrow=False): return container.registry if isinstance(container, Registry): return container + if container is None and sys.argv[0].rsplit("/", 1)[-1] == "celery": + return app.conf.get("PYRAMID_REGISTRY", {}) if isinstance(container, WerkzeugRequest) or container is None: - if get_current_request() is None: - return get_registry(app) - else: - return get_current_registry() + return get_current_registry() if nothrow: return None raise TypeError(f"Could not retrieve registry from container object of type [{fully_qualified_name(container)}].") From 0e1928f57c1f0ae162c2a951a72821d91ab5a86a Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 20 Nov 2023 15:44:49 -0500 Subject: [PATCH 07/15] add is_celery utility --- weaver/__init__.py | 3 ++- weaver/utils.py | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/weaver/__init__.py b/weaver/__init__.py index 58b90eb07..dcdf81a58 100644 --- a/weaver/__init__.py +++ b/weaver/__init__.py @@ -28,9 +28,10 @@ def main(global_config, **settings): # type: (SettingsType, **Any) -> Router import weaver.app + from weaver.utils import is_celery # add flag to disable some unnecessary operations when runner is celery (worker) - settings["weaver.celery"] = sys.argv[0].rsplit("/", 1)[-1] == "celery" + settings["weaver.celery"] = is_celery() return weaver.app.main(global_config, **settings) diff --git a/weaver/utils.py b/weaver/utils.py index e9b7aec0b..0bf3d871a 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -457,6 +457,14 @@ def get_any_message(info, default=""): return (info.get("message") or 
info.get("description") or info.get("detail") or default).strip() +def is_celery(): + # type: () -> bool + """ + Detect if the current application was executed as a :mod:`celery` command. + """ + return sys.argv[0].rsplit("/", 1)[-1] == "celery" + + def get_registry(container=None, nothrow=False): # type: (Optional[AnyRegistryContainer], bool) -> Optional[Registry] """ From bca45664ecc40fd59bcb91486b366c3ae7cc977d Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 20 Nov 2023 16:18:58 -0500 Subject: [PATCH 08/15] add caller warning to identify sources of missing settings/registry explicitly provided --- weaver/utils.py | 28 ++++++++++++++++++++++++---- weaver/warning.py | 22 +++++++++++++++++----- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/weaver/utils.py b/weaver/utils.py index 0443756ca..1c584324e 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -65,7 +65,7 @@ from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import ContentType, get_content_type, get_extension, repr_json from weaver.status import map_status -from weaver.warning import TimeZoneInfoAlreadySetWarning +from weaver.warning import UndefinedContainerWarning, TimeZoneInfoAlreadySetWarning from weaver.xml_util import HTML_TREE_BUILDER, XML try: # refactor in jsonschema==4.18.0 @@ -477,7 +477,16 @@ def get_registry(container=None, nothrow=False): return container.registry if isinstance(container, Registry): return container - if container is None and sys.argv[0].rsplit("/", 1)[-1] == "celery": + if container is None: + # find 2 parents since 'get_settings' calls 'get_registry' to provide better context + warnings.warn( + f"Function [{get_caller_name()}] called from [{get_caller_name(skip=2)}] " + "did not provide a settings container. 
Consider providing it explicitly.", + UndefinedContainerWarning, + ) + # preemptively check registry in celery if applicable + # avoids error related to forked processes when restarting workers + if container is None and is_celery(): return app.conf.get("PYRAMID_REGISTRY", {}) if isinstance(container, WerkzeugRequest) or container is None: return get_current_registry() @@ -1519,14 +1528,25 @@ def make_dirs(path, mode=0o755, exist_ok=False): raise -def get_caller_name(skip=2, base_class=False): +def get_caller_name(skip=0, base_class=False): # type: (int, bool) -> str """ Find the name of a parent caller function or method. The name is returned with respective formats ``module.class.method`` or ``module.function``. - :param skip: specifies how many levels of stack to skip while getting the caller. + Example: + + Supposing the following call stack ``main -> func1 -> func2 -> func3 -> get_caller_name``. + + Calling ``get_caller_name()`` or ``get_caller_name(skip=1)`` would return the full package location of ``func2`` + because it is 1-level higher than where ``get_caller_name`` is called from (inside ``func3``). + Calling ``get_caller_name(skip=0)`` would return ``func3`` directly, and ``func1`` for ``get_caller_name(skip=2)``. + + :param skip: + Specifies how many levels of stack to skip for getting the caller. + By default, uses ``skip=1`` to obtain the immediate parent function that called :func:`get_caller_name`, + where ``skip=0`` would be the function itself that called :func:`get_caller_name`. :param base_class: Specified if the base class should be returned or the top-most class in case of inheritance If the caller is not a class, this doesn't do anything. diff --git a/weaver/warning.py b/weaver/warning.py index 85f91e48c..f0ee58674 100644 --- a/weaver/warning.py +++ b/weaver/warning.py @@ -3,31 +3,43 @@ """ -class TimeZoneInfoAlreadySetWarning(Warning): +class WeaverWarning(Warning): + """ + Base class of :class:`Warning` defined by :mod:`weaver` package. 
+ """ + + +class UndefinedContainerWarning(WeaverWarning): + """ + Warn when settings or the registry could not be resolved from an explicit container reference. + """ + + +class TimeZoneInfoAlreadySetWarning(WeaverWarning): """ Warn when trying to obtain a localized time with already defined time-zone info. """ -class DisabledSSLCertificateVerificationWarning(Warning): +class DisabledSSLCertificateVerificationWarning(WeaverWarning): """ Warn when an option to disable SSL certificate verification is employed for some operations. """ -class UnsupportedOperationWarning(Warning): +class UnsupportedOperationWarning(WeaverWarning): """ Warn about an operation not yet implemented or unsupported according to context. """ -class NonBreakingExceptionWarning(Warning): +class NonBreakingExceptionWarning(WeaverWarning): """ Warn about an exception that is handled (ex: caught in try/except block) but still unexpected. """ -class MissingParameterWarning(Warning): +class MissingParameterWarning(WeaverWarning): """ Warn about an expected but missing parameter. """ From 23047deb96d873b50139d16008d6982a3a64179b Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 20 Nov 2023 17:03:25 -0500 Subject: [PATCH 09/15] fix doc lint --- weaver/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/weaver/utils.py b/weaver/utils.py index 1c584324e..63044cf4c 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -1535,8 +1535,6 @@ def get_caller_name(skip=0, base_class=False): The name is returned with respective formats ``module.class.method`` or ``module.function``. - Example: - Supposing the following call stack ``main -> func1 -> func2 -> func3 -> get_caller_name``. 
Calling ``get_caller_name()`` or ``get_caller_name(skip=1)`` would return the full package location of ``func2`` From b7a64f5a8c77c6a3d966428a1fab0effbf69144d Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 20 Nov 2023 19:18:35 -0500 Subject: [PATCH 10/15] pass celery app in execute_process to resolve registry reference --- weaver/processes/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 6177eaa3a..9bf8c9dd1 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -118,7 +118,7 @@ def execute_process(task, job_id, wps_url, headers=None): task_process = get_celery_process() rss_start = task_process.memory_info().rss - registry = get_registry(None) # local thread, whether locally or dispatched celery + registry = get_registry(app) # local thread, whether locally or dispatched celery settings = get_settings(registry) db = get_db(registry, reset_connection=True) # reset the connection because we are in a forked celery process store = db.get_store(StoreJobs) From 62bc2f60417f153f53cea5fd011d2b7857ae9877 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 20 Nov 2023 19:21:50 -0500 Subject: [PATCH 11/15] update changelog --- CHANGES.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 3df02af5f..88b4d901d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -19,6 +19,8 @@ Fixes: - Fix default `XML` format resolution for `WPS` endpoint when no ``Accept`` header or ``format``/``f`` query parameter is provided and that the request is submitted from a Web Browser, which involves additional control logic to select the applicable ``Content-Type`` for the response. +- Fix pre-forked ``celery`` worker process inconsistently resolving the ``pyramid`` registry applied + by ``pyramid_celery`` after worker restart. .. 
_changes_4.36.0: From 8a5e2d06e54138c8533abb4597fe1b75c6c6792e Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 21 Nov 2023 11:25:43 -0500 Subject: [PATCH 12/15] fix imports lint --- weaver/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/utils.py b/weaver/utils.py index 63044cf4c..2665ea56b 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -65,7 +65,7 @@ from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import ContentType, get_content_type, get_extension, repr_json from weaver.status import map_status -from weaver.warning import UndefinedContainerWarning, TimeZoneInfoAlreadySetWarning +from weaver.warning import TimeZoneInfoAlreadySetWarning, UndefinedContainerWarning from weaver.xml_util import HTML_TREE_BUILDER, XML try: # refactor in jsonschema==4.18.0 From d631a753bb739f98805cb4dad3cd937971d9b130 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 21 Nov 2023 14:38:29 -0500 Subject: [PATCH 13/15] handle test celery variable taskmeta retrieval delay --- tests/functional/test_celery.py | 42 ++++++++++++++++++++++++++------- weaver/utils.py | 4 ++++ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/tests/functional/test_celery.py b/tests/functional/test_celery.py index 7ff65ea1f..db7c42de9 100644 --- a/tests/functional/test_celery.py +++ b/tests/functional/test_celery.py @@ -8,14 +8,29 @@ import subprocess import sys import tempfile -import time +from typing import TYPE_CHECKING from tests.utils import get_settings_from_testapp, get_test_weaver_app, setup_config_with_mongodb from weaver.config import WeaverConfiguration from weaver.database import get_db from weaver.database.mongodb import get_mongodb_connection +from weaver.utils import retry_on_condition from weaver.wps.utils import get_wps_url +if TYPE_CHECKING: + from pymongo.collection import Collection + + +def is_attribute_none(exception): + # type: (Exception) -> bool + return 
isinstance(exception, AttributeError) and "None" in str(exception) + + +def get_taskmeta_output(taskmeta_collection, output): + # type: (Collection, str) -> str + taskmeta = taskmeta_collection.find_one({"_id": output.strip()}) + return taskmeta.get("traceback", "") + taskmeta.get("result", "") + def test_celery_registry_resolution(): python_bin = sys.executable @@ -84,6 +99,7 @@ def test_celery_registry_resolution(): )) # type: subprocess.Popen celery_stdout, celery_stderr = celery_process.communicate() celery_output = celery_stdout + celery_stderr + assert "Traceback" not in celery_output, "Unhandled error at Weaver/Celery startup. Cannot resume test." assert all([ msg in celery_output for msg in @@ -122,10 +138,14 @@ def test_celery_registry_resolution(): env={"PATH": f"{python_dir}:{debug_path}"}, )) # type: subprocess.Popen - task1, _ = celery_task_cmd1.communicate() - time.sleep(1) - task2, _ = celery_task_cmd2.communicate() - time.sleep(1) + task1_output, _ = retry_on_condition( + lambda: celery_task_cmd1.communicate(), + condition=is_attribute_none, retries=5, interval=1, + ) + task2_output, _ = retry_on_condition( + lambda: celery_task_cmd2.communicate(), + condition=is_attribute_none, retries=5, interval=1, + ) celery_mongo_db = get_mongodb_connection({ "mongodb.host": settings["mongodb.host"], @@ -133,10 +153,14 @@ def test_celery_registry_resolution(): "mongodb.db_name": "celery-test", }) celery_taskmeta = celery_mongo_db.celery_taskmeta - task1_meta = celery_taskmeta.find_one({"_id": task1.strip()}) - task2_meta = celery_taskmeta.find_one({"_id": task2.strip()}) - task1_result = task1_meta.get("traceback", "") + task1_meta.get("result", "") - task2_result = task2_meta.get("traceback", "") + task2_meta.get("result", "") + task1_result = retry_on_condition( + get_taskmeta_output, celery_taskmeta, task1_output, + condition=is_attribute_none, retries=5, interval=1, + ) + task2_result = retry_on_condition( + get_taskmeta_output, celery_taskmeta, 
task2_output, + condition=is_attribute_none, retries=5, interval=1, + ) # following errors are not necessarily linked directly to celery failing # however, if all other tests pass except this one, there's a big chance diff --git a/weaver/utils.py b/weaver/utils.py index 2665ea56b..842928f53 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -1761,6 +1761,7 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs *args, # type: Params.args condition=Exception, # type: RetryCondition retries=1, # type: int + interval=0, # type: Number **kwargs, # type: Params.kwargs ): # type: (...) -> Return """ @@ -1772,6 +1773,7 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs In case of a callable, success/failure result should be returned to indicate if retry is needed. If retry is not requested by the handler for the specified exception, it is raised directly. :param retries: Amount of retries to perform. If retries are exhausted, the final exception is re-raised. + :param interval: wait time interval (seconds) between retries. :return: Expected normal operation return value if it was handled within the specified amount of retries. """ if ( @@ -1810,6 +1812,8 @@ def retry_on_condition(operation, # type: AnyCallableAnyArgs last_exc = exc LOGGER.warning("Operation '%s' failed but matched handler condition for retry. Retrying (%s/%s)...", name, attempt, retries) + if interval and remain: + time.sleep(interval) LOGGER.error("Operation '%s' still failing. 
Maximum retry attempts reached (%s).", name, retries) raise last_exc From 594549b3494c130997f924d60eac613a000d4093 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 21 Nov 2023 23:14:34 -0500 Subject: [PATCH 14/15] fix celery/pyramid registry retrieval during workflow tests --- tests/functional/test_workflow.py | 14 +++++++++----- tests/utils.py | 29 ++++++++++++++++++++--------- weaver/database/__init__.py | 2 +- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index f82e52a44..bdd4f6512 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -28,6 +28,7 @@ get_settings_from_config_ini, get_settings_from_testapp, get_test_weaver_app, + mocked_dismiss_process, mocked_execute_celery, mocked_file_server, mocked_sub_requests, @@ -451,10 +452,13 @@ def clean_test_processes(cls, allowed_codes=frozenset([HTTPOk.code, HTTPNotFound headers=cls.headers, cookies=cls.cookies, ignore_errors=True, log_enabled=False) cls.assert_response(resp, allowed_codes, message="Failed cleanup of test processes jobs!") - for job in resp.json.get("jobs", []): - cls.request("DELETE", f"{path}/{job}", - headers=cls.headers, cookies=cls.cookies, - ignore_errors=True, log_enabled=False) + with contextlib.ExitStack() as stack: + if cls.is_webtest(): + stack.enter_context(mocked_dismiss_process()) + for job in resp.json.get("jobs", []): + cls.request("DELETE", f"{path}/{job}", + headers=cls.headers, cookies=cls.cookies, + ignore_errors=True, log_enabled=False) # then clean the actual process path = f"/processes/{process_info.id}" @@ -818,7 +822,7 @@ def workflow_runner(self, stack_exec.enter_context(mock.patch(data_source_use, side_effect=self.mock_get_data_source_from_url)) if self.is_webtest(): # mock execution when running on local Web Test app since no Celery runner is available - for mock_exec in mocked_execute_celery(): + for mock_exec in 
mocked_execute_celery(web_test_app=self.app): stack_exec.enter_context(mock_exec) # mock HTTP HEAD request to validate WPS output access (see 'setUpClass' details) mock_req = stack_exec.enter_context(mocked_wps_output(self.settings, mock_head=True, mock_get=False)) diff --git a/tests/utils.py b/tests/utils.py index f9eb4c7b8..3493cc3d9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -32,6 +32,7 @@ from pyramid.httpexceptions import HTTPException, HTTPNotFound, HTTPUnprocessableEntity from pyramid.registry import Registry from pyramid.testing import DummyRequest +from pyramid_celery import celery_app from pytest_server_fixtures.http import HTTPTestServer, SimpleHTTPTestServer from requests import Response from webtest import TestApp, TestResponse @@ -1047,8 +1048,11 @@ def mocked_wps_output(settings, # type: SettingsType ) -def mocked_execute_celery(celery_task="weaver.processes.execution.execute_process", func_execute_task=None): - # type: (str, Optional[Callable[[...], Any]]) -> Iterable[MockPatch] +def mocked_execute_celery( + celery_task="weaver.processes.execution.execute_process", # type: str + func_execute_task=None, # type: Optional[Callable[[...], Any]] + web_test_app=None, # type: Optional[TestApp] +): # type: (...) -> Iterable[MockPatch] """ Contextual mock of a task execution to run locally instead of dispatched :mod:`celery` worker. @@ -1072,11 +1076,15 @@ def mocked_execute_celery(celery_task="weaver.processes.execution.execute_proces should omit the input argument for the :class:`celery.task.Task` that will not be automatically inserted. The return value is ignored, as the mocked :class:`celery.task.Task` is always returned instead. If not provided, the function referred by :paramref:`celery_task` is imported and called directly. + :param web_test_app: + Test web application employed to execute tasks without dispatch to :mod:`celery`. 
+ If provided, ensures that references to :mod:`celery` that would normally look for the :mod:`pyramid` registry + will find the one provided by this test application, and all relevant settings set for it. """ - class MockTask(object): + class MockTaskContext(object): """ - Mocks the Celery Task for testing. + Mocks the :class:`celery.app.task.Context` for testing. Mocks call ``self.request.id`` in :func:`weaver.processes.execution.execute_process` and call ``result.id`` in :func:`weaver.processes.execution.submit_job_handler`. @@ -1099,10 +1107,10 @@ def wait(self, *_, **__): def ready(self, *_, **__): return True - task = MockTask() + mocked_task_context = MockTaskContext() def mock_execute_task(*args, **kwargs): - # type: (*Any, **Any) -> MockTask + # type: (*Any, **Any) -> MockTaskContext if func_execute_task is None: mod, func = celery_task.rsplit(".", 1) module = importlib.import_module(mod) @@ -1110,17 +1118,20 @@ def mock_execute_task(*args, **kwargs): task_func(*args, **kwargs) else: func_execute_task(*args, **kwargs) # noqa - return task + return mocked_task_context + + if isinstance(web_test_app, TestApp): + celery_app.conf.setdefault("PYRAMID_REGISTRY", web_test_app.app.registry) return ( mock.patch(f"{celery_task}.delay", side_effect=mock_execute_task), - mock.patch("celery.app.task.Context", return_value=task) + mock.patch("celery.app.task.Context", return_value=mocked_task_context), ) @contextlib.contextmanager def mocked_dismiss_process(): - # type: () -> mock.MagicMock + # type: () -> contextlib.AbstractContextManager[mock.MagicMock] """ Mock operations called to terminate :mod:`Celery` tasks. 
diff --git a/weaver/database/__init__.py b/weaver/database/__init__.py index 3700557af..ad6339686 100644 --- a/weaver/database/__init__.py +++ b/weaver/database/__init__.py @@ -37,7 +37,7 @@ def get_db(container=None, reset_connection=False): if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase): return registry.db database = MongoDatabase(container) - if reset_connection: + if reset_connection and registry: registry.db = database return database From bb8cdb4c517520b802b6204086d56d9192558e35 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 22 Nov 2023 01:02:38 -0500 Subject: [PATCH 15/15] second attempt patching workflow test with celery-app/pyramid-registry resolution --- tests/utils.py | 38 ++++++++++++++++++++++++++++++-------- weaver/wps/utils.py | 4 ++-- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 3493cc3d9..8e98f098a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, overload # Note: do NOT import 'boto3' here otherwise 'moto' will not be able to mock it effectively +import celery import mock import moto import pkg_resources @@ -32,7 +33,6 @@ from pyramid.httpexceptions import HTTPException, HTTPNotFound, HTTPUnprocessableEntity from pyramid.registry import Registry from pyramid.testing import DummyRequest -from pyramid_celery import celery_app from pytest_server_fixtures.http import HTTPTestServer, SimpleHTTPTestServer from requests import Response from webtest import TestApp, TestResponse @@ -59,7 +59,7 @@ from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, load_pywps_config if TYPE_CHECKING: - from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union + from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TypeVar, Union from typing_extensions import Literal from mypy_boto3_s3.client import 
S3Client @@ -1052,6 +1052,7 @@ def mocked_execute_celery( celery_task="weaver.processes.execution.execute_process", # type: str func_execute_task=None, # type: Optional[Callable[[...], Any]] web_test_app=None, # type: Optional[TestApp] + celery_app_name="app", # type: str ): # type: (...) -> Iterable[MockPatch] """ Contextual mock of a task execution to run locally instead of dispatched :mod:`celery` worker. @@ -1077,9 +1078,11 @@ def mocked_execute_celery( The return value is ignored, as the mocked :class:`celery.task.Task` is always returned instead. If not provided, the function referred by :paramref:`celery_task` is imported and called directly. :param web_test_app: - Test web application employed to execute tasks without dispatch to :mod:`celery`. + Test web application employed to execute tasks that would normally be dispatched to a :mod:`celery` worker. If provided, ensures that references to :mod:`celery` that would normally look for the :mod:`pyramid` registry will find the one provided by this test application, and all relevant settings set for it. + :param celery_app_name: + Name of the :mod:`celery` application or imported alias within the module referenced by :paramref:`celery_task`. 
""" class MockTaskContext(object): @@ -1102,9 +1105,11 @@ def id(self): # all following methods return what would be returned normally in sync mode def wait(self, *_, **__): + # type: (*Any, **Any) -> Type[CeleryTaskTimeoutError] raise CeleryTaskTimeoutError def ready(self, *_, **__): + # type: (*Any, **Any) -> bool return True mocked_task_context = MockTaskContext() @@ -1120,13 +1125,30 @@ def mock_execute_task(*args, **kwargs): func_execute_task(*args, **kwargs) # noqa return mocked_task_context - if isinstance(web_test_app, TestApp): - celery_app.conf.setdefault("PYRAMID_REGISTRY", web_test_app.app.registry) - - return ( + celery_mocks = [ mock.patch(f"{celery_task}.delay", side_effect=mock_execute_task), mock.patch("celery.app.task.Context", return_value=mocked_task_context), - ) + ] + + class MockCeleryApp(celery.Celery): + @property + def conf(self): + return {"PYRAMID_REGISTRY": web_test_app.app.registry} + + if isinstance(web_test_app, TestApp): + # WARNING: + # It is very critical that the mock referencing to the above test app registry is applied as context manager. + # Since the Celery app is created globally by 'pyramid_celery', omitting to reset the mock could cause the + # Pyramid registry and the underlying settings to be make its way across tests references. This can cause + # unexpected side-effects if configurations vary between tests to evaluate distinct use cases. Using the + # context manager, we make sure that the mock should live only during the corresponding 'with' block lifetime, + # which should be limited to more controlled duration than globally. 
+ celery_task_module = celery_task.rsplit(".", 1)[0] + celery_mocks.append( + mock.patch(f"{celery_task_module}.{celery_app_name}", new_callable=MockCeleryApp, spec=celery.Celery) + ) + + return celery_mocks @contextlib.contextmanager diff --git a/weaver/wps/utils.py b/weaver/wps/utils.py index 552302e96..315ab88f6 100644 --- a/weaver/wps/utils.py +++ b/weaver/wps/utils.py @@ -156,9 +156,9 @@ def get_wps_output_context(request): def get_wps_local_status_location(url_status_location, container, must_exist=True): # type: (str, AnySettingsContainer, bool) -> Optional[str] """ - Attempts to retrieve the local XML file path corresponding to the WPS status location as URL. + Attempts to retrieve the local :term:`XML` file path corresponding to the :term:`WPS` status location as URL. - :param url_status_location: URL reference pointing to some WPS status location XML. + :param url_status_location: URL reference pointing to some WPS status location :term:`XML`. :param container: any settings container to map configured local paths. :param must_exist: return only existing path if enabled, otherwise return the parsed value without validation. :returns: found local file path if it exists, ``None`` otherwise.