-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.py
72 lines (52 loc) · 1.34 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
from operator import itemgetter
import json
import time
from utils import STORAGE, TASK_CHANNEL
from back import FUNCTIONS
import settings
# Let INFO-level records through the root logger so worker activity is logged.
logging.getLogger().setLevel(logging.INFO)
# Module-level pub/sub handle created from STORAGE; it is subscribed to
# TASK_CHANNEL as a side effect of importing this module.
sub = STORAGE.pubsub()
sub.subscribe(TASK_CHANNEL)
def get_message(message):
    """Decode the JSON payload carried by a pub/sub message.

    A raw message is a dict shaped like::

        {
            'pattern': None,
            'type': 'subscribe',
            'channel': 'my-second-channel',
            'data': 1,
        }

    Args:
        message: The raw message dict, or a falsy value when nothing was
            received.

    Returns:
        The object decoded from ``message['data']``, or ``None`` when there
        is no message or its ``data`` is not decodable JSON text (for
        example the integer carried by a subscribe confirmation, as in the
        sample above).
    """
    if not message:
        return None
    logging.info('MSG: %s', message)
    data = message.get('data')
    try:
        return json.loads(data)
    except (TypeError, ValueError) as exc:
        # TypeError: data is not text/bytes (missing key, integer payload);
        # ValueError: data is text but not valid JSON. Either way the message
        # carries no task, so skip it instead of crashing the polling loop.
        logging.warning('[worker] ignoring undecodable data %r: %s', data, exc)
        return None
def execute(data):
    """Dispatch a decoded task to the matching handler in ``FUNCTIONS``.

    Args:
        data: Decoded task payload; expected to be a mapping with the keys
            ``'func_name'`` and ``'arguments'``.

    Returns:
        ``(None, None)`` when ``data`` is falsy (kept unchanged for backward
        compatibility), ``None`` in every other case. The handler's own
        return value is discarded.

    Raises:
        Whatever the selected handler raises; handler errors are not caught
        here.
    """
    if not data:
        return None, None
    try:
        func_name, arguments = itemgetter('func_name', 'arguments')(data)
    except (KeyError, TypeError) as exc:
        # KeyError: a required key is missing.
        # TypeError: the payload is valid JSON but not a mapping (list,
        # number, string), so it cannot be indexed by key.
        logging.error('[worker] error %s', exc)
        return None
    logging.info('executing: %s, with args: %s', func_name, arguments)
    func = FUNCTIONS.get(func_name)
    if not func:
        # Make unknown task names visible instead of dropping them silently.
        logging.warning('[worker] unknown function: %s', func_name)
        return None
    func(arguments)
    return None
def __warmup():
    """Wait for the environment to settle, then queue the initial task.

    Sleeps for a fixed 15 seconds, reads and discards one pending message
    from the subscription, and publishes an ``init`` task.
    """
    # Fixed pause; per the original note this gives the browser time to settle.
    time.sleep(15)
    # Drop one pending message before real processing starts
    # (presumably the subscribe confirmation -- TODO confirm).
    sub.get_message()
    # Publish on the TASK_CHANNEL constant this worker subscribed to, not on
    # the literal string 'TASK_CHANNEL', so the init task reaches the loop.
    STORAGE.publish(TASK_CHANNEL, b'{"func_name": "init", "arguments": {}}')
def process():
    """Run the worker: warm up once, then poll the subscription forever.

    Each iteration logs a heartbeat, fetches at most one message, executes
    it when it decodes to a truthy task, and then sleeps for one second.
    This function never returns.
    """
    __warmup()
    while True:
        logging.info('tik')
        raw_message = sub.get_message()
        task = get_message(raw_message)
        if task:
            execute(task)
        # Pause between polls.
        time.sleep(1)
def main():
    """Script entry point: hand control to the polling loop in ``process``."""
    process()
# Start the worker only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()