diff --git a/nb2workflow/nbadapter.py b/nb2workflow/nbadapter.py index f492378b..c7ee3f28 100644 --- a/nb2workflow/nbadapter.py +++ b/nb2workflow/nbadapter.py @@ -238,18 +238,23 @@ def as_dict(self): class NotebookAdapter: limit_output_attachment_file = None - def __init__(self, notebook_fn): + def __init__(self, notebook_fn, tempdir_cache=None): self.notebook_fn = os.path.abspath(notebook_fn) self.name = notebook_short_name(notebook_fn) + self.tempdir_cache = tempdir_cache logger.debug("notebook adapter for %s", self.notebook_fn) logger.debug(self.extract_parameters()) - def new_tmpdir(self): + def new_tmpdir(self, cache_key=None): logger.debug("tmpdir was "+getattr(self,'_tmpdir','unset')) self._tmpdir = None logger.debug("tmpdir became %s", self._tmpdir) - return self.tmpdir + newdir = self.tmpdir + if ( self.tempdir_cache is not None ) and ( cache_key is not None ): + self.tempdir_cache[cache_key] = newdir + + return newdir @property def tmpdir(self): @@ -407,12 +412,12 @@ def update_summary(self, **d): - def execute(self, parameters, progress_bar = True, log_output = True, inplace=False): + def execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None): t0 = time.time() logstasher.log(dict(origin="nb2workflow.execute", event="starting", parameters=parameters, workflow_name=notebook_short_name(self.notebook_fn), health=current_health())) logger.info("starting job") - exceptions = self._execute(parameters, progress_bar, log_output, inplace) + exceptions = self._execute(parameters, progress_bar, log_output, inplace, tmpdir_key) tspent = time.time() - t0 logstasher.log(dict(origin="nb2workflow.execute", @@ -425,10 +430,10 @@ def execute(self, parameters, progress_bar = True, log_output = True, inplace=Fa return exceptions - def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False): + def _execute(self, parameters, progress_bar = True, log_output = True, inplace=False, tmpdir_key=None): if not inplace : - tmpdir = self.new_tmpdir() + tmpdir = self.new_tmpdir(tmpdir_key) logger.info("new tmpdir: %s", tmpdir) try: diff --git a/nb2workflow/service.py b/nb2workflow/service.py index 0dff2896..7c91a3de 100644 --- a/nb2workflow/service.py +++ b/nb2workflow/service.py @@ -103,6 +103,7 @@ def create_app(): app = create_app() app.async_workflows = dict() +app.async_workflow_jobdirs = dict() app.started_at = datetime.datetime.now() @@ -172,10 +173,11 @@ def _run(self): template_nba = app.notebook_adapters.get(self.target) - nba = NotebookAdapter(template_nba.notebook_fn) + nba = NotebookAdapter(template_nba.notebook_fn, tempdir_cache=app.async_workflow_jobdirs) try: - exceptions = nba.execute(self.params['request_parameters']) + exceptions = nba.execute(self.params['request_parameters'], tmpdir_key=self.key) + except PapermillWorkflowIncomplete as e: logger.info("found incomplete workflow: %s, rescheduling", repr(e)) @@ -286,19 +288,34 @@ def workflow(target, background=False, async_request=False): print('cache key/value', key, value) if value is None: - async_task = AsyncWorkflow( - key=key, target=target, params=interpreted_parameters, callback=async_request_callback) - - async_queue.put(async_task) + async_task = AsyncWorkflow(key=key, + target=target, + params=interpreted_parameters, + callback=async_request_callback) app.async_workflows[key] = 'submitted' - return make_response(jsonify(workflow_status="submitted", comment="task created"), 201) + async_queue.put(async_task) - elif value in ['started', 'submitted']: - return make_response(jsonify(workflow_status=value, comment="task is "+value), 201) + return make_response(jsonify(workflow_status="submitted", + comment="task created"), + 201) + + elif value == 'submitted': + return make_response(jsonify(workflow_status=value, + comment="task is "+value), + 201) + + elif value == 'started': + return make_response(jsonify(workflow_status=value, + comment="task is "+value, + jobdir=app.async_workflow_jobdirs.get(key)), + 201) else: - return make_response(jsonify(workflow_status="done", data=value, comment=""), 200) + return make_response(jsonify(workflow_status="done", + data=value, + comment=""), + 200) if len(issues) > 0: return make_response(jsonify(issues=issues), 400) diff --git a/tests/test_service.py b/tests/test_service.py index b7ea94fb..b03c46b1 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -141,6 +141,10 @@ def test_worker_run(): logger.info('service returns %s %s', r, r.json) + if r.json['workflow_status'] == 'started': + assert 'jobdir' in r.json + logger.info('jobdir is reported as %s', r.json['jobdir']) + if r.json['workflow_status'] == 'done': logger.info('workflow done!') break