Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add test to detect celery/pyramid-registry problematic resolution #590

Merged
merged 18 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Fixes:
- Fix default `XML` format resolution for `WPS` endpoint when no ``Accept`` header or ``format``/``f`` query parameter
is provided and that the request is submitted from a Web Browser, which involves additional control logic to select
the applicable ``Content-Type`` for the response.
- Fix pre-forked ``celery`` worker process inconsistently resolving the ``pyramid`` registry applied
by ``pyramid_celery`` after worker restart.

.. _changes_4.36.0:

Expand Down
187 changes: 187 additions & 0 deletions tests/functional/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
Tests to validate that :mod:`celery` execution behaves as intended.
"""
import contextlib
import inspect
import json
import os
import subprocess
import sys
import tempfile
from typing import TYPE_CHECKING

from tests.utils import get_settings_from_testapp, get_test_weaver_app, setup_config_with_mongodb
from weaver.config import WeaverConfiguration
from weaver.database import get_db
from weaver.database.mongodb import get_mongodb_connection
from weaver.utils import retry_on_condition
from weaver.wps.utils import get_wps_url

if TYPE_CHECKING:
from pymongo.collection import Collection


def is_attribute_none(exception):
    # type: (Exception) -> bool
    """
    Check whether an exception is an :class:`AttributeError` whose message mentions ``None``.

    Employed as retry condition for operations that fail with an attribute access on a ``None`` value.

    :param exception: Exception instance to evaluate.
    :return: ``True`` only for an :class:`AttributeError` with ``"None"`` in its message, ``False`` otherwise.
    """
    if not isinstance(exception, AttributeError):
        return False
    return "None" in str(exception)


def get_taskmeta_output(taskmeta_collection, output):
    # type: (Collection, str) -> str
    """
    Retrieve the combined traceback and result text stored for a task.

    The document is looked up by ``_id`` using the stripped value of :paramref:`output`.
    If no document is found, the attribute access on ``None`` raises :class:`AttributeError`,
    which callers can use as a retry condition.

    :param taskmeta_collection: Collection queried with ``find_one`` for the task document.
    :param output: Task identifier, with optional surrounding whitespace (e.g.: trailing newline).
    :return: Concatenation of the ``traceback`` and ``result`` fields, each defaulting to an empty string.
    """
    task_id = output.strip()
    document = taskmeta_collection.find_one({"_id": task_id})
    traceback_text = document.get("traceback", "")
    result_text = document.get("result", "")
    return traceback_text + result_text


def test_celery_registry_resolution():
    """
    Validate that consecutive tasks of a restarted :mod:`celery` worker process do not report registry errors.

    A detached worker is started with a single process that is replaced after each task
    (``--concurrency 1`` and ``--max-tasks-per-child 1``). Two job executions are then submitted,
    and the task metadata stored in the :mod:`celery` result backend is inspected for error messages
    that hint at a failed resolution of the :mod:`pyramid` registry, settings or database.
    """
    python_bin = sys.executable
    python_dir = os.path.dirname(python_bin)
    debug_path = os.path.expandvars(os.environ["PATH"])
    celery_bin = os.path.join(python_dir, "celery")
    # options shared by every spawned 'celery' command, defined once to keep them consistent
    popen_options = {
        "universal_newlines": True,
        "start_new_session": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "env": {"PATH": f"{python_dir}:{debug_path}"},
    }

    config = setup_config_with_mongodb(settings={
        "weaver.configuration": WeaverConfiguration.HYBRID,
        "weaver.wps_output_url": "http://localhost/wps-outputs",
        "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs",  # nosec: B108 # don't care hardcoded for test
    })
    webapp = get_test_weaver_app(config=config)
    settings = get_settings_from_testapp(webapp)
    wps_url = get_wps_url(settings)
    job_store = get_db(settings).get_store("jobs")
    # two identical jobs, since only a task following the first one can reveal the problem
    jobs = [
        job_store.save_job(
            task_id="tmp",
            process="jsonarray2netcdf",
            inputs={"input": {"href": "http://random-dont-care.com/fake.json"}},
        )
        for _ in range(2)
    ]

    with contextlib.ExitStack() as stack:
        celery_mongo_broker = f"""mongodb://{settings["mongodb.host"]}:{settings["mongodb.port"]}/celery-test"""
        cfg_ini = stack.enter_context(tempfile.NamedTemporaryFile(suffix=".ini", mode="w", encoding="utf-8"))
        cfg_ini.write(
            inspect.cleandoc(f"""
                [app:main]
                use = egg:weaver
                [celery]
                broker_url = {celery_mongo_broker}
                result_backend = {celery_mongo_broker}
            """)
        )
        cfg_ini.flush()
        cfg_ini.seek(0)

        celery_process = stack.enter_context(subprocess.Popen(
            [
                celery_bin,
                "-A",
                "pyramid_celery.celery_app",
                "worker",
                "-B",
                "-E",
                "--ini", cfg_ini.name,
                "--loglevel=DEBUG",
                "--time-limit", "10",
                "--soft-time-limit", "10",
                "--detach",
                # following will cause an error on any subsequent task
                # if registry is not properly retrieved across processes/threads
                "--concurrency", "1",
                "--max-tasks-per-child", "1",
            ],
            **popen_options
        ))  # type: subprocess.Popen
        celery_stdout, celery_stderr = celery_process.communicate()
        celery_output = celery_stdout + celery_stderr
        assert "Traceback" not in celery_output, "Unhandled error at Weaver/Celery startup. Cannot resume test."
        expected_messages = [
            "Initiating weaver application",
            "Celery runner detected.",
        ]
        missing_messages = [msg for msg in expected_messages if msg not in celery_output]
        assert not missing_messages, f"Expected messages missing from Celery startup output: {missing_messages}"

        # submit all tasks before waiting on any of them, as each 'call' command only dispatches the task
        celery_task_cmds = [
            stack.enter_context(subprocess.Popen(
                [
                    celery_bin,
                    "-b", celery_mongo_broker,
                    "call",
                    "-a", json.dumps([str(job.uuid), wps_url]),
                    "weaver.processes.execution.execute_process",
                ],
                **popen_options
            ))
            for job in jobs
        ]
        task_outputs = []
        for task_cmd in celery_task_cmds:
            task_output, _ = retry_on_condition(
                lambda: task_cmd.communicate(),  # pylint: disable=W0640  # called immediately within the loop
                condition=is_attribute_none, retries=5, interval=1,
            )
            task_outputs.append(task_output)

        celery_mongo_db = get_mongodb_connection({
            "mongodb.host": settings["mongodb.host"],
            "mongodb.port": settings["mongodb.port"],
            "mongodb.db_name": "celery-test",
        })
        celery_taskmeta = celery_mongo_db.celery_taskmeta
        # task metadata could be unavailable until the worker handled the task, hence the retries
        task_results = [
            retry_on_condition(
                get_taskmeta_output, celery_taskmeta, task_output,
                condition=is_attribute_none, retries=5, interval=1,
            )
            for task_output in task_outputs
        ]

        # following errors are not necessarily linked directly to celery failing
        # however, if all other tests pass except this one, there's a big chance
        # it is caused by a celery concurrency/processes/threading issue with the pyramid registry
        potential_errors = [
            "AttributeError: 'NoneType' object",
            "if settings.get(setting, None) is None",
            "get_registry()",
            "get_settings()",
            "get_db()",
            "get_registry(app)",
            "get_settings(app)",
            "get_db(app)",
            "get_registry(celery_app)",
            "get_settings(celery_app)",
            "get_db(celery_app)",
            "get_registry(None)",
            "get_settings(None)",
            "get_db(None)",
        ]
        for task_index, task_result in enumerate(task_results, start=1):
            found_errors = [err_msg for err_msg in potential_errors if err_msg in task_result]
            # report the matched patterns and the task to ease diagnosis of a failure
            assert not found_errors, (
                "potential error detected with celery and pyramid registry utilities "
                f"(task {task_index}): {found_errors}"
            )
14 changes: 9 additions & 5 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
get_settings_from_config_ini,
get_settings_from_testapp,
get_test_weaver_app,
mocked_dismiss_process,
mocked_execute_celery,
mocked_file_server,
mocked_sub_requests,
Expand Down Expand Up @@ -451,10 +452,13 @@ def clean_test_processes(cls, allowed_codes=frozenset([HTTPOk.code, HTTPNotFound
headers=cls.headers, cookies=cls.cookies,
ignore_errors=True, log_enabled=False)
cls.assert_response(resp, allowed_codes, message="Failed cleanup of test processes jobs!")
for job in resp.json.get("jobs", []):
cls.request("DELETE", f"{path}/{job}",
headers=cls.headers, cookies=cls.cookies,
ignore_errors=True, log_enabled=False)
with contextlib.ExitStack() as stack:
if cls.is_webtest():
stack.enter_context(mocked_dismiss_process())
for job in resp.json.get("jobs", []):
cls.request("DELETE", f"{path}/{job}",
headers=cls.headers, cookies=cls.cookies,
ignore_errors=True, log_enabled=False)

# then clean the actual process
path = f"/processes/{process_info.id}"
Expand Down Expand Up @@ -818,7 +822,7 @@ def workflow_runner(self,
stack_exec.enter_context(mock.patch(data_source_use, side_effect=self.mock_get_data_source_from_url))
if self.is_webtest():
# mock execution when running on local Web Test app since no Celery runner is available
for mock_exec in mocked_execute_celery():
for mock_exec in mocked_execute_celery(web_test_app=self.app):
stack_exec.enter_context(mock_exec)
# mock HTTP HEAD request to validate WPS output access (see 'setUpClass' details)
mock_req = stack_exec.enter_context(mocked_wps_output(self.settings, mock_head=True, mock_get=False))
Expand Down
57 changes: 45 additions & 12 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import TYPE_CHECKING, overload

# Note: do NOT import 'boto3' here otherwise 'moto' will not be able to mock it effectively
import celery
import mock
import moto
import pkg_resources
Expand Down Expand Up @@ -58,7 +59,7 @@
from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, load_pywps_config

if TYPE_CHECKING:
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TypeVar, Union
from typing_extensions import Literal

from mypy_boto3_s3.client import S3Client
Expand Down Expand Up @@ -1047,8 +1048,12 @@ def mocked_wps_output(settings, # type: SettingsType
)


def mocked_execute_celery(celery_task="weaver.processes.execution.execute_process", func_execute_task=None):
# type: (str, Optional[Callable[[...], Any]]) -> Iterable[MockPatch]
def mocked_execute_celery(
celery_task="weaver.processes.execution.execute_process", # type: str
func_execute_task=None, # type: Optional[Callable[[...], Any]]
web_test_app=None, # type: Optional[TestApp]
celery_app_name="app", # type: str
): # type: (...) -> Iterable[MockPatch]
"""
Contextual mock of a task execution to run locally instead of dispatched :mod:`celery` worker.

Expand All @@ -1072,11 +1077,17 @@ def mocked_execute_celery(celery_task="weaver.processes.execution.execute_proces
should omit the input argument for the :class:`celery.task.Task` that will not be automatically inserted.
The return value is ignored, as the mocked :class:`celery.task.Task` is always returned instead.
If not provided, the function referred by :paramref:`celery_task` is imported and called directly.
:param web_test_app:
Test web application employed to execute tasks that would normally be dispatched to a :mod:`celery` worker.
If provided, ensures that references to :mod:`celery` that would normally look for the :mod:`pyramid` registry
will find the one provided by this test application, and all relevant settings set for it.
:param celery_app_name:
Name of the :mod:`celery` application or imported alias within the module referenced by :paramref:`celery_task`.
"""

class MockTask(object):
class MockTaskContext(object):
"""
Mocks the Celery Task for testing.
Mocks the :class:`celery.app.task.Context` for testing.

Mocks call ``self.request.id`` in :func:`weaver.processes.execution.execute_process` and
call ``result.id`` in :func:`weaver.processes.execution.submit_job_handler`.
Expand All @@ -1094,33 +1105,55 @@ def id(self):
# all following methods return what would be returned normally in sync mode

def wait(self, *_, **__):
# type: (*Any, **Any) -> Type[CeleryTaskTimeoutError]
raise CeleryTaskTimeoutError

def ready(self, *_, **__):
# type: (*Any, **Any) -> bool
return True

task = MockTask()
mocked_task_context = MockTaskContext()

def mock_execute_task(*args, **kwargs):
# type: (*Any, **Any) -> MockTask
# type: (*Any, **Any) -> MockTaskContext
if func_execute_task is None:
mod, func = celery_task.rsplit(".", 1)
module = importlib.import_module(mod)
task_func = getattr(module, func)
task_func(*args, **kwargs)
else:
func_execute_task(*args, **kwargs) # noqa
return task
return mocked_task_context

return (
celery_mocks = [
mock.patch(f"{celery_task}.delay", side_effect=mock_execute_task),
mock.patch("celery.app.task.Context", return_value=task)
)
mock.patch("celery.app.task.Context", return_value=mocked_task_context),
]

class MockCeleryApp(celery.Celery):
@property
def conf(self):
return {"PYRAMID_REGISTRY": web_test_app.app.registry}

if isinstance(web_test_app, TestApp):
# WARNING:
# It is critical that the mock referencing the above test app registry is applied as a context manager.
# Since the Celery app is created globally by 'pyramid_celery', omitting to reset the mock could cause the
# Pyramid registry and the underlying settings to make their way across test references. This can cause
# unexpected side-effects if configurations vary between tests to evaluate distinct use cases. Using the
# context manager, we make sure that the mock lives only during the corresponding 'with' block lifetime,
# which should be limited to a more controlled duration than globally.
celery_task_module = celery_task.rsplit(".", 1)[0]
celery_mocks.append(
mock.patch(f"{celery_task_module}.{celery_app_name}", new_callable=MockCeleryApp, spec=celery.Celery)
)

return celery_mocks


@contextlib.contextmanager
def mocked_dismiss_process():
# type: () -> mock.MagicMock
# type: () -> contextlib.AbstractContextManager[mock.MagicMock]
"""
Mock operations called to terminate :mod:`Celery` tasks.

Expand Down
3 changes: 2 additions & 1 deletion weaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
def main(global_config, **settings):
# type: (SettingsType, **Any) -> Router
import weaver.app
from weaver.utils import is_celery

Check warning on line 31 in weaver/__init__.py

View check run for this annotation

Codecov / codecov/patch

weaver/__init__.py#L31

Added line #L31 was not covered by tests

# add flag to disable some unnecessary operations when runner is celery (worker)
settings["weaver.celery"] = sys.argv[0].rsplit("/", 1)[-1] == "celery"
settings["weaver.celery"] = is_celery()

Check warning on line 34 in weaver/__init__.py

View check run for this annotation

Codecov / codecov/patch

weaver/__init__.py#L34

Added line #L34 was not covered by tests
return weaver.app.main(global_config, **settings)


Expand Down
2 changes: 1 addition & 1 deletion weaver/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_db(container=None, reset_connection=False):
if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase):
return registry.db
database = MongoDatabase(container)
if reset_connection:
if reset_connection and registry:
registry.db = database
return database

Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def execute_process(task, job_id, wps_url, headers=None):

task_process = get_celery_process()
rss_start = task_process.memory_info().rss
registry = get_registry(None) # local thread, whether locally or dispatched celery
registry = get_registry(app) # local thread, whether locally or dispatched celery
settings = get_settings(registry)
db = get_db(registry, reset_connection=True) # reset the connection because we are in a forked celery process
store = db.get_store(StoreJobs)
Expand Down
Loading
Loading