Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pail23 committed Jan 3, 2024
1 parent 349294e commit bc13ae5
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 32 deletions.
6 changes: 6 additions & 0 deletions custom_components/stiebel_eltron_isg/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@
ACTUAL_TEMPERATURE_FEK = "actual_temperature_fek"
TARGET_TEMPERATURE_FEK = "target_temperature_fek"
ACTUAL_HUMIDITY = "actual_humidity"
ACTUAL_HUMIDITY_HK1 = "actual_humidity_hk1"
ACTUAL_HUMIDITY_HK2 = "actual_humidity_hk2"
ACTUAL_HUMIDITY_HK3 = "actual_humidity_hk3"
DEWPOINT_TEMPERATURE = "dew_point_temperature"
DEWPOINT_TEMPERATURE_HK1 = "dew_point_temperature_hk1"
DEWPOINT_TEMPERATURE_HK2 = "dew_point_temperature_hk2"
DEWPOINT_TEMPERATURE_HK3 = "dew_point_temperature_hk3"
OUTDOOR_TEMPERATURE = "outdoor_temperature"
ACTUAL_TEMPERATURE_HK1 = "actual_temperature_hk1"
TARGET_TEMPERATURE_HK1 = "target_temperature_hk1"
Expand Down
18 changes: 18 additions & 0 deletions custom_components/stiebel_eltron_isg/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
)

from .const import (
ACTUAL_HUMIDITY_HK1,
ACTUAL_HUMIDITY_HK2,
ACTUAL_HUMIDITY_HK3,
DEWPOINT_TEMPERATURE_HK1,
DEWPOINT_TEMPERATURE_HK2,
DEWPOINT_TEMPERATURE_HK3,
DOMAIN,
ACTUAL_TEMPERATURE,
TARGET_TEMPERATURE,
Expand Down Expand Up @@ -133,9 +139,21 @@ def create_volume_stream_entity_description(name, key):
"Target Temperature FEK", TARGET_TEMPERATURE_FEK
),
create_humidity_entity_description("Humidity", ACTUAL_HUMIDITY),
create_humidity_entity_description("Humidity HK 1", ACTUAL_HUMIDITY_HK1),
create_humidity_entity_description("Humidity HK 2", ACTUAL_HUMIDITY_HK2),
create_humidity_entity_description("Humidity HK 3", ACTUAL_HUMIDITY_HK3),
create_temperature_entity_description(
"Dew Point Temperature", DEWPOINT_TEMPERATURE
),
create_temperature_entity_description(
"Dew Point Temperature HK 1", DEWPOINT_TEMPERATURE_HK1
),
create_temperature_entity_description(
"Dew Point Temperature HK 2", DEWPOINT_TEMPERATURE_HK2
),
create_temperature_entity_description(
"Dew Point Temperature HK 3", DEWPOINT_TEMPERATURE_HK3
),
create_temperature_entity_description("Outdoor Temperature", OUTDOOR_TEMPERATURE),
create_temperature_entity_description(
"Actual Temperature HK 1", ACTUAL_TEMPERATURE_HK1
Expand Down
29 changes: 26 additions & 3 deletions custom_components/stiebel_eltron_isg/wpm_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
from pymodbus.payload import BinaryPayloadDecoder

from .const import (
ACTUAL_HUMIDITY_HK1,
ACTUAL_HUMIDITY_HK2,
ACTUAL_HUMIDITY_HK3,
ACTUAL_TEMPERATURE,
DEWPOINT_TEMPERATURE_HK1,
DEWPOINT_TEMPERATURE_HK2,
DEWPOINT_TEMPERATURE_HK3,
TARGET_TEMPERATURE,
ACTUAL_TEMPERATURE_FEK,
TARGET_TEMPERATURE_FEK,
Expand Down Expand Up @@ -140,7 +146,7 @@ def read_modbus_system_state(self) -> dict:
def read_modbus_system_values(self) -> dict:
"""Read the system related values from the ISG."""
result = {}
inverter_data = self.read_input_registers(slave=1, address=500, count=96) # 42
inverter_data = self.read_input_registers(slave=1, address=500, count=96)
if not inverter_data.isError():
decoder = BinaryPayloadDecoder.fromRegisters(
inverter_data.registers, byteorder=Endian.BIG
Expand Down Expand Up @@ -226,20 +232,37 @@ def read_modbus_system_values(self) -> dict:
result[TARGET_ROOM_TEMPERATURE_HK1] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
decoder.skip_bytes(4)
result[ACTUAL_HUMIDITY_HK1] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[DEWPOINT_TEMPERATURE_HK1] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[ACTUAL_ROOM_TEMPERATURE_HK2] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[TARGET_ROOM_TEMPERATURE_HK2] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
decoder.skip_bytes(4)
result[ACTUAL_HUMIDITY_HK2] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[DEWPOINT_TEMPERATURE_HK2] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[ACTUAL_ROOM_TEMPERATURE_HK3] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[TARGET_ROOM_TEMPERATURE_HK3] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[ACTUAL_HUMIDITY_HK3] = get_isg_scaled_value(
decoder.decode_16bit_int()
)
result[DEWPOINT_TEMPERATURE_HK3] = get_isg_scaled_value(
decoder.decode_16bit_int()
)

result["system_values"] = list(inverter_data.registers)
return result

Expand Down
58 changes: 37 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
# See here for more info: https://docs.pytest.org/en/latest/fixture.html (note that
# pytest includes fixtures OOB which you can use as defined on this page)
from unittest.mock import patch
from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse
from pymodbus.register_read_message import (
ReadHoldingRegistersResponse,
ReadInputRegistersResponse,
)
import pytest
from typing import Any

Expand Down Expand Up @@ -47,9 +50,7 @@ def skip_notifications_fixture():
@pytest.fixture(name="skip_connect")
def skip_connect_fixture():
"""Skip calls to get data from API."""
with patch(
"pymodbus.client.ModbusTcpClient.read_input_registers", return_value={}
):
with patch("pymodbus.client.ModbusTcpClient.read_input_registers", return_value={}):
yield


Expand All @@ -59,34 +60,42 @@ def skip_connect_fixture():
def bypass_get_data_fixture():
"""Skip calls to get data from API."""
with patch(
"custom_components.stiebel_eltron_isg.StiebelEltronModbusDataCoordinator._async_update_data", return_value={}
"custom_components.stiebel_eltron_isg.coordinator.StiebelEltronModbusDataCoordinator._async_update_data",
return_value={},
):
yield

def read_input_register(register, start, count)->ReadInputRegistersResponse:

def read_input_register(register, start, count) -> ReadInputRegistersResponse:
"""Read a slice from the input register."""
return ReadInputRegistersResponse(register[start: start + count])
return ReadInputRegistersResponse(register[start : start + count])

def read_input_registers_wpm(address: int, count: int = 1, slave: int = 0, **kwargs: Any):

def read_input_registers_wpm(
address: int, count: int = 1, slave: int = 0, **kwargs: Any
):
"""Simulate reads on the input registers on wpm models."""
system_info = [2, 390]
if address >= 5000:
return read_input_register(system_info, address - 5000, count)
else:
return ReadInputRegistersResponse(list(range(0,count)))
return ReadInputRegistersResponse(list(range(0, count)))


def read_input_registers_lwz(address: int, count: int = 1, slave: int = 0, **kwargs: Any):
def read_input_registers_lwz(
address: int, count: int = 1, slave: int = 0, **kwargs: Any
):
"""Simulate reads on the input registers on lwz models ."""
system_info = [2, 103]
if address >= 5000:
return read_input_register(system_info, address - 5000, count)
else:
return ReadInputRegistersResponse(list(range(0,count)))
return ReadInputRegistersResponse(list(range(0, count)))


def read_holding_registers(address: int, count: int = 1, slave: int = 0, **kwargs: Any):
"""Simulate reads on the holding registers on lwz models ."""
return ReadHoldingRegistersResponse(list(range(0,count)))
return ReadHoldingRegistersResponse(list(range(0, count)))


# This fixture, when used, will result in calls to read_input_registers to return mock data. To have the call
Expand All @@ -95,33 +104,37 @@ def read_holding_registers(address: int, count: int = 1, slave: int = 0, **kwarg
def modbus_wpm_fixture():
"""Skip calls to get data from API."""
with patch(
"pymodbus.client.ModbusTcpClient.read_input_registers", side_effect=read_input_registers_wpm
"pymodbus.client.ModbusTcpClient.read_input_registers",
side_effect=read_input_registers_wpm,
), patch(
"pymodbus.client.ModbusTcpClient.read_holding_registers", side_effect=read_holding_registers
"pymodbus.client.ModbusTcpClient.read_holding_registers",
side_effect=read_holding_registers,
), patch("pymodbus.client.ModbusTcpClient.connect"):
yield


# This fixture, when used, will result in calls to read_holding_registers to return mock data. To have the call
# return a value, we would add the `return_value=<VALUE_TO_RETURN>` parameter to the patch call.
@pytest.fixture(name="mock_modbus_lwz")
def modbus_lwz_fixture():
"""Skip calls to get data from API."""
with patch(
"pymodbus.client.ModbusTcpClient.read_input_registers", side_effect=read_input_registers_lwz
with patch(
"pymodbus.client.ModbusTcpClient.read_input_registers",
side_effect=read_input_registers_lwz,
), patch(
"pymodbus.client.ModbusTcpClient.read_holding_registers", side_effect=read_holding_registers
"pymodbus.client.ModbusTcpClient.read_holding_registers",
side_effect=read_holding_registers,
), patch("pymodbus.client.ModbusTcpClient.connect"):
yield



# In this fixture, we are forcing calls to async_get_data to raise an Exception. This is useful
# for exception handling.
@pytest.fixture(name="error_on_get_data")
def error_get_data_fixture():
"""Simulate error when retrieving data from API."""
with patch(
"custom_components.stiebel_eltron_isg.StiebelEltronModbusWPMDataCoordinator.read_modbus_data",
"custom_components.stiebel_eltron_isg.wpm_coordinator.StiebelEltronModbusWPMDataCoordinator.read_modbus_data",
side_effect=Exception,
):
yield
Expand All @@ -133,16 +146,19 @@ def error_get_data_fixture():
def get_model_wpm_fixture():
"""Skip calls to get data from API."""
with patch(
"custom_components.stiebel_eltron_isg.get_controller_model", return_value=391
"custom_components.stiebel_eltron_isg.coordinator.get_controller_model",
return_value=391,
):
yield


# This fixture, when used, will result in calls to async_get_data to return None. To have the call
# return a value, we would add the `return_value=<VALUE_TO_RETURN>` parameter to the patch call.
@pytest.fixture(name="get_model_lwz")
def get_model_lwz_fixture():
"""Skip calls to get data from API."""
with patch(
"custom_components.stiebel_eltron_isg.get_controller_model", return_value=103
"custom_components.stiebel_eltron_isg.coordinator.get_controller_model",
return_value=103,
):
yield
18 changes: 10 additions & 8 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import pytest
from pytest_homeassistant_custom_component.common import MockConfigEntry

from custom_components.stiebel_eltron_isg import (
StiebelEltronModbusWPMDataCoordinator
from custom_components.stiebel_eltron_isg.wpm_coordinator import (
StiebelEltronModbusWPMDataCoordinator,
)
from custom_components.stiebel_eltron_isg.const import DOMAIN

Expand All @@ -18,7 +18,9 @@
# Assertions allow you to verify that the return value of whatever is on the left
# side of the assertion matches with the right side.
@pytest.mark.asyncio
async def test_setup_unload_and_reload_entry(hass: HomeAssistant, bypass_get_data, get_model_wpm):
async def test_setup_unload_and_reload_entry(
hass: HomeAssistant, bypass_get_data, get_model_wpm
):
"""Test entry setup and unload."""
# Create a mock entry so we don't have to go through config flow
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG, entry_id="test")
Expand All @@ -40,7 +42,6 @@ async def test_setup_unload_and_reload_entry(hass: HomeAssistant, bypass_get_dat
assert config_entry.state is ConfigEntryState.NOT_LOADED



@pytest.mark.asyncio
async def test_data_coordinator_wpm(hass: HomeAssistant, mock_modbus_wpm):
"""Test creating a data coordinator for wpm models."""
Expand All @@ -54,20 +55,21 @@ async def test_data_coordinator_wpm(hass: HomeAssistant, mock_modbus_wpm):

state = hass.states.get("sensor.stiebel_eltron_isg_actual_temperature_fek")
assert state is not None
assert state.state == '0.2'
assert state.state == "0.2"
await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.NOT_LOADED
assert config_entry.state is ConfigEntryState.NOT_LOADED


@pytest.mark.asyncio
async def test_data_coordinator_lwz(hass: HomeAssistant, mock_modbus_lwz):
"""Test creating a data coordinator for lwz models."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG, entry_id="test_lwz")
config_entry.add_to_hass(hass)

#assert await async_setup_entry(hass, config_entry)
#assert DOMAIN in hass.data and config_entry.entry_id in hass.data[DOMAIN]
# assert await async_setup_entry(hass, config_entry)
# assert DOMAIN in hass.data and config_entry.entry_id in hass.data[DOMAIN]

await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
Expand All @@ -76,7 +78,7 @@ async def test_data_coordinator_lwz(hass: HomeAssistant, mock_modbus_lwz):

state = hass.states.get("sensor.stiebel_eltron_isg_actual_temperature_fek")
assert state is not None
assert state.state == '0.3'
assert state.state == "0.3"
await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.NOT_LOADED

0 comments on commit bc13ae5

Please sign in to comment.