From 024e91ccdf727f4f193e70f6292507b651c9b189 Mon Sep 17 00:00:00 2001
From: Zane Starr
Date: Wed, 20 Mar 2024 20:02:17 -0700
Subject: [PATCH] feat: add support for lazy evaluation during simulation
 processing

This feature refactors the parallel processing path to use lazy
evaluation. As simulation data grows, keeping every result in memory
causes memory bloat and excessive RAM consumption. The refactor uses
lazy_flatten to keep Python's memory footprint low; in many cases the
change reduced consumption by a factor of 10.

The change also writes intermediate results to temporary files on disk
before lazily rearranging the simulation results into the format cadCAD
expects for a regular pandas DataFrame.
---
 cadCAD/engine/__init__.py          | 54 +++++++++++++++++++++++-------
 cadCAD/engine/execution.py         | 44 ++++++++++++++++++++----
 cadCAD/tools/execution/easy_run.py |  6 +++-
 3 files changed, 85 insertions(+), 19 deletions(-)

diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 4dc33409..e18a0b9f 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -1,8 +1,10 @@
+import itertools
 from time import time
-from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
+from typing import Callable, Dict, Generator, List, Any, Tuple, Union, Sequence, Mapping
 from tqdm.auto import tqdm
 
-from cadCAD.utils import flatten
+from cadCAD.utils import flatten, lazy_flatten
 from cadCAD.utils.execution import print_exec_info
 from cadCAD.configuration import Configuration, Processor
 from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
@@ -142,21 +145,44 @@ def get_final_dist_results(simulations: List[StateHistory],
                                             psu, ep) for psu, ep in list(zip(psus, eps))]
             return simulations, tensor_fields, sessions
 
+        def get_final_results_lazy(simulations: Generator,
+                                   psus: List[StateUpdateBlocks],
+                                   eps,
+                                   sessions: List[SessionDict],
+                                   remote_threshold: int):
+            if not isinstance(simulations, Generator):
+                raise ValueError(
+                    'Invalid simulation results (Executor output must be a Generator for lazy execution)')
+
+            # NOTE: unlike the eager path, both results are lazy iterables here
+            tensor_fields = map(create_tensor_field, psus, eps)
+
+            flat_simulations = map(
+                lazy_flatten, map(lazy_flatten, simulations))
+            iterable_flat_simulations = itertools.chain.from_iterable(
+                flat_simulations)
+
+            return iterable_flat_simulations, tensor_fields, sessions
+
         def get_final_results(simulations: List[StateHistory],
                               psus: List[StateUpdateBlocks],
                               eps,
                               sessions: List[SessionDict],
                               remote_threshold: int):
-
+
             # if list of lists of lists of dicts: do flatten
             # if list of dicts: do not flatten
             # else raise error
-            init: bool = isinstance(simulations, Sequence)
             failed_1 = False
             failed_2 = False
-
+
             try:
+                init: bool = isinstance(simulations, Sequence)
                 dont_flatten = init & isinstance(simulations[0], Mapping)
@@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory],
                 failed_2 = True
                 do_flatten = False
 
             if failed_1 and failed_2:
-                raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')
+                raise ValueError(
+                    'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')
 
             flat_timesteps, tensor_fields = [], []
             for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
@@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory],
                 if do_flatten:
                     flat_timesteps.append(flatten(sim_result))
                 tensor_fields.append(create_tensor_field(psu, ep))
-
+
             if do_flatten:
                 flat_simulations = flatten(flat_timesteps)
             else:
@@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory],
             else:
                 raise ValueError("Invalid execution mode specified")
 
-            print("Execution Method: " + self.exec_method.__name__)
             simulations_results = self.exec_method(
                 sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list,
                 Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs
             )
-            final_result = get_final_results(
-                simulations_results, partial_state_updates, eps, sessions, remote_threshold)
+            if self.additional_objs is not None and self.additional_objs.get('lazy_eval', False):
+                final_result = get_final_results_lazy(
+                    simulations_results, partial_state_updates, eps, sessions, remote_threshold)
+            else:
+                final_result = get_final_results(
+                    simulations_results, partial_state_updates, eps, sessions, remote_threshold)
+
         elif self.exec_context == ExecutionMode.distributed:
             print("Execution Method: " + self.exec_method.__name__)
             simulations_results = self.exec_method(
@@ -228,6 +258,6 @@ def get_final_results(simulations: List[StateHistory],
                 simulations_results, partial_state_updates, eps, sessions)
 
         t2 = time()
-        print(f"Total execution time: {t2 - t1 :.2f}s")
+        print(f"Total execution time: {t2 - t1:.2f}s")
 
         return final_result
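Note on the lazy path above: get_final_results_lazy assumes lazy_flatten is the
generator counterpart of flatten, yielding records one at a time instead of
materializing a flattened list. A minimal sketch of the idea, for illustration
only (the real implementation lives in cadCAD/utils and may differ):

    from typing import Generator, Iterable

    def lazy_flatten(nested: Iterable) -> Generator:
        # Yield the items of each sub-iterable without building
        # an intermediate list.
        for sub in nested:
            yield from sub

    # Applying it twice unwraps two nesting levels, one record at a time:
    simulations = [[[{'timestep': 0}], [{'timestep': 1}]], [[{'timestep': 0}]]]
    for record in lazy_flatten(lazy_flatten(simulations)):
        print(record)  # no flattened copy is ever held in memory

Because generators are consumed as they are iterated, peak memory stays near
the size of a single simulation result rather than the full result set.
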
diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py
index a88ed53c..29073f4b 100644
--- a/cadCAD/engine/execution.py
+++ b/cadCAD/engine/execution.py
@@ -1,11 +1,14 @@
 import os
-from typing import Callable, Dict, List, Any, Tuple, Sequence
+from typing import Callable, Dict, Generator, List, Any, Tuple, Sequence
 from pathos.multiprocessing import ProcessPool  # type: ignore
 from collections import Counter
 from cadCAD.types import *
-from cadCAD.utils import flatten
+from cadCAD.utils import flatten, lazy_flatten
 import tempfile
 import pickle
 import dill
 
 VarDictType = Dict[str, List[object]]
@@ -51,6 +54,15 @@ def single_proc_exec(
 
 def process_executor(params):
     simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params
+    result = [simulation_exec(
+        var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
+    )]
+    return result
+
+
+def process_executor_disk(params):
+    simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params
+
     result = [simulation_exec(
         var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
     )]
@@ -60,7 +72,20 @@ def process_executor(params):
     with tempfile.NamedTemporaryFile(delete=False) as temp_file:
         dill.dump(result, temp_file)
 
     return temp_file.name
 
 
+def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]:
+    for file_name in filenames:
+        with open(file_name, 'rb') as f:  # 'rb': results are dill-pickled in binary mode
+            result = dill.load(f)
+        yield result  # yield each result for immediate processing
+        os.remove(file_name)  # clean up the temporary file
+
+
 def file_handler(filenames: List[str]) -> List:
     combined_results = []
     for file_name in filenames:
         with open(file_name, 'rb') as f:  # Note 'rb' for binary reading mode
             result = dill.load(f)
@@ -69,10 +94,10 @@ def file_handler(filenames: List[str]) -> List:
             combined_results.append(result)
             result = None
         f.close()
         os.remove(file_name)  # Clean up temporary file
-        del result  # Delete the result from memory after processing
 
     return combined_results
@@ -90,6 +115,7 @@ def parallelize_simulations(
 ):
 
     print(f'Execution Mode: parallelized')
+    lazy_eval = additional_objs.get('lazy_eval', False) if additional_objs else False
 
     params = [
         (sim_exec, var_dict, states_list, config, env_processes,
@@ -99,10 +125,16 @@ def parallelize_simulations(
          env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows)
     ]
 
+    if lazy_eval:
+        with ProcessPool(maxtasksperchild=1) as pool:
+            temp_files = pool.map(process_executor_disk, params)
+        generator = file_handler_inc(temp_files)
+        return lazy_flatten(generator)
+
     with ProcessPool(maxtasksperchild=1) as pool:
-        temp_files = pool.map(process_executor, params)
+        results = pool.map(process_executor, params)
 
-    return flatten(file_handler(temp_files))
+    return flatten(results)
 
 
 def local_simulations(
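Note on the disk-spilling path above: process_executor_disk and
file_handler_inc together trade I/O for memory. Each worker pickles its result
to a temporary file and returns only the file name, and the results are then
streamed back one at a time. A self-contained sketch of the same pattern (the
function names here are illustrative, not the cadCAD API):

    import os
    import tempfile
    from typing import Generator, Iterable

    import dill

    def spill_to_disk(result: object) -> str:
        # Pickle one worker's result and hand back only the path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pkl') as tmp:
            dill.dump(result, tmp)
            return tmp.name

    def stream_results(paths: Iterable[str]) -> Generator[object, None, None]:
        # Load results back one file at a time, so at most one
        # worker's output is resident in memory.
        for path in paths:
            with open(path, 'rb') as f:
                result = dill.load(f)
            os.remove(path)  # the temporary file is no longer needed
            yield result

    paths = [spill_to_disk([{'timestep': i}]) for i in range(3)]
    for result in stream_results(paths):
        print(result)

The maxtasksperchild=1 setting in parallelize_simulations complements this:
each worker process is recycled after a single task, returning its memory to
the operating system.
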
diff --git a/cadCAD/tools/execution/easy_run.py b/cadCAD/tools/execution/easy_run.py
index 5ea737e1..c1fda147 100644
--- a/cadCAD/tools/execution/easy_run.py
+++ b/cadCAD/tools/execution/easy_run.py
@@ -42,6 +42,7 @@ def easy_run(
     drop_substeps=True,
     exec_mode='local',
     deepcopy_off=False,
+    lazy_eval=False,
 ) -> pd.DataFrame:
     """
     Run cadCAD simulations without headaches.
@@ -66,7 +67,10 @@ def easy_run(
         _exec_mode = ExecutionMode().local_mode
     elif exec_mode == 'single':
         _exec_mode = ExecutionMode().single_mode
-    exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off})
+    exec_context = ExecutionContext(_exec_mode, additional_objs={
+        'deepcopy_off': deepcopy_off,
+        'lazy_eval': lazy_eval,
+    })
     executor = Executor(exec_context=exec_context, configs=configs)
 
     # Execute the cadCAD experiment
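With this change, lazy evaluation becomes opt-in through easy_run. A
hypothetical call site (initial_state, params, state_update_blocks,
N_timesteps and N_samples are placeholders for a real cadCAD model
definition, not values defined in this patch):

    from cadCAD.tools.execution.easy_run import easy_run

    df = easy_run(
        initial_state,        # placeholder: initial state dict
        params,               # placeholder: parameter sweep dict
        state_update_blocks,  # placeholder: partial state update blocks
        N_timesteps,          # placeholder: number of timesteps
        N_samples,            # placeholder: number of Monte Carlo runs
        exec_mode='local',
        lazy_eval=True,       # spill worker results to disk, flatten lazily
    )
    print(df.head())

As written, only the local (parallelized) execution path consults
additional_objs['lazy_eval'], so lazy_eval=True is a no-op for the
single-process and distributed paths.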