Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support callback request subscribers and job notification refactoring #572

Merged
merged 17 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,24 @@ Changes

Changes:
--------
- No change.

Fixes:
------
- No change.
- Add `Job` ``subscribers`` support to define `OGC`-compliant callback URLs where HTTP(S) requests will be sent upon
reaching certain `Job` status milestones (resolves `#230 <https://github.com/crim-ca/weaver/issues/230>`_).
- Add email notification support to the new ``subscribers`` definition (extension over `OGC` minimal requirements).
- Deprecate `Job` ``notification_email`` in the `OpenAPI` specification in favor of ``subscribers``, but preserve
parsing of its value if provided in the `JSON` body during `Job` submission for backward compatibility support of
existing servers. The ``Job.notification_email`` attribute is removed to avoid duplicate references.
- Add notification email for `Job` ``started`` status, only available through the ``subscribers`` property.
- Add `CLI` and ``WeaverClient`` options to support ``subscribers`` specification for submitted `Job` execution.
- Add ``{PROCESS_ID}/{STATUS}.mako`` template detection under the ``weaver.wps_email_notify_template_dir`` location
to allow per-`Process` and per-`Job` status email customization.
- Refactor ``weaver/notify.py`` and ``weaver/processes/execution.py`` to avoid mixed references to the
encryption/decryption logic employed for notification emails. All notifications including emails and
callback requests are now completely handled and contained in the ``weaver/notify.py`` module.
- Remove partially duplicate Mako Template definition as hardcoded string and separate file for email notification.

Fixes:
------
- Fix ``weaver.cli`` logger not properly configured when executed from `CLI` causing log messages to not be reported.

.. _changes_4.33.0:

Expand Down
2 changes: 1 addition & 1 deletion docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ above :term:`CLI` variations, but it is usually more intuitive to use a Python :
{"href": "http://another.com/data.json"},
],
"single": {
"href": "/workspace/data.xml@mediaType", # note: uploaded to vault automatically before execution
"href": "/workspace/data.xml", # note: uploaded to vault automatically before execution
"type": "text/xml",
}
})
Expand Down
28 changes: 22 additions & 6 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,37 @@ they are optional and which default value or operation is applied in each situat
| The *path* variant **SHOULD** start with ``/`` for appropriate concatenation with ``weaver.url``, although this is
not strictly enforced.

- | ``weaver.wps_metadata_[...]`` (multiple settings)
- | ``weaver.wps_metadata_[...]`` (multiple settings) [:class:`str`]
|
| Metadata fields that will be rendered by either or both the WPS-1/2 and WPS-REST endpoints
(:ref:`GetCapabilities <proc_op_getcap>`).

- | ``weaver.wps_email_[...]`` (multiple settings)
|
| Defines configuration of email notification functionality on job completion.
| Defines configuration of email notification functionality on :term:`Job` status milestones.
|
| Encryption settings as well as custom email templates are available. Default email template defined in
`email-template`_ is employed if none is provided. Email notifications are sent only on job
completion if an email was provided in the :ref:`Execute <proc_op_execute>` request body
(see also: :ref:`Email Notification`).
| Encryption settings as well as custom email template locations are available.
The |default-notify-email-template|_ is employed if none is provided or when specified template
files or directory cannot be resolved.
|
| When looking up for templates within ``weaver.wps_email_notify_template_dir``, the following resolution order is
followed to attempt matching files. The first one that is found will be employed for the notification email.
|
| 1. file ``{TEMPLATE_DIR}/{PROCESS_ID}/{STATUS}.mako`` used for a specific :term:`Process` and :term:`Job` status
| 2. file ``{TEMPLATE_DIR}/{PROCESS_ID}.mako`` used for a specific :term:`Process` but any :term:`Job` status
| 3. file ``{TEMPLATE_DIR}/{weaver.wps_email_notify_template_default}`` used for any combination if specified
| 4. file ``{TEMPLATE_DIR}/default.mako`` used for any combination if an alternate default name was not specified
| 5. file |default-notify-email-template|_ as last resort
|
| Email notifications are sent only when corresponding :term:`Job` status milestones are reached and when
email(s) were provided in the :ref:`Execute <proc_op_execute>` request body. Emails will not be sent if
the request body did not include a subscription to those notifications, even if the templates were configured.

.. seealso::
See :ref:`Notification Subscribers <proc_op_execute_subscribers>` for more details.

.. versionadded:: 4.15
.. versionchanged:: 4.34

- | ``weaver.execute_sync_max_wait = <int>`` [:class:`int`, seconds]
| (default: ``20``)
Expand Down
18 changes: 17 additions & 1 deletion docs/source/processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1394,14 +1394,30 @@ the configured :term:`WPS` output directory.
.. versionadded:: 4.3
Addition of the ``X-WPS-Output-Context`` header.

Email Notification
.. _proc_op_execute_subscribers:

Notification Subscribers
~~~~~~~~~~~~~~~~~~~~~~~~~~

When submitting a :term:`Job` for execution, it is possible to provide the ``notification_email`` field.
Doing so will tell `Weaver` to send an email to the specified address with successful or failure details
upon :term:`Job` completion. The format of the email is configurable from `weaver.ini.example`_ file with
email-specific settings (see: :ref:`Configuration`).

Alternatively to ``notification_email``, the ``subscribers`` field of the :term:`API` can be employed during :term:`Job`
submission. Using this field will take precedence over ``notification_email`` for corresponding email and status
combinations. The :term:`Job` ``subscribers`` allow more fined-grained control over which emails will be sent for
the various combinations of :term:`Job` status milestones.

Furthermore, ``subscribers`` allow specifying URLs where HTTP(S) requests will be sent with
the :ref:`Job Status <proc_op_status>` or :ref:`Job Results <proc_op_result>` contents directly in :term:`JSON` format.
This allows users and/or servers to directly receive the necessary details using a push-notification mechanism instead
of the polling-based method on the :ref:`Job Status <proc_op_status>` endpoint otherwise required to obtain updated
:term:`Job` details.

.. seealso::
Refer to the |oas-rtd|_ of the |exec-req|_ request for all available ``subscribers`` properties.

.. _proc_op_status:
.. _proc_op_monitor:

Expand Down
3 changes: 2 additions & 1 deletion docs/source/references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
.. _request_options.yml.example: ../../../config/request_options.yml.example
.. _Dockerfile-manager: ../../../docker/Dockerfile-manager
.. _Dockerfile-worker: ../../../docker/Dockerfile-worker
.. _email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako
.. _default-notify-email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako
.. |default-notify-email-template| replace:: Default Notification Email Mako Template
.. |opensearch-deploy| replace:: OpenSearch Deploy
.. _opensearch-deploy: ../../../tests/opensearch/json/opensearch_deploy.json
.. |opensearch-examples| replace:: OpenSearch Examples
Expand Down
127 changes: 123 additions & 4 deletions tests/functional/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import shutil
import smtplib
import tempfile
import uuid
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -38,9 +39,10 @@
from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli
from weaver.datatype import DockerAuthentication, Service
from weaver.formats import ContentType, OutputFormat, get_cwl_file_format, repr_json
from weaver.notify import decrypt_email
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema
from weaver.processes.types import ProcessType
from weaver.status import Status, StatusCategory
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.utils import fully_qualified_name
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_output_url, map_wps_output_location
Expand All @@ -67,7 +69,10 @@ def setUpClass(cls):
settings.update({
"weaver.vault_dir": tempfile.mkdtemp(prefix="weaver-test-"),
"weaver.wps_output_dir": tempfile.mkdtemp(prefix="weaver-test-"),
"weaver.wps_output_url": "http://random-file-server.com/wps-outputs"
"weaver.wps_output_url": "http://random-file-server.com/wps-outputs",
"weaver.wps_email_notify_smtp_host": "http://fake-email-server",
"weaver.wps_email_notify_port": 1234,
"weaver.wps_email_encrypt_salt": "123456",
})
cls.settings = settings
super(TestWeaverClientBase, cls).setUpClass()
Expand Down Expand Up @@ -425,7 +430,8 @@ def test_describe(self):
), "CLI should not have overridden the process description field."

def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
preload=False, location=False, expect_success=True, mock_exec=True):
preload=False, location=False, expect_success=True,
mock_exec=True, **exec_kwargs):
if isinstance(inputs_param, str):
ref = {"location": inputs_param} if location else {"ref_name": inputs_param}
if preload:
Expand All @@ -440,7 +446,8 @@ def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
mock_exec_func = None
for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func):
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process], inputs=inputs_param)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
if expect_success:
assert result.success, result.message + (result.text if result.text else "")
assert "jobID" in result.body
Expand Down Expand Up @@ -566,6 +573,63 @@ def test_execute_with_auto_monitor(self):
# status was periodically pooled and returned 'running' until the final 'succeeded' resumes to download.
raise NotImplementedError

def test_execute_subscribers(self):
"""
Test that specified subscribers are called for relevant :term:`Job` status milestones.

.. versionadded:: 4.34
"""
subscribers = {
"inProgressUri": "https://server.com/started",
"failedUri": "https://server.com/failure",
"successUri": "https://server.com/success",
"inProgressEmail": "[email protected]",
"failedEmail": "[email protected]",
"successEmail": "[email protected]",
}
with contextlib.ExitStack() as subs_stack:
# mock as close as possible to the 'send' operations of respective subscriber types
mocked_requests = subs_stack.enter_context(mock.patch("weaver.notify.request_extra"))
mocked_smtp = subs_stack.enter_context(mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL))
mocked_smtp.return_value.sendmail.return_value = None # sending worked
mocked_emails = mocked_smtp.return_value.sendmail # shortcut

result = self.run_execute_inputs_schema_variant(
{"message": "test-subscribers"},
subscribers=subscribers,
mock_exec=False, # need to run it to get subscriber calls
)

# NOTE:
# Because JSON of job status are pushed using the OGC schema definitions,
# actual status in the body will be mapped to their standard equivalents.
# For example, "started" will be represented as "running" in the callback request body,
# even though both of these statuses are used internally at distinct execution steps.
running_statuses = JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]
job_id = result.body["jobID"]
expect_outputs = {
"output": {
"href": f"{get_wps_output_url(self.settings)}/{job_id}/output/stdout.log",
"type": ContentType.TEXT_PLAIN,
"format": {"mediaType": ContentType.TEXT_PLAIN},
}
}

# order important, expect status 'started' (in-progress) to occur before 'succeeded'
# call for 'failed' should never happen since 'succeeded' expected, as validated by above method
assert mocked_requests.call_count == 2, "Should not have called both failed/success callback requests"
assert mocked_requests.call_args_list[0].args == ("POST", subscribers["inProgressUri"])
assert mocked_requests.call_args_list[0].kwargs["json"]["status"] in running_statuses # status JSON
assert mocked_requests.call_args_list[1].args == ("POST", subscribers["successUri"])
assert mocked_requests.call_args_list[1].kwargs["json"] == expect_outputs # results JSON

# first argument None is 'from_addr' not configured, this is allowed if provided by 'From' email header
assert mocked_emails.call_count == 2, "Should not have sent both failed/success email notifications"
assert mocked_emails.call_args_list[0].args[:2] == (None, subscribers["inProgressEmail"])
assert b"Job test-client-Echo Started" in mocked_emails.call_args_list[0].args[-1]
assert mocked_emails.call_args_list[1].args[:2] == (None, subscribers["successEmail"])
assert b"Job test-client-Echo Succeeded" in mocked_emails.call_args_list[1].args[-1]
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

# NOTE:
# For all below '<>_auto_resolve_vault' test cases, the local file referenced in the Execute request body
# should be automatically handled by uploading to the Vault and forwarding the relevant X-Auth-Vault header.
Expand Down Expand Up @@ -1626,6 +1690,61 @@ def test_execute_output_context(self, cli_options, expect_output_context):
wps_path = link.split(wps_url)[-1]
assert wps_path == f"/{expect_output_context}/{job_id}/output/stdout.log"

def test_execute_subscriber_options(self):
"""
Validate that subscriber options are properly combined on the CLI.

Since options are provided by multiple separate arguments on the command line, but are a single JSON definition
in :class:`weaver.cli.WeaverClient`, ensure that mapping is accomplished as expected. Also, validate that those
definitions correspond to the final data structure obtained in the database for later use by the job execution.

.. versionadded:: 4.34
"""
proc = self.test_process["Echo"]
with contextlib.ExitStack() as stack_exec:
for mock_exec_proc in mocked_execute_celery():
stack_exec.enter_context(mock_exec_proc)

test_email_started = "[email protected]"
test_email_failed = "[email protected]"
test_callback_started = "https://server.com/started"
test_callback_success = "https://server.com/success"
lines = mocked_sub_requests(
self.app, run_command,
[
# "weaver",
"execute",
"-u", self.url,
"-p", proc,
"-I", "message='TEST MESSAGE!'",
"-M",
"-T", 2,
"-W", 1,
"-nL",
"-d",
"-F", OutputFormat.JSON_RAW,
"-sEP", test_email_started,
"-sEF", test_email_failed,
"-sCP", test_callback_started,
"-sCS", test_callback_success,
],
trim=False,
entrypoint=weaver_cli,
only_local=True,
)
data = json.loads(lines[0])
assert data["status"] == Status.SUCCEEDED

job = self.job_store.fetch_by_id(data["jobID"])
# to properly compare, we must decrypt emails (encrypt is not deterministic on multiple calls)
subs = copy.deepcopy(job.subscribers)
for sub, email in subs["emails"].items():
subs["emails"][sub] = decrypt_email(email, self.settings)
assert subs == {
"callbacks": {Status.STARTED: test_callback_started, Status.SUCCEEDED: test_callback_success},
"emails": {Status.STARTED: test_email_started, Status.FAILED: test_email_failed},
}, "Job subscribers should be as submitted, after combining CLI options, without extra or missing ones."

def test_execute_help_details(self):
"""
Verify that formatting of the execute operation help provides multiple paragraphs with more details.
Expand Down
12 changes: 6 additions & 6 deletions tests/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest

from weaver.datatype import Job
from weaver.notify import decrypt_email, encrypt_email, notify_job_complete
from weaver.notify import decrypt_email, encrypt_email, notify_job_email
from weaver.status import Status


Expand Down Expand Up @@ -50,7 +50,7 @@ def test_encrypt_decrypt_email_raise(email_func):
pytest.fail("Should have raised for invalid/missing settings")


def test_notify_job_complete():
def test_notify_email_job_complete():
test_url = "https://test-weaver.example.com"
settings = {
"weaver.url": test_url,
Expand All @@ -74,7 +74,7 @@ def test_notify_job_complete():
mock_smtp.return_value.sendmail.return_value = None # sending worked

test_job.status = Status.SUCCEEDED
notify_job_complete(test_job, notify_email, settings)
notify_job_email(test_job, notify_email, settings)
mock_smtp.assert_called_with("xyz.test.com", 12345, timeout=1)
assert mock_smtp.return_value.sendmail.call_args[0][0] == "[email protected]"
assert mock_smtp.return_value.sendmail.call_args[0][1] == notify_email
Expand All @@ -89,7 +89,7 @@ def test_notify_job_complete():
assert test_job_err_url not in message

test_job.status = Status.FAILED
notify_job_complete(test_job, notify_email, settings)
notify_job_email(test_job, notify_email, settings)
assert mock_smtp.return_value.sendmail.call_args[0][0] == "[email protected]"
assert mock_smtp.return_value.sendmail.call_args[0][1] == notify_email
message_encoded = mock_smtp.return_value.sendmail.call_args[0][2]
Expand All @@ -103,7 +103,7 @@ def test_notify_job_complete():
assert test_job_err_url in message


def test_notify_job_complete_custom_template():
def test_notify_job_email_custom_template():
with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", suffix=".mako") as email_template_file:
email_template_file.writelines([
"From: Weaver\n",
Expand Down Expand Up @@ -137,7 +137,7 @@ def test_notify_job_complete_custom_template():

with mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL) as mock_smtp:
mock_smtp.return_value.sendmail.return_value = None # sending worked
notify_job_complete(test_job, notify_email, settings)
notify_job_email(test_job, notify_email, settings)

message_encoded = mock_smtp.return_value.sendmail.call_args[0][2]
message = message_encoded.decode("utf8")
Expand Down
Loading
Loading