Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GSoC] Parallelisation of AnalysisBase with multiprocessing and dask #4162

Merged
merged 288 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 250 commits
Commits
Show all changes
288 commits
Select commit Hold shift + click to select a range
c58da9f
Remove _scheduler attribute and make dask-based tests run properly
Jun 15, 2023
1061a80
Refactor scheduler usage
Jun 20, 2023
2d7a56c
Add multiple workers in dask for testing
Jun 20, 2023
3d1cf80
Refactor _setup_bslices and add processes to dask scheduler kwargs
Jun 21, 2023
bb31a32
Create frame_indices and trajectory for each bslice during _setup_bsl…
Jun 22, 2023
eb1feb1
Use explicit initialisation of timeseries wiith zeros
Jun 22, 2023
4e11bcf
Add non-trivial _parallel_conclude function
Jun 22, 2023
bc295c8
Fix tests for new dask fixture
Jun 22, 2023
75d1707
Add type-matching _parallel_conclude
Jun 23, 2023
c5bd0b5
Add fixtures to test combinations of dask and multiprocessing
Jun 25, 2023
556c309
dask and multiprocessing works in test_atomicdistances.py
Jun 25, 2023
83630b4
Fix bug in results is np.ndarray codepath
Jun 25, 2023
e41cba3
Add _setup_scheduler raising NotImplemented error in align.py::Avera…
Jun 25, 2023
1659323
dask and multiprocessing schedulers to test_align.py
Jun 25, 2023
f9a8072
dask scheduler for test_contacts.py and test for incompatibility with…
Jun 25, 2023
f975ecf
dask and multiprocessing scheduler for test_density.py
Jun 25, 2023
cf0253d
Add _parallel_conclude implementation for dielectric
Jun 25, 2023
8b0f4fb
dask and multiprocessing schedulers for test_dielectric.py
Jun 25, 2023
cc25c65
dask and multiprocessing schedulers for test_diffusionmap.py
Jun 25, 2023
4aa005b
Add NotImplementedError for parallel schedulers in dihedrals.py
Jun 25, 2023
4c1a73a
only current scheduler for test_dihedrals.py
Jun 25, 2023
b76d6ff
dask and multiprocessing tests for test_encore.py -- but some fail be…
Jun 25, 2023
502fb84
Add NotImplementedError for _setup_scheduler in gnm.py
Jun 25, 2023
49d1742
Add NotImplementedError for _setup_scheduler in helix_analysis.py
Jun 25, 2023
90a318d
current process scheduler for test_helix_analysis.py
Jun 25, 2023
3cc1dff
dask and multiprocessing schedulers for test_hole2.py
Jun 25, 2023
33e8b4c
Add NotImplementedError in for not-None schedulers
Jun 25, 2023
1058ed3
current process scheduler and test for failing non-current ones in te…
Jun 25, 2023
b49a8ba
current process only scheduler and failing test for others in test_li…
Jun 25, 2023
4e08384
Add NotImplementedError for non-current process schedulers
Jun 25, 2023
274a6ff
current process scheduler only and failing tests for non-current ones…
Jun 25, 2023
664e378
Add NotImplementedError for non-current process schedulers
Jun 25, 2023
e8eeba7
Fix scope of fixtures
Jun 25, 2023
42e34dc
Add NotImplemented error for all non-current process schedulers
Jun 25, 2023
38fe4ea
only current process scheduler and failing tests for test_nucleicacid…
Jun 25, 2023
b9b651d
dask and multiprocessing schedulers for test_persistentlength.py
Jun 25, 2023
0c151a1
Add _parallel_conclude implementation
Jun 25, 2023
33af089
dask and multiprocessing schedulers for test_psa.py
Jun 25, 2023
a79229d
Add _parallel_conclude implementation for RDF and RDF_S
Jun 25, 2023
a788358
dask and multiprocessing schedulers for test_rdf_s.py
Jun 25, 2023
6fe92c2
dask and multiprocessing schedulers for test_rdf.py
Jun 25, 2023
4c0937e
Add NotImplementedError for RMSD and RMSF classes
Jun 25, 2023
c40e9e1
only local process scheduler and failing tests for others for test_rm…
Jun 25, 2023
68795b8
current process scheduler only and failing test for others for test_w…
Jun 25, 2023
419a97a
Add NotImplementedError in _setup_scheduler
Jun 25, 2023
bab8e81
Add more clear message during exception
Jun 25, 2023
641170f
Add timeseries aggregation function
Jun 29, 2023
f31792a
dask and multiprocessing scheduler for most of the test_base.py testc…
Jun 29, 2023
4a10112
dask and multiprocessing schedulers for test_rms.py::TestRMSD
Jun 29, 2023
c8187ae
Add NotImplementedError for pca and rms
Jun 29, 2023
ac035eb
dask and multiprocessing schedulers for test_bat
Jun 29, 2023
076b4ce
dcurrent process scheduler for test_gnm.py
Jun 29, 2023
0dffdc8
dcurrent process scheduler for test_pca.py
Jun 29, 2023
d9429a8
Fix rmsf-related scheduler usage to only current process scheduler
Jun 29, 2023
767388f
remove fixme marks
Jun 29, 2023
1780468
Switch to enumerate in _compute main loop and fix code review comments
Jun 30, 2023
00593c0
Add dask to CI setup actions
Jul 2, 2023
6603173
Remove local scheduler for progressbar test
Jul 2, 2023
fd33788
Add installation with dask as asetup option
Jul 2, 2023
2a3b2f2
fix hole2 tests for -- implement only current scheduler and add faili…
Jul 2, 2023
268eada
fix progressbar test by changing order of ProgressBar and enumerate
Jul 3, 2023
4a019af
use only frame indices and frames in _setup_bslices after writing a b…
Jul 4, 2023
4877125
Refactor _setup_bslices: move enumerate to numpy and fuse logic in de…
Jul 4, 2023
9d9d918
Add documentation to AnalysisBase._parallel_conclude()
Jul 4, 2023
81a8df4
add functional-like interface draft
Jul 20, 2023
1430a84
Implement proper Client class, separating computations from AnalysisBase
Jul 23, 2023
48f094c
FINALLY implement working one-time dask cluster setup in kwargs of a …
Jul 23, 2023
110b589
Correct tests accordingly
Jul 23, 2023
d9d63d1
Separately process case of only one remote worker
Jul 24, 2023
60e4ea4
Add available_schedulers to AverageStructure
Jul 24, 2023
a91dacc
Use automatic fixture for AverageStructure
Jul 24, 2023
247d870
Add fixture for AverageStructure
Jul 24, 2023
df76f91
Add fixture for AtomicDistances
Jul 24, 2023
17c29d7
Change default available_backends to all implemented in Client
Jul 24, 2023
fc6d44d
Limit available backends for AverageStructure
Jul 24, 2023
fc74f5c
Add fixture for BAT
Jul 24, 2023
284d7c0
Add fixture tests to Contacts
Jul 24, 2023
ee91c1b
Fix n_workers check and boolean frames handling
Jul 24, 2023
b2fdd41
Fix performance of backend="dask"
Jul 27, 2023
eab7136
Add available_backends for Contacts
Jul 27, 2023
180569a
Remove _setup_scheduler
Jul 27, 2023
ba2246a
Use client fixture for Contacts
Jul 27, 2023
5c08885
Use client fixture for RMSD/RMSF
Jul 27, 2023
5c7b750
Revert files to their state in develop
Jul 27, 2023
49c9dcb
Delete files_for_undoing_changes.txt
Jul 27, 2023
62760c9
Delete conftest.py
Jul 27, 2023
90e99a5
Delete parallel_analysis_demo.ipynb
Jul 27, 2023
39f324a
Clean up notebook
Jul 27, 2023
7d7b5e5
remove notebook
Jul 27, 2023
787309b
Limit available schedulers in RMSF
Jul 27, 2023
77cff6d
Split test in two due to failing with "expectation" parametrization
Jul 27, 2023
2c583c4
Add fixture generator and fixtures for test_base and test_rms
Jul 27, 2023
76c59d3
Add dask to pyproject.toml
Jul 27, 2023
ba30774
Return computation groups explicitly
Jul 27, 2023
ac0b4a3
Merge branch 'develop' of https://github.com/MDAnalysis/mdanalysis in…
Aug 1, 2023
6e76520
Fix dask position in setup-deps/action.yaml
Aug 2, 2023
a5d24a3
Add dask[distributed] to mdanalysis[parallel] installation
Aug 2, 2023
2417d87
Undo autoformatter
Aug 2, 2023
1af07d3
Manually define available_backends for RMSD class
Aug 6, 2023
38a81db
Create separate "parallel" entry
Aug 6, 2023
d956159
Add is_installed function to utils
Aug 6, 2023
9e5e5ad
Add dict-based validatdion and computation logic for ParallelExecutor
Aug 6, 2023
fed5d9d
Add tests for ParallelExecutor
Aug 6, 2023
966ceca
Add documentation for "apply" method of ParallelExecutor
Aug 6, 2023
dd1fe28
Correct dask.distributed name
Aug 6, 2023
5643de1
Use chunksize=1 instead of explicit Pool in _compute_with_dask
Aug 6, 2023
14c5c53
Remove unnecessary function in conftest
Aug 6, 2023
477f08d
Fix function to retrieve dask client if dask is not installed
Aug 6, 2023
8c6738c
Fix base tests when dask is not installed
Aug 6, 2023
144b909
Use new LocalCluster every time
Aug 8, 2023
3224f28
Fix client/backend logic
Aug 8, 2023
f1da39e
Add documentation to a silly square function
Aug 8, 2023
a0ed309
Switch to package-wise autouse fixture for dask.distributed.Client
Aug 8, 2023
e8625c0
Add explicit result() when computing with cluster
Aug 8, 2023
0ba8407
Fix codereview
Aug 17, 2023
6206760
Replace list with tuple in available_backends for RMSD
Aug 17, 2023
2e101cd
Remove unnecessary get_running_dask_client
Aug 17, 2023
b6cb101
Implement fixture injection for subclasses testing
Aug 17, 2023
1d547f7
Add warnings filters
Aug 17, 2023
34ec5fc
Fix backend check when client is present
Aug 17, 2023
0ecec69
Return get_runnning_dask_client function
Aug 17, 2023
81b7b71
Change dask fixture scope
Aug 18, 2023
540cd26
Close LocalCluster to avoid trillions of logs
Aug 18, 2023
5a39a1a
Implement ResultsGroup based aggregation instead of type matching
Aug 18, 2023
7695dde
Add non-default _get_aggregator() to RMS and Base classes
Aug 18, 2023
074f1b2
Mark test_multiprocessing.py::test_creating_multiple_universe_without…
Aug 19, 2023
94cbefd
Restore failing test
Aug 19, 2023
7f53672
Make aggregation functions static methods of ResultsGroup
Aug 21, 2023
0ca5c5f
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
Aug 21, 2023
72ece49
Remove test skip
Aug 21, 2023
ef95b04
Move parallel part into a separate file
Aug 24, 2023
6d37652
Fix imports
Aug 31, 2023
6c26771
Proof of concept for duck-typed backends
Sep 5, 2023
5cd0ab0
Remove unused code
Sep 5, 2023
165174c
Replace ParallelExecutor with multiple backend classes and add duck-t…
Sep 12, 2023
0a72d04
Add all tests for analysis/parallel.py and fix bug in ResultsGroup.nd…
Sep 12, 2023
a168266
Change typing to py3.9 compatible syntax
Sep 13, 2023
5683245
Add _is_parallelizable to AnalysisFromFunction
Sep 13, 2023
28b67f8
Remove dask[distributed] even as an optional dependency
Sep 13, 2023
44a4600
Update documentation
Sep 13, 2023
bf3fb06
Remove function to get running dask client
Sep 13, 2023
ced8a04
Remove unused code from analysis/conftest.py
Sep 13, 2023
a399f67
Fix documentation and minor issues from codereview
Sep 13, 2023
eafa51d
Update package/MDAnalysis/analysis/rms.py
Sep 13, 2023
4ba84a8
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
Sep 13, 2023
a936b4a
Add more backend validation tests and fix autoformatter issues
Sep 13, 2023
9641dc5
Start implementing correct result sizes in separate computation groups
Sep 15, 2023
9dda941
Continue working: diffusionmap and PCA tests fail
Sep 15, 2023
2d9ca29
Merge remote-tracking branch 'upstream/develop' into feature/appropri…
marinegor Jan 10, 2024
211cbcf
Fix bug in PCA trajectory iteration -- avoid explicit usage of self.s…
marinegor Jan 12, 2024
d4de910
update changelog and tests for PCA fix
marinegor Jan 12, 2024
a58e8e1
Merge branch 'bugfix/pca-frames-iteration' into feature/appropriate-s…
marinegor Jan 12, 2024
856f65c
Fix diffusionmap and pca
marinegor Jan 12, 2024
1058e00
Make sure not to reset self.{start,stop,step} during self._compute
marinegor Jan 12, 2024
24a11c3
Change iteration pattern to sliced trajectory
marinegor Jan 12, 2024
22987c7
Change iteration pattern to sliced trajectory
marinegor Jan 12, 2024
cc033a1
Update package/MDAnalysis/analysis/parallel.py
Jan 12, 2024
eb43f4f
Apply suggestions from code review
Jan 12, 2024
db3d8bc
Split _setup_frames into two separate functions
marinegor Jan 12, 2024
1107ccc
Merge branch 'feature/appropriate-sized-results-in-parallel' into fea…
marinegor Jan 12, 2024
bb53e3f
Merge branch 'feature/analysisbase-code-deduplication' into feature/p…
marinegor Jan 12, 2024
44bcd89
Add docstrings for _prepare_sliced_trajectory and _define_run_frames
marinegor Jan 12, 2024
0be8ab9
Remove dask-distributed from dependencies
marinegor Jan 12, 2024
25ef3c7
Test only 2 processors with parallelizable backends
marinegor Jan 12, 2024
633505f
Rename available_backends and safe
marinegor Jan 12, 2024
92bad39
Apply codereview changes
marinegor Jan 12, 2024
6bb1779
Make tests for AnalysisBase subclasses explicit
marinegor Jan 13, 2024
bf96b67
Exclude "multiprocessing" from analysis_class function available back…
marinegor Jan 13, 2024
102e91a
Split parallel.py into results.py and parallel.py
marinegor Jan 13, 2024
83a552c
Finalize separation of results and backends
marinegor Jan 13, 2024
6a685fd
Rename parallel.py to backends.py
marinegor Jan 14, 2024
e8b080d
Add results and backends to analysis/__init__.py
marinegor Jan 14, 2024
514688b
Fix pep8 errors in docstrings and code
marinegor Jan 14, 2024
ebcd4ca
Add versionadded to documentation
marinegor Jan 14, 2024
fcdf330
Merge branch 'feature/pep8-parallelization' into feature/parallelization
marinegor Jan 17, 2024
f529a41
Update sphinx documentation with backends and results
marinegor Jan 17, 2024
bf07a0f
Add parallelization reference to base.py
marinegor Jan 17, 2024
3408a54
Switch to relative imports
marinegor Jan 17, 2024
39c9560
Update documentation, adding introduced changes
marinegor Jan 17, 2024
53e00e8
Update documentation adding parallelization support for rms
marinegor Jan 17, 2024
2fb504b
Add module documentation to results and backends
marinegor Jan 17, 2024
0f240a5
Fix merge conflicts
marinegor Jan 21, 2024
abe711d
Fix BackendSerial validation and add its tests
marinegor Jan 22, 2024
8a59a75
Fix calling of self._is_paralellizable()
marinegor Jan 22, 2024
b87152e
Add tests on is_parallelizable and get_supported_backends
marinegor Jan 22, 2024
ff508de
Fix bug with default progressbar_kwargs being dict
marinegor Jan 24, 2024
d045225
Merge branch 'develop' into feature/dask-0
RMeli Jan 30, 2024
b9e8e53
Apply suggestions from code review
Jan 31, 2024
a53df61
Add docstrings to apply() in backends
marinegor Feb 7, 2024
4ea030c
Add double n_worker check
marinegor Feb 7, 2024
656e461
Apply suggestions from code review
Feb 7, 2024
552aab4
Merge with develop
marinegor Feb 7, 2024
ea7b0c9
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Feb 7, 2024
b7de11c
Fix hasattr in double n_worker check
marinegor Feb 7, 2024
536a197
Revert test `with expectation` in test_align
marinegor Feb 7, 2024
04705d7
Merge remote-tracking branch 'origin/feature/dask-1' into feature/par…
marinegor Feb 17, 2024
a71c809
Merge remote-tracking branch 'upstream/develop' into feature/parallel…
marinegor Feb 21, 2024
c94b5bf
Update testsuite/MDAnalysisTests/analysis/test_pca.py
Feb 21, 2024
a67f5d3
Update package/MDAnalysis/lib/util.py
Feb 21, 2024
12b255b
Update changelog
marinegor Feb 21, 2024
eaa4c30
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Feb 21, 2024
7e2ae21
Apply suggestions from code review
marinegor Feb 21, 2024
fded90e
Add parallelization section to the documentation
marinegor Feb 23, 2024
605b451
Fix versionadded in new classes
marinegor Feb 23, 2024
e2f1e1b
Finish parallelization section for documentation
marinegor Feb 23, 2024
f307d1e
Sync with develop
marinegor Feb 23, 2024
16030ad
Fix typos
marinegor Feb 23, 2024
c13f526
Merge branch 'develop' into feature/dask-0
RMeli Mar 4, 2024
4d0c8d3
Apply suggestions from code review
Mar 6, 2024
95b004f
Apply suggestions from code review
Mar 6, 2024
6e8de75
Refactor TreadsBackend example and add a warning
marinegor Apr 8, 2024
8d6bbfd
Add n_workers instantiation from backend argument
marinegor Apr 8, 2024
c9eaab9
Update package/MDAnalysis/analysis/backends.py
Apr 8, 2024
95d969e
Update package/doc/sphinx/source/documentation_pages/analysis/paralle…
Apr 8, 2024
c438381
Add remark about RMSF parallelization
marinegor Apr 8, 2024
140f252
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor Apr 8, 2024
69a44ec
Merge develop and fix merge conflicts
marinegor Apr 8, 2024
7a67248
Apply suggestions from codereview
marinegor Apr 19, 2024
9c45568
Apply suggestions from code review
marinegor Apr 19, 2024
35fb1ae
Fix documentation typo
marinegor Apr 19, 2024
a8e0ccc
Update dask installation test after exception text changed
marinegor Apr 21, 2024
ab394a4
Merge branch 'develop' into feature/dask-0
yuxuanzhuang Apr 28, 2024
1324890
Merge branch 'feature/dask-0' of github.com:marinegor/mdanalysis into…
orbeckst May 1, 2024
68a7d23
edited documentation for parallelization
orbeckst May 4, 2024
41f231c
analysis top level docs fixes
orbeckst May 5, 2024
0964974
Merge remote-tracking branch 'origin/feature/dask-0' into feature/par…
marinegor May 20, 2024
9627212
Added comments regarding `_is_parallelizable` (and fixed documentatio…
marinegor May 20, 2024
99d43b9
Rename AnalysisBase.parallelizable and fix parallelizable transformat…
marinegor May 22, 2024
d63a38b
Remove explicit parallelizable=True in NoJump test call
marinegor May 22, 2024
c51139b
Merge branch 'develop' into feature/dask-0
orbeckst Jun 11, 2024
2d6fc0a
Apply suggestions from code review
orbeckst Jun 11, 2024
862cfb4
add explicit comment to AnalysisBase._analysis_algorithm_is_paralleli…
orbeckst Jun 11, 2024
fa87c2d
Add client_RMSD explanation
marinegor Jun 12, 2024
bb9df26
versioninformation markup fix in base.py
orbeckst Jun 13, 2024
57b581c
Merge branch 'develop' into feature/dask-0
orbeckst Jun 18, 2024
4eddd8c
Merge branch 'develop' into feature/dask-0
marinegor Jul 29, 2024
f182b9e
Apply suggestions from code review
marinegor Aug 5, 2024
e1f4e28
Apply suggestions from code review
marinegor Aug 5, 2024
d543f42
Add comments explaining client_... fixtures
marinegor Aug 5, 2024
611398d
Move class properties to the top of the class
marinegor Aug 5, 2024
e31e317
Undo accidental versionadded change
marinegor Aug 5, 2024
ccd1842
Remove duplicating versionadded
marinegor Aug 5, 2024
2ca27d0
Add versionadded for backend
marinegor Aug 5, 2024
c063585
Add link to github profile
marinegor Aug 5, 2024
8bbd901
Update package/doc/sphinx/source/documentation_pages/analysis/paralle…
orbeckst Aug 7, 2024
afba2c3
Update testsuite/MDAnalysisTests/analysis/test_backends.py
marinegor Aug 11, 2024
9fe37f9
minor text fixes
orbeckst Aug 15, 2024
ff4d0c2
Update package/MDAnalysis/analysis/base.py
marinegor Aug 16, 2024
f6672ff
Update package/MDAnalysis/analysis/base.py
marinegor Aug 16, 2024
1dc4613
Remove issubclass check
marinegor Aug 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/actions/setup-deps/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ inputs:
default: 'chemfiles-python>=0.9'
clustalw:
default: 'clustalw=2.1'
dask:
default: 'dask'
distopia:
default: 'distopia>=0.2.0'
h5py:
Expand Down Expand Up @@ -134,6 +136,7 @@ runs:
${{ inputs.biopython }}
${{ inputs.chemfiles-python }}
${{ inputs.clustalw }}
${{ inputs.dask }}
${{ inputs.distopia }}
${{ inputs.gsd }}
${{ inputs.h5py }}
Expand Down
2 changes: 2 additions & 0 deletions package/CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Fixes
* Fix groups.py doctests using sphinx directives (Issue #3925, PR #4374)

Enhancements
* Introduce parallelization API to `AnalysisBase` and to `analysis.rms.RMSD` class
(Issue #4158, PR #4304)
* Add `analysis.DSSP` module for protein secondary structure assignment, based on [pydssp](https://github.com/ShintaroMinami/PyDSSP)
* Added a tqdm progress bar for `MDAnalysis.analysis.pca.PCA.transform()`
(PR #4531)
Expand Down
2 changes: 2 additions & 0 deletions package/MDAnalysis/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

__all__ = [
'align',
'backends',
'base',
'contacts',
'density',
Expand All @@ -45,6 +46,7 @@
'pca',
'psa',
'rdf',
'results',
'rms',
'waterdynamics',
]
333 changes: 333 additions & 0 deletions package/MDAnalysis/analysis/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
"""Analysis backends --- :mod:`MDAnalysis.analysis.backends`
============================================================

.. versionadded:: 2.8.0


The :mod:`backends` module provides :class:`BackendBase` base class to
implement custom backends for
marinegor marked this conversation as resolved.
Show resolved Hide resolved
:meth:`MDAnalysis.analysis.base.AnalysisBase.run` and its
subclasses.

.. SeeAlso:: :ref:`parallel-analysis`

.. _backends:

Backends
--------

Three built-in backend classes classes are provided:
marinegor marked this conversation as resolved.
Show resolved Hide resolved

* *serial*: :class:`BackendSerial`, that is equal to using no
parallelization and is default
marinegor marked this conversation as resolved.
Show resolved Hide resolved

* *multiprocessing*: :class:`BackendMultiprocessing` that supports
parallelization via standard Python :mod:`multiprocessing` module
and uses default :mod:`pickle` serialization

* *dask*: :class:`BackendDask`, that uses the same process-based
parallelization as :class:`BackendMultiprocessing`, but different
serialization algorithm via `dask <https://dask.org/>`_ (see `dask
serialization algorithms
<https://distributed.dask.org/en/latest/serialization.html>`_ for details)

Classes
-------

yuxuanzhuang marked this conversation as resolved.
Show resolved Hide resolved
"""
import warnings
from typing import Callable
from MDAnalysis.lib.util import is_installed


class BackendBase:
"""Base class for backend implementation.

Initializes an instance and performs checks for its validity, such as
``n_workers`` and possibly other ones.

Parameters
----------
n_workers : int
number of workers (usually, processes) over which the work is split

Examples
yuxuanzhuang marked this conversation as resolved.
Show resolved Hide resolved
--------
.. code-block:: python

from MDAnalysis.analysis.backends import BackendBase

class ThreadsBackend(BackendBase):
def apply(self, func, computations):
from multiprocessing.dummy import Pool

with Pool(processes=self.n_workers) as pool:
results = pool.map(func, computations)
return results

import MDAnalysis as mda
from MDAnalysis.tests.datafiles import PSF, DCD
from MDAnalysis.analysis.rms import RMSD

u = mda.Universe(PSF, DCD)
ref = mda.Universe(PSF, DCD)

R = RMSD(u, ref)

n_workers = 2
backend = ThreadsBackend(n_workers=n_workers)
R.run(backend=backend, unsupported_backend=True)
RMeli marked this conversation as resolved.
Show resolved Hide resolved

.. warning::
Using `ThreadsBackend` above will lead to erroneous results, since it
is an educational example. Do not use it for real analysis.


.. versionadded:: 2.8.0
orbeckst marked this conversation as resolved.
Show resolved Hide resolved

"""

def __init__(self, n_workers: int):
self.n_workers = n_workers
self._validate()

def _get_checks(self):
"""Get dictionary with ``condition: error_message`` pairs that ensure the
validity of the backend instance

Returns
-------
dict
dictionary with ``condition: error_message`` pairs that will get
checked during ``_validate()`` run
"""
return {
isinstance(self.n_workers, int) and self.n_workers > 0:
f"n_workers should be positive integer, got {self.n_workers=}",
}

def _get_warnings(self):
"""Get dictionary with ``condition: warning_message`` pairs that ensure
the good usage of the backend instance

Returns
-------
dict
dictionary with ``condition: warning_message`` pairs that will get
checked during ``_validate()`` run
"""
return dict()

def _validate(self):
"""Check correctness (e.g. ``dask`` is installed if using ``backend='dask'``)
and good usage (e.g. ``n_workers=1`` if backend is serial) of the backend

Raises
------
ValueError
if one of the conditions in :meth:`_get_checks` is ``True``
"""
for check, msg in self._get_checks().items():
if not check:
raise ValueError(msg)
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
for check, msg in self._get_warnings().items():
if not check:
warnings.warn(msg)

def apply(self, func: Callable, computations: list) -> list:
"""map function `func` to all tasks in the `computations` list

Main method that will get called when using an instance of
``BackendBase``. It is equivalent to running ``[func(item) for item in
computations]`` while using the parallel backend capabilities.

Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to

Returns
-------
list
list of results of the function

"""
raise NotImplementedError


class BackendSerial(BackendBase):
"""A built-in backend that does serial execution of the function, without any
parallelization
marinegor marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
n_workers : int
Is ignored in this class, and if ``n_workers`` > 1, a warning will be
given.

marinegor marked this conversation as resolved.
Show resolved Hide resolved

.. versionadded:: 2.8.0
"""

def _get_warnings(self):
"""Get dictionary with ``condition: warning_message`` pairs that ensure
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An issue for discussion afterwards, since this isn't the only place we do this, I believe that as of Python 3.5, you can get away without having to redefine the docstring, it'll just inherit it from the parent.

See:

In [10]: class Base:
    ...:     def __init__(self, n_workers):
    ...:         self.n_workers = n_workers
    ...:         pass
    ...:     def _run(self):
    ...:         """
    ...:         prints out the number of workers
    ...:         """
    ...:         print(self.n_workers)
    ...: 

In [11]: class A(Base):
    ...:     def _run(self):
    ...:         print(self.n_workers+1)
    ...: 

In [12]: Base._run??
Signature: Base._run(self)
Source:   
    def _run(self):
        """
        prints out the number of workers
        """
        print(self.n_workers)
File:      ~/github/IndustryBenchmarks2024/docs/source/<ipython-input-10-3ad4e8f2102b>
Type:      function

In [13]: A._run??
Signature: A._run(self)
Docstring: prints out the number of workers
Source:   
    def _run(self):
        print(self.n_workers+1)
File:      ~/github/IndustryBenchmarks2024/docs/source/<ipython-input-11-e1adfaaade90>
Type:      function

the good usage of the backend instance. Here, it checks if the number
of workers is not 1, otherwise gives warning.

Returns
-------
dict
dictionary with ``condition: warning_message`` pairs that will get
checked during ``_validate()`` run
"""
return {
self.n_workers == 1:
"n_workers is ignored when executing with backend='serial'"
}

def apply(self, func: Callable, computations: list) -> list:
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
"""
Applies `func` to each object in ``computations``.
marinegor marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to

Returns
-------
list
list of results of the function
"""
return [func(task) for task in computations]


class BackendMultiprocessing(BackendBase):
"""A built-in backend that executes a given function using the
:meth:`multiprocessing.Pool.map <multiprocessing.pool.Pool.map>` method.

Parameters
----------
n_workers : int
number of processes in :class:`multiprocessing.Pool
<multiprocessing.pool.Pool>` to distribute the workload
between. Must be a positive integer.

Examples
--------

.. code-block:: python

from MDAnalysis.analysis.backends import BackendMultiprocessing
import multiprocessing as mp

backend_obj = BackendMultiprocessing(n_workers=mp.cpu_count())


.. versionadded:: 2.8.0

"""

def apply(self, func: Callable, computations: list) -> list:
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
"""Applies `func` to each object in ``computations``.
marinegor marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to

Returns
-------
list
list of results of the function
"""
from multiprocessing import Pool

with Pool(processes=self.n_workers) as pool:
results = pool.map(func, computations)
return results


class BackendDask(BackendBase):
"""A built-in backend that executes a given function with *dask*.

Execution is performed with the :func:`dask.compute` function of
:class:`dask.delayed.Delayed` object (created with
:func:`dask.delayed.delayed`) with ``scheduler='processes'`` and
``chunksize=1`` (this ensures uniform distribution of tasks among
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for future consideration, but it might be interesting to expose the chunksize to the user.

processes). Requires the `dask package <https://docs.dask.org/en/stable/>`_
to be `installed <https://docs.dask.org/en/stable/install.html>`_.

Parameters
----------
n_workers : int
number of processes in to distribute the workload
between. Must be a positive integer. Workers are actually
:class:`multiprocessing.pool.Pool` processes, but they use a different and
more flexible `serialization protocol
<https://docs.dask.org/en/stable/phases-of-computation.html#graph-serialization>`_.

Examples
--------

.. code-block:: python

from MDAnalysis.analysis.backends import BackendDask
import multiprocessing as mp

backend_obj = BackendDask(n_workers=mp.cpu_count())


.. versionadded:: 2.8.0

"""

def apply(self, func: Callable, computations: list) -> list:
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
"""Applies `func` to each object in ``computations``.

Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to

Returns
-------
list
list of results of the function
"""
from dask.delayed import delayed
import dask

computations = [delayed(func)(task) for task in computations]
results = dask.compute(computations,
scheduler="processes",
chunksize=1,
num_workers=self.n_workers)[0]
return results

def _get_checks(self):
"""Get dictionary with ``condition: error_message`` pairs that ensure the
validity of the backend instance. Here checks if ``dask`` module is
installed in the environment.

Returns
-------
dict
dictionary with ``condition: error_message`` pairs that will get
checked during ``_validate()`` run
"""
base_checks = super()._get_checks()
checks = {
is_installed("dask"):
("module 'dask' is missing. Please install: "
"https://docs.dask.org/en/stable/install.html")
marinegor marked this conversation as resolved.
Show resolved Hide resolved
}
return base_checks | checks
Loading
Loading