-
Notifications
You must be signed in to change notification settings - Fork 2
/
dongle.py
96 lines (73 loc) · 3.1 KB
/
dongle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""This shall be the representation of an EnOcean dongle."""
import glob
import logging
from os.path import basename, normpath
from enocean.communicators import SerialCommunicator
from enocean.protocol.packet import RadioPacket
import serial
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send
from .const import SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
_LOGGER = logging.getLogger(__name__)
class EnOceanDongle:
"""Representation of an EnOcean dongle.
The dongle is responsible for receiving the ENOcean frames,
creating devices if needed, and dispatching messages to platforms.
"""
def __init__(self, hass, serial_path):
"""Initialize the EnOcean dongle."""
self._communicator = SerialCommunicator(
port=serial_path, callback=self.callback
)
self.serial_path = serial_path
self.identifier = basename(normpath(serial_path))
self.hass = hass
self.dispatcher_disconnect_handle = None
async def async_setup(self):
"""Finish the setup of the bridge and supported platforms."""
self._communicator.start()
self.dispatcher_disconnect_handle = async_dispatcher_connect(
self.hass, SIGNAL_SEND_MESSAGE, self._send_message_callback
)
def unload(self):
"""Disconnect callbacks established at init time."""
if self.dispatcher_disconnect_handle:
self.dispatcher_disconnect_handle()
self.dispatcher_disconnect_handle = None
def _send_message_callback(self, command):
"""Send a command through the EnOcean dongle."""
self._communicator.send(command)
def send_message(self, command):
"""Send a command through the EnOcean dongle (public)."""
self._communicator.send(command)
@property
def communicator(self):
"""Set the communicator."""
return self._communicator
def callback(self, packet):
"""Handle EnOcean device's callback.
This is the callback function called by python-enocan whenever there
is an incoming packet.
"""
if isinstance(packet, RadioPacket):
_LOGGER.debug("Received radio packet: %s", packet)
dispatcher_send(self.hass, SIGNAL_RECEIVE_MESSAGE, packet)
def detect():
"""Return a list of candidate paths for USB ENOcean dongles.
This method is currently a bit simplistic, it may need to be
improved to support more configurations and OS.
"""
globs_to_test = ["/dev/tty*FTOA2PV*", "/dev/serial/by-id/*EnOcean*"]
found_paths = []
for current_glob in globs_to_test:
found_paths.extend(glob.glob(current_glob))
return found_paths
def validate_path(path: str):
"""Return True if the provided path points to a valid serial port, False otherwise."""
try:
# Creating the serial communicator will raise an exception
# if it cannot connect
SerialCommunicator(port=path)
return True
except serial.SerialException as exception:
_LOGGER.warning("Dongle path %s is invalid: %s", path, str(exception))
return False