Skip to content

Commit

Permalink
Merge branch 'experimental/jlnav_plus_shuds_asktell' into asktell/var…
Browse files Browse the repository at this point in the history
…iables_objectives
  • Loading branch information
jlnav committed Nov 14, 2024
2 parents 99a7a2c + 06c14d7 commit bc1587e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
5 changes: 3 additions & 2 deletions libensemble/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def _qcomm_main(comm, main, *args, **kwargs):
if not kwargs.get("user_function"):
_result = main(comm, *args, **kwargs)
else:
# SH - could we insert comm into libE_info["comm"] here if it exists
_result = main(*args)
comm.send(CommResult(_result))
except Exception as e:
Expand Down Expand Up @@ -264,8 +265,8 @@ def __init__(self, main, nworkers, *args, **kwargs):
self.inbox = Queue()
self.outbox = Queue()
super().__init__(self, main, *args, **kwargs)
comm = QComm(self.inbox, self.outbox, nworkers)
self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs)
self.comm = QComm(self.inbox, self.outbox, nworkers)
self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs)

def terminate(self, timeout=None):
"""Terminate the process."""
Expand Down
28 changes: 13 additions & 15 deletions libensemble/generators.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# import queue as thread_queue
from abc import ABC, abstractmethod
from multiprocessing import Manager

# from multiprocessing import Queue as process_queue
from typing import List, Optional

import numpy as np
from numpy import typing as npt

from libensemble.comms.comms import QComm, QCommProcess # , QCommThread
from libensemble.comms.comms import QCommProcess # , QCommThread
from libensemble.executors import Executor
from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP
from libensemble.tools.tools import add_unique_random_streams
Expand Down Expand Up @@ -196,12 +195,9 @@ def setup(self) -> None:
"""Must be called once before calling ask/tell. Initializes the background thread."""
if self.thread is not None:
return

Check warning on line 197 in libensemble/generators.py

View check run for this annotation

Codecov / codecov/patch

libensemble/generators.py#L197

Added line #L197 was not covered by tests
self.m = Manager()
self.inbox = self.m.Queue()
self.outbox = self.m.Queue()

comm = QComm(self.inbox, self.outbox)
self.libE_info["comm"] = comm # replacing comm so gen sends HERE instead of manager
# SH this contains the thread lock - removing.... wrong comm to pass on anyway.
if hasattr(Executor.executor, "comm"):
del Executor.executor.comm
self.libE_info["executor"] = Executor.executor

self.thread = QCommProcess( # TRY A PROCESS
Expand All @@ -212,7 +208,10 @@ def setup(self) -> None:
self.gen_specs,
self.libE_info,
user_function=True,
) # note that self.thread's inbox/outbox are unused by the underlying gen
)

# SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)?
self.libE_info["comm"] = self.thread.comm

def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray:
new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)])
Expand All @@ -233,19 +232,18 @@ def ask_numpy(self, num_points: int = 0) -> npt.NDArray:
if self.thread is None:
self.setup()
self.thread.run()
_, ask_full = self.outbox.get()
_, ask_full = self.thread.recv()
return ask_full["calc_out"]

def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None:
"""Send the results of evaluations to the generator, as a NumPy array."""
if results is not None:
results = self._set_sim_ended(results)
self.inbox.put(
(tag, {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}})
)
self.inbox.put((0, np.copy(results)))
Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}
self.thread.send(tag, Work)
self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object
else:
self.inbox.put((tag, None))
self.thread.send(tag, None)

def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int):
"""Send any last results to the generator, and it to close down."""
Expand Down
2 changes: 1 addition & 1 deletion libensemble/utils/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray:

def _ask_and_send(self):
"""Loop over generator's outbox contents, send to manager"""
while self.gen.outbox.qsize(): # recv/send any outstanding messages
while not self.gen.thread.outbox.empty(): # recv/send any outstanding messages
points, updates = self.gen.ask_numpy(), self.gen.ask_updates()
if updates is not None and len(updates):
self.ps.send(points)
Expand Down

0 comments on commit bc1587e

Please sign in to comment.