diff --git a/libensemble/comms/comms.py b/libensemble/comms/comms.py index 51042c463..d8d892319 100644 --- a/libensemble/comms/comms.py +++ b/libensemble/comms/comms.py @@ -226,6 +226,7 @@ def _qcomm_main(comm, main, *args, **kwargs): if not kwargs.get("user_function"): _result = main(comm, *args, **kwargs) else: + # SH - could we insert comm into libE_info["comm"] here if it exists _result = main(*args) comm.send(CommResult(_result)) except Exception as e: @@ -264,8 +265,8 @@ def __init__(self, main, nworkers, *args, **kwargs): self.inbox = Queue() self.outbox = Queue() super().__init__(self, main, *args, **kwargs) - comm = QComm(self.inbox, self.outbox, nworkers) - self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs) + self.comm = QComm(self.inbox, self.outbox, nworkers) + self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs) def terminate(self, timeout=None): """Terminate the process.""" diff --git a/libensemble/generators.py b/libensemble/generators.py index eb9dfe462..cae1f109e 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -1,6 +1,5 @@ # import queue as thread_queue from abc import ABC, abstractmethod -from multiprocessing import Manager # from multiprocessing import Queue as process_queue from typing import List, Optional @@ -8,7 +7,7 @@ import numpy as np from numpy import typing as npt -from libensemble.comms.comms import QComm, QCommProcess # , QCommThread +from libensemble.comms.comms import QCommProcess # , QCommThread from libensemble.executors import Executor from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP from libensemble.tools.tools import add_unique_random_streams @@ -150,14 +149,13 @@ def setup(self) -> None: """Must be called once before calling ask/tell. Initializes the background thread.""" # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() - self.m = Manager() - self.inbox = self.m.Queue() - self.outbox = self.m.Queue() - comm = QComm(self.inbox, self.outbox) - self.libE_info["comm"] = comm # replacing comm so gen sends HERE instead of manager + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. + if hasattr(Executor.executor, "comm"): + del Executor.executor.comm self.libE_info["executor"] = Executor.executor + # SH - fix comment (thread and process & name object appropriately - task? qcomm?) # self.thread = QCommThread( # TRY A PROCESS # self.gen_f, # None, @@ -176,7 +174,10 @@ def setup(self) -> None: self.gen_specs, self.libE_info, user_function=True, - ) # note that self.thread's inbox/outbox are unused by the underlying gen + ) + + # SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)? + self.libE_info["comm"] = self.thread.comm def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray: new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)]) @@ -197,19 +198,18 @@ def ask_numpy(self, num_points: int = 0) -> npt.NDArray: if self.thread is None: self.setup() self.thread.run() - _, ask_full = self.outbox.get() + _, ask_full = self.thread.recv() return ask_full["calc_out"] def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: """Send the results of evaluations to the generator, as a NumPy array.""" if results is not None: results = self._set_sim_ended(results) - self.inbox.put( - (tag, {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}) - ) - self.inbox.put((0, np.copy(results))) + Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}} + self.thread.send(tag, Work) + self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object else: - self.inbox.put((tag, None)) + self.thread.send(tag, None) def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int): """Send any last results to the generator, and it to close down.""" diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 08d52a27e..3adab746a 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -136,11 +136,6 @@ def _start_generator_loop(self, tag, Work, H_in): def _persistent_result(self, calc_in, persis_info, libE_info): """Setup comms with manager, setup gen, loop gen to completion, return gen's results""" self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG) - if hasattr(self.gen, "setup"): - self.gen.persis_info = persis_info # passthrough, setup() uses the gen attributes - self.gen.libE_info = libE_info - if self.gen.thread is None: - self.gen.setup() # maybe we're reusing a live gen from a previous run # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array H_out = list_dicts_to_np(self._get_initial_ask(libE_info), dtype=self.specs.get("out")) tag, Work, H_in = self.ps.send_recv(H_out) # evaluate the initial sample @@ -178,7 +173,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray: def _ask_and_send(self): """Loop over generator's outbox contents, send to manager""" - while self.gen.outbox.qsize(): # recv/send any outstanding messages + while self.gen.thread.outbox.qsize(): # recv/send any outstanding messages points, updates = self.gen.ask_numpy(), self.gen.ask_updates() if updates is not None and len(updates): self.ps.send(points)