From e8c55e77b745570e635da62c8f6f9574cb6863e4 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 11 Nov 2024 09:02:47 -0600 Subject: [PATCH 1/8] Use QComm in QCommProcess for comms --- libensemble/comms/comms.py | 5 +++-- libensemble/generators.py | 23 ++++++++++------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/libensemble/comms/comms.py b/libensemble/comms/comms.py index 51042c463..d8d892319 100644 --- a/libensemble/comms/comms.py +++ b/libensemble/comms/comms.py @@ -226,6 +226,7 @@ def _qcomm_main(comm, main, *args, **kwargs): if not kwargs.get("user_function"): _result = main(comm, *args, **kwargs) else: + # SH - could we insert comm into libE_info["comm"] here if it exists _result = main(*args) comm.send(CommResult(_result)) except Exception as e: @@ -264,8 +265,8 @@ def __init__(self, main, nworkers, *args, **kwargs): self.inbox = Queue() self.outbox = Queue() super().__init__(self, main, *args, **kwargs) - comm = QComm(self.inbox, self.outbox, nworkers) - self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs) + self.comm = QComm(self.inbox, self.outbox, nworkers) + self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs) def terminate(self, timeout=None): """Terminate the process.""" diff --git a/libensemble/generators.py b/libensemble/generators.py index eb9dfe462..3fa08471a 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -1,6 +1,5 @@ # import queue as thread_queue from abc import ABC, abstractmethod -from multiprocessing import Manager # from multiprocessing import Queue as process_queue from typing import List, Optional @@ -150,14 +149,10 @@ def setup(self) -> None: """Must be called once before calling ask/tell. Initializes the background thread.""" # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() - self.m = Manager() - self.inbox = self.m.Queue() - self.outbox = self.m.Queue() - comm = QComm(self.inbox, self.outbox) - self.libE_info["comm"] = comm # replacing comm so gen sends HERE instead of manager self.libE_info["executor"] = Executor.executor + # SH - fix comment (thread and process & name object appropriately - task? qcomm?) # self.thread = QCommThread( # TRY A PROCESS # self.gen_f, # None, @@ -176,7 +171,10 @@ def setup(self) -> None: self.gen_specs, self.libE_info, user_function=True, - ) # note that self.thread's inbox/outbox are unused by the underlying gen + ) + + # SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)? + self.libE_info["comm"] = self.thread.comm def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray: new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)]) @@ -197,19 +195,18 @@ def ask_numpy(self, num_points: int = 0) -> npt.NDArray: if self.thread is None: self.setup() self.thread.run() - _, ask_full = self.outbox.get() + _, ask_full = self.thread.recv() return ask_full["calc_out"] def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: """Send the results of evaluations to the generator, as a NumPy array.""" if results is not None: results = self._set_sim_ended(results) - self.inbox.put( - (tag, {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}) - ) - self.inbox.put((0, np.copy(results))) + Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}} + self.thread.send(tag, Work) + self.thread.send(20, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object else: - self.inbox.put((tag, None)) + self.thread.send(tag, None) def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int): """Send any last results to the generator, and it to close down.""" From b4b4462027c0cb52e2ab0b22e13f68dc8e3ab5e2 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 11 Nov 2024 09:03:22 -0600 Subject: [PATCH 2/8] Remove thread locked comm in executor --- libensemble/generators.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libensemble/generators.py b/libensemble/generators.py index 3fa08471a..b478757c1 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -150,6 +150,9 @@ def setup(self) -> None: # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. + # SH what if not using executor - should be an IF here + del Executor.executor.comm self.libE_info["executor"] = Executor.executor # SH - fix comment (thread and process & name object appropriately - task? qcomm?) From 9a766c7fea27247b5d972ed6287a554de649a504 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 11 Nov 2024 09:29:06 -0600 Subject: [PATCH 3/8] Fix debugging tag --- libensemble/generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/generators.py b/libensemble/generators.py index b478757c1..d6b197d77 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -207,7 +207,7 @@ def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: results = self._set_sim_ended(results) Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}} self.thread.send(tag, Work) - self.thread.send(20, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object + self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object else: self.thread.send(tag, None) From 019c8d72e1f52e713c2113d8d972e8a54568755a Mon Sep 17 00:00:00 2001 From: shudson Date: Wed, 13 Nov 2024 16:42:14 -0600 Subject: [PATCH 4/8] Remove unused import of QComm --- libensemble/generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/generators.py b/libensemble/generators.py index d6b197d77..3b4c50e40 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -7,7 +7,7 @@ import numpy as np from numpy import typing as npt -from libensemble.comms.comms import QComm, QCommProcess # , QCommThread +from libensemble.comms.comms import QCommProcess # , QCommThread from libensemble.executors import Executor from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP from libensemble.tools.tools import add_unique_random_streams From 033c32d4325a55138c95427d18d67d09fcb37ff6 Mon Sep 17 00:00:00 2001 From: shudson Date: Wed, 13 Nov 2024 16:47:11 -0600 Subject: [PATCH 5/8] Add conditional code for executor forwarding --- libensemble/generators.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libensemble/generators.py b/libensemble/generators.py index 3b4c50e40..d694c7a78 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -150,10 +150,11 @@ def setup(self) -> None: # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() - # SH this contains the thread lock - removing.... wrong comm to pass on anyway. - # SH what if not using executor - should be an IF here - del Executor.executor.comm - self.libE_info["executor"] = Executor.executor + if Executor.executor is not None: + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. + if hasattr(Executor.executor, "comm"): + del Executor.executor.comm + self.libE_info["executor"] = Executor.executor # SH - fix comment (thread and process & name object appropriately - task? qcomm?) # self.thread = QCommThread( # TRY A PROCESS From 74924d88120d8a40f62af862833a2fd5d7dae7dd Mon Sep 17 00:00:00 2001 From: shudson Date: Wed, 13 Nov 2024 18:47:22 -0600 Subject: [PATCH 6/8] Allow forwarding None executor --- libensemble/generators.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libensemble/generators.py b/libensemble/generators.py index d694c7a78..cae1f109e 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -150,11 +150,10 @@ def setup(self) -> None: # self.inbox = thread_queue.Queue() # sending betweween HERE and gen # self.outbox = thread_queue.Queue() - if Executor.executor is not None: - # SH this contains the thread lock - removing.... wrong comm to pass on anyway. - if hasattr(Executor.executor, "comm"): - del Executor.executor.comm - self.libE_info["executor"] = Executor.executor + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. + if hasattr(Executor.executor, "comm"): + del Executor.executor.comm + self.libE_info["executor"] = Executor.executor # SH - fix comment (thread and process & name object appropriately - task? qcomm?) # self.thread = QCommThread( # TRY A PROCESS From db66ce6b3f158d138d5985127453dbbb8f1f7fa9 Mon Sep 17 00:00:00 2001 From: shudson Date: Thu, 14 Nov 2024 12:09:31 -0600 Subject: [PATCH 7/8] Remove extra setup() call --- libensemble/utils/runners.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 08d52a27e..5a11f7e09 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -136,11 +136,6 @@ def _start_generator_loop(self, tag, Work, H_in): def _persistent_result(self, calc_in, persis_info, libE_info): """Setup comms with manager, setup gen, loop gen to completion, return gen's results""" self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG) - if hasattr(self.gen, "setup"): - self.gen.persis_info = persis_info # passthrough, setup() uses the gen attributes - self.gen.libE_info = libE_info - if self.gen.thread is None: - self.gen.setup() # maybe we're reusing a live gen from a previous run # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array H_out = list_dicts_to_np(self._get_initial_ask(libE_info), dtype=self.specs.get("out")) tag, Work, H_in = self.ps.send_recv(H_out) # evaluate the initial sample From c323ef8d7e98b3d03903004a4b1eac538102374c Mon Sep 17 00:00:00 2001 From: shudson Date: Thu, 14 Nov 2024 12:10:06 -0600 Subject: [PATCH 8/8] Use correct outbox queue --- libensemble/utils/runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 5a11f7e09..3adab746a 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -173,7 +173,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray: def _ask_and_send(self): """Loop over generator's outbox contents, send to manager""" - while self.gen.outbox.qsize(): # recv/send any outstanding messages + while self.gen.thread.outbox.qsize(): # recv/send any outstanding messages points, updates = self.gen.ask_numpy(), self.gen.ask_updates() if updates is not None and len(updates): self.ps.send(points)