feat: add support for lazy evaluation during simulation processing
This feature refactors the parallel processing path to use lazy evaluation.
As simulation data grows, keeping all of it in memory causes
memory bloat and excessive RAM consumption.

The refactor uses lazy_flatten to keep Python's memory footprint
low; in many cases the change reduced consumption by a factor of 10.

The change also writes temporary files to disk before lazily
rearranging the simulation results into the format cadCAD expects
for a regular pandas DataFrame.
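
For illustration only (the cadCAD.utils.lazy_flatten implementation itself is not
part of this diff): a generator-based flatten along these lines yields one leaf
record at a time instead of materialising the whole nested structure, which is
what keeps memory usage flat.

    from types import GeneratorType

    def lazy_flatten(nested):
        # Assumed sketch, not the shipped helper: walk the nested structure and
        # yield leaf elements lazily, so only one element is resident at a time.
        for item in nested:
            if isinstance(item, (list, tuple, GeneratorType)):
                yield from lazy_flatten(item)
            else:
                yield item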
zcstarr committed Mar 21, 2024
1 parent 996dd5c commit 024e91c
Showing 3 changed files with 85 additions and 19 deletions.
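
For context, a hedged sketch of opting into the new path directly through the
engine API, mirroring how easy_run wires it up in this commit (configs stands in
for an already assembled list of cadCAD Configuration objects and is assumed,
not shown here):

    from cadCAD.engine import ExecutionContext, ExecutionMode, Executor

    # configs: a previously built list of cadCAD Configuration objects (assumed).
    exec_context = ExecutionContext(ExecutionMode().local_mode, additional_objs={
        'deepcopy_off': False,
        'lazy_eval': True,  # opt in to the lazy evaluation path added by this commit
    })
    executor = Executor(exec_context=exec_context, configs=configs)
    raw_results, tensor_fields, sessions = executor.execute()

Equivalently, easy_run(..., lazy_eval=True) forwards the flag as shown in
cadCAD/tools/execution/easy_run.py below.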
54 changes: 42 additions & 12 deletions cadCAD/engine/__init__.py
@@ -1,8 +1,10 @@
import itertools
from memory_profiler import profile
from time import time
from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
from typing import Callable, Dict, Generator, List, Any, Tuple, Union, Sequence, Mapping
from tqdm.auto import tqdm

from cadCAD.utils import flatten
from cadCAD.utils import flatten, lazy_flatten
from cadCAD.utils.execution import print_exec_info
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
@@ -80,6 +82,7 @@ def __init__(self,
self.configs = configs
self.empty_return = empty_return

@profile
def execute(self) -> Tuple[object, object, Dict[str, object]]:
if self.empty_return is True:
return [], [], []
@@ -142,21 +145,44 @@ def get_final_dist_results(simulations: List[StateHistory],
psu, ep) for psu, ep in list(zip(psus, eps))]
return simulations, tensor_fields, sessions

def get_final_results_lazy(simulations: Generator,
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):
is_generator: bool = isinstance(simulations, Generator)
if not is_generator:
raise ValueError(
'Invalid simulation results (Executor output is not a Generator, which is required for lazy execution)')

# NOTE: the result type changes here to a lazy iterable of tensor fields
tensor_fields = itertools.starmap(create_tensor_field, zip(psus, eps))

flat_simulations = map(
lazy_flatten, map(lazy_flatten, simulations))

# NOTE here we change the result type, which is now an iterable
iterable_flat_simulations = itertools.chain.from_iterable(
flat_simulations)

return iterable_flat_simulations, tensor_fields, sessions

def get_final_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):

# if list of lists of lists of dicts: do flatten
# if list of dicts: do not flatten
# else raise error


init: bool = isinstance(simulations, Sequence)
failed_1 = False
failed_2 = False

try:
init: bool = isinstance(simulations, Sequence)
dont_flatten = init & isinstance(simulations[0], Mapping)
@@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory],
do_flatten = False

if failed_1 and failed_2:
raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')

raise ValueError(
'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')

flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
@@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory],
if do_flatten:
flat_timesteps.append(flatten(sim_result))
tensor_fields.append(create_tensor_field(psu, ep))

if do_flatten:
flat_simulations = flatten(flat_timesteps)
else:
@@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory],
else:
raise ValueError("Invalid execution mode specified")


print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs
)

final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
if self.additional_objs is not None and self.additional_objs.get('lazy_eval'):
final_result = get_final_results_lazy(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
else:
final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)

elif self.exec_context == ExecutionMode.distributed:
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
@@ -228,6 +258,6 @@ def get_final_results(simulations: List[StateHistory],
simulations_results, partial_state_updates, eps, sessions)

t2 = time()
print(f"Total execution time: {t2 - t1 :.2f}s")
print(f"Total execution time: {t2 - t1:.2f}s")

return final_result
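
With lazy_eval enabled, the engine above hands back iterables rather than lists,
so downstream code can stream the records into a DataFrame instead of holding an
intermediate list; a hedged sketch (executor as in the sketch earlier), not part
of this commit:

    import pandas as pd

    raw_results, tensor_fields, sessions = executor.execute()
    # With lazy_eval on, raw_results is assumed to be an iterator of per-substep
    # state record dicts; pandas can consume such an iterable directly.
    df = pd.DataFrame(raw_results)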
44 changes: 38 additions & 6 deletions cadCAD/engine/execution.py
@@ -1,11 +1,14 @@
import os
from typing import Callable, Dict, List, Any, Tuple, Sequence
from typing import Callable, Dict, Generator, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
from collections import Counter
from cadCAD.types import *
from cadCAD.utils import flatten
from cadCAD.utils import flatten, lazy_flatten
import tempfile
import pickle
import sys
from pympler import asizeof
from memory_profiler import profile
import dill

VarDictType = Dict[str, List[object]]
@@ -51,6 +54,15 @@ def single_proc_exec(
def process_executor(params):
simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params

result = [simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
)]
return result


def process_executor_disk(params):
simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params

result = [simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
)]
@@ -60,7 +72,20 @@ def process_executor(params):
return temp_file.name


def file_handler(filenames: List[str]) -> List:
@profile
def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]:
for file_name in filenames:
with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode
result = dill.load(f)
yield result # Yield the loaded result for immediate processing

os.remove(file_name) # Clean up temporary file once its result has been consumed


@profile
def file_handler(filenames: List[str]) -> Generator[List, None, None]:
combined_results = []
for file_name in filenames:
with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode
@@ -69,10 +94,10 @@ def file_handler(filenames: List[str]) -> List:
result = None
f.close()
os.remove(file_name) # Clean up temporary file
del result # Delete the result from memory after processing
return combined_results


@profile
def parallelize_simulations(
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
@@ -90,6 +115,7 @@ def parallelize_simulations(
):

print(f'Execution Mode: parallelized')
lazy_eval = additional_objs.get('lazy_eval', False) if additional_objs else False

params = [
(sim_exec, var_dict, states_list, config, env_processes,
@@ -99,10 +125,16 @@
env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows)
]

if lazy_eval:
with ProcessPool(maxtasksperchild=1) as pool:
temp_files = pool.map(process_executor_disk, params)
generator = file_handler_inc(temp_files)
return lazy_flatten(generator)

with ProcessPool(maxtasksperchild=1) as pool:
temp_files = pool.map(process_executor, params)
results = pool.map(process_executor, params)

return flatten(file_handler(temp_files))
return flatten(results)


def local_simulations(
6 changes: 5 additions & 1 deletion cadCAD/tools/execution/easy_run.py
@@ -42,6 +42,7 @@ def easy_run(
drop_substeps=True,
exec_mode='local',
deepcopy_off=False,
lazy_eval=False
) -> pd.DataFrame:
"""
Run cadCAD simulations without headaches.
@@ -66,7 +67,10 @@
_exec_mode = ExecutionMode().local_mode
elif exec_mode == 'single':
_exec_mode = ExecutionMode().single_mode
exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off})
exec_context = ExecutionContext(_exec_mode, additional_objs={
'deepcopy_off': deepcopy_off,
'lazy_eval': lazy_eval
})
executor = Executor(exec_context=exec_context, configs=configs)

# Execute the cadCAD experiment