fix: overhauled distributed COS calculations
moved logic to separate functions located in pysisyphus.cos.distributed
Johannes Steinmetzer committed Oct 4, 2023
1 parent d64069a commit 383ee43
Showing 4 changed files with 185 additions and 56 deletions.
20 changes: 15 additions & 5 deletions examples/complex/13_orca_parallel_neb/13_orca_parallel_neb.yaml
@@ -19,17 +19,27 @@ cos:
# available physical cores by default (logical cores, e.g., from
# Hyperthreading, are disabled by default).
cluster: True
# The number of available workers can be controlled by setting 'n_workers'
# The number of available threads can be controlled by setting 'threads_per_worker'
# to an appropriate value. When 'cluster_kwargs' with some arguments is given
# 'cluster' is automatically set to True.
#
# It is probably a good idea to set 'n_workers' to the number of actual
# physical cores that should be utilized.
# It is probably a good idea to set 'threads_per_worker' to the number of actual
# physical cores to be utilized.
#
# As there are 1+8+1 images in total and only 8 are moving I pick 4
# workers, so all images can be treated in two batches.
# workers, so all images can be treated in two batches. In the first
# cycle two additional calculations (first and last image) will be done.
cluster_kwargs:
n_workers: 4
n_workers: 1
threads_per_worker: 4
# pysisyphus uses resources to manage the different tasks, so the appropriate
# number of CPU resources per worker must be given here. This should be the same
# number as used in threads_per_worker.
#
# As mentioned above, just setting 'cluster: True' should be enough in most
# cases.
resources:
CPU: 4
# Alternatively, the address to an external scheduler not managed by
# pysisyphus can be provided. If an address is provided, this takes
# precedence and pysisyphus won't create an internal cluster.
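The 'cluster_kwargs' block above is forwarded to dask's LocalCluster (see get_dask_local_cluster in ChainOfStates.py below). A minimal sketch of the cluster this example configuration would create, with the values copied from the YAML above:

from dask.distributed import LocalCluster

# One worker process with four threads and four registered "CPU" resources,
# mirroring the example 'cluster_kwargs'.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=4,
    resources={"CPU": 4},
)
print(cluster.dashboard_link)  # progress can be followed in the dashboard
cluster.close()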
70 changes: 25 additions & 45 deletions pysisyphus/cos/ChainOfStates.py
@@ -16,6 +16,9 @@
from pysisyphus.helpers_pure import hash_arr, rms
from pysisyphus.modefollow import geom_lanczos

# from pysisyphus.calculators.parallel import distributed_calculations
from pysisyphus.cos.distributed import distributed_calculations


class ClusterDummy:
def close(self):
@@ -67,10 +70,20 @@ def __init__(
self._cluster = bool(cluster) or (cluster_kwargs is not None)
if cluster_kwargs is None:
cluster_kwargs = dict()
# _cluster_kwargs = {
# # Only one calculation / worker
# "threads_per_worker": 1,
# "n_workers": psutil.cpu_count(logical=False),
# }
ncores = psutil.cpu_count(logical=False)
_cluster_kwargs = {
# Only one calculation / worker
"threads_per_worker": 1,
"n_workers": psutil.cpu_count(logical=False),
# One worker per cluster with multiple threads
"threads_per_worker": ncores,
"n_workers": 1,
# Register available cores as resource
"resources": {
"CPU": ncores,
},
}
_cluster_kwargs.update(cluster_kwargs)
self.cluster_kwargs = _cluster_kwargs
@@ -175,10 +188,6 @@ def get_fixed_indices(self):

def get_dask_local_cluster(self):
cluster = LocalCluster(**self.cluster_kwargs)
dashboard = cluster.scheduler.services["dashboard"]
# address = dashboard.address
# port = dashboard.port
# link = f"http://{address}:{port}/status"
link = cluster.dashboard_link
self.log(f"Created {cluster}. Dashboard is available at {link}")
return cluster
@@ -335,47 +344,18 @@ def par_image_calc(self, image):
image.calc_energy_and_forces()
return image

def set_images(self, indices, images):
for ind, image in zip(indices, images):
self.images[ind] = image

def concurrent_force_calcs(self, images_to_calculate, image_indices):
client = self.get_dask_client()
self.log(f"Doing concurrent force calculations using {client}")

# save original pals to restore them later
orig_pal = images_to_calculate[0].calculator.pal

# divide pal of each image by the number of workers available or available images
# number of workers available
n_workers = len(client.scheduler_info()["workers"])
# number of images to calculate
n_images = len(images_to_calculate)
# divide pal by the number of workers (or 1 if more workers)
new_pal = max(1, orig_pal // n_workers)

# split images to calculate into batches of n_workers and set pal of each image
n_batches = n_images // n_workers
for i in range(0, n_batches):
for j in range(i * n_workers, (i + 1) * n_workers):
images_to_calculate[j].calculator.pal = new_pal

# distribute the pals among the remaining images
n_last_batch = n_images % n_workers
if n_last_batch > 0:
# divide pal by the remainder
new_pal = max(1, orig_pal // n_last_batch)
for i in range(n_batches * n_workers, n_images):
images_to_calculate[i].calculator.pal = new_pal

# map images to workers
image_futures = client.map(self.par_image_calc, images_to_calculate)
# set images to the results of the calculations
self.set_images(image_indices, client.gather(image_futures))

# Restore original pals
for i in range(0, n_images):
self.images[i].calculator.pal = orig_pal
calculated_images = distributed_calculations(
client,
images_to_calculate,
self.par_image_calc,
logger=self.logger,
)
# Set calculated images
for ind, image in zip(image_indices, calculated_images):
self.images[ind] = image
client.close()

def calculate_forces(self):
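The rewritten concurrent_force_calcs above delegates the actual scheduling to distributed_calculations in the new pysisyphus.cos.distributed module (next file). It relies on dask's resource gating: the single worker advertises a 'CPU' resource pool and every submitted task declares how many CPUs it consumes, so the scheduler never oversubscribes the machine. A self-contained sketch of that mechanism, with an invented fake_calc standing in for an actual image calculation:

import time

from dask.distributed import Client, LocalCluster


def fake_calc(i, pal):
    # Stand-in for image.calc_energy_and_forces() running on 'pal' cores.
    time.sleep(0.1)
    return i, pal


if __name__ == "__main__":
    ncores = 4
    with LocalCluster(
        n_workers=1, threads_per_worker=ncores, resources={"CPU": ncores}
    ) as cluster, Client(cluster) as client:
        # Each task reserves 2 of the 4 "CPU" resources, so at most two of
        # the four tasks run at the same time.
        futures = [
            client.submit(fake_calc, i, 2, resources={"CPU": 2}) for i in range(4)
        ]
        print(client.gather(futures))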
129 changes: 129 additions & 0 deletions pysisyphus/cos/distributed.py
@@ -0,0 +1,129 @@
from dataclasses import dataclass
from collections.abc import Callable
import math

import distributed
import numpy as np

from pysisyphus.Geometry import Geometry
from pysisyphus.helpers_pure import log


@dataclass
class ParallelData:
ncores: int
ncalcs: int
nbatches: int
batch_sizes: np.ndarray
batch_pals: np.ndarray
calc_pals: list[int]


def pal_values_for_parallel_calcs(ncores: int, ncalcs: int) -> ParallelData:
"""Determine sensible pal values for parallel calculations.
Given 'ncores' available CPU cores and 'ncalcs' calculations, determine suitable
'pal' values for each calculation, so the calculations can run in parallel. If there
are more calculations than cores, most calculations will run with pal=1, with a few
remaining calculations using pal > 1.
Parameters
----------
ncores
Positive integer, number of available cores.
ncalcs
Positive integer, number of calculations to carry out.
Returns
-------
pal_data
ParallelData with batch sizes and per-calculation 'pal' values.
"""
assert ncores > 0
assert ncalcs > 0

# Most calculations will run with pal=1, i.e., one calculation per core.
nbatches = math.ceil(ncalcs / ncores)

# Run one calculation per core in a batch and the remaining calculations in the
# last batch with potentially more cores.
batch_sizes = np.full(nbatches, ncores)
if (rest := ncalcs % ncores) != 0:
batch_sizes[-1] = rest

# The number of cores used per calculation in a batch is derived from the number
# of available cores.
batch_pals = ncores // batch_sizes
# print(f"{ncalcs=}, {ncores=}, {nbatches=}, {batch_sizes=}, {batch_pals=}")

# Distribute pal values of all batches and images
calc_pals = np.repeat(batch_pals, batch_sizes).tolist()
assert len(calc_pals) == ncalcs
pal_data = ParallelData(
ncores=ncores,
ncalcs=ncalcs,
nbatches=nbatches,
batch_sizes=batch_sizes,
batch_pals=batch_pals,
calc_pals=calc_pals,
)
return pal_data


def distributed_calculations(
client: distributed.Client,
images: list[Geometry],
func: Callable,
logger=None,
) -> list[Geometry]:
"""Carray out distributed calculations via dask.
func should return the modified image."""
nimages = len(images)

# Determine number of available CPU resources.
ncores = 0
scheduler_info = client.scheduler_info()
for worker_data in scheduler_info["workers"].values():
try:
cpu = worker_data["resources"]["CPU"]
except KeyError:
cpu = 0
ncores += cpu
assert ncores > 0, "No 'CPU' resources available. Did you forget to specify them?"

# Backup original pal values for later restoration.
pals_backup = [image.calculator.pal for image in images]
# Assert that all pal values are initially the same. In the end this is probably
# not necessary, but if there are varying pal values a more elaborate strategy
# may be warranted.
pal0 = pals_backup[0]
assert all([pal == pal0 for pal in pals_backup]), (
"Image calculators have different pal values! This function only supports "
"images calculators with the same pal value throughout!"
)
pal_data = pal_values_for_parallel_calcs(ncores, nimages)
log(logger, pal_data)
pals_parallel = pal_data.calc_pals

futures = list()
for image, pal_parallel in zip(images, pals_parallel):
# Set potentially modified pal value
image.calculator.pal = pal_parallel
futures.append(
client.submit(
func,
image,
resources={
"CPU": pal_parallel,
},
)
)
# print(f"submitted task with {pal_parallel=}")
calculated_images = client.gather(futures)

# Set original pal values on all calculators
for image, pal_org in zip(calculated_images, pals_backup):
image.calculator.pal = pal_org

return calculated_images
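A quick hand-worked check of pal_values_for_parallel_calcs for the NEB example above (4 'CPU' resources, 1 + 8 + 1 images in the first cycle, 8 moving images afterwards); the expected values below were derived from the function body and are easy to reverify:

from pysisyphus.cos.distributed import pal_values_for_parallel_calcs

# First cycle: 10 calculations on 4 cores -> two full batches with pal=1,
# plus a final batch of two calculations with pal=2 each.
pd = pal_values_for_parallel_calcs(ncores=4, ncalcs=10)
assert pd.nbatches == 3
assert list(pd.batch_sizes) == [4, 4, 2]
assert list(pd.batch_pals) == [1, 1, 2]
assert pd.calc_pals == [1] * 8 + [2, 2]

# Later cycles: only the 8 moving images, i.e. exactly two full batches.
assert pal_values_for_parallel_calcs(ncores=4, ncalcs=8).calc_pals == [1] * 8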
22 changes: 16 additions & 6 deletions tests/test_dask/test_dask.py
@@ -13,7 +13,14 @@
@pytest.mark.skip_ci
@using("orca")
def test_dask():
with LocalCluster(n_workers=5, threads_per_worker=1) as cluster:
ncores = 5
with LocalCluster(
n_workers=1,
threads_per_worker=ncores,
resources={
"CPU": ncores,
},
) as cluster:
address = cluster.scheduler_address

geoms = geom_loader("lib:ala_dipeptide_iso_b3lyp_631gd_10_images.trj")
@@ -35,16 +42,19 @@ def test_dask():
@pytest.mark.parametrize("is_external", (True, False))
@using("orca")
def test_external(is_external):
n_workers = 5
ncores = 5
cos_kwargs = {}
if is_external:
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
cluster = LocalCluster(
n_workers=1,
threads_per_worker=ncores,
resources={
"CPU": ncores,
},
)
cos_kwargs["scheduler"] = cluster.scheduler_address
else:
cos_kwargs["cluster"] = True
cos_kwargs["cluster_kwargs"] = {
"n_workers": 5,
}

geoms = geom_loader("lib:ala_dipeptide_iso_b3lyp_631gd_10_images.trj")

