-
Notifications
You must be signed in to change notification settings - Fork 2
/
raidpir_client.py
535 lines (399 loc) · 17.5 KB
/
raidpir_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
#!/usr/bin/env python3
"""
<Author>
Daniel Demmler
(inspired from upPIR by Justin Cappos)
(inspired from a previous version by Geremy Condra)
<Date>
January 2019
<Description>
Client code for retrieving RAID-PIR files. This program uses a manifest
to communicate with a vendor and retrieve a list of mirrors. The client
then _privately_ downloads the appropriate files from mirrors in the mirror
list. None of the mirrors can tell what file or files were downloaded.
For more technical explanation, please see the paper.
<Usage>
see python raidpir_client.py --help
$ python raidpir_client.py
[--retrievemanifestfrom <IP>:<PORT>]
[-r <REDUNDANCY>]
[-R]
[-p]
[-b]
[-t]
[--vendorip <IP>]
file1 [file2 ...]
<Options>
See below
"""
# This file is laid out in two main parts. First, there are some helper
# functions to do moderately complex things like retrieving a block from a
# mirror or splitting a file into blocks. The second part contains the option
# parsing and main. To get an overall feel for the code, it is recommended
# to follow the execution from main on.
#
# EXTENSION POINTS:
#
# Making the client extensible is a major problem. In particular, we will
# need to modify mirror selection, block selection, malicious mirror detection,
# and avoiding slow nodes simultaneously. To do this effectively, we need
# some sort of mechanism that gives the programmer control over how to handle
# these.
#
# The XORRequestor interface is used to address these issues.
# The programmer defines an object that is provided the manifest,
# mirrorlist, and blocks to retrieve. The XORRequestor object must support
# several methods: get_next_xorrequest(), notify_failure(xorrequest),
# notify_success(xorrequest, xordata), and return_block(blocknum). The
# request_blocks_from_mirrors function in this file will use threads to call
# these methods to determine what to retrieve. The notify_* routines are
# used to inform the XORRequestor object of prior results so that it can
# decide how to issue future block requests. This separates out the 'what'
# from the 'how' but has a slight loss of control. Note that the block
# reconstruction, etc. is done here to allow easy extensibility of malicious
# mirror detection / vendor notification.
#
# The manifest file could also be extended to support huge files (those that
# span multiple releases). The client would need to download files from
# multiple releases and then stitch them back together. This would require
# minor changes (or possibly could be done using this code as a black box).
#
import sys
import optparse
# helper functions that are shared
import raidpirlib as lib
# used to issue requests in parallel
import threading
import simplexorrequestor
import session
# for basename
import os.path
# to sleep...
import time
_timer = lib._timer
def _request_helper(rxgobj, tid):
    """Private helper to get requests.

    Multiple threads will execute this, each with a unique tid.

    <Arguments>
      rxgobj: the XORRequestor object. It hands out work through
              get_next_xorrequest(tid) and is informed about failed
              requests through notify_failure(xorrequest).
      tid: the id of the calling thread, passed through to the requestor.

    <Side Effects>
      Sends one XOR block request per xorrequest via lib.request_xorblock.
      Writes an 'F' to stdout for every request reported as failed.

    <Exceptions>
      Exceptions raised while sending a request are re-raised, unless their
      message contains 'socked' (see the note in the handler).

    <Returns>
      None
    """
    thisrequest = rxgobj.get_next_xorrequest(tid)

    # An empty tuple means there is no work for this thread. Stop here,
    # because indexing the empty tuple below would raise an IndexError.
    if thisrequest == ():
        return

    # the socket is fixed for each thread, so we only need to do this once
    sock = thisrequest[0]['sock']

    # go until there are no more requests
    while thisrequest != ():
        bitstring = thisrequest[2]

        try:
            # request the XOR block...
            lib.request_xorblock(sock, bitstring)

        except Exception as e:
            # NOTE(review): 'socked' looks like a typo for 'socket' -- confirm
            # against the messages actually raised before changing it.
            if 'socked' in str(e):
                rxgobj.notify_failure(thisrequest)
                sys.stdout.write('F')
                sys.stdout.flush()
            else:
                # otherwise, re-raise...
                raise

        # regardless of failure or success, get another request...
        thisrequest = rxgobj.get_next_xorrequest(tid)
def _request_helper_chunked(rxgobj, tid):
    """Private helper to get requests with chunks.

    Potentially multiple threads will execute this, each with a unique tid.

    <Arguments>
      rxgobj: the XORRequestor object. It hands out work through
              get_next_xorrequest(tid) and is informed about failed
              requests through notify_failure(xorrequest).
      tid: the id of the calling thread, passed through to the requestor.

    <Side Effects>
      Sends one chunked XOR block request per xorrequest. The request type
      stored at index 3 of the first xorrequest selects the lib function:
        1: chunks and seed expansion
        2: chunks, seed expansion and parallel
        anything else: only chunks (redundancy)
      Writes an 'F' to stdout for every request reported as failed.

    <Exceptions>
      Exceptions raised while sending a request are re-raised, unless their
      message contains 'socked' (see the note in the handler).

    <Returns>
      None
    """
    thisrequest = rxgobj.get_next_xorrequest(tid)

    # An empty tuple means there is no work for this thread. Stop here,
    # because indexing the empty tuple below would raise an IndexError.
    if thisrequest == ():
        return

    # the socket is fixed for each thread, so we only need to do this once
    sock = thisrequest[0]['sock']

    # the request type is also fixed, so choose the sender once up front
    rqtype = thisrequest[3]
    if rqtype == 1:  # chunks and seed expansion
        send_request = lib.request_xorblock_chunked_rng
    elif rqtype == 2:  # chunks, seed expansion and parallel
        send_request = lib.request_xorblock_chunked_rng_parallel
    else:  # only chunks (redundancy)
        send_request = lib.request_xorblock_chunked

    # go until there are no more requests
    while thisrequest != ():
        chunks = thisrequest[2]

        try:
            # request the XOR block...
            send_request(sock, chunks)

        except Exception as e:
            # NOTE(review): 'socked' looks like a typo for 'socket' -- confirm
            # against the messages actually raised before changing it.
            if 'socked' in str(e):
                rxgobj.notify_failure(thisrequest)
                sys.stdout.write('F')
                sys.stdout.flush()
            else:
                # otherwise, re-raise...
                raise

        # regardless of failure or success, get another request...
        thisrequest = rxgobj.get_next_xorrequest(tid)
def _run_request_threads(rxgobj, worker):
    """Private helper: run worker(rxgobj, tid) once per mirror, then wait
    for the 'rt' thread of every active mirror to finish."""
    lasttid = _commandlineoptions.numberofmirrors - 1

    # let's fire up the requested number of threads. Our thread will also
    # participate (-1 because of us!)
    for tid in range(lasttid):
        threading.Thread(target=worker, args=[rxgobj, tid]).start()

    worker(rxgobj, lasttid)

    # wait for receiving threads to finish
    for mirror in rxgobj.activemirrors:
        mirror['rt'].join()


def request_blocks_from_mirrors(requestedblocklist, manifestdict, redundancy, rng, parallel):
    """
    <Purpose>
      Retrieves blocks from mirrors

    <Arguments>
      requestedblocklist: the blocks to acquire
      manifestdict: the manifest with information about the release
      redundancy: None to query without chunks; otherwise the redundancy
                  handed to the chunk requestor
      rng: passed through to the chunk requestor (unused without chunks)
      parallel: passed through to the chunk requestor; when true the number
                of requests is printed as well

    <Side Effects>
      Contacts mirrors to retrieve blocks. It uses some global options
      (_commandlineoptions.vendorip, numberofmirrors, batch and timing).
      With chunks enabled, the module globals chunklen and lastchunklen are
      set. With timing enabled, measurements are written to _timing_log.

    <Exceptions>
      TypeError may be raised if the provided lists are invalid.
      socket errors may be raised if communications fail.

    <Returns>
      A dict mapping blocknumber -> blockcontents.
    """
    global chunklen
    global lastchunklen

    numberofmirrors = _commandlineoptions.numberofmirrors
    timing = _commandlineoptions.timing

    # let's get the list of mirrors...
    if _commandlineoptions.vendorip is None:
        # use data from manifest
        mirrorinfolist = lib.retrieve_mirrorinfolist(manifestdict['vendorhostname'], manifestdict['vendorport'])
    else:
        # use commandlineoption
        mirrorinfolist = lib.retrieve_mirrorinfolist(_commandlineoptions.vendorip)

    print("Mirrors: ", mirrorinfolist)

    if timing:
        setup_start = _timer()

    # no chunks (regular upPIR / Chor)
    if redundancy is None:

        # let's set up a requestor object...
        rxgobj = simplexorrequestor.RandomXORRequestor(mirrorinfolist, requestedblocklist, manifestdict, numberofmirrors, _commandlineoptions.batch, timing)

        if timing:
            setup_time = _timer() - setup_start
            # The same count is written twice on purpose-looking grounds: the
            # chunk branch writes two counts here as well, so the log keeps
            # the same number of lines. NOTE(review): confirm with log readers.
            _timing_log.write(str(len(rxgobj.activemirrors[0]['blockbitstringlist']))+"\n")
            _timing_log.write(str(len(rxgobj.activemirrors[0]['blockbitstringlist']))+"\n")

        print("Blocks to request:", len(rxgobj.activemirrors[0]['blockbitstringlist']))

        if timing:
            req_start = _timer()

        _run_request_threads(rxgobj, _request_helper)

    else:  # chunks

        # let's set up a chunk requestor object...
        rxgobj = simplexorrequestor.RandomXORRequestorChunks(mirrorinfolist, requestedblocklist, manifestdict, numberofmirrors, redundancy, rng, parallel, _commandlineoptions.batch, timing)

        if timing:
            setup_time = _timer() - setup_start
            _timing_log.write(str(len(rxgobj.activemirrors[0]['blocksneeded']))+"\n")
            _timing_log.write(str(len(rxgobj.activemirrors[0]['blockchunklist']))+"\n")

        print("# Blocks needed:", len(rxgobj.activemirrors[0]['blocksneeded']))

        if parallel:
            print("# Requests:", len(rxgobj.activemirrors[0]['blockchunklist']))

        # chunk lengths in BYTE. Floor division keeps them integers; a plain
        # '/' would yield floats under Python 3.
        chunklen = (manifestdict['blockcount'] // 8) // numberofmirrors
        lastchunklen = lib.bits_to_bytes(manifestdict['blockcount']) - (numberofmirrors-1)*chunklen

        if timing:
            req_start = _timer()

        _run_request_threads(rxgobj, _request_helper_chunked)

    rxgobj.cleanup()

    if timing:
        req_time = _timer() - req_start
        recons_time, comptimes, pings = rxgobj.return_timings()

        avg_ping = sum(pings) / numberofmirrors
        avg_comptime = sum(comptimes) / numberofmirrors

        _timing_log.write(str(setup_time)+ "\n")
        _timing_log.write(str(req_time)+ "\n")
        _timing_log.write(str(recons_time)+ "\n")
        _timing_log.write(str(avg_comptime)+ " " + str(comptimes)+ "\n")
        _timing_log.write(str(avg_ping)+ " " + str(pings)+ "\n")

    # okay, now we have them all. Let's get the returned dict ready.
    retdict = {}
    for blocknum in requestedblocklist:
        retdict[blocknum] = rxgobj.return_block(blocknum)

    return retdict
def request_files_from_mirrors(requestedfilelist, redundancy, rng, parallel, manifestdict):
    """
    <Purpose>
      Reconstitutes files by privately contacting mirrors

    <Arguments>
      requestedfilelist: the files to acquire
      redundancy: use chunks and overlap this often
      rng: use rnd to generate latter chunks
      parallel: query one block per chunk
      manifestdict: the manifest with information about the release

    <Side Effects>
      Contacts mirrors to retrieve files. They are written to disk, into the
      current working directory, under their base name.

    <Exceptions>
      TypeError may be raised if the provided lists are invalid.
      socket errors may be raised if communications fail.
      Exception is raised if a file's hash does not match the manifest or
      if the file has no entry in the manifest.

    <Returns>
      None
    """
    # let's figure out what blocks we need. The list keeps first-seen order;
    # the set only makes the duplicate check cheap.
    neededblocks = []
    seenblocks = set()
    for filename in requestedfilelist:
        for blocknum in lib.get_blocklist_for_file(filename, manifestdict):
            # add the blocks we don't already know we need to request
            if blocknum not in seenblocks:
                seenblocks.add(blocknum)
                neededblocks.append(blocknum)

    # do the actual retrieval work
    blockdict = request_blocks_from_mirrors(neededblocks, manifestdict, redundancy, rng, parallel)

    # now we should write out the files
    for filename in requestedfilelist:
        filedata = lib.extract_file_from_blockdict(filename, manifestdict, blockdict)

        # let's check the hash
        thisfilehash = lib.find_hash(filedata, manifestdict['hashalgorithm'])

        for fileinfo in manifestdict['fileinfolist']:
            # find this entry
            if fileinfo['filename'] == filename:
                if thisfilehash != fileinfo['hash']:
                    raise Exception("Corrupt manifest has incorrect file hash despite passing block hash checks!")
                # we found it and it checks out!
                break
        else:
            # the loop ended without finding an entry for this file
            raise Exception("Internal Error: Cannot locate fileinfo in manifest!")

        # open the filename w/o the dir and write it; the with-block makes
        # sure the data is flushed and the handle closed right away
        filenamewithoutpath = os.path.basename(filename)
        with open(filenamewithoutpath, "wb") as outfile:
            outfile.write(filedata)
        print("wrote", filenamewithoutpath)
########################## Option parsing and main ###########################
_commandlineoptions = None
def parse_options():
    """
    <Purpose>
      Parses command line arguments.

    <Arguments>
      None

    <Side Effects>
      All relevant data is added to _commandlineoptions. The positional
      arguments (the files to retrieve) are stored as
      _commandlineoptions.filestoretrieve.

    <Exceptions>
      These are handled by optparse internally. I believe it will print / exit
      itself without raising exceptions further. A message is printed and
      sys.exit(1) is called if a sanity check on the parameters fails.

    <Returns>
      None
    """
    global _commandlineoptions

    # should be true unless we're initing twice...
    assert _commandlineoptions is None

    parser = optparse.OptionParser()

    parser.add_option("", "--retrievemanifestfrom", dest="retrievemanifestfrom",
                      type="string", metavar="vendorIP:port", default="",
                      help="Specifies the vendor to retrieve the manifest from (default None).")

    parser.add_option("", "--printfilenames", dest="printfiles",
                      action="store_true", default=False,
                      help="Print a list of all available files in the manifest file.")

    parser.add_option("", "--vendorip", dest="vendorip", type="string", metavar="IP",
                      default=None, help="Vendor IP for overwriting the value from manifest; for testing purposes.")

    parser.add_option("-m", "--manifestfile", dest="manifestfilename",
                      type="string", default="manifest.dat",
                      help="The manifest file to use (default manifest.dat).")

    parser.add_option("-k", "--numberofmirrors", dest="numberofmirrors",
                      type="int", default=2,
                      help="How many servers do we query? (default 2)")

    parser.add_option("-r", "--redundancy", dest="redundancy",
                      type="int", default=None,
                      help="Activates chunks and specifies redundancy r (how often they overlap). (default None)")

    parser.add_option("-R", "--rng", action="store_true", dest="rng", default=False,
                      help="Use seed expansion from RNG for latter chunks (default False). Requires -r")

    parser.add_option("-p", "--parallel", action="store_true", dest="parallel", default=False,
                      help="Query one block per chunk in parallel (default False). Requires -r")

    parser.add_option("-b", "--batch", action="store_true", dest="batch", default=False,
                      help="Request the mirror to do computations in a batch. (default False)")

    parser.add_option("-t", "--timing", action="store_true", dest="timing", default=False,
                      help="Do timing measurements and print them at the end. (default False)")

    parser.add_option("-c", "--comment", type="string", dest="comment", default="",
                      help="Debug comment on this run, used to name timing log file.")

    # let's parse the args
    (_commandlineoptions, remainingargs) = parser.parse_args()

    # Force the use of a seeded rng (-R) if MB (-p) is used. Temporary, until -p without -R is implemented.
    if _commandlineoptions.parallel:
        _commandlineoptions.rng = True

    # sanity check parameters

    # k >= 2
    if _commandlineoptions.numberofmirrors < 2:
        print("Mirrors to contact must be > 1")
        sys.exit(1)

    # r >= 2
    if _commandlineoptions.redundancy is not None and _commandlineoptions.redundancy < 2:
        print("Redundancy must be > 1")
        sys.exit(1)

    # r <= k
    if _commandlineoptions.redundancy is not None and (_commandlineoptions.redundancy > _commandlineoptions.numberofmirrors):
        print("Redundancy must be less or equal to number of mirrors (", _commandlineoptions.numberofmirrors, ")")
        sys.exit(1)

    # RNG or parallel query without chunks activated
    if (_commandlineoptions.rng or _commandlineoptions.parallel) and not _commandlineoptions.redundancy:
        print("Chunks must be enabled and redundancy set (-r <number>) to use RNG or parallel queries!")
        sys.exit(1)

    # a file name is only optional when we just list the manifest contents
    if not remainingargs and not _commandlineoptions.printfiles:
        print("Must specify at least one file to retrieve!")
        sys.exit(1)

    # filename(s)
    _commandlineoptions.filestoretrieve = remainingargs
def start_logging():
    """Open the timing log of this run and start the overall timer.

    <Side Effects>
      Sets the globals _timing_log (log file opened in append mode) and
      total_start (the _timer() value taken at the end of this function).
      The file name is built from the current date, the debug comment and
      the parameters of this run. A time stamp line and a line with the
      run's parameters are written to the log.

    <Returns>
      None
    """
    global _timing_log
    global total_start

    opts = _commandlineoptions

    # "<date>_<comment>_k<mirrors>", followed by one tag per active feature
    nameparts = [time.strftime("%y%m%d"), "_", opts.comment, "_k", str(opts.numberofmirrors)]
    if opts.redundancy:
        nameparts.append("_r" + str(opts.redundancy))
    for enabled, tag in ((opts.rng, "_R"), (opts.parallel, "_p"), (opts.batch, "_b")):
        if enabled:
            nameparts.append(tag)
    logfilename = "".join(nameparts)

    cur_time = time.strftime("%y%m%d-%H%M%S")
    _timing_log = open("timing_" + logfilename + ".log", "a")
    _timing_log.write(cur_time + "\n")

    # one line with the parameters of this run, separated by spaces
    runparameters = (
        opts.filestoretrieve,
        opts.numberofmirrors,
        opts.redundancy,
        opts.rng,
        opts.parallel,
        opts.batch,
    )
    _timing_log.write(" ".join(str(value) for value in runparameters) + "\n")

    total_start = _timer()
def main():
    """main function with high level control flow

    <Side Effects>
      Reads the manifest from disk, or downloads it from the vendor given by
      --retrievemanifestfrom and writes it to the manifest file. Optionally
      prints the manifest contents, writes the block size and block count to
      the timing log, and retrieves the requested files.

    <Exceptions>
      Calls sys.exit(1) if chunks are requested but the block count is too
      low, and sys.exit(2) if a requested file is not listed in the manifest.

    <Returns>
      None
    """
    opts = _commandlineoptions

    # If we were asked to retrieve the manifest file, do so...
    if opts.retrievemanifestfrom:
        # We need to download this file...
        rawmanifestdata = lib.retrieve_rawmanifest(opts.retrievemanifestfrom)

        # ...make sure it is valid...
        manifestdict = lib.parse_manifest(rawmanifestdata)

        # ...and write it out if it's okay
        with open(opts.manifestfilename, "wb") as manifestfile:
            manifestfile.write(rawmanifestdata)

    else:
        # Simply read it in from disk
        with open(opts.manifestfilename, "rb") as manifestfile:
            rawmanifestdata = manifestfile.read()

        manifestdict = lib.parse_manifest(rawmanifestdata)

    # we will check that the files are in the release

    # find the list of files
    filelist = lib.get_filenames_in_release(manifestdict)

    # chunks are refused when there are fewer than 8 blocks per mirror
    if (manifestdict['blockcount'] < opts.numberofmirrors * 8) and opts.redundancy is not None:
        print("Block count too low to use chunks! Try reducing the block size or add more files to the database.")
        sys.exit(1)

    if opts.printfiles:
        print("Manifest - Blocks:", manifestdict['blockcount'], "x", manifestdict['blocksize'], "Byte - Files:\n", filelist)

    if opts.timing:
        _timing_log.write(str(manifestdict['blocksize']) + "\n")
        _timing_log.write(str(manifestdict['blockcount']) + "\n")

    # ensure the requested files are in there...
    for filename in opts.filestoretrieve:
        if filename not in filelist:
            print("The file", filename, "is not listed in the manifest.")
            sys.exit(2)

    # don't run PIR if we're just printing the filenames in the manifest
    if opts.filestoretrieve:
        request_files_from_mirrors(opts.filestoretrieve, opts.redundancy, opts.rng, opts.parallel, manifestdict)
if __name__ == '__main__':
    # Script entry point: announce the version, then parse the options.
    print("RAID-PIR Client", lib.pirversion)
    parse_options()

    # the timing log has to be open before main() writes to it
    if _commandlineoptions.timing:
        start_logging()

    main()

    # finish the timing log with the total run time
    if _commandlineoptions.timing:
        ttime = _timer() - total_start
        _timing_log.write("%s\n" % (ttime,))
        _timing_log.close()