Skip to content

Commit

Permalink
yaml output and CompletedJob (#105)
Browse files Browse the repository at this point in the history
* logdir/appdir/datadir as str

* Replace completed Jobs with CompletedJob

* fstrings in logging

* Don't replace "ce" and "cd".

* add yaml argument.

* ruff

* eliminate with_data

* More docs

* More print changes.

* Docs fix.
  • Loading branch information
PeterKraus authored Nov 18, 2024
1 parent 69d676b commit b224469
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 233 deletions.
28 changes: 13 additions & 15 deletions docs/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ The easiest way to do create this file is using the provided ``tomato init`` com

.. code-block::
:linenos:
:emphasize-lines: 3
:emphasize-lines: 2
kraus@dorje:/home/kraus/$ tomato init
data: null
msg: wrote default settings into /home/kraus/.config/tomato/1.0a1/settings.toml
success: true
Success: wrote default settings into /home/kraus/.config/tomato/1.0a1/settings.toml
Where *appdir* is ``/home/kraus/.config/tomato/1.0a1/``. The *appdir* can be specified
using the ``--appdir`` argument to **tomato**.
Expand Down Expand Up @@ -206,12 +204,12 @@ The ``devices`` section of the default *devices file* is shown below:
- name: dev-counter
driver: "example_counter"
address: "example-addr"
channels: [1]
channels: ["1"]
pollrate: 1
Here, we define a single device using the :mod:`~tomato.drivers.example_counter` driver.
The definition includes the ``address`` of the device (:class:`str` type) as well as an
enumeration of individually-addressable channels the device has (:class:`list[int]`).
enumeration of individually-addressable channels the device has (:class:`list[str]`).

For example, the devices shown in the :ref:`concepts flowchart <concepts>` above would
be defined as:
Expand All @@ -223,17 +221,17 @@ be defined as:
- name: device 1
driver: "driver 123"
address: "192.168.1.1"
channels: [1, 2, 3]
channels: ["1", "2", "3"]
pollrate: 1
- name: device a
driver: "driver abc"
address: "COM1"
channels: [100]
channels: ["100"]
pollrate: 5
- name: device b
driver: "driver abc"
address: "COM2"
channels: [100]
channels: ["100"]
pollrate: 5
Expand All @@ -256,7 +254,7 @@ The default ``pipelines`` section looks as follows:
components:
- role: counter
device: dev-counter
channel: 1
channel: "1"
Here, a single *pipeline* called ``pip-counter`` is defined to contain the one available
channel of the ``dev-counter`` device (defined on line 5) shown further above. For multi
Expand Down Expand Up @@ -290,23 +288,23 @@ above can be defined as:
components:
- role: dev 123
device: device 1
channel: 1
channel: "1"
- role: dev abc
device: device a
channel: 100
channel: "100"
- name: pipeline b2
components:
- role: dev 123
device: device 1
channel: 2
channel: "2"
- role: dev abc
device: device b
channel: 100
channel: "100"
- name: pipeline 3
components:
- role: dev 123
device: device 1
channel: 3
channel: "3"
.. _payfile:

Expand Down
37 changes: 27 additions & 10 deletions src/tomato/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ def run_tomato():
subparsers = parser.add_subparsers(dest="subcommand", required=True)

status = subparsers.add_parser("status")
status.add_argument(
"--with-data",
action="store_true",
default=False,
help="Return full daemon status. If false, only daemon.status will be returned",
)
status.set_defaults(func=tomato.status)

start = subparsers.add_parser("start")
Expand Down Expand Up @@ -103,6 +97,13 @@ def run_tomato():
type=int,
default=3000,
)
p.add_argument(
"--yaml",
"-y",
help="Return output as a yaml.",
action="store_true",
default=False,
)

for p in [start, init, reload]:
p.add_argument(
Expand Down Expand Up @@ -140,7 +141,10 @@ def run_tomato():
context = zmq.Context()
if "func" in args:
ret = args.func(**vars(args), context=context, verbosity=verbosity)
print(yaml.dump(ret.dict()))
if args.yaml:
print(yaml.dump(ret.dict()))
else:
print(f"{'Success' if ret.success else 'Failure'}: {ret.msg}")


def run_ketchup():
Expand Down Expand Up @@ -253,6 +257,13 @@ def run_ketchup():
type=int,
default=3000,
)
p.add_argument(
"--yaml",
"-y",
help="Return output as a yaml.",
action="store_true",
default=False,
)

args, extras = parser.parse_known_args()
args, extras = verbose.parse_known_args(extras, args)
Expand All @@ -262,11 +273,17 @@ def run_ketchup():

if "func" in args:
context = zmq.Context()
status = tomato.status(**vars(args), context=context, with_data=True)
status = tomato.status(**vars(args), context=context)
if not status.success:
print(yaml.dump(status.dict()))
if args.yaml:
print(yaml.dump(status.dict()))
else:
print(f"Failure: {status.msg}")
else:
ret = args.func(
**vars(args), verbosity=verbosity, context=context, status=status
)
print(yaml.dump(ret.dict()))
if args.yaml:
print(yaml.dump(ret.dict()))
else:
print(f"{'Success' if ret.success else 'Failure'}: {ret.msg}")
15 changes: 7 additions & 8 deletions src/tomato/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,21 @@ def tomato_daemon():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--port", "-p", type=int, default=1234)
parser.add_argument("--verbosity", "-V", type=int, default=logging.INFO)
parser.add_argument("--appdir", "-A", type=Path, default=Path.cwd())
parser.add_argument("--logdir", "-L", type=Path, default=Path.cwd())
parser.add_argument("--appdir", "-A", type=str, default=str(Path.cwd()))
parser.add_argument("--logdir", "-L", type=str, default=str(Path.cwd()))
args = parser.parse_args()
settings = toml.load(args.appdir / "settings.toml")
settings = toml.load(Path(args.appdir) / "settings.toml")

daemon = Daemon(**vars(args), status="bootstrap", settings=settings)
setup_logging(daemon)
logger.info(f"logging set up with verbosity {daemon.verbosity}")
logger.info("logging set up with verbosity %s", daemon.verbosity)

logger.debug("attempting to restore daemon state")
io.load(daemon)
logger.debug(f"{daemon=}")

context = zmq.Context()
rep = context.socket(zmq.REP)
logger.debug(f"binding zmq.REP socket on port {daemon.port}")
logger.debug("binding zmq.REP socket on port %d", daemon.port)
rep.bind(f"tcp://127.0.0.1:{daemon.port}")
poller = zmq.Poller()
poller.register(rep, zmq.POLLIN)
Expand Down Expand Up @@ -93,7 +92,7 @@ def tomato_daemon():
if daemon.status == "stop":
for mgr, label in [(jmgr, "job"), (dmgr, "driver")]:
if mgr is not None and mgr.do_run:
logger.debug(f"stopping {label} manager thread")
logger.debug("stopping %s manager thread", label)
mgr.do_run = False
if jmgr is not None:
jmgr.join(1e-3)
Expand All @@ -112,4 +111,4 @@ def tomato_daemon():
if tN - t0 > 10:
io.store(daemon)
t0 = tN
logger.critical(f"tomato-daemon on port {daemon.port} exiting")
logger.critical("tomato-daemon on port %d is exiting", daemon.port)
30 changes: 23 additions & 7 deletions src/tomato/daemon/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,16 @@
"""

from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job, Component
from tomato.models import (
Daemon,
Driver,
Device,
Reply,
Pipeline,
Job,
Component,
CompletedJob,
)
from pydantic import BaseModel
from typing import Any
import logging
Expand Down Expand Up @@ -46,10 +55,7 @@ def merge_pipelines(


def status(msg: dict, daemon: Daemon) -> Reply:
if msg.get("with_data", False):
return Reply(success=True, msg=daemon.status, data=daemon)
else:
return Reply(success=True, msg=daemon.status)
return Reply(success=True, msg=daemon.status, data=daemon)


def stop(msg: dict, daemon: Daemon) -> Reply:
Expand Down Expand Up @@ -224,12 +230,22 @@ def job(msg: dict, daemon: Daemon) -> Reply:
if jobid is None:
jobid = daemon.nextjob
daemon.jobs[jobid] = Job(id=jobid, **msg.get("params", {}))
logger.info(f"received job {jobid}")
logger.info("received job %d", jobid)
daemon.nextjob += 1
else:
for k, v in msg.get("params", {}).items():
logger.debug(f"setting job {jobid}.{k} to {v}")
logger.debug("setting job parameter %s.%s to %s", jobid, k, v)
setattr(daemon.jobs[jobid], k, v)
cjob = daemon.jobs[jobid]
if cjob.status in {"c"}:
daemon.jobs[jobid] = CompletedJob(
id=cjob.id,
status=cjob.status,
completed_at=cjob.completed_at,
jobname=cjob.jobname,
jobpath=cjob.jobpath,
respath=cjob.respath,
)
return Reply(success=True, msg="job updated", data=daemon.jobs[jobid])


Expand Down
6 changes: 3 additions & 3 deletions src/tomato/daemon/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def tomato_driver_bootstrap(
req: zmq.Socket, logger: logging.Logger, interface: ModelInterface, driver: str
):
logger.debug("getting daemon status")
req.send_pyobj(dict(cmd="status", with_data=True))
req.send_pyobj(dict(cmd="status"))
daemon = req.recv_pyobj().data
drv = daemon.drvs[driver]
interface.settings = drv.settings
Expand Down Expand Up @@ -260,7 +260,7 @@ def manager(port: int, timeout: int = 1000):
spawned_drivers = dict()

while getattr(thread, "do_run"):
req.send_pyobj(dict(cmd="status", with_data=True, sender=sender))
req.send_pyobj(dict(cmd="status", sender=sender))
events = dict(poller.poll(to))
if req not in events:
logger.warning("could not contact tomato-daemon in %d ms", to)
Expand Down Expand Up @@ -311,7 +311,7 @@ def manager(port: int, timeout: int = 1000):
time.sleep(1 if len(spawned_drivers) > 0 else 0.1)

logger.info("instructed to quit")
req.send_pyobj(dict(cmd="status", with_data=True, sender=sender))
req.send_pyobj(dict(cmd="status", sender=sender))
daemon = req.recv_pyobj().data
for driver in daemon.drvs.values():
logger.debug("stopping driver '%s' on port %d", driver.name, driver.port)
Expand Down
14 changes: 8 additions & 6 deletions src/tomato/daemon/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ def check_queued_jobs(daemon: Daemon, req) -> dict[int, list[Pipeline]]:
)
if len(matched[job.id]) > 0 and job.status == "q":
logger.info(
f"job {job.id} can queue on pips: {[p.name for p in matched[job.id]]}"
"job %d can queue on pips: {%s}",
job.id,
[p.name for p in matched[job.id]],
)
req.send_pyobj(dict(cmd="job", id=job.id, params=dict(status="qw")))
ret = req.recv_pyobj()
if not ret.success:
logger.error(f"could not set status of job {job.id}")
logger.error("could not set status of job %d", job.id)
continue
else:
job.status = "qw"
Expand Down Expand Up @@ -238,7 +240,7 @@ def manager(port: int, timeout: int = 500):
to = timeout
while getattr(thread, "do_run"):
logger.debug("tick")
req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager"))
req.send_pyobj(dict(cmd="status", sender=f"{__name__}.manager"))
events = dict(poller.poll(to))
if req not in events:
logger.warning(f"could not contact tomato-daemon in {to} ms")
Expand Down Expand Up @@ -399,12 +401,12 @@ def tomato_job() -> None:
logger.error("could not set job status for unknown reason")
return 1

logger.info(f"resetting pipeline {pip!r}")
logger.info("resetting pipeline '%s'", pip)
params = dict(jobid=None, ready=ready, name=pip)
ret = lazy_pirate(pyobj=dict(cmd="pipeline", params=params), **pkwargs)
logger.debug(f"{ret=}")
if not ret.success:
logger.error(f"could not reset pipeline {pip!r}")
logger.error("could not reset pipeline '%s'", pip)
return 1
logger.info("exiting tomato-job")

Expand Down Expand Up @@ -504,7 +506,7 @@ def job_main_loop(
req.connect(f"tcp://127.0.0.1:{port}")

while True:
req.send_pyobj(dict(cmd="status", with_data=True, sender=sender))
req.send_pyobj(dict(cmd="status", sender=sender))
daemon = req.recv_pyobj().data
if all([drv.port is not None for drv in daemon.drvs.values()]):
break
Expand Down
Loading

0 comments on commit b224469

Please sign in to comment.