-
Notifications
You must be signed in to change notification settings - Fork 7
/
tests.py
193 lines (163 loc) · 7.56 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import unittest
import os
import resource
import base64
import fuzzyhashlib
class BaseFuzzyHashTest(unittest.TestCase):
    """Base fuzzyhashlib test class.

    Holds the test cases shared by every algorithm wrapper.  Concrete
    subclasses must set FUZZY_HASH_CLASS and TEST_DATA_PATH (enforced in
    setUp) and KNOWN_RESULT (used by test_known_result).  The base class
    itself is skipped when collected by a test runner.
    """

    # Hash class under test; instantiated as cls(buf) or cls(hash=digest).
    FUZZY_HASH_CLASS = None
    # Path to an existing file whose bytes become the first test input.
    TEST_DATA_PATH = None
    # Digest expected for the decoded contents of TESTDATA.b64.
    KNOWN_RESULT = None
    # Number of hash objects created by test_leak.
    MEM_LEAK_ITERATIONS = 1000
    # Allowed growth of ru_maxrss (in whatever unit getrusage reports)
    # over the course of test_leak.
    MEM_LEAK_TOLERANCE = 64

    @classmethod
    def setUpClass(cls):
        """Skip the abstract base class; run normally for subclasses."""
        if cls is BaseFuzzyHashTest:
            raise unittest.SkipTest("abstract base test class")
        super(BaseFuzzyHashTest, cls).setUpClass()

    @property
    def known_data(self):
        """Return the decoded bytes of TESTDATA.b64 next to this file."""
        dir_path = os.path.dirname(__file__)
        data_path = os.path.join(dir_path, "TESTDATA.b64")
        with open(data_path, "rb") as data_file:
            return base64.b64decode(data_file.read())

    def setUp(self):
        """Load two different inputs and build a hash object for each.

        Raises:
            NotImplementedError: if the subclass left FUZZY_HASH_CLASS or
                TEST_DATA_PATH unset.
        """
        # Ensure inheriting class specifies the class to test.
        if self.FUZZY_HASH_CLASS is None:
            msg = "%s did not set FUZZY_HASH_CLASS" % self.__class__.__name__
            raise NotImplementedError(msg)

        # Ensure inheriting class specifies where to find its library.
        if self.TEST_DATA_PATH is None:
            msg = "%s did not set TEST_DATA_PATH" % self.__class__.__name__
            raise NotImplementedError(msg)
        self.assertTrue(os.path.exists(self.TEST_DATA_PATH))

        with open(self.TEST_DATA_PATH, "rb") as test_data_file:
            self.test_data_1 = test_data_file.read()
        # The second input is this source file itself.
        with open(__file__, "rb") as test_data_file:
            self.test_data_2 = test_data_file.read()

        # Generate some test hash objects.
        self.h1 = self.FUZZY_HASH_CLASS(self.test_data_1)
        self.h2 = self.FUZZY_HASH_CLASS(self.test_data_2)

    def test_known_result(self):
        """Hashing the known data must equal the recorded KNOWN_RESULT."""
        computed = self.FUZZY_HASH_CLASS(self.known_data)
        known = self.FUZZY_HASH_CLASS(hash=self.KNOWN_RESULT)
        self.assertEqual(computed, known)

    def test_comparisons(self):
        """Check .compare() and the subtraction operator agree."""
        # Test with .compare() method.
        self.assertNotEqual(self.h1.hexdigest(), self.h2.hexdigest())
        self.assertNotEqual(self.h1.compare(self.h2), 100)
        self.assertEqual(self.h1.compare(self.h2),
                         self.h2.compare(self.h1),
                         msg="commutative test failed")

        # Test with subtraction operator.
        self.assertNotEqual(self.h1.hexdigest(), self.h2.hexdigest())
        self.assertNotEqual(self.h1 - self.h2, 100)
        self.assertEqual(self.h1 - self.h2, self.h2 - self.h1,
                         msg="commutative test failed")

        # Test .compare and subtraction are the same.
        self.assertEqual(self.h1 - self.h2, self.h1.compare(self.h2))

        # Test that comparing an object with itself scores 100.
        msg = "(%s) comparing self to self did not score 100"
        self.assertEqual(self.h1 - self.h1, 100, msg=msg % self.h1.name)
        self.assertEqual(self.h2 - self.h2, 100, msg=msg % self.h2.name)

    def test_equalities(self):
        """Check every object/digest pairing compares equal to itself."""
        # gclen's PR enables digest-to-object comparisons. Test combinations.
        msg = "(%s) comparing self to self was not equal"
        for hash_obj in (self.h1, self.h2):
            failure = msg % hash_obj.name
            self.assertEqual(hash_obj, hash_obj, msg=failure)
            self.assertEqual(hash_obj, hash_obj.hexdigest(), msg=failure)
            self.assertEqual(hash_obj.hexdigest(), hash_obj, msg=failure)
            self.assertEqual(hash_obj.hexdigest(), hash_obj.hexdigest(),
                             msg=failure)

    def test_copy(self):
        """A copy must be equal to, and score the same as, its source."""
        h3 = self.h1.copy()
        self.assertEqual(self.h1.hexdigest(), h3.hexdigest())
        self.assertTrue(self.h1 == h3)
        # Compare in the reverse operand order as well; the previous check
        # here compared self.h1 with itself and could never fail.
        self.assertEqual(h3, self.h1)
        self.assertEqual(self.h1 - h3, h3 - self.h1,
                         msg="commutative test failed")

    def test_update(self):
        """Feeding more data into a hash object must change its digest."""
        # Take the reference digest from a separate object so that self.h1
        # is not asked for a digest before update() is called on it.
        before = self.FUZZY_HASH_CLASS(self.test_data_1).hexdigest()
        self.h1.update(self.test_data_2)
        self.assertNotEqual(self.h1.hexdigest(), before)

    def test_create_from_hash(self):
        """Objects rebuilt from a digest are equal but cannot be updated."""
        h3 = self.FUZZY_HASH_CLASS(hash=self.h1.hexdigest())
        self.assertEqual(h3, self.h1)

        # So far all algorithms created from hashes cannot be updated. Test.
        with self.assertRaises(fuzzyhashlib.InvalidOperation):
            h3.update("this should error")

    def test_leak(self):
        """Repeatedly hash a buffer and fail if peak memory keeps growing."""
        initial = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        threshold = initial + self.MEM_LEAK_TOLERANCE
        # 100000 NUL bytes; a bytes literal matches the "rb" reads in setUp.
        buf = b"\x00" * 100000
        for iteration in range(self.MEM_LEAK_ITERATIONS):
            # Compute hash for arbitrary data, check if more mem is used.
            self.FUZZY_HASH_CLASS(buf)
            current = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            self.assertLessEqual(
                current, threshold,
                "memory usage increased %s after %d iterations (%s); "
                "tolerance: %d" %
                (current - initial, iteration, self.h1.name,
                 self.MEM_LEAK_TOLERANCE))
class TestSsdeep(BaseFuzzyHashTest):
    """Run the shared fuzzy-hash test cases against fuzzyhashlib.ssdeep."""

    # Hash class exercised by the inherited tests.
    FUZZY_HASH_CLASS = fuzzyhashlib.ssdeep
    # File read by setUp as the first test input.
    TEST_DATA_PATH = fuzzyhashlib.libssdeep_wrapper.libssdeep_path
    # Digest compared against the computed hash in test_known_result.
    KNOWN_RESULT = (
        "192:nU6G5KXSD9VYUKhu1JVF9hFGvV/QiGkS594drFjuHYx5dvTrLh3k"
        "TSEn7HbHR:U9vlKM1zJlFvmNz5VrlkTS07Ht"
    )
class TestSdhash(BaseFuzzyHashTest):
    """Test fuzzyhashlib.sdhash

    Runs the shared test cases, overriding test_update, and adds a check
    for the error raised on a too-short input buffer.
    """

    FUZZY_HASH_CLASS = fuzzyhashlib.sdhash
    TEST_DATA_PATH = fuzzyhashlib.sdhash_wrapper.sdbf_library_path
    KNOWN_RESULT = "sdbf:03:0::11358:sha1:256:5:7ff:160:1:160:IoFBClI" \
        "QqFAxCa4JCEns8ACBIAQ1UEwAAkUiSoDIEiyNm5QQCJQDhEGISPghTIDWVVaATIMjJC" \
        "hQK4CkgSAgtGCEbIacfGUQgxygkgBEgaRBigAhCoCQO4ZGCEtuB8RgLuQKaAk2AgKA6" \
        "SAQGCirEEa1doFBwTwyKiAxLEhRKHAYArAUgAkICheDgGY0QVtLKByAwQSQ4CoFAwBW" \
        "eQHyCIqy4IiACikBBKsAAjXoGAhgFEgCpAzEjYYAFoZT0AAB4QEQCDQC0EoiCkpCUVI" \
        "I33eqdIAJGioMmBXseEq9Wgg4MxhVNCIRPFMLH6pJyZgRDJDRKAIkcaBC4AEgjIjqAQ=="

    def test_invalid_buffer_size_raises(self):
        """A short buffer must raise ValueError with the expected text."""
        with self.assertRaises(ValueError) as context:
            fuzzyhashlib.sdhash("buffer_too_short")
        # str(exc) instead of exc.message: the attribute is Python 2 only.
        # The check sits after the with-block so it actually executes.
        self.assertEqual(str(context.exception),
                         "sdhash requires buffer >= 512 in size")

    def test_update(self):
        """update() must raise with the expected message."""
        # Override default to capture .update() being unsupported.
        with self.assertRaises(Exception) as context:
            self.h1.update(self.test_data_2)
        self.assertEqual(str(context.exception),
                         "sdhash does not support update()")
class TestTlsh(BaseFuzzyHashTest):
    """Test fuzzyhashlib.tlsh

    Runs the shared test cases with a longer, more tolerant leak check
    and adds a check for the error raised on a too-short input buffer.
    """

    FUZZY_HASH_CLASS = fuzzyhashlib.tlsh
    TEST_DATA_PATH = fuzzyhashlib.tlsh_wrapper.tlsh_library_path
    KNOWN_RESULT = \
        "1632623FBA48037706C20162BB9764CBF2" \
        "1E903F3B552568354CC1681F6BA6543FB6EA"
    # Overrides of the base-class leak-test settings.
    MEM_LEAK_ITERATIONS = 10000
    MEM_LEAK_TOLERANCE = 1024

    def test_invalid_buffer_size_raises(self):
        """Hashing a short buffer must raise a descriptive ValueError."""
        with self.assertRaises(ValueError) as context:
            fuzzyhashlib.tlsh("buffer_too_short").hexdigest()
        # str(exc) instead of exc.message: the attribute is Python 2 only.
        # The check sits after the with-block so it actually executes.
        self.assertTrue(
            str(context.exception).startswith("tlsh requires buffer"))