diff --git a/.github/workflows/python_installation.yml b/.github/workflows/python_installation.yml
index 1496c5f743..efafb034bb 100644
--- a/.github/workflows/python_installation.yml
+++ b/.github/workflows/python_installation.yml
@@ -34,6 +34,11 @@ jobs:
           conda config --append channels conda-forge
           conda config --set show_channel_urls true
           conda install lhapdf pandoc
+      - name: Install MongoDB for parallel hyperopts
+        shell: bash -l {0}
+        run: |
+          conda install mongodb
+          mongod --version
       - name: Install nnpdf with testing and qed extras
         shell: bash -l {0}
         run: |
diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml
index 3c49ccc2f9..f1a22deec2 100644
--- a/conda-recipe/meta.yaml
+++ b/conda-recipe/meta.yaml
@@ -32,6 +32,8 @@ requirements:
     - psutil # to ensure n3fit affinity is with the right processors
     - blas==1.0 *mkl* # [osx] # Host's blas is mkl, force also runtime blas to be
     - hyperopt
+    - mongodb
+    - pymongo <4
    - seaborn
    - lhapdf
    - sqlite
@@ -57,7 +59,8 @@ requirements:
    - pineappl >=0.6.2
    - eko >=0.14.1
    - fiatlux
-    - curio >=1.0 # reportengine uses it but it's not in its dependencies
+    - frozendict # needed for caching of data loading
+    - curio >=1.0 # reportengine uses it but it's not in its dependencies

 test:
   requires:
diff --git a/doc/sphinx/source/n3fit/hyperopt.rst b/doc/sphinx/source/n3fit/hyperopt.rst
index 7eb5ae2343..9ee0cdf971 100644
--- a/doc/sphinx/source/n3fit/hyperopt.rst
+++ b/doc/sphinx/source/n3fit/hyperopt.rst
@@ -386,3 +386,44 @@ To achieve this, you can use the ``--restart`` option within the ``n3fit`` comma
 The above command example is effective when the number of saved trials in the ``test_run/nnfit/replica_1/tries.pkl``
 is less than ``20``. If there are ``20`` or more saved trials, ``n3fit`` will simply terminate, displaying the best
 results.
+
+
+Running hyperoptimizations in parallel with MongoDB
+---------------------------------------------------
+
+In NNPDF, you can run hyperoptimization experiments in parallel using `MongoDB <https://www.mongodb.com/>`_.
+This functionality is provided by the :class:`~n3fit.hyper_optimization.mongofiletrials.MongoFileTrials` class,
+which extends the capabilities of `hyperopt <https://github.com/hyperopt/hyperopt>`_'s `MongoTrials` and enables the
+simultaneous evaluation of multiple trials.
+
+To set up and run a parallelized hyperopt search, follow these steps:
+
+1. **Instantiate the MongoDB database:** Start by setting up the database in your current directory.
+   This database is referred to as ``hyperopt-db`` in the following instructions. You can initiate it with the command:
+
+   .. code-block:: bash
+
+      mongod --dbpath ./hyperopt-db
+
+   By default, ``mongod`` uses port ``27017``. This is also the default port for the ``n3fit --db-port`` option.
+   If you wish to use a different port, specify it as follows: ``mongod --dbpath ./hyperopt-db --port YOUR_PORT_NUMBER``.
+
+2. **Launch NNPDF with MongoDB integration:** Open a new command prompt and run ``n3fit`` with the desired configuration:
+
+   .. code-block:: bash
+
+      n3fit hyper-quickcard.yml 1 -r N_replicas --hyperopt N_trials --parallel-hyperopt --num-mongo-workers N
+
+   Here, ``N`` represents the number of MongoDB workers you wish to launch in parallel.
+   Each mongo worker handles one hyperopt trial at a time, so launching more workers allows a greater number of trials to be evaluated simultaneously.
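+   For instance, with the purely illustrative choice of ``20`` trials evaluated by ``4`` workers (so that up to
+   four trials run at the same time), the command could look like:
+
+   .. code-block:: bash
+
+      n3fit hyper-quickcard.yml 1 --hyperopt 20 --parallel-hyperopt --num-mongo-workers 4
+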
+ Note that there is no need to manually launch mongo workers, as the ``hyperopt-mongo-worker`` command is automatically + executed by the :meth:`~n3fit.hyper_optimization.mongofiletrials.MongoFileTrials.start_mongo_workers` method. + By default, the ``host`` argument is set to ``localhost``, and the database is named ``hyperopt``. + If necessary, you can modify these settings using the ``n3fit --db-host`` or ``n3fit --db-name`` options. + + +.. note:: + + Unlike in serial execution, parallel hyperoptimization runs do not generate ``tries.pkl`` files. + To resume an experiment, simply retain the MongoDB database created during your previous run. + Then, follow steps 1 and 2 as described above to restart the experiment. diff --git a/n3fit/src/n3fit/backends/__init__.py b/n3fit/src/n3fit/backends/__init__.py index 3676dd25d7..e48c4a856f 100644 --- a/n3fit/src/n3fit/backends/__init__.py +++ b/n3fit/src/n3fit/backends/__init__.py @@ -15,6 +15,7 @@ ) from n3fit.backends.keras_backend.internal_state import ( clear_backend_state, + get_physical_gpus, set_eager, set_initial_state, ) diff --git a/n3fit/src/n3fit/backends/keras_backend/internal_state.py b/n3fit/src/n3fit/backends/keras_backend/internal_state.py index 6cfc921c68..e818716940 100644 --- a/n3fit/src/n3fit/backends/keras_backend/internal_state.py +++ b/n3fit/src/n3fit/backends/keras_backend/internal_state.py @@ -143,3 +143,14 @@ def set_initial_state(debug=False, external_seed=None, max_cores=None, double_pr # Once again, if in debug mode or external_seed set, set also the TF seed if debug or external_seed: tf.random.set_seed(use_seed) + + +def get_physical_gpus(): + """ + Retrieve a list of all physical GPU devices available in the system. + + Returns + ------- + list: A list of TensorFlow physical devices of type 'GPU'. 
+ """ + return tf.config.list_physical_devices('GPU') diff --git a/n3fit/src/n3fit/hyper_optimization/hyper_scan.py b/n3fit/src/n3fit/hyper_optimization/hyper_scan.py index eec8242b73..35ee70fb6d 100644 --- a/n3fit/src/n3fit/hyper_optimization/hyper_scan.py +++ b/n3fit/src/n3fit/hyper_optimization/hyper_scan.py @@ -21,6 +21,7 @@ from n3fit.backends import MetaLayer, MetaModel from n3fit.hyper_optimization.filetrials import FileTrials +from n3fit.hyper_optimization.mongofiletrials import MongoFileTrials log = logging.getLogger(__name__) @@ -120,28 +121,53 @@ def hyper_scan_wrapper(replica_path_set, model_trainer, hyperscanner, max_evals= """ # Tell the trainer we are doing hpyeropt model_trainer.set_hyperopt(True, keys=hyperscanner.hyper_keys) + # Generate the trials object - trials = FileTrials(replica_path_set, parameters=hyperscanner.as_dict()) + if hyperscanner.parallel_hyperopt: + # Instantiate `MongoFileTrials` + # Mongo database should have already been initiated at this point + trials = MongoFileTrials( + replica_path_set, + db_host=hyperscanner.db_host, + db_port=hyperscanner.db_port, + db_name=hyperscanner.db_name, + num_workers=hyperscanner.num_mongo_workers, + parameters=hyperscanner.as_dict(), + ) + else: + # Instantiate `FileTrials` + trials = FileTrials(replica_path_set, parameters=hyperscanner.as_dict()) + # Initialize seed for hyperopt trials.rstate = np.random.default_rng(HYPEROPT_SEED) - # For restarts, reset the state of `FileTrials` saved in the pickle file if hyperscanner.restart_hyperopt: - pickle_file_to_load = f"{replica_path_set}/tries.pkl" - log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load) - trials = FileTrials.from_pkl(pickle_file_to_load) - - # Perform the scan - best = hyperopt.fmin( - fn=_status_wrapper(model_trainer.hyperparametrizable), + # For parallel hyperopt restarts, extract the database tar file + if hyperscanner.parallel_hyperopt: + log.info("Restarting hyperopt run using the MongoDB database %s", trials.db_name) + trials.extract_mongodb_database() + else: + # For sequential hyperopt restarts, reset the state of `FileTrials` saved in the pickle file + pickle_file_to_load = f"{replica_path_set}/tries.pkl" + log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load) + trials = FileTrials.from_pkl(pickle_file_to_load) + + # Call to hyperopt.fmin + fmin_args = dict( + fn=model_trainer.hyperparametrizable, space=hyperscanner.as_dict(), algo=hyperopt.tpe.suggest, max_evals=max_evals, - show_progressbar=False, trials=trials, rstate=trials.rstate, - trials_save_file=trials.pkl_file, ) + if hyperscanner.parallel_hyperopt: + trials.start_mongo_workers() + best = hyperopt.fmin(**fmin_args, show_progressbar=True, max_queue_len=trials.num_workers) + trials.stop_mongo_workers() + trials.compress_mongodb_database() + else: + best = hyperopt.fmin(**fmin_args, show_progressbar=False, trials_save_file=trials.pkl_file) return hyperscanner.space_eval(best) @@ -213,6 +239,17 @@ def __init__(self, parameters, sampling_dict, steps=5): restart_config = sampling_dict.get("restart") self.restart_hyperopt = True if restart_config else False + # adding extra options for parallel execution + parallel_config = sampling_dict.get("parallel") + self.parallel_hyperopt = True if parallel_config else False + + # setting up MondoDB options + if self.parallel_hyperopt: + self.db_host = sampling_dict.get("db_host") + self.db_port = sampling_dict.get("db_port") + self.db_name = sampling_dict.get("db_name") + self.num_mongo_workers = 
sampling_dict.get("num_mongo_workers") + self.hyper_keys = set([]) if "parameters" in sampling_dict: diff --git a/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py b/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py new file mode 100644 index 0000000000..64f2a877f5 --- /dev/null +++ b/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py @@ -0,0 +1,243 @@ +""" + Hyperopt trial object for parallel hyperoptimization with MongoDB. + Data are fetched from MongoDB databases and stored in the form of json files within the nnfit folder +""" +import glob +import json +import logging +import os +import subprocess + +from bson import SON, ObjectId +from hyperopt.mongoexp import MongoTrials + +from n3fit.backends import get_physical_gpus +from n3fit.hyper_optimization.filetrials import space_eval_trial + +log = logging.getLogger(__name__) + + +def convert_bson_to_dict(obj): + """ + Recursively convert a BSON object to a standard Python dictionary. + + This function is particularly useful for converting MongoDB query results, + which may contain BSON types like ObjectId and SON, into a more manageable + dictionary format. + + Parameters + ---------- + obj : dict or bson.SON or list or any + The object to convert. Can be a BSON object (like SON), a dictionary + containing BSON types, a list of such objects, or any other type. + + Returns + ------- + dict or list or any + A Python dictionary with all BSON types converted to standard Python + types (e.g., ObjectId converted to string). If the input is a list, + returns a list of converted elements. For other types, returns the + object as is. + + Examples + -------- + >>> from bson import ObjectId, SON + >>> sample_son = SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]) + >>> convert_bson_to_dict(sample_son) + {'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'} + + >>> sample_list = [SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]), {'age': 30}] + >>> convert_bson_to_dict(sample_list) + [{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}, {'age': 30}] + """ + if isinstance(obj, (SON, dict)): + return {k: convert_bson_to_dict(v) for k, v in obj.items()} + if isinstance(obj, ObjectId): + return str(obj) # or just return None if you don't need the ObjectId + if isinstance(obj, list): + return [convert_bson_to_dict(v) for v in obj] + return obj + + +class MongoFileTrials(MongoTrials): + """ + MongoDB implementation of :class:`n3fit.hyper_optimization.filetrials.FileTrials`. + + Parameters + ---------- + replica_path: path + Replica folder as generated by n3fit. + db_host: str + MongoDB database connection host. Defaults to "localhost". + db_port: int + MongoDB database connection port. Defaults to 27017. + db_name: str + MongoDB database name. Defaults to "hyperopt-db". + num_workers: int + Number of MongoDB workers to be initiated concurrently. Defaults to 1. + parameters: dict + Dictionary of parameters on which we are doing hyperoptimization. Default to None. + store_trial: bool + If True, store data into json file. Default to True. 
+ """ + + def __init__( + self, + replica_path, + db_host="localhost", + db_port=27017, + db_name="hyperopt-db", + num_workers=1, + parameters=None, + *args, + **kwargs, + ): + self.db_host = db_host + self.db_port = str(db_port) + self.db_name = db_name + self.num_workers = num_workers + self.mongotrials_arg = f"mongo://{self.db_host}:{self.db_port}/{self.db_name}/jobs" + self.workers = [] + + self._store_trial = False + self._json_file = replica_path / "tries.json" + self.database_tar_file = replica_path / f"{self.db_name}.tar.gz" + self._parameters = parameters + self._rstate = None + self._dynamic_trials = [] + + super().__init__(self.mongotrials_arg, *args, **kwargs) + + @property + def rstate(self): + """Returns the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`.""" + return self._rstate + + @rstate.setter + def rstate(self, random_generator): + """Sets the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`.""" + self._rstate = random_generator + + def _set_dynamic_trials(self): + """Converts self._trials to a dictionary and stores it in self._dynamic_trials.""" + self._dynamic_trials = [convert_bson_to_dict(item) for item in self._trials] + + def refresh(self): + """Fetches data from mongo database and save to a json file.""" + super().refresh() + + # convert BSON object to a dictionary + self._set_dynamic_trials() + + # write json to disk + if self._store_trial: + log.info("Storing scan in %s", self._json_file) + local_trials = [] + for idx, t in enumerate(self._dynamic_trials): + local_trials.append(t) + local_trials[idx]["misc"]["space_vals"] = space_eval_trial(self._parameters, t) + + all_to_str = json.dumps(local_trials, default=str) + with open(self._json_file, "w") as f: + f.write(all_to_str) + + # like in `FileTrials` the two methods below are implemented to avoid writing to the database twice + def new_trial_ids(self, n): + self._store_trial = False + return super().new_trial_ids(n) + + def _insert_trial_docs(self, docs): + self._store_trial = True + return super()._insert_trial_docs(docs) + + def start_mongo_workers( + self, workdir=None, exp_key=None, poll_interval=0.1, use_subprocesses=False + ): + """Initiates all mongo workers simultaneously.""" + # get the number of gpu cards, if any + gpus_all_physical_list = get_physical_gpus() + num_gpus_available = len(gpus_all_physical_list) + if not num_gpus_available: + log.warning("No GPUs found in the system.") + + # launch mongo workers + for i in range(self.num_workers): + # construct the command to start a hyperopt-mongo-worker + args = [ + "hyperopt-mongo-worker", + "--mongo", + f"{self.db_host}:{self.db_port}/{self.db_name}", + ] + if workdir: + args.extend(["--workdir", workdir]) + if exp_key: + args.extend(["--exp-key", exp_key]) + args.extend(["--poll-interval", str(poll_interval)]) + if use_subprocesses: + args.append("--no-subprocesses") + + # start the worker as a subprocess + try: + my_env = os.environ.copy() + + if num_gpus_available: + # set CUDA_VISIBLE_DEVICES environment variable + # the GPU index assigned to each worker i is given by mod(i, num_gpus_available) + my_env["CUDA_VISIBLE_DEVICES"] = str(i % num_gpus_available) + # set tensorflow memory growth + my_env["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" + # avoid memory fragmentation issues? 
+ # my_env["TF_GPU_ALLOCATOR"] = "cuda_malloc_async" + + # run mongo workers + worker = subprocess.Popen(args, env=my_env) + # we could use `stderr=subprocess.DEVNULL` in Popen to suppress output info + self.workers.append(worker) + log.info(f"Started mongo worker {i+1}/{self.num_workers}") + except OSError as err: + msg = f"Failed to execute {args}. Make sure you have MongoDB installed." + raise EnvironmentError(msg) from err + + def stop_mongo_workers(self): + """Terminates all active mongo workers.""" + for worker in self.workers: + try: + worker.terminate() + worker.wait() + log.info(f"Stopped mongo worker {self.workers.index(worker)+1}/{self.num_workers}") + except Exception as err: + log.error( + f"Failed to stop mongo worker {self.workers.index(worker)+1}/{self.num_workers}: {err}" + ) + + def compress_mongodb_database(self): + """Saves MongoDB database as tar file""" + # check if the database exist + if not os.path.exists(f"{self.db_name}" and not glob.glob('65*')): + raise FileNotFoundError( + f"The MongoDB database directory '{self.db_name}' does not exist. " + "Ensure it has been initiated correctly and it is in your path." + ) + # create the tar.gz file + try: + log.info(f"Compressing MongoDB database into {self.database_tar_file}") + subprocess.run( + ['tar', '-cvf', f'{self.database_tar_file}', f'{self.db_name}'] + glob.glob('65*'), + check=True, + ) + except subprocess.CalledProcessError as err: + raise RuntimeError(f"Error compressing the database: {err}") + + def extract_mongodb_database(self): + """Untar MongoDB database for use in restarts.""" + # check if the database tar file exist + if not os.path.exists(f"{self.database_tar_file}"): + raise FileNotFoundError( + f"The MongoDB database tar file '{self.database_tar_file}' does not exist." 
+ ) + # extract tar file + try: + log.info(f"Extracting MongoDB database from {self.database_tar_file}") + subprocess.run(['tar', '-xvf', f'{self.database_tar_file}'], check=True) + except subprocess.CalledProcessError as err: + raise RuntimeError(f"Error extracting the database: {err}") diff --git a/n3fit/src/n3fit/layers/losses.py b/n3fit/src/n3fit/layers/losses.py index d9b8be014e..1d330ef8f5 100644 --- a/n3fit/src/n3fit/layers/losses.py +++ b/n3fit/src/n3fit/layers/losses.py @@ -41,7 +41,6 @@ class LossInvcovmat(MetaLayer): def __init__(self, invcovmat, y_true, mask=None, covmat=None, **kwargs): self._invcovmat = op.numpy_to_tensor(invcovmat) self._covmat = covmat - self._diag = diag self._y_true = op.numpy_to_tensor(y_true) self._ndata = y_true.shape[-1] if mask is None or all(mask): @@ -68,10 +67,7 @@ def add_covmat(self, covmat): Note, however, that the _covmat attribute of the layer will still refer to the original data covmat """ - if self._diag: - new_covmat = np.invert(self._covmat + covmat) - else: - new_covmat = np.linalg.inv(self._covmat + covmat) + new_covmat = np.linalg.inv(self._covmat + covmat) self.kernel.assign(new_covmat) def update_mask(self, new_mask): diff --git a/n3fit/src/n3fit/scripts/n3fit_exec.py b/n3fit/src/n3fit/scripts/n3fit_exec.py index 32ddf5260f..fb9de3d081 100755 --- a/n3fit/src/n3fit/scripts/n3fit_exec.py +++ b/n3fit/src/n3fit/scripts/n3fit_exec.py @@ -234,6 +234,16 @@ def produce_hyperscanner(self, parameters, hyperscan_config=None, hyperopt=None) return None if hyperopt and self.environment.restart: hyperscan_config.update({'restart': 'true'}) + if hyperopt and self.environment.parallel_hyperopt: + hyperscan_config.update({'parallel': 'true'}) + hyperscan_config.update( + { + 'db_host': self.environment.db_host, + 'db_port': self.environment.db_port, + 'db_name': self.environment.db_name, + 'num_mongo_workers': self.environment.num_mongo_workers, + } + ) return HyperScanner(parameters, hyperscan_config) @@ -261,6 +271,20 @@ def check_positive(value): parser.add_argument("--hyperopt", help="Enable hyperopt scan", default=None, type=int) parser.add_argument("--restart", help="Enable hyperopt restarts", action="store_true") + parser.add_argument( + "--parallel-hyperopt", + help="Enable hyperopt run in parallel with MongoDB", + action="store_true", + ) + parser.add_argument("--db-host", help="MongoDB host", default="localhost") + parser.add_argument("--db-port", help="MongoDB port", default=27017) + parser.add_argument("--db-name", help="MongoDB dataset name", default="hyperopt-db") + parser.add_argument( + "--num-mongo-workers", + help="Number of mongo workers to be launched simultaneously", + type=check_positive, + default=1, + ) parser.add_argument("replica", help="MC replica number", type=check_positive) parser.add_argument( "-r", @@ -272,6 +296,18 @@ def check_positive(value): def get_commandline_arguments(self, cmdline=None): args = super().get_commandline_arguments(cmdline) + + # Validate dependencies related to the --hyperopt argument + if args["hyperopt"] is None: + if args["restart"]: + raise argparse.ArgumentError( + None, "The --restart option requires --hyperopt to be set." + ) + if args["parallel_hyperopt"]: + raise argparse.ArgumentError( + None, "The --parallel-hyperopt option requires --hyperopt to be set." 
+ ) + if args["output"] is None: args["output"] = pathlib.Path(args["config_yml"]).stem return args @@ -287,6 +323,11 @@ def run(self): self.environment.replicas = NSList(replicas, nskey="replica") self.environment.hyperopt = self.args["hyperopt"] self.environment.restart = self.args["restart"] + self.environment.parallel_hyperopt = self.args["parallel_hyperopt"] + self.environment.db_host = self.args["db_host"] + self.environment.db_port = self.args["db_port"] + self.environment.db_name = self.args["db_name"] + self.environment.num_mongo_workers = self.args["num_mongo_workers"] super().run() except N3FitError as e: log.error(f"Error in n3fit:\n{e}") diff --git a/n3fit/src/n3fit/tests/test_hyperopt.py b/n3fit/src/n3fit/tests/test_hyperopt.py index 965c9773b7..364367b722 100644 --- a/n3fit/src/n3fit/tests/test_hyperopt.py +++ b/n3fit/src/n3fit/tests/test_hyperopt.py @@ -5,6 +5,7 @@ import pathlib import shutil import subprocess as sp +import time import numpy as np from numpy.testing import assert_approx_equal @@ -161,3 +162,98 @@ def test_restart_from_pickle(tmp_path): assert restart_json[i]['state'] == direct_json[i]['state'] assert restart_json[i]['tid'] == direct_json[i]['tid'] assert restart_json[i]['result'] == direct_json[i]['result'] + + +def start_mongo_database(tmp_path): + """Creates MongoDB database and returns the Popen object.""" + db_command = ["mongod", "--dbpath", f"{tmp_path}/hyperopt-db"] + directory_path = f"{tmp_path}/hyperopt-db" + try: + # create database directory + sp.run(["mkdir", "-p", directory_path], check=True) + # launch database + process = sp.Popen(db_command, cwd=tmp_path) + return process + except (sp.CalledProcessError, OSError) as err: + msg = f"Error creating directory or executing {db_command}: {err}" + raise EnvironmentError(msg) from err + + +def stop_mongod_command(process): + """Stops the MongoDB database.""" + # directory_path = f"{tmp_path}/hyperopt" + try: + # stop mongod command + process.terminate() + process.wait() + # remove database files + # sp.run(f"rm -r {directory_path} && rm -r {tmp_path}/65*", check=True) + except (sp.CalledProcessError, OSError) as err: + msg = f"Error stopping the MongoDB process or removing database files: {err}" + raise EnvironmentError(msg) from err + + +def test_parallel_hyperopt(tmp_path): + """Ensure that the parallel implementation of hyperopt with MongoDB works as expected.""" + # Prepare the run + quickcard = f"hyper-{QUICKNAME}.yml" + quickpath = REGRESSION_FOLDER / quickcard + + # Define number of trials and number of mongo-workers to launch + n_trials = 6 + n_mongo_workers = 3 + + # Set up output directories + output_sequential = tmp_path / "run_hyperopt_sequential" + output_parallel = tmp_path / "run_hyperopt_parallel" + + # cp runcard to tmp folder + shutil.copy(quickpath, tmp_path) + + # Run hyperopt sequentially + start_time = time.time() + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials} " f"-o {output_sequential}".split(), + cwd=tmp_path, + check=True, + ) + end_time = time.time() + sequential_run_time = end_time - start_time + + # Generate on-the-fly a real MongoDB database + process = start_mongo_database(tmp_path) + + # Run hyperopt in parallel + start_time = time.time() + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials} " + f"--parallel-hyperopt --num-mongo-workers {n_mongo_workers} " + f"-o {output_parallel}".split(), + cwd=tmp_path, + check=True, + ) + end_time = time.time() + parallel_run_time = end_time - start_time + + # Stop mongod command + 
stop_mongod_command(process) + + # Read up generated json files + sequential_json_path = f"{output_sequential}/nnfit/replica_{REPLICA}/tries.json" + sequential_json = load_data(sequential_json_path) + parallel_json_path = f"{output_parallel}/nnfit/replica_{REPLICA}/tries.json" + parallel_json = load_data(parallel_json_path) + + # Check that the parallel run time is lower than the sequential one + assert parallel_run_time < sequential_run_time + + # Check that the final json files have the same number of trials + assert len(parallel_json) == len(sequential_json) + + for i in range(n_trials): + # Check that the files share the same content + assert len(parallel_json[i]['misc']) == len(sequential_json[i]['misc']) + assert len(parallel_json[i]['result']) == len(sequential_json[i]['result']) + # Note: cannot check that they share exactly the same history + # as the hyperopt algorithm depends on the results from previous runs + # which is obviously different between parallel and sequential runs diff --git a/pyproject.toml b/pyproject.toml index 68d7ae5370..3c8b1095d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ vp-nextfitruncard = "validphys.scripts.vp_nextfitruncard:main" vp-hyperoptplot = "validphys.scripts.vp_hyperoptplot:main" vp-deltachi2 = "validphys.scripts.vp_deltachi2:main" -[tool.poetry.dependencies] +[tool.poetry.dependencies] # Generic dependencies (i.e., validphys) python = "^3.9" matplotlib = ">=3.3.0,<3.8" @@ -68,6 +68,7 @@ pandas = "<2" numpy = "*" validobj = "*" prompt_toolkit = "*" +frozendict = "*" # validphys: needed for caching of data loading # Reportengine (and its dependencies) need to be installed in a bit more manual way reportengine = { git = "https://github.com/NNPDF/reportengine", rev = "3bb2b1d"} ruamel_yaml = {version = "<0.18"} @@ -78,6 +79,8 @@ eko = "^0.14.1" # Hyperopt hyperopt = "*" seaborn = "*" +# Hyperopt parallel +pymongo = "<4" # LHAPDF installation for debugging purposes # a3b2bbc3ced97675ac3a71df45f55ba = "*" # Optional dependencies diff --git a/validphys2/src/validphys/config.py b/validphys2/src/validphys/config.py index dd440bea3c..0b2cff9e09 100644 --- a/validphys2/src/validphys/config.py +++ b/validphys2/src/validphys/config.py @@ -1288,7 +1288,7 @@ def produce_rules( if added_filter_rules: for i, rule in enumerate(added_filter_rules): - if not (isinstance(rule, dict) or isinstance(rule, frozendict)): + if not isinstance(rule, (dict, frozendict)): raise ConfigError(f"added rule {i} is not a dict") try: rule_list.append( diff --git a/validphys2/src/validphys/utils.py b/validphys2/src/validphys/utils.py index 9bb3fa28ea..119dd5b4eb 100644 --- a/validphys2/src/validphys/utils.py +++ b/validphys2/src/validphys/utils.py @@ -8,14 +8,41 @@ import functools import pathlib import shutil -import tempfile -from typing import Any, Mapping, Sequence +from typing import Any, Hashable, Mapping, Sequence from frozendict import frozendict import numpy as np from validobj import ValidationError, parse_input +def make_hashable(obj: Any): + # So that we don't infinitely recurse since frozenset and tuples + # are Sequences. 
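+    # e.g. a nested list such as [1, [2, 3]] becomes the hashable tuple (1, (2, 3)),
+    # while a mapping is wrapped in a frozendict and anything already hashable
+    # (strings, numbers, tuples, ...) is returned unchanged.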
+    if isinstance(obj, Hashable):
+        return obj
+    elif isinstance(obj, Mapping):
+        return frozendict(obj)
+    elif isinstance(obj, Sequence):
+        return tuple([make_hashable(i) for i in obj])
+    else:
+        raise ValueError("Object is not hashable")
+
+
+def freeze_args(func):
+    """Decorator that turns the mutable arguments of a function (e.g. dicts and
+    lists) into immutable, hashable ones, so that the decorated function can be
+    used with caching decorators such as ``functools.lru_cache``.
+    """
+
+    @functools.wraps(func)
+    def wrapped(*args, **kwargs):
+        args = tuple([make_hashable(arg) for arg in args])
+        kwargs = {k: make_hashable(v) for k, v in kwargs.items()}
+        return func(*args, **kwargs)
+
+    return wrapped
+
+
 # Since typing.Hashable doesn't check recursively you actually
 # have to try hashing it.
 def is_hashable(obj):