-
Notifications
You must be signed in to change notification settings - Fork 2
/
validate_shift_metric.py
209 lines (168 loc) · 6.94 KB
/
validate_shift_metric.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# TODO: Move this into data models, doesn't belong here
import os
import sys
repository_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)
sys.path.append(repository_root)
from map_processing.graph_opt_hl_interface import (
holistic_optimize,
WEIGHTS_DICT,
WeightSpecifier,
)
from map_processing.cache_manager import CacheManagerSingleton
from firebase_admin import credentials
from map_processing.data_models import OComputeInfParams, GTDataSet, OConfig
from map_processing import PrescalingOptEnum
from map_processing.transform_utils import transform_vector_to_matrix
import numpy as np
# The shift metric will be tested by utilizing two different
# maps. The optimization mapped will be optimized with a set
# of parameters in order to obtain global positions of each
# tag. The trajectory map will be used as a "test" map that
# will be iterated across to determine the shift metric.
TRAJECTORY_MAP_NAME = "generated_23-01-07-22-14-40.json"
OPTIMIZATION_MAP_NAME = "r1-single-straight-3round.json"
SBA = False
WEIGHTS = WEIGHTS_DICT[WeightSpecifier(5)]
CAMERA_POSE_FLIPPER = np.array(
[[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]
)
class TagDetection:
tag_pose: np.ndarray = np.ones(shape=[16, 1])
tag_id: int = 0
odom_pose: np.ndarray = np.ones(shape=[16, 1])
odom_id: int = 0
opt_tag_pose: np.ndarray = np.ones(shape=[])
@property
def tag_pose_homogeneous(self):
return np.reshape(np.array(self.tag_pose), (4, 4), order="C")
@property
def odom_pose_homogeneous(self):
return np.reshape(np.array(self.odom_pose), (4, 4), order="F")
@property
def tag_pose_global(self):
return (
self.odom_pose_homogeneous @ CAMERA_POSE_FLIPPER @ self.tag_pose_homogeneous
)
@property
def opt_tag_pose_homogeneous(self):
return transform_vector_to_matrix(self.opt_tag_pose)
class DetectionPair:
def __init__(self, detection_one: TagDetection, detection_two: TagDetection):
self.detection_one = detection_one
self.detection_two = detection_two
@property
def relative_tag_transform(self):
return self.detection_two.tag_pose_global @ np.linalg.inv(
self.detection_one.tag_pose_global
)
@property
def relative_tag_transform_trans(self):
return self.relative_tag_transform[0:3, 3]
@property
def opt_tag_one_on_tag_one(self):
return (
self.detection_one.opt_tag_pose_homogeneous
@ self.detection_one.tag_pose_global
@ np.linalg.inv(self.detection_one.opt_tag_pose_homogeneous)
)
@property
def opt_tag_two_on_tag_one(self):
return (
self.detection_two.opt_tag_pose_homogeneous
@ self.detection_one.tag_pose_global
@ np.linalg.inv(self.detection_one.opt_tag_pose_homogeneous)
)
@property
def relative_opt_tag_transform(self):
return self.opt_tag_two_on_tag_one @ np.linalg.inv(self.opt_tag_one_on_tag_one)
@property
def opt_tag_shift(self):
return self.relative_tag_transform @ np.linalg.inv(
self.relative_opt_tag_transform
)
@property
def opt_tag_shift_pos(self):
return np.abs(self.opt_tag_shift[0:3, 3])
def find_maps_and_data():
env_variable = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
cms = CacheManagerSingleton(
firebase_creds=credentials.Certificate(env_variable), max_listen_wait=0
)
# Assumes only one map exists in the .cache folder for the optimization and trajectory
optimization_map_info = cms.find_maps(
OPTIMIZATION_MAP_NAME, search_restriction=0
).pop()
# trajectory_map_info = cms.find_maps(TRAJECTORY_MAP_NAME, search_restriction=2).pop()
# Both maps should have the same ground truth
gt_data = cms.find_ground_truth_data_from_map_info(optimization_map_info)
return optimization_map_info, gt_data
def run_optimization(optimization_map_info, gt_data):
compute_inf_params = OComputeInfParams(
lin_vel_var=np.ones(3) * np.sqrt(3) * 1.0, tag_var=1.0, ang_vel_var=1.0
)
oconfig = OConfig(
is_sba=SBA, weights=WEIGHTS, compute_inf_params=compute_inf_params
)
opt_result = holistic_optimize(
map_info=optimization_map_info,
pso=PrescalingOptEnum(0 if SBA else 1),
oconfig=oconfig,
verbose=True,
visualize=False,
compare=False,
upload=False,
gt_data=GTDataSet.gt_data_set_from_dict_of_arrays(gt_data),
)
return opt_result
def calculate_shift_metric(opt_result_opt_tags, trajectory_map_dct):
trajectory_tag_data = trajectory_map_dct["tag_data"]
tag_pairs = []
opt_tag_pos_dct = {}
for tag in opt_result_opt_tags:
opt_tag_pos_dct[int(tag[-1])] = tag[:-1]
shift_metric = np.zeros([4])
for i in range(len(trajectory_tag_data) - 1):
if (
trajectory_tag_data[i][0]["tag_id"]
== trajectory_tag_data[i + 1][0]["tag_id"]
):
continue
detection_one = TagDetection()
detection_one.tag_id = trajectory_tag_data[i][0]["tag_id"]
detection_one.tag_pose = trajectory_tag_data[i][0]["tag_pose"]
detection_one.odom_id = trajectory_tag_data[i][0]["pose_id"]
detection_one.odom_pose = trajectory_map_dct["pose_data"][
detection_one.odom_id
]["pose"]
detection_one.opt_tag_pose = opt_tag_pos_dct[detection_one.tag_id]
detection_two = TagDetection()
detection_two.tag_id = trajectory_tag_data[i + 1][0]["tag_id"]
detection_two.tag_pose = trajectory_tag_data[i + 1][0]["tag_pose"]
detection_two.odom_id = trajectory_tag_data[i + 1][0]["pose_id"]
detection_two.odom_pose = trajectory_map_dct["pose_data"][
detection_two.odom_id
]["pose"]
detection_two.opt_tag_pose = opt_tag_pos_dct[detection_two.tag_id]
curr_tag_pair = DetectionPair(
detection_one=detection_one, detection_two=detection_two
)
tag_pairs.append(curr_tag_pair)
shift_metric[:3] += curr_tag_pair.opt_tag_shift_pos / np.linalg.norm(
curr_tag_pair.relative_tag_transform_trans
)
shift_metric[3] += np.linalg.norm(
curr_tag_pair.opt_tag_shift_pos
) / np.linalg.norm(curr_tag_pair.relative_tag_transform_trans)
return shift_metric / len(tag_pairs)
def run_shift_metric_test():
optimization_map_info, gt_data = find_maps_and_data()
opt_result = run_optimization(optimization_map_info, gt_data)
# shift_metric = calculate_shift_metric(opt_result.map_opt.tags, trajectory_map_info.map_dct)
shift_metric = opt_result.shift_metric
print("----------------------------------")
print(f"X Shift: {shift_metric[0]:.3f}")
print(f"Y Shift: {shift_metric[1]:.3f}")
print(f"Z Shift: {shift_metric[2]:.3f}")
print(f"Overall Shift: {shift_metric[3]:.3f}")
if __name__ == "__main__":
run_shift_metric_test()