Skip to content

Commit

Permalink
feature(interpreted functions): work in progress - added precondition…
Browse files Browse the repository at this point in the history
…s and implies for elaborated actions
  • Loading branch information
Samuel Gobbi committed Oct 22, 2024
1 parent 853d37d commit ff2f7ca
Showing 1 changed file with 176 additions and 14 deletions.
190 changes: 176 additions & 14 deletions unified_planning/engines/compilers/interpreted_functions_remover.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from collections import OrderedDict, deque
import unified_planning as up
import unified_planning.engines as engines
from unified_planning.model.fluent import Fluent
from unified_planning.model.interpreted_function import InterpretedFunction
from unified_planning.model.object import Object
from unified_planning.model.timing import EndTiming, StartTiming, TimepointKind
from unified_planning.model.walkers.substituter import Substituter
from unified_planning.model.walkers.operators_extractor import OperatorsExtractor
Expand Down Expand Up @@ -50,6 +52,7 @@
check_and_simplify_conditions,
replace_action,
)
from unified_planning.shortcuts import UserType
from unified_planning.utils import powerset
from typing import List, Dict, Tuple, Optional, Iterator
from functools import partial
Expand Down Expand Up @@ -211,10 +214,15 @@ def _compile(

new_to_old: Dict[Action, Optional[Action]] = {}
new_problem = problem.clone()
temp_problem = problem.clone() # used to get fresh names might not be necessary
new_problem.name = f"{self.name}_{problem.name}"
new_problem.clear_actions()
# temp_problem.clear_actions()
better_knowledge = self.elaborate_known_IFs(self._interpreted_functions_values)
combined_knowledge = self.knowledge_combinations(better_knowledge)
kNum_for_interpreted_functions = UserType(
"kNum_for_interpreted_functions"
) # does this need to have a unique name?

for a in problem.actions:
print(
Expand Down Expand Up @@ -281,26 +289,87 @@ def _compile(
temp_action_list.clear()
for partially_elaborated_action in actions_list:
# print (partially_elaborated_action)

placeholder_function_name = get_fresh_name(
temp_problem,
("f_" + IF_to_handle_now.interpreted_function().name),
)
argnamesstring = ""
for arg in IF_to_handle_now.args:
argnamesstring = argnamesstring + "_" + str(arg)
print(argnamesstring)
action_parameter_name = (
"P_" + placeholder_function_name + argnamesstring
)
action_for_known_values = (
partially_elaborated_action.clone()
self._clone_instantaneous_add_parameter(
partially_elaborated_action,
action_parameter_name,
kNum_for_interpreted_functions,
)
)
temp_action_list.append(action_for_known_values)
print("just cloned action")
print(action_for_known_values)
# action_for_known_values = (partially_elaborated_action.clone())

# maybe can use new_problem but not sure
print("need name, retType and signature")
print(
"placeholder name"
) # might not be necessary - just for naming purposes
print(
placeholder_function_name
) # might not be necessary - just for naming purposes
print("retType")
print(IF_to_handle_now.interpreted_function().return_type)
# input for placeholder is always one value, that might represent more than one input parameter for the original function
# function_placeholder = Fluent (placeholder_function_name)
# ---------------------------------------
# add parameter (Px:kNum)
# ? -> can I add this without reconstructing the action from scratch?
# ---------------------------------------

known_values_precondition_list = list()
known_values_precondition_list.clear()
known_values_imply_list = list()
known_values_imply_list.clear()
for known_value in better_knowledge[
IF_to_handle_now.interpreted_function()
]:
print("known value of")
print(known_value)
print("is")
print(self._interpreted_functions_values[known_value])
compressed_input_name = "O"
argcounter_for_name = 0
while argcounter_for_name < len(IF_to_handle_now.args):
# print (IF_to_handle_now.arg(argcounter_for_name))
# compressed_input_name = compressed_input_name + IF_to_handle_now.arg(argcounter_for_name)
# print (known_value.arg(argcounter_for_name).constant_value())
compressed_input_name = (
compressed_input_name
+ "_"
+ str(
known_value.arg(
argcounter_for_name
).constant_value()
)
)
argcounter_for_name = argcounter_for_name + 1

print(
"placeholder name before double checking"
) # not sure double checking is necessary - the fluents have to be added only at the end I think
print(compressed_input_name)
known_value_object = Object(
compressed_input_name,
kNum_for_interpreted_functions,
)
print("known value object we just created")
print(known_value_object)
argcounter = 0
known_values_precondition = None
imply_precondition = None
while argcounter < len(known_value.args):

if known_values_precondition is None:
Expand All @@ -322,16 +391,75 @@ def _compile(
argcounter = argcounter + 1
print("known_values_precondition")
print(known_values_precondition)
# known_values_precondition implies that the value of the new action parameter is O_*knownvalue*
action_parameter = action_for_known_values.parameter(
action_parameter_name
)
imply_precondition = action_for_known_values.environment.expression_manager.Implies(
known_values_precondition,
action_for_known_values.environment.expression_manager.Equals(
action_parameter, known_value_object
),
)
print("imply precondition")
print(imply_precondition)
known_values_imply_list.append(imply_precondition)
known_values_precondition_list.append(
known_values_precondition
)
print(
"printing elaborated lists ------------------------------"
)
print("preconditions for known values")
print(known_values_precondition_list)
for p in known_values_precondition_list:
for printp in known_values_precondition_list:
print(printp)
print("implications for known values")
for printi in known_values_imply_list:
print(printi)
for p in known_values_imply_list:
action_for_known_values.add_precondition(p)
Or_condition = None
for (
p
) in known_values_precondition_list: # we have to Or these
if Or_condition is None:
print("or condition is none")
print(Or_condition)
Or_condition = p
print("now it is ")
print(Or_condition)
else:
print("or condition is not none, it's already")
print(Or_condition)
print("let's add another or")
Or_condition = action_for_known_values.environment.expression_manager.Or(
Or_condition, p
)
print(Or_condition)
action_for_known_values.add_precondition(Or_condition)
print(
"end printing elaborated lists --------------------------"
)

# handle known values with usertype and object

print(
"========================================================"
)
print("action for known values:")
print(action_for_known_values)
print(
"========================================================"
)
temp_action_list.append(action_for_known_values)
action_for_known_values.name = get_fresh_name(
temp_problem, a.name
) # might not be necessary - just for naming purposes
temp_problem.add_action(
action_for_known_values
) # might not be necessary - just for naming purposes
# unknown values ----------------------------------------------------------------
# print ("now making action for unknown values ---")
# now add precondition for unknown values and remove the if
action_for_unknown_values = (
partially_elaborated_action.clone()
Expand All @@ -346,24 +474,35 @@ def _compile(
# print ("has ifs:")
IFs = self.interpreted_functions_extractor.get(p)
# print (IFs)
if IF_to_handle_now in IFs:
print(p)
print(IF_to_handle_now)
print("this one is here")
else:
print(
"seems that this precondition does not contain our IF, we can add it back"
)
print(p)
if IF_to_handle_now not in IFs:
# print("seems that this precondition does not contain our IF, we can add it back")
# print(p)
action_for_unknown_values.add_precondition(p)
# else:
# print(p)
# print(IF_to_handle_now)
# print("this one is here")

print("for unknown values we have to Not the preocnditions")
# print("for unknown values we have to Not the preocnditions")
for p in known_values_precondition_list:
notp = action_for_unknown_values.environment.expression_manager.Not(
p
)
action_for_unknown_values.add_precondition(notp)
temp_action_list.append(action_for_unknown_values)
print("action for unknown values:")
print(action_for_unknown_values)

print(
"========================================================"
)
# print ("end elaborating unknown values if")
action_for_unknown_values.name = get_fresh_name(
temp_problem, a.name
) # might not be necessary - just for naming purposes
temp_problem.add_action(
action_for_unknown_values
) # might not be necessary - just for naming purposes
actions_list.clear()
actions_list.extend(temp_action_list)
else:
Expand Down Expand Up @@ -983,3 +1122,26 @@ def _Equals_or_Iff(self, exp1, exp2, action):
else:
ret_exp = action.environment.expression_manager.Equals(exp1, exp2)
return ret_exp

def _clone_instantaneous_add_parameter(
self, action, new_parameter_name, param_type
):
# not sure how to do this without "private" variables
new_params = OrderedDict(
(param_name, param.type) for param_name, param in action._parameters.items()
)

if new_parameter_name in new_params.keys():
print("oopsie")
print("could not add the new parameter")
else:
new_params[new_parameter_name] = param_type
new_instantaneous_action = InstantaneousAction(
action._name, new_params, action._environment
)
new_instantaneous_action._preconditions = action._preconditions[:]
new_instantaneous_action._effects = [e.clone() for e in action._effects]
new_instantaneous_action._fluents_assigned = action._fluents_assigned.copy()
new_instantaneous_action._fluents_inc_dec = action._fluents_inc_dec.copy()
new_instantaneous_action._simulated_effect = action._simulated_effect
return new_instantaneous_action

0 comments on commit ff2f7ca

Please sign in to comment.