diff --git a/examples/complex/13_orca_parallel_neb/13_orca_parallel_neb.yaml b/examples/complex/13_orca_parallel_neb/13_orca_parallel_neb.yaml
index 144221ac1..a1f159ebf 100644
--- a/examples/complex/13_orca_parallel_neb/13_orca_parallel_neb.yaml
+++ b/examples/complex/13_orca_parallel_neb/13_orca_parallel_neb.yaml
@@ -19,17 +19,27 @@ cos:
   # avaiable physical cores by default (logical cores, e.g., from
   # Hyperthreading are disabled by default).
   cluster: True
-  # The number of avaiable workers can be controlled by setting 'n_workers'
+  # The number of available threads can be controlled by setting 'threads_per_worker'
   # to an appropriate value. When 'cluster_kwargs' with some arguments is given
   # 'cluster' is automatically set to True.
   #
-  # It is probably a good idea to set 'n_workers' to the number of actual
-  # physical cores that should be utilized.
+  # It is probably a good idea to set 'threads_per_worker' to the number of actual
+  # physical cores to be utilized.
   #
   # As there are 1+8+1 images in total and only 8 are moving I pick 4
-  # workers, so all images can be treated in two batches.
+  # cores, so all moving images can be treated in two batches. In the first
+  # cycle, two additional calculations (first and last image) will be done.
   cluster_kwargs:
-    n_workers: 4
+    n_workers: 1
+    threads_per_worker: 4
+    # pysisyphus uses dask resources to manage the different tasks, so the
+    # appropriate number of 'CPU' resources per worker must be given here. This
+    # should be the same number as 'threads_per_worker'.
+    #
+    # As mentioned above, just setting 'cluster: True' should be enough in most
+    # cases.
+    resources:
+      CPU: 4
   # Alternatively, the address to an external scheduler not managed by
   # pysisyphus can be provided. If an address is provided, this takes
   # precedence and pysisyphus won't create an internal cluster.
diff --git a/pysisyphus/cos/ChainOfStates.py b/pysisyphus/cos/ChainOfStates.py
index 9558c94f5..9e32407f3 100644
--- a/pysisyphus/cos/ChainOfStates.py
+++ b/pysisyphus/cos/ChainOfStates.py
@@ -16,6 +16,9 @@
 from pysisyphus.helpers_pure import hash_arr, rms
 from pysisyphus.modefollow import geom_lanczos
 
+# from pysisyphus.calculators.parallel import distributed_calculations
+from pysisyphus.cos.distributed import distributed_calculations
+
 
 class ClusterDummy:
     def close(self):
@@ -67,10 +70,20 @@ def __init__(
         self._cluster = bool(cluster) or (cluster_kwargs is not None)
         if cluster_kwargs is None:
             cluster_kwargs = dict()
+        # _cluster_kwargs = {
+        #     # Only one calculation / worker
+        #     "threads_per_worker": 1,
+        #     "n_workers": psutil.cpu_count(logical=False),
+        # }
+        ncores = psutil.cpu_count(logical=False)
         _cluster_kwargs = {
-            # Only one calculation / worker
-            "threads_per_worker": 1,
-            "n_workers": psutil.cpu_count(logical=False),
+            # One worker per cluster with multiple threads
+            "threads_per_worker": ncores,
+            "n_workers": 1,
+            # Register the available cores as a dask resource
+            "resources": {
+                "CPU": ncores,
+            },
         }
         _cluster_kwargs.update(cluster_kwargs)
         self.cluster_kwargs = _cluster_kwargs
@@ -175,10 +188,6 @@ def get_fixed_indices(self):
 
     def get_dask_local_cluster(self):
         cluster = LocalCluster(**self.cluster_kwargs)
-        dashboard = cluster.scheduler.services["dashboard"]
-        # address = dashboard.address
-        # port = dashboard.port
-        # link = f"http://{address}:{port}/status"
         link = cluster.dashboard_link
         self.log(f"Created {cluster}. Dashboard is available at {link}")
         return cluster
@@ -335,47 +344,18 @@ def par_image_calc(self, image):
         image.calc_energy_and_forces()
         return image
 
-    def set_images(self, indices, images):
-        for ind, image in zip(indices, images):
-            self.images[ind] = image
-
     def concurrent_force_calcs(self, images_to_calculate, image_indices):
         client = self.get_dask_client()
         self.log(f"Doing concurrent force calculations using {client}")
-
-        # save original pals to restore them later
-        orig_pal = images_to_calculate[0].calculator.pal
-
-        # divide pal of each image by the number of workers available or available images
-        # number of workers available
-        n_workers = len(client.scheduler_info()["workers"])
-        # number of images to calculate
-        n_images = len(images_to_calculate)
-        # divide pal by the number of workers (or 1 if more workers)
-        new_pal = max(1, orig_pal // n_workers)
-
-        # split images to calculate into batches of n_workers and set pal of each image
-        n_batches = n_images // n_workers
-        for i in range(0, n_batches):
-            for j in range(i * n_workers, (i + 1) * n_workers):
-                images_to_calculate[j].calculator.pal = new_pal
-
-        # distribute the pals among the remaining images
-        n_last_batch = n_images % n_workers
-        if n_last_batch > 0:
-            # divide pal by the remainder
-            new_pal = max(1, orig_pal // n_last_batch)
-            for i in range(n_batches * n_workers, n_images):
-                images_to_calculate[i].calculator.pal = new_pal
-
-        # map images to workers
-        image_futures = client.map(self.par_image_calc, images_to_calculate)
-        # set images to the results of the calculations
-        self.set_images(image_indices, client.gather(image_futures))
-
-        # Restore original pals
-        for i in range(0, n_images):
-            self.images[i].calculator.pal = orig_pal
+        calculated_images = distributed_calculations(
+            client,
+            images_to_calculate,
+            self.par_image_calc,
+            logger=self.logger,
+        )
+        # Set calculated images
+        for ind, image in zip(image_indices, calculated_images):
+            self.images[ind] = image
         client.close()
 
     def calculate_forces(self):
diff --git a/pysisyphus/cos/distributed.py b/pysisyphus/cos/distributed.py
new file mode 100644
index 000000000..a0e84b27f
--- /dev/null
+++ b/pysisyphus/cos/distributed.py
@@ -0,0 +1,129 @@
+from dataclasses import dataclass
+from collections.abc import Callable
+import math
+
+import distributed
+import numpy as np
+
+from pysisyphus.Geometry import Geometry
+from pysisyphus.helpers_pure import log
+
+
+@dataclass
+class ParallelData:
+    ncores: int
+    ncalcs: int
+    nbatches: int
+    batch_sizes: np.ndarray
+    batch_pals: np.ndarray
+    calc_pals: list[int]
+
+
+def pal_values_for_parallel_calcs(ncores: int, ncalcs: int) -> ParallelData:
+    """Determine sensible pal values for parallel calculations.
+
+    Given 'ncores' available CPU cores and 'ncalcs' calculations, determine suitable
+    'pal' values for each calculation, so the calculations can run in parallel. If there
+    are more calculations than cores, most calculations will run with pal=1, with a few
+    remaining calculations with pal > 1.
+
+    Parameters
+    ----------
+    ncores
+        Positive integer, number of available cores.
+    ncalcs
+        Positive integer, number of calculations to carry out.
+
+    Returns
+    -------
+    pal_data
+        ParallelData object whose 'calc_pals' attribute holds one positive integer
+        (pal value) per calculation.
+    """
+    assert ncores > 0
+    assert ncalcs > 0
+
+    # Most calculations will run with pal=1, i.e., one calculation per core.
+    nbatches = math.ceil(ncalcs / ncores)
+
+    # Run one calculation per core in a batch and the remaining calculations in the
+    # last batch, with potentially more cores per calculation.
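+    #
+    # For example: ncores=4 and ncalcs=10 give nbatches=3, batch_sizes=[4, 4, 2]
+    # and batch_pals=[1, 1, 2], i.e., eight calculations run with pal=1 and the
+    # two calculations of the last batch run with pal=2 each.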
+    batch_sizes = np.full(nbatches, ncores)
+    if (rest := ncalcs % ncores) != 0:
+        batch_sizes[-1] = rest
+
+    # The number of cores used per calculation in a batch is derived from the number
+    # of available cores.
+    batch_pals = ncores // batch_sizes
+    # print(f"{ncalcs=}, {ncores=}, {nbatches=}, {batch_sizes=}, {batch_pals=}")
+
+    # Distribute pal values over all batches and images
+    calc_pals = np.repeat(batch_pals, batch_sizes).tolist()
+    assert len(calc_pals) == ncalcs
+    pal_data = ParallelData(
+        ncores=ncores,
+        ncalcs=ncalcs,
+        nbatches=nbatches,
+        batch_sizes=batch_sizes,
+        batch_pals=batch_pals,
+        calc_pals=calc_pals,
+    )
+    return pal_data
+
+
+def distributed_calculations(
+    client: distributed.Client,
+    images: list[Geometry],
+    func: Callable,
+    logger=None,
+) -> list[Geometry]:
+    """Carry out distributed calculations via dask.
+
+    'func' should return the modified image."""
+    nimages = len(images)
+
+    # Determine the number of available 'CPU' resources.
+    ncores = 0
+    scheduler_info = client.scheduler_info()
+    for worker_data in scheduler_info["workers"].values():
+        try:
+            cpu = worker_data["resources"]["CPU"]
+        except KeyError:
+            cpu = 0
+        ncores += cpu
+    assert ncores > 0, "No 'CPU' resources available. Did you forget to specify them?"
+
+    # Back up original pal values for later restoration.
+    pals_backup = [image.calculator.pal for image in images]
+    # Assert that all pal values are initially the same. In the end this is probably
+    # not necessary, but if there are varying pal values a more elaborate strategy
+    # may be warranted.
+    pal0 = pals_backup[0]
+    assert all([pal == pal0 for pal in pals_backup]), (
+        "Image calculators have different pal values! This function only supports "
+        "image calculators with the same pal value throughout!"
+    )
+    pal_data = pal_values_for_parallel_calcs(ncores, nimages)
+    log(logger, pal_data)
+    pals_parallel = pal_data.calc_pals
+
+    futures = list()
+    for image, pal_parallel in zip(images, pals_parallel):
+        # Set the potentially modified pal value
+        image.calculator.pal = pal_parallel
+        futures.append(
+            client.submit(
+                func,
+                image,
+                resources={
+                    "CPU": pal_parallel,
+                },
+            )
+        )
+        # print(f"submitted task with {pal_parallel=}")
+    calculated_images = client.gather(futures)
+
+    # Restore the original pal values on all calculators
+    for image, pal_org in zip(calculated_images, pals_backup):
+        image.calculator.pal = pal_org
+
+    return calculated_images
diff --git a/tests/test_dask/test_dask.py b/tests/test_dask/test_dask.py
index 05983e944..601ce8a64 100644
--- a/tests/test_dask/test_dask.py
+++ b/tests/test_dask/test_dask.py
@@ -13,7 +13,14 @@
 @pytest.mark.skip_ci
 @using("orca")
 def test_dask():
-    with LocalCluster(n_workers=5, threads_per_worker=1) as cluster:
+    ncores = 5
+    with LocalCluster(
+        n_workers=1,
+        threads_per_worker=ncores,
+        resources={
+            "CPU": ncores,
+        },
+    ) as cluster:
         address = cluster.scheduler_address
 
         geoms = geom_loader("lib:ala_dipeptide_iso_b3lyp_631gd_10_images.trj")
@@ -35,16 +42,19 @@
 
 @pytest.mark.parametrize("is_external", (True, False))
 @using("orca")
 def test_external(is_external):
-    n_workers = 5
+    ncores = 5
     cos_kwargs = {}
     if is_external:
-        cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
+        cluster = LocalCluster(
+            n_workers=1,
+            threads_per_worker=ncores,
+            resources={
+                "CPU": ncores,
+            },
+        )
         cos_kwargs["scheduler"] = cluster.scheduler_address
     else:
         cos_kwargs["cluster"] = True
-        cos_kwargs["cluster_kwargs"] = {
-            "n_workers": 5,
-        }
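+        # 'cluster_kwargs' is not needed anymore: with 'cluster' set to True,
+        # pysisyphus builds a LocalCluster with a single worker, all physical
+        # cores as threads and a matching 'CPU' resource (see ChainOfStates).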
geom_loader("lib:ala_dipeptide_iso_b3lyp_631gd_10_images.trj")