Asktell/various fixes #1481

Merged · 9 commits · Dec 10, 2024
1 change: 0 additions & 1 deletion libensemble/comms/comms.py
@@ -226,7 +226,6 @@ def _qcomm_main(comm, main, *args, **kwargs):
if not kwargs.get("user_function"):
_result = main(comm, *args, **kwargs)
else:
# SH - could we insert comm into libE_info["comm"] here if it exists
_result = main(*args)
comm.send(CommResult(_result))
except Exception as e:
4 changes: 2 additions & 2 deletions libensemble/gen_classes/aposmm.py
@@ -4,11 +4,11 @@
import numpy as np
from numpy import typing as npt

from libensemble.generators import LibensembleGenThreadInterfacer
from libensemble.generators import PersistentGenInterfacer
from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP


class APOSMM(LibensembleGenThreadInterfacer):
class APOSMM(PersistentGenInterfacer):
"""
Standalone object-oriented APOSMM generator
"""
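
For context, a minimal sketch of the ask/tell loop that APOSMM now inherits from PersistentGenInterfacer. The constructor options (initial_sample_size, localopt_method) are standard APOSMM user parameters, but the variables dict, objective, and batch size are illustrative assumptions, and the exact field names in the returned dicts depend on the variables mapping:

from libensemble.gen_classes.aposmm import APOSMM

def sphere(point: dict) -> float:
    return point["x0"] ** 2 + point["x1"] ** 2  # hypothetical stand-in objective

gen = APOSMM(
    variables={"x0": [-6.0, 6.0], "x1": [-5.0, 5.0]},  # converted to lb/ub (see generators.py below)
    initial_sample_size=100,
    localopt_method="LN_BOBYQA",
)
points = gen.ask(4)  # list of dicts, one per candidate point
for point in points:
    point["f"] = sphere(point)
gen.tell(points)  # converted back to a structured array via list_dicts_to_np
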
50 changes: 23 additions & 27 deletions libensemble/generators.py
@@ -75,11 +75,6 @@ def ask(self, num_points: Optional[int]) -> List[dict]:
Request the next set of points to evaluate.
"""

def ask_updates(self) -> List[npt.NDArray]:
"""
Request any updates to previous points, e.g. minima discovered, points to cancel.
"""

def tell(self, results: List[dict]) -> None:
"""
Send the results of evaluations to the generator.
@@ -125,15 +120,14 @@ def __init__(

self.n = len(self.variables)
# build our own lb and ub
if "lb" not in kwargs and "ub" not in kwargs:
lb = []
ub = []
for i, v in enumerate(self.variables.values()):
if isinstance(v, list) and (isinstance(v[0], int) or isinstance(v[0], float)):
lb.append(v[0])
ub.append(v[1])
kwargs["lb"] = np.array(lb)
kwargs["ub"] = np.array(ub)
lb = []
ub = []
for i, v in enumerate(self.variables.values()):
if isinstance(v, list) and (isinstance(v[0], int) or isinstance(v[0], float)):
lb.append(v[0])
ub.append(v[1])
kwargs["lb"] = np.array(lb)
kwargs["ub"] = np.array(ub)

if len(kwargs) > 0: # so user can specify gen-specific parameters as kwargs to constructor
if not self.gen_specs.get("user"):
@@ -170,7 +164,7 @@ def tell(self, results: List[dict]) -> None:
self.tell_numpy(list_dicts_to_np(results, mapping=self.variables_mapping))


class LibensembleGenThreadInterfacer(LibensembleGenerator):
class PersistentGenInterfacer(LibensembleGenerator):
"""Implement ask/tell for traditionally written libEnsemble persistent generator functions.
Still requires a handful of libEnsemble-specific data structures on initialization.
"""
@@ -189,18 +183,18 @@ def __init__(
self.gen_f = gen_specs["gen_f"]
self.History = History
self.libE_info = libE_info
self.thread = None
self.running_gen_f = None

def setup(self) -> None:
"""Must be called once before calling ask/tell. Initializes the background thread."""
if self.thread is not None:
if self.running_gen_f is not None:
return
# SH this contains the thread lock - removing.... wrong comm to pass on anyway.
if hasattr(Executor.executor, "comm"):
del Executor.executor.comm
self.libE_info["executor"] = Executor.executor

self.thread = QCommProcess( # TRY A PROCESS
self.running_gen_f = QCommProcess(
self.gen_f,
None,
self.History,
@@ -210,8 +204,8 @@
user_function=True,
)

# SH this is a bit hacky - maybe it can be done inside comms (in _qcomm_main)?
self.libE_info["comm"] = self.thread.comm
# this is okay since the object isn't started until the first ask
self.libE_info["comm"] = self.running_gen_f.comm

def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray:
new_results = np.zeros(len(results), dtype=self.gen_specs["out"] + [("sim_ended", bool), ("f", float)])
@@ -229,23 +223,25 @@ def tell(self, results: List[dict], tag: int = EVAL_GEN_TAG) -> None:

def ask_numpy(self, num_points: int = 0) -> npt.NDArray:
"""Request the next set of points to evaluate, as a NumPy array."""
if self.thread is None:
if self.running_gen_f is None:
self.setup()
self.thread.run()
_, ask_full = self.thread.recv()
self.running_gen_f.run()
_, ask_full = self.running_gen_f.recv()
return ask_full["calc_out"]

def tell_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None:
"""Send the results of evaluations to the generator, as a NumPy array."""
if results is not None:
results = self._set_sim_ended(results)
Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}}
self.thread.send(tag, Work)
self.thread.send(tag, np.copy(results)) # SH for threads check - might need deepcopy due to dtype=object
self.running_gen_f.send(tag, Work)
self.running_gen_f.send(
tag, np.copy(results)
) # SH for threads check - might need deepcopy due to dtype=object
else:
self.thread.send(tag, None)
self.running_gen_f.send(tag, None)

def final_tell(self, results: npt.NDArray = None) -> (npt.NDArray, dict, int):
"""Send any last results to the generator, and it to close down."""
self.tell_numpy(results, PERSIS_STOP) # conversion happens in tell
return self.thread.result()
return self.running_gen_f.result()
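
For context, a minimal sketch of the lifecycle this class manages, using only the methods shown above (ask_numpy, tell_numpy, final_tell). Here gen is assumed to be an already-constructed subclass instance such as APOSMM, and evaluate is a hypothetical stand-in for the user's simulation:

def drive_persistent_gen(gen, evaluate, batches: int = 10, batch_size: int = 8):
    """Sketch of an ask/tell driver loop; not library API."""
    for _ in range(batches):
        points = gen.ask_numpy(batch_size)  # first call triggers setup() and starts gen_f in a QCommProcess
        gen.tell_numpy(evaluate(points))  # evaluate() must fill in the "f" field
    return gen.final_tell()  # sends PERSIS_STOP; returns (H, persis_info, exit_code)
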
139 changes: 92 additions & 47 deletions libensemble/utils/misc.py
@@ -81,20 +81,8 @@ def specs_checker_setattr(obj, key, value):
obj.__dict__[key] = value


def _decide_dtype(name: str, entry, size: int) -> tuple:
if isinstance(entry, str):
output_type = "U" + str(len(entry) + 1)
else:
output_type = type(entry)
if size == 1 or not size:
return (name, output_type)
else:
return (name, output_type, (size,))


def _combine_names(names: list) -> list:
"""combine fields with same name *except* for final digits"""

out_names = []
stripped = list(i.rstrip("0123456789") for i in names)  # ['x', 'x', 'y', 'z', 'a']
for name in names:
@@ -108,41 +96,87 @@ def _combine_names(names: list) -> list:
return list(set(out_names))
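# e.g. (illustrative): _combine_names(["x0", "x1", "y"]) -> ["x", "y"] (order may vary)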


def _get_new_dtype_fields(first: dict, mapping: dict = {}) -> list:
"""build list of fields that will be in the output numpy array"""
new_dtype_names = _combine_names([i for i in first.keys()]) # -> ['x', 'y']
fields_to_convert = list( # combining all mapping lists
chain.from_iterable(list(mapping.values()))
) # fields like ["beam_length", "beam_width"] that will become "x"
new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list(
mapping.keys()
) # array dtype needs "x". avoid fields from mapping values since we're converting those to "x"
return new_dtype_names


def _get_combinable_multidim_names(first: dict, new_dtype_names: list) -> list:
"""inspect the input dict for fields that can be combined (e.g. x0, x1)"""
combinable_names = []
for name in new_dtype_names:
combinable_group = [i for i in first.keys() if i.rstrip("0123456789") == name]
if len(combinable_group) > 1: # multiple similar names, e.g. x0, x1
combinable_names.append(combinable_group)
else:  # single name, e.g. local_pt, a0 *AS LONG AS THERE ISN'T AN A1*
combinable_names.append([name])
return combinable_names


def _decide_dtype(name: str, entry, size: int) -> tuple:
"""decide dtype of field, and size if needed"""
if isinstance(entry, str): # use numpy style for string type
output_type = "U" + str(len(entry) + 1)
else:
output_type = type(entry) # use default "python" type
if size == 1 or not size:
return (name, output_type)
else:
return (name, output_type, (size,)) # 3-tuple for multi-dimensional


def _start_building_dtype(
first: dict, new_dtype_names: list, combinable_names: list, dtype: list, mapping: dict
) -> list:
"""parse out necessary components of dtype for output numpy array"""
for i, entry in enumerate(combinable_names):
name = new_dtype_names[i]
size = len(combinable_names[i]) # e.g. 2 for [x0, x1]
if name not in mapping: # mapping keys are what we're converting *to*
dtype.append(_decide_dtype(name, first[entry[0]], size))
return dtype


def _pack_field(input_dict: dict, field_names: list) -> tuple:
"""pack dict data into tuple for slotting into numpy array"""
# {"x0": 1, "x1": 2} -> (1, 2)
return tuple(input_dict[name] for name in field_names) if len(field_names) > 1 else input_dict[field_names[0]]


def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:
if list_dicts is None:
return None

if not isinstance(list_dicts, list): # presumably already a numpy array, conversion not necessary
return list_dicts

# entering gen: convert _id to sim_id
for entry in list_dicts:
if "_id" in entry:
entry["sim_id"] = entry.pop("_id")

if dtype is None:
dtype = []
# first entry is used to determine dtype
first = list_dicts[0]

# build a presumptive dtype
new_dtype_names = _get_new_dtype_fields(first, mapping)
combinable_names = _get_combinable_multidim_names(first, new_dtype_names) # [['x0', 'x1'], ['z']]

first = list_dicts[0] # for determining dtype of output np array
new_dtype_names = _combine_names([i for i in first.keys()]) # -> ['x', 'y']
fields_to_convert = list(chain.from_iterable(list(mapping.values())))
new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list(mapping.keys())
combinable_names = [] # [['x0', 'x1'], ['y0', 'y1', 'y2'], ['z']]
for name in new_dtype_names:
combinable_group = [i for i in first.keys() if i.rstrip("0123456789") == name]
if len(combinable_group) > 1: # multiple similar names, e.g. x0, x1
combinable_names.append(combinable_group)
else:  # single name, e.g. local_pt, a0 *AS LONG AS THERE ISN'T AN A1*
combinable_names.append([name])
if (
dtype is None
): # rather roundabout. I believe default value gets set upon function instantiation. (default is mutable!)
dtype = []

# build dtype of non-mapped fields
# build dtype of non-mapped fields. appending onto empty dtype
if not len(dtype):
for i, entry in enumerate(combinable_names):
name = new_dtype_names[i]
size = len(combinable_names[i])
if name not in mapping:
dtype.append(_decide_dtype(name, first[entry[0]], size))
dtype = _start_building_dtype(first, new_dtype_names, combinable_names, dtype, mapping)

# append dtype of mapped float fields
if len(mapping):
@@ -152,46 +186,57 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:

out = np.zeros(len(list_dicts), dtype=dtype)

# start packing data from list of dicts into array
for j, input_dict in enumerate(list_dicts):
for output_name, field_names in zip(new_dtype_names, combinable_names):
for output_name, input_names in zip(new_dtype_names, combinable_names): # [('x', ['x0', 'x1']), ...]
if output_name not in mapping:
out[output_name][j] = (
tuple(input_dict[name] for name in field_names)
if len(field_names) > 1
else input_dict[field_names[0]]
)
out[output_name][j] = _pack_field(input_dict, input_names)
else:
out[output_name][j] = (
tuple(input_dict[name] for name in mapping[output_name])
if len(mapping[output_name]) > 1
else input_dict[mapping[output_name][0]]
)

out[output_name][j] = _pack_field(input_dict, mapping[output_name])
return out
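
To make the mapping behavior concrete, a small round trip under assumed field names (np_to_list_dicts is defined below):

from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts

mapping = {"x": ["beam_length", "beam_width"]}
arr = list_dicts_to_np([{"beam_length": 1.0, "beam_width": 2.0, "z": 3.0}], mapping=mapping)
# arr.dtype has a scalar "z" field plus a 2-wide float "x" field holding (1.0, 2.0)
back = np_to_list_dicts(arr, mapping=mapping)
# back[0] unpacks "x" into "beam_length" and "beam_width" again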


def _is_multidim(selection: npt.NDArray) -> bool:
return hasattr(selection, "__len__") and len(selection) > 1 and not isinstance(selection, str)


def _is_singledim(selection: npt.NDArray) -> bool:
return hasattr(selection, "__len__") and len(selection) == 1
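# e.g. (illustrative): _is_multidim(np.array([1.0, 2.0])) -> True, but _is_multidim("ab") -> False
# (strings are excluded); _is_singledim(np.array([1.0])) -> True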


def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]:
if array is None:
return None
out = []

for row in array:
new_dict = {}

for field in row.dtype.names:
# non-string arrays, lists, etc.

if field not in list(mapping.keys()):
if hasattr(row[field], "__len__") and len(row[field]) > 1 and not isinstance(row[field], str):
if _is_multidim(row[field]):
for i, x in enumerate(row[field]):
new_dict[field + str(i)] = x
elif hasattr(row[field], "__len__") and len(row[field]) == 1: # single-entry arrays, lists, etc.

elif _is_singledim(row[field]): # single-entry arrays, lists, etc.
new_dict[field] = row[field][0] # will still work on single-char strings

else:
new_dict[field] = row[field]
else:
assert array.dtype[field].shape[0] == len(mapping[field]), "unable to unpack multidimensional array"

else: # keys from mapping and array unpacked into corresponding fields in dicts
assert array.dtype[field].shape[0] == len(mapping[field]), (
"dimension mismatch between mapping and array with field " + field
)

for i, name in enumerate(mapping[field]):
new_dict[name] = row[field][i]

out.append(new_dict)

# exiting gen: convert sim_id to _id
for entry in out:
if "sim_id" in entry:
entry["_id"] = entry.pop("sim_id")
19 changes: 14 additions & 5 deletions libensemble/utils/runners.py
@@ -7,7 +7,7 @@
import numpy.typing as npt

from libensemble.comms.comms import QCommThread
from libensemble.generators import LibensembleGenerator, LibensembleGenThreadInterfacer
from libensemble.generators import LibensembleGenerator, PersistentGenInterfacer
from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
from libensemble.tools.persistent_support import PersistentSupport
from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts
@@ -23,7 +23,7 @@
if specs.get("threaded"):
return ThreadRunner(specs)
if (generator := specs.get("generator")) is not None:
if isinstance(generator, LibensembleGenThreadInterfacer):
if isinstance(generator, PersistentGenInterfacer):
return LibensembleGenThreadRunner(specs)
if isinstance(generator, LibensembleGenerator):
return LibensembleGenRunner(specs)
@@ -160,7 +160,12 @@
return H_out

def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list):
return self.gen.ask_numpy(batch_size), self.gen.ask_updates()
numpy_out = self.gen.ask_numpy(batch_size)
if callable(getattr(self.gen, "ask_updates", None)):
updates = self.gen.ask_updates()

else:
updates = None
return numpy_out, updates
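# Illustrative consequence: a generator that implements only ask_numpy/tell_numpy
# (no ask_updates) now flows through here with updates left as None.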

def _convert_tell(self, x: npt.NDArray) -> list:
self.gen.tell_numpy(x)
@@ -178,8 +183,12 @@

def _ask_and_send(self):
"""Loop over generator's outbox contents, send to manager"""
while not self.gen.thread.outbox.empty(): # recv/send any outstanding messages
points, updates = self.gen.ask_numpy(), self.gen.ask_updates()
while not self.gen.running_gen_f.outbox.empty(): # recv/send any outstanding messages
points = self.gen.ask_numpy()
if callable(getattr(self.gen, "ask_updates", None)):
updates = self.gen.ask_updates()
else:
updates = None

if updates is not None and len(updates):
self.ps.send(points)
for i in updates: