-
Notifications
You must be signed in to change notification settings - Fork 0
/
nvme.py
178 lines (157 loc) · 5.38 KB
/
nvme.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# !/usr/bin/env python
# coding: utf-8
#
# Copyright 2020 Tsecond Inc. All Rights Reserved.
#
# Unauthorized copying of this file, via any medium is strictly prohibited
# Proprietary and confidential
#
# Written by: Manavalan Krishnan 11/12/20
#
from logging import debug
from libutils import run_cmd
from json import loads
import concurrent.futures
from itertools import repeat
import multiprocessing as mp
def parallel_exe(func_name,*values, num_threads = 0.8*mp.cpu_count()):
with concurrent.futures.ThreadPoolExecutor(max_workers = num_threads) as executor:
results = executor.map(func_name, *values)
return results
def nvme_list_drives():
""" Enumerates all NVME drives in the system
Returns:
return code and value tuple
return code: 0 for success and non-zero for failure
value: list of drives as json on success or error string on failure
"""
debug("Listing all nvme drives in the system")
cmd = "sudo nvme list -o json"
rc, stdout, stderr = run_cmd(cmd)
out = stderr
if not rc:
out = loads(stdout)['Devices']
return rc, out
def sata_list_drives():
""" Enumerates all SATA drives in the system
Returns:
return code and value tuple
return code: 0 for success and non-zero for failure
value: list of drives as json on success or error string on failure
"""
debug("Listing all sata drives in the system")
cmd = "lsblk -o KNAME,TYPE,SIZE,SERIAL,MODEL,SUBSYSTEMS --json -b"
rc, stdout, stderr = run_cmd(cmd)
out = stderr
if not rc:
out = loads(stdout)['blockdevices']
return rc, out
def nvme_get_drive_info(drive):
""" Gets the detailed runtime info about the drive
Args:
drive: drive path ex. /dev/nvme0n1
Returns:
return code and value tuple
return code: 0 for success and non zero for failure
value: json document on success or error string
on failure
"""
debug("Getting detailed information for the drive {}".format(drive))
cmd = 'sudo nvme smart-log {} -o json'.format(drive)
rc, stdout, stderr = run_cmd(cmd)
out = stderr
if not rc:
out = stdout
return rc, out
def nvme_erase_drive(drive):
""" Securely erases the drive content
Args:
drive: drive path ex. /dev/nvme0n1
Returns:
return code and value tuple
return code: 0 for success and non zero for failure
value: error string on failure, success message on success
"""
debug("Securely erasing the drive {}".format(drive))
cmd = "sudo nvme format --force --ses=1 {}".format(drive)
return run_cmd(cmd)
def sata_erase_drive(drive):
""" Securely erases the drive content
Args:
drive: drive path ex. /dev/nvme0n1
Returns:
return code and value tuple
return code: 0 for success and non zero for failure
value: error string on failure, success message on success
"""
return 0, 'Erase Success of drive {}'.format(drive), ''
def nvme_erase_drives(drives, parallel=False):
""" Securely erases the drive content of given drives
Args:
drives: list of drive paths ex. /dev/nvme0n1,/dev/nvme1n1
parallel: Execute the erasing in parallel
Returns:
return code, stdout and stderr tuple
return code: 0 for success and non zero for failure
"""
debug("Securely erasing drives: {}".format(",".join(drives)))
errmsg = ""
outmsg = ""
rc = 0
results = []
if(parallel):
# values = drives,repeat(keyfile)
results = parallel_exe(nvme_erase_drive,drives)
else:
for drive in drives:
results.append(nvme_erase_drive(drive))
for result in results:
rc, out, err = result
if rc:
errmsg += err
return_code = 1
outmsg += out
return return_code, outmsg, errmsg
# if not parallel:
# for drive in drives:
# rcode, out, err = nvme_erase_drive(drive)
# if rcode:
# rc = 1
# err_msg += err
# out_msg += out
# return rc, out_msg, err_msg
def sata_erase_drives(drives, parallel=False):
""" Securely erases the drive content of given drives
Args:
drives: list of drive paths ex. /dev/nvme0n1,/dev/nvme1n1
parallel: Execute the erasing in parallel
Returns:
return code, stdout and stderr tuple
return code: 0 for success and non zero for failure
"""
debug("Securely erasing drives: {}".format(",".join(drives)))
errmsg = ""
outmsg = ""
rc = 0
results = []
if(parallel):
# values = drives,repeat(keyfile)
results = parallel_exe(sata_erase_drive,drives)
else:
for drive in drives:
results.append(sata_erase_drive(drive))
for result in results:
rc, out, err = result
if rc:
errmsg += err
rc = 1
outmsg += out
return rc, outmsg, errmsg
# if not parallel:
# for drive in drives:
# rcode, out, err = nvme_erase_drive(drive)
# if rcode:
# rc = 1
# err_msg += err
# out_msg += out
# return rc, out_msg, err_msg