Skip to content

Commit

Permalink
ketchup.cancel: Implement new cancel mechanism. (#46)
Browse files Browse the repository at this point in the history
* Shift cancel from ketchup to tomato main loop.

* Docs
  • Loading branch information
Peter Kraus authored Aug 22, 2022
1 parent 5698cfe commit b967231
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 36 deletions.
11 changes: 4 additions & 7 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ single experimental *payload*.
For instructions on how to set **tomato** up for a first run, see the :ref:`quickstart`.


.. warning::

Currently, all *jobs* are executed under the user that started the :mod:`tomato.daemon`.
This means that when the :mod:`tomato.daemon` is running under a different user than the
current user who submits a *job*, this current user (if unprivileged) will not be able to
cancel their own *job*.

Using :mod:`~tomato.ketchup`
````````````````````````````

Expand Down Expand Up @@ -78,6 +71,7 @@ by loading or ejecting *samples* and marking *pipelines* ready for execution.
q Job has entered the queue.
qw Job is in the queue, waiting for a pipeline to be ready.
r Job is running.
rd Job has been marked for cancellation.
c Job has completed successfully.
ce Job has completed with an error.
cd Job has been cancelled.
Expand All @@ -94,6 +88,9 @@ by loading or ejecting *samples* and marking *pipelines* ready for execution.
>>> ketchup cancel <jobid>
This will mark the `job` for cancellation by setting its status to ``rd``. The
:mod:`tomato.daemon` will then proceed with cancelling the `job`.

*Jobs* submitted to the *queue* will remain in the *queue* until a *pipeline* meets all
of the following criteria:

Expand Down
34 changes: 32 additions & 2 deletions src/tomato/daemon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,29 @@
import logging
from .. import dbhandler

log = logging.getLogger(__name__)


def _kill_tomato_job(proc):
    """Terminate the child processes of a running ``tomato_job``.

    On Windows, the driver processes are grandchildren of *proc* (the direct
    children include a ``conhost.exe`` console shim, which is skipped), so the
    grandchildren are terminated. On POSIX, the direct children are terminated.
    The results of waiting on the terminated processes are logged at debug
    level.

    Parameters
    ----------
    proc
        A ``psutil.Process`` corresponding to the running job.

    """
    children = proc.children()
    log.warning(f"{proc.name()=}, {proc.pid=}, {children=}")
    # Initialise so the debug logging below is safe even on a platform
    # where neither branch is taken (neither WINDOWS nor POSIX).
    gone, alive = [], []
    if psutil.WINDOWS:
        for child in children:
            # conhost.exe is the console host shim, not a job process.
            if child.name() in {"conhost.exe"}:
                continue
            grandchildren = child.children()
            for gchild in grandchildren:
                log.debug(f"{gchild.name()=}, {gchild.pid=}, {gchild.children()=}")
                gchild.terminate()
            # Accumulate results across children instead of overwriting,
            # so the final debug log covers every terminated process.
            g, a = psutil.wait_procs(grandchildren, timeout=10)
            gone += g
            alive += a
    elif psutil.POSIX:
        for child in children:
            log.debug(f"{child.name()=}, {child.pid=}, {child.children()=}")
            child.terminate()
        gone, alive = psutil.wait_procs(children, timeout=10)
    log.debug(f"{gone=}")
    log.debug(f"{alive=}")


def _find_matching_pipelines(pipelines: list, method: list[dict]) -> list[str]:
req_names = set([item["device"] for item in method])
Expand Down Expand Up @@ -40,7 +63,6 @@ def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:


def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
log = logging.getLogger(__name__)
qup = settings["queue"]["path"]
qut = settings["queue"]["type"]
stp = settings["state"]["path"]
Expand All @@ -63,7 +85,7 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
ret = dbhandler.job_get_all(qup, type=qut)
for jobid, jobname, strpl, st in ret:
payload = json.loads(strpl)
if st in ["q", "qw"]:
if st in {"q", "qw"}:
if st == "q":
log.info(f"checking whether job '{jobid}' can ever be matched")
matched_pips = _find_matching_pipelines(pipelines, payload["method"])
Expand Down Expand Up @@ -101,4 +123,12 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
["tomato_job", str(jpath)], start_new_session=sns
)
break
elif st in {"rd"}:
log.warning(f"cancelling a running job {jobid} with pid {pid}")
proc = psutil.Process(pid=pid)
log.debug(f"{proc=}")
_kill_tomato_job(proc)
log.info(f"setting job {jobid} to status 'cd'")
dbhandler.job_set_status(qup, "cd", jobid, type=qut)

time.sleep(settings.get("main loop", 1))
2 changes: 1 addition & 1 deletion src/tomato/dbhandler/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_db_conn(
sql = sqlite3
else:
raise RuntimeError(f"database type '{type}' unsupported")

head, tail = os.path.split(dbpath)
if head != "" and not os.path.exists(head):
log.warning("making local data folder '%s'", head)
Expand Down
2 changes: 1 addition & 1 deletion src/tomato/drivers/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def start_job(
jobqueue
:class:`multiprocessing.Queue` for passing job related data.
logger
:class:`logging.Logger` instance for writing logs.
Expand Down
33 changes: 8 additions & 25 deletions src/tomato/ketchup/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ def cancel(args: Namespace) -> None:
cancelled. Optional arguments include the verbose/quiet switches (``-v/-q``) and
the testing switch (``-t``).
.. note::
The :func:`~ketchup.functions.cancel` only sets the status of the running
job to ``rd``; the actual job cancellation is performed in the
:func:`tomato.daemon.main.main_loop`.
Examples
--------
Expand All @@ -247,27 +253,6 @@ def cancel(args: Namespace) -> None:
Cancelling a completed job will do nothing.
"""

def kill_tomato_job(proc):
pc = proc.children()
log.warning(f"{proc.name()=}, {proc.pid=}, {pc=}")
if psutil.WINDOWS:
for proc in pc:
if proc.name() in {"conhost.exe"}:
continue
ppc = proc.children()
for proc in ppc:
log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
proc.terminate()
gone, alive = psutil.wait_procs(ppc, timeout=10)
elif psutil.POSIX:
for proc in pc:
log.debug(f"{proc.name()=}, {proc.pid=}, {proc.children()=}")
proc.terminate()
gone, alive = psutil.wait_procs(pc, timeout=10)
log.debug(f"{gone=}")
log.debug(f"{alive=}")

dirs = setlib.get_dirs(args.test)
settings = setlib.get_settings(dirs.user_config_dir, dirs.user_data_dir)
state = settings["state"]
Expand All @@ -286,10 +271,8 @@ def kill_tomato_job(proc):
running = dbhandler.pipeline_get_running(state["path"], type=state["type"])
for pip, pjobid, pid in running:
if pjobid == jobid:
log.warning(f"cancelling a running job {jobid} with pid {pid}")
proc = psutil.Process(pid=pid)
log.debug(f"{proc=}")
kill_tomato_job(proc)
log.info(f"setting job {jobid} to status 'rd'")
dbhandler.job_set_status(queue["path"], "rd", jobid, type=queue["type"])


def load(args: Namespace) -> None:
Expand Down

0 comments on commit b967231

Please sign in to comment.