Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add test to detect celery/pyramid-registry problematic resolution #590

Merged
merged 18 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Fixes:
- Fix default `XML` format resolution for `WPS` endpoint when no ``Accept`` header or ``format``/``f`` query parameter
is provided and that the request is submitted from a Web Browser, which involves additional control logic to select
the applicable ``Content-Type`` for the response.
- Fix pre-forked ``celery`` worker process inconsistently resolving the ``pyramid`` registry applied
by ``pyramid_celery`` after worker restart.

.. _changes_4.36.0:

Expand Down
163 changes: 163 additions & 0 deletions tests/functional/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""
Tests to validate that :mod:`celery` execution behaves as intended.
"""
import contextlib
import inspect
import json
import os
import subprocess
import sys
import tempfile
import time

from tests.utils import get_settings_from_testapp, get_test_weaver_app, setup_config_with_mongodb
from weaver.config import WeaverConfiguration
from weaver.database import get_db
from weaver.database.mongodb import get_mongodb_connection
from weaver.wps.utils import get_wps_url


def test_celery_registry_resolution():
python_bin = sys.executable
python_dir = os.path.dirname(python_bin)
debug_path = os.path.expandvars(os.environ["PATH"])
celery_bin = os.path.join(python_dir, "celery")

config = setup_config_with_mongodb(settings={
"weaver.configuration": WeaverConfiguration.HYBRID,
"weaver.wps_output_url": "http://localhost/wps-outputs",
"weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", # nosec: B108 # don't care hardcoded for test
})
webapp = get_test_weaver_app(config=config)
settings = get_settings_from_testapp(webapp)
wps_url = get_wps_url(settings)
job_store = get_db(settings).get_store("jobs")
job1 = job_store.save_job(
task_id="tmp",
process="jsonarray2netcdf",
inputs={"input": {"href": "http://random-dont-care.com/fake.json"}},
)
job2 = job_store.save_job(
task_id="tmp",
process="jsonarray2netcdf",
inputs={"input": {"href": "http://random-dont-care.com/fake.json"}},
)

with contextlib.ExitStack() as stack:
celery_mongo_broker = f"""mongodb://{settings["mongodb.host"]}:{settings["mongodb.port"]}/celery-test"""
cfg_ini = stack.enter_context(tempfile.NamedTemporaryFile(suffix=".ini", mode="w", encoding="utf-8"))
cfg_ini.write(
inspect.cleandoc(f"""
[app:main]
use = egg:weaver
[celery]
broker_url = {celery_mongo_broker}
result_backend = {celery_mongo_broker}
""")
)
cfg_ini.flush()
cfg_ini.seek(0)

celery_process = stack.enter_context(subprocess.Popen(
[
celery_bin,
"-A",
"pyramid_celery.celery_app",
"worker",
"-B",
"-E",
"--ini", cfg_ini.name,
"--loglevel=DEBUG",
"--time-limit", "10",
"--soft-time-limit", "10",
"--detach",
# following will cause an error on any subsequent task
# if registry is not properly retrieved across processes/threads
"--concurrency", "1",
"--max-tasks-per-child", "1",
],
universal_newlines=True,
start_new_session=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PATH": f"{python_dir}:{debug_path}"},
)) # type: subprocess.Popen
celery_stdout, celery_stderr = celery_process.communicate()
celery_output = celery_stdout + celery_stderr
assert all([
msg in celery_output
for msg in
[
"Initiating weaver application",
"Celery runner detected.",
]
])

celery_task_cmd1 = stack.enter_context(subprocess.Popen(
[
celery_bin,
"-b", celery_mongo_broker,
"call",
"-a", json.dumps([str(job1.uuid), wps_url]),
"weaver.processes.execution.execute_process",
],
universal_newlines=True,
start_new_session=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PATH": f"{python_dir}:{debug_path}"},
)) # type: subprocess.Popen
celery_task_cmd2 = stack.enter_context(subprocess.Popen(
[
celery_bin,
"-b", celery_mongo_broker,
"call",
"-a", json.dumps([str(job2.uuid), wps_url]),
"weaver.processes.execution.execute_process",
],
universal_newlines=True,
start_new_session=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PATH": f"{python_dir}:{debug_path}"},
)) # type: subprocess.Popen

task1, _ = celery_task_cmd1.communicate()
time.sleep(1)
task2, _ = celery_task_cmd2.communicate()
time.sleep(1)

celery_mongo_db = get_mongodb_connection({
"mongodb.host": settings["mongodb.host"],
"mongodb.port": settings["mongodb.port"],
"mongodb.db_name": "celery-test",
})
celery_taskmeta = celery_mongo_db.celery_taskmeta
task1_meta = celery_taskmeta.find_one({"_id": task1.strip()})
task2_meta = celery_taskmeta.find_one({"_id": task2.strip()})
task1_result = task1_meta.get("traceback", "") + task1_meta.get("result", "")
task2_result = task2_meta.get("traceback", "") + task2_meta.get("result", "")

# following errors are not necessarily linked directly to celery failing
# however, if all other tests pass except this one, there's a big chance
# it is caused by a celery concurrency/processes/threading issue with the pyramid registry
potential_errors = [
"AttributeError: 'NoneType' object",
"if settings.get(setting, None) is None",
"get_registry()",
"get_settings()",
"get_db()",
"get_registry(app)",
"get_settings(app)",
"get_db(app)",
"get_registry(celery_app)",
"get_settings(celery_app)",
"get_db(celery_app)",
"get_registry(None)",
"get_settings(None)",
"get_db(None)",
]
task1_found_errors = [err_msg for err_msg in potential_errors if err_msg in task1_result]
task2_found_errors = [err_msg for err_msg in potential_errors if err_msg in task2_result]
assert not task1_found_errors, "potential error detected with celery and pyramid registry utilities"
assert not task2_found_errors, "potential error detected with celery and pyramid registry utilities"
3 changes: 2 additions & 1 deletion weaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
def main(global_config, **settings):
# type: (SettingsType, **Any) -> Router
import weaver.app
from weaver.utils import is_celery

# add flag to disable some unnecessary operations when runner is celery (worker)
settings["weaver.celery"] = sys.argv[0].rsplit("/", 1)[-1] == "celery"
settings["weaver.celery"] = is_celery()
return weaver.app.main(global_config, **settings)


Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def execute_process(task, job_id, wps_url, headers=None):

task_process = get_celery_process()
rss_start = task_process.memory_info().rss
registry = get_registry(None) # local thread, whether locally or dispatched celery
registry = get_registry(app) # local thread, whether locally or dispatched celery
settings = get_settings(registry)
db = get_db(registry, reset_connection=True) # reset the connection because we are in a forked celery process
store = db.get_store(StoreJobs)
Expand Down
35 changes: 32 additions & 3 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from pyramid.settings import asbool, aslist
from pyramid.threadlocal import get_current_registry
from pyramid_beaker import set_cache_regions_from_settings
from pyramid_celery import celery_app as app
from requests import HTTPError as RequestsHTTPError, Response
from requests.structures import CaseInsensitiveDict
from requests_file import FileAdapter
Expand All @@ -64,7 +65,7 @@
from weaver.execute import ExecuteControlOption, ExecuteMode
from weaver.formats import ContentType, get_content_type, get_extension, repr_json
from weaver.status import map_status
from weaver.warning import TimeZoneInfoAlreadySetWarning
from weaver.warning import UndefinedContainerWarning, TimeZoneInfoAlreadySetWarning
from weaver.xml_util import HTML_TREE_BUILDER, XML

try: # refactor in jsonschema==4.18.0
Expand Down Expand Up @@ -457,6 +458,14 @@ def get_any_message(info, default=""):
return (info.get("message") or info.get("description") or info.get("detail") or default).strip()


def is_celery():
# type: () -> bool
"""
Detect if the current application was executed as a :mod:`celery` command.
"""
return sys.argv[0].rsplit("/", 1)[-1] == "celery"


def get_registry(container=None, nothrow=False):
# type: (Optional[AnyRegistryContainer], bool) -> Optional[Registry]
"""
Expand All @@ -468,6 +477,17 @@ def get_registry(container=None, nothrow=False):
return container.registry
if isinstance(container, Registry):
return container
if container is None:
# find 2 parents since 'get_settings' calls 'get_registry' to provide better context
warnings.warn(
f"Function [{get_caller_name()}] called from [{get_caller_name(skip=2)}] "
"did not provide a settings container. Consider providing it explicitly.",
UndefinedContainerWarning,
)
# preemptively check registry in celery if applicable
# avoids error related to forked processes when restarting workers
if container is None and is_celery():
return app.conf.get("PYRAMID_REGISTRY", {})
if isinstance(container, WerkzeugRequest) or container is None:
return get_current_registry()
if nothrow:
Expand Down Expand Up @@ -1508,14 +1528,23 @@ def make_dirs(path, mode=0o755, exist_ok=False):
raise


def get_caller_name(skip=2, base_class=False):
def get_caller_name(skip=0, base_class=False):
# type: (int, bool) -> str
"""
Find the name of a parent caller function or method.

The name is returned with respective formats ``module.class.method`` or ``module.function``.

:param skip: specifies how many levels of stack to skip while getting the caller.
Supposing the following call stack ``main -> func1 -> func2 -> func3 -> get_caller_name``.

Calling ``get_caller_name()`` or ``get_caller_name(skip=1)`` would return the full package location of ``func2``
because it is 1-level higher than were ``get_caller_name`` is called from (inside ``func3``).
Calling ``get_caller_name(skip=0)`` would return ``func3`` directly, and ``func1`` for ``get_caller_name(skip=2)``.

:param skip:
Specifies how many levels of stack to skip for getting the caller.
By default, uses ``skip=1`` to obtain the immediate parent function that called :func:`get_caller_name`,
were ``skip=0`` would be the function itself that called :func:`get_caller_name`.
:param base_class:
Specified if the base class should be returned or the top-most class in case of inheritance
If the caller is not a class, this doesn't do anything.
Expand Down
22 changes: 17 additions & 5 deletions weaver/warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,43 @@
"""


class TimeZoneInfoAlreadySetWarning(Warning):
class WeaverWarning(Warning):
"""
Base class of :class:`Warning` defined by :mod:`weaver` package.
"""


class UndefinedContainerWarning(WeaverWarning):
"""
Warn when settings or the registry could not be resolved from an explicit container reference.
"""


class TimeZoneInfoAlreadySetWarning(WeaverWarning):
"""
Warn when trying to obtain a localized time with already defined time-zone info.
"""


class DisabledSSLCertificateVerificationWarning(Warning):
class DisabledSSLCertificateVerificationWarning(WeaverWarning):
"""
Warn when an option to disable SSL certificate verification is employed for some operations.
"""


class UnsupportedOperationWarning(Warning):
class UnsupportedOperationWarning(WeaverWarning):
"""
Warn about an operation not yet implemented or unsupported according to context.
"""


class NonBreakingExceptionWarning(Warning):
class NonBreakingExceptionWarning(WeaverWarning):
"""
Warn about an exception that is handled (ex: caught in try/except block) but still unexpected.
"""


class MissingParameterWarning(Warning):
class MissingParameterWarning(WeaverWarning):
"""
Warn about an expected but missing parameter.
"""
Loading