-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
271 lines (230 loc) · 9.92 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import socket
import ssl
import os
import subprocess
import argparse
def create_dir(dir):
"""
Create a directory if it doesn't exist.
:param dir: Directory to be created
"""
if not os.path.exists(dir):
os.makedirs(dir)
return
def generate_private_key(key_path):
if not os.path.exists(key_path):
# Generate the private key
subprocess.run(["openssl", "genpkey", "-algorithm", "RSA", "-out", key_path])
return
def generate_self_signed_cert(key_path, cert_path, common_name):
if not os.path.exists(cert_path):
# Generate the self-signed certificate using the private key
subprocess.run(["openssl", "req", "-new", "-x509", "-key", key_path, "-out", cert_path, "-subj", "/CN="+common_name])
return
def find_next_avail_idx(report_dir):
"""
Find the next available index in the report directory.
:param report_dir: Directory containing folders with naming conventions clt0, clt1, clt2, etc.
:return: The next available index as an integer.
"""
index = 0
while True:
folder_name = f"clt{index}"
folder_path = os.path.join(report_dir, folder_name)
if not os.path.exists(folder_path):
return index
index += 1
return index
def prep_sev_guest_kernel_module():
# Check if the sev-guest kernel module is loaded
if check_sev_guest_module():
print("sev-guest kernel module is loaded.")
else:
print("sev-guest kernel module is not loaded. Attempting to load it...")
if load_sev_guest_module():
print("sev-guest kernel module loaded successfully.")
else:
print("Failed to load sev-guest kernel module.")
return False
# Check if the /dev/sev-guest device is available
if check_sev_guest_device():
print("/dev/sev-guest device is available.")
else:
print("/dev/sev-guest device is not available.")
return False
return True
def check_sev_guest_module():
"""
Check if the sev-guest kernel module is loaded.
:return: True if the module is loaded, False otherwise.
"""
try:
# Use lsmod command to check if the sev_guest module is loaded
cmd = 'lsmod | grep sev_guest'
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except subprocess.CalledProcessError:
return False
def load_sev_guest_module():
"""
Load the sev-guest kernel module using sudo modprobe.
:return: True if the module is loaded successfully, False otherwise.
"""
try:
# Use sudo modprobe command to load the sev_guest module
cmd = 'sudo modprobe sev_guest'
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except subprocess.CalledProcessError:
return False
def check_sev_guest_device():
"""
Check if the /dev/sev-guest device is available.
:return: True if the device is available, False otherwise.
"""
try:
# Use ls command to check if /dev/sev-guest file exists
cmd = 'ls /dev/sev-guest'
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except subprocess.CalledProcessError:
return False
def generate_attestation_report(snpguest, report_dir, clt_folder):
"""
Generate an attestation report using snpguest.
:param snpguest: Path to snpguest binary.
:param report_dir: Directory to store the attestation report and related files.
:return: Path to the generated attestation report if successful.
:raises: Exception if failed to generate the attestation report.
"""
report_file = os.path.join(clt_folder, "attestation_report.bin")
nonce_file = os.path.join(clt_folder, "random-request-file.txt")
try:
# Generate the attestation report using snpguest command
cmd = f"sudo {snpguest} report --random -a {report_file} --request {nonce_file}"
subprocess.run(cmd, shell=True, check=True)
return report_file
except subprocess.CalledProcessError as e:
error_message = f"Failed to generate attestation report. {e}"
raise Exception(error_message)
def exchange_certificates(client_socket, client_cert_file, server_cert_file):
"""
Perform certificate exchange between client and server.
:param client_socket: Client socket object
:param client_cert_file: File path of the client's certificate
:param server_cert_file: File path of the server's certificate
"""
# Receive the client's certificate length
client_cert_len = int.from_bytes(client_socket.recv(4), byteorder='big')
# Receive the client's certificate
client_certificate = client_socket.recv(client_cert_len)
# Store the client's certificate
with open(client_cert_file, 'wb') as client_cert:
client_cert.write(client_certificate)
# Read the server certificate
with open(server_cert_file, 'rb') as cert:
server_certificate = cert.read()
# Send the length of the server certificate to the client
client_socket.send(len(server_certificate).to_bytes(4, byteorder='big'))
# Send the server certificate to the client
client_socket.sendall(server_certificate)
def handle_client_connection(client_socket, snpguest, report_dir, cert_path, key_file):
"""
Handle client connection and perform SEV-SNP attestation.
:param client_socket: Client socket object
:param snpguest: Path to snpguest utility executable
:param report_dir: Directory to store attestation reports
:param cert_path: File path of the server's certificate
:param key_file: File path of the server's private key
"""
# Create the respective client's folder for certificate and report
folder_prefix = "clt"
index = find_next_avail_idx(report_dir)
clt_folder = os.path.join(report_dir, f"{folder_prefix}{index}")
create_dir(clt_folder)
# Perform certificate exchange between client and server
client_cert_file = os.path.join(clt_folder, f"clt{index}_cert.pem")
exchange_certificates(client_socket, client_cert_file, cert_path)
# Create an SSL context
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
# Require client certificate and set the custom verification callback
context.verify_mode = ssl.CERT_REQUIRED
# Load the server certificate and private key
context.load_cert_chain(certfile=cert_path, keyfile=key_file)
context.load_verify_locations(cafile=client_cert_file)
# Wrap the socket with SSL/TLS
ssl_socket = context.wrap_socket(client_socket, server_side=True)
try:
# Perform TLS handshake
ssl_socket.do_handshake()
# Perform AMD SEV-SNP attestation
try:
attestation_report = generate_attestation_report(snpguest, report_dir, clt_folder)
print(f"Attestation report generated successfully: {attestation_report}")
except Exception as e:
print(f"Error: {str(e)}")
ssl_socket.close()
return
# Read the attestation report file
with open(attestation_report, "rb") as file:
report_content = file.read()
# Send the length of the attestation report to the client
ssl_socket.send(len(report_content).to_bytes(4, byteorder='big'))
# Send the attestation report to the client
ssl_socket.sendall(report_content)
# Receive data from the client
data = ssl_socket.recv(1024).decode()
print("Received from client:", data)
# Send a response to the client
response = "Message received: {}".format(data)
ssl_socket.send(response.encode())
except ssl.SSLError:
print("TLS handshake failed.")
# Close the SSL socket
ssl_socket.close()
def run_server(ip_addr, port, snpguest, report_dir, cert_path, key_file):
"""
Run the server and listen for client connections.
:param ip_addr: IP address to bind the server socket
:param port: Port to listen for connections
:param snpguest: Path to snpguest utility executable
:param report_dir: Directory to store attestation reports
:param cert_path: File path of the server's certificate
:param key_file: File path of the server's private key
"""
# Create a TCP socket
server_socket = socket.create_server((ip_addr, port))
# Accept client connections
while True:
client_socket, client_address = server_socket.accept()
handle_client_connection(client_socket, snpguest, report_dir, cert_path, key_file)
# Close the server socket
server_socket.close()
def main():
# Check if the sev-guest kernel module is loaded and
# if the /dev/sev-guest device is present
if (not prep_sev_guest_kernel_module()):
exit()
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('-ip', '--ip_addr', default='192.168.122.48', help="IP address (default: 192.168.122.48)")
parser.add_argument('-p', '--port', default=8888, help="Port to connect (default: 8888)")
parser.add_argument('-r', '--report_dir', default='./reports', help="Directory to store attestation reports (default: ./reports)")
parser.add_argument('-sg', '--snpguest', default='./snpguest/target/debug/snpguest', help="Location of the snpguest utility executable (default: ./snpguest/target/debug/snpguest)")
parser.add_argument('-s', '--secrets_dir', default='./srv_secrets', help="Directory to store server's secret (default: ./srv_secrets)")
parser.add_argument('-k', '--key_file', default='server.key', help="Name of the server's key file (default: server.key)")
parser.add_argument('-sc', '--self_cert_file', default='server.pem', help="Name of the server's certificate file (default: server.pem)")
parser.add_argument('-cn', '--common_name', default='localhost', help="Common name to be used as a certificate parameter (default: localhost)")
args = parser.parse_args()
create_dir(args.secrets_dir)
create_dir(args.report_dir)
# Generate client private key and self-signed certificate
key_file = os.path.join(args.secrets_dir, args.key_file)
generate_private_key(key_file)
cert_path = os.path.join(args.secrets_dir, args.self_cert_file)
generate_self_signed_cert(key_file, cert_path, args.common_name)
# generate_attestation_report(args.snpguest, args.report_dir)
# Run the server
run_server(args.ip_addr, args.port, args.snpguest, args.report_dir, cert_path, key_file)
if __name__ == '__main__':
main()