Skip to content

Commit

Permalink
refactoring: UntimedEffect and Precondition mixins
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Gobbi committed Dec 4, 2024
1 parent 162427f commit 8a8a44b
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 412 deletions.
219 changes: 8 additions & 211 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from typing import Any, Dict, List, Set, Union, Optional, Iterable
from collections import OrderedDict

from unified_planning.model.transition import SingleTimePointTransitionMixin, Transition
from unified_planning.model.transition import (
UntimedEffectMixin,
PreconditionMixin,
Transition,
)


class Action(Transition):
Expand All @@ -50,7 +54,7 @@ def __call__(
)


class InstantaneousAction(Action, SingleTimePointTransitionMixin):
class InstantaneousAction(UntimedEffectMixin, Action, PreconditionMixin):
"""Represents an instantaneous action."""

def __init__(
Expand All @@ -61,15 +65,8 @@ def __init__(
**kwargs: "up.model.types.Type",
):
Action.__init__(self, _name, _parameters, _env, **kwargs)
SingleTimePointTransitionMixin.__init__(self, _env)
self._effects: List[up.model.effect.Effect] = []
self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None
# fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment
self._fluents_assigned: Dict[
"up.model.fnode.FNode", "up.model.fnode.FNode"
] = {}
# fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease
self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set()
PreconditionMixin.__init__(self, _env)
UntimedEffectMixin.__init__(self, _env)

def __repr__(self) -> str:
s = []
Expand Down Expand Up @@ -139,206 +136,6 @@ def clone(self):
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action

@property
def effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `Action effects`."""
return self._effects

def clear_effects(self):
"""Removes all the `Action's effects`."""
self._effects = []
self._fluents_assigned = {}
self._fluents_inc_dec = set()
self._simulated_effect = None

@property
def conditional_effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `action conditional effects`.
IMPORTANT NOTE: this property does some computation, so it should be called as
seldom as possible."""
return [e for e in self._effects if e.is_conditional()]

def is_conditional(self) -> bool:
"""Returns `True` if the `action` has `conditional effects`, `False` otherwise."""
return any(e.is_conditional() for e in self._effects)

@property
def unconditional_effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `action unconditional effects`.
IMPORTANT NOTE: this property does some computation, so it should be called as
seldom as possible."""
return [e for e in self._effects if not e.is_conditional()]

def add_effect(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
value: "up.model.expression.Expression",
condition: "up.model.expression.BoolExpression" = True,
forall: Iterable["up.model.variable.Variable"] = tuple(),
):
"""
Adds the given `assignment` to the `action's effects`.
:param fluent: The `fluent` of which `value` is modified by the `assignment`.
:param value: The `value` to assign to the given `fluent`.
:param condition: The `condition` in which this `effect` is applied; the default
value is `True`.
:param forall: The 'Variables' that are universally quantified in this
effect; the default value is empty.
"""
(
fluent_exp,
value_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(fluent, value, condition)
if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot():
raise UPUsageError(
"fluent field of add_effect must be a Fluent or a FluentExp or a Dot."
)
if not self._environment.type_checker.get_type(condition_exp).is_bool_type():
raise UPTypeError("Effect condition is not a Boolean condition!")
if not fluent_exp.type.is_compatible(value_exp.type):
# Value is not assignable to fluent (its type is not a subset of the fluent's type).
raise UPTypeError(
f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}"
)
self._add_effect_instance(
up.model.effect.Effect(fluent_exp, value_exp, condition_exp, forall=forall)
)

def add_increase_effect(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
value: "up.model.expression.Expression",
condition: "up.model.expression.BoolExpression" = True,
forall: Iterable["up.model.variable.Variable"] = tuple(),
):
"""
Adds the given `increase effect` to the `action's effects`.
:param fluent: The `fluent` which `value` is increased.
:param value: The given `fluent` is incremented by the given `value`.
:param condition: The `condition` in which this `effect` is applied; the default
value is `True`.
:param forall: The 'Variables' that are universally quantified in this
effect; the default value is empty.
"""
(
fluent_exp,
value_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(
fluent,
value,
condition,
)
if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot():
raise UPUsageError(
"fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot."
)
if not condition_exp.type.is_bool_type():
raise UPTypeError("Effect condition is not a Boolean condition!")
if not fluent_exp.type.is_compatible(value_exp.type):
raise UPTypeError(
f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}"
)
if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type():
raise UPTypeError("Increase effects can be created only on numeric types!")
self._add_effect_instance(
up.model.effect.Effect(
fluent_exp,
value_exp,
condition_exp,
kind=up.model.effect.EffectKind.INCREASE,
forall=forall,
)
)

def add_decrease_effect(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
value: "up.model.expression.Expression",
condition: "up.model.expression.BoolExpression" = True,
forall: Iterable["up.model.variable.Variable"] = tuple(),
):
"""
Adds the given `decrease effect` to the `action's effects`.
:param fluent: The `fluent` which value is decreased.
:param value: The given `fluent` is decremented by the given `value`.
:param condition: The `condition` in which this `effect` is applied; the default
value is `True`.
:param forall: The 'Variables' that are universally quantified in this
effect; the default value is empty.
"""
(
fluent_exp,
value_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(fluent, value, condition)
if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot():
raise UPUsageError(
"fluent field of add_decrease_effect must be a Fluent or a FluentExp or a Dot."
)
if not condition_exp.type.is_bool_type():
raise UPTypeError("Effect condition is not a Boolean condition!")
if not fluent_exp.type.is_compatible(value_exp.type):
raise UPTypeError(
f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}"
)
if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type():
raise UPTypeError("Decrease effects can be created only on numeric types!")
self._add_effect_instance(
up.model.effect.Effect(
fluent_exp,
value_exp,
condition_exp,
kind=up.model.effect.EffectKind.DECREASE,
forall=forall,
)
)

def _add_effect_instance(self, effect: "up.model.effect.Effect"):
assert (
effect.environment == self._environment
), "effect does not have the same environment of the action"
up.model.effect.check_conflicting_effects(
effect,
None,
self._simulated_effect,
self._fluents_assigned,
self._fluents_inc_dec,
"action",
)
self._effects.append(effect)

@property
def simulated_effect(self) -> Optional["up.model.effect.SimulatedEffect"]:
"""Returns the `action` `simulated effect`."""
return self._simulated_effect

def set_simulated_effect(self, simulated_effect: "up.model.effect.SimulatedEffect"):
"""
Sets the given `simulated effect` as the only `action's simulated effect`.
:param simulated_effect: The `SimulatedEffect` instance that must be set as this `action`'s only
`simulated effect`.
"""
up.model.effect.check_conflicting_simulated_effects(
simulated_effect,
None,
self._fluents_assigned,
self._fluents_inc_dec,
"action",
)
if simulated_effect.environment != self.environment:
raise UPUsageError(
"The added SimulatedEffect does not have the same environment of the Action"
)
self._simulated_effect = simulated_effect


class DurativeAction(Action, TimedCondsEffs):
"""Represents a durative action."""
Expand Down
12 changes: 2 additions & 10 deletions unified_planning/model/mixins/natural_transitions_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,9 @@ def events(
@property
def natural_transitions(
self,
) -> List[
Union[
"up.model.natural_transition.Event", "up.model.natural_transition.Process"
]
]:
) -> List["up.model.natural_transition.NaturalTransition"]:
"""Returns the list of the `Processes` and `Events` in the `Problem`."""
ntlist: List[
Union[
up.model.natural_transition.Event, up.model.natural_transition.Process
]
] = []
ntlist: List[up.model.natural_transition.NaturalTransition] = []
ntlist.extend(self.processes)
ntlist.extend(self.events)
return ntlist
Expand Down
Loading

0 comments on commit 8a8a44b

Please sign in to comment.