Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

111 running long tasks on mmoda #121

Merged
merged 14 commits into from
Jan 10, 2024
Merged
23 changes: 18 additions & 5 deletions nb2workflow/nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def new_tmpdir(self, cache_key=None):
newdir = self.tmpdir
if ( self.tempdir_cache is not None ) and ( cache_key is not None ):
self.tempdir_cache[cache_key] = newdir

return newdir

@property
Expand Down Expand Up @@ -412,12 +412,12 @@ def update_summary(self, **d):



def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None):
def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None, callback_url=None):
t0 = time.time()
logstasher.log(dict(origin="nb2workflow.execute", event="starting", parameters=parameters, workflow_name=notebook_short_name(self.notebook_fn), health=current_health()))

logger.info("starting job")
exceptions = self._execute(parameters, progress_bar, log_output, inplace, tmpdir_key)
exceptions = self._execute(parameters, progress_bar, log_output, inplace, callback_url=callback_url, tmpdir_key=tmpdir_key)

tspent = time.time() - t0
logstasher.log(dict(origin="nb2workflow.execute",
Expand All @@ -430,7 +430,7 @@ def execute(self, parameters, progress_bar = True, log_output = True, inplace=Fa

return exceptions

def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None):
def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, callback_url=None, tmpdir_key=None):

if not inplace :
tmpdir = self.new_tmpdir(tmpdir_key)
Expand All @@ -450,7 +450,9 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F
tmpdir =os.path.dirname(os.path.realpath(self.notebook_fn))
logger.info("executing inplace, no tmpdir is input dir: %s", tmpdir)


if callback_url:
self._pass_callback_url(tmpdir, callback_url)

self.update_summary(state="started", parameters=parameters)

self.inject_output_gathering()
Expand Down Expand Up @@ -492,6 +494,17 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F

return exceptions

def _pass_callback_url(self, workdir: str, callback_url: str):
"""
save callback_url to file .oda_api_callback in the notebook dir where it can be accessed by ODA API
:param workdir: directory to save notebook in
"""
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
callback_file_path = os.path.join(workdir, callback_file)
with open(callback_file_path, 'wt') as output:
print(callback_url, file=output)
logger.info("callback file created: %s", callback_file_path)

def extract_pm_output(self):
nb = sb.read_notebook(self.output_notebook_fn)

Expand Down
29 changes: 14 additions & 15 deletions nb2workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ def _run(self):
nba = NotebookAdapter(template_nba.notebook_fn, tempdir_cache=app.async_workflow_jobdirs)

try:
exceptions = nba.execute(self.params['request_parameters'], tmpdir_key=self.key)

exceptions = nba.execute(self.params['request_parameters'], callback_url=self.callback, tmpdir_key=self.key)
except PapermillWorkflowIncomplete as e:
logger.info("found incomplete workflow: %s, rescheduling", repr(e))

Expand Down Expand Up @@ -288,33 +287,33 @@ def workflow(target, background=False, async_request=False):
print('cache key/value', key, value)

if value is None:
async_task = AsyncWorkflow(key=key,
target=target,
params=interpreted_parameters,
async_task = AsyncWorkflow(key=key,
target=target,
params=interpreted_parameters,
callback=async_request_callback)

app.async_workflows[key] = 'submitted'
async_queue.put(async_task)

return make_response(jsonify(workflow_status="submitted",
comment="task created"),
return make_response(jsonify(workflow_status="submitted",
comment="task created"),
201)

elif value == 'submitted':
return make_response(jsonify(workflow_status=value,
comment="task is "+value),
return make_response(jsonify(workflow_status=value,
comment="task is "+value),
201)

elif value == 'started':
return make_response(jsonify(workflow_status=value,
return make_response(jsonify(workflow_status=value,
comment="task is "+value,
jobdir=app.async_workflow_jobdirs.get(key)),
jobdir=app.async_workflow_jobdirs.get(key)),
201)

else:
return make_response(jsonify(workflow_status="done",
data=value,
comment=""),
return make_response(jsonify(workflow_status="done",
data=value,
comment=""),
200)

if len(issues) > 0:
Expand Down
70 changes: 70 additions & 0 deletions tests/test_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
import nb2workflow
import logging
import threading
import time
from os import path
import json

logger = logging.getLogger(__name__)
status_callback_file = "status.json"

@pytest.fixture
def app():
app = nb2workflow.service.app
app.notebook_adapters = nb2workflow.nbadapter.find_notebooks('tests/testfiles')
nb2workflow.service.setup_routes(app)
print("creating app")
return app


def test_progress_callback(client):
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
callback_url = 'file://' + status_callback_file
query_string = dict(
a=20,
_async_request='yes',
_async_request_callback=callback_url)

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

assert r.status_code == 201

logger.info(r.json)

from nb2workflow.service import AsyncWorker

def test_worker_run():
AsyncWorker('test-worker').run_one()

test_worker_thread = threading.Thread(target=test_worker_run)
test_worker_thread.start()

while True:
options = client.get('/api/v1.0/options')
assert options.status_code == 200

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

logger.info('service returns %s %s', r, r.json)

if r.json['workflow_status'] == 'done':
logger.info('workflow done!')
break

time.sleep(0.1)

test_worker_thread.join()
assert 'data' in r.json
assert 'output' in r.json['data']
assert 'callback' in r.json['data']['output']
assert r.json['data']['output']['callback'] == callback_url
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved

workdir = r.json['data']['jobdir']
with open(path.join(workdir, status_callback_file)) as json_file:
progress_params = json.load(json_file)

test_data = dict(action='progress', stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')
assert progress_params == test_data

2 changes: 1 addition & 1 deletion tests/test_nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_find_notebooks(caplog):
assert 'Ignoring pattern.' in caplog.text

nbas = find_notebooks(nb_dir)
assert len(nbas) == 3
assert len(nbas) == 4

nbas = find_notebooks(nb_dir, pattern=r'.*bool')
assert len(nbas) == 1
Expand Down
113 changes: 113 additions & 0 deletions tests/testfiles/callback.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"a = 10"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"!pip install git+https://github.com/oda-hub/oda_api.git@217-add-functionality-to-send-callback-messages"
],
"metadata": {
"collapsed": false
},
"id": "e5b37dc052a7bc"
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae02114d712f3ebc",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"# from os.path import isfile\n",
"# callback_file = \".oda_api_callback\"\n",
"# if isfile(callback_file):\n",
"# with open(callback_file, 'r') as file:\n",
"# callback = file.read().strip()\n",
"# else:\n",
"# callback = ''"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"pr = ProgressReporter()\n",
"assert pr.enabled\n",
"pr.report_progress(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')\n",
"callback = pr._callback"
],
"metadata": {
"collapsed": false
},
"id": "3c6a5d05bbbd359f"
},
{
"cell_type": "code",
"execution_count": null,
"id": "256b6a6696ecc58d",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"outputs"
]
},
"outputs": [],
"source": [
"callback"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading