Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanups #263

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified examples/somersault.pdx
Binary file not shown.
12 changes: 12 additions & 0 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,18 @@ class SomersaultSID(IntEnum):
bit_position=None,
sdgs=[],
),
ValueParameter(
short_name="sault_time",
long_name=None,
semantic=None,
description=None,
physical_default_value_raw="255",
byte_position=2,
dop_ref=OdxLinkRef("somersault.DOP.duration", doc_frags),
dop_snref=None,
bit_position=None,
sdgs=[],
),
]),
byte_size=None,
),
Expand Down
50 changes: 36 additions & 14 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .dataobjectproperty import DataObjectProperty
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise
from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise, strict_mode
from .nameditemlist import NamedItemList
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from .odxtypes import ParameterDict, ParameterValue, ParameterValueDict
Expand Down Expand Up @@ -74,10 +74,10 @@ def get_static_bit_length(self) -> Optional[int]:
def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
prefix = b''
encode_state = EncodeState(prefix, parameter_values={}, triggering_request=request_prefix)
for p in self.parameters:
if isinstance(p, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = p.encode_into_pdu(encode_state)
for param in self.parameters:
if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = param.encode_into_pdu(encode_state)
else:
break
return encode_state.coded_message
Expand Down Expand Up @@ -124,6 +124,13 @@ def convert_physical_to_internal(self,
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_value)}")

# in strict mode, ensure that no values for unknown parameters are specified.
if strict_mode:
param_names = [param.short_name for param in self.parameters]
for param_key in param_value:
if param_key not in param_names:
odxraise(f"Value for unknown parameter '{param_key}' specified")

encode_state = EncodeState(
b'',
dict(param_value),
Expand All @@ -139,6 +146,21 @@ def convert_physical_to_internal(self,
# the ODX is located last in the PDU...
encode_state.is_end_of_pdu = is_end_of_pdu

if isinstance(
param,
(LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value:
# This is a hack to always encode a dummy value for
# length- and table keys. since these can be specified
# implicitly (i.e., by means of parameters that use
# these keys), to avoid getting an "overlapping
# parameter" warning, we must encode a value of zero
# into the PDU here and add the real value of the
# parameter in a post processing step.
tmp = encode_state.parameter_values.pop(param.short_name)
encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.parameter_values[param.short_name] = tmp
continue

encode_state.coded_message = param.encode_into_pdu(encode_state)

if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size:
Expand Down Expand Up @@ -213,10 +235,10 @@ def convert_bytes_to_physical(self,
inner_decode_state = DecodeState(
coded_message=byte_code, parameter_values={}, cursor_position=0)

for parameter in self.parameters:
value, cursor_position = parameter.decode_from_pdu(inner_decode_state)
for param in self.parameters:
value, cursor_position = param.decode_from_pdu(inner_decode_state)

inner_decode_state.parameter_values[parameter.short_name] = value
inner_decode_state.parameter_values[param.short_name] = value
inner_decode_state = DecodeState(
coded_message=byte_code,
parameter_values=inner_decode_state.parameter_values,
Expand Down Expand Up @@ -282,21 +304,21 @@ def parameter_dict(self) -> ParameterDict:
def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
result = super()._build_odxlinks()

for p in self.parameters:
result.update(p._build_odxlinks())
for param in self.parameters:
result.update(param._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
"""Recursively resolve any references (odxlinks or sn-refs)"""
super()._resolve_odxlinks(odxlinks)

for p in self.parameters:
p._resolve_odxlinks(odxlinks)
for param in self.parameters:
param._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
"""Recursively resolve any references (odxlinks or sn-refs)"""
super()._resolve_snrefs(diag_layer)

for p in self.parameters:
p._resolve_snrefs(diag_layer)
for param in self.parameters:
param._resolve_snrefs(diag_layer)
31 changes: 20 additions & 11 deletions odxtools/cli/snoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import odxtools.uds as uds
from odxtools.exceptions import DecodeError
from odxtools.isotp_state_machine import IsoTpStateMachine
from odxtools.response import Response, ResponseType

from . import _parser_utils

Expand All @@ -31,7 +32,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:

if telegram_id == ecu_tx_id:
if uds.is_response_pending(payload):
print(f" -> ECU: ... (response pending)")
print(f" ... (response pending)")
return

decoded_message = None
Expand All @@ -48,21 +49,29 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:
if len(decoded_message) > 1:
dec_str = f" (decoding {i+1})"

response_type = getattr(resp.coding_object, "response_type", None)
rt_str = "unknown"
if response_type == "POS-RESPONSE":
rt_str = "positive"
elif response_type == "NEG-RESPONSE":
rt_str = "negative"

print(f" -> {rt_str} ECU response{dec_str}: \"{resp.coding_object.short_name}\"")
if isinstance(resp.coding_object, Response):
if resp.coding_object.response_type == ResponseType.POSITIVE:
rt_str = "positive"
elif resp.coding_object.response_type in (ResponseType.NEGATIVE,
ResponseType.GLOBAL_NEGATIVE):
rt_str = "negative"

settable_params = []
for param_name, param_val in resp.param_dict.items():
param = [x for x in params if x.short_name == param_name][0]
if not param.is_settable:
continue
print(f" {param_name} = {repr(param_val)}")
settable_params.append((param_name, param_val))

if settable_params:
print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}:")
for param_name, param_val in settable_params:
print(f" {param_name} = {repr(param_val)}")
else:
print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}")
else:
print(f" -> ECU response: unrecognized response of {len(payload)} bytes length: "
print(f" unrecognized response of {len(payload)} bytes length: "
f"0x{payload.hex()}")

return
Expand All @@ -76,7 +85,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:
last_request = None

if decoded_message is not None:
print(f"tester request: \"{decoded_message.coding_object.short_name}\"")
print(f"request {decoded_message.coding_object.short_name}:")
params = decoded_message.coding_object.parameters
for param_name, param_val in decoded_message.param_dict.items():
param = [x for x in params if x.short_name == param_name][0]
Expand Down
11 changes: 10 additions & 1 deletion odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

@dataclass
class LeadingLengthInfoType(DiagCodedType):
#: bit length of the length specifier field
#:
#: this is then followed by the number of bytes specified by that
#: field, i.e., this is NOT the length of the LeadingLengthInfoType
#: object.
bit_length: int

def __post_init__(self) -> None:
Expand All @@ -31,7 +36,11 @@ def dct_type(self) -> DctType:
return "LEADING-LENGTH-INFO-TYPE"

def get_static_bit_length(self) -> Optional[int]:
return self.bit_length
# note that self.bit_length is just the length of the length
# specifier field. This is then followed by the same number of
# bytes as the value of this field, i.e., the length of this
# DCT is dynamic!
return None

def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> bytes:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_singleecujob.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ def test_resolve_odxlinks(self) -> None:
self.assertEqual(self.context.specialAudience,
odxrequire(self.singleecujob_object.audience).enabled_audiences[0])

self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop)
self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop)
self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop_base)
self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop_base)
self.assertEqual(self.context.negOutputDOP,
self.singleecujob_object.neg_output_params[0].dop)

Expand Down
42 changes: 30 additions & 12 deletions tests/test_somersault.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import unittest

from odxtools.exceptions import odxrequire
from odxtools.exceptions import OdxError, odxrequire
from odxtools.load_pdx_file import load_pdx_file
from odxtools.parameters.nrcconstparameter import NrcConstParameter

Expand Down Expand Up @@ -145,9 +145,11 @@ def test_somersault_lazy(self) -> None:
self.assertEqual([x.short_name for x in service.negative_responses], ["flips_not_done"])

pr = service.positive_responses.grudging_forward
self.assertEqual([x.short_name for x in pr.parameters], ["sid", "num_flips_done"])
self.assertEqual([x.short_name for x in pr.parameters],
["sid", "num_flips_done", "sault_time"])
self.assertEqual([x.short_name for x in pr.required_parameters], [])
self.assertEqual(pr.get_static_bit_length(), 16)
self.assertEqual([x.short_name for x in pr.free_parameters], ["sault_time"])
self.assertEqual(pr.get_static_bit_length(), 24)

nr = service.negative_responses.flips_not_done
self.assertEqual(
Expand All @@ -162,7 +164,16 @@ def test_somersault_lazy(self) -> None:
self.assertEqual(nrc_const.coded_values, [0, 1, 2])


class TestDecode(unittest.TestCase):
class TestEnDecode(unittest.TestCase):

def test_encode_specify_unknown_param(self) -> None:
ecu = odxdb.ecus.somersault_lazy
service = ecu.services.do_forward_flips
request = odxrequire(service.request)
with self.assertRaises(OdxError) as eo:
request.encode(forward_soberness_check=0x12, num_flips=5, grass_level="what grass?")

self.assertEqual(str(eo.exception), "Value for unknown parameter 'grass_level' specified")

def test_decode_request(self) -> None:
messages = odxdb.ecus.somersault_assiduous.decode(bytes([0x03, 0x45]))
Expand Down Expand Up @@ -226,9 +237,9 @@ def test_code_table_params(self) -> None:
dizzyness_level=42,
happiness_level=92,
last_pos_response=("forward_grudging", {
"dizzyness_level": 42
"sault_time": 249
}))
self.assertEqual(resp_data.hex(), "622a5c03fa7b")
self.assertEqual(resp_data.hex(), "622a5c03fa7bf9")

decoded_resp_data = pr.decode(resp_data)
assert isinstance(decoded_resp_data, dict)
Expand All @@ -241,21 +252,24 @@ def test_code_table_params(self) -> None:
self.assertEqual(
set(decoded_resp_data["last_pos_response"]
[1].keys()), # type: ignore[index, union-attr]
{"sid", "num_flips_done"})
{"sid", "num_flips_done", "sault_time"})
# the num_flips_done parameter is a matching request parameter
# for this response, so it produces a binary blob. possibly,
# it should be changed to a ValueParameter...
self.assertEqual(
decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload]
["num_flips_done"], # type: ignore[index, call-overload]
bytes([123]))
self.assertEqual(
decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload]
["sault_time"], # type: ignore[index, call-overload]
249)

# test the "backward flips grudgingly done" response
resp_data = pr.encode(
dizzyness_level=75,
happiness_level=3,
last_pos_response=("backward_grudging", {
'dizzyness_level': 75,
'num_flips_done': 5,
'grumpiness_level': 150
}))
Expand Down Expand Up @@ -310,14 +324,14 @@ def test_free_param_info(self) -> None:

with patch("sys.stdout", stdout):
pos_response.print_free_parameters_info()
expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\n"
expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\n"
actual_output = stdout.getvalue()
self.assertEqual(actual_output, expected_output)

with patch("sys.stdout", stdout):
neg_response.print_free_parameters_info()
expected_output = (
"forward_soberness_check: uint8\nnum_flips: uint8\nflips_successfully_done: uint8\n"
"forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\nflips_successfully_done: uint8\n"
)
actual_output = stdout.getvalue()
self.assertEqual(actual_output, expected_output)
Expand All @@ -336,9 +350,13 @@ def test_decode_response(self) -> None:
f"There should be only one service for 0x0145 but there are: {messages}",
)
m = messages[0]
self.assertEqual(m.coded_message, bytes([0xFA, 0x03]))
self.assertEqual(m.coded_message.hex(), "fa03ff")
self.assertEqual(m.coding_object, pos_response)
self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": bytearray([0x03])})
self.assertEqual(m.param_dict, {
"sid": 0xFA,
"num_flips_done": bytearray([0x03]),
"sault_time": 255
})


class TestNavigation(unittest.TestCase):
Expand Down
Loading