diff --git a/nibe/connection/nibegw.py b/nibe/connection/nibegw.py index b3163a5..2fcf7f5 100644 --- a/nibe/connection/nibegw.py +++ b/nibe/connection/nibegw.py @@ -2,6 +2,7 @@ from asyncio import CancelledError, Future, InvalidStateError from binascii import hexlify from contextlib import suppress +import errno from functools import reduce from io import BytesIO from ipaddress import ip_address @@ -9,7 +10,7 @@ from operator import xor import socket import struct -from typing import Container, Dict, Union +from typing import Any, Container, Dict, Optional, Union from construct import ( Adapter, @@ -25,6 +26,7 @@ Flag, FlagsEnum, GreedyBytes, + GreedyRange, GreedyString, IfThenElse, Int8sb, @@ -51,6 +53,7 @@ from nibe.connection.encoders import CoilDataEncoder from nibe.event_server import EventServer from nibe.exceptions import ( + AddressInUseException, CoilNotFoundException, CoilReadException, CoilReadTimeoutException, @@ -59,6 +62,7 @@ DecodeException, NibeException, ProductInfoReadTimeoutException, + WriteException, ) from nibe.heatpump import HeatPump, ProductInfo @@ -84,13 +88,14 @@ class NibeGW(asyncio.DatagramProtocol, Connection, EventServer): def __init__( self, heatpump: HeatPump, - remote_ip: str, + remote_ip: Optional[str] = None, remote_read_port: int = 9999, remote_write_port: int = 10000, listening_ip: str = "0.0.0.0", listening_port: int = 9999, read_retries: int = 3, write_retries: int = 3, + rmu_write_ports: Optional[Dict[int, int]] = None, ) -> None: super().__init__() @@ -99,8 +104,22 @@ def __init__( self._listening_port = listening_port self._remote_ip = remote_ip - self._remote_read_port = remote_read_port - self._remote_write_port = remote_write_port + self._request_ports = {} + + if remote_read_port: + self._request_ports[ + (Address.MODBUS40, Command.MODBUS_READ_REQ) + ] = remote_read_port + + if remote_write_port: + self._request_ports[ + (Address.MODBUS40, Command.MODBUS_WRITE_REQ) + ] = remote_write_port + + for rmu_index, rmu_write_port in (rmu_write_ports or {}).items(): + self._request_ports[ + (RMU_INDEX_TO_ADDRESS[rmu_index], Command.RMU_WRITE_REQ) + ] = rmu_write_port self._transport = None self._status = ConnectionStatus.UNKNOWN @@ -134,22 +153,28 @@ async def start(self): proto=socket.IPPROTO_UDP, family=socket.AddressFamily.AF_INET, )[0] + sock = socket.socket(family, type, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - if ip_address(sockaddr[0]).is_multicast: - group_bin = socket.inet_pton(family, sockaddr[0]) - if family == socket.AF_INET: # IPv4 - sock.bind(("", sockaddr[1])) - mreq = group_bin + struct.pack("=I", socket.INADDR_ANY) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + try: + if ip_address(sockaddr[0]).is_multicast: + group_bin = socket.inet_pton(family, sockaddr[0]) + if family == socket.AF_INET: # IPv4 + sock.bind(("", sockaddr[1])) + mreq = group_bin + struct.pack("=I", socket.INADDR_ANY) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + else: + sock.bind(("", sockaddr[1])) + mreq = group_bin + struct.pack("@I", 0) + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) + elif self._listening_ip: + sock.bind(sockaddr) else: sock.bind(("", sockaddr[1])) - mreq = group_bin + struct.pack("@I", 0) - sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) - elif self._listening_ip: - sock.bind(sockaddr) - else: - sock.bind(("", sockaddr[1])) + except OSError as exception: + if exception.errno == errno.EADDRINUSE: + raise AddressInUseException(f"Address in use {sockaddr}") + raise await asyncio.get_event_loop().create_datagram_endpoint(lambda: self, sock=sock) @@ -159,9 +184,15 @@ def connection_made(self, transport): def datagram_received(self, data: bytes, addr): logger.debug(f"Received {hexlify(data).decode('utf-8')} from {addr}") - self._set_status(ConnectionStatus.CONNECTED) try: msg = Response.parse(data) + + if not self._remote_ip: + logger.debug("Pump discovered at %s", addr) + self._remote_ip = addr[0] + + self._set_status(ConnectionStatus.CONNECTED) + logger.debug(msg.fields.value) cmd = msg.fields.value.cmd if cmd == "MODBUS_DATA_MSG": @@ -190,6 +221,11 @@ def datagram_received(self, data: bytes, addr): elif cmd == "PRODUCT_INFO_MSG": with suppress(InvalidStateError, CancelledError, KeyError): self._futures["product_info"].set_result(msg.fields.value.data) + elif cmd == "NIBEGW_PORTS": + request_ports = {} + for row in msg.fields.value.data: + request_ports[(row.address, row.cmd)] = row.port + self._request_ports = request_ports elif not isinstance(cmd, EnumIntegerString): logger.debug(f"Unknown command {cmd}") except ChecksumError: @@ -216,6 +252,15 @@ async def read_product_info( finally: del self._futures["product_info"] + def _send_request(self, address: int, command: int, data: bytes): + remote_port = self._request_ports[(address, command)] + + logger.debug( + f"Sending {hexlify(data).decode('utf-8')} to {self._remote_ip}:{remote_port}" + ) + + self._transport.sendto(data, (self._remote_ip, remote_port)) + async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: async with self._send_lock: assert self._transport, "Transport is closed" @@ -231,10 +276,11 @@ async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: self._futures["read"] = asyncio.get_event_loop().create_future() - logger.debug( - f"Sending {hexlify(data).decode('utf-8')} (read request) to {self._remote_ip}:{self._remote_write_port}" - ) - self._transport.sendto(data, (self._remote_ip, self._remote_read_port)) + try: + self._send_request(Address.MODBUS40, Command.MODBUS_READ_REQ, data) + except socket.gaierror: + raise CoilReadException(f"Unable to lookup hostname: {self._remote_ip}") + logger.debug(f"Waiting for read response for {coil.name}") try: @@ -269,10 +315,12 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil self._futures["write"] = asyncio.get_event_loop().create_future() - logger.debug( - f"Sending {hexlify(data).decode('utf-8')} (write request) to {self._remote_ip}:{self._remote_write_port}" - ) - self._transport.sendto(data, (self._remote_ip, self._remote_write_port)) + try: + self._send_request(Address.MODBUS40, Command.MODBUS_WRITE_REQ, data) + except socket.gaierror: + raise CoilWriteException( + f"Unable to lookup hostname: {self._remote_ip}" + ) try: await asyncio.wait_for(self._futures["write"], timeout) @@ -292,6 +340,30 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil return coil + async def _send_rmu_write_req(self, address: int, index: str, value: Any): + async with self._send_lock: + assert self._transport, "Transport is closed" + data = Request.build( + dict( + fields=dict( + value=dict( + cmd="RMU_WRITE_REQ", + data=dict(index=index, value=value), + ) + ) + ) + ) + + try: + self._send_request(address, Command.RMU_WRITE_REQ, data) + except socket.gaierror: + raise WriteException(f"Unable to lookup hostname: {self._remote_ip}") + + async def write_rmu_temperature(self, index: int, temperature: float): + await self._send_rmu_write_req( + RMU_INDEX_TO_ADDRESS[index], "TEMPERATURE", temperature + ) + def error_received(self, exc): logger.error(exc) @@ -299,6 +371,10 @@ def error_received(self, exc): def status(self) -> ConnectionStatus: return self._status + @property + def remote_ip(self) -> str: + return self._remote_ip + def _set_status(self, status: ConnectionStatus): if status != self._status: self._status = status @@ -445,7 +521,7 @@ def _decode(self, obj, context, path): return round(obj * self._scale + self._offset, self._ndigits) def _encode(self, obj, context, path): - return (obj - self._offset) / self._scale + return round((obj - self._offset) / self._scale) StringData = Struct( @@ -533,6 +609,13 @@ def _encode(self, obj, context, path): "PRODUCT_INFO_MSG": ProductInfoData, "RMU_DATA_MSG": RmuData, "STRING_MSG": StringData, + "NIBEGW_PORTS": GreedyRange( + Struct( + "address" / Int8ub, + "cmd" / Int8ub, + "port" / Int16ul, + ) + ), }, default=Bytes(this.length), ) @@ -556,6 +639,7 @@ def _encode(self, obj, context, path): ECS_DATA_MSG_1=0x55, ECS_DATA_MSG_2=0xA0, STRING_MSG=0xB1, + NIBEGW_PORTS=0xFF, # Virtual messages used for discovery ) Address = Enum( @@ -585,6 +669,12 @@ def _encode(self, obj, context, path): Address.RMU40_S4: 40030, } +RMU_INDEX_TO_ADDRESS = { + 1: Address.RMU40_S1, + 2: Address.RMU40_S2, + 3: Address.RMU40_S3, + 4: Address.RMU40_S4, +} # fmt: off Response = Struct( diff --git a/nibe/exceptions.py b/nibe/exceptions.py index a6fae0d..287ef95 100644 --- a/nibe/exceptions.py +++ b/nibe/exceptions.py @@ -2,6 +2,10 @@ class NibeException(Exception): pass +class AddressInUseException(NibeException): + pass + + class CoilNotFoundException(NibeException): pass diff --git a/nibe/heatpump.py b/nibe/heatpump.py index 015137c..88e4aef 100644 --- a/nibe/heatpump.py +++ b/nibe/heatpump.py @@ -90,6 +90,9 @@ class HeatPump(EventServer): def __init__(self, model: Model = None): super().__init__() + self._address_to_coil = {} + self._name_to_coil = {} + if model is not None: self.model = model diff --git a/tests/connection/test_nibegw.py b/tests/connection/test_nibegw.py index c95b086..64ce143 100644 --- a/tests/connection/test_nibegw.py +++ b/tests/connection/test_nibegw.py @@ -16,7 +16,7 @@ async def asyncSetUp(self) -> None: self.heatpump = HeatPump(Model.F1255) await self.heatpump.initialize() - self.nibegw = NibeGW(self.heatpump, "127.0.0.1") + self.nibegw = NibeGW(self.heatpump, "127.0.0.1", rmu_write_ports={1: 10001}) self.transport = Mock() assert self.nibegw.status == "unknown" @@ -112,6 +112,13 @@ async def send_receive(): binascii.unhexlify("c06b0604bc0400000011"), ("127.0.0.1", 10000) ) + async def test_write_rmu_temperature(self): + await self.nibegw.write_rmu_temperature(1, 20.0) + + self.transport.sendto.assert_called_with( + binascii.unhexlify("c0600306cf006a"), ("127.0.0.1", 10001) + ) + async def test_read_product_info(self): async def read_product_info(): task = self.loop.create_task(self.nibegw.read_product_info()) diff --git a/tests/connection/test_nibegw_message_parsing.py b/tests/connection/test_nibegw_message_parsing.py index 969ddd9..52cf39e 100644 --- a/tests/connection/test_nibegw_message_parsing.py +++ b/tests/connection/test_nibegw_message_parsing.py @@ -227,6 +227,17 @@ def test_parse_strings(self): self.assertEqual(data.data.id, 10040) self.assertEqual(data.data.string, "värme") + def test_parse_nibegw_ports(self): + data = self._parse_hexlified_raw_message( + "5c0020ff08" "20690f27" "206b1027" "ca" + ) + assert data.address == "MODBUS40" + assert data.cmd == "NIBEGW_PORTS" + assert data.data == [ + dict(address=0x20, cmd=0x69, port=9999), + dict(address=0x20, cmd=0x6B, port=10000), + ] + @staticmethod def _parse_hexlified_raw_message(txt_raw): raw = binascii.unhexlify(txt_raw)