forked from dictation-toolbox/aenea
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_x11.py
353 lines (314 loc) · 11.2 KB
/
server_x11.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
#!/usr/bin/python
import os
import sys
import time
import jsonrpclib
import jsonrpclib.SimpleJSONRPCServer
import config
# Button names accepted over RPC mapped to the button numbers placed in
# xdotool click commands.
_MOUSE_BUTTONS = {
    "left": 1,
    "middle": 2,
    "right": 3,
    "wheelup": 4,
    "wheeldown": 5,
}

# Click direction -> xdotool mouse sub-command.
_MOUSE_CLICKS = {
    "click": "click",
    "down": "mousedown",
    "up": "mouseup",
}

# Key direction -> suffix appended to "key" to form the xdotool sub-command
# ("key", "keyup", "keydown").
_KEY_PRESSES = {
    "press": "",
    "up": "up",
    "down": "down",
}

# Mouse move reference -> xdotool sub-command. "mousemove_active" is a
# placeholder that move_mouse rewrites into "mousemove --window <id>".
_MOUSE_MOVE_COMMANDS = {
    "absolute": "mousemove",
    "relative": "mousemove_relative",
    "relative_active": "mousemove_active",
}
# Static description of this server, returned verbatim by server_info().
_SERVER_INFO = {
    "window_manager": "awesome",
    "operating_system": "linux",
    "display": "X11",
    "server": "aenea_reference",
    "server_version": 1,
}
# xprop property labels (left-hand side of " = " in xprop output) mapped to
# the key under which get_context() reports the value.
_XPROP_PROPERTIES = {
    "_NET_WM_DESKTOP(CARDINAL)": "desktop",
    "WM_WINDOW_ROLE(STRING)": "role",
    "_NET_WM_WINDOW_TYPE(ATOM)": "type",
    "_NET_WM_PID(CARDINAL)": "pid",
    "WM_LOCALE_NAME(STRING)": "locale",
    "WM_CLIENT_MACHINE(STRING)": "client_machine",
    "WM_NAME(STRING)": "name",
}
# Modifier names accepted over RPC mapped to the key names used in xdotool
# keydown/keyup commands. "win" and "flag" are aliases of "super".
_MOD_TRANSLATION = {
    "alt": "Alt_L",
    "shift": "Shift_L",
    "control": "Control_L",
    "super": "Super_L",
    "hyper": "Hyper_L",
    "meta": "Meta_L",
    "win": "Super_L",
    "flag": "Super_L",
}
# Key names accepted over RPC mapped to the key names passed to xdotool.
# Only the irregular names are listed here; key_press() falls back to the
# name itself for anything missing from the table.
_KEY_TRANSLATION = {
    "ampersand": "ampersand",
    "apostrophe": "apostrophe",
    "apps": "Menu",
    "asterisk": "asterisk",
    "at": "at",
    "backslash": "backslash",
    "backspace": "BackSpace",
    "backtick": "grave",
    "bar": "bar",
    "caret": "asciicircum",
    "colon": "colon",
    "comma": "comma",
    "del": "Delete",
    "dollar": "dollar",
    "dot": "period",
    "dquote": "quotedbl",
    "enter": "Return",
    "equal": "equal",
    "exclamation": "exclam",
    "hash": "numbersign",
    "hyphen": "minus",
    "langle": "less",
    "lbrace": "braceleft",
    "lbracket": "bracketleft",
    "lparen": "parenleft",
    "minus": "minus",
    "npadd": "KP_Add",
    "npdec": "KP_Decimal",
    "npdiv": "KP_Divide",
    "npmul": "KP_Multiply",
    "percent": "percent",
    "pgdown": "Next",
    "pgup": "Prior",
    "plus": "plus",
    "question": "question",
    "rangle": "greater",
    "rbrace": "braceright",
    "rbracket": "bracketright",
    "rparen": "parenright",
    "semicolon": "semicolon",
    "shift": "Shift_L",
    "slash": "slash",
    "space": "space",
    "squote": "apostrophe",
    "tilde": "asciitilde",
    "underscore": "underscore",
    "win": "Super_L",
}
def update_key_translation(translation):
    """Add the programmatically derived key names to `translation` in place.

    Entries added:
      * navigation/editing keys and f1..f12, mapped to the same name with
        the first letter capitalised ("left" -> "Left", "f5" -> "F5");
      * np0..np9, mapped to KP_0..KP_9;
      * every letter a-z (lower and upper case) and every digit 0-9,
        mapped to itself.

    Returns None; the dictionary passed in is mutated.
    """
    named_keys = ["left", "right", "up", "down", "home", "end", "tab",
                  "insert", "escape"]
    function_keys = ["f%i" % number for number in range(1, 13)]
    for name in named_keys + function_keys:
        translation[name] = name[0].upper() + name[1:]

    for digit in range(10):
        translation["np%i" % digit] = "KP_%i" % digit

    # Spelled out explicitly so that the last letter and the last digit are
    # included ("z"/"Z" and "9" were previously cut off by exclusive range
    # upper bounds).
    for char in "abcdefghijklmnopqrstuvwxyz0123456789":
        translation[char] = char
        translation[char.upper()] = char.upper()
# Fill in the generated entries (navigation keys, F-keys, keypad digits,
# letters and digits) once, at import time.
update_key_translation(_KEY_TRANSLATION)
def run_command(command, executable="xdotool"):
    """Run `executable` with the argument string `command` and wait for it.

    The two are joined with a single space and handed to os.system, so the
    resulting string is interpreted by the shell. The exit status is
    discarded and nothing is returned.
    """
    os.system("%s %s" % (executable, command))
def read_command(command, executable="xdotool"):
    """Run `executable` with the argument string `command` via os.popen and
    return everything it wrote to standard output as a string."""
    pipe = os.popen("%s %s" % (executable, command), "r")
    try:
        return pipe.read()
    finally:
        # Always close the pipe, even if reading fails.
        pipe.close()
def write_command(message, arguments="type --file -", executable="xdotool"):
    """Run `executable` with the argument string `arguments` via os.popen and
    feed `message` to its standard input.

    With the defaults this makes xdotool type the text it reads from stdin.
    Nothing is returned.
    """
    pipe = os.popen("%s %s" % (executable, arguments), "w")
    try:
        pipe.write(message)
    finally:
        # Closing the pipe ends the child's input.
        pipe.close()
def get_active_window(_xdotool=None):
    """Return a (window id, window title) pair for the active window.

    Any actions queued in `_xdotool` are flushed first. If xdotool prints
    nothing for "getactivewindow", (None, None) is returned.
    """
    flush_xdotool(_xdotool)
    raw_id = read_command("getactivewindow")
    if not raw_id:
        return None, None
    window_id = int(raw_id)
    title = read_command("getwindowname %i" % window_id).strip()
    return window_id, title
def get_geometry(window_id=None, _xdotool=None):
    """Return the geometry of a window as a dictionary of integers with the
    keys "x", "y", "width", "height" and "screen".

    window_id defaults to the active window. Any actions queued in
    `_xdotool` are flushed before the window is queried. The values are
    parsed from the KEY=value lines printed by
    "xdotool getwindowgeometry --shell".
    """
    flush_xdotool(_xdotool)
    if window_id is None:
        window_id = get_active_window()[0]

    output = read_command("getwindowgeometry --shell %i" % window_id)
    parsed = {}
    for line in output.strip().split("\n"):
        # Each line must be exactly NAME=value; anything else raises
        # ValueError.
        name, value = line.split("=")
        parsed[name.lower()] = int(value.lower())

    wanted = ("x", "y", "width", "height", "screen")
    return dict((name, parsed[name]) for name in wanted)
def transform_relative_mouse_event(event):
    """Turn a window-relative "dx dy" string into an absolute mousemove.

    `event` holds two whitespace-separated numbers; each is truncated to an
    integer and added to the active window's x/y position. Returns a
    one-element list containing a ("mousemove", "<x> <y>") tuple.
    """
    window = get_geometry()
    offsets = [int(float(part)) for part in event.split()]
    dx, dy = offsets
    target = "%i %i" % (window["x"] + dx, window["y"] + dy)
    return [("mousemove", target)]
def get_context(_xdotool=None):
    """Return a dictionary of properties of the currently active window.

    Any actions queued in `_xdotool` are flushed first. If there is no
    active window an empty dictionary is returned. Otherwise the result
    always contains "id", "title" and "executable" (None when it cannot be
    determined), plus whichever of the properties listed in
    _XPROP_PROPERTIES xprop reports, and "cls_name"/"cls" when a
    well-formed WM_CLASS is present.
    """
    flush_xdotool(_xdotool)
    window_id, window_title = get_active_window()
    if window_id is None:
        return {}

    properties = {
        "id": window_id,
        "title": window_title,
    }
    for line in read_command("-id %s" % window_id, "xprop").split("\n"):
        rawkey, separator, value = line.partition(" = ")
        if not separator:
            # Not a "NAME = value" line.
            continue
        if rawkey in _XPROP_PROPERTIES:
            # STRING values are printed with surrounding double quotes.
            property_value = value[1:-1] if "(STRING)" in rawkey else value
            properties[_XPROP_PROPERTIES[rawkey]] = property_value
        elif rawkey == "WM_CLASS(STRING)":
            # Expected form: "instance", "class". Skip anything else rather
            # than failing the whole context query.
            parts = value.split('", "')
            if len(parts) == 2:
                properties["cls_name"] = parts[0][1:]
                properties["cls"] = parts[1][:-1]

    # Sigh...
    properties["executable"] = None
    # Not every window advertises _NET_WM_PID; without a pid there is
    # nothing to look up, so "executable" stays None.
    pid = properties.get("pid")
    if pid is not None:
        try:
            properties["executable"] = os.readlink("/proc/%s/exe" % pid)
        except OSError:
            # Fall back to ps: drop the header line and take the fifth
            # whitespace-separated column of the first process line.
            ps = read_command("%s" % pid, executable="ps").split("\n")[1:]
            if ps:
                try:
                    properties["executable"] = ps[0].split()[4]
                except IndexError:
                    # ps printed no usable process line; leave it as None.
                    pass
    return properties
def key_press(key, modifiers=(), direction="press", count=1, count_delay=None, _xdotool=None):
    """Press a key, possibly modified by modifiers.

    key: key name; translated through _KEY_TRANSLATION, unknown names are
        passed to xdotool unchanged.
    modifiers: iterable of modifier names; may contain "alt", "shift",
        "control", "super". This X11 server also supports "hyper", "meta",
        "win" and "flag" (same as super). Unknown names are passed through
        unchanged.
    direction: "press", "down" or "up"; anything else raises KeyError.
    count: number of times to press the key.
    count_delay: delay in ms between presses; only used when count >= 2.
    _xdotool: if not None, a list the generated xdotool commands are
        appended to instead of being executed immediately.

    The modifiers are held down once around all presses. The delay is
    emitted as xdotool "sleep" steps between consecutive presses, so it is
    honoured both when the command runs immediately and when it is batched.
    """
    xdo_modifiers = [_MOD_TRANSLATION.get(mod, mod) for mod in modifiers]
    key_to_press = _KEY_TRANSLATION.get(key, key)
    press = "key%s %s" % (_KEY_PRESSES[direction], key_to_press)

    if count_delay is None or count < 2:
        presses = [press] * count
    else:
        pause_between = "sleep %f" % (count_delay / 1000.)
        presses = [press]
        for _ in range(count - 1):
            presses.append(pause_between)
            presses.append(press)

    commands = (["keydown " + mod for mod in xdo_modifiers] +
                presses +
                ["keyup " + mod for mod in reversed(xdo_modifiers)])

    if _xdotool is not None:
        _xdotool.extend(commands)
    else:
        run_command(" ".join(commands))
def write_text(text, paste=False, _xdotool=None):
    """Send text formatted exactly as written to the active window.

    If paste is True and config.ENABLE_XSEL is set, the text is pasted via
    the X11 PRIMARY clipboard instead of being typed. See the
    config.ENABLE_XSEL documentation for more information on this.
    Empty text is ignored. Actions queued in `_xdotool` are flushed before
    the text is typed (or, when pasting, together with the paste click).
    """
    # Workaround for https://github.com/jordansissel/xdotool/pull/29
    if not text:
        return

    if not (paste and config.ENABLE_XSEL):
        flush_xdotool(_xdotool)
        write_command(text, arguments="type --file - --delay 0")
        return

    # Swap primary and secondary X11 clipboards so we can restore after the
    # paste.
    run_command("-x", executable="xsel")
    # Copy the text to be pasted to the clipboard.
    write_command(text, arguments="-i", executable="xsel")
    # Paste by simulating a middle click.
    # TODO: can we do this even in programs that don't have a middle click?
    # If not, we may need a blacklist of buggy programs.
    click_mouse(2, _xdotool=_xdotool)
    flush_xdotool(_xdotool)
    # Nuke the text we selected.
    run_command("-c", executable="xsel")
    # Restore the previous clipboard contents.
    run_command("-x", executable="xsel")
def click_mouse(button, direction="click", count=1, count_delay=None, _xdotool=None):
    """Click the mouse button specified.

    button: one of "right", "left", "middle", "wheeldown", "wheelup". This
        X11 server will also accept a number (anything int() can convert).
    direction: "click", "down" or "up"; anything else raises KeyError.
    count: number of clicks; a "--repeat" option is added unless it is 1.
    count_delay: delay in ms between clicks; a "--delay" option is added
        only when it is given and count >= 2.
    _xdotool: if not None, a list the generated xdotool command is appended
        to instead of being executed immediately.
    """
    if count_delay is None or count < 2:
        delay = ""
    else:
        delay = "--delay %i" % count_delay

    if count == 1:
        repeat = ""
    else:
        repeat = "--repeat %i" % count

    if button in _MOUSE_BUTTONS:
        button = _MOUSE_BUTTONS[button]
    else:
        button = int(button)

    # Empty options still occupy their slot, so the spacing of the command
    # string is the same whether or not they are present.
    parts = [_MOUSE_CLICKS[direction], delay, repeat, str(button)]
    command = " ".join(parts)

    if _xdotool is None:
        run_command(command)
    else:
        _xdotool.append(command)
def move_mouse(x, y, reference="absolute", proportional=False, phantom=None, _xdotool=None):
    """Move the mouse to the specified coordinates.

    x, y: target coordinates. When proportional is true they are multiplied
        by the active window's width and height respectively.
    reference: one of "absolute", "relative", or "relative_active"
        (relative to the active window); anything else raises KeyError.
    phantom: if not None, a button as accepted by click_mouse (name or
        number). The pointer is moved, the button is clicked, and the
        pointer is then moved back with "mousemove restore".
    _xdotool: if not None, a list the generated xdotool commands are
        appended to instead of being executed immediately.

    Whenever window state has to be read (proportional coordinates or the
    "relative_active" reference), actions already queued in `_xdotool` are
    flushed first so the state reflects everything requested so far.
    """
    if proportional:
        # Only query the window when the geometry is actually needed.
        geo = get_geometry(_xdotool=_xdotool)
        x = geo["width"] * x
        y = geo["height"] * y

    command = _MOUSE_MOVE_COMMANDS[reference]
    if command == "mousemove_active":
        command = "mousemove --window %i" % get_active_window(_xdotool)[0]

    # A leading "-" would be parsed by xdotool as an option; "--" marks the
    # end of options so negative coordinates are taken as arguments.
    separator = "-- " if (x < 0 or y < 0) else ""
    commands = ["%s %s%f %f" % (command, separator, x, y)]

    if phantom is not None:
        # Accept the same button spellings as click_mouse.
        try:
            button = _MOUSE_BUTTONS[phantom]
        except KeyError:
            button = int(phantom)
        commands.append("click %s" % button)
        commands.append("mousemove restore")

    if _xdotool is not None:
        _xdotool.extend(commands)
    else:
        run_command(" ".join(commands))
def pause(amount, _xdotool=None):
    """Pause for `amount` milliseconds.

    When `_xdotool` is a list, an xdotool "sleep" command is appended to it
    so the pause happens in sequence with the other queued actions;
    otherwise this call itself blocks for the duration.
    """
    seconds = amount / 1000.
    if _xdotool is None:
        time.sleep(seconds)
    else:
        _xdotool.append("sleep %f" % seconds)
def server_info(_xdotool=None):
    """Flush any queued xdotool actions, then return the module-level
    _SERVER_INFO dictionary describing this server (the same object on
    every call, not a copy)."""
    flush_xdotool(_xdotool)
    info = _SERVER_INFO
    return info
def flush_xdotool(actions):
    """Execute all queued xdotool commands in one invocation and empty the
    queue in place.

    `actions` is a list of xdotool command strings; None or an empty list
    is a no-op.
    """
    if not actions:
        return
    run_command(" ".join(actions))
    # Clear in place so the caller's list object is emptied as well.
    del actions[:]
def list_rpc_commands():
    """Return a new dictionary mapping each RPC method name to the function
    implementing it. Every function is exposed under its own name."""
    handlers = (
        get_context,
        key_press,
        write_text,
        click_mouse,
        move_mouse,
        server_info,
        pause,
    )
    return dict((handler.__name__, handler) for handler in handlers)
def multiple_actions(actions):
    """Execute multiple rpc commands, aborting on any error.

    Will not return anything ever. `actions` is a sequence of
    (method, params, optional) triples: the RPC method name, a sequence of
    positional arguments, and a dictionary of keyword arguments. See also
    JSON-RPC multicall. Guaranteed to execute in specified order.

    xdotool commands produced by the actions are collected and run
    together. Processing stops at the first unknown method name or at the
    first action that raises; in both cases the commands queued up to that
    point are still executed, and an exception is then propagated to the
    caller.
    """
    xdotool = []
    # The handler table does not change between actions; build it once.
    commands = list_rpc_commands()
    try:
        for (method, parameters, optional) in actions:
            if method not in commands:
                break
            commands[method](*parameters, _xdotool=xdotool, **optional)
    finally:
        # Run whatever was queued even when an action failed, matching what
        # already happens when an unknown method stops the batch.
        flush_xdotool(xdotool)
def setup_server(host, port):
    """Create a SimpleJSONRPCServer bound to (host, port), register every
    function named by list_rpc_commands() plus multiple_actions on it, and
    return the server without starting it."""
    server = jsonrpclib.SimpleJSONRPCServer.SimpleJSONRPCServer((host, port))
    # Look each command up by name among the module globals.
    exposed = [globals()[name] for name in list_rpc_commands()]
    exposed.append(multiple_actions)
    for function in exposed:
        server.register_function(function)
    return server
if __name__ == "__main__":
    arguments = sys.argv
    if len(arguments) == 2 and arguments[-1] == "getcontext":
        # Diagnostic mode: print the active window's context and finish.
        ctx = get_context()
        try:
            import pprint
        except ImportError:
            print(ctx)
        else:
            pprint.pprint(ctx)
    else:
        if "-d" in arguments or "--daemon" in arguments:
            # Double fork: the original process and the intermediate child
            # both exit immediately; only the grandchild carries on.
            if os.fork() != 0:
                os._exit(0)
            # Start a new session before forking the second time.
            os.setsid()
            if os.fork() != 0:
                os._exit(0)
            os.chdir("/")
            os.umask(0)
            # Safe upper bound on number of fds we could possibly have opened.
            for fd in range(64):
                try:
                    os.close(fd)
                except OSError:
                    pass
            # With every descriptor closed, this open is expected to land
            # on fd 0; fds 1 and 2 are then duplicated from it so all three
            # standard streams point at the null device.
            os.open(os.devnull, os.O_RDWR)
            os.dup2(0, 1)
            os.dup2(0, 2)
        server = setup_server(config.HOST, config.PORT)
        server.serve_forever()