diff --git a/examples/somersaultlazy.py b/examples/somersaultlazy.py index 601b4750..d7ff756e 100755 --- a/examples/somersaultlazy.py +++ b/examples/somersaultlazy.py @@ -7,14 +7,15 @@ import asyncio import logging import random -from typing import List, Optional, Union +from socket import socket +from typing import List, Optional, Union, cast import isotp +import isotp.tpsock import somersaultecu -import odxtools import odxtools.uds as uds -from odxtools.exceptions import odxrequire +from odxtools.exceptions import DecodeError, odxrequire from odxtools.message import Message from odxtools.odxtypes import ParameterValueDict from odxtools.response import Response, ResponseType @@ -41,11 +42,13 @@ def create_isotp_socket(channel: Optional[str], rxid: int, if is_sterile: return None + assert channel is not None + # create an ISO-TP socket without a timeout (timeouts are handled # using asyncio). Also, for asyncio to work with this socket, it # must be non-blocking... result_socket = isotp.socket(timeout=None) - result_socket._socket.setblocking(0) + result_socket._socket.setblocking(False) # set the ISO-TP flow control options: # @@ -81,7 +84,7 @@ async def ecu_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes) assert isotp_socket is not None loop = asyncio.get_running_loop() - await loop.sock_sendall(isotp_socket, payload) + await loop.sock_sendall(cast(socket, isotp_socket), payload) async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes: @@ -107,7 +110,7 @@ async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes: assert isotp_socket is not None loop = asyncio.get_running_loop() - return await loop.sock_recv(isotp_socket, 4095) + return await loop.sock_recv(cast(socket, isotp_socket), 4095) async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes) -> None: @@ -127,7 +130,7 @@ async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: byte assert isotp_socket is not None loop = asyncio.get_running_loop() - await loop.sock_sendall(isotp_socket, payload) + await loop.sock_sendall(cast(socket, isotp_socket), payload) async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes: @@ -153,7 +156,7 @@ async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes: assert isotp_socket is not None loop = asyncio.get_running_loop() - return await loop.sock_recv(isotp_socket, 4095) + return await loop.sock_recv(cast(socket, isotp_socket), 4095) class SomersaultLazyEcu: @@ -202,7 +205,7 @@ async def _handle_requests_task(self) -> None: # that requests can always be uniquely decoded assert len(messages) == 1 await self._handle_request(messages[0]) - except odxtools.exceptions.DecodeError as e: + except DecodeError as e: ecu_logger.warning(f"Could not decode request " f"0x{data.hex()}: {e}") return @@ -382,7 +385,7 @@ async def tester_await_response(isotp_socket: Optional[isotp.tpsock.socket], return replies[0].param_dict - except odxtools.exceptions.DecodeError as e: + except DecodeError as e: if len(raw_response) >= 3: sid = raw_response[0] rq_sid = raw_response[1] diff --git a/odxtools/__init__.py b/odxtools/__init__.py index 31ce01c8..2a24e3bd 100644 --- a/odxtools/__init__.py +++ b/odxtools/__init__.py @@ -62,11 +62,11 @@ - _`[ISO22901]` The ISO 22901 Standard: https://www.iso.org/standard/41207.html """ -from .load_file import load_file as load_file -from .load_odx_d_file import load_odx_d_file as load_odx_d_file -from .load_pdx_file import load_pdx_file as load_pdx_file -from .version import __version__ as __version__ -from .write_pdx_file import write_pdx_file as write_pdx_file +from .load_file import load_file as load_file # noqa: F401 +from .load_odx_d_file import load_odx_d_file as load_odx_d_file # noqa: F401 +from .load_pdx_file import load_pdx_file as load_pdx_file # noqa: F401 +from .version import __version__ as __version__ # noqa: F401 +from .write_pdx_file import write_pdx_file as write_pdx_file # noqa: F401 __author__ = "Katrin Bauer" diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 8be26025..e4645473 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -210,18 +210,18 @@ def _validate_coded_message(self, coded_message: bytes) -> None: stacklevel=1) def convert_physical_to_bytes(self, - param_values: ParameterValue, + physical_value: ParameterValue, encode_state: EncodeState, bit_position: int = 0) -> bytes: - if not isinstance(param_values, dict): + if not isinstance(physical_value, dict): raise EncodeError( f"Expected a dictionary for the values of structure {self.short_name}, " - f"got {type(param_values)}") + f"got {type(physical_value)}") if bit_position != 0: raise EncodeError("Structures must be aligned, i.e. bit_position=0, but " f"{self.short_name} was passed the bit position {bit_position}") return self.convert_physical_to_internal( - param_values, + physical_value, triggering_coded_request=encode_state.triggering_request, is_end_of_pdu=encode_state.is_end_of_pdu, ) diff --git a/odxtools/cli/_parser_utils.py b/odxtools/cli/_parser_utils.py index 4d8f0552..e607a1e2 100644 --- a/odxtools/cli/_parser_utils.py +++ b/odxtools/cli/_parser_utils.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: MIT import argparse -from typing import Any, Protocol, TypeVar, runtime_checkable +from typing import Any, Protocol, runtime_checkable from ..database import Database from ..load_file import load_file as _load_file @@ -22,9 +22,6 @@ def add_parser(self, name: str, **kwargs: Any) -> "argparse.ArgumentParser": ... -TSubparsersAction = TypeVar("TSubparsersAction", bound=SubparsersList) - - def add_pdx_argument(parser: argparse.ArgumentParser, is_optional: bool = False) -> None: parser.add_argument( "pdx_file", diff --git a/odxtools/cli/_print_utils.py b/odxtools/cli/_print_utils.py index 449ec274..e3ce5828 100644 --- a/odxtools/cli/_print_utils.py +++ b/odxtools/cli/_print_utils.py @@ -156,6 +156,7 @@ def extract_parameter_tabulation_data(parameters: List[Parameter]) -> Dict[str, byte.append(param.byte_position) semantic.append(param.semantic) param_type.append(param.parameter_type) + length = 0 if param.get_static_bit_length() is not None: bit_length.append(param.get_static_bit_length()) length = (param.get_static_bit_length() or 0) // 4 diff --git a/odxtools/cli/browse.py b/odxtools/cli/browse.py index 8bc0a5c0..3b17bb3c 100644 --- a/odxtools/cli/browse.py +++ b/odxtools/cli/browse.py @@ -4,7 +4,7 @@ import sys from typing import List, Optional, Union, cast -import InquirerPy.prompt as PI_prompt +import InquirerPy.prompt as IP_prompt from tabulate import tabulate # TODO: switch to rich tables from ..database import Database @@ -20,7 +20,7 @@ from ..request import Request from ..response import Response from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList from ._print_utils import extract_parameter_tabulation_data # name of the tool @@ -89,13 +89,15 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp # else _convert_string_to_odx_type(x, p.physical_type.base_data_type, param=p) # This does not work because the next parameter to be promted is used (for some reason?) }] - if hasattr(parameter, "dop") and hasattr(parameter.dop, "compu_method") \ - and hasattr(parameter.dop.compu_method, "get_scales"): - scales = parameter.dop.compu_method.get_scales() + if (dop := getattr(parameter, "dop", None)) and \ + (compu_method := getattr(dop, "compu_method", None)): + scales = compu_method.internal_to_phys choices = [scale.compu_const for scale in scales if scale is not None] + if (cdv := compu_method.compu_default_value) is not None: + choices.append(cdv.compu_const) param_prompt[0]["choices"] = choices - answer = PI_prompt(param_prompt) + answer = IP_prompt(param_prompt) if answer.get(parameter.short_name) == "" and not parameter.is_required: return None elif parameter.physical_type.base_data_type is not None: @@ -105,7 +107,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp logging.warning( f"Parameter {parameter.short_name} does not have a physical data type. Param details: {parameter}" ) - return answer.get(parameter.short_name) + return cast(str, answer.get(parameter.short_name)) def encode_message_interactively(sub_service: Union[Request, Response], @@ -118,7 +120,7 @@ def encode_message_interactively(sub_service: Union[Request, Response], for param_or_dict in param_dict.values(): if isinstance(param_or_dict, dict): for param in param_or_dict.values(): - if not isinstance(param_or_dict, dict) and param.is_settable: + if isinstance(param, Parameter) and param.is_settable: exists_definable_param = True elif param_or_dict.is_settable: exists_definable_param = True @@ -133,10 +135,11 @@ def encode_message_interactively(sub_service: Union[Request, Response], "message": f"Do you want to encode a message? [y/n]", "choices": ["yes", "no"], }] - answer = PI_prompt(encode_message_prompt) + answer = IP_prompt(encode_message_prompt) if answer.get("yes_no_prompt") == "no": return + answered_request = b'' if isinstance(sub_service, Response): answered_request_prompt = [{ "type": @@ -148,7 +151,7 @@ def encode_message_interactively(sub_service: Union[Request, Response], "filter": lambda input: _convert_string_to_bytes(input), }] - answer = PI_prompt(answered_request_prompt) + answer = IP_prompt(answered_request_prompt) answered_request = cast(bytes, answer.get("request")) print(f"Input interpretation as list: {list(answered_request)}") @@ -268,7 +271,7 @@ def browse(odxdb: Database) -> None: "message": "Select a Variant.", "choices": list(dl_names) + ["[exit]"], }] - answer = PI_prompt(selection) + answer = IP_prompt(selection) if answer.get("variant") == "[exit]": return @@ -306,7 +309,7 @@ def browse(odxdb: Database) -> None: f"The variant {variant.short_name} offers the following services. Select one!", "choices": [s.short_name for s in services] + ["[back]"], }] - answer = PI_prompt(selection) + answer = IP_prompt(selection) if answer.get("service") == "[back]": break @@ -341,7 +344,7 @@ def browse(odxdb: Database) -> None: "short": f"Negative response: {nr.short_name}", } for nr in service.negative_responses] + ["[back]"], # type: ignore }] - answer = PI_prompt(selection) + answer = IP_prompt(selection) if answer.get("message_type") == "[back]": continue @@ -355,7 +358,7 @@ def browse(odxdb: Database) -> None: encode_message_interactively(codec, ask_user_confirmation=True) -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: # Browse interactively to avoid spamming the console. parser = subparsers.add_parser( "browse", diff --git a/odxtools/cli/compare.py b/odxtools/cli/compare.py index dc75dfc3..25a622db 100644 --- a/odxtools/cli/compare.py +++ b/odxtools/cli/compare.py @@ -19,7 +19,7 @@ from ..parameters.physicalconstantparameter import PhysicalConstantParameter from ..parameters.valueparameter import ValueParameter from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList from ._print_utils import (extract_service_tabulation_data, print_dl_metrics, print_service_parameters) @@ -482,11 +482,13 @@ def compare_diagnostic_layers(self, dl1: DiagLayer, # check for deleted diagnostic services if service2.short_name not in dl1_service_names and dl2_request_prefixes[ - service2_idx] not in dl1_request_prefixes and service2 not in service_dict[ - "deleted_services"]: + service2_idx] not in dl1_request_prefixes: - service_dict["deleted_services"].append( # type: ignore[union-attr] - service2) # type: ignore[arg-type] + deleted_list = service_dict["deleted_services"] + assert isinstance(deleted_list, list) + if service2 not in deleted_list: + service_dict["deleted_services"].append( # type: ignore[union-attr] + service2) # type: ignore[arg-type] if service1.short_name == service2.short_name: # compare request, pos. response and neg. response parameters of both diagnostic services @@ -545,7 +547,7 @@ def compare_databases(self, database_new: Database, return changes_variants -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: parser = subparsers.add_parser( "compare", description="\n".join([ diff --git a/odxtools/cli/decode.py b/odxtools/cli/decode.py index da9c9c04..134fe605 100644 --- a/odxtools/cli/decode.py +++ b/odxtools/cli/decode.py @@ -9,7 +9,7 @@ from ..odxtypes import ParameterValue from ..singleecujob import SingleEcuJob from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList # name of the tool _odxtools_tool_name_ = "decode" @@ -71,7 +71,7 @@ def print_summary( print(f" {param_name}={get_display_value(param_value)}") -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: parser = subparsers.add_parser( "decode", description="\n".join([ diff --git a/odxtools/cli/dummy_sub_parser.py b/odxtools/cli/dummy_sub_parser.py index 974a396d..2f0f2c99 100644 --- a/odxtools/cli/dummy_sub_parser.py +++ b/odxtools/cli/dummy_sub_parser.py @@ -4,7 +4,7 @@ import traceback from io import StringIO -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList class DummyTool: @@ -22,7 +22,7 @@ def __init__(self, tool_name: str, error: Exception): self._odxtools_tool_name_ = tool_name self._error = error - def add_subparser(self, subparser_list: TSubparsersAction) -> None: + def add_subparser(self, subparser_list: SubparsersList) -> None: desc = StringIO() print(f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}", file=desc) diff --git a/odxtools/cli/find.py b/odxtools/cli/find.py index df3ff1cd..23fca4a8 100644 --- a/odxtools/cli/find.py +++ b/odxtools/cli/find.py @@ -7,7 +7,7 @@ from ..odxtypes import ParameterValue from ..singleecujob import SingleEcuJob from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList from ._print_utils import print_diagnostic_service # name of the tool @@ -67,7 +67,7 @@ def print_summary(odxdb: Database, print(f"Unknown service: {service}") -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: parser = subparsers.add_parser( "find", description="\n".join([ diff --git a/odxtools/cli/list.py b/odxtools/cli/list.py index e1f8f9f6..7298ffb8 100644 --- a/odxtools/cli/list.py +++ b/odxtools/cli/list.py @@ -10,7 +10,7 @@ from ..diagservice import DiagService from ..singleecujob import SingleEcuJob from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList from ._print_utils import format_desc, print_diagnostic_service, print_dl_metrics # name of the tool @@ -112,7 +112,7 @@ def print_summary(odxdb: Database, rich.print(f" {com_param.short_name}: {com_param.value}") -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: parser = subparsers.add_parser( "list", description="\n".join([ diff --git a/odxtools/cli/main.py b/odxtools/cli/main.py index 786c18fb..f9daf3ba 100644 --- a/odxtools/cli/main.py +++ b/odxtools/cli/main.py @@ -4,6 +4,7 @@ from typing import Any, List import odxtools +import odxtools.exceptions from ..version import __version__ as odxtools_version from .dummy_sub_parser import DummyTool diff --git a/odxtools/cli/snoop.py b/odxtools/cli/snoop.py index 2c5158d8..89517025 100644 --- a/odxtools/cli/snoop.py +++ b/odxtools/cli/snoop.py @@ -15,7 +15,7 @@ from odxtools.response import Response, ResponseType from . import _parser_utils -from ._parser_utils import TSubparsersAction +from ._parser_utils import SubparsersList # name of the tool _odxtools_tool_name_ = "snoop" @@ -31,6 +31,8 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None: global odx_diag_layer global last_request + assert odx_diag_layer is not None + if telegram_id == ecu_tx_id: if uds.is_response_pending(payload): print(f" ... (response pending)") @@ -210,7 +212,7 @@ def add_cli_arguments(parser: argparse.ArgumentParser) -> None: _parser_utils.add_pdx_argument(parser) -def add_subparser(subparsers: TSubparsersAction) -> None: +def add_subparser(subparsers: SubparsersList) -> None: parser = subparsers.add_parser( "snoop", description="Live decoding of a diagnostic session.", diff --git a/odxtools/comparaminstance.py b/odxtools/comparaminstance.py index ca824786..084e88bc 100644 --- a/odxtools/comparaminstance.py +++ b/odxtools/comparaminstance.py @@ -120,6 +120,7 @@ def get_subvalue(self, subparam_name: str) -> Optional[str]: name_list = [cp.short_name for cp in comparam_spec.subparams] try: idx = name_list.index(subparam_name) + subparam = comparam_spec.subparams[idx] except ValueError: warnings.warn( f"Communication parameter '{self.short_name}' " @@ -130,8 +131,8 @@ def get_subvalue(self, subparam_name: str) -> Optional[str]: return None result = value_list[idx] - if result is None: - result = comparam_spec.subparams[idx].physical_default_value + if result is None and isinstance(subparam, (Comparam, ComplexComparam)): + result = subparam.physical_default_value if not isinstance(result, str): odxraise() diff --git a/odxtools/compumethods/compumethod.py b/odxtools/compumethods/compumethod.py index 5a43fa81..e8b42f97 100644 --- a/odxtools/compumethods/compumethod.py +++ b/odxtools/compumethods/compumethod.py @@ -1,5 +1,4 @@ # SPDX-License-Identifier: MIT -import abc from dataclasses import dataclass from typing import Literal @@ -15,14 +14,13 @@ @dataclass -class CompuMethod(abc.ABC): +class CompuMethod: internal_type: DataType physical_type: DataType @property - @abc.abstractmethod def category(self) -> CompuMethodCategory: - pass + raise NotImplementedError() def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: raise NotImplementedError() diff --git a/odxtools/compumethods/compuscale.py b/odxtools/compumethods/compuscale.py index 22ff9fe7..def5c5be 100644 --- a/odxtools/compumethods/compuscale.py +++ b/odxtools/compumethods/compuscale.py @@ -88,7 +88,7 @@ def applies(self, internal_value: AtomicOdxType) -> bool: # which is allowed (cf section 7.3.6.6.1) assert self.lower_limit is not None - return internal_value == self.lower_limit._value + return internal_value == self.lower_limit.value elif self.lower_limit is None: # only the upper limit has been specified. the spec is # ambiguous: it only says that if no upper limit is @@ -101,7 +101,7 @@ def applies(self, internal_value: AtomicOdxType) -> bool: # specified. assert self.upper_limit is not None - return internal_value == self.upper_limit._value + return internal_value == self.upper_limit.value return self.lower_limit.complies_to_lower(internal_value) and \ self.upper_limit.complies_to_upper(internal_value) diff --git a/odxtools/compumethods/createanycompumethod.py b/odxtools/compumethods/createanycompumethod.py index ea598246..24c9639f 100644 --- a/odxtools/compumethods/createanycompumethod.py +++ b/odxtools/compumethods/createanycompumethod.py @@ -1,10 +1,8 @@ # SPDX-License-Identifier: MIT -import warnings from typing import Any, Dict, List, Optional from xml.etree import ElementTree -from ..exceptions import OdxWarning, odxassert, odxraise, odxrequire -from ..globals import logger +from ..exceptions import odxassert, odxraise, odxrequire from ..odxlink import OdxDocFragment from ..odxtypes import DataType from .compumethod import CompuMethod @@ -58,10 +56,8 @@ def _parse_compu_scale_to_linear_compu_method( if (string := coeffs.findtext("COMPU-DENOMINATOR/V")) is not None: denominator = float(string) if denominator == 0: - warnings.warn( - "CompuMethod: A denominator of zero will lead to divisions by zero.", - OdxWarning, - stacklevel=1) + odxraise("CompuMethod: A denominator of zero will lead to divisions by zero.") + # Read lower limit internal_lower_limit = Limit.limit_from_et( et_element.find("LOWER-LIMIT"), @@ -184,6 +180,7 @@ def create_any_compu_method_from_et(et_element: ElementTree.Element, return TabIntpCompuMethod( internal_points=internal_points, physical_points=physical_points, **kwargs) - # TODO: Implement other categories (never instantiate CompuMethod) - logger.warning(f"Warning: Computation category {compu_category} is not implemented!") + # TODO: Implement all categories (never instantiate the CompuMethod base class!) + odxraise(f"Warning: Computation category {compu_category} is not implemented!") + return IdenticalCompuMethod(internal_type=DataType.A_UINT32, physical_type=DataType.A_UINT32) diff --git a/odxtools/compumethods/limit.py b/odxtools/compumethods/limit.py index 96924e9a..3fe1f009 100644 --- a/odxtools/compumethods/limit.py +++ b/odxtools/compumethods/limit.py @@ -62,6 +62,10 @@ def set_value_type(self, value_type: DataType) -> None: if self.value_raw is not None: self._value = value_type.from_string(self.value_raw) + @property + def value(self) -> Optional[AtomicOdxType]: + return self._value + def complies_to_upper(self, value: AtomicOdxType) -> bool: """Checks if the value is in the range w.r.t. the upper limit. diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 3543595d..54cbb774 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -120,8 +120,10 @@ def convert_physical_to_internal(self, physical_value: Any) -> Any: return self.compu_method.convert_physical_to_internal(physical_value) - def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeState, - bit_position: int) -> bytes: + def convert_physical_to_bytes(self, + physical_value: Any, + encode_state: EncodeState, + bit_position: int = 0) -> bytes: """ Convert a physical representation of a parameter to a string bytes that can be send over the wire """ diff --git a/odxtools/decodestate.py b/odxtools/decodestate.py index 4dac6241..e414bf1d 100644 --- a/odxtools/decodestate.py +++ b/odxtools/decodestate.py @@ -100,6 +100,7 @@ def extract_atomic_value( text_errors = 'strict' if exceptions.strict_mode else 'replace' if base_data_type == DataType.A_ASCIISTRING: + assert isinstance(internal_value, (bytes, bytearray)) # The spec says ASCII, meaning only byte values 0-127. # But in practice, vendors use iso-8859-1, aka latin-1 # reason being iso-8859-1 never fails since it has a valid @@ -107,9 +108,11 @@ def extract_atomic_value( text_encoding = 'iso-8859-1' internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) elif base_data_type == DataType.A_UTF8STRING: + assert isinstance(internal_value, (bytes, bytearray)) text_encoding = "utf-8" internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) elif base_data_type == DataType.A_UNICODE2STRING: + assert isinstance(internal_value, (bytes, bytearray)) # For UTF-16, we need to manually decode the extracted # bytes to a string text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le" diff --git a/odxtools/determinenumberofitems.py b/odxtools/determinenumberofitems.py index 6e60a6ac..21fa343b 100644 --- a/odxtools/determinenumberofitems.py +++ b/odxtools/determinenumberofitems.py @@ -7,7 +7,7 @@ from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef if TYPE_CHECKING: - from ..diaglayer import DiagLayer + from .diaglayer import DiagLayer @dataclass diff --git a/odxtools/diagcodedtype.py b/odxtools/diagcodedtype.py index 5aa6bcb0..f9c862c8 100644 --- a/odxtools/diagcodedtype.py +++ b/odxtools/diagcodedtype.py @@ -142,6 +142,7 @@ def _minimal_byte_length_of(self, internal_value: Union[bytes, str]) -> int: """Helper method to get the minimal byte length. (needed for LeadingLength- and MinMaxLengthType) """ + byte_length: int = -1 # A_BYTEFIELD, A_ASCIISTRING, A_UNICODE2STRING, A_UTF8STRING if self.base_data_type == DataType.A_BYTEFIELD: byte_length = len(internal_value) diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index c7a80d92..c42d9687 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -53,6 +53,9 @@ class DiagLayer: diag_layer_raw: DiagLayerRaw + def __post_init__(self) -> None: + self._global_negative_responses: NamedItemList[Response] + @staticmethod def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> "DiagLayer": diag_layer_raw = DiagLayerRaw.from_et(et_element, doc_frags) @@ -1149,14 +1152,8 @@ def decode(self, message: bytes) -> List[Message]: return self._decode(message, candidate_services) - def decode_response(self, response: bytes, request: Union[bytes, Message]) -> List[Message]: - if isinstance(request, Message): - candidate_services = [request.service] - else: - if not isinstance(request, (bytes, bytearray)): - raise TypeError(f"Request parameter must have type " - f"Message, bytes or bytearray but was {type(request)}") - candidate_services = self._find_services_for_uds(request) + def decode_response(self, response: bytes, request: bytes) -> List[Message]: + candidate_services = self._find_services_for_uds(request) if candidate_services is None: raise DecodeError(f"Couldn't find corresponding service for request {request.hex()}.") diff --git a/odxtools/dopbase.py b/odxtools/dopbase.py index 5e019c7b..e2b29ab3 100644 --- a/odxtools/dopbase.py +++ b/odxtools/dopbase.py @@ -64,8 +64,10 @@ def is_valid_physical_value(self, physical_value: ParameterValue) -> bool: """ raise NotImplementedError - def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, - bit_position: int) -> bytes: + def convert_physical_to_bytes(self, + physical_value: ParameterValue, + encode_state: EncodeState, + bit_position: int = 0) -> bytes: """Convert the physical value into bytes.""" raise NotImplementedError diff --git a/odxtools/dtcdop.py b/odxtools/dtcdop.py index afc01384..9c7aa3f2 100644 --- a/odxtools/dtcdop.py +++ b/odxtools/dtcdop.py @@ -128,8 +128,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: return dtc - def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, - bit_position: int) -> bytes: + def convert_physical_to_bytes(self, + physical_value: ParameterValue, + encode_state: EncodeState, + bit_position: int = 0) -> bytes: if isinstance(physical_value, DiagnosticTroubleCode): trouble_code = physical_value.trouble_code elif isinstance(physical_value, int): diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index c94b25eb..7d8058d9 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -41,7 +41,7 @@ def from_et(et_element: ElementTree.Element, def convert_physical_to_bytes( self, - physical_values: ParameterValue, + physical_value: ParameterValue, encode_state: EncodeState, bit_position: int = 0, ) -> bytes: @@ -49,13 +49,13 @@ def convert_physical_to_bytes( odxassert( bit_position == 0, "End of PDU field must be byte aligned. " "Is there an error in reading the .odx?", EncodeError) - if not isinstance(physical_values, list): + if not isinstance(physical_value, list): odxraise( f"Expected a list of values for end-of-pdu field {self.short_name}, " - f"got {type(physical_values)}", EncodeError) + f"got {type(physical_value)}", EncodeError) coded_message = b'' - for value in physical_values: + for value in physical_value: coded_message += self.structure.convert_physical_to_bytes(value, encode_state) return coded_message diff --git a/odxtools/environmentdatadescription.py b/odxtools/environmentdatadescription.py index c1943bf1..f4223b98 100644 --- a/odxtools/environmentdatadescription.py +++ b/odxtools/environmentdatadescription.py @@ -86,8 +86,10 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None: for ed in self.env_datas: ed._resolve_snrefs(diag_layer) - def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, - bit_position: int) -> bytes: + def convert_physical_to_bytes(self, + physical_value: ParameterValue, + encode_state: EncodeState, + bit_position: int = 0) -> bytes: """Convert the physical value into bytes. Since environmental data is supposed to never appear on the diff --git a/odxtools/isotp_state_machine.py b/odxtools/isotp_state_machine.py index 5ce30a26..f6591651 100644 --- a/odxtools/isotp_state_machine.py +++ b/odxtools/isotp_state_machine.py @@ -55,10 +55,13 @@ def decode_rx_frame(self, rx_id: int, data: bytes) -> Iterable[Tuple[int, bytes] return # unknown CAN ID # decode the isotp segment - (frame_type,) = bitstruct.unpack("u4", data) + frame_type, _ = bitstruct.unpack("u4", data) + assert isinstance(frame_type, int) + telegram_len = None if frame_type == IsoTp.FRAME_TYPE_SINGLE: frame_type, telegram_len = bitstruct.unpack("u4u4", data) + assert isinstance(telegram_len, int) self.on_single_frame(telegram_idx, data[1:1 + telegram_len]) self.on_telegram_complete(telegram_idx, data[1:1 + telegram_len]) @@ -67,6 +70,7 @@ def decode_rx_frame(self, rx_id: int, data: bytes) -> Iterable[Tuple[int, bytes] elif frame_type == IsoTp.FRAME_TYPE_FIRST: frame_type, telegram_len = bitstruct.unpack("u4u12", data) + assert isinstance(telegram_len, int) self._telegram_specified_len[telegram_idx] = telegram_len self._telegram_data[telegram_idx] = bytearray(data[2:]) @@ -76,11 +80,13 @@ def decode_rx_frame(self, rx_id: int, data: bytes) -> Iterable[Tuple[int, bytes] elif frame_type == IsoTp.FRAME_TYPE_CONSECUTIVE: frame_type, rx_segment_idx = bitstruct.unpack("u4u4", data) + assert isinstance(rx_segment_idx, int) expected_segment_idx = (self._telegram_last_rx_fragment_idx[telegram_idx] + 1) % 16 telegram_data = self._telegram_data[telegram_idx] assert isinstance(telegram_data, bytearray) + n = -1 if expected_segment_idx == rx_segment_idx: self._telegram_last_rx_fragment_idx[telegram_idx] = rx_segment_idx telegram_data += data[1:] @@ -102,6 +108,8 @@ def decode_rx_frame(self, rx_id: int, data: bytes) -> Iterable[Tuple[int, bytes] elif frame_type == IsoTp.FRAME_TYPE_FLOW_CONTROL: frame_type, flow_control_flag = bitstruct.unpack("u4u4", data) + assert isinstance(flow_control_flag, int) + self.on_flow_control_frame(telegram_idx, flow_control_flag) else: self.on_frame_type_error(telegram_idx, frame_type) @@ -139,7 +147,7 @@ async def read_telegrams(self, bus: Union[can.BusABC, return if m := self.can_normal_frame_re.match(cur_line.strip()): - #frame_interface = m.group(1) + # frame_interface = m.group(1) frame_id = int(m.group(2), 16) frame_data_formatted = m.group(3).strip() @@ -151,7 +159,7 @@ async def read_telegrams(self, bus: Union[can.BusABC, elif (m := self.can_log_frame_re.match( cur_line.strip())) or (m := self.can_fd_log_frame_re.match( cur_line.strip())): - #frame_interface = m.group(2) + # frame_interface = m.group(2) frame_id = int(m.group(2), 16) frame_data_formatted = m.group(3).strip() @@ -257,7 +265,7 @@ def can_tx_id(self, telegram_idx: int) -> int: def on_single_frame(self, telegram_idx: int, frame_payload: bytes) -> None: # send ACK - #rx_id = self.can_rx_id(telegram_idx) + # rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) block_size = 0xFF min_separation_time = 0 # ms @@ -276,7 +284,7 @@ def on_single_frame(self, telegram_idx: int, frame_payload: bytes) -> None: def on_first_frame(self, telegram_idx: int, frame_payload: bytes) -> None: # send ACK - #rx_id = self.can_rx_id(telegram_idx) + # rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) block_size = 0xFF # default value, can be overwritten later min_separation_time = 0 # ms @@ -308,7 +316,7 @@ def on_consecutive_frame(self, telegram_idx: int, segment_idx: int, # send new ACK if necessary block_size = self._block_size[telegram_idx] if block_size is not None and num_received >= block_size: - #rx_id = self.can_rx_id(telegram_idx) + # rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) min_separation_time = 0 # ms fc_payload = bitstruct.pack( diff --git a/odxtools/multiplexer.py b/odxtools/multiplexer.py index 8e1970c0..fe11c58f 100644 --- a/odxtools/multiplexer.py +++ b/odxtools/multiplexer.py @@ -74,8 +74,10 @@ def _get_case_limits(self, case: MultiplexerCase) -> Tuple[AtomicOdxType, Atomic odxraise("Upper and lower bounds of limits must compareable") return lower_limit, upper_limit - def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, - bit_position: int) -> bytes: + def convert_physical_to_bytes(self, + physical_value: ParameterValue, + encode_state: EncodeState, + bit_position: int = 0) -> bytes: if bit_position != 0: raise EncodeError("Multiplexer must be aligned, i.e. bit_position=0, but " @@ -126,22 +128,23 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: f" for multiplexer '{self.short_name}')") case_value: Optional[ParameterValue] = None - for case in self.cases or []: - lower, upper = self._get_case_limits(case) + mux_case = None + for mux_case in self.cases or []: + lower, upper = self._get_case_limits(mux_case) if lower <= key_value and key_value <= upper: # type: ignore[operator] - if case._structure: - case_value = case._structure.decode_from_pdu(decode_state) + if mux_case._structure: + case_value = mux_case._structure.decode_from_pdu(decode_state) break if case_value is None and self.default_case is not None: if self.default_case._structure: case_value = self.default_case._structure.decode_from_pdu(decode_state) - if case_value is None: + if mux_case is None or case_value is None: odxraise(f"Failed to find a matching case in {self.short_name} for value {key_value!r}", DecodeError) - mux_value = (case.short_name, case_value) + mux_value = (mux_case.short_name, case_value) # go back to the original origin decode_state.origin_byte_position = orig_origin diff --git a/odxtools/multiplexerswitchkey.py b/odxtools/multiplexerswitchkey.py index 99e28607..3afc85de 100644 --- a/odxtools/multiplexerswitchkey.py +++ b/odxtools/multiplexerswitchkey.py @@ -7,7 +7,7 @@ from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef if TYPE_CHECKING: - from ..diaglayer import DiagLayer + from .diaglayer import DiagLayer @dataclass diff --git a/odxtools/nameditemlist.py b/odxtools/nameditemlist.py index 64dda3b7..5b9b3ac0 100644 --- a/odxtools/nameditemlist.py +++ b/odxtools/nameditemlist.py @@ -12,7 +12,7 @@ class OdxNamed(Protocol): @property def short_name(self) -> str: - pass + ... T = TypeVar("T") @@ -139,7 +139,8 @@ def __getitem__(self, key: str) -> T: def __getitem__(self, key: slice) -> List[T]: ... - def __getitem__(self, key: Union[SupportsIndex, str, slice]) -> Union[T, List[T]]: + def __getitem__( # type: ignore + self, key: Union[SupportsIndex, str, slice]) -> Union[T, List[T]]: if isinstance(key, (SupportsIndex, slice)): return super().__getitem__(key) else: @@ -177,7 +178,7 @@ def __repr__(self) -> str: class NamedItemList(ItemAttributeList[T]): - def _get_item_key(self, obj: T) -> str: + def _get_item_key(self, item: T) -> str: """Transform an object's `short_name` attribute into a valid python identifier @@ -187,9 +188,9 @@ def _get_item_key(self, obj: T) -> str: such short names. """ - if not isinstance(obj, OdxNamed): + if not isinstance(item, OdxNamed): odxraise() - sn = obj.short_name + sn = item.short_name if not isinstance(sn, str): odxraise() diff --git a/odxtools/odxlink.py b/odxtools/odxlink.py index f5822501..76a9a239 100644 --- a/odxtools/odxlink.py +++ b/odxtools/odxlink.py @@ -181,7 +181,7 @@ def resolve(self, ref: OdxLinkRef, expected_type: None = None) -> Any: def resolve(self, ref: OdxLinkRef, expected_type: Type[T]) -> T: ... - def resolve(self, ref: OdxLinkRef, expected_type: Optional[Type[T]] = None) -> Any: + def resolve(self, ref: OdxLinkRef, expected_type: Optional[Any] = None) -> Any: """ Resolve a reference to an object @@ -223,7 +223,7 @@ def resolve_lenient(self, ref: OdxLinkRef, expected_type: Type[T]) -> Optional[T def resolve_lenient(self, ref: OdxLinkRef, - expected_type: Optional[Type[T]] = None) -> Optional[Any]: + expected_type: Optional[Any] = None) -> Optional[Any]: """ Resolve a reference to an object diff --git a/odxtools/parameters/createanyparameter.py b/odxtools/parameters/createanyparameter.py index fb130625..0688ca2d 100644 --- a/odxtools/parameters/createanyparameter.py +++ b/odxtools/parameters/createanyparameter.py @@ -40,6 +40,8 @@ def create_any_parameter_from_et(et_element: ElementTree.Element, sdgs = create_sdgs_from_et(et_element.find("SDGS"), doc_frags) # Which attributes are set depends on the type of the parameter. + dop_ref = None + dop_snref = None if parameter_type in ["VALUE", "PHYS-CONST", "SYSTEM", "LENGTH-KEY"]: dop_ref = OdxLinkRef.from_et(et_element.find("DOP-REF"), doc_frags) dop_snref = None diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index c16b4792..d3a1e58a 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -5,7 +5,7 @@ from ..decodestate import DecodeState from ..encodestate import EncodeState -from ..exceptions import EncodeError, OdxWarning, odxraise +from ..exceptions import DecodeError, EncodeError, OdxWarning, odxraise from ..odxlink import OdxLinkDatabase, OdxLinkId, OdxLinkRef from ..odxtypes import ParameterValue from .parameter import Parameter, ParameterType @@ -136,8 +136,9 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: decode_state.table_keys[key_name] table_row = decode_state.table_keys.get(key_name) if table_row is None: - raise odxraise(f"No table key '{key_name}' found when decoding " - f"table struct parameter '{str(self.short_name)}'") + odxraise( + f"No table key '{key_name}' found when decoding " + f"table struct parameter '{str(self.short_name)}'", DecodeError) dummy_val = cast(str, None), cast(int, None) return dummy_val diff --git a/odxtools/statetransition.py b/odxtools/statetransition.py index aae6822d..5b40e131 100644 --- a/odxtools/statetransition.py +++ b/odxtools/statetransition.py @@ -20,7 +20,7 @@ class StateTransition(IdentifiableElement): """ source_snref: str target_snref: str - #external_access_method: Optional[ExternalAccessMethod] # TODO + # external_access_method: Optional[ExternalAccessMethod] # TODO @property def source_state(self) -> State: diff --git a/odxtools/write_pdx_file.py b/odxtools/write_pdx_file.py index acfa5c1e..ed614e39 100644 --- a/odxtools/write_pdx_file.py +++ b/odxtools/write_pdx_file.py @@ -120,9 +120,9 @@ def write_pdx_file( creation_date = file_cdate.strftime("%Y-%m-%dT%H:%M:%S") mime_type = "text/plain" - if template_file_name.endswith(".odx-cs"): + if output_file_name.endswith(".odx-cs"): mime_type = "application/x-asam.odx.odx-cs" - elif template_file_name.endswith(".odx-d"): + elif output_file_name.endswith(".odx-d"): mime_type = "application/x-asam.odx.odx-d" zf_name = os.path.basename(output_file_name) diff --git a/tests/test_cli.py b/tests/test_cli.py index f9cf0d49..18dd5649 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,6 +2,7 @@ import unittest from argparse import Namespace +from types import ModuleType from typing import List, Optional from unittest.mock import MagicMock, patch @@ -10,12 +11,11 @@ import odxtools.cli.find as find import odxtools.cli.list as list_tool -browse_import_failed = False - +browse: Optional[ModuleType] try: import odxtools.cli.browse as browse except ImportError: - browse_import_failed = True + browse = None class UtilFunctions: @@ -131,11 +131,12 @@ def test_compare_tool(self) -> None: ]) UtilFunctions.run_compare_tool(ecu_variants=["somersault_lazy", "somersault_assiduous"]) - @unittest.skipIf(browse_import_failed, "importing the browse tool failed") + @unittest.skipIf(browse is None, "importing the browse tool failed") # browse is an interactive tool, so we need to mock a few # functions to make PyInquirer reliably bail out @patch("sys.stdout.isatty", return_value=False) def test_browse_tool(self, pi_prompt: MagicMock) -> None: + assert browse is not None browse_args = Namespace(pdx_file="./examples/somersault.pdx") with self.assertRaises(SystemError): # browse can only be run interactively diff --git a/tests/test_odxtools.py b/tests/test_odxtools.py index 4493643a..45a4a13c 100644 --- a/tests/test_odxtools.py +++ b/tests/test_odxtools.py @@ -4,6 +4,7 @@ from typing import List import odxtools +import odxtools.exceptions from odxtools.exceptions import OdxError from odxtools.load_pdx_file import load_pdx_file from odxtools.nameditemlist import NamedItemList diff --git a/tests/test_somersault.py b/tests/test_somersault.py index 5924198d..fa6eda71 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -1,20 +1,12 @@ # SPDX-License-Identifier: MIT import unittest +from io import StringIO +from unittest.mock import patch from odxtools.exceptions import OdxError, odxrequire from odxtools.load_pdx_file import load_pdx_file from odxtools.parameters.nrcconstparameter import NrcConstParameter -try: - from unittest.mock import patch -except ImportError: - from mock import patch # type: ignore # noqa: UP026 - -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - odxdb = load_pdx_file("./examples/somersault.pdx") @@ -241,29 +233,24 @@ def test_code_table_params(self) -> None: })) self.assertEqual(resp_data.hex(), "622a5c03fa7bf9") + # test decoding an object featuring a TableStruct parameter decoded_resp_data = pr.decode(resp_data) assert isinstance(decoded_resp_data, dict) self.assertEqual(decoded_resp_data["dizzyness_level"], 42) self.assertEqual(decoded_resp_data["happiness_level"], 92) self.assertEqual(decoded_resp_data["last_pos_response_key"], "forward_grudging") - self.assertEqual( - decoded_resp_data["last_pos_response"][0], # type: ignore[index] - "forward_grudging") - self.assertEqual( - set(decoded_resp_data["last_pos_response"] - [1].keys()), # type: ignore[index, union-attr] - {"sid", "num_flips_done", "sault_time"}) + last_pos_response = decoded_resp_data["last_pos_response"] + assert isinstance(last_pos_response, tuple) + lpr_name, lpr_value = last_pos_response + assert isinstance(lpr_name, str) + assert isinstance(lpr_value, dict) + self.assertEqual(lpr_name, "forward_grudging") + self.assertEqual(set(lpr_value.keys()), {"sid", "num_flips_done", "sault_time"}) # the num_flips_done parameter is a matching request parameter # for this response, so it produces a binary blob. possibly, # it should be changed to a ValueParameter... - self.assertEqual( - decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] - ["num_flips_done"], # type: ignore[index, call-overload] - 123) - self.assertEqual( - decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] - ["sault_time"], # type: ignore[index, call-overload] - 249) + self.assertEqual(lpr_value["num_flips_done"], 123) + self.assertEqual(lpr_value["sault_time"], 249) # test the "backward flips grudgingly done" response resp_data = pr.encode(