forked from Smeat/fusee-launcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
usb_backend.py
230 lines (173 loc) · 7.95 KB
/
usb_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import ctypes
import platform
import os
from abc import ABC
class HaxBackend(ABC):
"""
Base class for backends for the TegraRCM vuln.
"""
# USB constants used
STANDARD_REQUEST_DEVICE_TO_HOST_TO_ENDPOINT = 0x82
STANDARD_REQUEST_DEVICE_TO_HOST = 0x80
GET_DESCRIPTOR = 0x6
GET_CONFIGURATION = 0x8
# Interface requests
GET_STATUS = 0x0
# List of OSs this class supports.
SUPPORTED_SYSTEMS = []
def __init__(self, skip_checks=False):
""" Sets up the backend for the given device. """
self.skip_checks = skip_checks
def trigger_vulnerability(self, length):
"""
Triggers the actual controlled memcpy.
The actual trigger needs to be executed carefully, as different host OSs
require us to ask for our invalid control request differently.
"""
raise NotImplementedError("Trying to use an abstract backend rather than an instance of the proper subclass!")
@classmethod
def supported(cls, system_override=None):
""" Returns true iff the given backend is supported on this platform. """
# If we have a SYSTEM_OVERRIDE, use it.
if system_override:
system = system_override
else:
system = platform.system()
return system in cls.SUPPORTED_SYSTEMS
@classmethod
def create_appropriate_backend(cls, system_override=None, skip_checks=False):
""" Creates a backend object appropriate for the current OS. """
# Search for a supportive backend, and try to create one.
for subclass in cls.__subclasses__():
if subclass.supported(system_override):
return subclass(skip_checks=skip_checks)
# ... if we couldn't, bail out.
raise IOError("No backend to trigger the vulnerability-- it's likely we don't support your OS!")
def read(self, length):
""" Reads data from the RCM protocol endpoint. """
return bytes(self.dev.read(0x81, length, 3000))
def write_single_buffer(self, data):
"""
Writes a single RCM buffer, which should be USB_XFER_MAX long.
The last packet may be shorter, and should trigger a ZLP (e.g. not divisible by 512).
If it's not, send a ZLP.
"""
return self.dev.write(0x01, data, 3000)
def find_device(self, vid=None, pid=None):
""" Set and return the device to be used """
import usb
dev = usb.core.find(find_all=1, idVendor=vid)
for cfg in dev:
try:
self.dev = cfg
return self.dev
print(type(cfg))
#print( dir(cfg))
print('VendorID=' + hex(cfg.idVendor) + ' & ProductID=' + hex(cfg.idProduct))
except:
print
return self.dev
class LinuxBackend(HaxBackend):
"""
More complex vulnerability trigger for Linux: we can't go through libusb,
as it limits control requests to a single page size, the limitation expressed
by the usbfs. More realistically, the usbfs seems fine with it, and we just
need to work around libusb.
"""
BACKEND_NAME = "Linux"
SUPPORTED_SYSTEMS = ['Linux', 'linux']
SUPPORTED_USB_CONTROLLERS = ['pci/drivers/xhci_hcd', 'platform/drivers/dwc_otg']
SETUP_PACKET_SIZE = 8
IOCTL_IOR = 0x80000000
IOCTL_TYPE = ord('U')
IOCTL_NR_SUBMIT_URB = 10
URB_CONTROL_REQUEST = 2
class SubmitURBIoctl(ctypes.Structure):
_fields_ = [
('type', ctypes.c_ubyte),
('endpoint', ctypes.c_ubyte),
('status', ctypes.c_int),
('flags', ctypes.c_uint),
('buffer', ctypes.c_void_p),
('buffer_length', ctypes.c_int),
('actual_length', ctypes.c_int),
('start_frame', ctypes.c_int),
('stream_id', ctypes.c_uint),
('error_count', ctypes.c_int),
('signr', ctypes.c_uint),
('usercontext', ctypes.c_void_p),
]
def trigger_vulnerability(self, length):
"""
Submit the control request directly using the USBFS submit_urb
ioctl, which issues the control request directly. This allows us
to send our giant control request despite size limitations.
"""
import os
import fcntl
# We only work for devices that are bound to a compatible HCD.
self._validate_environment()
# Figure out the USB device file we're going to use to issue the
# control request.
fd = os.open('/dev/bus/usb/{:0>3d}/{:0>3d}'.format(self.dev.bus, self.dev.address), os.O_RDWR)
# Define the setup packet to be submitted.
setup_packet = \
int.to_bytes(self.STANDARD_REQUEST_DEVICE_TO_HOST_TO_ENDPOINT, 1, byteorder='little') + \
int.to_bytes(self.GET_STATUS, 1, byteorder='little') + \
int.to_bytes(0, 2, byteorder='little') + \
int.to_bytes(0, 2, byteorder='little') + \
int.to_bytes(length, 2, byteorder='little')
# Create a buffer to hold the result.
buffer_size = self.SETUP_PACKET_SIZE + length
buffer = ctypes.create_string_buffer(setup_packet, buffer_size)
# Define the data structure used to issue the control request URB.
request = self.SubmitURBIoctl()
request.type = self.URB_CONTROL_REQUEST
request.endpoint = 0
request.buffer = ctypes.addressof(buffer)
request.buffer_length = buffer_size
# Manually submit an URB to the kernel, so it issues our 'evil' control request.
ioctl_number = (self.IOCTL_IOR | ctypes.sizeof(request) << 16 | ord('U') << 8 | self.IOCTL_NR_SUBMIT_URB)
fcntl.ioctl(fd, ioctl_number, request, True)
# Close our newly created fd.
os.close(fd)
# The other modules raise an IOError when the control request fails to complete. We don't fail out (as we don't bother
# reading back), so we'll simulate the same behavior as the others.
raise IOError("Raising an error to match the others!")
def _validate_environment(self):
"""
We can only inject giant control requests on devices that are backed
by certain usb controllers-- typically, the xhci_hcd on most PCs.
"""
from glob import glob
# If we're overriding checks, never fail out.
if self.skip_checks:
print("skipping checks")
return
# Search each device bound to the xhci_hcd driver for the active device...
for hci_name in self.SUPPORTED_USB_CONTROLLERS:
for path in glob("/sys/bus/{}/*/usb*".format(hci_name)):
if self._node_matches_our_device(path):
return
raise ValueError("This device needs to be on a supported backend. Usually that means plugged into a blue/USB 3.0 port!\nBailing out.")
def _node_matches_our_device(self, path):
"""
Checks to see if the given sysfs node matches our given device.
Can be used to check if an xhci_hcd controller subnode reflects a given device.,
"""
# If this isn't a valid USB device node, it's not what we're looking for.
if not os.path.isfile(path + "/busnum"):
return False
# We assume that a whole _bus_ is associated with a host controller driver, so we
# only check for a matching bus ID.
if self.dev.bus != self._read_num_file(path + "/busnum"):
return False
# If all of our checks passed, this is our device.
return True
def _read_num_file(self, path):
"""
Reads a numeric value from a sysfs file that contains only a number.
"""
with open(path, 'r') as f:
raw = f.read()
return int(raw)