Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make more linters happy #277

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions examples/somersaultlazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import asyncio
import logging
import random
from typing import List, Optional, Union
from socket import socket
from typing import List, Optional, Union, cast

import isotp
import isotp.tpsock
import somersaultecu

import odxtools
import odxtools.uds as uds
from odxtools.exceptions import odxrequire
from odxtools.exceptions import DecodeError, odxrequire
from odxtools.message import Message
from odxtools.odxtypes import ParameterValueDict
from odxtools.response import Response, ResponseType
Expand All @@ -41,11 +42,13 @@ def create_isotp_socket(channel: Optional[str], rxid: int,
if is_sterile:
return None

assert channel is not None

# create an ISO-TP socket without a timeout (timeouts are handled
# using asyncio). Also, for asyncio to work with this socket, it
# must be non-blocking...
result_socket = isotp.socket(timeout=None)
result_socket._socket.setblocking(0)
result_socket._socket.setblocking(False)

# set the ISO-TP flow control options:
#
Expand Down Expand Up @@ -81,7 +84,7 @@ async def ecu_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes)
assert isotp_socket is not None

loop = asyncio.get_running_loop()
await loop.sock_sendall(isotp_socket, payload)
await loop.sock_sendall(cast(socket, isotp_socket), payload)


async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
Expand All @@ -107,7 +110,7 @@ async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
assert isotp_socket is not None

loop = asyncio.get_running_loop()
return await loop.sock_recv(isotp_socket, 4095)
return await loop.sock_recv(cast(socket, isotp_socket), 4095)


async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes) -> None:
Expand All @@ -127,7 +130,7 @@ async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: byte
assert isotp_socket is not None

loop = asyncio.get_running_loop()
await loop.sock_sendall(isotp_socket, payload)
await loop.sock_sendall(cast(socket, isotp_socket), payload)


async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
Expand All @@ -153,7 +156,7 @@ async def tester_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
assert isotp_socket is not None

loop = asyncio.get_running_loop()
return await loop.sock_recv(isotp_socket, 4095)
return await loop.sock_recv(cast(socket, isotp_socket), 4095)


class SomersaultLazyEcu:
Expand Down Expand Up @@ -202,7 +205,7 @@ async def _handle_requests_task(self) -> None:
# that requests can always be uniquely decoded
assert len(messages) == 1
await self._handle_request(messages[0])
except odxtools.exceptions.DecodeError as e:
except DecodeError as e:
ecu_logger.warning(f"Could not decode request "
f"0x{data.hex()}: {e}")
return
Expand Down Expand Up @@ -382,7 +385,7 @@ async def tester_await_response(isotp_socket: Optional[isotp.tpsock.socket],

return replies[0].param_dict

except odxtools.exceptions.DecodeError as e:
except DecodeError as e:
if len(raw_response) >= 3:
sid = raw_response[0]
rq_sid = raw_response[1]
Expand Down
10 changes: 5 additions & 5 deletions odxtools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@
- _`[ISO22901]` The ISO 22901 Standard: https://www.iso.org/standard/41207.html

"""
from .load_file import load_file as load_file
from .load_odx_d_file import load_odx_d_file as load_odx_d_file
from .load_pdx_file import load_pdx_file as load_pdx_file
from .version import __version__ as __version__
from .write_pdx_file import write_pdx_file as write_pdx_file
from .load_file import load_file as load_file # noqa: F401
from .load_odx_d_file import load_odx_d_file as load_odx_d_file # noqa: F401
from .load_pdx_file import load_pdx_file as load_pdx_file # noqa: F401
from .version import __version__ as __version__ # noqa: F401
from .write_pdx_file import write_pdx_file as write_pdx_file # noqa: F401

__author__ = "Katrin Bauer"

Expand Down
8 changes: 4 additions & 4 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,18 @@ def _validate_coded_message(self, coded_message: bytes) -> None:
stacklevel=1)

def convert_physical_to_bytes(self,
param_values: ParameterValue,
physical_value: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
if not isinstance(param_values, dict):
if not isinstance(physical_value, dict):
raise EncodeError(
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_values)}")
f"got {type(physical_value)}")
if bit_position != 0:
raise EncodeError("Structures must be aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")
return self.convert_physical_to_internal(
param_values,
physical_value,
triggering_coded_request=encode_state.triggering_request,
is_end_of_pdu=encode_state.is_end_of_pdu,
)
Expand Down
5 changes: 1 addition & 4 deletions odxtools/cli/_parser_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MIT
import argparse
from typing import Any, Protocol, TypeVar, runtime_checkable
from typing import Any, Protocol, runtime_checkable

from ..database import Database
from ..load_file import load_file as _load_file
Expand All @@ -22,9 +22,6 @@ def add_parser(self, name: str, **kwargs: Any) -> "argparse.ArgumentParser":
...


TSubparsersAction = TypeVar("TSubparsersAction", bound=SubparsersList)


def add_pdx_argument(parser: argparse.ArgumentParser, is_optional: bool = False) -> None:
parser.add_argument(
"pdx_file",
Expand Down
1 change: 1 addition & 0 deletions odxtools/cli/_print_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def extract_parameter_tabulation_data(parameters: List[Parameter]) -> Dict[str,
byte.append(param.byte_position)
semantic.append(param.semantic)
param_type.append(param.parameter_type)
length = 0
if param.get_static_bit_length() is not None:
bit_length.append(param.get_static_bit_length())
length = (param.get_static_bit_length() or 0) // 4
Expand Down
31 changes: 17 additions & 14 deletions odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import List, Optional, Union, cast

import InquirerPy.prompt as PI_prompt
import InquirerPy.prompt as IP_prompt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What? are you sure?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the history of this is that in its current state pyright, complains about this:

$ pyright
/home/and/src/odxtools/odxtools/cli/browse.py
  /home/and/src/odxtools/odxtools/cli/browse.py:7:8 - error: Import "InquirerPy.prompt" could not be resolved (reportMissingImports)
  /home/and/src/odxtools/odxtools/cli/browse.py:100:14 - error: Module is not callable (reportCallIssue)
[...]

while if this issue is fixed by replacing the line with from InquirerPy import prompt as IP_prompt # type: ignore[attr-defined], mypy complains about the # type: ignore comment being unnecessary (at least on the GH actions runner, locally it is fine on my machine). the PI to IP rename is a residual of that, because I thought that if I have to change that line anyway, I could use a name that makes sense...

from tabulate import tabulate # TODO: switch to rich tables

from ..database import Database
Expand All @@ -20,7 +20,7 @@
from ..request import Request
from ..response import Response
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import extract_parameter_tabulation_data

# name of the tool
Expand Down Expand Up @@ -89,13 +89,15 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp
# else _convert_string_to_odx_type(x, p.physical_type.base_data_type, param=p) # This does not work because the next parameter to be promted is used (for some reason?)
}]

if hasattr(parameter, "dop") and hasattr(parameter.dop, "compu_method") \
and hasattr(parameter.dop.compu_method, "get_scales"):
scales = parameter.dop.compu_method.get_scales()
if (dop := getattr(parameter, "dop", None)) and \
(compu_method := getattr(dop, "compu_method", None)):
scales = compu_method.internal_to_phys
choices = [scale.compu_const for scale in scales if scale is not None]
if (cdv := compu_method.compu_default_value) is not None:
choices.append(cdv.compu_const)
param_prompt[0]["choices"] = choices

answer = PI_prompt(param_prompt)
answer = IP_prompt(param_prompt)
if answer.get(parameter.short_name) == "" and not parameter.is_required:
return None
elif parameter.physical_type.base_data_type is not None:
Expand All @@ -105,7 +107,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp
logging.warning(
f"Parameter {parameter.short_name} does not have a physical data type. Param details: {parameter}"
)
return answer.get(parameter.short_name)
return cast(str, answer.get(parameter.short_name))


def encode_message_interactively(sub_service: Union[Request, Response],
Expand All @@ -118,7 +120,7 @@ def encode_message_interactively(sub_service: Union[Request, Response],
for param_or_dict in param_dict.values():
if isinstance(param_or_dict, dict):
for param in param_or_dict.values():
if not isinstance(param_or_dict, dict) and param.is_settable:
if isinstance(param, Parameter) and param.is_settable:
exists_definable_param = True
elif param_or_dict.is_settable:
exists_definable_param = True
Expand All @@ -133,10 +135,11 @@ def encode_message_interactively(sub_service: Union[Request, Response],
"message": f"Do you want to encode a message? [y/n]",
"choices": ["yes", "no"],
}]
answer = PI_prompt(encode_message_prompt)
answer = IP_prompt(encode_message_prompt)
if answer.get("yes_no_prompt") == "no":
return

answered_request = b''
if isinstance(sub_service, Response):
answered_request_prompt = [{
"type":
Expand All @@ -148,7 +151,7 @@ def encode_message_interactively(sub_service: Union[Request, Response],
"filter":
lambda input: _convert_string_to_bytes(input),
}]
answer = PI_prompt(answered_request_prompt)
answer = IP_prompt(answered_request_prompt)
answered_request = cast(bytes, answer.get("request"))
print(f"Input interpretation as list: {list(answered_request)}")

Expand Down Expand Up @@ -268,7 +271,7 @@ def browse(odxdb: Database) -> None:
"message": "Select a Variant.",
"choices": list(dl_names) + ["[exit]"],
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("variant") == "[exit]":
return

Expand Down Expand Up @@ -306,7 +309,7 @@ def browse(odxdb: Database) -> None:
f"The variant {variant.short_name} offers the following services. Select one!",
"choices": [s.short_name for s in services] + ["[back]"],
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("service") == "[back]":
break

Expand Down Expand Up @@ -341,7 +344,7 @@ def browse(odxdb: Database) -> None:
"short": f"Negative response: {nr.short_name}",
} for nr in service.negative_responses] + ["[back]"], # type: ignore
}]
answer = PI_prompt(selection)
answer = IP_prompt(selection)
if answer.get("message_type") == "[back]":
continue

Expand All @@ -355,7 +358,7 @@ def browse(odxdb: Database) -> None:
encode_message_interactively(codec, ask_user_confirmation=True)


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
# Browse interactively to avoid spamming the console.
parser = subparsers.add_parser(
"browse",
Expand Down
14 changes: 8 additions & 6 deletions odxtools/cli/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ..parameters.physicalconstantparameter import PhysicalConstantParameter
from ..parameters.valueparameter import ValueParameter
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import (extract_service_tabulation_data, print_dl_metrics,
print_service_parameters)

Expand Down Expand Up @@ -482,11 +482,13 @@ def compare_diagnostic_layers(self, dl1: DiagLayer,

# check for deleted diagnostic services
if service2.short_name not in dl1_service_names and dl2_request_prefixes[
service2_idx] not in dl1_request_prefixes and service2 not in service_dict[
"deleted_services"]:
service2_idx] not in dl1_request_prefixes:

service_dict["deleted_services"].append( # type: ignore[union-attr]
service2) # type: ignore[arg-type]
deleted_list = service_dict["deleted_services"]
assert isinstance(deleted_list, list)
if service2 not in deleted_list:
service_dict["deleted_services"].append( # type: ignore[union-attr]
service2) # type: ignore[arg-type]

if service1.short_name == service2.short_name:
# compare request, pos. response and neg. response parameters of both diagnostic services
Expand Down Expand Up @@ -545,7 +547,7 @@ def compare_databases(self, database_new: Database,
return changes_variants


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"compare",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList

# name of the tool
_odxtools_tool_name_ = "decode"
Expand Down Expand Up @@ -71,7 +71,7 @@ def print_summary(
print(f" {param_name}={get_display_value(param_value)}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"decode",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/dummy_sub_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import traceback
from io import StringIO

from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList


class DummyTool:
Expand All @@ -22,7 +22,7 @@ def __init__(self, tool_name: str, error: Exception):
self._odxtools_tool_name_ = tool_name
self._error = error

def add_subparser(self, subparser_list: TSubparsersAction) -> None:
def add_subparser(self, subparser_list: SubparsersList) -> None:
desc = StringIO()

print(f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}", file=desc)
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import print_diagnostic_service

# name of the tool
Expand Down Expand Up @@ -67,7 +67,7 @@ def print_summary(odxdb: Database,
print(f"Unknown service: {service}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"find",
description="\n".join([
Expand Down
4 changes: 2 additions & 2 deletions odxtools/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..diagservice import DiagService
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._parser_utils import TSubparsersAction
from ._parser_utils import SubparsersList
from ._print_utils import format_desc, print_diagnostic_service, print_dl_metrics

# name of the tool
Expand Down Expand Up @@ -112,7 +112,7 @@ def print_summary(odxdb: Database,
rich.print(f" {com_param.short_name}: {com_param.value}")


def add_subparser(subparsers: TSubparsersAction) -> None:
def add_subparser(subparsers: SubparsersList) -> None:
parser = subparsers.add_parser(
"list",
description="\n".join([
Expand Down
1 change: 1 addition & 0 deletions odxtools/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, List

import odxtools
import odxtools.exceptions

from ..version import __version__ as odxtools_version
from .dummy_sub_parser import DummyTool
Expand Down
Loading
Loading