forked from cizra/pycat
-
Notifications
You must be signed in to change notification settings - Fork 2
/
modular.py
239 lines (202 loc) · 8.02 KB
/
modular.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import re
import threading
import time
from mcp.mcp_negotiate import Negotiate
from requests.structures import CaseInsensitiveDict
def stack(line: str) -> list[str]:
    """Split a stacked command line into its individual commands.

    A single ``;`` with a non-``;`` character on both sides separates two
    commands. A doubled ``;;`` does not separate anything and is collapsed to
    one literal ``;`` in the returned pieces. Lines starting with ``#$#``
    (MCP messages) are returned unsplit as a one-element list.
    """
    assert '\n' not in line
    if line.startswith('#$#'):  # mcp
        return [line]
    # Both look-arounds require an actual neighbouring character, so a ';' in the
    # first or last position of the line is never treated as a separator.
    pieces = re.split(r'(?<=[^;]);(?=[^;])', line)
    return [piece.replace(';;', ';') for piece in pieces]
class TimerMixin(object):
    """Drive named timers from a background thread.

    The host object must provide the following before ``__init__`` runs,
    because the thread started there begins calling :meth:`timeslice` at once:

    * ``self.timers`` -- dict mapping a timer name to the 4-tuple
      ``(oneshot, period, remaining, fn)`` built by :meth:`mktimer`,
      :meth:`mktimernow` or :meth:`mkdelay`;
    * ``self.log(...)`` and ``self.mud.logException(exc)`` -- used to report
      exceptions raised by timer callbacks.
    """

    def __init__(self):
        """Record the start time and launch the timer thread."""
        self.previous_checkpoint = time.time()
        self.stopflag = threading.Event()
        self.timer_thread = threading.Thread(target=TimerMixin.timer_thread_fn, args=(self,))
        self.timer_thread.start()

    def timer_thread_fn(self):
        """Thread body: feed elapsed wall-clock time to timeslice() until stopped."""
        while not self.stopflag.is_set():
            now = time.time()
            self.timeslice(now - self.previous_checkpoint)
            self.previous_checkpoint = now
            # Wait on the stop flag instead of sleeping: same 0.1 s cadence, but
            # quit() is not held up for the remainder of the tick.
            self.stopflag.wait(0.1)

    def timeslice(self, delta):
        """Advance every timer by ``delta`` seconds and fire the expired ones.

        A timer fires when its remaining time drops below zero. Its callback
        is invoked as ``fn(self)``; exceptions from the callback are logged
        and do not stop the other timers. Periodic timers are reset to their
        period before the callback runs; one-shot timers are removed once the
        whole slice has been processed.

        Callbacks may add, replace or remove timers. Timers added during the
        slice are first processed in the next slice, timers removed during
        the slice are skipped, and a one-shot entry that was replaced during
        the slice (for example re-armed by its own callback) is kept.
        """
        try:
            # Snapshot of the names only; each entry is looked up again below so
            # that changes made by earlier callbacks in this slice are honoured.
            names = list(self.timers)
        except RuntimeError as e:
            # RuntimeError: dictionary changed size during iteration
            self.log(e)
            self.mud.logException(e)
            return

        fired_oneshots = []
        for name in names:
            try:
                oneshot, period, remaining, fn = self.timers[name]
            except KeyError:
                # Removed by a callback that ran earlier in this slice.
                continue

            remaining -= delta
            if remaining >= 0:
                self.timers[name] = (oneshot, period, remaining, fn)
                continue

            if oneshot:
                entry = (oneshot, period, remaining, fn)
                fired_oneshots.append((name, entry))
            else:
                entry = (oneshot, period, period, fn)
            self.timers[name] = entry

            try:
                fn(self)
            except Exception as e:
                self.log(e)
                self.mud.logException(e)

        for name, entry in fired_oneshots:
            # Identity check: only drop the exact entry that fired, so a timer
            # re-registered under the same name survives, and an entry that a
            # callback already deleted does not raise KeyError here.
            if self.timers.get(name) is entry:
                del self.timers[name]

    @staticmethod
    def mktimer(period, fn, oneshot=False):
        """Build a timer tuple whose first firing comes after one full ``period``."""
        return (oneshot, period, period, fn)

    @staticmethod
    def mktimernow(period, fn, oneshot=False):
        """Build a timer tuple with zero time remaining, so it fires on the next slice."""
        return (oneshot, period, 0, fn)

    @staticmethod
    def mkdelay(delay, fn):
        """Build a one-shot timer tuple that fires once after ``delay`` seconds."""
        return (True, delay, delay, fn)

    def quit(self):
        """Signal the timer thread to stop and wait for it to finish."""
        self.stopflag.set()
        self.timer_thread.join()

    def setTimerRemaining(self, timer, remainingTime):
        """Set the remaining time of the timer named ``timer``.

        Raises ``KeyError`` if no timer with that name is registered.
        """
        oneshot, period, _, fn = self.timers[timer]
        self.timers[timer] = (oneshot, period, remainingTime, fn)
class ModularClient(TimerMixin):
    """Client core that routes user input, MUD output, GMCP/MCP and timers to modules.

    ``mud`` is the connection/UI object; this class calls its ``send``,
    ``log``, ``show``, ``strip_ansi`` and ``logException`` methods.

    A subclass may set ``self.modules`` (a dict of module objects) before
    calling this constructor. Every module must provide ``getAliases()``,
    ``getTriggers()``, ``getOneTimeTriggers()`` and ``getTimers()``, each
    returning a dict; the hooks ``getHostPort``, ``alias``, ``trigger``,
    ``handleGmcp`` and ``quit`` are optional and used when present.
    """

    def __init__(self, mud, *args):
        # self.modules must be set up by child class
        self.mud = mud
        self.state = {}
        self.gmcp = CaseInsensitiveDict()
        self.mcp = [Negotiate(self)]
        self.aliases = {}
        self.triggers = {}
        self.oneTimeTriggers = {}
        self.timers = self.getTimers()
        if not hasattr(self, 'modules'):
            self.modules = {}
        # Command stacking in alias() is on by default unless a subclass has
        # already chosen a value.
        if not hasattr(self, 'autowalk'):
            self.autowalk = True
        for m in self.modules.values():
            m.world = self
            self.aliases.update(m.getAliases())
            self.triggers.update(m.getTriggers())
            self.oneTimeTriggers.update(m.getOneTimeTriggers())
            self.timers.update(m.getTimers())
        # Start the timer thread last: it begins firing timers immediately, so
        # every module must already be wired up, and a module that fails above
        # must not leave a running thread behind.
        TimerMixin.__init__(self)

    def getHostPort(self) -> tuple[str, str | int]:
        """Return (host, port) from the first module offering ``getHostPort``.

        When no module provides it, both values are read interactively with
        ``input()`` and returned as strings.
        """
        for m in self.modules.values():
            if hasattr(m, 'getHostPort'):
                return m.getHostPort()
        return input("Hostname: "), input("Port: ")

    def _dispatchResponse(self, response, groups):
        """Send a string response as-is, or call it and send any truthy result."""
        if isinstance(response, str):
            self.send(response)
            return
        output = response(self, groups)
        if output:  # might be for side effects
            self.send(output)

    def alias(self, line: str) -> bool:
        """Process one line of user input.

        Returns True when the line was consumed here (stacked commands,
        ``#N cmd`` repetition, a matching alias, or a module's ``alias`` hook
        returning a truthy value). Returns False when nothing handled it.
        """
        # It's possible to move command stacking and spamrepeat into modules, at the cost of horribly complicating
        # everything in this function. Implementing them here results in less overall ugliness.
        if self.autowalk:
            sublines = stack(line)
            if len(sublines) > 1:
                for subline in sublines:
                    if not self.alias(subline):
                        self.send(subline)
                return True
            line = sublines[0]

        repeat = re.match(r'#(\d+) (.+)', line)
        if repeat:
            times, cmd = repeat.groups()
            for _ in range(int(times)):
                if not self.alias(cmd):
                    self.send(cmd)
            return True

        for pattern, action in self.aliases.items():
            found = re.match(pattern, line)
            if found:
                self._dispatchResponse(action, found.groups())
                return True

        for module in self.modules.values():
            # If alias wants to signal that it consumed the command, return True -- it won't be sent to MUD then
            # Otherwise, the line is sent to MUD
            if hasattr(module, 'alias') and module.alias(line):
                return True
        return False

    def trigger(self, raw):
        """Run triggers against one line received from the MUD.

        The ANSI-stripped, whitespace-trimmed line is matched against the
        permanent triggers and then the one-time triggers; in each table only
        the first matching pattern fires, and a one-time trigger is removed
        after firing. Every module with a ``trigger`` hook is then called
        with ``(raw, stripped)``.

        Returns the first non-None value returned by a module hook, or None.
        """
        stripped = self.mud.strip_ansi(raw).strip()

        for pattern, response in self.triggers.items():
            found = re.match(pattern, stripped)
            if found:
                self._dispatchResponse(response, found.groups())
                break

        for pattern, response in self.oneTimeTriggers.items():
            found = re.match(pattern, stripped)
            if found:
                self._dispatchResponse(response, found.groups())
                # pop() instead of del: the response may already have removed
                # this trigger itself. Safe while iterating because we break.
                self.oneTimeTriggers.pop(pattern, None)
                break

        replacement = None
        for module in self.modules.values():
            if hasattr(module, 'trigger'):
                repl = module.trigger(raw, stripped)
                if replacement is None and repl is not None:  # modules come in order of priority, so first one wins
                    replacement = repl
        return replacement

    def handleGmcp(self, cmd, value):
        """Pass a GMCP message to every module with a ``handleGmcp`` hook.

        An exception raised by one module is logged and does not prevent the
        remaining modules from being called.
        """
        for module in self.modules.values():
            if hasattr(module, 'handleGmcp'):
                try:
                    module.handleGmcp(cmd, value)
                except Exception as e:
                    self.log("Exception in handleGmcp of {}: {}".format(module, e))
                    self.mud.logException(e)

    def handleMcp(self, name: str, keys: dict[str, str], line: str) -> bool:
        """Offer an MCP message to each package in ``self.mcp``.

        Returns True as soon as a package's ``handle(name, keys)`` returns a
        truthy value; otherwise prints the unhandled ``line`` and returns False.
        """
        for package in self.mcp:
            if package.handle(name, keys):
                return True
        print('Got unknown MCP message: ' + line)
        return False

    def handleMcpMultiline(self, tag: str, key: str, val: str) -> None:
        """Offer an MCP multiline fragment to each package until one accepts it.

        ``key`` is passed on with leading and trailing ``:`` removed.
        """
        for package in self.mcp:
            if package.handleMultiline(tag, key.strip(':'), val):
                return

    def quit(self) -> None:
        """Call every module's ``quit`` hook, then stop the timer thread.

        The timer thread is stopped even when a module's ``quit`` raises; the
        exception still propagates to the caller afterwards.
        """
        try:
            for module in self.modules.values():
                if hasattr(module, 'quit'):
                    module.quit()
        finally:
            self.log("Stopping timers")
            TimerMixin.quit(self)
            self.log("Stopped timers")

    def send(self, *args):
        """Forward to ``self.mud.send``."""
        self.mud.send(*args)

    def log(self, *args, **kwargs) -> None:
        """Forward to ``self.mud.log``."""
        self.mud.log(*args, **kwargs)

    def show(self, *args):
        """Forward to ``self.mud.show``."""
        self.mud.show(*args)

    def getTimers(self):
        """Return the client's own initial timers; the base class has none."""
        return {}
def getClass():
    """Return the client class defined by this file (the class itself, not an instance)."""
    client_class = ModularClient
    return client_class