diff --git a/nb2workflow/nbadapter.py b/nb2workflow/nbadapter.py index c7ee3f28..211976d4 100644 --- a/nb2workflow/nbadapter.py +++ b/nb2workflow/nbadapter.py @@ -253,7 +253,7 @@ def new_tmpdir(self, cache_key=None): newdir = self.tmpdir if ( self.tempdir_cache is not None ) and ( cache_key is not None ): self.tempdir_cache[cache_key] = newdir - + return newdir @property @@ -412,12 +412,12 @@ def update_summary(self, **d): - def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None): + def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None, callback_url=None): t0 = time.time() logstasher.log(dict(origin="nb2workflow.execute", event="starting", parameters=parameters, workflow_name=notebook_short_name(self.notebook_fn), health=current_health())) logger.info("starting job") - exceptions = self._execute(parameters, progress_bar, log_output, inplace, tmpdir_key) + exceptions = self._execute(parameters, progress_bar, log_output, inplace, callback_url=callback_url, tmpdir_key=tmpdir_key) tspent = time.time() - t0 logstasher.log(dict(origin="nb2workflow.execute", @@ -430,7 +430,7 @@ def execute(self, parameters, progress_bar = True, log_output = True, inplace=Fa return exceptions - def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None): + def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, callback_url=None, tmpdir_key=None): if not inplace : tmpdir = self.new_tmpdir(tmpdir_key) @@ -450,7 +450,9 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F tmpdir =os.path.dirname(os.path.realpath(self.notebook_fn)) logger.info("executing inplace, no tmpdir is input dir: %s", tmpdir) - + if callback_url: + self._pass_callback_url(tmpdir, callback_url) + self.update_summary(state="started", parameters=parameters) self.inject_output_gathering() @@ -492,6 +494,17 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F return exceptions + def _pass_callback_url(self, workdir: str, callback_url: str): + """ + save callback_url to file .oda_api_callback in the notebook dir where it can be accessed by ODA API + :param workdir: directory to save notebook in + """ + callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib + callback_file_path = os.path.join(workdir, callback_file) + with open(callback_file_path, 'wt') as output: + print(callback_url, file=output) + logger.info("callback file created: %s", callback_file_path) + def extract_pm_output(self): nb = sb.read_notebook(self.output_notebook_fn) diff --git a/nb2workflow/service.py b/nb2workflow/service.py index 7c91a3de..aca9f2cd 100644 --- a/nb2workflow/service.py +++ b/nb2workflow/service.py @@ -176,8 +176,7 @@ def _run(self): nba = NotebookAdapter(template_nba.notebook_fn, tempdir_cache=app.async_workflow_jobdirs) try: - exceptions = nba.execute(self.params['request_parameters'], tmpdir_key=self.key) - + exceptions = nba.execute(self.params['request_parameters'], callback_url=self.callback, tmpdir_key=self.key) except PapermillWorkflowIncomplete as e: logger.info("found incomplete workflow: %s, rescheduling", repr(e)) @@ -288,33 +287,33 @@ def workflow(target, background=False, async_request=False): print('cache key/value', key, value) if value is None: - async_task = AsyncWorkflow(key=key, - target=target, - params=interpreted_parameters, + async_task = AsyncWorkflow(key=key, + target=target, + params=interpreted_parameters, callback=async_request_callback) app.async_workflows[key] = 'submitted' async_queue.put(async_task) - return make_response(jsonify(workflow_status="submitted", - comment="task created"), + return make_response(jsonify(workflow_status="submitted", + comment="task created"), 201) elif value == 'submitted': - return make_response(jsonify(workflow_status=value, - comment="task is "+value), + return make_response(jsonify(workflow_status=value, + comment="task is "+value), 201) - + elif value == 'started': - return make_response(jsonify(workflow_status=value, + return make_response(jsonify(workflow_status=value, comment="task is "+value, - jobdir=app.async_workflow_jobdirs.get(key)), + jobdir=app.async_workflow_jobdirs.get(key)), 201) else: - return make_response(jsonify(workflow_status="done", - data=value, - comment=""), + return make_response(jsonify(workflow_status="done", + data=value, + comment=""), 200) if len(issues) > 0: diff --git a/tests/test_callback.py b/tests/test_callback.py new file mode 100644 index 00000000..edca8660 --- /dev/null +++ b/tests/test_callback.py @@ -0,0 +1,70 @@ +import pytest +import nb2workflow +import logging +import threading +import time +from os import path +import json + +logger = logging.getLogger(__name__) +status_callback_file = "status.json" + +@pytest.fixture +def app(): + app = nb2workflow.service.app + app.notebook_adapters = nb2workflow.nbadapter.find_notebooks('tests/testfiles') + nb2workflow.service.setup_routes(app) + print("creating app") + return app + + +def test_progress_callback(client): + callback_url = 'file://' + status_callback_file + query_string = dict( + a=20, + _async_request='yes', + _async_request_callback=callback_url) + + r = client.get('/api/v1.0/get/callback', + query_string=query_string) + + assert r.status_code == 201 + + logger.info(r.json) + + from nb2workflow.service import AsyncWorker + + def test_worker_run(): + AsyncWorker('test-worker').run_one() + + test_worker_thread = threading.Thread(target=test_worker_run) + test_worker_thread.start() + + while True: + options = client.get('/api/v1.0/options') + assert options.status_code == 200 + + r = client.get('/api/v1.0/get/callback', + query_string=query_string) + + logger.info('service returns %s %s', r, r.json) + + if r.json['workflow_status'] == 'done': + logger.info('workflow done!') + break + + time.sleep(0.1) + + test_worker_thread.join() + assert 'data' in r.json + assert 'output' in r.json['data'] + assert 'callback' in r.json['data']['output'] + assert r.json['data']['output']['callback'] == callback_url + + workdir = r.json['data']['jobdir'] + with open(path.join(workdir, status_callback_file)) as json_file: + progress_params = json.load(json_file) + + test_data = dict(action='progress', stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message') + assert progress_params == test_data + diff --git a/tests/test_nbadapter.py b/tests/test_nbadapter.py index 2b3e7a09..60c50061 100644 --- a/tests/test_nbadapter.py +++ b/tests/test_nbadapter.py @@ -85,7 +85,7 @@ def test_find_notebooks(caplog): assert 'Ignoring pattern.' in caplog.text nbas = find_notebooks(nb_dir) - assert len(nbas) == 3 + assert len(nbas) == 4 nbas = find_notebooks(nb_dir, pattern=r'.*bool') assert len(nbas) == 1 diff --git a/tests/testfiles/callback.ipynb b/tests/testfiles/callback.ipynb new file mode 100644 index 00000000..2d387196 --- /dev/null +++ b/tests/testfiles/callback.ipynb @@ -0,0 +1,113 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "initial_id", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "a = 10" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "!pip install git+https://github.com/oda-hub/oda_api.git@217-add-functionality-to-send-callback-messages" + ], + "metadata": { + "collapsed": false + }, + "id": "e5b37dc052a7bc" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae02114d712f3ebc", + "metadata": { + "collapsed": false, + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "# from os.path import isfile\n", + "# callback_file = \".oda_api_callback\"\n", + "# if isfile(callback_file):\n", + "# with open(callback_file, 'r') as file:\n", + "# callback = file.read().strip()\n", + "# else:\n", + "# callback = ''" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "from oda_api.api import ProgressReporter\n", + "pr = ProgressReporter()\n", + "assert pr.enabled\n", + "pr.report_progress(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')\n", + "callback = pr._callback" + ], + "metadata": { + "collapsed": false + }, + "id": "3c6a5d05bbbd359f" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "256b6a6696ecc58d", + "metadata": { + "collapsed": false, + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "outputs" + ] + }, + "outputs": [], + "source": [ + "callback" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}