"""
This module enable starting, killing and waiting for background processes. See
:py:class:`BackgroundProcessManager` for details. Use this through the singleton
value :py:data:`PROCESS_MGR` (its singleton nature is not enforced).
"""
import time
from multiprocessing import Process
from subprocess import Popen
from threading import Thread
from typing import Callable

import libroll as lib
from exithooks import EXIT_HOOKS_MGR

####################################################################################################

class BackgroundProcessManager:
    """
    Enables starting, killing and waiting for background processes. It also registers an exit
    hook that kills all remaining background processes when the main process exits.
    """

    ################################################################################################

    def __init__(self):
        self.processes = []
        """List of background processes that might still be running."""

        EXIT_HOOKS_MGR.register(self._exit_hook)

    ################################################################################################

    def start_py(
            self,
            descr: str,
            target: Callable,
            args: tuple[...],
            on_exit: Callable = None) \
            -> Process:
        """
        Starts and returns a new python background process, running the target function and passing
        the given arguments to it. If `on_exit` is specified, it will be called when the process
        exits (which is monitored in a separate thread).
        """
        process = Process(
            target=target,
            args=args,
            name=f"roll_py_wrapper({descr})")

        self.processes.append(process)
        process.start()

        if on_exit is not None:
            monitor = Thread(target=self.monitor_process_exit, args=(process, on_exit))
            monitor.daemon = True  # kill thread when main thread exits
            process.monitor = monitor  # only attach to process after starting, or pickling error!
            monitor.start()

        return process
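
    # A minimal usage sketch for `start_py`. The worker function and its arguments below are
    # hypothetical, not part of this module:
    #
    #   def poll_chain(interval):
    #       while True:
    #           ...  # do periodic work
    #
    #   proc = PROCESS_MGR.start_py(
    #       "chain poller", target=poll_chain, args=(5,),
    #       on_exit=lambda: print("poller exited"))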

    ################################################################################################

    def start(
            self,
            descr: str,
            command: str | list[str],
            on_exit: Callable = None,
            **kwargs) \
            -> Popen:
        """
        Starts and returns a new background process, running the given command. The parameters
        `descr`, `command` and `kwargs` are passed to :py:func:`lib.run`, which is used to start
        the process, additionally specifying the `wait=False` option.

        If `on_exit` is specified, it will be called when the process exits (which is monitored in
        a separate thread). It is NOT called when we kill the process ourselves.
        """
        # Using exec is necessary on Linux: without it, killing the process only kills the outer
        # shell, leaving the actual command running.
        exec_command = "exec " + command if isinstance(command, str) else ["exec", *command]
        process: Popen = lib.run(descr, exec_command, **kwargs, wait=False)
        process.name = f"subprocess({descr})"
        self.processes.append(process)

        if on_exit is not None:
            monitor = Thread(target=self.monitor_process_exit, args=(process, on_exit))
            monitor.daemon = True  # kill thread when main thread exits
            process.monitor = monitor  # only attach to process after starting, or pickling error!
            monitor.start()

        return process
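
    # A minimal usage sketch for `start`. The command and callback are illustrative only; any
    # extra keyword arguments are forwarded to `lib.run`:
    #
    #   node = PROCESS_MGR.start(
    #       "example node", "example-node --port 9545",
    #       on_exit=lambda: print("node exited unexpectedly"))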

    ################################################################################################

    @staticmethod
    def is_alive(process: Process | Popen) -> bool:
        """
        Returns true if the given process is still running.
        """
        if isinstance(process, Process):
            return process.is_alive()
        else:
            # We have to use this internal method because `process.poll()` can return None for
            # a terminated process!
            # See https://bugs.python.org/issue2475 and
            # https://github.com/python/cpython/issues/46727 for details.
            # noinspection PyUnresolvedReferences,PyProtectedMember
            return process._internal_poll(_deadstate="dead") is None

    ################################################################################################

    def monitor_process_exit(self, process: Process | Popen, on_exit: Callable):
        """
        Monitors the given process and calls the given function when the process exits for reasons
        other than being explicitly killed by :py:func:`kill` or :py:func:`kill_all`.
        """
        while True:
            self._wait(process)
            if not self.is_alive(process):
                break

        if process in self.processes:
            # if not, it was killed by us, no need to notify
            on_exit()
            self.processes.remove(process)

    ################################################################################################

    def kill(self, process: Process | Popen, ensure: bool = False):
        """
        Tries to terminate the given process. If `ensure` is True, it will wait one second for the
        process to terminate, then will try to kill the process (with a different, more forceful
        signal on Linux) if it is still alive. It will wait another second and log an error if the
        process is still alive.

        The process is removed from the list of background processes no matter what.
        """
        if process in self.processes:
            self.processes.remove(process)

        # Terminate process if alive
        if not self.is_alive(process):
            return
        process.terminate()

        if not ensure:
            return

        # Only print now (in the `ensure` path), because this is the path that actually waits.
        lib.debug(f"Terminating {process.name}...")

        # Sleep then kill process if still alive
        time.sleep(1)
        if not self.is_alive(process):
            return
        process.kill()
        time.sleep(1)

        # Log if process is still alive
        if self.is_alive(process):
            print(f"Failed to promptly terminate {process.name}")
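
    # Example (sketch): stopping a process returned by `start` or `start_py`. With `ensure=True`,
    # `kill` escalates from terminate() to kill() if the process lingers for more than a second.
    #
    #   PROCESS_MGR.kill(proc, ensure=True)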

    ################################################################################################

    @staticmethod
    def _wait(process: Process | Popen, timeout: int | None = None):
        if isinstance(process, Process):
            process.join(timeout)
        else:
            process.wait(timeout)

    ################################################################################################

    def wait(self, process: Process | Popen, timeout: int | None = None):
        """
        Waits for the given process to complete. If `timeout` is not None, the implementation
        waits at most this many seconds. If the wait fails, an error is logged.
        """
        try:
            self._wait(process, timeout)
            if self.is_alive(process):
                if timeout is None:
                    print(f"Failed to wait for {process.name} to complete")
                else:
                    print(f"Process {process.name} didn't complete within {timeout} seconds")
        finally:
            # The process may already have been terminated and removed.
            if process in self.processes:
                self.processes.remove(process)

    ################################################################################################

    def _exit_hook(self, _signum: int):
        self.kill_all()

    ################################################################################################

    def kill_all(self):
        """
        Tries to terminate (then kill, with a different, more forceful signal on Linux) all
        background processes. Acts like `kill` with `ensure=True` for all background processes.

        Clears the list of background processes before returning.
        """
        # Try terminating all processes
        alive_count = 0
        processes = self.processes.copy()
        self.processes.clear()
        for process in processes:
            if self.is_alive(process):
                alive_count += 1
                lib.debug(f"Terminating {process.name}")
                process.terminate()

        # There was no process to terminate
        if alive_count == 0:
            return

        print("Terminating background processes...")

        # Give them a second to terminate, then try to kill processes that are left
        time.sleep(1)
        alive_count = 0
        for process in processes:
            if self.is_alive(process):
                alive_count += 1
                lib.debug(f"Killing {process.name}")
                process.kill()

        # There was no process to kill
        if alive_count == 0:
            return

        # Give killed processes a second to terminate, then log if they are still alive
        time.sleep(1)
        for process in processes:
            if self.is_alive(process):
                print(f"Failed to promptly kill {process.name}")

    ################################################################################################

    def wait_all(self, per_process_timeout: int | None = None):
        """
        Waits for all background processes to complete (via :py:func:`wait`). If
        `per_process_timeout` is not None, the implementation waits at most this many seconds for
        each process. If a wait fails, an error is logged.

        The list of background processes is cleared before returning.
        """
        # Iterate over a copy: `wait` removes each process from `self.processes` as it completes,
        # and removing from a list while iterating over it would skip entries.
        for process in self.processes.copy():
            self.wait(process, per_process_timeout)

####################################################################################################

PROCESS_MGR = BackgroundProcessManager()
"""Singleton instance of :py:class:`BackgroundProcessManager`."""

####################################################################################################
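
# Illustrative self-test sketch, not part of the module's public API. It assumes the sibling
# `libroll` and `exithooks` modules are importable and that `lib.run` accepts a plain shell
# command string, as `start` already relies on above.
if __name__ == "__main__":
    # Start a short-lived background command, then wait for every tracked process to complete,
    # giving each at most 5 seconds. Anything still tracked at interpreter exit would be
    # terminated by the registered exit hook.
    PROCESS_MGR.start("sleeper", "sleep 2")
    PROCESS_MGR.wait_all(per_process_timeout=5)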