Skip to content

Commit

Permalink
111 running long tasks on mmoda (#121)
Browse files Browse the repository at this point in the history
* saving callback URL to notebook working dir

* callback test updated

* fixing test_find_notebooks

* parameter callback renamed to callback_url, function _pass_callback renamed to _pass_callback_url

* _pass_callback_url parameter comments updated

* _pass_callback_url parameter comments updated

* bugfixing after refactoring

* verifying progress report from callback.ipynb

* trying to fix issue problem with outdated version of owlready2

* extra asserts added to tests/test_callback.py

* merging changes from master
  • Loading branch information
okolo authored Jan 10, 2024
1 parent 33de39b commit 2d54b5b
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 21 deletions.
23 changes: 18 additions & 5 deletions nb2workflow/nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def new_tmpdir(self, cache_key=None):
newdir = self.tmpdir
if ( self.tempdir_cache is not None ) and ( cache_key is not None ):
self.tempdir_cache[cache_key] = newdir

return newdir

@property
Expand Down Expand Up @@ -412,12 +412,12 @@ def update_summary(self, **d):



def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None):
def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None, callback_url=None):
t0 = time.time()
logstasher.log(dict(origin="nb2workflow.execute", event="starting", parameters=parameters, workflow_name=notebook_short_name(self.notebook_fn), health=current_health()))

logger.info("starting job")
exceptions = self._execute(parameters, progress_bar, log_output, inplace, tmpdir_key)
exceptions = self._execute(parameters, progress_bar, log_output, inplace, callback_url=callback_url, tmpdir_key=tmpdir_key)

tspent = time.time() - t0
logstasher.log(dict(origin="nb2workflow.execute",
Expand All @@ -430,7 +430,7 @@ def execute(self, parameters, progress_bar = True, log_output = True, inplace=Fa

return exceptions

def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None):
def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, callback_url=None, tmpdir_key=None):

if not inplace :
tmpdir = self.new_tmpdir(tmpdir_key)
Expand All @@ -450,7 +450,9 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F
tmpdir =os.path.dirname(os.path.realpath(self.notebook_fn))
logger.info("executing inplace, no tmpdir is input dir: %s", tmpdir)


if callback_url:
self._pass_callback_url(tmpdir, callback_url)

self.update_summary(state="started", parameters=parameters)

self.inject_output_gathering()
Expand Down Expand Up @@ -492,6 +494,17 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F

return exceptions

def _pass_callback_url(self, workdir: str, callback_url: str):
"""
save callback_url to file .oda_api_callback in the notebook dir where it can be accessed by ODA API
:param workdir: directory to save notebook in
"""
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
callback_file_path = os.path.join(workdir, callback_file)
with open(callback_file_path, 'wt') as output:
print(callback_url, file=output)
logger.info("callback file created: %s", callback_file_path)

def extract_pm_output(self):
nb = sb.read_notebook(self.output_notebook_fn)

Expand Down
29 changes: 14 additions & 15 deletions nb2workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ def _run(self):
nba = NotebookAdapter(template_nba.notebook_fn, tempdir_cache=app.async_workflow_jobdirs)

try:
exceptions = nba.execute(self.params['request_parameters'], tmpdir_key=self.key)

exceptions = nba.execute(self.params['request_parameters'], callback_url=self.callback, tmpdir_key=self.key)
except PapermillWorkflowIncomplete as e:
logger.info("found incomplete workflow: %s, rescheduling", repr(e))

Expand Down Expand Up @@ -288,33 +287,33 @@ def workflow(target, background=False, async_request=False):
print('cache key/value', key, value)

if value is None:
async_task = AsyncWorkflow(key=key,
target=target,
params=interpreted_parameters,
async_task = AsyncWorkflow(key=key,
target=target,
params=interpreted_parameters,
callback=async_request_callback)

app.async_workflows[key] = 'submitted'
async_queue.put(async_task)

return make_response(jsonify(workflow_status="submitted",
comment="task created"),
return make_response(jsonify(workflow_status="submitted",
comment="task created"),
201)

elif value == 'submitted':
return make_response(jsonify(workflow_status=value,
comment="task is "+value),
return make_response(jsonify(workflow_status=value,
comment="task is "+value),
201)

elif value == 'started':
return make_response(jsonify(workflow_status=value,
return make_response(jsonify(workflow_status=value,
comment="task is "+value,
jobdir=app.async_workflow_jobdirs.get(key)),
jobdir=app.async_workflow_jobdirs.get(key)),
201)

else:
return make_response(jsonify(workflow_status="done",
data=value,
comment=""),
return make_response(jsonify(workflow_status="done",
data=value,
comment=""),
200)

if len(issues) > 0:
Expand Down
70 changes: 70 additions & 0 deletions tests/test_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
import nb2workflow
import logging
import threading
import time
from os import path
import json

logger = logging.getLogger(__name__)
status_callback_file = "status.json"

@pytest.fixture
def app():
app = nb2workflow.service.app
app.notebook_adapters = nb2workflow.nbadapter.find_notebooks('tests/testfiles')
nb2workflow.service.setup_routes(app)
print("creating app")
return app


def test_progress_callback(client):
callback_url = 'file://' + status_callback_file
query_string = dict(
a=20,
_async_request='yes',
_async_request_callback=callback_url)

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

assert r.status_code == 201

logger.info(r.json)

from nb2workflow.service import AsyncWorker

def test_worker_run():
AsyncWorker('test-worker').run_one()

test_worker_thread = threading.Thread(target=test_worker_run)
test_worker_thread.start()

while True:
options = client.get('/api/v1.0/options')
assert options.status_code == 200

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

logger.info('service returns %s %s', r, r.json)

if r.json['workflow_status'] == 'done':
logger.info('workflow done!')
break

time.sleep(0.1)

test_worker_thread.join()
assert 'data' in r.json
assert 'output' in r.json['data']
assert 'callback' in r.json['data']['output']
assert r.json['data']['output']['callback'] == callback_url

workdir = r.json['data']['jobdir']
with open(path.join(workdir, status_callback_file)) as json_file:
progress_params = json.load(json_file)

test_data = dict(action='progress', stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')
assert progress_params == test_data

2 changes: 1 addition & 1 deletion tests/test_nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_find_notebooks(caplog):
assert 'Ignoring pattern.' in caplog.text

nbas = find_notebooks(nb_dir)
assert len(nbas) == 3
assert len(nbas) == 4

nbas = find_notebooks(nb_dir, pattern=r'.*bool')
assert len(nbas) == 1
Expand Down
113 changes: 113 additions & 0 deletions tests/testfiles/callback.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"a = 10"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"!pip install git+https://github.com/oda-hub/oda_api.git@217-add-functionality-to-send-callback-messages"
],
"metadata": {
"collapsed": false
},
"id": "e5b37dc052a7bc"
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae02114d712f3ebc",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"# from os.path import isfile\n",
"# callback_file = \".oda_api_callback\"\n",
"# if isfile(callback_file):\n",
"# with open(callback_file, 'r') as file:\n",
"# callback = file.read().strip()\n",
"# else:\n",
"# callback = ''"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"pr = ProgressReporter()\n",
"assert pr.enabled\n",
"pr.report_progress(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')\n",
"callback = pr._callback"
],
"metadata": {
"collapsed": false
},
"id": "3c6a5d05bbbd359f"
},
{
"cell_type": "code",
"execution_count": null,
"id": "256b6a6696ecc58d",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"outputs"
]
},
"outputs": [],
"source": [
"callback"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit 2d54b5b

Please sign in to comment.