Skip to content

Commit

Permalink
Added explicit agent actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Alee08 committed Sep 30, 2023
1 parent d4732b7 commit 459a26c
Showing 1 changed file with 160 additions and 111 deletions.
271 changes: 160 additions & 111 deletions unified_planning/io/ma_pddl_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,19 @@ def walk_dot(self, expression, args):
agent = self._problem.agent(expression.agent())
fluent = expression.args[0].fluent()
objects = expression.args[0].args
if not self._unfactored:
"""if not self._unfactored:
return f'(a_{self.get_mangled_name(fluent)} {self.get_mangled_name(agent)} {" ".join([self.convert(obj) for obj in objects])})'
else:
return f'(a_{self.get_mangled_name(fluent)} {"a"} {" ".join([self.convert(obj) for obj in objects])})'
return f'(a_{self.get_mangled_name(fluent)} {"a"} {" ".join([self.convert(obj) for obj in objects])})'"""
return f'(a_{self.get_mangled_name(fluent)} {self.get_mangled_name(agent)} {" ".join([self.convert(obj) for obj in objects])})'

def walk_fluent_exp(self, expression, args):
fluent = expression.fluent()
if not self._unfactored and self._agent is not None:
"""if not self._unfactored and self._agent is not None:
agent_name = self._agent.name
else:
agent_name = "a"
agent_name = "a" """
agent_name = self._agent.name
if self._agent is not None and fluent in self._agent.fluents:
return f'(a_{self.get_mangled_name(fluent)} ?{agent_name}{" " if len(args) > 0 else ""}{" ".join(args)})'
else:
Expand Down Expand Up @@ -177,7 +179,7 @@ def _write_domain(self):

if self.needs_requirements:
out.write(" (:requirements :multi-agent")
if self.unfactored == False:
if not self.unfactored:
out.write(" :factored-privacy")
else:
out.write(" :unfactored-privacy")
Expand Down Expand Up @@ -275,9 +277,23 @@ def _write_domain(self):
predicates_environment,
functions_environment,
) = self.get_predicates_functions(self.problem.ma_environment)
predicates_agent, functions_agent = self.get_predicates_functions(
ag, is_private=True
)

predicates_agents = []
functions_agent = []
nl = "\n "
if self.unfactored:
for ag in self.problem.agents:
predicates_agent, functions_agent = self.get_predicates_functions(
ag, is_private=True
)
if len(predicates_agent) > 0:
predicates_agents.append(
f" (:private agent - {ag.name + '_type'}\n {nl.join(predicates_agent)})\n"
)
else:
predicates_agents, functions_agent = self.get_predicates_functions(
ag, is_private=True
)

predicates_public_agent = []
functions_public_agent = []
Expand Down Expand Up @@ -354,7 +370,7 @@ def _write_domain(self):
out.write(
f" (:predicates\n "
if len(predicates_environment) > 0
or len(predicates_agent) > 0
or len(predicates_agents) > 0
or len(predicates_agent_goal) > 0
or len(predicates_public_agent) > 0
else ""
Expand All @@ -376,15 +392,21 @@ def _write_domain(self):
)

nl = "\n "
out.write(
f" (:private\n {nl.join(predicates_agent)})"
if len(predicates_agent) > 0
else ""
)

if self.unfactored:
out.write(
"".join(predicates_agents) if len(predicates_agents) > 0 else ""
)
else:
out.write(
f" (:private\n {nl.join(predicates_agents)})"
if len(predicates_agents) > 0
else ""
)
out.write(
f")\n"
if len(predicates_environment) > 0
or len(predicates_agent) > 0
or len(predicates_agents) > 0
or len(predicates_agent_goal) > 0
or len(predicates_public_agent) > 0
else ""
Expand Down Expand Up @@ -427,119 +449,143 @@ def _write_domain(self):
else ""
)

converter = ConverterToMAPDDLString(
self.problem, self._get_mangled_name, ag, self.unfactored
)
costs: dict = {}
for a in ag.actions:
if isinstance(a, up.model.InstantaneousAction):
out.write(f" (:action {self._get_mangled_name(a)}")
if self.unfactored:
out.write(f'\n :agent {"?a - ag"}')

out.write(f"\n :parameters (")

if not self.unfactored:
out.write(
f' ?{self._get_mangled_name(ag)} - {self._get_mangled_name(ag) +"_type"}'
)
for ap in a.parameters:
if ap.type.is_user_type():
def write_action(ag):
converter = ConverterToMAPDDLString(
self.problem, self._get_mangled_name, ag, self.unfactored
)
for a in ag.actions:
if isinstance(a, up.model.InstantaneousAction):
if self.unfactored:
out.write(
f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
f" (:action {self._get_mangled_name(a)}_{ag.name}"
)
else:
raise UPTypeError(
"MA-PDDL supports only user type parameters"
)
out.write(")")
if len(a.preconditions) > 0:
out.write(f"\n :precondition (and \n")
for p in a.preconditions:
out.write(f" {converter.convert(p)}\n")
out.write(f" )")

if len(a.effects) > 0:
out.write("\n :effect (and\n")
for e in a.effects:
_write_effect(
e,
None,
out,
converter,
self.rewrite_bool_assignments,
self._get_mangled_name,
)
out.write(f" (:action {self._get_mangled_name(a)}")
if self.unfactored:
out.write(f'\n :agent ?{ag.name} - {ag.name + "_type"}')

if a in costs:
out.write(
f" (increase (total-cost) {converter.convert(costs[a])})"
)
out.write(")")
out.write(")\n")
elif isinstance(a, DurativeAction):
out.write(f" (:durative-action {self._get_mangled_name(a)}")
out.write(f"\n :parameters (")
for ap in a.parameters:
if ap.type.is_user_type():
out.write(f"\n :parameters (")

if not self.unfactored:
out.write(
f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
f' ?{self._get_mangled_name(ag)} - {self._get_mangled_name(ag) +"_type"}'
)
else:
raise UPTypeError(
"MA-PDDL supports only user type parameters"
)
out.write(")")
l, r = a.duration.lower, a.duration.upper
if l == r:
out.write(f"\n :duration (= ?duration {converter.convert(l)})")
else:
out.write(f"\n :duration (and ")
if a.duration.is_left_open():
out.write(f"(> ?duration {converter.convert(l)})")
else:
out.write(f"(>= ?duration {converter.convert(l)})")
if a.duration.is_right_open():
out.write(f"(< ?duration {converter.convert(r)})")
else:
out.write(f"(<= ?duration {converter.convert(r)})")
out.write(")")
if len(a.conditions) > 0:
out.write(f"\n :condition (and ")
for interval, cl in a.conditions.items():
for c in cl:
if interval.lower == interval.upper:
if interval.lower.is_from_start():
out.write(f"(at start {converter.convert(c)})")
else:
out.write(f"(at end {converter.convert(c)})")
else:
if not interval.is_left_open():
out.write(f"(at start {converter.convert(c)})")
out.write(f"(over all {converter.convert(c)})")
if not interval.is_right_open():
out.write(f"(at end {converter.convert(c)})")
for ap in a.parameters:
if ap.type.is_user_type():
out.write(
f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
)
else:
raise UPTypeError(
"MA-PDDL supports only user type parameters"
)
out.write(")")
if len(a.effects) > 0:
out.write("\n :effect (and")
for t, el in a.effects.items():
for e in el:
if len(a.preconditions) > 0:
out.write(f"\n :precondition (and \n")
for p in a.preconditions:
out.write(f" {converter.convert(p)}\n")
out.write(f" )")

if len(a.effects) > 0:
out.write("\n :effect (and\n")
for e in a.effects:
_write_effect(
e,
t,
None,
out,
converter,
self.rewrite_bool_assignments,
self._get_mangled_name,
)
if a in costs:

if a in costs:
out.write(
f" (increase (total-cost) {converter.convert(costs[a])})"
)
out.write(")")
out.write(")\n")
elif isinstance(a, DurativeAction):
out.write(f" (:durative-action {self._get_mangled_name(a)}")
out.write(f"\n :parameters (")
for ap in a.parameters:
if ap.type.is_user_type():
out.write(
f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
)
else:
raise UPTypeError(
"MA-PDDL supports only user type parameters"
)
out.write(")")
l, r = a.duration.lower, a.duration.upper
if l == r:
out.write(
f" (at end (increase (total-cost) {converter.convert(costs[a])}))"
f"\n :duration (= ?duration {converter.convert(l)})"
)
out.write(")")
out.write(")\n")
else:
raise NotImplementedError
else:
out.write(f"\n :duration (and ")
if a.duration.is_left_open():
out.write(f"(> ?duration {converter.convert(l)})")
else:
out.write(f"(>= ?duration {converter.convert(l)})")
if a.duration.is_right_open():
out.write(f"(< ?duration {converter.convert(r)})")
else:
out.write(f"(<= ?duration {converter.convert(r)})")
out.write(")")
if len(a.conditions) > 0:
out.write(f"\n :condition (and ")
for interval, cl in a.conditions.items():
for c in cl:
if interval.lower == interval.upper:
if interval.lower.is_from_start():
out.write(
f"(at start {converter.convert(c)})"
)
else:
out.write(
f"(at end {converter.convert(c)})"
)
else:
if not interval.is_left_open():
out.write(
f"(at start {converter.convert(c)})"
)
out.write(f"(over all {converter.convert(c)})")
if not interval.is_right_open():
out.write(
f"(at end {converter.convert(c)})"
)
out.write(")")
if len(a.effects) > 0:
out.write("\n :effect (and")
for t, el in a.effects.items():
for e in el:
_write_effect(
e,
t,
out,
converter,
self.rewrite_bool_assignments,
self._get_mangled_name,
)
if a in costs:
out.write(
f" (at end (increase (total-cost) {converter.convert(costs[a])}))"
)
out.write(")")
out.write(")\n")
else:
raise NotImplementedError

if self.unfactored:
for ag in self.problem.agents:
write_action(ag)
else:
write_action(ag)

out.write(")\n")

ag_domains[self._get_mangled_name(ag)] = out.getvalue()
Expand Down Expand Up @@ -749,7 +795,10 @@ def get_predicates_functions(
else:
raise UPTypeError("MA-PDDL supports only user type parameters")
if isinstance(obj, up.model.multi_agent.Agent):
expression = f'({prefix}{self._get_mangled_name(f)} ?agent - {"ag"}{"".join(params)})'
if self.unfactored:
expression = f'({prefix}{self._get_mangled_name(f)} ?agent - {obj.name + "_type"}{"".join(params)})'
else:
expression = f'({prefix}{self._get_mangled_name(f)} ?agent - {"ag"}{"".join(params)})'
else:
expression = f'({prefix}{self._get_mangled_name(f)}{"".join(params)})'
if f.type.is_bool_type():
Expand Down

0 comments on commit 459a26c

Please sign in to comment.