diff --git a/.github/workflows/pull-request-commit.yml b/.github/workflows/pull-request-commit.yml
index 1d94cf2f..33f3f401 100644
--- a/.github/workflows/pull-request-commit.yml
+++ b/.github/workflows/pull-request-commit.yml
@@ -2,7 +2,7 @@ name: pull-request-commit
 on:
   pull_request:
     branches:
-      - master
+      - 0.2.x
 jobs:
   build:
     strategy:
@@ -20,7 +20,7 @@ jobs:
       - uses: ./.github/workflows/before-job
       - uses: ./.github/workflows/build-job
       - uses: actions/upload-artifact@v2
-        with: 
+        with:
           name: dist-${{ matrix.os }}-${{ matrix.pyver }}
           path: dist
   test:
@@ -72,6 +72,6 @@ jobs:
             subprocess.run(["pip", "install", f"{fn}[docs]"])
       - uses: ./.github/workflows/docs-job
       - uses: actions/upload-artifact@v2
-        with: 
+        with:
           name: public-${{ matrix.os }}-${{ matrix.pyver }}
           path: public
\ No newline at end of file
diff --git a/docs/source/usage.rst b/docs/source/usage.rst
index 163d8fa6..29dea4ba 100644
--- a/docs/source/usage.rst
+++ b/docs/source/usage.rst
@@ -137,6 +137,15 @@ To manage *samples* in the *pipelines*, use the following
    :mod:`~tomato.ketchup` has been implemented to allow the user to investigate
    the *sample* and/or *pipeline* for any faults.

+#. **To mark** a *pipeline* as **not ready**, run:
+
+   .. code-block:: bash
+
+       >>> ketchup unready <pipeline>
+
+   This command will mark the *pipeline* as not ready; any *jobs* that are submitted
+   will not be run automatically.
+
 .. note::

    Further information about :mod:`~tomato.ketchup` is available in the documentation
diff --git a/src/tomato/drivers/driver_funcs.py b/src/tomato/drivers/driver_funcs.py
index bbe119d8..2db56e11 100644
--- a/src/tomato/drivers/driver_funcs.py
+++ b/src/tomato/drivers/driver_funcs.py
@@ -143,30 +143,30 @@ def data_poller(
     stops_required = kwargs.get("stops_required", 2)
     stops_received = 0
     log.debug(f"in 'data_poller', {pollrate=}")
-    _, _, metadata = driver_api(
-        driver, "get_status", jq, log, address, channel, **kwargs
-    )
-    mem_size = metadata["mem_size"]
-    done = False
+    finished_polling = False
     previous = None
-    while not done:
-        while True:
+    data = {}
+    while not finished_polling:
+        try:  # try to get status from last get_data, otherwise use get_status
+            status = data["current"]["status"]
+            if status in ["RUN", "PAUSE"]:
+                stop = False
+            else:
+                stop = True
+                if status != "STOP":
+                    log.info(
+                        f"device '{device}' status '{status}' not understood, expected 'RUN' 'PAUSE' or 'STOP'"
+                    )
+        except (TypeError, KeyError):  # data["current"]["status"] doesn't exist
+            ts, stop, _ = driver_api(
+                driver, "get_status", jq, log, address, channel, **kwargs
+            )
+        while True:  # get data until no more data is available
             ts, nrows, data = driver_api(
                 driver, "get_data", jq, log, address, channel, **kwargs
             )
             data["previous"] = previous
             previous = data["current"]
-            mem_filled = data["current"]["mem_filled"]
-            f_mem_filled = mem_filled / mem_size
-            if f_mem_filled > 0.995:
-                log.critical(
-                    f"{device} {address}:{channel} memory is full, data is being lost"
-                )
-            elif f_mem_filled > 0.8:
-                log.warning(
-                    f"{device} {address}:{channel} memory is {f_mem_filled*100:.1f}% full"
-                )
-
             if nrows > 0:
                 isots = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
                 isots = isots.replace(":", "")
@@ -175,26 +175,13 @@
                 with open(fn, "w") as of:
                     json.dump(data, of)
             else:
-                status = data["current"]["status"]
-                if status == "STOP":
-                    stops_received += 1
-                    log.debug(
-                        f"{address}:{channel} has given {stops_received} 'STOP' statuses in a row"
-                    )
-                elif status in ["RUN", "PAUSE"]:
-                    stops_received = 0
-                else:
-                    stops_received += 1
-                    log.critical(
-                        f"get_data status not understood: '{status}', counting as 'STOP' status {stops_received}"
-                    )
                 break
-        if stops_received >= stops_required:
-            done = True
-            log.info(
-                f"device '{device}' has stopped polling after {stops_required} consecutive 'STOP' statuses"
-            )
+        if stop:
+            stops_received += 1
+            if stops_received >= stops_required:
+                finished_polling = True
         else:
+            stops_received = 0
             time.sleep(pollrate)
     log.info(f"rejoining main thread")
     return
diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py
index bf09f97d..523fcd13 100644
--- a/src/tomato/ketchup/__init__.py
+++ b/src/tomato/ketchup/__init__.py
@@ -18,6 +18,7 @@
 - :func:`.load` to load a *sample* into a *pipeline*
 - :func:`.eject` to remove any *sample* present in a *pipeline*
 - :func:`.ready` to mark a *pipeline* as ready
+- :func:`.unready` to mark a *pipeline* as not ready

 """

@@ -29,6 +30,7 @@
     load,
     eject,
     ready,
+    unready,
     snapshot,
     search,
     cancel_all,
diff --git a/src/tomato/ketchup/functions.py b/src/tomato/ketchup/functions.py
index cc1ca038..3a694bd6 100644
--- a/src/tomato/ketchup/functions.py
+++ b/src/tomato/ketchup/functions.py
@@ -2,6 +2,7 @@
 import json
 import yaml
 import logging
+import base64
 import signal
 import psutil
 import time
@@ -22,15 +23,16 @@ def submit(args: Namespace) -> None:

     .. code:: bash

-        ketchup [-t] [-v] [-q] submit <payload> [--jobname JOBNAME]
+        ketchup [-t] [-v] [-q] submit <payload> [-J] [--jobname JOBNAME]

     Attempts to open the ``yaml/json`` file specified in the ``<payload>`` argument,
     and submit it to tomato's queue. The supplied :class:`argparse.Namespace` has
     to contain the path to the ``payload``. Optional arguments include an optional
     ``--jobname/-j`` parameter for supplying a
-    job name for the queue, the verbose/quiet switches (``-v/-q``) and the testing
-    switch (``-t``).
+    job name for the queue, the verbose/quiet switches (``-v/-q``), the testing
+    switch (``-t``), and the ``--json/-J`` switch for submitting a base64 encoded
+    json string directly instead of a file name.

     Examples
     --------
@@ -57,20 +59,26 @@ def submit(args: Namespace) -> None:
     dirs = setlib.get_dirs(args.test)
     settings = setlib.get_settings(dirs.user_config_dir, dirs.user_data_dir)
     queue = settings["queue"]
-    if os.path.exists(args.payload) and os.path.isfile(args.payload):
-        log.debug(f"attempting to open Payload at '{args.payload}'")
-    else:
-        log.error(f"Payload file '{args.payload} not found.")
-        return None
-
-    with open(args.payload, "r") as infile:
-        if args.payload.endswith("json"):
-            pldict = json.load(infile)
-        elif args.payload.endswith("yml") or args.payload.endswith("yaml"):
-            pldict = yaml.full_load(infile)
+    if args.json:  # read directly from json string encoded in base64
+        log.debug("attempting to open payload delivered by base64 encoded json string")
+        json_string = base64.b64decode(args.payload).decode()
+        pldict = json.loads(json_string)
+        payload = to_payload(**pldict)
+    else:  # load a local file
+        if os.path.exists(args.payload) and os.path.isfile(args.payload):
+            log.debug(f"attempting to open Payload at '{args.payload}'")
         else:
-            log.error("Payload file name must end with one of: {json, yml, yaml}.")
+            log.error(f"Payload file '{args.payload}' not found.")
             return None
+
+        with open(args.payload, "r") as infile:
+            if args.payload.endswith("json"):
+                pldict = json.load(infile)
+            elif args.payload.endswith("yml") or args.payload.endswith("yaml"):
+                pldict = yaml.full_load(infile)
+            else:
+                log.error("Payload file name must end with one of: {json, yml, yaml}.")
+                return None
     payload = to_payload(**pldict)
     log.debug("Payload=Payload(%s)", payload)
     if payload.tomato.output.path is None:
@@ -98,9 +106,9 @@ def status(args: Namespace) -> None:

     .. code:: bash

-        ketchup [-t] [-v] [-q] status
-        ketchup [-t] [-v] [-q] status [queue|state]
-        ketchup [-t] [-v] [-q] status <jobid>
+        ketchup [-t] [-v] [-q] status [-J]
+        ketchup [-t] [-v] [-q] status [-J] [queue|state]
+        ketchup [-t] [-v] [-q] status [-J] <jobid>

     The :class:`argparse.Namespace` has to contain the ``<jobid>`` the status of which
     is supposed to be queried. Alternatively, the status of the ``queue``
@@ -133,6 +141,11 @@ def status(args: Namespace) -> None:
         3      None        r       1035    dummy-10
         4      other_name  q

+        >>> # Get pipeline status of tomato as a json string:
+        >>> ketchup status -J
+        {"pipeline": ["dummy-10", "dummy-5"], "ready": [false, false], "jobid": [3, null],
+        "PID": [1035, null], "sampleid": ["dummy_sequential_1_0.05", null]}
+
     .. note::

         Calling ``ketchup status`` with a single ``jobid`` will return a ``yaml``
@@ -154,65 +167,91 @@ def status(args: Namespace) -> None:
     queue = settings["queue"]

     if "state" in args.jobid:
+        column_names = ["pipeline", "sampleid", "ready", "jobid", "pid"]
+        column_widths = [20, 25, 5, 6, 9]
         pips = dbhandler.pipeline_get_all(state["path"], type=state["type"])
-        print(
-            f"{'pipeline':20s} {'ready':5s} {'jobid':6s} {'(PID)':9s} {'sampleid':20s} "
-        )
-        print("=" * 67)
-        for pip in pips:
-            sampleid, ready, jobid, pid = dbhandler.pipeline_get_info(
-                state["path"], pip, state["type"]
-            )
-            rstr = "yes" if ready else "no"
-            job = f"{str(jobid):6s} ({pid})" if jobid is not None else str(jobid)
-            print(f"{pip:20s} {rstr:5s} {job:16s} {str(sampleid):20s}")
+        rows = [
+            (pip,) + dbhandler.pipeline_get_info(state["path"], pip, state["type"])
+            for pip in pips
+        ]
     elif "queue" in args.jobid:
+        column_names = ["jobid", "jobname", "status", "pipeline", "pid"]
+        column_widths = [6, 15, 6, 20, 9]
         jobs = dbhandler.job_get_all(queue["path"], type=queue["type"])
         running = dbhandler.pipeline_get_running(state["path"], type=state["type"])
-        print(
-            f"{'jobid':6s} {'jobname':20s} {'status':6s} {'(PID)':9s} {'pipeline':20s}"
-        )
-        print("=" * (7 + 21 + 7 + 10 + 20))
+        rows = []
+        jobids = []
         for jobid, jobname, payload, status in jobs:
-            if status.startswith("q"):
-                print(f"{str(jobid):6s} {str(jobname):20s} {status}")
-            elif status.startswith("r"):
-                for pip, pjobid, pid in running:
-                    if pjobid == jobid:
-                        print(
-                            f"{str(jobid):6s} {str(jobname):20s} "
-                            f"{status:6s} {str(pid):7s} {pip:20s}"
-                        )
-            elif status.startswith("c") and args.verbose - args.quiet > 0:
-                print(f"{str(jobid):6s} {str(jobname):20s} {status:6s}")
+            if not status.startswith("c") or args.verbose - args.quiet > 0:
+                row = [jobid, jobname, status, None, None]
+                jobids += [jobid]
+                rows += [row]
+        for pip, pjobid, pid in running:
+            try:
+                idx = jobids.index(pjobid)
+                rows[idx][3:4] = [pip, pid]
+            except ValueError:
+                pass  # not in running
     else:
+        column_names = [
+            "jobid",
+            "jobname",
+            "status",
+            "submitted_at",
+            "executed_at",
+            "completed_at",
+            "pipeline",
+            "pid",
+        ]
+        running = dbhandler.pipeline_get_running(state["path"], type=state["type"])
+        if running:
+            pips, pjobids, pids = zip(*running)
+        rows = []
-        for jobid in args.jobid:
+        for i, jobid in enumerate(args.jobid):
             try:
                 jobid = int(jobid)
             except:
                 logging.error("could not parse provided jobid: '%s'", jobid)
-                return 1
+                return None
             ji = dbhandler.job_get_info(queue["path"], jobid, type=queue["type"])
-            if ji is None:
-                log.error("job with jobid '%s' does not exist.", jobid)
+            if not ji:
+                log.error(f"job with jobid '{jobid}' does not exist.")
                 return None
             jobname, payload, status, submitted_at, executed_at, completed_at = ji
-            print(f"- jobid: {jobid}")
-            print(f"  jobname: {'null' if jobname is None else jobname}")
-            print(f"  status: {status}")
-            print(f"  submitted: {submitted_at}")
-            if status.startswith("r") or status.startswith("c"):
-                print(f"  executed: {executed_at}")
-                running = dbhandler.pipeline_get_running(
-                    state["path"], type=state["type"]
+            row = [
+                int(jobid),
+                jobname,
+                status,
+                submitted_at,
+                executed_at,
+                completed_at,
+                None,
+                None,
+            ]
+            if running and (status.startswith("r") or status.startswith("c")):
+                try:
+                    i = pjobids.index(int(jobid))
+                    row[6:7] = [pips[i], pids[i]]
+                except ValueError:
+                    pass  # not in running
+            rows += [row]
+    if args.json:
+        columns = list(map(list, zip(*rows)))
+        data = {name: column for name, column in zip(column_names, columns)}
+        print(json.dumps(data))
+    else:
+        if "state" in args.jobid or "queue" in args.jobid:
+            print(
+                " ".join([f"{n:<{w}.{w}}" for n, w in zip(column_names, column_widths)])
+            )
+            print("=" * (sum(column_widths) + len(column_names)))
+            for row in rows:
+                print(
+                    " ".join([f"{str(r):<{w}.{w}}" for r, w in zip(row, column_widths)])
                 )
-                for pipeline, pjobid, pid in running:
-                    if pjobid == jobid:
-                        print(f"  pipeline: {pipeline}")
-                        print(f"  pid: {pid}")
-                        break
-            if status.startswith("c"):
-                print(f"  completed: {completed_at}")
+        else:
+            data = [dict(zip(column_names, row)) for row in rows]
+            print(yaml.dump(data, default_flow_style=False, sort_keys=False))


 def cancel(args: Namespace) -> None:
@@ -379,7 +418,7 @@ def eject(args: Namespace) -> None:
         log.info(f"pipeline '{args.pipeline}' is already empty")


-def ready(args):
+def ready(args, mark_ready=True):
     """
     Mark pipeline as ready. Usage:

@@ -405,10 +444,29 @@
     )

     if jobid is None and pid is None:
-        log.info(f"marking pipeline '{args.pipeline}' as ready.")
-        dbhandler.pipeline_reset_job(state["path"], args.pipeline, True, state["type"])
+        log.info(
+            f"marking pipeline '{args.pipeline}' as {'ready' if mark_ready else 'not ready'}."
+        )
+        dbhandler.pipeline_reset_job(
+            state["path"], args.pipeline, mark_ready, state["type"]
+        )
     else:
-        log.warning(f"cannot mark pipeline as ready: job '{jobid}' is running.")
+        log.warning(f"cannot change pipeline ready status: job '{jobid}' is running.")
+
+
+def unready(args):
+    """
+    Mark pipeline as unready. Usage:
+
+    .. code:: bash
+
+        ketchup [-t] [-v] [-q] unready <pipeline>
+
+    Marks the ``pipeline`` as unready. Checks whether the pipeline exists, and
+    whether it is currently running.
+
+    """
+    ready(args, mark_ready=False)


 def snapshot(args: Namespace) -> None:
diff --git a/src/tomato/main.py b/src/tomato/main.py
index 9a4a1256..dca341c8 100644
--- a/src/tomato/main.py
+++ b/src/tomato/main.py
@@ -122,6 +122,13 @@ def run_ketchup():
         help="Set the job name of the submitted job to?",
         default=None,
     )
+    submit.add_argument(
+        "-J",
+        "--json",
+        action="store_true",
+        default=False,
+        help="Return a json string from ketchup commands.",
+    )
     submit.set_defaults(func=ketchup.submit)

     status = subparsers.add_parser("status")
@@ -135,6 +142,13 @@ def run_ketchup():
         ),
         default=["state"],
     )
+    status.add_argument(
+        "-J",
+        "--json",
+        action="store_true",
+        default=False,
+        help="Return a json string from ketchup commands.",
+    )
     status.set_defaults(func=ketchup.status)

     cancel = subparsers.add_parser("cancel")
@@ -165,6 +179,12 @@ def run_ketchup():
     )
     ready.set_defaults(func=ketchup.ready)

+    unready = subparsers.add_parser("unready")
+    unready.add_argument(
+        "pipeline", help="Name of the pipeline to mark as not ready.", default=None
+    )
+    unready.set_defaults(func=ketchup.unready)
+
     snapshot = subparsers.add_parser("snapshot")
     snapshot.add_argument(
         "jobid", help="The jobid of the job to be snapshotted.", default=None
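
The ``--json/-J`` path added to ``submit()`` above treats the positional ``payload`` argument as a base64-encoded JSON string and decodes it with ``json.loads(base64.b64decode(args.payload).decode())``. Below is a minimal sketch of how a caller could build such a string from an existing payload file and hand it to the new switch; the file name ``payload.yml`` and the ``subprocess`` invocation are illustrative assumptions, not part of this changeset.

.. code:: python

    import base64
    import json
    import subprocess

    import yaml

    # Load a payload file the same way the file-based path of submit() does
    # (the file name here is a hypothetical example).
    with open("payload.yml", "r") as infile:
        pldict = yaml.full_load(infile)

    # Serialise to JSON and base64-encode: the inverse of what submit() does
    # when called with -J, i.e. json.loads(base64.b64decode(payload).decode()).
    encoded = base64.b64encode(json.dumps(pldict).encode()).decode()

    # Hand the encoded string to the new switch; equivalent to running
    # ``ketchup submit -J <encoded>`` in a shell where tomato is installed.
    subprocess.run(["ketchup", "submit", "-J", encoded], check=True)

On the output side, ``ketchup status -J`` prints the collected rows serialised column-wise with ``json.dumps``, so the returned string can be parsed back with ``json.loads``.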