Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operation.with_directives decorator #502

Merged
merged 24 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
68f94da
Add FlowProject.operation.with_directives decorator
b-butler Apr 20, 2021
00eed2e
Update changelog for ``FlowProject.operation.with_directives``.
b-butler Apr 20, 2021
ffa7fec
Move documentation examples to use ``FlowProject.operation.with_direc…
b-butler Apr 20, 2021
bacd044
Return to using an instance for FlowProject.operation
b-butler Apr 21, 2021
45862c2
Rename FlowProject specific operation to OperationRegister
b-butler Apr 21, 2021
88f309f
Revert OperationRegister.with_directives to method
b-butler Apr 21, 2021
9f75c91
Fix test name typo directies -> directives
b-butler Apr 21, 2021
e537d38
Add tests to improve codecoverage
b-butler Apr 23, 2021
f3e3ec3
Fix bug where user groups could overwrite operation names
b-butler Apr 23, 2021
65d8702
Merge branch 'master' into feature/operation-with-directives
b-butler Apr 23, 2021
a2139dc
Merge branch 'master' into feature/operation-with-directives
b-butler Apr 27, 2021
20d5711
Merge branch 'master' into feature/operation-with-directives
bdice May 6, 2021
4e6d722
Add comma.
bdice May 6, 2021
7831c1d
Use double quotes for consistency.
bdice May 6, 2021
112137d
Remove empty directives line.
bdice May 6, 2021
632e6e1
Use dict syntax.
bdice May 6, 2021
b4bc426
Deprecate @directives and remove from all but one test
b-butler May 13, 2021
718310d
Deprecate @directives and remove from all but one test
b-butler May 13, 2021
5573dde
Apply suggestions from code review
b-butler May 14, 2021
bf690f5
Update changelog
b-butler May 14, 2021
a856c8d
Merge branch 'master' into feature/operation-with-directives
b-butler May 18, 2021
b42b521
Remove old tests on invalid operation func signatures
b-butler May 18, 2021
65cb99e
Merge remote-tracking branch 'origin/master' into feature/operation-w…
bdice May 20, 2021
895fa20
Update changelog.
bdice May 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ Added
+++++

- Add official support for Andes cluster (#500).
- Decorator for setting directives while registering operation function ``FlowProject.operation.with_directives`` (#309, #502).

Changed
+++++++

- Jinja templates are indented for easier reading (#461, #495).
- ``flow.directives`` is deprecated in favor of ``flow.FlowProject.operation.with_directives`` (#309, #502).

Fixed
+++++
Expand Down
6 changes: 5 additions & 1 deletion doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The FlowProject
FlowProject.main
FlowProject.make_group
FlowProject.operation
FlowProject.operation.with_directives
FlowProject.operations
FlowProject.post
FlowProject.post.copy_from
Expand All @@ -58,8 +59,11 @@ The FlowProject
.. autoclass:: FlowProject
:show-inheritance:
:members:
:exclude-members: pre,post
:exclude-members: pre,post,operation

.. automethod:: flow.FlowProject.operation(func, name=None)
b-butler marked this conversation as resolved.
Show resolved Hide resolved

.. automethod:: flow.FlowProject.operation.with_directives(directives, name=None)

.. automethod:: flow.FlowProject.post

Expand Down
15 changes: 5 additions & 10 deletions flow/directives.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,7 @@ def _sum_not_none(value, other):

.. code-block:: python

@Project.operation
@directives(walltime=24)
@Project.operation.with_directives({"walltime": 24})
def op(job):
# This operation takes 1 day to run
pass
Expand All @@ -541,8 +540,7 @@ def op(job):

.. code-block:: python

@Project.operation
@directives(memory="4g")
@Project.operation.with_directives({"memory": "4g"})
def op(job):
pass

Expand All @@ -552,8 +550,7 @@ def op(job):

.. code-block:: python

@Project.operation
@directives(memory="512m")
@Project.operation.with_directives({"memory": "512m"})
def op(job):
pass

Expand All @@ -563,13 +560,11 @@ def op(job):

.. code-block:: python

@Project.operation
@directives(memory="4")
@Project.operation.with_directives({"memory": "4"})
def op1(job):
pass

@Project.operation
@directives(memory=4)
@Project.operation.with_directives({"memory": 4})
def op2(job):
pass
"""
Expand Down
13 changes: 7 additions & 6 deletions flow/environments/incite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ class SummitEnvironment(DefaultLSFEnvironment):

Example::

@Project.operation
@directives(nranks=3) # 3 MPI ranks per operation
@directives(ngpu=3) # 3 GPUs
@directives(np=3) # 3 CPU cores
@directives(rs_tasks=3) # 3 tasks per resource set
@directives(extra_jsrun_args='--smpiargs="-gpu"') # extra jsrun arguments
@Project.operation.with_directives({
"nranks": 3, # 3 MPI ranks per operation
"ngpu": 3, # 3 GPUs
"np": 3, # 3 CPU cores
"rs_tasks": 3, # 3 tasks per resource set
"extra_jsrun_args": '--smpiargs="-gpu"', # extra jsrun arguments
})
def my_operation(job):
...

Expand Down
9 changes: 9 additions & 0 deletions flow/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from functools import wraps
from textwrap import indent

from deprecation import deprecated

from .environment import ComputeEnvironment
from .version import __version__

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,6 +111,12 @@ def decorated(*jobs):
return decorated


@deprecated(
deprecated_in="0.15",
removed_in="1.0",
current_version=__version__,
details="Use FlowProject.operation.with_directives",
)
class directives:
"""Decorator for operation functions to provide additional execution directives.

Expand Down
197 changes: 125 additions & 72 deletions flow/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,13 @@ class FlowGroup:

group = FlowProject.make_group(name='example_group')

@group.with_directives(nranks=4)
@FlowProject.operation
@directives(nranks=2, executable="python3")
@group.with_directives({"nranks": 4})
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op1(job):
pass

@group
@FlowProject.operation
@directives(nranks=2, executable="python3")
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op2(job):
pass

Expand Down Expand Up @@ -1251,6 +1249,9 @@ def __new__(metacls, name, bases, namespace):
cls.pre = cls._setup_preconditions_class(parent_class=cls)
cls.post = cls._setup_postconditions_class(parent_class=cls)

# Give the class an operation register object
cls.operation = cls._setup_operation_object(parent_class=cls)

# All groups are registered with the function returned by the
# make_group classmethod. In contrast to operations and labels, the
# make_group classmethod does not serve as the decorator, the functor
Expand Down Expand Up @@ -1417,6 +1418,119 @@ def copy_from(cls, *other_funcs):

return post

@staticmethod
def _setup_operation_object(parent_class):
class OperationRegister:
"""Add operation functions to the class workflow definition.

This object is designed to be used as a decorator, for example:

.. code-block:: python

@FlowProject.operation
def hello(job):
print('Hello', job)

Directives can also be specified by using :meth:`FlowProject.operation.with_directives`.

.. code-block:: python

@FlowProject.operation.with_directives({"nranks": 4})
def mpi_hello(job):
print("hello")

Parameters
----------
func : callable
The function to add to the workflow.
name : str
The operation name. Uses the name of the function if None.
(Default value = None)

Returns
-------
callable
The operation function.
"""

_parent_class = parent_class

def __call__(self, func, name=None):
if isinstance(func, str):
return lambda op: self(op, name=func)

if func in chain(
*self._parent_class._OPERATION_PRECONDITIONS.values(),
*self._parent_class._OPERATION_POSTCONDITIONS.values(),
):
raise ValueError(
"A condition function cannot be used as an operation."
)

if name is None:
name = func.__name__

for (
registered_name,
registered_func,
) in self._parent_class._OPERATION_FUNCTIONS:
if name == registered_name:
raise ValueError(
f"An operation with name '{name}' is already registered."
)
if func is registered_func:
raise ValueError(
"An operation with this function is already registered."
)
if name in self._parent_class._GROUP_NAMES:
raise ValueError(
f"A group with name '{name}' is already registered."
)

if not getattr(func, "_flow_aggregate", False):
func._flow_aggregate = aggregator.groupsof(1)

# Append the name and function to the class registry
self._parent_class._OPERATION_FUNCTIONS.append((name, func))
# We register aggregators associated with operation functions in
# `_register_groups` and we do not set the aggregator explicitly. We
# delay setting the aggregator because we do not restrict the decorator
# placement in terms of `@FlowGroupEntry`, `@aggregator`, or
# `@operation`.
self._parent_class._GROUPS.append(FlowGroupEntry(name=name, options=""))
if hasattr(func, "_flow_groups"):
func._flow_groups.append(name)
else:
func._flow_groups = [name]
return func

def with_directives(self, directives, name=None):
"""Return a decorator that also sets directives for the operation.

Parameters
----------
directives : dict
Directives to use for resource requests and running the operation through the
group.
name : str
The operation name. Uses the name of the function if None.
(Default value = None)

Returns
-------
function
A decorator which registers the function with the correct name and directives as
an operation of the :class:`~.FlowProject` subclass.
"""

def add_operation_with_directives(function):
function._flow_directives = directives
return self(function, name)

return add_operation_with_directives

return OperationRegister()


class FlowProject(signac.contrib.Project, metaclass=_FlowProjectClass):
"""A signac project class specialized for workflow management.
Expand Down Expand Up @@ -4050,73 +4164,6 @@ def _next_operations(
ignore_conditions=ignore_conditions,
)

@classmethod
def operation(cls, func, name=None):
"""Add an operation function to the class workflow definition.

This function is designed to be used as a decorator, for example:

.. code-block:: python

@FlowProject.operation
def hello(job):
print('Hello', job)

Parameters
----------
func : callable
The function to add to the workflow.
name : str
The operation name. Uses the name of the function if None.
(Default value = None)

Returns
-------
callable
The operation function.

"""
if isinstance(func, str):
return lambda op: cls.operation(op, name=func)

if func in chain(
*cls._OPERATION_PRECONDITIONS.values(),
*cls._OPERATION_POSTCONDITIONS.values(),
):
raise ValueError("A condition function cannot be used as an operation.")

if name is None:
name = func.__name__

for registered_name, registered_func in cls._OPERATION_FUNCTIONS:
if name == registered_name:
raise ValueError(
f"An operation with name '{name}' is already registered."
)
if func is registered_func:
raise ValueError(
"An operation with this function is already registered."
)
if name in cls._GROUP_NAMES:
raise ValueError(f"A group with name '{name}' is already registered.")

if not getattr(func, "_flow_aggregate", False):
func._flow_aggregate = aggregator.groupsof(1)

# Append the name and function to the class registry
cls._OPERATION_FUNCTIONS.append((name, func))
# We register aggregators associated with operation functions in
# `_register_groups` and we do not set the aggregator explicitly.
# We delay setting the aggregator because we do not restrict the
# decorator placement in terms of `@FlowGroupEntry`, `@aggregator`, or
# `@operation`.
cls._GROUPS.append(FlowGroupEntry(name=name, options=""))
if hasattr(func, "_flow_groups"):
func._flow_groups.append(name)
else:
func._flow_groups = [name]
return func

@classmethod
def _collect_operations(cls):
"""Collect all operations added with the ``@FlowProject.operation`` decorator."""
Expand Down Expand Up @@ -4205,6 +4252,12 @@ def foo(job):
"""
if name in cls._GROUP_NAMES:
raise ValueError(f"Repeat definition of group with name '{name}'.")
if any(
name == operation_name for operation_name, _ in cls._OPERATION_FUNCTIONS
):
raise ValueError(
f"Cannot create a group with the same name as the existing operation {name}"
)
cls._GROUP_NAMES.add(name)
group_entry = FlowGroupEntry(
name=name, options=options, group_aggregator=group_aggregator
Expand Down
12 changes: 5 additions & 7 deletions tests/define_directives_test_project.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime

import flow
from flow import FlowProject


Expand All @@ -11,22 +10,21 @@ class _DirectivesTestProject(FlowProject):
group = _DirectivesTestProject.make_group(name="walltimegroup")


@_DirectivesTestProject.operation
@flow.directives(walltime=1.0)
@_DirectivesTestProject.operation.with_directives({"walltime": 1.0})
@group
def op_walltime(job):
pass


@_DirectivesTestProject.operation
@flow.directives(walltime=None)
@_DirectivesTestProject.operation.with_directives({"walltime": None})
@group
def op_walltime_2(job):
pass


@_DirectivesTestProject.operation
@flow.directives(walltime=datetime.timedelta(hours=2))
@_DirectivesTestProject.operation.with_directives(
{"walltime": datetime.timedelta(hours=2)}
)
@group
def op_walltime_3(job):
pass
Expand Down
Loading