-
Notifications
You must be signed in to change notification settings - Fork 0
/
pycangw.py
68 lines (52 loc) · 1.64 KB
/
pycangw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env -S python3
import asyncio
import can
import logging
log = logging.getLogger(__name__)
def rx_loop_gen(rx_bus, tx_busses):
async def loop():
reader = can.AsyncBufferedReader()
loop = asyncio.get_running_loop()
notifier = can.Notifier(rx_bus, [reader], loop=loop)
async for msg in reader:
for bus in tx_busses:
bus.send(msg)
return loop
async def run(configs):
if len(configs) < 2:
log.critical(f"Not enough devices specified ({len(configs)}), minimum 2 needed")
return
busses = []
for cfg in configs:
busses.append(can.Bus(**cfg))
coroutines = []
for i in range(0, len(busses)):
l = rx_loop_gen(busses[i], [bus for ii, bus in enumerate(busses) if ii != i])
coroutines.append(l())
await asyncio.gather(*coroutines)
if __name__ == "__main__":
import argparse
import textwrap
import json
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent(
"""\
CAN gateway using python-can
The config file (JSON) is expected to be an array of
dictionaries - each of which represents one python-can
bus. Each dictionary will be passed to the Bus
constructor like so: can.Bus(**dict) .
"""
),
)
parser.add_argument("config")
args = parser.parse_args()
configs = None
with open(args.config, "r") as f:
configs = json.load(f)
try:
log.info("Starting Gateway")
asyncio.run(run(configs))
except KeyboardInterrupt:
pass