Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor decoding, part 2 #265

Merged
merged 7 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,21 @@ def convert_bytes_to_physical(self,

# move the origin since positions specified by sub-parameters of
# structures are relative to the beginning of the structure object.
orig_origin = decode_state.origin_position
decode_state.origin_position = decode_state.cursor_position
orig_origin = decode_state.origin_byte_position
decode_state.origin_byte_position = decode_state.cursor_byte_position

result = {}
for param in self.parameters:
value, cursor_position = param.decode_from_pdu(decode_state)
value, cursor_byte_position = param.decode_from_pdu(decode_state)

result[param.short_name] = value
decode_state.cursor_position = max(decode_state.cursor_position, cursor_position)
decode_state.cursor_byte_position = max(decode_state.cursor_byte_position,
cursor_byte_position)

# decoding of the structure finished. move back the origin.
decode_state.origin_position = orig_origin
decode_state.origin_byte_position = orig_origin

return result, decode_state.cursor_position
return result, decode_state.cursor_byte_position

def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes:
"""
Expand All @@ -267,14 +268,14 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue
def decode(self, message: bytes) -> ParameterValueDict:
# dummy decode state to be passed to convert_bytes_to_physical
decode_state = DecodeState(coded_message=message)
param_values, cursor_position = self.convert_bytes_to_physical(decode_state)
param_values, cursor_byte_position = self.convert_bytes_to_physical(decode_state)
if not isinstance(param_values, dict):
odxraise(f"Decoding a structure must result in a dictionary of parameter "
f"values (is {type(param_values)})")
if len(message) != cursor_position:
if len(message) != cursor_byte_position:
warnings.warn(
f"The message {message.hex()} is longer than could be parsed."
f" Expected {cursor_position} but got {len(message)}.",
f" Expected {cursor_byte_position} but got {len(message)}.",
DecodeError,
stacklevel=1,
)
Expand Down
2 changes: 1 addition & 1 deletion odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import List, Optional, Union

import PyInquirer.prompt as PI_prompt
from InquirerPy import prompt as PI_prompt
from tabulate import tabulate # TODO: switch to rich tables

from ..database import Database
Expand Down
13 changes: 11 additions & 2 deletions odxtools/cli/dummy_sub_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT
import argparse
import sys
import traceback
from io import StringIO


class DummyTool:
Expand All @@ -19,10 +21,17 @@ def __init__(self, tool_name: str, error: Exception):
self._error = error

def add_subparser(self, subparser_list: "argparse._SubParsersAction") -> None:
desc = StringIO()

print(f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}", file=desc)
print(file=desc)
print(f"Traceback:", file=desc)
traceback.print_tb(self._error.__traceback__, file=desc)

subparser_list.add_parser(
self._odxtools_tool_name_,
description=f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}",
help="Dummy tool",
description=desc.getvalue(),
help="Tool unavailable",
formatter_class=argparse.RawTextHelpFormatter,
)

Expand Down
7 changes: 4 additions & 3 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ def convert_bytes_to_physical(self,
"""
odxassert(0 <= bit_position and bit_position < 8)

internal, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=bit_position)
decode_state.cursor_bit_position = bit_position
internal = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(internal):
return self.compu_method.convert_internal_to_physical(internal), cursor_position
return self.compu_method.convert_internal_to_physical(
internal), decode_state.cursor_byte_position
else:
# TODO: How to prevent this?
raise DecodeError(
Expand Down
10 changes: 8 additions & 2 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ class DecodeState:
#:
#: i.e., the absolute byte position to which all relative positions
#: refer to, e.g. the position of the first byte of a structure.
origin_position: int = 0
origin_byte_position: int = 0

#: Absolute position of the next undecoded byte to be considered
#:
#: (if not explicitly specified by the object to be decoded.)
cursor_position: int = 0
cursor_byte_position: int = 0

#: the bit position [0, 7] where the object to be extracted begins
#:
#: If bit position is undefined (`None`), the object to be extracted
#: starts at bit 0.
cursor_bit_position: int = 0

#: values of the length key parameters decoded so far
length_keys: Dict[str, int] = field(default_factory=dict)
Expand Down
10 changes: 4 additions & 6 deletions odxtools/diagcodedtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def _extract_internal_value(
byte_length = (bit_length + bit_position + 7) // 8
if byte_position + byte_length > len(coded_message):
raise DecodeError(f"Expected a longer message.")
cursor_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_position]
cursor_byte_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_byte_position]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
Expand Down Expand Up @@ -129,7 +129,7 @@ def _extract_internal_value(
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

return internal_value, cursor_position
return internal_value, cursor_byte_position

@staticmethod
def _encode_internal_value(
Expand Down Expand Up @@ -254,9 +254,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:
pass

@abc.abstractmethod
def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
"""Decode the parameter value from the coded message.

Parameters
Expand Down
8 changes: 4 additions & 4 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:

int_trouble_code, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=bit_position)
decode_state.cursor_bit_position = bit_position
int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(int_trouble_code):
trouble_code = self.compu_method.convert_internal_to_physical(int_trouble_code)
Expand All @@ -110,7 +110,7 @@ def convert_bytes_to_physical(self,

if len(dtcs) == 1:
# we found exactly one described DTC
return dtcs[0], cursor_position
return dtcs[0], decode_state.cursor_byte_position

# the DTC was not specified. This probably means that the
# diagnostic description file is incomplete. We do not bail
Expand All @@ -129,7 +129,7 @@ def convert_bytes_to_physical(self,
sdgs=[],
)

return dtc, cursor_position
return dtc, decode_state.cursor_byte_position

def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState,
bit_position: int) -> bytes:
Expand Down
10 changes: 5 additions & 5 deletions odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ def convert_physical_to_bytes(
def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
cursor_position = decode_state.cursor_position
cursor_byte_position = decode_state.cursor_byte_position

value = []
while cursor_position < len(decode_state.coded_message):
while cursor_byte_position < len(decode_state.coded_message):
# ATTENTION: the ODX specification is very misleading
# here: it says that the item is repeated until the end of
# the PDU, but it means that DOP of the items that are
# repeated are identical, not their values
new_value, cursor_position = self.structure.convert_bytes_to_physical(
new_value, cursor_byte_position = self.structure.convert_bytes_to_physical(
decode_state, bit_position=0)
# Update next byte_position
decode_state.cursor_position = cursor_position
decode_state.cursor_byte_position = cursor_byte_position
value.append(new_value)

return value, cursor_position
return value, cursor_byte_position
16 changes: 8 additions & 8 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Optional, Tuple
from typing import Any, Optional

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
Expand Down Expand Up @@ -65,28 +65,27 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta

return length_bytes + value_bytes

def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_message = decode_state.coded_message

# Extract length of the parameter value
byte_length, byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=decode_state.cursor_position,
bit_position=bit_position,
byte_position=decode_state.cursor_byte_position,
bit_position=decode_state.cursor_bit_position,
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32, # length is an integer
is_highlow_byte_order=self.is_highlow_byte_order,
)
decode_state.cursor_bit_position = 0

if not isinstance(byte_length, int):
odxraise()

# Extract actual value
# TODO: The returned value is None if the byte_length is 0. Maybe change it
# to some default value like an empty bytearray() or 0?
value, cursor_position = self._extract_internal_value(
value, cursor_byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=byte_position,
bit_position=0,
Expand All @@ -95,4 +94,5 @@ def convert_bytes_to_internal(self,
is_highlow_byte_order=self.is_highlow_byte_order,
)

return value, cursor_position
decode_state.cursor_byte_position = cursor_byte_position
return value
25 changes: 15 additions & 10 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Optional, Tuple
from typing import Optional

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
Expand Down Expand Up @@ -100,14 +100,12 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:

return value_bytes

def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
if decode_state.cursor_position + self.min_length > len(decode_state.coded_message):
def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType:
if decode_state.cursor_byte_position + self.min_length > len(decode_state.coded_message):
raise DecodeError("The PDU ended before minimum length was reached.")

coded_message = decode_state.coded_message
cursor_pos = decode_state.cursor_position
cursor_pos = decode_state.cursor_byte_position
termination_seq = self.__termination_sequence()

max_terminator_pos = len(coded_message)
Expand Down Expand Up @@ -148,7 +146,7 @@ def convert_bytes_to_internal(self,
value, byte_pos = self._extract_internal_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=bit_position,
bit_position=0,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand All @@ -158,7 +156,10 @@ def convert_bytes_to_internal(self,
byte_pos += len(termination_seq)

# next byte starts after the actual data and the termination sequence
return value, byte_pos
decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0

return value
else:
# If termination == "END-OF-PDU", the parameter ends after max_length
# or at the end of the PDU.
Expand All @@ -167,9 +168,13 @@ def convert_bytes_to_internal(self,
value, byte_pos = self._extract_internal_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=bit_position,
bit_position=0,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)
return value, byte_pos

decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0

return value
6 changes: 3 additions & 3 deletions odxtools/multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ def convert_bytes_to_physical(self,

case_decode_state = copy(decode_state)
if self.byte_position is not None:
case_decode_state.origin_position = decode_state.origin_position + self.byte_position
case_decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position
else:
case_decode_state.origin_position = decode_state.cursor_position
case_decode_state.origin_byte_position = decode_state.cursor_byte_position

case_found = False
case_next_byte = 0
Expand All @@ -144,7 +144,7 @@ def convert_bytes_to_physical(self,
f"Failed to find a matching case in {self.short_name} for value {key_value!r}")

mux_value = {case.short_name: cast(ParameterValue, case_value)}
mux_next_byte = decode_state.cursor_position + max(
mux_next_byte = decode_state.cursor_byte_position + max(
key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position)
return mux_value, mux_next_byte

Expand Down
16 changes: 8 additions & 8 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,28 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]:
# Extract coded values
orig_cursor_pos = decode_state.cursor_position
orig_cursor_pos = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_position = decode_state.origin_position + self.byte_position
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

coded_val, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=self.bit_position or 0)

decode_state.cursor_position = orig_cursor_pos
decode_state.cursor_bit_position = self.bit_position or 0
coded_val = self.diag_coded_type.decode_from_pdu(decode_state)

# Check if the coded value in the message is correct.
if self.coded_value != coded_val:
warnings.warn(
f"Coded constant parameter does not match! "
f"The parameter {self.short_name} expected coded "
f"value {str(self._coded_value_str)} but got {str(coded_val)} "
f"at byte position {decode_state.cursor_position} "
f"at byte position {decode_state.cursor_byte_position} "
f"in coded message {decode_state.coded_message.hex()}.",
DecodeError,
stacklevel=1,
)

return coded_val, cursor_position
decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position)

return coded_val, decode_state.cursor_byte_position

@property
def _coded_value_str(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
phys_val, cursor_position = super().decode_from_pdu(decode_state)
phys_val, cursor_byte_position = super().decode_from_pdu(decode_state)

if not isinstance(phys_val, int):
odxraise(f"The pysical type of length keys must be an integer, "
f"(is {type(phys_val).__name__})")
decode_state.length_keys[self.short_name] = phys_val

return phys_val, cursor_position
return phys_val, cursor_byte_position
Loading
Loading