-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
68 lines (57 loc) · 1.82 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import logging
import sys
from logging import config
from typing import Any, Dict
from rich.logging import RichHandler
from libq import RedisJobStore, Scheduler, types
from libq.worker import AsyncWorker
# from libq.job_store import RedisJobStore
# Logging configuration in the ``logging.config.dictConfig`` schema.
#
# Only the "libq" logger is configured: it logs at DEBUG through the "rich"
# handler. The "console" handler (which uses the "generic" formatter) and the
# "access" formatter are declared here, but no logger in this dict uses them.
LOG_CONFIG: Dict[str, Any] = {  # no cov
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "generic": {
            "class": "logging.Formatter",
            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
        },
        "access": {
            "class": "logging.Formatter",
            "format": (
                "%(asctime)s - (%(name)s)[%(levelname)s][%(host)s]: "
                "%(request)s %(message)s %(status)d %(byte)d"
            ),
            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "generic",
            "stream": sys.stdout,
        },
        "rich": {"class": "rich.logging.RichHandler"},
    },
    "loggers": {
        "libq": {"level": "DEBUG", "handlers": ["rich"]},
    },
}
async def create_dummy_job(scheduler: Scheduler, qname="default") -> types.JobPayload:
    """Create the example hello-world job through ``scheduler`` and return it.

    Args:
        scheduler: Object whose ``create_job`` coroutine is awaited.
        qname: Queue name forwarded as the ``queue`` argument.

    Returns:
        The value that ``scheduler.create_job`` resolves to.
    """
    job_params = {"timeout": 1, "error": False}
    return await scheduler.create_job(
        "examples.hello.hello_world",
        queue=qname,
        params=job_params,
        interval="30s",
    )
if __name__ == "__main__":
    # Install the logging setup before any worker objects are created.
    logging.config.dictConfig(LOG_CONFIG)

    # The first command-line argument selects the queue(s); fall back to
    # "default" when none is given. The value is handed to the worker as the
    # raw argument string -- it is not split on commas here.
    queues = sys.argv[1] if len(sys.argv) > 1 else "default"

    job_store = RedisJobStore()
    job_scheduler = Scheduler(job_store)
    AsyncWorker(queues=queues, scheduler=job_scheduler).run()