From b224469e3ecf0968c0c0d77829a3dffbf44df8cb Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Mon, 18 Nov 2024 13:19:22 +0100 Subject: [PATCH] `yaml` output and `CompletedJob` (#105) * logdir/appdir/datadir as str * Replace completed Jobs with CompletedJob * fstrings in logging * Don't replace "ce" and "cd". * add yaml argument. * ruff * eliminate with_data * More docs * More print changes. * Docs fix. --- docs/source/quickstart.rst | 28 ++--- src/tomato/__init__.py | 37 ++++-- src/tomato/daemon/__init__.py | 15 ++- src/tomato/daemon/cmd.py | 30 +++-- src/tomato/daemon/driver.py | 6 +- src/tomato/daemon/job.py | 14 ++- src/tomato/ketchup/__init__.py | 201 +++++++++++++------------------ src/tomato/models.py | 18 ++- src/tomato/tomato/__init__.py | 113 ++++++++++++++--- tests/test_01_tomato.py | 2 +- tests/test_02_ketchup.py | 42 +++---- tests/test_03_state.py | 12 +- tests/test_99_example_counter.py | 18 ++- tests/test_99_psutil.py | 4 +- tests/utils.py | 14 +-- 15 files changed, 321 insertions(+), 233 deletions(-) diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index f01884c9..93670823 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -21,12 +21,10 @@ The easiest way to do create this file is using the provided ``tomato init`` com .. code-block:: :linenos: - :emphasize-lines: 3 + :emphasize-lines: 2 kraus@dorje:/home/kraus/$ tomato init - data: null - msg: wrote default settings into /home/kraus/.config/tomato/1.0a1/settings.toml - success: true + Success: wrote default settings into /home/kraus/.config/tomato/1.0a1/settings.toml Where *appdir* is ``/home/kraus/.config/tomato/1.0a1/``. The *appdir* can be specified using the ``--appdir`` argument to **tomato**. @@ -206,12 +204,12 @@ The ``devices`` section of the default *devices file* is shown below: - name: dev-counter driver: "example_counter" address: "example-addr" - channels: [1] + channels: ["1"] pollrate: 1 Here, we define a single device using the :mod:`~tomato.drivers.example_counter` driver. The definition includes the ``address`` of the device (:class:`str` type) as well as an -enumeration of individually-addressable channels the device has (:class:`list[int]`). +enumeration of individually-addressable channels the device has (:class:`list[str]`). For example, the devices shown in the :ref:`concepts flowchart ` above would be defined as: @@ -223,17 +221,17 @@ be defined as: - name: device 1 driver: "driver 123" address: "192.168.1.1" - channels: [1, 2, 3] + channels: ["1", "2", "3"] pollrate: 1 - name: device a driver: "driver abc" address: "COM1" - channels: [100] + channels: ["100"] pollrate: 5 - name: device b driver: "driver abc" address: "COM2" - channels: [100] + channels: ["100"] pollrate: 5 @@ -256,7 +254,7 @@ The default ``pipelines`` section looks as follows: components: - role: counter device: dev-counter - channel: 1 + channel: "1" Here, a single *pipeline* called ``pip-counter`` is defined to contain the one available channel of the ``dev-counter`` device (defined on line 5) shown further above. For multi @@ -290,23 +288,23 @@ above can be defined as: components: - role: dev 123 device: device 1 - channel: 1 + channel: "1" - role: dev abc device: device a - channel: 100 + channel: "100" - name: pipeline b2 components: - role: dev 123 device: device 1 - channel: 2 + channel: "2" - role: dev abc device: device b - channel: 100 + channel: "100" - name: pipeline 3 components: - role: dev 123 device: device 1 - channel: 3 + channel: "3" .. _payfile: diff --git a/src/tomato/__init__.py b/src/tomato/__init__.py index 42216152..57b87833 100644 --- a/src/tomato/__init__.py +++ b/src/tomato/__init__.py @@ -37,12 +37,6 @@ def run_tomato(): subparsers = parser.add_subparsers(dest="subcommand", required=True) status = subparsers.add_parser("status") - status.add_argument( - "--with-data", - action="store_true", - default=False, - help="Return full daemon status. If false, only daemon.status will be returned", - ) status.set_defaults(func=tomato.status) start = subparsers.add_parser("start") @@ -103,6 +97,13 @@ def run_tomato(): type=int, default=3000, ) + p.add_argument( + "--yaml", + "-y", + help="Return output as a yaml.", + action="store_true", + default=False, + ) for p in [start, init, reload]: p.add_argument( @@ -140,7 +141,10 @@ def run_tomato(): context = zmq.Context() if "func" in args: ret = args.func(**vars(args), context=context, verbosity=verbosity) - print(yaml.dump(ret.dict())) + if args.yaml: + print(yaml.dump(ret.dict())) + else: + print(f"{'Success' if ret.success else 'Failure'}: {ret.msg}") def run_ketchup(): @@ -253,6 +257,13 @@ def run_ketchup(): type=int, default=3000, ) + p.add_argument( + "--yaml", + "-y", + help="Return output as a yaml.", + action="store_true", + default=False, + ) args, extras = parser.parse_known_args() args, extras = verbose.parse_known_args(extras, args) @@ -262,11 +273,17 @@ def run_ketchup(): if "func" in args: context = zmq.Context() - status = tomato.status(**vars(args), context=context, with_data=True) + status = tomato.status(**vars(args), context=context) if not status.success: - print(yaml.dump(status.dict())) + if args.yaml: + print(yaml.dump(status.dict())) + else: + print(f"Failure: {status.msg}") else: ret = args.func( **vars(args), verbosity=verbosity, context=context, status=status ) - print(yaml.dump(ret.dict())) + if args.yaml: + print(yaml.dump(ret.dict())) + else: + print(f"{'Success' if ret.success else 'Failure'}: {ret.msg}") diff --git a/src/tomato/daemon/__init__.py b/src/tomato/daemon/__init__.py index 4fdf9b05..c5755531 100644 --- a/src/tomato/daemon/__init__.py +++ b/src/tomato/daemon/__init__.py @@ -50,22 +50,21 @@ def tomato_daemon(): parser = argparse.ArgumentParser(add_help=False) parser.add_argument("--port", "-p", type=int, default=1234) parser.add_argument("--verbosity", "-V", type=int, default=logging.INFO) - parser.add_argument("--appdir", "-A", type=Path, default=Path.cwd()) - parser.add_argument("--logdir", "-L", type=Path, default=Path.cwd()) + parser.add_argument("--appdir", "-A", type=str, default=str(Path.cwd())) + parser.add_argument("--logdir", "-L", type=str, default=str(Path.cwd())) args = parser.parse_args() - settings = toml.load(args.appdir / "settings.toml") + settings = toml.load(Path(args.appdir) / "settings.toml") daemon = Daemon(**vars(args), status="bootstrap", settings=settings) setup_logging(daemon) - logger.info(f"logging set up with verbosity {daemon.verbosity}") + logger.info("logging set up with verbosity %s", daemon.verbosity) logger.debug("attempting to restore daemon state") io.load(daemon) - logger.debug(f"{daemon=}") context = zmq.Context() rep = context.socket(zmq.REP) - logger.debug(f"binding zmq.REP socket on port {daemon.port}") + logger.debug("binding zmq.REP socket on port %d", daemon.port) rep.bind(f"tcp://127.0.0.1:{daemon.port}") poller = zmq.Poller() poller.register(rep, zmq.POLLIN) @@ -93,7 +92,7 @@ def tomato_daemon(): if daemon.status == "stop": for mgr, label in [(jmgr, "job"), (dmgr, "driver")]: if mgr is not None and mgr.do_run: - logger.debug(f"stopping {label} manager thread") + logger.debug("stopping %s manager thread", label) mgr.do_run = False if jmgr is not None: jmgr.join(1e-3) @@ -112,4 +111,4 @@ def tomato_daemon(): if tN - t0 > 10: io.store(daemon) t0 = tN - logger.critical(f"tomato-daemon on port {daemon.port} exiting") + logger.critical("tomato-daemon on port %d is exiting", daemon.port) diff --git a/src/tomato/daemon/cmd.py b/src/tomato/daemon/cmd.py index e2c75682..4a9e257c 100644 --- a/src/tomato/daemon/cmd.py +++ b/src/tomato/daemon/cmd.py @@ -12,7 +12,16 @@ """ -from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job, Component +from tomato.models import ( + Daemon, + Driver, + Device, + Reply, + Pipeline, + Job, + Component, + CompletedJob, +) from pydantic import BaseModel from typing import Any import logging @@ -46,10 +55,7 @@ def merge_pipelines( def status(msg: dict, daemon: Daemon) -> Reply: - if msg.get("with_data", False): - return Reply(success=True, msg=daemon.status, data=daemon) - else: - return Reply(success=True, msg=daemon.status) + return Reply(success=True, msg=daemon.status, data=daemon) def stop(msg: dict, daemon: Daemon) -> Reply: @@ -224,12 +230,22 @@ def job(msg: dict, daemon: Daemon) -> Reply: if jobid is None: jobid = daemon.nextjob daemon.jobs[jobid] = Job(id=jobid, **msg.get("params", {})) - logger.info(f"received job {jobid}") + logger.info("received job %d", jobid) daemon.nextjob += 1 else: for k, v in msg.get("params", {}).items(): - logger.debug(f"setting job {jobid}.{k} to {v}") + logger.debug("setting job parameter %s.%s to %s", jobid, k, v) setattr(daemon.jobs[jobid], k, v) + cjob = daemon.jobs[jobid] + if cjob.status in {"c"}: + daemon.jobs[jobid] = CompletedJob( + id=cjob.id, + status=cjob.status, + completed_at=cjob.completed_at, + jobname=cjob.jobname, + jobpath=cjob.jobpath, + respath=cjob.respath, + ) return Reply(success=True, msg="job updated", data=daemon.jobs[jobid]) diff --git a/src/tomato/daemon/driver.py b/src/tomato/daemon/driver.py index e040dc22..2ef8e994 100644 --- a/src/tomato/daemon/driver.py +++ b/src/tomato/daemon/driver.py @@ -30,7 +30,7 @@ def tomato_driver_bootstrap( req: zmq.Socket, logger: logging.Logger, interface: ModelInterface, driver: str ): logger.debug("getting daemon status") - req.send_pyobj(dict(cmd="status", with_data=True)) + req.send_pyobj(dict(cmd="status")) daemon = req.recv_pyobj().data drv = daemon.drvs[driver] interface.settings = drv.settings @@ -260,7 +260,7 @@ def manager(port: int, timeout: int = 1000): spawned_drivers = dict() while getattr(thread, "do_run"): - req.send_pyobj(dict(cmd="status", with_data=True, sender=sender)) + req.send_pyobj(dict(cmd="status", sender=sender)) events = dict(poller.poll(to)) if req not in events: logger.warning("could not contact tomato-daemon in %d ms", to) @@ -311,7 +311,7 @@ def manager(port: int, timeout: int = 1000): time.sleep(1 if len(spawned_drivers) > 0 else 0.1) logger.info("instructed to quit") - req.send_pyobj(dict(cmd="status", with_data=True, sender=sender)) + req.send_pyobj(dict(cmd="status", sender=sender)) daemon = req.recv_pyobj().data for driver in daemon.drvs.values(): logger.debug("stopping driver '%s' on port %d", driver.name, driver.port) diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index b046a5ec..e689f3d3 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -151,12 +151,14 @@ def check_queued_jobs(daemon: Daemon, req) -> dict[int, list[Pipeline]]: ) if len(matched[job.id]) > 0 and job.status == "q": logger.info( - f"job {job.id} can queue on pips: {[p.name for p in matched[job.id]]}" + "job %d can queue on pips: {%s}", + job.id, + [p.name for p in matched[job.id]], ) req.send_pyobj(dict(cmd="job", id=job.id, params=dict(status="qw"))) ret = req.recv_pyobj() if not ret.success: - logger.error(f"could not set status of job {job.id}") + logger.error("could not set status of job %d", job.id) continue else: job.status = "qw" @@ -238,7 +240,7 @@ def manager(port: int, timeout: int = 500): to = timeout while getattr(thread, "do_run"): logger.debug("tick") - req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) + req.send_pyobj(dict(cmd="status", sender=f"{__name__}.manager")) events = dict(poller.poll(to)) if req not in events: logger.warning(f"could not contact tomato-daemon in {to} ms") @@ -399,12 +401,12 @@ def tomato_job() -> None: logger.error("could not set job status for unknown reason") return 1 - logger.info(f"resetting pipeline {pip!r}") + logger.info("resetting pipeline '%s'", pip) params = dict(jobid=None, ready=ready, name=pip) ret = lazy_pirate(pyobj=dict(cmd="pipeline", params=params), **pkwargs) logger.debug(f"{ret=}") if not ret.success: - logger.error(f"could not reset pipeline {pip!r}") + logger.error("could not reset pipeline '%s'", pip) return 1 logger.info("exiting tomato-job") @@ -504,7 +506,7 @@ def job_main_loop( req.connect(f"tcp://127.0.0.1:{port}") while True: - req.send_pyobj(dict(cmd="status", with_data=True, sender=sender)) + req.send_pyobj(dict(cmd="status", sender=sender)) daemon = req.recv_pyobj().data if all([drv.port is not None for drv in daemon.drvs.values()]): break diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py index 78062dd1..905f3fe1 100644 --- a/src/tomato/ketchup/__init__.py +++ b/src/tomato/ketchup/__init__.py @@ -54,32 +54,22 @@ def submit( >>> # Submit a job: >>> ketchup submit counter_15_0.1.yml - data: - completed_at: null - executed_at: null - id: 1 - jobname: null - payload: - [...] - pid: null - status: q - submitted_at: '2024-03-03 15:16:49.522866+00:00' - msg: job submitted successfully - success: true + Success: job submitted successfully with jobid 1 - >>> # With a job name: + >>> # Submit a job with a job name: >>> ketchup submit counter_15_0.1.yml -j jobname_is_this + Success: job submitted successfully with jobid 1 and jobname 'jobname_is_this' + + >>> # Submit a job with yaml output: + >>> ketchup submit counter_15_0.1.yml -y data: - completed_at: null - executed_at: null - id: 2 - jobname: jobname_is_this - payload: + completed_at: null + executed_at: null + id: 1 [...] - pid: null - status: q - submitted_at: '2024-03-03 15:19:09.856354+00:00' - msg: job submitted successfully + status: q + submitted_at: '2024-11-17 19:39:16.972593+00:00' + msg: job submitted successfully with jobid 1 success: true """ @@ -123,7 +113,10 @@ def submit( req.send_pyobj(dict(cmd="job", id=None, params=params)) ret = req.recv_pyobj() if ret.success: - return Reply(success=True, msg="job submitted successfully", data=ret.data) + msg = f"job submitted successfully with jobid {ret.data.id}" + if ret.data.jobname is not None: + msg += f" and jobname {ret.data.jobname!r}" + return Reply(success=True, msg=msg, data=ret.data) else: return Reply(success=False, msg="unknown error", data=ret.data) @@ -153,62 +146,60 @@ def status( >>> # Get status of a given job >>> ketchup status 1 - data: - 1: - completed_at: null - executed_at: null - id: 1 - jobname: null - payload: - [...] - pid: null - status: qw - submitted_at: '2024-03-03 15:16:49.522866+00:00' - msg: found 1 queued jobs - success: true + Success: found 1 job with status 'qw': [1] >>> # Get status of multiple jobs >>> ketchup status 1 2 + Success: found 2 job with status 'qw': [1] + found 1 job with status 'c' : [2] + + >>> # Get status of non-existent job + >>> ketchup status 3 + Failure: found no jobs with jobids [3] + + >>> # Get a status of a job with yaml output + >>> ketchup status 1 -y data: - 1: - completed_at: null + - completed_at: null executed_at: null id: 1 - jobname: counter - payload: - [...] - pid: null + [...] status: qw - submitted_at: '2024-03-03 15:16:49.522866+00:00' - data: - 1: - completed_at: '2024-03-03 15:27:44.660493+00:00' - executed_at: '2024-03-03 15:27:28.593896+00:00' - id: 1 - jobname: null - payload: - [...] - pid: 514771 - status: c - submitted_at: '2024-03-03 15:23:40.987074+00:00' - msg: found 2 queued jobs + submitted_at: '2024-11-17 17:53:46.133355+00:00' + msg: found 1 job with status ['qw'] success: true - >>> # Get status of non-existent job - >>> ketchup status 3 - data: {} - msg: found 0 queued jobs - success: true """ jobs = status.data.jobs if len(jobs) == 0: return Reply(success=False, msg="job queue is empty") elif len(jobids) == 0: - return Reply(success=True, msg=f"found {len(jobs)} queued jobs", data=jobs) + rets = [job for job in jobs.values()] + else: + rets = [job for job in jobs.values() if job.id in jobids] + if len(rets) == 0: + if len(jobids) == 1: + msg = f"found no job with jobid {jobids}" + else: + msg = f"found no jobs with jobids {jobids}" + return Reply(success=False, msg=msg) + elif len(rets) == 1: + msg = f"found {len(rets)} job with status {[job.status for job in rets]}" else: - rets = {job.id: job for job in jobs.values() if job.id in jobids} - return Reply(success=True, msg=f"found {len(rets)} queued jobs", data=rets) + msg = "" + for st in ["q", "qw", "r", "rd", "c", "cd", "ce"]: + jobst = [j.id for j in rets if j.status == st] + if len(jobst) > 1: + msg += ( + f"found {len(jobst)} jobs with status {st!r:4s}: {jobst}\n " + ) + elif len(jobst) == 1: + msg += ( + f"found {len(jobst)} job with status {st!r:4s}: {jobst}\n " + ) + msg = msg.strip() + return Reply(success=True, msg=msg, data=rets) def cancel( @@ -232,36 +223,20 @@ def cancel( Examples -------- - >>> # Cancel a queued job: - >>> ketchup cancel 2 + >>> # Cancel a job: + >>> ketchup cancel 1 + Success: job [1] cancelled successfully + + >>> # Cancel a job with yaml output: + >>> ketchup cancel 2 -y data: - 2: - completed_at: null + - completed_at: null executed_at: null id: 2 - jobname: null - payload: - [...] - pid: null + [...] status: cd submitted_at: '2024-03-03 15:23:50.702504+00:00' - msg: cancelled jobs successfully - success: true - - >>> # Cancel a running job: - >>> ketchup cancel 3 - data: - 3: - completed_at: null - executed_at: '2024-03-03 15:37:45.635442+00:00' - id: 3 - jobname: null - payload: - [...] - pid: 515678 - status: rd - submitted_at: '2024-03-03 15:37:44.858713+00:00' - msg: cancelled jobs successfully + msg: job [2] cancelled successfully success: true """ @@ -272,7 +247,7 @@ def cancel( req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") - data = {} + data = [] for jobid in jobids: if jobs[jobid].status in {"q", "qw"}: params = dict(status="cd") @@ -283,10 +258,14 @@ def cancel( req.send_pyobj(dict(cmd="job", id=jobid, params=params)) ret = req.recv_pyobj() if ret.success: - data[jobid] = ret.data + data.append(ret.data) else: return Reply(success=False, msg="unknown error", data=ret.data) - return Reply(success=True, msg="cancelled jobs successfully", data=data) + if len(data) == 1: + msg = f"job {[j.id for j in data]} cancelled successfully" + else: + msg = f"jobs {[j.id for j in data]} cancelled successfully" + return Reply(success=True, msg=msg, data=data) def snapshot( @@ -306,10 +285,7 @@ def snapshot( >>> # Create a snapshot in current working directory: >>> ketchup snapshot 3 - data: null - msg: snapshot for job(s) [3] created successfully - success: true - + Success: snapshot for job [3] created successfully """ jobs = status.data.jobs @@ -321,7 +297,11 @@ def snapshot( for jobid in jobids: merge_netcdfs(Path(jobs[jobid].jobpath), Path(f"snapshot.{jobid}.nc")) - return Reply(success=True, msg=f"snapshot for job(s) {jobids} created successfully") + if len(jobids) > 1: + msg = f"snapshot for jobs {jobids} created successfully" + else: + msg = f"snapshot for job {jobids} created successfully" + return Reply(success=True, msg=msg) def search( @@ -339,32 +319,25 @@ def search( Examples -------- - >>> # Create a snapshot in current working directory: + >>> # Search for a valid jobname >>> ketchup search counter - data: - 1: - completed_at: null - executed_at: null - id: 1 - jobname: counter - [...] - status: qw - submitted_at: '2024-03-03 15:40:21.205806+00:00' - msg: jobs matching 'counter' found - success: true + Success: job matching 'counter' found: [1] + >>> # Search for an invalid jobname >>> ketchup search nothing - data: null - msg: no job matching 'nothing' found - success: false + Failure: no job matching 'nothing' found """ jobs = status.data.jobs - ret = {} + ret = [] for jobid, job in jobs.items(): if job.jobname is not None and jobname in job.jobname: - ret[jobid] = job + ret.append(job) if len(ret) > 0: - return Reply(success=True, msg=f"jobs matching {jobname!r} found", data=ret) + if len(ret) == 1: + msg = f"job matching {jobname!r} found: {[j.id for j in ret]}" + else: + msg = f"jobs matching {jobname!r} found: {[j.id for j in ret]}" + return Reply(success=True, msg=msg, data=ret) else: return Reply(success=False, msg=f"no job matching {jobname!r} found") diff --git a/src/tomato/models.py b/src/tomato/models.py index 209c148f..cb0b9fcb 100644 --- a/src/tomato/models.py +++ b/src/tomato/models.py @@ -6,8 +6,7 @@ """ from pydantic import BaseModel, Field, field_validator -from typing import Optional, Any, Mapping, Sequence, Literal -from pathlib import Path +from typing import Optional, Any, Mapping, Sequence, Literal, Union import logging @@ -83,18 +82,27 @@ class Job(BaseModel): snappath: Optional[str] = None +class CompletedJob(BaseModel): + id: int + status: Literal["c", "cd", "ce"] + completed_at: str + jobname: Optional[str] = None + jobpath: str + respath: str + + class Daemon(BaseModel, arbitrary_types_allowed=True): status: Literal["bootstrap", "running", "stop"] port: int verbosity: int - logdir: Path - appdir: Path + logdir: str + appdir: str settings: dict pips: Mapping[str, Pipeline] = Field(default_factory=dict) devs: Mapping[str, Device] = Field(default_factory=dict) drvs: Mapping[str, Driver] = Field(default_factory=dict) cmps: Mapping[str, Component] = Field(default_factory=dict) - jobs: Mapping[int, Job] = Field(default_factory=dict) + jobs: Mapping[int, Union[Job, CompletedJob]] = Field(default_factory=dict) nextjob: int = 1 diff --git a/src/tomato/tomato/__init__.py b/src/tomato/tomato/__init__.py index c5588299..9ad2ea92 100644 --- a/src/tomato/tomato/__init__.py +++ b/src/tomato/tomato/__init__.py @@ -139,18 +139,51 @@ def status( port: int, timeout: int, context: zmq.Context, - with_data: bool = False, **_: dict, ) -> Reply: """ Get status of the tomato daemon. - If ``with_data`` is specified, the state of the daemon will be retrieved. + Examples + -------- + + >>> # Status with a running daemon + >>> tomato status + Success: tomato running on port 1234 + + >>> # Status without a running daemon + >>> tomato status + Failure: tomato not running on port 1234 + + >>> # Status of a running daemon with data + >>> tomato status -y + data: + appdir: /home/kraus/.config/tomato/1.0rc2.dev2 + cmps: + [...] + devs: + [...] + drvs: + [...] + jobs: + [...] + logdir: /home/kraus/.cache/tomato/1.0rc2.dev2/log + nextjob: 1 + pips: + [...] + port: 1234 + settings: + [...] + status: running + verbosity: 20 + msg: tomato running on port 1234 + success: true + """ logger.debug("checking status of tomato on port %d", port) req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") - req.send_pyobj(dict(cmd="status", with_data=with_data, sender=f"{__name__}.status")) + req.send_pyobj(dict(cmd="status", sender=f"{__name__}.status")) poller = zmq.Poller() poller.register(req, zmq.POLLIN) events = dict(poller.poll(timeout)) @@ -175,13 +208,29 @@ def start( port: int, timeout: int, context: zmq.Context, - appdir: Path, - logdir: Path, + appdir: str, + logdir: str, verbosity: int, **_: dict, ) -> Reply: """ Start the tomato daemon. + + Examples + -------- + + >>> # Start tomato + >>> tomato start + Success: tomato on port 1234 reloaded with settings from /home/kraus/.config/tomato/1.0rc2.dev2 + + >>> # Start tomato with a custom port + >>> tomato start -p 1235 + Success: tomato on port 1235 reloaded with settings from /home/kraus/.config/tomato/1.0rc2.dev2 + + >>> # Start tomato with another tomato running + >>> tomato start + Failure: required port 1234 is already in use, choose a different one + """ logger.debug("checking for availability of port %d", port) try: @@ -202,7 +251,7 @@ def start( msg=f"required port {port} is already in use, choose a different one", ) - if not (appdir / "settings.toml").exists(): + if not (Path(appdir) / "settings.toml").exists(): return Reply( success=False, msg=f"settings file not found in {appdir}, run 'tomato init' to create one", @@ -248,6 +297,22 @@ def stop( Stop a running tomato daemon. Will not stop the daemon if any jobs are running. Will create a state snapshot. + + Examples + -------- + + >>> # Stop tomato daemon without running jobs + >>> tomato stop + Success: tomato on port 1234 closed successfully + + >>> # Attempt to stop tomato daemon with running jobs + >>> tomato stop + Failure: jobs are running + + >>> # Attempt to stop tomato daemon which is not running + >>> tomato stop -p 1235 + Failure: tomato not running on port 1235 + """ stat = status(port=port, timeout=timeout, context=context) if stat.success: @@ -255,22 +320,34 @@ def stop( req.connect(f"tcp://127.0.0.1:{port}") req.send_pyobj(dict(cmd="stop")) rep = req.recv_pyobj() - return rep + if rep.success: + return Reply(success=True, msg=f"tomato on port {port} closed successfully") + else: + return rep else: return stat def init( *, - appdir: Path, - datadir: Path, + appdir: str, + datadir: str, **_: dict, ) -> Reply: """ Create a default settings.toml file. Will overwrite any existing settings.toml file. + + Examples + -------- + + >>> tomato init + Success: wrote default settings into /home/kraus/.config/tomato/1.0rc2.dev2/settings.toml + """ + appdir = Path(appdir) + datadir = Path(datadir) defaults = textwrap.dedent( f"""\ # Default settings for tomato-{VERSION} @@ -303,16 +380,24 @@ def reload( port: int, timeout: int, context: zmq.Context, - appdir: Path, + appdir: str, **_: dict, ) -> Reply: """ Reload settings.toml and devices.yaml files and reconfigure tomato daemon. + + Examples + -------- + + >>> # Reload with compatible changes + >>> tomato reload + Success: tomato on port 1234 reloaded with settings from /home/kraus/.config/tomato/1.0rc2.dev2 + """ kwargs = dict(port=port, timeout=timeout, context=context) logger.debug("Loading settings.toml file from %s.", appdir) try: - settings = toml.load(appdir / "settings.toml") + settings = toml.load(Path(appdir) / "settings.toml") except FileNotFoundError: return Reply( success=False, @@ -378,7 +463,7 @@ def pipeline_load( tomato pipeline load """ - stat = status(port=port, timeout=timeout, context=context, with_data=True) + stat = status(port=port, timeout=timeout, context=context) if not stat.success: return stat @@ -422,7 +507,7 @@ def pipeline_eject( tomato pipeline eject """ - stat = status(port=port, timeout=timeout, context=context, with_data=True) + stat = status(port=port, timeout=timeout, context=context) if not stat.success: return stat @@ -475,7 +560,7 @@ def pipeline_ready( pipeline ready """ - stat = status(port=port, timeout=timeout, context=context, with_data=True) + stat = status(port=port, timeout=timeout, context=context) if not stat.success: return stat diff --git a/tests/test_01_tomato.py b/tests/test_01_tomato.py index 1ade0fb1..22fc1c4a 100644 --- a/tests/test_01_tomato.py +++ b/tests/test_01_tomato.py @@ -20,7 +20,7 @@ def test_tomato_status_down(): def test_tomato_status_up(start_tomato_daemon, stop_tomato_daemon): - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.pips) == 1 diff --git a/tests/test_02_ketchup.py b/tests/test_02_ketchup.py index 72e146b1..5d5a52f8 100644 --- a/tests/test_02_ketchup.py +++ b/tests/test_02_ketchup.py @@ -44,7 +44,7 @@ def test_ketchup_submit_two(datadir, start_tomato_daemon, stop_tomato_daemon): def test_ketchup_status_empty(start_tomato_daemon, stop_tomato_daemon): assert wait_until_tomato_running(port=PORT, timeout=5000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[]) print(f"{ret=}") assert not ret.success @@ -54,7 +54,7 @@ def test_ketchup_status_empty(start_tomato_daemon, stop_tomato_daemon): def test_ketchup_status_all_queued(datadir, start_tomato_daemon, stop_tomato_daemon): args = [datadir, start_tomato_daemon, stop_tomato_daemon] test_ketchup_submit_two(*args) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[]) print(f"{ret=}") assert ret.success @@ -65,26 +65,26 @@ def test_ketchup_status_all_queued(datadir, start_tomato_daemon, stop_tomato_dae def test_ketchup_status_one_queued(datadir, start_tomato_daemon, stop_tomato_daemon): args = [datadir, start_tomato_daemon, stop_tomato_daemon] test_ketchup_submit_two(*args) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[2]) print(f"{ret=}") assert ret.success assert "found 1" in ret.msg assert len(ret.data) == 1 - assert 2 in ret.data.keys() + assert ret.data[0].id == 2 def test_ketchup_status_two_queued(datadir, start_tomato_daemon, stop_tomato_daemon): args = [datadir, start_tomato_daemon, stop_tomato_daemon] test_ketchup_submit_two(*args) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1, 2]) print(f"{ret=}") assert ret.success assert "found 2" in ret.msg assert len(ret.data) == 2 - assert 1 in ret.data.keys() - assert 2 in ret.data.keys() + assert ret.data[0].id == 1 + assert ret.data[1].id == 2 @pytest.mark.parametrize( @@ -99,13 +99,13 @@ def test_ketchup_status_running(pl, datadir, start_tomato_daemon, stop_tomato_da tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) tomato.pipeline_ready(**kwargs, pipeline="pip-counter") wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") assert ret.success assert "found 1" in ret.msg assert len(ret.data) == 1 - assert ret.data[1].status == "r" + assert ret.data[0].status == "r" @pytest.mark.parametrize( @@ -120,11 +120,11 @@ def test_ketchup_status_complete(pl, datadir, start_tomato_daemon, stop_tomato_d tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) tomato.pipeline_ready(**kwargs, pipeline="pip-counter") assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=5000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") assert ret.success - assert ret.data[1].status == "c" + assert ret.data[0].status == "c" assert os.path.exists("results.1.nc") @@ -142,20 +142,20 @@ def test_ketchup_cancel_running(pl, datadir, start_tomato_daemon, stop_tomato_da assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) assert wait_until_pickle(jobid=1, timeout=2000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.cancel(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") assert ret.success - assert ret.data[1].status == "rd" + assert ret.data[0].status == "rd" assert wait_until_ketchup_status(jobid=1, status="cd", port=PORT, timeout=5000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") print(f"{os.listdir()=}") print(f"{os.listdir('Jobs')=}") print(f"{os.listdir(os.path.join('Jobs', '1'))=}") - assert ret.data[1].status == "cd" + assert ret.data[0].status == "cd" assert os.path.exists("results.1.nc") @@ -171,11 +171,11 @@ def test_ketchup_cancel_queued(pl, datadir, start_tomato_daemon, stop_tomato_dae tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) assert wait_until_ketchup_status(jobid=1, status="qw", port=PORT, timeout=5000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.cancel(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") assert ret.success - assert ret.data[1].status == "cd" + assert ret.data[0].status == "cd" @pytest.mark.parametrize( @@ -192,7 +192,7 @@ def test_ketchup_snapshot(pl, datadir, start_tomato_daemon, stop_tomato_daemon): assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) assert wait_until_pickle(jobid=1, timeout=2000) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.snapshot(jobids=[1], status=status) print(f"{ret=}") assert ret.success @@ -202,19 +202,19 @@ def test_ketchup_snapshot(pl, datadir, start_tomato_daemon, stop_tomato_daemon): def test_ketchup_search(datadir, start_tomato_daemon, stop_tomato_daemon): args = [datadir, start_tomato_daemon, stop_tomato_daemon] test_ketchup_submit_two(*args) - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.search(jobname="2", status=status) print(f"{ret=}") assert ret.success assert len(ret.data) == 1 - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.search(jobname="job", status=status) print(f"{ret=}") assert ret.success assert len(ret.data) == 2 - status = tomato.status(**kwargs, with_data=True) + status = tomato.status(**kwargs) ret = ketchup.search(jobname="wrong", status=status) print(f"{ret=}") assert ret.success is False diff --git a/tests/test_03_state.py b/tests/test_03_state.py index 18f6b171..5d1de3b7 100644 --- a/tests/test_03_state.py +++ b/tests/test_03_state.py @@ -31,7 +31,7 @@ def test_stop_with_queued_jobs(datadir, start_tomato_daemon, stop_tomato_daemon) tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) assert wait_until_tomato_running(port=PORT, timeout=WAIT) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 2 @@ -66,7 +66,7 @@ def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): ret = tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) print(f"{ret=}") assert wait_until_tomato_running(port=PORT, timeout=WAIT) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 1 @@ -74,7 +74,7 @@ def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): assert ret.data.jobs[1].status == "r" assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=25000) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 1 @@ -97,7 +97,7 @@ def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) assert wait_until_tomato_running(port=PORT, timeout=WAIT) assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=5000) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 1 @@ -114,7 +114,7 @@ def test_recover_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_20_5") tomato.pipeline_ready(**kwargs, pipeline="pip-counter") wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") pid = ret.data.jobs[1].pid @@ -127,7 +127,7 @@ def test_recover_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) assert wait_until_tomato_running(port=PORT, timeout=WAIT) - ret = tomato.status(**kwargs, with_data=True) + ret = tomato.status(**kwargs) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 1 diff --git a/tests/test_99_example_counter.py b/tests/test_99_example_counter.py index fdc988e0..8fb06969 100644 --- a/tests/test_99_example_counter.py +++ b/tests/test_99_example_counter.py @@ -26,10 +26,10 @@ def test_counter_npoints( ): os.chdir(datadir) utils.run_casenames([casename], [None], ["pip-counter"]) - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) while status in {"q", "qw", "r"}: time.sleep(1) - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) assert status == "c" files = os.listdir(os.path.join(".", "Jobs", "1")) assert "jobdata.json" in files @@ -52,16 +52,14 @@ def test_counter_cancel(casename, datadir, start_tomato_daemon, stop_tomato_daem os.chdir(datadir) cancel = True utils.run_casenames([casename], [None], ["pip-counter"]) - ret = utils.job_status(1) - status = ret["data"][1]["status"] + status = utils.job_status(1) while status in {"q", "qw", "r", "rd"}: time.sleep(2) if cancel and status == "r": subprocess.run(["ketchup", "cancel", "-p", "12345", "1"]) cancel = False time.sleep(2) - ret = utils.job_status(1) - status = ret["data"][1]["status"] + status = utils.job_status(1) assert status == "cd" @@ -78,13 +76,13 @@ def test_counter_snapshot( os.chdir(datadir) utils.run_casenames([casename], [None], ["pip-counter"]) time.sleep(5) - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) while status in {"q", "qw", "r"}: time.sleep(1) if external and status == "r": subprocess.run(["ketchup", "snapshot", "-p", "12345", "1"]) external = False - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) assert status == "c" assert os.path.exists("snapshot.1.nc") @@ -110,9 +108,7 @@ def test_counter_multidev(casename, npoints, datadir, stop_tomato_daemon): utils.wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=10000) utils.wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=10000) - ret = utils.job_status(1) - print(f"{ret=}") - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) assert status == "c" files = os.listdir(os.path.join(".", "Jobs", "1")) assert "jobdata.json" in files diff --git a/tests/test_99_psutil.py b/tests/test_99_psutil.py index b2c27ac8..491bc3ff 100644 --- a/tests/test_99_psutil.py +++ b/tests/test_99_psutil.py @@ -31,9 +31,7 @@ def test_psutil_multidev(casename, npoints, datadir, stop_tomato_daemon): utils.wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=2000) utils.wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=2000) - ret = utils.job_status(1) - print(f"{ret=}") - status = utils.job_status(1)["data"][1]["status"] + status = utils.job_status(1) assert status == "c" files = os.listdir(os.path.join(".", "Jobs", "1")) assert "jobdata.json" in files diff --git a/tests/utils.py b/tests/utils.py index e225a77e..56f9f13d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,6 +1,5 @@ import subprocess import time -import yaml import logging import os import psutil @@ -27,8 +26,8 @@ def job_status(jobid): capture_output=True, text=True, ) - yml = yaml.safe_load(ret.stdout) - return yml + status = ret.stdout.split("'")[1] + return status def wait_until_tomato_running(port: int, timeout: int): @@ -39,8 +38,7 @@ def wait_until_tomato_running(port: int, timeout: int): capture_output=True, text=True, ) - data = yaml.safe_load(ret.stdout) - if data["success"]: + if "Success" in ret.stdout: return True time.sleep(0.5) return False @@ -54,8 +52,7 @@ def wait_until_tomato_stopped(port: int, timeout: int): capture_output=True, text=True, ) - data = yaml.safe_load(ret.stdout) - if not data["success"]: + if "Failure" in ret.stdout: return True time.sleep(0.5) return False @@ -69,8 +66,7 @@ def wait_until_ketchup_status(jobid: int, status: str, port: int, timeout: int): capture_output=True, text=True, ) - data = yaml.safe_load(ret.stdout)["data"] - if data[jobid]["status"] == status: + if f"[{status!r}]" in ret.stdout: return True time.sleep(0.5) return False