From e8b7052bd7613d81f92028993ea3519b6ec5ed10 Mon Sep 17 00:00:00 2001 From: Stephen Hudson Date: Thu, 14 Nov 2024 12:39:40 -0600 Subject: [PATCH 1/2] Feature/spawn with interfacer (#1464) * Use QComm in QCommProcess for comms * Remove thread locked comm in executor * Add conditional code for executor forwarding * Remove extra setup() call * Use correct outbox queue --- libensemble/comms/comms.py | 5 +++-- libensemble/generators.py | 28 ++++++++++++++-------------- libensemble/utils/runners.py | 2 +- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/libensemble/comms/comms.py b/libensemble/comms/comms.py index 51042c463..d8d892319 100644 --- a/libensemble/comms/comms.py +++ b/libensemble/comms/comms.py @@ -226,6 +226,7 @@ def _qcomm_main(comm, main, *args, **kwargs): if not kwargs.get("user_function"): _result = main(comm, *args, **kwargs) else: + # SH - could we insert comm into libE_info["comm"] here if it exists _result = main(*args) comm.send(CommResult(_result)) except Exception as e: @@ -264,8 +265,8 @@ def __init__(self, main, nworkers, *args, **kwargs): self.inbox = Queue() self.outbox = Queue() super().__init__(self, main, *args, **kwargs) - comm = QComm(self.inbox, self.outbox, nworkers) - self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs) + self.comm = QComm(self.inbox, self.outbox, nworkers) + self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs) def terminate(self, timeout=None): """Terminate the process.""" diff --git a/libensemble/generators.py b/libensemble/generators.py index eb9dfe462..cae1f109e 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -1,6 +1,5 @@ # import queue as thread_queue from abc import ABC, abstractmethod -from multiprocessing import Manager # from multiprocessing import Queue as process_queue from typing import List, Optional @@ -8,7 +7,7 @@ import numpy as np from numpy import typing as npt -from libensemble.comms.comms import QComm, QCommProcess # , QCommThread +from libensemble.comms.comms import QCommProcess # , QCommThread from libensemble.executors import Executor from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP from libensemble.tools.tools import add_unique_random_streams @@ -150,14 +149,13 @@ def setup(self) -> None: """Must be called once before calling ask/tell. Initializes the background thread.""" # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() - self.m = Manager() - self.inbox = self.m.Queue() - self.outbox = self.m.Queue() - comm = QComm(self.inbox, self.outbox) - self.libE_info["comm"] = comm # replacing comm so gen sends HERE instead of manager + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. + if hasattr(Executor.executor, "comm"): + del Executor.executor.comm self.libE_info["executor"] = Executor.executor + # SH - fix comment (thread and process & name object appropriately - task? qcomm?) # self.thread = QCommThread( # TRY A PROCESS # self.gen_f, # None, @@ -176,7 +174,10 @@ def setup(self) -> None: self.gen_specs, self.libE_info, user_function=True, - ) # note that self.thread's inbox/outbox are unused by the underlying gen + ) + + # SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)? + self.libE_info["comm"] = self.thread.comm def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray: new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)]) @@ -197,19 +198,18 @@ def ask_numpy(self, num_points: int = 0) -> npt.NDArray: if self.thread is None: self.setup() self.thread.run() - _, ask_full = self.outbox.get() + _, ask_full = self.thread.recv() return ask_full["calc_out"] def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: """Send the results of evaluations to the generator, as a NumPy array.""" if results is not None: results = self._set_sim_ended(results) - self.inbox.put( - (tag, {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}) - ) - self.inbox.put((0, np.copy(results))) + Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}} + self.thread.send(tag, Work) + self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object else: - self.inbox.put((tag, None)) + self.thread.send(tag, None) def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int): """Send any last results to the generator, and it to close down.""" diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 5a11f7e09..3adab746a 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -173,7 +173,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray: def _ask_and_send(self): """Loop over generator's outbox contents, send to manager""" - while self.gen.outbox.qsize(): # recv/send any outstanding messages + while self.gen.thread.outbox.qsize(): # recv/send any outstanding messages points, updates = self.gen.ask_numpy(), self.gen.ask_updates() if updates is not None and len(updates): self.ps.send(points) From 23e5164227dadb228192668e557023fd996715be Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 14 Nov 2024 14:09:35 -0600 Subject: [PATCH 2/2] use macOS-supported condition to check if gen_f has enqueued any outbound messages --- libensemble/utils/runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 3adab746a..d74ea89d8 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -173,7 +173,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray: def _ask_and_send(self): """Loop over generator's outbox contents, send to manager""" - while self.gen.thread.outbox.qsize(): # recv/send any outstanding messages + while not self.gen.thread.outbox.empty(): # recv/send any outstanding messages points, updates = self.gen.ask_numpy(), self.gen.ask_updates() if updates is not None and len(updates): self.ps.send(points)