Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

111 running long tasks on mmoda #121

Merged
merged 14 commits into from
Jan 10, 2024
Merged
20 changes: 17 additions & 3 deletions nb2workflow/nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,13 @@ def update_summary(self, **d):



def execute(self, parameters, progress_bar = True, log_output = True, inplace=False):
def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, callback=None):
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
t0 = time.time()

logstasher.log(dict(origin="nb2workflow.execute", event="starting", parameters=parameters, workflow_name=notebook_short_name(self.notebook_fn), health=current_health()))

logger.info("starting job")
exceptions = self._execute(parameters, progress_bar, log_output, inplace)
exceptions = self._execute(parameters, progress_bar, log_output, inplace, callback=callback)

tspent = time.time() - t0
logstasher.log(dict(origin="nb2workflow.execute",
Expand All @@ -425,7 +426,7 @@ def execute(self, parameters, progress_bar = True, log_output = True, inplace=Fa

return exceptions

def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False):
def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, callback=None):

if not inplace :
tmpdir = self.new_tmpdir()
Expand All @@ -445,6 +446,8 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F
tmpdir =os.path.dirname(os.path.realpath(self.notebook_fn))
logger.info("executing inplace, no tmpdir is input dir: %s", tmpdir)

if callback:
self._pass_callback(tmpdir, callback)

self.update_summary(state="started", parameters=parameters)

Expand Down Expand Up @@ -487,6 +490,17 @@ def _execute(self, parameters, progress_bar = True, log_output = True, inplace=F

return exceptions

def _pass_callback(self, workdir: str, callback: str):
"""
save callback to file .oda_api_callback in the notebook dir were it can be accessed by ODA API
burnout87 marked this conversation as resolved.
Show resolved Hide resolved
:param notebook_path: full path to the notebook
burnout87 marked this conversation as resolved.
Show resolved Hide resolved
"""
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
callback_file_path = os.path.join(workdir, callback_file)
with open(callback_file_path, 'wt') as output:
print(callback, file=output)
logger.info("callback file created: %s", callback_file_path)

def extract_pm_output(self):
nb = sb.read_notebook(self.output_notebook_fn)

Expand Down
2 changes: 1 addition & 1 deletion nb2workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _run(self):
nba = NotebookAdapter(template_nba.notebook_fn)

try:
exceptions = nba.execute(self.params['request_parameters'])
exceptions = nba.execute(self.params['request_parameters'], callback=self.callback)
except PapermillWorkflowIncomplete as e:
logger.info("found incomplete workflow: %s, rescheduling", repr(e))

Expand Down
56 changes: 56 additions & 0 deletions tests/test_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest
import nb2workflow
import logging
import threading
import time

logger = logging.getLogger(__name__)

@pytest.fixture
def app():
app = nb2workflow.service.app
app.notebook_adapters = nb2workflow.nbadapter.find_notebooks('tests/testfiles')
nb2workflow.service.setup_routes(app)
print("creating app")
return app


def test_progress_callback(client):
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
callback_url = 'file://callback.json'
query_string = dict(
a=20,
_async_request='yes',
_async_request_callback=callback_url)

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

assert r.status_code == 201

logger.info(r.json)

from nb2workflow.service import AsyncWorker

def test_worker_run():
AsyncWorker('test-worker').run_one()

test_worker_thread = threading.Thread(target=test_worker_run)
test_worker_thread.start()

while True:
options = client.get('/api/v1.0/options')
assert options.status_code == 200

r = client.get('/api/v1.0/get/callback',
query_string=query_string)

logger.info('service returns %s %s', r, r.json)

if r.json['workflow_status'] == 'done':
logger.info('workflow done!')
break

time.sleep(0.1)

test_worker_thread.join()
assert r.json['data']['output']['callback'] == callback_url
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion tests/test_nbadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_find_notebooks(caplog):
assert 'Ignoring pattern.' in caplog.text

nbas = find_notebooks(nb_dir)
assert len(nbas) == 3
assert len(nbas) == 4

nbas = find_notebooks(nb_dir, pattern=r'.*bool')
assert len(nbas) == 1
Expand Down
113 changes: 113 additions & 0 deletions tests/testfiles/callback.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"a = 10"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"!pip install git+https://github.com/oda-hub/oda_api.git@217-add-functionality-to-send-callback-messages"
],
"metadata": {
"collapsed": false
},
"id": "e5b37dc052a7bc"
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae02114d712f3ebc",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"# from os.path import isfile\n",
"# callback_file = \".oda_api_callback\"\n",
"# if isfile(callback_file):\n",
"# with open(callback_file, 'r') as file:\n",
"# callback = file.read().strip()\n",
"# else:\n",
"# callback = ''"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"pr = ProgressReporter()\n",
"assert pr.enabled\n",
"pr.report_progress(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')\n",
"callback = pr._callback"
],
"metadata": {
"collapsed": false
},
"id": "3c6a5d05bbbd359f"
},
{
"cell_type": "code",
"execution_count": null,
"id": "256b6a6696ecc58d",
"metadata": {
"collapsed": false,
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"outputs"
]
},
"outputs": [],
"source": [
"callback"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading