forked from schumilo/vUSBf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vusbf.py
294 lines (238 loc) · 10.8 KB
/
vusbf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#!/usr/bin/python
"""
vUSBf: A KVM/QEMU based USB-fuzzing framework.
Copyright (C) 2015 Sergej Schumilo, OpenSource Security Ralf Spenneberg
This file is part of vUSBf.
See the file LICENSE for copying permission.
"""
__author__ = 'Sergej Schumilo'
# suppress scapy ipv6 warning
import logging
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
import sys
import os
import config
from process.only_payload import only_payload_process
from process.multi_process import multi_processing
from process.distributor_process import server
from process.client_process import client
from process.execute_object import execute_object_process
__author__ = 'Sergej Schumilo'
__version__ = '0.2'
splash = " _ _ _ _ \n"
splash += " __ __(_) _ __ | |_ _ _ __ _ | | _ _ ___ | |__ \n"
splash += " \ \ / /| || '__|| __|| | | | / _` || | | | | |/ __|| '_ \ \n"
splash += " \ V / | || | | |_ | |_| || (_| || | | |_| |\__ \| |_) |\n"
splash += " \_/ |_||_| \__| \__,_| \__,_||_| \__,_||___/|_.__/ \n"
splash += " \n"
splash += " / _| _ _ ____ ____ ___ _ __ \n"
splash += " | |_ | | | ||_ /|_ // _ \| '__|\n"
splash += " | _|| |_| | / / / /| __/| | \n"
splash += " |_| \__,_|/___|/___|\___||_| \n"
splash = " _ __ __ __ _______ ____ \n"
splash += " _ __(_)____/ /___ ______ _/ / / / / / ___// __ )\n"
splash += "| | / / / ___/ __/ / / / __ `/ / / / / /\__ \/ __ |\n"
splash += "| |/ / / / / /_/ /_/ / /_/ / / / /_/ /___/ / /_/ / \n"
splash += "|_______/ \__/\__,_/\__,_/_/ \____//____/_____/ \n"
splash += " / __/_ __________ ___ _____ \n"
splash += " / /_/ / / /_ /_ / / _ \/ ___/ \n"
splash += " / __/ /_/ / / /_/ /_/ __/ / \n"
splash += "/_/ \__,_/ /___/___/\___/_/ \n"
# parameter prefix and number of following options
# type, number of parameters, list_of_recommend_parameter, list_of_illegal_parameter
parameter = [['o', 1, [], []],
['e', 1, [], []],
['ef', 1, ['e'], []],
['tf', 1, ['e'], []],
['cf', 1, ['e'], []],
['n', 1, ['e'], ['nr']],
['nr', 1, ['e'], ['n']],
['sp', 2, ['e'], ['rm', 's', 'sc', 'c', 'w']],
['w', 0, ['e'], ['n'], ['rm', 's', 'sp', 'sc', 'c']],
['eon', 3, [], ['rm', 's', 'sc', 'c', 'w']],
['eo', 1, ['o'], ['rm', 's', 'sc', 'c', 'w']],
['r', 0, ['o', 'e'], ['rm', 'sp', 's', 'sc', 'c', 'w']],
['rm', 0, ['o', 'e', 'p'], ['sp', 's', 'sc', 'c', 'w']],
['s', 2, ['e'], ['rm', 'sp', 'r', 'sc', 'c', 'w']],
['sc', 2, ['o', 'p', 'e'], ['rm', 'sp', 'r', 's', 'c', 'n', 'nr', 'w']],
['c', 2, ['o', 'p'], ['rm', 'sp', 'r', 's', 'sc', 'n', 'nr', 'w']],
['C', 0, ['e'], []],
['p', 1, [], []],
['v1', 0, [], ['v2']],
['v2', 0, [], ['v1']],
['L', 0, [], []],
['h', 0, [], []],
['K', 0, [], []],
['sh', 0, [], []],
['l', 0, [], []],
['rl', 0, [], []]]
def parameter_parser(parameter_list):
exec_path = "test_generation/execution.xml"
testcase_path = "test_generation/testcase.xml"
test_path = "test_generation/test.xml"
reload_test = False
shuffle_test = True
if '-ef' in [e[0] for e in parameter_list]:
exec_path = [e[1] for e in parameter_list if e[0] == '-ef'][0]
if '-cf' in [e[0] for e in parameter_list]:
testcase_path = [e[1] for e in parameter_list if e[0] == '-cf'][0]
if '-tf' in [e[0] for e in parameter_list]:
test_path = [e[1] for e in parameter_list if e[0] == '-tf'][0]
if '-rl' in [e[0] for e in parameter_list]:
reload_test = True
# if '-rl' in [e[0] for e in parameter_list]:
# shuffle_test = True
parameter_type = [e[0] for e in parameter_list]
# def execute_object_process(host, port, object_file):
if '-eon' in parameter_type:
print "EXECUTE OBJECT MODE (NETWORK)"
host = [e[1] for e in parameter_list if e[0] == '-eon'][0]
port = int([e[2] for e in parameter_list if e[0] == '-eon'][0])
object_file = [e[3] for e in parameter_list if e[0] == '-eon'][0]
execute_object_process("" + object_file, host=host, port=port)
if '-eo' in parameter_type:
print "EXECUTE OBJECT MODE"
object_file = [e[1] for e in parameter_list if e[0] == '-eo'][0]
target_object = [e[1] for e in parameter_list if e[0] == '-o'][0]
if '-v1' in [e[0] for e in parameter_list]:
config.PRINT_DEVICE_DESCRIPTORS = True
if '-v2' in [e[0] for e in parameter_list]:
config.VERBOSE_LEVEL = 5
execute_object_process("" + object_file, target=target_object)
if '-sp' in parameter_type:
print "ONLY PAYLOAD MODE"
host = [e[1] for e in parameter_list if e[0] == '-sp'][0]
port = int([e[2] for e in parameter_list if e[0] == '-sp'][0])
exec_name = [e[1] for e in parameter_list if e[0] == '-e'][0]
exec_list = []
for e in [e[1] for e in parameter_list if e[0] == '-n']:
exec_list.append(int(e))
only_payload_process(host, port, exec_name, exec_list, exec_path, testcase_path, test_path)
elif '-r' in parameter_type:
print "SINGLE CORE MODE"
target_object = [e[1] for e in parameter_list if e[0] == '-o'][0]
exec_name = [e[1] for e in parameter_list if e[0] == '-e'][0]
exec_list = []
for e in [e[1] for e in parameter_list if e[0] == '-n']:
exec_list.append(int(e))
multi_processing(1, target_object, exec_name, exec_list, exec_path, testcase_path, test_path,
reload_test, shuffle_test)
elif '-rm' in parameter_type:
print "MULTIPROCESSING MODE"
process_number = int([e[1] for e in parameter_list if e[0] == '-p'][0])
target_object = [e[1] for e in parameter_list if e[0] == '-o'][0]
exec_name = [e[1] for e in parameter_list if e[0] == '-e'][0]
exec_list = []
for e in [e[1] for e in parameter_list if e[0] == '-n']:
exec_list.append(int(e))
multi_processing(process_number, target_object, exec_name, exec_list, exec_path, testcase_path, test_path,
reload_test, shuffle_test)
elif '-s' in parameter_type:
print "SERVER MODE"
# target_object = [e[1] for e in parameter_list if e[0] == '-o'][0]
host = [e[1] for e in parameter_list if e[0] == '-s'][0]
port = int([e[2] for e in parameter_list if e[0] == '-s'][0])
exec_name = [e[1] for e in parameter_list if e[0] == '-e'][0]
exec_list = []
for e in [e[1] for e in parameter_list if e[0] == '-n']:
exec_list.append(int(e))
server(host, port, exec_name, exec_list, exec_path, testcase_path, test_path, shuffle_test)
elif '-sc' in parameter_type:
print "HYBRID NOT IMPLEMENTED YET - SORRY :)"
elif '-c' in parameter_type:
print "CLIENT MODE"
target_object = [e[1] for e in parameter_list if e[0] == '-o'][0]
process_number = int([e[1] for e in parameter_list if e[0] == '-p'][0])
host = [e[1] for e in parameter_list if e[0] == '-c'][0]
port = int([e[2] for e in parameter_list if e[0] == '-c'][0])
client(process_number, target_object, host, port, reload_test)
# TODO FIX ME
elif '-l' in parameter_type:
print "List payloads:\n"
payload_file = os.listdir("payload/")
for payload in payload_file:
if payload.endswith(".obj"):
print "=> " + payload
if os.path.isfile("payload/" + payload.split(".obj")[0] + ".info"):
print "\t====INFO================================"
f = open("payload/" + payload.split(".obj")[0] + ".info")
for line in f:
print "\t" + line.replace("\n", "")
f.close()
else:
print "\t====INFO================================"
print "\t no info"
print "\t========================================"
elif '-L' in parameter_type:
print "List emulators:\n"
emulators = os.listdir("emulator/")
for emulator in emulators:
if emulator.endswith(".py") and emulator != "__init__.py" and emulator != "emulator.py":
print "=> " + emulator.split(".py")[0]
def check_parameter(parameter_list):
for a in parameter_list:
data = [e for e in parameter if e[0] == a[0][1:]][0]
# illegal
for element in parameter_list:
if element[0][1:] in data[3]:
# print element
print "Illegal parameter: -" + element
return False
# recommend
for element in data[2]:
if element not in [e[0][1:] for e in parameter_list]:
print "Parameter not found: -" + element
return False
return True
def main():
global splash
print splash
print "A KVM/QEMU based USB-fuzzing framework."
print __author__ + ", OpenSource Training Spenneberg 2015"
print "Version: " + __version__
print ""
print "Type -h for help"
if len(sys.argv[1:]) == 0:
return
parameter_list = argv_parser()
if parameter_list is None:
return
if check_parameter(parameter_list):
parameter_parser(parameter_list)
else:
return
def print_help():
f = open("help.txt")
for line in f:
print line,
def argv_parser():
data = sys.argv[1:]
if '-h' in data:
print_help()
return None
else:
parameter_list = []
i = 0
parameter_element = []
for element in data:
if i == 0:
if element.replace("-", "") in [e[0] for e in parameter]:
if len(parameter_element) != 0:
parameter_list.append(parameter_element)
parameter_element = []
i = [e[1] for e in parameter if e[0] == element.replace("-", "")][0]
parameter_element.append(element)
else:
raise Exception("illegal parameter error")
else:
parameter_element.append(element)
i -= 1
if parameter_element is not None:
if parameter_element not in parameter_list:
value = [e[1] for e in parameter if e[0] == parameter_element[0][1:]][0]
if len(parameter_element) != value + 1:
raise Exception("illegal parameter error")
parameter_list.append(parameter_element)
return parameter_list
if __name__ == "__main__":
main()