-
Notifications
You must be signed in to change notification settings - Fork 1
/
bel_updater.py
178 lines (133 loc) · 5.98 KB
/
bel_updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import roslib
import rospy
import yaml
import actionlib
import numpy as np
import pickle as pkl
import math
import json
from prolex_msgs.msg import GetCurrentLocationAction, GetCurrentLocationGoal, GetCurrentLocationFeedback, GetCurrentLocationResult, GoToAction, GoToGoal, GoToFeedback, GoToResult, IsInRoomAction, IsInRoomGoal, IsInRoomFeedback, IsInRoomResult
from prolex_msgs.msg import RetBoxAction, RetBoxGoal, RetBoxFeedback, RetBoxResult
from prolex_msgs.msg import BelPushAction, BelPushGoal, BelPushFeedback, BelPushResult
from prolex_msgs.msg import BelPullAction, BelPullGoal, BelPullFeedback, BelPullResult
from prolex_msgs.msg import GetObjsAction, GetObjsGoal, GetObjsFeedback, GetObjsResult
from MCTS_planner import Loc
from threading import Lock
class PushVoxel:
def __init__(self, obj_tp, x, y, obs):
self.obj_tp = obj_tp
self.x = x
self.y = y
self.obs = obs
class BelUpdater:
def __init__(self):
#Action Clients
self.cur_loc_client = actionlib.SimpleActionClient("/get_current_location_server",GetCurrentLocationAction)
res = self.cur_loc_client.wait_for_server()
print("Connected to Location Server: ", res)
self.bel_push_client = actionlib.SimpleActionClient("/bel_push_server", BelPushAction)
res = self.bel_push_client.wait_for_server()
print("Connected to Belief Push Server: ", res)
self.get_objs_client = actionlib.SimpleActionClient("/get_objs_server", GetObjsAction)
res = self.get_objs_client.wait_for_server()
print("Connected to Get Objects Server: ", res)
# Get configs
with open('config.json', 'r') as f:
self.configs = json.load(f)
def get_new_node(self, current_loc, dist, angle):
# In robot frame: robot direction is X-axis.
# Find the X,Y locations of the point in robot frame
# from distance and angle
new_x = np.cos(angle) * dist
new_y = np.sin(angle) * dist
# Do a rotation based on robot theta to get delta x,y in real coords
delta_x = new_x * np.cos(current_loc.theta) - new_y * np.sin(current_loc.theta)
delta_y = new_x * np.sin(current_loc.theta) + new_y * np.cos(current_loc.theta)
# Get actual x and y based off of current loc
real_x = current_loc.x + delta_x
real_y = current_loc.y + delta_y
# Round to map granularity and return
map_granularity = self.configs['rf_params']['map_granularity']
rounded_x = np.round(real_x / map_granularity) * map_granularity
rounded_y = np.round(real_y / map_granularity) * map_granularity
return rounded_x, rounded_y
def get_robot_fov(self, loc):
min_angle = self.configs['camera_params']['min_angle']
max_angle = self.configs['camera_params']['max_angle']
min_v_dist = self.configs['camera_params']['min_visual_distance']
max_v_dist = self.configs['camera_params']['max_visual_distance']
angle_delta = self.configs['rf_params']['angle_delta']
dist_delta = self.configs['rf_params']['dist_delta']
angle = min_angle
dist = min_v_dist
fov = []
while angle < max_angle:
while dist < max_v_dist:
# Get node that corresponds to angle and dist (relative to robot)
x, y = self.get_new_node(loc, dist, angle)
# Skip if already added
if (x, y) in fov:
dist += dist_delta
continue
# Can't see through obstacles so break
# TODO: Think about how to represent with different map
# granularities once LIDAR setup
#if self.obstacle_map[int(x), int(y)]:
# break
fov.append((x,y))
# Increment distance
dist += dist_delta
# Increment angle
angle += angle_delta
return fov
def get_belief_vals(self):
# Get vision Result
get_objs_goal = GetObjsGoal(tp="new")
self.get_objs_client.send_goal(get_objs_goal)
self.get_objs_client.wait_for_result()
result = self.get_objs_client.get_result()
objs = pkl.loads(eval(result.objs))
if len(objs.keys()) == 0: #No QR codes found
#Get robot position
loc_goal = GetCurrentLocationGoal()
self.cur_loc_client.send_goal(loc_goal)
self.cur_loc_client.wait_for_result()
res = self.cur_loc_client.get_result()
res = res.result.strip('()').split(' ')
robot_loc = (float(res[0].strip(',')), float(res[1].strip(',')), float(res[2].strip(',')))
fov = self.get_robot_fov(Loc(robot_loc[0], robot_loc[1], robot_loc[2]))
vox_list = []
for (x, y) in fov:
print('Updating in FOV, (x,y) = (', x,', ', y, ')')
vox_list.append(PushVoxel(
obj_tp = None,
x = x,
y = y,
obs = 0))
# Return updated
return vox_list
else: # QR Codes found
ret_voxes = []
for o_id in objs:
# Deserialize and append to list
ret_voxes.append(PushVoxel(
obj_tp=objs['o_id']['obj_tp'],
x=objs['o_id']['x'],
y=objs['o_id']['y'],
obs=1)
)
return ret_voxes
def run(self):
print("Starting Belief Updater Node")
while True:
# Get belief vals
obs = self.get_belief_vals()
#Push Results
goal = BelPushGoal()
goal.bel_update_vals = str(pkl.dumps(obs))
self.bel_push_client.send_goal(goal)
self.bel_push_client.wait_for_result()
if __name__ == "__main__":
rospy.init_node('bel_updater_client', anonymous=False)
bel_updater_node = BelUpdater()
bel_updater_node.run()