Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rmu writes and nibegw discovery #62

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 116 additions & 26 deletions nibe/connection/nibegw.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from asyncio import CancelledError, Future, InvalidStateError
from binascii import hexlify
from contextlib import suppress
import errno
from functools import reduce
from io import BytesIO
from ipaddress import ip_address
import logging
from operator import xor
import socket
import struct
from typing import Container, Dict, Union
from typing import Any, Container, Dict, Optional, Union

from construct import (
Adapter,
Expand All @@ -25,6 +26,7 @@
Flag,
FlagsEnum,
GreedyBytes,
GreedyRange,
GreedyString,
IfThenElse,
Int8sb,
Expand All @@ -51,6 +53,7 @@
from nibe.connection.encoders import CoilDataEncoder
from nibe.event_server import EventServer
from nibe.exceptions import (
AddressInUseException,
CoilNotFoundException,
CoilReadException,
CoilReadTimeoutException,
Expand All @@ -59,6 +62,7 @@
DecodeException,
NibeException,
ProductInfoReadTimeoutException,
WriteException,
)
from nibe.heatpump import HeatPump, ProductInfo

Expand All @@ -84,13 +88,14 @@ class NibeGW(asyncio.DatagramProtocol, Connection, EventServer):
def __init__(
self,
heatpump: HeatPump,
remote_ip: str,
remote_ip: Optional[str] = None,
remote_read_port: int = 9999,
remote_write_port: int = 10000,
listening_ip: str = "0.0.0.0",
listening_port: int = 9999,
read_retries: int = 3,
write_retries: int = 3,
rmu_write_ports: Optional[Dict[int, int]] = None,
) -> None:
super().__init__()

Expand All @@ -99,8 +104,22 @@ def __init__(
self._listening_port = listening_port

self._remote_ip = remote_ip
self._remote_read_port = remote_read_port
self._remote_write_port = remote_write_port
self._request_ports = {}

if remote_read_port:
self._request_ports[
(Address.MODBUS40, Command.MODBUS_READ_REQ)
] = remote_read_port

if remote_write_port:
self._request_ports[
(Address.MODBUS40, Command.MODBUS_WRITE_REQ)
] = remote_write_port

for rmu_index, rmu_write_port in (rmu_write_ports or {}).items():
self._request_ports[
(RMU_INDEX_TO_ADDRESS[rmu_index], Command.RMU_WRITE_REQ)
] = rmu_write_port

self._transport = None
self._status = ConnectionStatus.UNKNOWN
Expand Down Expand Up @@ -134,22 +153,28 @@ async def start(self):
proto=socket.IPPROTO_UDP,
family=socket.AddressFamily.AF_INET,
)[0]

sock = socket.socket(family, type, proto)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if ip_address(sockaddr[0]).is_multicast:
group_bin = socket.inet_pton(family, sockaddr[0])
if family == socket.AF_INET: # IPv4
sock.bind(("", sockaddr[1]))
mreq = group_bin + struct.pack("=I", socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
try:
if ip_address(sockaddr[0]).is_multicast:
group_bin = socket.inet_pton(family, sockaddr[0])
if family == socket.AF_INET: # IPv4
sock.bind(("", sockaddr[1]))

Check warning

Code scanning / CodeQL

Binding a socket to all network interfaces

'' binds a socket to all interfaces.
mreq = group_bin + struct.pack("=I", socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
else:
sock.bind(("", sockaddr[1]))

Check warning

Code scanning / CodeQL

Binding a socket to all network interfaces

'' binds a socket to all interfaces.
mreq = group_bin + struct.pack("@I", 0)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
elif self._listening_ip:
sock.bind(sockaddr)
else:
sock.bind(("", sockaddr[1]))
mreq = group_bin + struct.pack("@I", 0)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
elif self._listening_ip:
sock.bind(sockaddr)
else:
sock.bind(("", sockaddr[1]))
except OSError as exception:
if exception.errno == errno.EADDRINUSE:
raise AddressInUseException(f"Address in use {sockaddr}")
raise

await asyncio.get_event_loop().create_datagram_endpoint(lambda: self, sock=sock)

Expand All @@ -159,9 +184,15 @@ def connection_made(self, transport):

def datagram_received(self, data: bytes, addr):
logger.debug(f"Received {hexlify(data).decode('utf-8')} from {addr}")
self._set_status(ConnectionStatus.CONNECTED)
try:
msg = Response.parse(data)

if not self._remote_ip:
logger.debug("Pump discovered at %s", addr)
self._remote_ip = addr[0]

self._set_status(ConnectionStatus.CONNECTED)

logger.debug(msg.fields.value)
cmd = msg.fields.value.cmd
if cmd == "MODBUS_DATA_MSG":
Expand Down Expand Up @@ -190,6 +221,11 @@ def datagram_received(self, data: bytes, addr):
elif cmd == "PRODUCT_INFO_MSG":
with suppress(InvalidStateError, CancelledError, KeyError):
self._futures["product_info"].set_result(msg.fields.value.data)
elif cmd == "NIBEGW_PORTS":
request_ports = {}
for row in msg.fields.value.data:
request_ports[(row.address, row.cmd)] = row.port
self._request_ports = request_ports
elif not isinstance(cmd, EnumIntegerString):
logger.debug(f"Unknown command {cmd}")
except ChecksumError:
Expand All @@ -216,6 +252,15 @@ async def read_product_info(
finally:
del self._futures["product_info"]

def _send_request(self, address: int, command: int, data: bytes):
remote_port = self._request_ports[(address, command)]

logger.debug(
f"Sending {hexlify(data).decode('utf-8')} to {self._remote_ip}:{remote_port}"
)

self._transport.sendto(data, (self._remote_ip, remote_port))

async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil:
async with self._send_lock:
assert self._transport, "Transport is closed"
Expand All @@ -231,10 +276,11 @@ async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil:

self._futures["read"] = asyncio.get_event_loop().create_future()

logger.debug(
f"Sending {hexlify(data).decode('utf-8')} (read request) to {self._remote_ip}:{self._remote_write_port}"
)
self._transport.sendto(data, (self._remote_ip, self._remote_read_port))
try:
self._send_request(Address.MODBUS40, Command.MODBUS_READ_REQ, data)
except socket.gaierror:
raise CoilReadException(f"Unable to lookup hostname: {self._remote_ip}")

logger.debug(f"Waiting for read response for {coil.name}")

try:
Expand Down Expand Up @@ -269,10 +315,12 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil

self._futures["write"] = asyncio.get_event_loop().create_future()

logger.debug(
f"Sending {hexlify(data).decode('utf-8')} (write request) to {self._remote_ip}:{self._remote_write_port}"
)
self._transport.sendto(data, (self._remote_ip, self._remote_write_port))
try:
self._send_request(Address.MODBUS40, Command.MODBUS_WRITE_REQ, data)
except socket.gaierror:
raise CoilWriteException(
f"Unable to lookup hostname: {self._remote_ip}"
)

try:
await asyncio.wait_for(self._futures["write"], timeout)
Expand All @@ -292,13 +340,41 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil

return coil

async def _send_rmu_write_req(self, address: int, index: str, value: Any):
async with self._send_lock:
assert self._transport, "Transport is closed"
data = Request.build(
dict(
fields=dict(
value=dict(
cmd="RMU_WRITE_REQ",
data=dict(index=index, value=value),
)
)
)
)

try:
self._send_request(address, Command.RMU_WRITE_REQ, data)
except socket.gaierror:
raise WriteException(f"Unable to lookup hostname: {self._remote_ip}")

async def write_rmu_temperature(self, index: int, temperature: float):
await self._send_rmu_write_req(
RMU_INDEX_TO_ADDRESS[index], "TEMPERATURE", temperature
)

def error_received(self, exc):
logger.error(exc)

@property
def status(self) -> ConnectionStatus:
return self._status

@property
def remote_ip(self) -> str:
return self._remote_ip

def _set_status(self, status: ConnectionStatus):
if status != self._status:
self._status = status
Expand Down Expand Up @@ -445,7 +521,7 @@ def _decode(self, obj, context, path):
return round(obj * self._scale + self._offset, self._ndigits)

def _encode(self, obj, context, path):
return (obj - self._offset) / self._scale
return round((obj - self._offset) / self._scale)


StringData = Struct(
Expand Down Expand Up @@ -533,6 +609,13 @@ def _encode(self, obj, context, path):
"PRODUCT_INFO_MSG": ProductInfoData,
"RMU_DATA_MSG": RmuData,
"STRING_MSG": StringData,
"NIBEGW_PORTS": GreedyRange(
Struct(
"address" / Int8ub,
"cmd" / Int8ub,
"port" / Int16ul,
)
),
},
default=Bytes(this.length),
)
Expand All @@ -556,6 +639,7 @@ def _encode(self, obj, context, path):
ECS_DATA_MSG_1=0x55,
ECS_DATA_MSG_2=0xA0,
STRING_MSG=0xB1,
NIBEGW_PORTS=0xFF, # Virtual messages used for discovery
)

Address = Enum(
Expand Down Expand Up @@ -585,6 +669,12 @@ def _encode(self, obj, context, path):
Address.RMU40_S4: 40030,
}

RMU_INDEX_TO_ADDRESS = {
1: Address.RMU40_S1,
2: Address.RMU40_S2,
3: Address.RMU40_S3,
4: Address.RMU40_S4,
}

# fmt: off
Response = Struct(
Expand Down
4 changes: 4 additions & 0 deletions nibe/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ class NibeException(Exception):
pass


class AddressInUseException(NibeException):
pass


class CoilNotFoundException(NibeException):
pass

Expand Down
3 changes: 3 additions & 0 deletions nibe/heatpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class HeatPump(EventServer):
def __init__(self, model: Model = None):
super().__init__()

self._address_to_coil = {}
self._name_to_coil = {}

if model is not None:
self.model = model

Expand Down
9 changes: 8 additions & 1 deletion tests/connection/test_nibegw.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def asyncSetUp(self) -> None:

self.heatpump = HeatPump(Model.F1255)
await self.heatpump.initialize()
self.nibegw = NibeGW(self.heatpump, "127.0.0.1")
self.nibegw = NibeGW(self.heatpump, "127.0.0.1", rmu_write_ports={1: 10001})

self.transport = Mock()
assert self.nibegw.status == "unknown"
Expand Down Expand Up @@ -112,6 +112,13 @@ async def send_receive():
binascii.unhexlify("c06b0604bc0400000011"), ("127.0.0.1", 10000)
)

async def test_write_rmu_temperature(self):
await self.nibegw.write_rmu_temperature(1, 20.0)

self.transport.sendto.assert_called_with(
binascii.unhexlify("c0600306cf006a"), ("127.0.0.1", 10001)
)

async def test_read_product_info(self):
async def read_product_info():
task = self.loop.create_task(self.nibegw.read_product_info())
Expand Down
11 changes: 11 additions & 0 deletions tests/connection/test_nibegw_message_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ def test_parse_strings(self):
self.assertEqual(data.data.id, 10040)
self.assertEqual(data.data.string, "värme")

def test_parse_nibegw_ports(self):
data = self._parse_hexlified_raw_message(
"5c0020ff08" "20690f27" "206b1027" "ca"
)
assert data.address == "MODBUS40"
assert data.cmd == "NIBEGW_PORTS"
assert data.data == [
dict(address=0x20, cmd=0x69, port=9999),
dict(address=0x20, cmd=0x6B, port=10000),
]

@staticmethod
def _parse_hexlified_raw_message(txt_raw):
raw = binascii.unhexlify(txt_raw)
Expand Down