Skip to content

Commit

Permalink
Fix cancel & sort submit (#47)
Browse files Browse the repository at this point in the history
* Fix cancel & sort submit

* Black
  • Loading branch information
Peter Kraus authored Aug 22, 2022
1 parent b967231 commit ea713c6
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 50 deletions.
108 changes: 58 additions & 50 deletions src/tomato/daemon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _find_matching_pipelines(pipelines: list, method: list[dict]) -> list[str]:


def _pipeline_ready_sample(ret: tuple, sample: dict) -> bool:
sampleid, ready, jobid, pid = ret
sampleid, ready, _, _ = ret
if ready == 0:
return False
else:
Expand All @@ -74,61 +74,69 @@ def main_loop(settings: dict, pipelines: dict, test: bool = False) -> None:
log.debug(f"checking PID of running job '{jobid}'")
if psutil.pid_exists(pid) and "tomato_job" in psutil.Process(pid).name():
log.debug(f"PID of running job '{jobid}' found")
# dbhandler.job_set_status(queue, "r", jobid)
_, _, st, _, _, _ = dbhandler.job_get_info(qup, jobid, type=qut)
if st in {"rd"}:
log.warning(f"cancelling a running job {jobid} with pid {pid}")
proc = psutil.Process(pid=pid)
log.debug(f"{proc=}")
_kill_tomato_job(proc)
log.info(f"setting job {jobid} to status 'cd'")
dbhandler.job_set_status(qup, "cd", jobid, type=qut)
else:
log.debug(f"PID of running job '{jobid}' not found")
dbhandler.pipeline_reset_job(stp, pip, False, type=stt)
dbhandler.job_set_status(qup, "ce", jobid, type=qut)
dbhandler.job_set_time(qup, "completed_at", jobid, type=qut)

# check existing jobs in queue
ret = dbhandler.job_get_all(qup, type=qut)
for jobid, jobname, strpl, st in ret:
# check queued jobs in queue, get their payloads and any matching pipelines
ret = dbhandler.job_get_all_queued(qup, type=qut)
matched_pips = {}
payloads = {}
jobids = []
for jobid, _, strpl, st in ret:
payload = json.loads(strpl)
if st in {"q", "qw"}:
if st == "q":
log.info(f"checking whether job '{jobid}' can ever be matched")
matched_pips = _find_matching_pipelines(pipelines, payload["method"])
if len(matched_pips) > 0 and st != "qw":
dbhandler.job_set_status(qup, "qw", jobid, type=qut)
log.debug(f"checking whether job '{jobid}' can be queued")
for pip in matched_pips:
pipinfo = dbhandler.pipeline_get_info(stp, pip["name"], type=stt)
can_queue = _pipeline_ready_sample(pipinfo, payload["sample"])
if can_queue:
log.info(f"queueing job '{jobid}' on pipeline '{pip['name']}'")
dbhandler.pipeline_reset_job(stp, pip["name"], False, type=stt)
args = {
"settings": settings,
"pipeline": pip,
"payload": payload,
"jobid": jobid,
}
root = os.path.join(settings["queue"]["storage"], str(jobid))
os.makedirs(root)
jpath = os.path.join(root, "jobdata.json")
with open(jpath, "w") as of:
json.dump(args, of, indent=1)
if psutil.WINDOWS:
cfs = subprocess.CREATE_NO_WINDOW
if not test:
cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen(
["tomato_job", str(jpath)],
creationflags=cfs,
)
elif psutil.POSIX:
sns = False if test else True
subprocess.Popen(
["tomato_job", str(jpath)], start_new_session=sns
)
break
elif st in {"rd"}:
log.warning(f"cancelling a running job {jobid} with pid {pid}")
proc = psutil.Process(pid=pid)
log.debug(f"{proc=}")
_kill_tomato_job(proc)
log.info(f"setting job {jobid} to status 'cd'")
dbhandler.job_set_status(qup, "cd", jobid, type=qut)
payloads[jobid] = payload
jobids.append(jobid)
if st == "q":
log.info(f"checking whether job '{jobid}' can ever be matched")
matched_pips[jobid] = _find_matching_pipelines(pipelines, payload["method"])
if len(matched_pips[jobid]) > 0 and st != "qw":
dbhandler.job_set_status(qup, "qw", jobid, type=qut)

        # iterate over queued jobs in sorted order; submit a job if a matching pipeline is loaded & ready
for jobid in sorted(jobids):
payload = payloads[jobid]
log.debug(f"checking whether job '{jobid}' can be queued")
for pip in matched_pips[jobid]:
pipinfo = dbhandler.pipeline_get_info(stp, pip["name"], type=stt)
if not _pipeline_ready_sample(pipinfo, payload["sample"]):
continue
log.info(f"queueing job '{jobid}' on pipeline '{pip['name']}'")
dbhandler.pipeline_reset_job(stp, pip["name"], False, type=stt)
args = {
"settings": settings,
"pipeline": pip,
"payload": payload,
"jobid": jobid,
}
root = os.path.join(settings["queue"]["storage"], str(jobid))
os.makedirs(root)
jpath = os.path.join(root, "jobdata.json")
with open(jpath, "w") as of:
json.dump(args, of, indent=1)
if psutil.WINDOWS:
cfs = subprocess.CREATE_NO_WINDOW
if not test:
cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen(
["tomato_job", str(jpath)],
creationflags=cfs,
)
elif psutil.POSIX:
sns = False if test else True
subprocess.Popen(
["tomato_job", str(jpath)],
start_new_session=sns,
)
break
time.sleep(settings.get("main loop", 1))
15 changes: 15 additions & 0 deletions src/tomato/dbhandler/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ def job_set_time(
conn.close()


def job_get_all_queued(
    dbpath: str,
    type: str = "sqlite3",
) -> list[tuple]:
    """Fetch all pending jobs from the queue table.

    Parameters
    ----------
    dbpath
        Path to the job-queue database file.
    type
        Database backend identifier passed to :func:`get_db_conn`
        (defaults to ``"sqlite3"``).

    Returns
    -------
    list[tuple]
        One ``(jobid, jobname, payload, status)`` tuple per job whose
        status is ``'q'`` (queued, not yet matched) or ``'qw'`` (queued
        and waiting for a ready pipeline). Row order is whatever the
        database returns; callers sort by ``jobid`` themselves.
    """
    conn, cur = get_db_conn(dbpath, type)
    # Static query — no user input is interpolated here. The previous
    # version wrapped the WHERE clause in an f-string with no
    # placeholders, which falsely suggested dynamic SQL.
    cur.execute(
        "SELECT jobid, jobname, payload, status "
        "FROM queue "
        "WHERE status IN ('qw', 'q');"
    )
    ret = cur.fetchall()
    conn.close()
    return ret


def job_get_all(
dbpath: str,
type: str = "sqlite3",
Expand Down

0 comments on commit ea713c6

Please sign in to comment.