Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/spawn with interfacer #1464

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions libensemble/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def _qcomm_main(comm, main, *args, **kwargs):
if not kwargs.get("user_function"):
_result = main(comm, *args, **kwargs)
else:
# SH - could we insert comm into libE_info["comm"] here if it exists
_result = main(*args)
comm.send(CommResult(_result))
except Exception as e:
Expand Down Expand Up @@ -264,8 +265,8 @@ def __init__(self, main, nworkers, *args, **kwargs):
self.inbox = Queue()
self.outbox = Queue()
super().__init__(self, main, *args, **kwargs)
comm = QComm(self.inbox, self.outbox, nworkers)
self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs)
self.comm = QComm(self.inbox, self.outbox, nworkers)
self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs)

def terminate(self, timeout=None):
"""Terminate the process."""
Expand Down
28 changes: 14 additions & 14 deletions libensemble/generators.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# import queue as thread_queue
from abc import ABC, abstractmethod
from multiprocessing import Manager

# from multiprocessing import Queue as process_queue
from typing import List, Optional

import numpy as np
from numpy import typing as npt

from libensemble.comms.comms import QComm, QCommProcess # , QCommThread
from libensemble.comms.comms import QCommProcess # , QCommThread
from libensemble.executors import Executor
from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP
from libensemble.tools.tools import add_unique_random_streams
Expand Down Expand Up @@ -150,14 +149,13 @@
"""Must be called once before calling ask/tell. Initializes the background thread."""
# self.inbox = thread_queue.Queue() # sending betweween HERE and gen
# self.outbox = thread_queue.Queue()
self.m = Manager()
self.inbox = self.m.Queue()
self.outbox = self.m.Queue()

comm = QComm(self.inbox, self.outbox)
self.libE_info["comm"] = comm # replacing comm so gen sends HERE instead of manager
# SH this contains the thread lock - removing.... wrong comm to pass on anyway.
if hasattr(Executor.executor, "comm"):
del Executor.executor.comm

Check warning on line 155 in libensemble/generators.py

View check run for this annotation

Codecov / codecov/patch

libensemble/generators.py#L155

Added line #L155 was not covered by tests
self.libE_info["executor"] = Executor.executor

# SH - fix comment (thread and process & name object appropriately - task? qcomm?)
# self.thread = QCommThread( # TRY A PROCESS
# self.gen_f,
# None,
Expand All @@ -176,7 +174,10 @@
self.gen_specs,
self.libE_info,
user_function=True,
) # note that self.thread's inbox/outbox are unused by the underlying gen
)

# SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)?
self.libE_info["comm"] = self.thread.comm

def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray:
new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)])
Expand All @@ -197,19 +198,18 @@
if self.thread is None:
self.setup()
self.thread.run()
_, ask_full = self.outbox.get()
_, ask_full = self.thread.recv()
return ask_full["calc_out"]

def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None:
"""Send the results of evaluations to the generator, as a NumPy array."""
if results is not None:
results = self._set_sim_ended(results)
self.inbox.put(
(tag, {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}})
)
self.inbox.put((0, np.copy(results)))
Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}
self.thread.send(tag, Work)
self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object
else:
self.inbox.put((tag, None))
self.thread.send(tag, None)

def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int):
"""Send any last results to the generator, and it to close down."""
Expand Down
7 changes: 1 addition & 6 deletions libensemble/utils/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ def _start_generator_loop(self, tag, Work, H_in):
def _persistent_result(self, calc_in, persis_info, libE_info):
"""Setup comms with manager, setup gen, loop gen to completion, return gen's results"""
self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
if hasattr(self.gen, "setup"):
self.gen.persis_info = persis_info # passthrough, setup() uses the gen attributes
self.gen.libE_info = libE_info
if self.gen.thread is None:
self.gen.setup() # maybe we're reusing a live gen from a previous run
# libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array
H_out = list_dicts_to_np(self._get_initial_ask(libE_info), dtype=self.specs.get("out"))
tag, Work, H_in = self.ps.send_recv(H_out) # evaluate the initial sample
Expand Down Expand Up @@ -178,7 +173,7 @@ def _get_initial_ask(self, libE_info) -> npt.NDArray:

def _ask_and_send(self):
"""Loop over generator's outbox contents, send to manager"""
while self.gen.outbox.qsize(): # recv/send any outstanding messages
while self.gen.thread.outbox.qsize(): # recv/send any outstanding messages
points, updates = self.gen.ask_numpy(), self.gen.ask_updates()
if updates is not None and len(updates):
self.ps.send(points)
Expand Down
Loading