Skip to content

Commit

Permalink
Merge pull request #572 from crim-ca/subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored Oct 13, 2023
2 parents 1572fd0 + fe55014 commit d1cf5a9
Show file tree
Hide file tree
Showing 20 changed files with 860 additions and 175 deletions.
23 changes: 18 additions & 5 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,24 @@ Changes

Changes:
--------
- No change.

Fixes:
------
- No change.
- Add `Job` ``subscribers`` support to define `OGC`-compliant callback URLs where HTTP(S) requests will be sent upon
reaching certain `Job` status milestones (resolves `#230 <https://github.com/crim-ca/weaver/issues/230>`_).
- Add email notification support to the new ``subscribers`` definition (extension over `OGC` minimal requirements).
- Deprecate `Job` ``notification_email`` in the `OpenAPI` specification in favor of ``subscribers``, but preserve
parsing of its value if provided in the `JSON` body during `Job` submission for backward compatibility support of
existing servers. The ``Job.notification_email`` attribute is removed to avoid duplicate references.
- Add notification email for `Job` ``started`` status, only available through the ``subscribers`` property.
- Add `CLI` and ``WeaverClient`` options to support ``subscribers`` specification for submitted `Job` execution.
- Add ``{PROCESS_ID}/{STATUS}.mako`` template detection under the ``weaver.wps_email_notify_template_dir`` location
to allow per-`Process` and per-`Job` status email customization.
- Refactor ``weaver/notify.py`` and ``weaver/processes/execution.py`` to avoid mixed references to the
encryption/decryption logic employed for notification emails. All notifications including emails and
callback requests are now completely handled and contained in the ``weaver/notify.py`` module.
- Remove partially duplicate Mako Template definition as hardcoded string and separate file for email notification.

Fixes:
------
- Fix ``weaver.cli`` logger not properly configured when executed from `CLI` causing log messages to not be reported.

.. _changes_4.33.0:

Expand Down
2 changes: 1 addition & 1 deletion docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ above :term:`CLI` variations, but it is usually more intuitive to use a Python :
{"href": "http://another.com/data.json"},
],
"single": {
"href": "/workspace/data.xml@mediaType", # note: uploaded to vault automatically before execution
"href": "/workspace/data.xml", # note: uploaded to vault automatically before execution
"type": "text/xml",
}
})
Expand Down
28 changes: 22 additions & 6 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,37 @@ they are optional and which default value or operation is applied in each situat
| The *path* variant **SHOULD** start with ``/`` for appropriate concatenation with ``weaver.url``, although this is
not strictly enforced.
- | ``weaver.wps_metadata_[...]`` (multiple settings)
- | ``weaver.wps_metadata_[...]`` (multiple settings) [:class:`str`]
|
| Metadata fields that will be rendered by either or both the WPS-1/2 and WPS-REST endpoints
(:ref:`GetCapabilities <proc_op_getcap>`).
- | ``weaver.wps_email_[...]`` (multiple settings)
|
| Defines configuration of email notification functionality on job completion.
| Defines configuration of email notification functionality on :term:`Job` status milestones.
|
| Encryption settings as well as custom email templates are available. Default email template defined in
`email-template`_ is employed if none is provided. Email notifications are sent only on job
completion if an email was provided in the :ref:`Execute <proc_op_execute>` request body
(see also: :ref:`Email Notification`).
| Encryption settings as well as custom email template locations are available.
The |default-notify-email-template|_ is employed if none is provided or when specified template
files or directory cannot be resolved.
|
| When looking up for templates within ``weaver.wps_email_notify_template_dir``, the following resolution order is
followed to attempt matching files. The first one that is found will be employed for the notification email.
|
| 1. file ``{TEMPLATE_DIR}/{PROCESS_ID}/{STATUS}.mako`` used for a specific :term:`Process` and :term:`Job` status
| 2. file ``{TEMPLATE_DIR}/{PROCESS_ID}.mako`` used for a specific :term:`Process` but any :term:`Job` status
| 3. file ``{TEMPLATE_DIR}/{weaver.wps_email_notify_template_default}`` used for any combination if specified
| 4. file ``{TEMPLATE_DIR}/default.mako`` used for any combination if an alternate default name was not specified
| 5. file |default-notify-email-template|_ as last resort
|
| Email notifications are sent only when corresponding :term:`Job` status milestones are reached and when
email(s) were provided in the :ref:`Execute <proc_op_execute>` request body. Emails will not be sent if
the request body did not include a subscription to those notifications, even if the templates were configured.
.. seealso::
See :ref:`Notification Subscribers <proc_op_execute_subscribers>` for more details.

.. versionadded:: 4.15
.. versionchanged:: 4.34

- | ``weaver.execute_sync_max_wait = <int>`` [:class:`int`, seconds]
| (default: ``20``)
Expand Down
18 changes: 17 additions & 1 deletion docs/source/processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1394,14 +1394,30 @@ the configured :term:`WPS` output directory.
.. versionadded:: 4.3
Addition of the ``X-WPS-Output-Context`` header.

Email Notification
.. _proc_op_execute_subscribers:

Notification Subscribers
~~~~~~~~~~~~~~~~~~~~~~~~~~

When submitting a :term:`Job` for execution, it is possible to provide the ``notification_email`` field.
Doing so will tell `Weaver` to send an email to the specified address with successful or failure details
upon :term:`Job` completion. The format of the email is configurable from `weaver.ini.example`_ file with
email-specific settings (see: :ref:`Configuration`).

Alternatively to ``notification_email``, the ``subscribers`` field of the :term:`API` can be employed during :term:`Job`
submission. Using this field will take precedence over ``notification_email`` for corresponding email and status
combinations. The :term:`Job` ``subscribers`` allow more fined-grained control over which emails will be sent for
the various combinations of :term:`Job` status milestones.

Furthermore, ``subscribers`` allow specifying URLs where HTTP(S) requests will be sent with
the :ref:`Job Status <proc_op_status>` or :ref:`Job Results <proc_op_result>` contents directly in :term:`JSON` format.
This allows users and/or servers to directly receive the necessary details using a push-notification mechanism instead
of the polling-based method on the :ref:`Job Status <proc_op_status>` endpoint otherwise required to obtain updated
:term:`Job` details.

.. seealso::
Refer to the |oas-rtd|_ of the |exec-req|_ request for all available ``subscribers`` properties.

.. _proc_op_status:
.. _proc_op_monitor:

Expand Down
3 changes: 2 additions & 1 deletion docs/source/references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
.. _request_options.yml.example: ../../../config/request_options.yml.example
.. _Dockerfile-manager: ../../../docker/Dockerfile-manager
.. _Dockerfile-worker: ../../../docker/Dockerfile-worker
.. _email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako
.. _default-notify-email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako
.. |default-notify-email-template| replace:: Default Notification Email Mako Template
.. |opensearch-deploy| replace:: OpenSearch Deploy
.. _opensearch-deploy: ../../../tests/opensearch/json/opensearch_deploy.json
.. |opensearch-examples| replace:: OpenSearch Examples
Expand Down
128 changes: 124 additions & 4 deletions tests/functional/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import shutil
import smtplib
import tempfile
import uuid
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -38,9 +39,10 @@
from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli
from weaver.datatype import DockerAuthentication, Service
from weaver.formats import ContentType, OutputFormat, get_cwl_file_format, repr_json
from weaver.notify import decrypt_email
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema
from weaver.processes.types import ProcessType
from weaver.status import Status, StatusCategory
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.utils import fully_qualified_name
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_output_url, map_wps_output_location
Expand All @@ -67,7 +69,10 @@ def setUpClass(cls):
settings.update({
"weaver.vault_dir": tempfile.mkdtemp(prefix="weaver-test-"),
"weaver.wps_output_dir": tempfile.mkdtemp(prefix="weaver-test-"),
"weaver.wps_output_url": "http://random-file-server.com/wps-outputs"
"weaver.wps_output_url": "http://random-file-server.com/wps-outputs",
"weaver.wps_email_notify_smtp_host": "http://fake-email-server",
"weaver.wps_email_notify_port": 1234,
"weaver.wps_email_encrypt_salt": "123456",
})
cls.settings = settings
super(TestWeaverClientBase, cls).setUpClass()
Expand Down Expand Up @@ -425,7 +430,8 @@ def test_describe(self):
), "CLI should not have overridden the process description field."

def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
preload=False, location=False, expect_success=True, mock_exec=True):
preload=False, location=False, expect_success=True,
mock_exec=True, **exec_kwargs):
if isinstance(inputs_param, str):
ref = {"location": inputs_param} if location else {"ref_name": inputs_param}
if preload:
Expand All @@ -440,7 +446,8 @@ def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
mock_exec_func = None
for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func):
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process], inputs=inputs_param)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
if expect_success:
assert result.success, result.message + (result.text if result.text else "")
assert "jobID" in result.body
Expand Down Expand Up @@ -566,6 +573,64 @@ def test_execute_with_auto_monitor(self):
# status was periodically pooled and returned 'running' until the final 'succeeded' resumes to download.
raise NotImplementedError

def test_execute_subscribers(self):
"""
Test that specified subscribers are called for relevant :term:`Job` status milestones.
.. versionadded:: 4.34
"""
subscribers = {
"inProgressUri": "https://server.com/started",
"failedUri": "https://server.com/failure",
"successUri": "https://server.com/success",
"inProgressEmail": "[email protected]",
"failedEmail": "[email protected]",
"successEmail": "[email protected]",
}
with contextlib.ExitStack() as subs_stack:
# mock as close as possible to the 'send' operations of respective subscriber types
mocked_requests = subs_stack.enter_context(mock.patch("weaver.notify.request_extra"))
mocked_smtp = subs_stack.enter_context(mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL))
mocked_smtp.return_value.sendmail.return_value = None # sending worked
mocked_emails = mocked_smtp.return_value.sendmail # shortcut

result = self.run_execute_inputs_schema_variant(
{"message": "test-subscribers"},
subscribers=subscribers,
mock_exec=False, # need to run it to get subscriber calls
)

# NOTE:
# Because JSON of job status are pushed using the OGC schema definitions,
# actual status in the body will be mapped to their standard equivalents.
# For example, "started" will be represented as "running" in the callback request body,
# even though both of these statuses are used internally at distinct execution steps.
running_statuses = JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]
job_id = result.body["jobID"]
expect_outputs = {
"output": {
"href": f"{get_wps_output_url(self.settings)}/{job_id}/output/stdout.log",
"type": ContentType.TEXT_PLAIN,
"format": {"mediaType": ContentType.TEXT_PLAIN},
}
}

# order important, expect status 'started' (in-progress) to occur before 'succeeded'
# call for 'failed' should never happen since 'succeeded' expected, as validated by above method
assert mocked_requests.call_count == 2, "Should not have called both failed/success callback requests"
assert mocked_requests.call_args_list[0].args == ("POST", subscribers["inProgressUri"])
assert mocked_requests.call_args_list[0].kwargs["json"]["status"] in running_statuses # status JSON
assert mocked_requests.call_args_list[1].args == ("POST", subscribers["successUri"])
assert mocked_requests.call_args_list[1].kwargs["json"] == expect_outputs # results JSON

# first argument None is 'from_addr' not configured, this is allowed if provided by 'From' email header
test_proc_byte = self.test_process["Echo"]
assert mocked_emails.call_count == 2, "Should not have sent both failed/success email notifications"
assert mocked_emails.call_args_list[0].args[:2] == (None, subscribers["inProgressEmail"])
assert f"Job {test_proc_byte} Started".encode() in mocked_emails.call_args_list[0].args[-1]
assert mocked_emails.call_args_list[1].args[:2] == (None, subscribers["successEmail"])
assert f"Job {test_proc_byte} Succeeded".encode() in mocked_emails.call_args_list[1].args[-1]

# NOTE:
# For all below '<>_auto_resolve_vault' test cases, the local file referenced in the Execute request body
# should be automatically handled by uploading to the Vault and forwarding the relevant X-Auth-Vault header.
Expand Down Expand Up @@ -1626,6 +1691,61 @@ def test_execute_output_context(self, cli_options, expect_output_context):
wps_path = link.split(wps_url)[-1]
assert wps_path == f"/{expect_output_context}/{job_id}/output/stdout.log"

def test_execute_subscriber_options(self):
"""
Validate that subscriber options are properly combined on the CLI.
Since options are provided by multiple separate arguments on the command line, but are a single JSON definition
in :class:`weaver.cli.WeaverClient`, ensure that mapping is accomplished as expected. Also, validate that those
definitions correspond to the final data structure obtained in the database for later use by the job execution.
.. versionadded:: 4.34
"""
proc = self.test_process["Echo"]
with contextlib.ExitStack() as stack_exec:
for mock_exec_proc in mocked_execute_celery():
stack_exec.enter_context(mock_exec_proc)

test_email_started = "[email protected]"
test_email_failed = "[email protected]"
test_callback_started = "https://server.com/started"
test_callback_success = "https://server.com/success"
lines = mocked_sub_requests(
self.app, run_command,
[
# "weaver",
"execute",
"-u", self.url,
"-p", proc,
"-I", "message='TEST MESSAGE!'",
"-M",
"-T", 2,
"-W", 1,
"-nL",
"-d",
"-F", OutputFormat.JSON_RAW,
"-sEP", test_email_started,
"-sEF", test_email_failed,
"-sCP", test_callback_started,
"-sCS", test_callback_success,
],
trim=False,
entrypoint=weaver_cli,
only_local=True,
)
data = json.loads(lines[0])
assert data["status"] == Status.SUCCEEDED

job = self.job_store.fetch_by_id(data["jobID"])
# to properly compare, we must decrypt emails (encrypt is not deterministic on multiple calls)
subs = copy.deepcopy(job.subscribers)
for sub, email in subs["emails"].items():
subs["emails"][sub] = decrypt_email(email, self.settings)
assert subs == {
"callbacks": {Status.STARTED: test_callback_started, Status.SUCCEEDED: test_callback_success},
"emails": {Status.STARTED: test_email_started, Status.FAILED: test_email_failed},
}, "Job subscribers should be as submitted, after combining CLI options, without extra or missing ones."

def test_execute_help_details(self):
"""
Verify that formatting of the execute operation help provides multiple paragraphs with more details.
Expand Down
Loading

0 comments on commit d1cf5a9

Please sign in to comment.