From ff2f7cad9139bfb0915eac2b4c92bdbbcd095b9b Mon Sep 17 00:00:00 2001 From: Samuel Gobbi Date: Tue, 22 Oct 2024 11:03:27 +0200 Subject: [PATCH] feature(interpreted functions): work in progress - added preconditions and implies for elaborated actions --- .../interpreted_functions_remover.py | 190 ++++++++++++++++-- 1 file changed, 176 insertions(+), 14 deletions(-) diff --git a/unified_planning/engines/compilers/interpreted_functions_remover.py b/unified_planning/engines/compilers/interpreted_functions_remover.py index 8a21f1ad3..605e0d672 100644 --- a/unified_planning/engines/compilers/interpreted_functions_remover.py +++ b/unified_planning/engines/compilers/interpreted_functions_remover.py @@ -20,7 +20,9 @@ from collections import OrderedDict, deque import unified_planning as up import unified_planning.engines as engines +from unified_planning.model.fluent import Fluent from unified_planning.model.interpreted_function import InterpretedFunction +from unified_planning.model.object import Object from unified_planning.model.timing import EndTiming, StartTiming, TimepointKind from unified_planning.model.walkers.substituter import Substituter from unified_planning.model.walkers.operators_extractor import OperatorsExtractor @@ -50,6 +52,7 @@ check_and_simplify_conditions, replace_action, ) +from unified_planning.shortcuts import UserType from unified_planning.utils import powerset from typing import List, Dict, Tuple, Optional, Iterator from functools import partial @@ -211,10 +214,15 @@ def _compile( new_to_old: Dict[Action, Optional[Action]] = {} new_problem = problem.clone() + temp_problem = problem.clone() # used to get fresh names might not be necessary new_problem.name = f"{self.name}_{problem.name}" new_problem.clear_actions() + # temp_problem.clear_actions() better_knowledge = self.elaborate_known_IFs(self._interpreted_functions_values) combined_knowledge = self.knowledge_combinations(better_knowledge) + kNum_for_interpreted_functions = UserType( + "kNum_for_interpreted_functions" + ) # does this need to have a unique name? for a in problem.actions: print( @@ -281,10 +289,41 @@ def _compile( temp_action_list.clear() for partially_elaborated_action in actions_list: # print (partially_elaborated_action) + + placeholder_function_name = get_fresh_name( + temp_problem, + ("f_" + IF_to_handle_now.interpreted_function().name), + ) + argnamesstring = "" + for arg in IF_to_handle_now.args: + argnamesstring = argnamesstring + "_" + str(arg) + print(argnamesstring) + action_parameter_name = ( + "P_" + placeholder_function_name + argnamesstring + ) action_for_known_values = ( - partially_elaborated_action.clone() + self._clone_instantaneous_add_parameter( + partially_elaborated_action, + action_parameter_name, + kNum_for_interpreted_functions, + ) ) - temp_action_list.append(action_for_known_values) + print("just cloned action") + print(action_for_known_values) + # action_for_known_values = (partially_elaborated_action.clone()) + + # maybe can use new_problem but not sure + print("need name, retType and signature") + print( + "placeholder name" + ) # might not be necessary - just for naming purposes + print( + placeholder_function_name + ) # might not be necessary - just for naming purposes + print("retType") + print(IF_to_handle_now.interpreted_function().return_type) + # input for placeholder is always one value, that might represent more than one input parameter for the original function + # function_placeholder = Fluent (placeholder_function_name) # --------------------------------------- # add parameter (Px:kNum) # ? -> can I add this without reconstructing the action from scratch? @@ -292,6 +331,8 @@ def _compile( known_values_precondition_list = list() known_values_precondition_list.clear() + known_values_imply_list = list() + known_values_imply_list.clear() for known_value in better_knowledge[ IF_to_handle_now.interpreted_function() ]: @@ -299,8 +340,36 @@ def _compile( print(known_value) print("is") print(self._interpreted_functions_values[known_value]) + compressed_input_name = "O" + argcounter_for_name = 0 + while argcounter_for_name < len(IF_to_handle_now.args): + # print (IF_to_handle_now.arg(argcounter_for_name)) + # compressed_input_name = compressed_input_name + IF_to_handle_now.arg(argcounter_for_name) + # print (known_value.arg(argcounter_for_name).constant_value()) + compressed_input_name = ( + compressed_input_name + + "_" + + str( + known_value.arg( + argcounter_for_name + ).constant_value() + ) + ) + argcounter_for_name = argcounter_for_name + 1 + + print( + "placeholder name before double checking" + ) # not sure double checking is necessary - the fluents have to be added only at the end I think + print(compressed_input_name) + known_value_object = Object( + compressed_input_name, + kNum_for_interpreted_functions, + ) + print("known value object we just created") + print(known_value_object) argcounter = 0 known_values_precondition = None + imply_precondition = None while argcounter < len(known_value.args): if known_values_precondition is None: @@ -322,16 +391,75 @@ def _compile( argcounter = argcounter + 1 print("known_values_precondition") print(known_values_precondition) + # known_values_precondition implies that the value of the new action parameter is O_*knownvalue* + action_parameter = action_for_known_values.parameter( + action_parameter_name + ) + imply_precondition = action_for_known_values.environment.expression_manager.Implies( + known_values_precondition, + action_for_known_values.environment.expression_manager.Equals( + action_parameter, known_value_object + ), + ) + print("imply precondition") + print(imply_precondition) + known_values_imply_list.append(imply_precondition) known_values_precondition_list.append( known_values_precondition ) + print( + "printing elaborated lists ------------------------------" + ) print("preconditions for known values") - print(known_values_precondition_list) - for p in known_values_precondition_list: + for printp in known_values_precondition_list: + print(printp) + print("implications for known values") + for printi in known_values_imply_list: + print(printi) + for p in known_values_imply_list: action_for_known_values.add_precondition(p) + Or_condition = None + for ( + p + ) in known_values_precondition_list: # we have to Or these + if Or_condition is None: + print("or condition is none") + print(Or_condition) + Or_condition = p + print("now it is ") + print(Or_condition) + else: + print("or condition is not none, it's already") + print(Or_condition) + print("let's add another or") + Or_condition = action_for_known_values.environment.expression_manager.Or( + Or_condition, p + ) + print(Or_condition) + action_for_known_values.add_precondition(Or_condition) + print( + "end printing elaborated lists --------------------------" + ) # handle known values with usertype and object + print( + "========================================================" + ) + print("action for known values:") + print(action_for_known_values) + print( + "========================================================" + ) + temp_action_list.append(action_for_known_values) + action_for_known_values.name = get_fresh_name( + temp_problem, a.name + ) # might not be necessary - just for naming purposes + temp_problem.add_action( + action_for_known_values + ) # might not be necessary - just for naming purposes + # unknown values ---------------------------------------------------------------- + # print ("now making action for unknown values ---") # now add precondition for unknown values and remove the if action_for_unknown_values = ( partially_elaborated_action.clone() @@ -346,24 +474,35 @@ def _compile( # print ("has ifs:") IFs = self.interpreted_functions_extractor.get(p) # print (IFs) - if IF_to_handle_now in IFs: - print(p) - print(IF_to_handle_now) - print("this one is here") - else: - print( - "seems that this precondition does not contain our IF, we can add it back" - ) - print(p) + if IF_to_handle_now not in IFs: + # print("seems that this precondition does not contain our IF, we can add it back") + # print(p) action_for_unknown_values.add_precondition(p) + # else: + # print(p) + # print(IF_to_handle_now) + # print("this one is here") - print("for unknown values we have to Not the preocnditions") + # print("for unknown values we have to Not the preocnditions") for p in known_values_precondition_list: notp = action_for_unknown_values.environment.expression_manager.Not( p ) action_for_unknown_values.add_precondition(notp) temp_action_list.append(action_for_unknown_values) + print("action for unknown values:") + print(action_for_unknown_values) + + print( + "========================================================" + ) + # print ("end elaborating unknown values if") + action_for_unknown_values.name = get_fresh_name( + temp_problem, a.name + ) # might not be necessary - just for naming purposes + temp_problem.add_action( + action_for_unknown_values + ) # might not be necessary - just for naming purposes actions_list.clear() actions_list.extend(temp_action_list) else: @@ -983,3 +1122,26 @@ def _Equals_or_Iff(self, exp1, exp2, action): else: ret_exp = action.environment.expression_manager.Equals(exp1, exp2) return ret_exp + + def _clone_instantaneous_add_parameter( + self, action, new_parameter_name, param_type + ): + # not sure how to do this without "private" variables + new_params = OrderedDict( + (param_name, param.type) for param_name, param in action._parameters.items() + ) + + if new_parameter_name in new_params.keys(): + print("oopsie") + print("could not add the new parameter") + else: + new_params[new_parameter_name] = param_type + new_instantaneous_action = InstantaneousAction( + action._name, new_params, action._environment + ) + new_instantaneous_action._preconditions = action._preconditions[:] + new_instantaneous_action._effects = [e.clone() for e in action._effects] + new_instantaneous_action._fluents_assigned = action._fluents_assigned.copy() + new_instantaneous_action._fluents_inc_dec = action._fluents_inc_dec.copy() + new_instantaneous_action._simulated_effect = action._simulated_effect + return new_instantaneous_action