-
Notifications
You must be signed in to change notification settings - Fork 0
/
astar.py
52 lines (39 loc) · 1.88 KB
/
astar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import utils
from priority_queue import PriorityQueue
from node import Node
class AStar:
    """A* search driven by a caller-supplied heuristic.

    Nodes are ranked by ``f = g + h``: ``g`` comes from the node's own
    ``g()`` method and ``h`` from the heuristic passed to the constructor.
    The frontier lives in ``opened_edges`` and the cost at which each node
    id was last expanded is kept in ``visited_edges``.
    """

    def __init__(self, name, heuristic):
        """Store the solver label and heuristic and create empty search state.

        Args:
            name: Label for this solver; stored on the instance and not
                otherwise used inside this class.
            heuristic: Callable invoked as ``heuristic(state, goal_state_map)``
                whose result is added to ``g()`` to rank a node.
        """
        self.name = name
        # Maps node id -> cost at which that id was last expanded.
        self.visited_edges = {}
        # Frontier of nodes waiting to be expanded.
        # NOTE(review): presumably ordered by node cost -- confirm against
        # the PriorityQueue implementation.
        self.opened_edges = PriorityQueue()
        self.heuristic = heuristic

    def h(self, x, goal_state_map):
        """Return the heuristic estimate for node *x* towards the goal."""
        return self.heuristic(x.state, goal_state_map)

    def f(self, x, goal_state_map):
        """Return the A* ranking value ``g + h`` for node *x*."""
        return x.g() + self.h(x, goal_state_map)

    def has_lower_cost(self, node):
        """Return True if *node* is strictly cheaper than its recorded cost.

        An id with no recorded cost is treated as having infinite cost, so
        the comparison never hits ``None`` (which would raise ``TypeError``
        on Python 3).
        """
        return node.cost < self.visited_edges.get(node.id, float("inf"))

    def solve(self, initial_state, goal_state):
        """Search from *initial_state* until a node whose state equals *goal_state*.

        Args:
            initial_state: State placed in the root node.
            goal_state: State compared with ``==`` against each processed
                node's state; also passed to ``utils.get_map_by_state`` to
                build the argument handed to the heuristic.

        Returns:
            A dict with three keys: ``"nodes"`` is a list of records
            (``id``, ``parent``, ``action``, ``state``), one per node taken
            off the frontier and processed, goal node included;
            ``"count_states"`` is the number of nodes expanded; ``"result"``
            is the goal node's state, or ``None`` when the frontier ran
            empty first.
        """
        # Start every search from a clean slate: a previous call may have
        # stopped at its goal with nodes still queued and ids still recorded.
        self.visited_edges = {}
        self.opened_edges = PriorityQueue()

        goal_state_map = utils.get_map_by_state(goal_state)
        nodes, count_states, result = [], 0, None
        # Cheapest cost already queued for re-expansion, per node id. Kept
        # apart from visited_edges so that a re-queued node still compares
        # as strictly cheaper than its last expansion when it is popped.
        reopened = {}
        infinity = float("inf")

        root_node = Node(initial_state, 0, real_cost=0)
        self.opened_edges.put(root_node)

        while not self.opened_edges.empty():
            edge = self.opened_edges.get()
            # Skip entries for ids already expanded at this cost or cheaper.
            if edge.id in self.visited_edges and not self.has_lower_cost(edge):
                continue

            nodes.append({
                "id": edge.id,
                "parent": edge.parent,
                "action": edge.action,
                "state": utils.get_output_state(edge.state),
            })
            if edge.state == goal_state:
                result = edge.state
                break

            count_states += 1
            self.visited_edges[edge.id] = edge.cost
            # Any pending re-expansion request for this id is now settled.
            reopened.pop(edge.id, None)

            for expanded_edge in edge.expand():
                expanded_edge.set_cost(self.f(expanded_edge, goal_state_map))
                if expanded_edge.id not in self.visited_edges:
                    self.opened_edges.put(expanded_edge)
                elif (
                    self.has_lower_cost(expanded_edge)
                    and expanded_edge.cost
                    < reopened.get(expanded_edge.id, infinity)
                ):
                    # Cheaper route to an already-expanded id: queue it for
                    # re-expansion. visited_edges is deliberately left
                    # untouched until the node is actually expanded again.
                    reopened[expanded_edge.id] = expanded_edge.cost
                    self.opened_edges.replace_priority(expanded_edge)

        return {"nodes": nodes, "count_states": count_states, "result": result}