-
Notifications
You must be signed in to change notification settings - Fork 0
/
ping_chart.py
177 lines (141 loc) · 5.49 KB
/
ping_chart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import argparse
import asyncio # for executing async shell command
import platform # for getting the operating system name
import subprocess # for executing a shell command
import time # for sleeping
from datetime import datetime
import plotext as plt
from tqdm import tqdm # progress indicator
## accept user args for:
# duration in seconds
# host name of server running this script
# target server name
# target server network address (IP or FQDN)
def set_args() -> argparse.ArgumentParser:
arg_parser = argparse.ArgumentParser(
description='Creates charts from output of network pings.'
)
arg_parser.add_argument(
'-d',
'--duration',
help='The duration, in seconds, to run the network test for.',
type=int,
required=True,
)
arg_parser.add_argument(
'-n',
'--name',
help='The name of this host running the network test.',
type=str,
required=True,
)
arg_parser.add_argument(
'-t',
'--target',
help='The target host of the ping connections. Either IP or FQDN.',
type=str,
required=True,
)
arg_parser.add_argument(
'--nochart',
help='If this flag is set, no chart will be generated.',
required=False,
)
return arg_parser
async def run_ping_test(duration: int, name: str, target: str, result_file: str):
# Because the 'ping' and 'tqdm' command has blocking i/o, it will block the task
# from updating. We use asyncio.to_thread (new in python 3.9) to avoid.
await asyncio.gather(
asyncio.to_thread(display_progress_bar, duration),
asyncio.to_thread(start_ping_test, duration, target, result_file),
)
def start_ping_test(duration: int, target: int, result_file: str):
# Below code is borrowed from: https://stackoverflow.com/a/32684938
# Option for the number of packets as a function of
ping_cmd_param = '-n' if platform.system().lower() == 'windows' else '-c'
ping_loop_cmd = 'ping %s %s %s | while read pong;' % (
ping_cmd_param,
duration,
target,
)
ping_loop_cmd = ping_loop_cmd + ' do echo' + ' "$(date +%Y-%m-%d:%H:%M:%S): $pong"'
ping_loop_cmd = ping_loop_cmd + ' >> %s ; done' % result_file
subprocess.call(ping_loop_cmd, shell=True)
def display_progress_bar(duration):
for i in tqdm(
range(duration), unit='ping', desc='Network test progress', unit_scale=True
):
time.sleep(1)
def read_ping_results(result_file) -> tuple[list[int], list[int]]:
DATETIME_FORMAT = '%Y-%m-%d:%H:%M:%S'
ping_timestamps = []
ping_responsetimes_ms = []
# Read results file
test_results = open(result_file)
test_results.seek(0)
for line in test_results:
if 'from' in line:
if platform.system().lower() == 'windows':
ping_timestamps.append(
datetime.strptime(
line[: line.index(': Reply') - 10], DATETIME_FORMAT
)
)
ping_responsetimes_ms.append(
int((line[line.index('time') + 5 : line.index('ms')]))
)
else:
ping_timestamps.append(
datetime.strptime(line[: line.index('from') - 11], DATETIME_FORMAT)
)
ping_responsetimes_ms.append(
float((line[line.index('time') + 5 : line.index('ms')]))
)
test_results.close() # close results file
return ping_timestamps, ping_responsetimes_ms
def generate_text_chart(name: str, target: str, result_file: str):
ping_timestamps, ping_responsetimes_ms = read_ping_results(result_file)
plt.date_form('d/m/Y H:M:S')
times = plt.datetimes_to_string(ping_timestamps)
scatter_limit = 40
# If fewer than `scatter_limit` examples, connect the dots, else a scatter plot is fine
# Number picked because it seemed like a nice limit.
if len(times) < scatter_limit:
plt.plot(times, ping_responsetimes_ms)
else:
plt.scatter(times, ping_responsetimes_ms)
plt.title('Ping latency from ' + name + ' to ' + target)
plt.xlabel('Time of ping')
plt.ylabel('Ping response time (ms)')
plt.ylim(0)
plt.plot_size(
plt.terminal_width(), (plt.terminal_height() or 20) / 2
) # set default term height of 20
plt.clc()
plt.interactive(True)
plt.show()
def main():
# Sets user arguments, then calls the class method to parse.
args = set_args().parse_args()
# Creates a result file timestamped with current date and time as a string in the format YYYYMMDD_HHMMSS
result_file = 'result-%s-to-%s-%s.txt' % (
args.name,
args.target,
datetime.now().strftime('%Y%m%d_%H%M%S'),
)
# result_file = 'result-my-mac-to-www.github.com-20231026_222922.txt'
print('Results will be saved to this file: %s' % result_file)
print('To stop the test, press Ctrl-C until the test stops.')
if not args.nochart:
print(
'A chart will be generated at the end of the test. You can save the chart, or use the generate_chart() function after a series of tests to recover.'
)
# todo: display chart immediately during test, and update throughout
asyncio.run(
run_ping_test(args.duration, args.name, args.target, result_file),
)
if not args.nochart:
print('Generating chart...')
generate_text_chart(args.name, args.target, result_file)
if __name__ == '__main__':
main()