Skip to content

Commit

Permalink
Add new test and fixed a number of upcoming issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Enrico Scala authored and Enrico Scala committed Nov 4, 2024
1 parent b7f61f0 commit 0d6b4de
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 48 deletions.
30 changes: 28 additions & 2 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def __init__(
self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set()

def __eq__(self, oth: object) -> bool:
if isinstance(oth, InstantaneousAction):
if isinstance(oth, InstantaneousTransitionMixin):
cond = (
self._environment == oth._environment
and self._name == oth._name
Expand Down Expand Up @@ -207,7 +207,7 @@ def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_instantaneous_action = InstantaneousAction(
new_instantaneous_action = InstantaneousTransitionMixin(
self._name, new_params, self._environment
)
new_instantaneous_action._preconditions = self._preconditions[:]
Expand Down Expand Up @@ -488,6 +488,20 @@ def __repr__(self) -> str:
s.append(" }")
return "".join(s)

def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_instantaneous_action = InstantaneousAction(
self._name, new_params, self._environment
)
new_instantaneous_action._preconditions = self._preconditions[:]
new_instantaneous_action._effects = [e.clone() for e in self._effects]
new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy()
new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action


class DurativeAction(Action, TimedCondsEffs):
"""Represents a durative action."""
Expand Down Expand Up @@ -1014,3 +1028,15 @@ def __repr__(self) -> str:
s.append(f" simulated effect = {self._simulated_effect}\n")
s.append(" }")
return "".join(s)

def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_instantaneous_action = Event(self._name, new_params, self._environment)
new_instantaneous_action._preconditions = self._preconditions[:]
new_instantaneous_action._effects = [e.clone() for e in self._effects]
new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy()
new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action
8 changes: 2 additions & 6 deletions unified_planning/model/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ def _get_static_and_unused_fluents(
(f.fluent() for e in exps for f in fve.get(e))
)
for a in self._actions:
if isinstance(a, up.model.action.InstantaneousAction) or isinstance(
a, up.model.action.Event
):
if isinstance(a, up.model.action.InstantaneousTransitionMixin):
remove_used_fluents(*a.preconditions)
for e in a.effects:
remove_used_fluents(e.fluent, e.value, e.condition)
Expand Down Expand Up @@ -994,9 +992,7 @@ def update_problem_kind_action(
if isinstance(action, up.model.tamp.InstantaneousMotionAction):
if len(action.motion_constraints) > 0:
self.kind.set_problem_class("TAMP")
if isinstance(action, up.model.action.InstantaneousAction) or isinstance(
action, up.model.action.Event
):
if isinstance(action, up.model.action.InstantaneousTransitionMixin):
for c in action.preconditions:
self.update_problem_kind_expression(c)
for e in action.effects:
Expand Down
10 changes: 9 additions & 1 deletion unified_planning/test/examples/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
import unified_planning.test.examples.testing_variants as testing_variants
import unified_planning.test.examples.hierarchical as hierarchical
import unified_planning.test.examples.scheduling as scheduling
import unified_planning.test.examples.processes as processes


def get_example_problems():
sub_modules = [minimals, realistic, testing_variants, hierarchical, scheduling]
sub_modules = [
minimals,
realistic,
testing_variants,
hierarchical,
scheduling,
processes,
]
return dict(x for m in sub_modules for x in m.get_example_problems().items())
78 changes: 41 additions & 37 deletions unified_planning/test/examples/processes.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
from unified_planning.shortcuts import *
from unified_planning.model.action import Process


on = Fluent("on")
d = Fluent("d", RealType())

a = InstantaneousAction("turn_on")
a.add_precondition(Not(on))
a.add_effect(on, True)

evt = Event("turn_off_automatically")
evt.add_precondition(GE(d, 200))
evt.add_effect(on, False)

b = Process("moving")
b.add_precondition(on)
b.add_derivative(d, 1)

problem = Problem("1D Movement")
problem.add_fluent(on)
problem.add_fluent(d)
problem.add_action(a)
problem.add_action(b)
problem.add_action(evt)
problem.set_initial_value(on, False)
problem.set_initial_value(d, 0)
problem.add_goal(GE(d, 10))

z = Fluent("z", BoolType())
pr = Process("Name")
pr.add_precondition(z)

with OneshotPlanner(problem_kind=problem.kind) as planner:
result = planner.solve(problem)
if result.status in unified_planning.engines.results.POSITIVE_OUTCOMES:
print(f"{planner.name} found this plan: {result.plan}")
else:
print("No plan found.")
from unified_planning.test import TestCase


def get_example_problems():
problems = {}
on = Fluent("on")
d = Fluent("d", RealType())

a = InstantaneousAction("turn_on")
a.add_precondition(Not(on))
a.add_effect(on, True)

evt = Event("turn_off_automatically")
evt.add_precondition(GE(d, 200))
evt.add_effect(on, False)

b = Process("moving")
b.add_precondition(on)
b.add_derivative(d, 1)

problem = Problem("1d_Movement")
problem.add_fluent(on)
problem.add_fluent(d)
problem.add_action(a)
problem.add_action(b)
problem.add_action(evt)
problem.set_initial_value(on, False)
problem.set_initial_value(d, 0)
problem.add_goal(GE(d, 10))

z = Fluent("z", BoolType())
pr = Process("Name")
pr.add_precondition(z)
problem = TestCase(
problem=problem,
solvable=True,
valid_plans=[],
invalid_plans=[],
)
problems["1d_movement"] = problem
return problems
2 changes: 1 addition & 1 deletion unified_planning/test/test_anml_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ def test_anml_io(self):
if problem.name in problems_to_skip:
continue
kind = problem.kind
if not kind.has_action_based():
if not kind.has_action_based() or kind.has_processes():
continue
with tempfile.TemporaryDirectory() as tempdir:
problem_filename = os.path.join(tempdir, "problem.anml")
Expand Down
7 changes: 6 additions & 1 deletion unified_planning/test/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from unified_planning.test.examples import get_example_problems
from unified_planning.test import unittest_TestCase, main
from unified_planning.model.action import InstantaneousTransitionMixin


class TestModel(unittest_TestCase):
Expand Down Expand Up @@ -70,14 +71,18 @@ def test_clone_problem_and_action(self):
for action_1, action_2 in zip(
problem_clone_1.actions, problem_clone_2.actions
):
if isinstance(action_2, InstantaneousAction):
if isinstance(action_2, InstantaneousTransitionMixin):
action_2._effects = []
action_1_clone = action_1.clone()
action_1_clone._effects = []
elif isinstance(action_2, DurativeAction):
action_2._effects = {}
action_1_clone = action_1.clone()
action_1_clone._effects = {}
elif isinstance(action_2, Process):
action_2._effects = {}
action_1_clone = action_1.clone()
action_1_clone._effects = {}
else:
raise NotImplementedError
self.assertEqual(action_2, action_1_clone)
Expand Down
8 changes: 8 additions & 0 deletions unified_planning/test/test_pddl_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ def test_basic_conditional_writer(self):
self.assertIn("(:init)", pddl_problem)
self.assertIn("(:goal (and (x)))", pddl_problem)

def test_processes_writer(self):
problem = self.problems["1d_movement"].problem
w = PDDLWriter(problem)
pddl_domain = w.get_domain()
self.assertIn("(:process moving", pddl_domain)
self.assertIn("#t", pddl_domain)

def test_basic_exists_writer(self):
problem = self.problems["basic_exists"].problem

Expand Down Expand Up @@ -591,6 +598,7 @@ def test_examples_io(self):
for a in problem.actions:
parsed_a = parsed_problem.action(w.get_pddl_name(a))
self.assertEqual(a, w.get_item_named(parsed_a.name))

for param, parsed_param in zip(a.parameters, parsed_a.parameters):
self.assertEqual(
param.type,
Expand Down
1 change: 1 addition & 0 deletions unified_planning/test/test_problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ def test_simple_numeric_planning_kind(self):
"sched:basic",
"sched:resource_set",
"sched:jobshop-ft06-operators",
"1d_Movement",
]
for example in self.problems.values():
problem = example.problem
Expand Down
2 changes: 2 additions & 0 deletions unified_planning/test/test_protobuf_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ def test_all_problems(self):
for name, example in self.problems.items():
problem = example.problem
kind = problem.kind
if kind.has_processes():
continue
problem_pb = self.pb_writer.convert(problem)
problem_up = self.pb_reader.convert(problem_pb)

Expand Down

0 comments on commit 0d6b4de

Please sign in to comment.