diff --git a/unified_planning/model/multi_agent/action_parallelizer.py b/unified_planning/model/multi_agent/action_parallelizer.py new file mode 100644 index 000000000..5668da28c --- /dev/null +++ b/unified_planning/model/multi_agent/action_parallelizer.py @@ -0,0 +1,381 @@ +import unified_planning as up +from unified_planning.engines.sequential_simulator import UPSequentialSimulator as SequentialSimulator +import unified_planning.plans as plans + +class Parallelizer(): + + def __init__(self): + + self.problem = None + + + # generates the adjacency lists needed by 'GeneratePOP' to generate the final POP plan + def Update_POPdict(self, POP_dict, GAMMA, lastAct_ins): + if lastAct_ins is not None: + POP_dict[lastAct_ins] = [GAMMA[0]] # the start action + lenG = len(GAMMA) + + if lenG == 2: + POP_dict[GAMMA[0]] = [GAMMA[1]] # the endAction is in the adjacency list of the startAction + + if lenG > 2: # at least one action was parallelized to the start action + if lastAct_ins is not None: + if POP_dict.get(lastAct_ins) is None: + POP_dict[lastAct_ins] = [GAMMA[1]] + else: + POP_dict[lastAct_ins].append(GAMMA[1]) + for i in range(1, lenG-1): + POP_dict[GAMMA[i]] = [GAMMA[i+1]] + POP_dict[GAMMA[0]] = [GAMMA[lenG-1]] + + lastAct_ins = GAMMA[lenG-1] + + return POP_dict, lastAct_ins + + #-------------------------------------------------------------------------------------------------------# + + # generates the POP of the parallelized plan + def GeneratePOP(self, listActs): + POP = plans.PartialOrderPlan(listActs) + + return POP + + #-------------------------------------------------------------------------------------------------------# + + # writes in a file .plan the resulting parallelized plan + def UpdateParallelizedPlan(self, lista, f): + + L = len(lista) + + if self.actionMovement in str(lista[0]) and L>1: + stringedAction = str(lista[0]) + f.write(stringedAction+"; \n") + + if L==0: + return + + # in 'list' there is only one action, that is or: an atomic action or a start action that cannot be parallelized + elif L==1: + stringedAction = str(lista[0]) + f.write(stringedAction+"; \n") + + # I only have a 'start' and an 'end' action + elif L==2: + for act in lista: + stringedAction = str(act) + f.write(stringedAction+"; \n") + + # in the case something has been parallelized to a start action + else: + stringedAction = str(lista[0]) + f.write("{" + stringedAction+"}||{") + for act in lista[1:L-1]: + stringedAction = str(act) + f.write(stringedAction+"; ") + f.write("} \n") + stringedAction = str(lista[L-1]) + f.write(stringedAction+"; \n") + + #-------------------------------------------------------------------------------------------------------# + + # check if by adding a subplan of 'goto' actions, another action becomes parallelizable + def defineAndSolveSubproblem(self, new_goals, curren_state): + curr_state = curren_state + problem2 = self.problem.clone() + initial_values = self.problem.initial_values + + # I set the initial values of prob2 equals to the current state of the simulation + for fluent in initial_values: + problem2.set_initial_value(fluent, curr_state.get_value(fluent)) + + # I set the goal of prob2 equal to the unsatisfied conditions + problem2.clear_goals() + for necessaryPrecondition in new_goals: + problem2.add_goal(necessaryPrecondition) + + with OneshotPlanner(name = "fast-downward") as planner: + assert planner.supports(problem2.kind) + out_plan2 = planner.solve(problem2).plan + + foundSubplan = False + # check the plan is made up pf all movement actions + if out_plan2 is not None: + foundSubplan = True + contatoreAz = 0 + contatoreGoto = 0 + for act in out_plan2.actions: + contatoreAz += 1 + curr_state = self.sim.apply(curr_state, act) + if self.actionMovement in act.action.name: + contatoreGoto += 1 + if contatoreGoto != contatoreAz: # not only movement actions in the subplan --> I do not return the subplan + return curren_state, None, False + + return curr_state, out_plan2, foundSubplan + + #-------------------------------------------------------------------------------------------------------# + + # initial check that verifies if a certain action B is parallelizable to 'start_actionA'; that is if B is + # applicable even if A is terminated. + def initialCheck_applicability(self, actCercasi, actStart, actEnd, current_stateOriginal, current_stateBB): + + if current_stateBB == []: + curr_state = current_stateOriginal + else: + curr_state = current_stateBB + + curr_state = self.sim.apply(curr_state, actCercasi) + applicability = self.sim.is_applicable(curr_state, actStart) + if applicability: + curr_state = self.sim.apply(curr_state, actStart) + applicability = self.sim.is_applicable(curr_state, actEnd) + + return applicability + + #-------------------------------------------------------------------------------------------------------# + + # I search for the first action to be parallelized to the start_action + def findFirstParallelizableAction(self, i, j, durativeAct, start_action, end_action, current_state2, applicability): + + while j