-
Notifications
You must be signed in to change notification settings - Fork 1
/
sacker.py
136 lines (109 loc) · 4.73 KB
/
sacker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""sacker - convenient wrappers around struct.pack and struct.unpack"""
from struct import Struct
import re
class Sacker(object):
r"""
>>> sacker = Sacker('>', '''H magic
... 4s data
... b byte''')
>>> sacker.unwrap('\x00\xffDATA\x01', list)[1]
[('magic', 255), ('data', 'DATA'), ('byte', 1)]
>>> sacker.wrap({'magic': 255, 'data': 'DATA', 'byte': 1})
'\x00\xffDATA\x01'
"""
def __init__(self, endian, spec, name = None, length = None):
if length is not None:
assert parse(spec, endian)[0].size == length
self.endian = endian
self.spec = spec
self.name = name
def unwrap(self, binary, data_factory = dict):
return unwrap(binary, self.spec, self.name, data_factory, self.endian)
def wrap(self, data):
return wrap(data, self.spec, self.endian)
class BadDataError(Exception):
pass
def unwrap(binary, spec, data_name = None, data_factory = dict, endian = '<'):
r"""Unwrap `binary` according to `spec`, return (consumed_length, data)
Basically it's a convenient wrapper around struct.unpack. Each non-empty
line in spec must be: <struct format> <field name> [== <test> <action>]
<struct format> - struct format producing one value (except for 'x' format)
<field name> - dictionary key to put unpacked value into
<test> - optional test an unpacked value be equal to
<action> - what to do when test fails: `!` (bad data) or `?` (unsupported)
Example:
>>> unwrap('\xff\x00DATA1234\x10something else', '''# comment
... H magic == 0xff !
... 4s data
... 4x
... b byte''',
... data_factory = list)
(11, [('magic', 255), ('data', 'DATA'), ('byte', 16)])
"""
struct, names, tests, s_indices = parse(spec, endian)
# unpack binary data
length = struct.size
sub = binary[:length]
if isinstance(sub, memoryview):
sub = sub.tobytes()
values = list(struct.unpack(sub))
# strip padding from end of strings
for i in s_indices:
values[i] = values[i].rstrip('\x00')
# run optional tests
for i, test, action in tests:
if values[i] != test:
adj = {'!': 'Bad', '?': 'Unsupported'}[action]
raise BadDataError(' '.join(w for w in
[adj, data_name, names[i], '== %r' % values[i]] if w))
return length, data_factory(zip(names, values))
def wrap(data, spec, endian = '<'):
r"""Wrap `data` dict to binary according to `spec`. Opposite of `unwrap`.
Example:
>>> wrap({'data': 'DATA', 'num': 121},'''4s data
... b num
... h opt # missing data means 0
... ''')
'DATAy\x00\x00'
"""
struct, names, tests, s_indices = parse(spec, endian)
return struct.pack(*[data.get(name, 0) for name in names])
def strip(s):
try:
s = s[:s.index('#')]
except ValueError:
pass
return s.strip()
_cache = {}
def parse(spec, endian):
try:
return _cache[endian, spec]
except KeyError:
matches = [re.match("""(?P<format>\w+)
(
\s+
(?P<name>\w+)
\s*
( == \s* (?P<test>.+) \ (?P<action>[!?]) )?
)?
$""", strip(s), re.VERBOSE)
for s in spec.split('\n') if strip(s)]
for n, m in enumerate(matches):
if not m:
raise SyntaxError('Bad spec, LINE %d' % (n+1))
if not (m.group('name') or re.match('(\d+)x', m.group('format'))):
raise SyntaxError('Bad spec, name required, LINE %d' % (n+1))
formats = [m.group('format') for m in matches if m.group('name')]
names = [m.group('name') for m in matches if m.group('name')]
tests = [(m.group('test'), m.group('action')) for m in matches]
tests = [(i, eval(test, {}), action)
for i, (test, action) in enumerate(tests) if test]
# string format spec indices
s_indices = [i for i, c in enumerate(formats)
if re.match(r'(\d+)s', c)]
struct = Struct(endian + ''.join(m.group('format') for m in matches))
_cache[endian, spec] = struct, names, tests, s_indices
return _cache[endian, spec]
if __name__ == '__main__':
import doctest
doctest.testmod()