Skip to content

Commit

Permalink
fix a few issues flagged by pytype and pyright
Browse files Browse the repository at this point in the history
pytype in particular seems to be missing a few features and is
glacially slow...

Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Christian Hackenbeck <[email protected]>
  • Loading branch information
andlaus committed Mar 4, 2024
1 parent 97f9edf commit 8b37dc4
Show file tree
Hide file tree
Showing 37 changed files with 157 additions and 136 deletions.
23 changes: 13 additions & 10 deletions examples/somersaultlazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import asyncio
import logging
import random
from typing import List, Optional, Union
from socket import socket
from typing import List, Optional, Union, cast

import isotp
import isotp.tpsock
import somersaultecu

import odxtools
import odxtools.uds as uds
from odxtools.exceptions import odxrequire
from odxtools.exceptions import DecodeError, odxrequire
from odxtools.message import Message
from odxtools.odxtypes import ParameterValueDict
from odxtools.response import Response, ResponseType
Expand All @@ -41,11 +42,13 @@ def create_isotp_socket(channel: Optional[str], rxid: int,
if is_sterile:
return None

assert channel is not None

# create an ISO-TP socket without a timeout (timeouts are handled
# using asyncio). Also, for asyncio to work with this socket, it
# must be non-blocking...
result_socket = isotp.socket(timeout=None)
result_socket._socket.setblocking(0)
result_socket._socket.setblocking(False)

# set the ISO-TP flow control options:
#
Expand Down Expand Up @@ -81,7 +84,7 @@ async def ecu_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes)
assert isotp_socket is not None

loop = asyncio.get_running_loop()
await loop.sock_sendall(isotp_socket, payload)
await loop.sock_sendall(cast(socket, isotp_socket), payload)


async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
Expand All @@ -107,7 +110,7 @@ async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
assert isotp_socket is not None

loop = asyncio.get_running_loop()
return await loop.sock_recv(isotp_socket, 4095)
return await loop.sock_recv(cast(socket, isotp_socket), 4095)


async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes) -> None:
Expand All @@ -127,7 +130,7 @@ async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: byte
assert isotp_socket is not None

loop = asyncio.get_running_loop()
await loop.sock_sendall(isotp_socket, payload)
await loop.sock_sendall(cast(socket, isotp_socket), payload)


async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
Expand All @@ -153,7 +156,7 @@ async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
assert isotp_socket is not None

loop = asyncio.get_running_loop()
return await loop.sock_recv(isotp_socket, 4095)
return await loop.sock_recv(cast(socket, isotp_socket), 4095)


class SomersaultLazyEcu:
Expand Down Expand Up @@ -202,7 +205,7 @@ async def _handle_requests_task(self) -> None:
# that requests can always be uniquely decoded
assert len(messages) == 1
await self._handle_request(messages[0])
except odxtools.exceptions.DecodeError as e:
except DecodeError as e:
ecu_logger.warning(f"Could not decode request "
f"0x{data.hex()}: {e}")
return
Expand Down Expand Up @@ -382,7 +385,7 @@ async def tester_await_response(isotp_socket: Optional[isotp.tpsock.socket],

return replies[0].param_dict

except odxtools.exceptions.DecodeError as e:
except DecodeError as e:
if len(raw_response) >= 3:
sid = raw_response[0]
rq_sid = raw_response[1]
Expand Down
8 changes: 4 additions & 4 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,18 @@ def _validate_coded_message(self, coded_message: bytes) -> None:
stacklevel=1)

def convert_physical_to_bytes(self,
param_values: ParameterValue,
physical_value: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
if not isinstance(param_values, dict):
if not isinstance(physical_value, dict):
raise EncodeError(
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_values)}")
f"got {type(physical_value)}")
if bit_position != 0:
raise EncodeError("Structures must be aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")
return self.convert_physical_to_internal(
param_values,
physical_value,
triggering_coded_request=encode_state.triggering_request,
is_end_of_pdu=encode_state.is_end_of_pdu,
)
Expand Down
5 changes: 1 addition & 4 deletions odxtools/cli/_parser_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MIT
import argparse
from typing import Any, Protocol, TypeVar, runtime_checkable
from typing import Any, Protocol, runtime_checkable

from ..database import Database
from ..load_file import load_file as _load_file
Expand All @@ -22,9 +22,6 @@ def add_parser(self, name: str, **kwargs: Any) -> "argparse.ArgumentParser":
...


TSubparsersAction = TypeVar("TSubparsersAction", bound=SubparsersList)


def add_pdx_argument(parser: argparse.ArgumentParser, is_optional: bool = False) -> None:
parser.add_argument(
"pdx_file",
Expand Down
1 change: 1 addition & 0 deletions odxtools/cli/_print_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def extract_parameter_tabulation_data(parameters: List[Parameter]) -> Dict[str,
byte.append(param.byte_position)
semantic.append(param.semantic)
param_type.append(param.parameter_type)
length = 0
if param.get_static_bit_length() is not None:
bit_length.append(param.get_static_bit_length())
length = (param.get_static_bit_length() or 0) // 4
Expand Down
31 changes: 17 additions & 14 deletions odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import List, Optional, Union, cast

import InquirerPy.prompt as PI_prompt
import InquirerPy.prompt as IP_prompt
from tabulate import tabulate # TODO: switch to rich tables

from ..database import Database
Expand All @@ -20,7 +20,7 @@
from ..request import Request
from ..response import Response
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import extract_parameter_tabulation_data

# name of the tool
Expand Down Expand Up @@ -89,13 +89,15 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp
# else _convert_string_to_odx_type(x, p.physical_type.base_data_type, param=p) # This does not work because the next parameter to be promted is used (for some reason?)
}]

if hasattr(parameter, "dop") and hasattr(parameter.dop, "compu_method") \
and hasattr(parameter.dop.compu_method, "get_scales"):
scales = parameter.dop.compu_method.get_scales()
if (dop := getattr(parameter, "dop", None)) and \
(compu_method := getattr(dop, "compu_method", None)):
scales = compu_method.internal_to_phys
choices = [scale.compu_const for scale in scales if scale is not None]
if (cdv := compu_method.compu_default_value) is not None:
choices.append(cdv.compu_const)
param_prompt[0]["choices"] = choices

answer = PI_prompt(param_prompt)
answer = IP_prompt(param_prompt)
if answer.get(parameter.short_name) == "" and not parameter.is_required:
return None
elif parameter.physical_type.base_data_type is not None:
Expand All @@ -105,7 +107,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp
logging.warning(
f"Parameter {parameter.short_name} does not have a physical data type. Param details: {parameter}"
)
return answer.get(parameter.short_name)
return cast(str, answer.get(parameter.short_name))


def encode_message_interactively(sub_service: Union[Request, Response],
Expand All @@ -118,7 +120,7 @@ def encode_message_interactively(sub_service: Union[Request, Response],
for param_or_dict in param_dict.values():
if isinstance(param_or_dict, dict):
for param in param_or_dict.values():
if not isinstance(param_or_dict, dict) and param.is_settable:
if isinstance(param, Parameter) and param.is_settable:
exists_definable_param = True
elif param_or_dict.is_settable:
exists_definable_param = True
Expand All @@ -133,10 +135,11 @@ def encode_message_interactively(sub_service: Union[Request, Response],
"message": f"Do you want to encode a message? [y/n]",
"choices": ["yes", "no"],
}]
answer = PI_prompt(encode_message_prompt)
answer = IP_prompt(encode_message_prompt)
if answer.get("yes_no_prompt") == "no":
return

answered_request = b''
if isinstance(sub_service, Response):
answered_request_prompt = [{
"type":
Expand All @@ -148,7 +151,7 @@ def encode_message_interactively(sub_service: Union[Request, Response],
"filter":
lambda input: _convert_string_to_bytes(input),
}]
answer = PI_prompt(answered_request_prompt)
answer = IP_prompt(answered_request_prompt)
answered_request = cast(bytes, answer.get("request"))
print(f"Input interpretation as list: {list(answered_request)}")

Expand Down Expand Up @@ -268,7 +271,7 @@ def browse(odxdb: Database) -> None:
"message": "Select a Variant.",
"choices": list(dl_names) + ["[exit]"],
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("variant") == "[exit]":
return

Expand Down Expand Up @@ -306,7 +309,7 @@ def browse(odxdb: Database) -> None:
f"The variant {variant.short_name} offers the following services. Select one!",
"choices": [s.short_name for s in services] + ["[back]"],
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("service") == "[back]":
break

Expand Down Expand Up @@ -341,7 +344,7 @@ def browse(odxdb: Database) -> None:
"short": f"Negative response: {nr.short_name}",
} for nr in service.negative_responses] + ["[back]"], # type: ignore
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("message_type") == "[back]":
continue

Expand All @@ -355,7 +358,7 @@ def browse(odxdb: Database) -> None:
encode_message_interactively(codec, ask_user_confirmation=True)


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
# Browse interactively to avoid spamming the console.
parser = subparsers.add_parser(
"browse",
Expand Down
14 changes: 8 additions & 6 deletions odxtools/cli/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ..parameters.physicalconstantparameter import PhysicalConstantParameter
from ..parameters.valueparameter import ValueParameter
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import (extract_service_tabulation_data, print_dl_metrics,
print_service_parameters)

Expand Down Expand Up @@ -482,11 +482,13 @@ def compare_diagnostic_layers(self, dl1: DiagLayer,

# check for deleted diagnostic services
if service2.short_name not in dl1_service_names and dl2_request_prefixes[
service2_idx] not in dl1_request_prefixes and service2 not in service_dict[
"deleted_services"]:
service2_idx] not in dl1_request_prefixes:

service_dict["deleted_services"].append( # type: ignore[union-attr]
service2) # type: ignore[arg-type]
deleted_list = service_dict["deleted_services"]
assert isinstance(deleted_list, list)
if service2 not in deleted_list:
service_dict["deleted_services"].append( # type: ignore[union-attr]
service2) # type: ignore[arg-type]

if service1.short_name == service2.short_name:
# compare request, pos. response and neg. response parameters of both diagnostic services
Expand Down Expand Up @@ -545,7 +547,7 @@ def compare_databases(self, database_new: Database,
return changes_variants


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"compare",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList

# name of the tool
_odxtools_tool_name_ = "decode"
Expand Down Expand Up @@ -71,7 +71,7 @@ def print_summary(
print(f" {param_name}={get_display_value(param_value)}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"decode",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/dummy_sub_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import traceback
from io import StringIO

from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList


class DummyTool:
Expand All @@ -22,7 +22,7 @@ def __init__(self, tool_name: str, error: Exception):
self._odxtools_tool_name_ = tool_name
self._error = error

def add_subparser(self, subparser_list: TSubparsersAction) -> None:
def add_subparser(self, subparser_list: SubparsersList) -> None:
desc = StringIO()

print(f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}", file=desc)
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import print_diagnostic_service

# name of the tool
Expand Down Expand Up @@ -67,7 +67,7 @@ def print_summary(odxdb: Database,
print(f"Unknown service: {service}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"find",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..diagservice import DiagService
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import format_desc, print_diagnostic_service, print_dl_metrics

# name of the tool
Expand Down Expand Up @@ -112,7 +112,7 @@ def print_summary(odxdb: Database,
rich.print(f" {com_param.short_name}: {com_param.value}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"list",
description="\n".join([
Expand Down
1 change: 1 addition & 0 deletions odxtools/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, List

import odxtools
import odxtools.exceptions

from ..version import __version__ as odxtools_version
from .dummy_sub_parser import DummyTool
Expand Down
6 changes: 4 additions & 2 deletions odxtools/cli/snoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from odxtools.response import Response, ResponseType

from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList

# name of the tool
_odxtools_tool_name_ = "snoop"
Expand All @@ -31,6 +31,8 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:
global odx_diag_layer
global last_request

assert odx_diag_layer is not None

if telegram_id == ecu_tx_id:
if uds.is_response_pending(payload):
print(f" ... (response pending)")
Expand Down Expand Up @@ -210,7 +212,7 @@ def add_cli_arguments(parser: argparse.ArgumentParser) -> None:
_parser_utils.add_pdx_argument(parser)


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"snoop",
description="Live decoding of a diagnostic session.",
Expand Down
Loading

0 comments on commit 8b37dc4

Please sign in to comment.