-
Notifications
You must be signed in to change notification settings - Fork 54
/
triad_openvr.py
299 lines (263 loc) · 13.1 KB
/
triad_openvr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import time
import sys
import openvr
import math
import json
from functools import lru_cache
# Function to print out text but instead of starting a new line it will overwrite the existing line
def update_text(txt):
sys.stdout.write('\r'+txt)
sys.stdout.flush()
#Convert the standard 3x4 position/rotation matrix to a x,y,z location and the appropriate Euler angles (in degrees)
def convert_to_euler(pose_mat):
yaw = 180 / math.pi * math.atan2(pose_mat[1][0], pose_mat[0][0])
pitch = 180 / math.pi * math.atan2(pose_mat[2][0], pose_mat[0][0])
roll = 180 / math.pi * math.atan2(pose_mat[2][1], pose_mat[2][2])
x = pose_mat[0][3]
y = pose_mat[1][3]
z = pose_mat[2][3]
return [x,y,z,yaw,pitch,roll]
#Convert the standard 3x4 position/rotation matrix to a x,y,z location and the appropriate Quaternion
def convert_to_quaternion(pose_mat):
# Per issue #2, adding a abs() so that sqrt only results in real numbers
r_w = math.sqrt(abs(1+pose_mat[0][0]+pose_mat[1][1]+pose_mat[2][2]))/2
r_x = (pose_mat[2][1]-pose_mat[1][2])/(4*r_w)
r_y = (pose_mat[0][2]-pose_mat[2][0])/(4*r_w)
r_z = (pose_mat[1][0]-pose_mat[0][1])/(4*r_w)
x = pose_mat[0][3]
y = pose_mat[1][3]
z = pose_mat[2][3]
return [x,y,z,r_w,r_x,r_y,r_z]
#Define a class to make it easy to append pose matricies and convert to both Euler and Quaternion for plotting
class pose_sample_buffer():
def __init__(self):
self.i = 0
self.index = []
self.time = []
self.x = []
self.y = []
self.z = []
self.yaw = []
self.pitch = []
self.roll = []
self.r_w = []
self.r_x = []
self.r_y = []
self.r_z = []
def append(self,pose_mat,t):
self.time.append(t)
self.x.append(pose_mat[0][3])
self.y.append(pose_mat[1][3])
self.z.append(pose_mat[2][3])
self.yaw.append(180 / math.pi * math.atan(pose_mat[1][0] /pose_mat[0][0]))
self.pitch.append(180 / math.pi * math.atan(-1 * pose_mat[2][0] / math.sqrt(pow(pose_mat[2][1], 2) + math.pow(pose_mat[2][2], 2))))
self.roll.append(180 / math.pi * math.atan(pose_mat[2][1] /pose_mat[2][2]))
r_w = math.sqrt(abs(1+pose_mat[0][0]+pose_mat[1][1]+pose_mat[2][2]))/2
self.r_w.append(r_w)
self.r_x.append((pose_mat[2][1]-pose_mat[1][2])/(4*r_w))
self.r_y.append((pose_mat[0][2]-pose_mat[2][0])/(4*r_w))
self.r_z.append((pose_mat[1][0]-pose_mat[0][1])/(4*r_w))
def get_pose(vr_obj):
return vr_obj.getDeviceToAbsoluteTrackingPose(openvr.TrackingUniverseStanding, 0, openvr.k_unMaxTrackedDeviceCount)
class vr_tracked_device():
def __init__(self,vr_obj,index,device_class):
self.device_class = device_class
self.index = index
self.vr = vr_obj
@lru_cache(maxsize=None)
def get_serial(self):
return self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_SerialNumber_String)
def get_model(self):
return self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_ModelNumber_String)
def get_battery_percent(self):
return self.vr.getFloatTrackedDeviceProperty(self.index, openvr.Prop_DeviceBatteryPercentage_Float)
def is_charging(self):
return self.vr.getBoolTrackedDeviceProperty(self.index, openvr.Prop_DeviceIsCharging_Bool)
def sample(self,num_samples,sample_rate):
interval = 1/sample_rate
rtn = pose_sample_buffer()
sample_start = time.time()
for i in range(num_samples):
start = time.time()
pose = get_pose(self.vr)
rtn.append(pose[self.index].mDeviceToAbsoluteTracking,time.time()-sample_start)
sleep_time = interval- (time.time()-start)
if sleep_time>0:
time.sleep(sleep_time)
return rtn
def get_pose_euler(self, pose=None):
if pose == None:
pose = get_pose(self.vr)
if pose[self.index].bPoseIsValid:
return convert_to_euler(pose[self.index].mDeviceToAbsoluteTracking)
else:
return None
def get_pose_matrix(self, pose=None):
if pose == None:
pose = get_pose(self.vr)
if pose[self.index].bPoseIsValid:
return pose[self.index].mDeviceToAbsoluteTracking
else:
return None
def get_velocity(self, pose=None):
if pose == None:
pose = get_pose(self.vr)
if pose[self.index].bPoseIsValid:
return pose[self.index].vVelocity
else:
return None
def get_angular_velocity(self, pose=None):
if pose == None:
pose = get_pose(self.vr)
if pose[self.index].bPoseIsValid:
return pose[self.index].vAngularVelocity
else:
return None
def get_pose_quaternion(self, pose=None):
if pose == None:
pose = get_pose(self.vr)
if pose[self.index].bPoseIsValid:
return convert_to_quaternion(pose[self.index].mDeviceToAbsoluteTracking)
else:
return None
def controller_state_to_dict(self, pControllerState):
# This function is graciously borrowed from https://gist.github.com/awesomebytes/75daab3adb62b331f21ecf3a03b3ab46
# docs: https://github.com/ValveSoftware/openvr/wiki/IVRSystem::GetControllerState
d = {}
d['unPacketNum'] = pControllerState.unPacketNum
# on trigger .y is always 0.0 says the docs
d['trigger'] = pControllerState.rAxis[1].x
# 0.0 on trigger is fully released
# -1.0 to 1.0 on joystick and trackpads
d['trackpad_x'] = pControllerState.rAxis[0].x
d['trackpad_y'] = pControllerState.rAxis[0].y
# These are published and always 0.0
# for i in range(2, 5):
# d['unknowns_' + str(i) + '_x'] = pControllerState.rAxis[i].x
# d['unknowns_' + str(i) + '_y'] = pControllerState.rAxis[i].y
d['ulButtonPressed'] = pControllerState.ulButtonPressed
d['ulButtonTouched'] = pControllerState.ulButtonTouched
# To make easier to understand what is going on
# Second bit marks menu button
d['menu_button'] = bool(pControllerState.ulButtonPressed >> 1 & 1)
# 32 bit marks trackpad
d['trackpad_pressed'] = bool(pControllerState.ulButtonPressed >> 32 & 1)
d['trackpad_touched'] = bool(pControllerState.ulButtonTouched >> 32 & 1)
# third bit marks grip button
d['grip_button'] = bool(pControllerState.ulButtonPressed >> 2 & 1)
# System button can't be read, if you press it
# the controllers stop reporting
return d
def get_controller_inputs(self):
result, state = self.vr.getControllerState(self.index)
return self.controller_state_to_dict(state)
def trigger_haptic_pulse(self, duration_micros=1000, axis_id=0):
"""
Causes devices with haptic feedback to vibrate for a short time.
"""
self.vr.triggerHapticPulse(self.index ,axis_id, duration_micros)
class vr_tracking_reference(vr_tracked_device):
def get_mode(self):
return self.vr.getStringTrackedDeviceProperty(self.index,openvr.Prop_ModeLabel_String).decode('utf-8').upper()
def sample(self,num_samples,sample_rate):
print("Warning: Tracking References do not move, sample isn't much use...")
class triad_openvr():
def __init__(self, configfile_path=None):
# Initialize OpenVR in the
self.vr = openvr.init(openvr.VRApplication_Other)
self.vrsystem = openvr.VRSystem()
# Initializing object to hold indexes for various tracked objects
self.object_names = {"Tracking Reference":[],"HMD":[],"Controller":[],"Tracker":[]}
self.devices = {}
self.device_index_map = {}
poses = self.vr.getDeviceToAbsoluteTrackingPose(openvr.TrackingUniverseStanding, 0,
openvr.k_unMaxTrackedDeviceCount)
# Loading config file
if configfile_path:
try:
with open(configfile_path, 'r') as json_data:
config = json.load(json_data)
except EnvironmentError: # parent of IOError, OSError *and* WindowsError where available
print('config.json not found.')
exit(1)
# Iterate through the pose list to find the active devices and determine their type
for i in range(openvr.k_unMaxTrackedDeviceCount):
if poses[i].bDeviceIsConnected:
device_serial = self.vr.getStringTrackedDeviceProperty(i,openvr.Prop_SerialNumber_String).decode('utf-8')
for device in config['devices']:
if device_serial == device['serial']:
device_name = device['name']
self.object_names[device['type']].append(device_name)
self.devices[device_name] = vr_tracked_device(self.vr,i,device['type'])
else:
# Iterate through the pose list to find the active devices and determine their type
for i in range(openvr.k_unMaxTrackedDeviceCount):
if poses[i].bDeviceIsConnected:
self.add_tracked_device(i)
def __del__(self):
openvr.shutdown()
def get_pose(self):
return get_pose(self.vr)
def poll_vr_events(self):
"""
Used to poll VR events and find any new tracked devices or ones that are no longer tracked.
"""
event = openvr.VREvent_t()
while self.vrsystem.pollNextEvent(event):
if event.eventType == openvr.VREvent_TrackedDeviceActivated:
self.add_tracked_device(event.trackedDeviceIndex)
elif event.eventType == openvr.VREvent_TrackedDeviceDeactivated:
#If we were already tracking this device, quit tracking it.
if event.trackedDeviceIndex in self.device_index_map:
self.remove_tracked_device(event.trackedDeviceIndex)
def add_tracked_device(self, tracked_device_index):
i = tracked_device_index
device_class = self.vr.getTrackedDeviceClass(i)
if (device_class == openvr.TrackedDeviceClass_Controller):
device_name = "controller_"+str(len(self.object_names["Controller"])+1)
self.object_names["Controller"].append(device_name)
self.devices[device_name] = vr_tracked_device(self.vr,i,"Controller")
self.device_index_map[i] = device_name
elif (device_class == openvr.TrackedDeviceClass_HMD):
device_name = "hmd_"+str(len(self.object_names["HMD"])+1)
self.object_names["HMD"].append(device_name)
self.devices[device_name] = vr_tracked_device(self.vr,i,"HMD")
self.device_index_map[i] = device_name
elif (device_class == openvr.TrackedDeviceClass_GenericTracker):
device_name = "tracker_"+str(len(self.object_names["Tracker"])+1)
self.object_names["Tracker"].append(device_name)
self.devices[device_name] = vr_tracked_device(self.vr,i,"Tracker")
self.device_index_map[i] = device_name
elif (device_class == openvr.TrackedDeviceClass_TrackingReference):
device_name = "tracking_reference_"+str(len(self.object_names["Tracking Reference"])+1)
self.object_names["Tracking Reference"].append(device_name)
self.devices[device_name] = vr_tracking_reference(self.vr,i,"Tracking Reference")
self.device_index_map[i] = device_name
def remove_tracked_device(self, tracked_device_index):
if tracked_device_index in self.device_index_map:
device_name = self.device_index_map[tracked_device_index]
self.object_names[self.devices[device_name].device_class].remove(device_name)
del self.device_index_map[tracked_device_index]
del self.devices[device_name]
else:
raise Exception("Tracked device index {} not valid. Not removing.".format(tracked_device_index))
def rename_device(self,old_device_name,new_device_name):
self.devices[new_device_name] = self.devices.pop(old_device_name)
for i in range(len(self.object_names[self.devices[new_device_name].device_class])):
if self.object_names[self.devices[new_device_name].device_class][i] == old_device_name:
self.object_names[self.devices[new_device_name].device_class][i] = new_device_name
def print_discovered_objects(self):
for device_type in self.object_names:
plural = device_type
if len(self.object_names[device_type])!=1:
plural+="s"
print("Found "+str(len(self.object_names[device_type]))+" "+plural)
for device in self.object_names[device_type]:
if device_type == "Tracking Reference":
print(" "+device+" ("+self.devices[device].get_serial()+
", Mode "+self.devices[device].get_model()+
", "+self.devices[device].get_model()+
")")
else:
print(" "+device+" ("+self.devices[device].get_serial()+
", "+self.devices[device].get_model()+")")