This repository has been archived by the owner on Jan 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
testarc.py
executable file
·230 lines (192 loc) · 8.93 KB
/
testarc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/env python3
# Run the ARC test suite in YAML format against an implementation
#
# DEPENDENCIES:
# ddt, pyyaml, dnslib
#
import unittest
import sys
import tempfile
# Third-party and project-local dependencies are imported inside one guard so
# that a missing package yields a single actionable message, not a traceback.
try:
    import yaml
    import argparse
    import subprocess
    import os
    from ddt import ddt, data
    from arcdns import ArcTestResolver
except ImportError as err:
    # Only import failures mean "dependency missing"; any other exception is
    # left to propagate so it is not misreported.
    print("Missing dependency (%s). Please check requirements.txt" % err,
          file=sys.stderr)
    # Stop here: continuing would only fail later with a NameError.
    sys.exit(1)
# Port used for the stub DNS server when the caller does not pass -p.
DEFAULT_DNS_PORT = 8053
# YAML suite loaded by main() for the "sign" operation (relative path).
SIGN_TEST_FILE = "tests/arc-draft-sign-tests.yml"
# YAML suite loaded by main() for the "validate" operation (relative path).
VALIDATE_TEST_FILE = "tests/arc-draft-validation-tests.yml"
def validate_test(self, script, test_case, port, verbose=False):
    """Run one validation test case through ``script`` and assert its verdict.

    The message from the test case is written to a temporary file, an
    ``ArcTestResolver`` is entered with the case's TXT records, and ``script``
    is invoked as ``script messagefile port verbose``. The script's stdout
    (stripped, compared case-insensitively) must equal the case's ``cv`` value.

    Args:
        self: the running ``unittest.TestCase`` (used for the assertion).
        script: path of the executable under test.
        test_case: object exposing ``tid``, ``test`` and ``txt_records``.
        port: port handed to the resolver and to the script.
        verbose: when true, print the case details and pass ``True`` on.
    """
    spec = test_case.test

    if verbose:
        print("\nTEST: %s" % test_case.tid)
        print("DESC: %s" % spec['description'])
        print("MSG:")
        print(spec['message'])

    with tempfile.TemporaryDirectory() as workdir:
        message_path = '{0}/message.txt'.format(workdir)
        with open(message_path, 'w') as handle:
            handle.write(spec["message"])

        command = [script, message_path, str(port), str(verbose)]
        with ArcTestResolver(test_case.txt_records, port, verbose):
            child = subprocess.Popen(command, stdout=subprocess.PIPE)
            captured, _ = child.communicate()
            verdict = captured.decode("utf-8").strip()

    if verbose:
        print("RESULT:")

    self.assertEqual(verdict.lower(), spec["cv"].lower(), test_case.tid)
def _arc_tag_set(header_value):
    """Return the ';'-separated tags of ``header_value`` as a set, ignoring all whitespace."""
    return set("".join(header_value.split()).split(';'))


def sign_test(self, script, test_case, port, verbose=False):
    """Run one signing test case through ``script`` and assert the ARC set it emits.

    The message and private key are written to temporary files, an
    ``ArcTestResolver`` is entered with the case's TXT records, and ``script``
    is invoked as::

        script messagefile port privatekeyfile srv-id selector domain
               sig-headers timestamp verbose

    The script's stdout is split on blank lines into headers. Each recognised
    header (ARC-Authentication-Results, ARC-Message-Signature, ARC-Seal) is
    compared with the expected ``AAR`` / ``AMS`` / ``AS`` value as a set of
    ';'-separated tags with all whitespace removed, so tag order is irrelevant.
    Empty (or whitespace-only) output passes only if all three expected values
    are empty strings.

    Args:
        self: the running ``unittest.TestCase`` (used for the assertions).
        script: path of the executable under test.
        test_case: object exposing ``tid``, ``test``, ``sel``, ``domain``,
            ``privatekey`` and ``txt_records``.
        port: port handed to the resolver and to the script.
        verbose: when true, print case details, output and per-header results.
    """
    spec = test_case.test

    if verbose:
        print("\nTEST: %s" % test_case.tid)
        print("DESC: %s" % spec['description'])
        print("MSG:")
        print(spec['message'])

    with tempfile.TemporaryDirectory() as tmpdir:
        message_path = '{0}/message.txt'.format(tmpdir)
        key_path = '{0}/privatekey.pem'.format(tmpdir)

        with open(message_path, 'w') as f:
            f.write(spec["message"])
        with open(key_path, 'w') as f:
            f.write(test_case.privatekey)

        with ArcTestResolver(test_case.txt_records, port, verbose):
            proc = subprocess.Popen(
                [script, message_path, str(port), key_path, spec["srv-id"],
                 test_case.sel, test_case.domain, spec["sig-headers"],
                 str(spec["t"]), str(verbose)],
                stdout=subprocess.PIPE)
            out = proc.communicate()[0].decode("utf-8")

    if verbose:
        print("\nOutput ARC-Set:")
        print(out)
        print("Expected ARC-Set:")
        print("ARC-Seal: %s" % spec["AS"])
        print("ARC-Message-Signature: %s" % spec["AMS"])
        print("ARC-Authentication-Results: %s" % spec["AAR"])

    # Lower-cased header name -> key of the expected value in the test spec.
    header_keys = {
        "arc-authentication-results": "AAR",
        "arc-message-signature": "AMS",
        "arc-seal": "AS",
    }
    valid = {"AAR": False, "AMS": False, "AS": False}

    if out.strip() == "":
        # No headers produced: correct only when none were expected.
        for key in valid:
            valid[key] = spec[key] == ""
    else:
        for chunk in out.split("\n\n"):
            name, colon, value = "".join(chunk.split()).partition(':')
            if not colon:
                # Blank or colon-less chunk (e.g. trailing blank line): skip it
                # instead of failing to unpack a header name/value pair.
                continue
            key = header_keys.get(name.lower())
            if key is None:
                # Not one of the three ARC headers; ignore.
                continue
            valid[key] = _arc_tag_set(value) == _arc_tag_set(spec[key])

    if verbose:
        print("RESULT:")
        print(valid["AAR"], valid["AMS"], valid["AS"])

    self.assertTrue(valid["AAR"], test_case.tid)
    self.assertTrue(valid["AS"], test_case.tid)
    self.assertTrue(valid["AMS"], test_case.tid)
class ArcValidateTestCase:
    """One validation test: its id, its spec mapping, and the DNS TXT records it needs."""

    def __init__(self, tid, test, txt_records):
        """Store the test id, the test spec and the scenario's TXT records."""
        self.tid, self.test, self.txt_records = tid, test, txt_records

    def __str__(self):
        # Deliberately renders as an empty string.
        # NOTE(review): presumably to keep generated test names short - confirm.
        return ""
class ArcSignTestCase:
    """One signing test: id, spec mapping, signing domain/selector, key and TXT records."""

    def __init__(self, tid, test, domain, sel, privatekey, txt_records):
        """Store the test id and spec together with the scenario-level signing data."""
        self.tid, self.test = tid, test
        self.domain, self.sel = domain, sel
        self.privatekey, self.txt_records = privatekey, txt_records

    def __str__(self):
        # Deliberately renders as an empty string.
        # NOTE(review): presumably to keep generated test names short - confirm.
        return ""
# The TestCase class is built inside a function so that the test cases read
# from the test file can be handed to ddt's @data decorator at run time.
def genTestClass(op, tests, script, port, verbose=False):
    """Build and return a ``unittest.TestCase`` subclass covering ``tests``.

    Args:
        op: "sign" selects ``sign_test``; anything else selects ``validate_test``.
        tests: the test-case objects passed to ``@data``.
        script: path of the executable under test, forwarded to the runner.
        port: DNS port forwarded to the runner.
        verbose: verbosity flag forwarded to the runner.
    """
    @ddt
    class ArcTest(unittest.TestCase):
        @data(*tests)
        def test(self, test_case):
            if op == "sign":
                runner = sign_test
            else:
                runner = validate_test
            runner(self, script, test_case, port, verbose)

    return ArcTest
def main(op, script, test=None, port=DEFAULT_DNS_PORT, verbose=False):
    """Load the requested suite and run it against ``script``.

    Args:
        op: "validate" or "sign"; selects the YAML suite and test-case type.
        script: path of the executable under test.
        test: optional test id; when given, only cases with that ``tid`` run.
        port: port for the stub DNS server, forwarded to each test.
        verbose: when true, run unittest with verbosity 2 and print details.

    Raises:
        ValueError: if ``op`` is neither "validate" nor "sign".
    """
    if op == "validate":
        suite_file = VALIDATE_TEST_FILE
    elif op == "sign":
        suite_file = SIGN_TEST_FILE
    else:
        raise ValueError("invalid operation")

    # safe_load_all is lazy, so materialise the documents while the file is
    # still open; the with-block then closes the handle deterministically.
    with open(suite_file, 'rb') as f:
        scenarios = list(yaml.safe_load_all(f))

    tests = []
    for scenario in scenarios:
        if op == "validate":
            tests += [ArcValidateTestCase(k, v, scenario["txt-records"]) for
                      (k, v) in scenario["tests"].items()]
        else:
            tests += [ArcSignTestCase(k, v, scenario["domain"], scenario["sel"],
                                      scenario["privatekey"], scenario["txt-records"]) for
                      (k, v) in scenario["tests"].items()]

    if test:
        tests = [t for t in tests if t.tid == test]

    testClass = genTestClass(op, tests, script, port, verbose)
    suite = unittest.TestLoader().loadTestsFromTestCase(testClass)
    v = 2 if verbose else 1
    unittest.TextTestRunner(verbosity=v).run(suite)
# Help text shown by argparse (-h); rendered verbatim via RawTextHelpFormatter.
desc = '''
OVERVIEW:
This script can run either the signing or validation test suites against
an ARC implementation, given a command line wrapper for this logic.
DEPENDENCIES:
specified in requirements.txt
DNS:
During execution of the script a DNS server is started on a local port and
this port is passed to the runner. This server hosts the key files needed
for ARC signature validation. There are two suggested methods of routing
DNS traffic to this server:
1. Stub out your dns calls. You can see this in practice in
runners/validatedkimpy.py
2. OS level DNS rerouting. On *nix, prepending 'nameserver 127.0.0.1' to
/etc/resolv.conf will check localhost:53 for DNS traffic. You'll also
need to pass -p 53 to the script, and run with sudo (to access port 53).
This is only temporary and can be reverted once you're done.
Something like this is necessary to correctly use this script.
VALIDATION:
Running './testarc.py validate script' will call script once for each
test case in the validation suite with the following arguments:
>>> script messagefile dnsport verbose
messagefile - a path to the message being validated
dnsport - a dns server will be running on localhost:dnsport
verbose - True/False, if the -v flag is passed to ./testarc.py
The script is expected to return Pass/Fail/None, depending on the ARC
validation status of the message. If this matches the value in the
suite, the test passes.
SIGNING:
Running './testarc.py sign script' will call script once for each
test case in the signing suite with the following arguments:
>>> script messagefile dnsport privatekeyfile authserv-id selector domain headers timestamp verbose
messagefile - a path to the message being signed
dnsport - a dns server will be running on localhost:dnsport
privatekeyfile - the private key used to sign the message
authserv-id - the authserv-id of the Authentication-Results headers to prefix
selector - the signing selector
domain - the signing domain
headers - a colon separated list of headers to sign
timestamp - a simulated unix timestamp to sign the message with
verbose - True/False, if the -v flag is passed to ./testarc.py
The script is expected to return the ARC-Authentication-Results,
ARC-Seal, and ARC-Message-Signature of the signature, separated by blank lines.
These are matched up to permutation (of both headers & tags) to the results
in the test suite.
'''
if __name__ == '__main__':
    # Command-line entry point: parse the arguments and hand them to main().
    cli = argparse.ArgumentParser(
        description=desc,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cli.add_argument('op', choices=["sign", "validate"], default="sign",
                     help='Suite to test')
    cli.add_argument('script',
                     help='A command line implementation of an arc signing or verification routine')
    cli.add_argument('-t', dest='test', metavar='TEST', required=False,
                     help='Specific test to run')
    cli.add_argument('-p', dest='port', metavar='port', type=int, required=False,
                     default=DEFAULT_DNS_PORT,
                     help='Port to run stubbed dns server on')
    cli.add_argument('-v', dest='verbose', action='store_true', required=False,
                     help='verbose')

    options = cli.parse_args()
    main(options.op, options.script, options.test, options.port, options.verbose)