Skip to content

Commit

Permalink
Merge pull request #266 from andlaus/refactor_decoding3
Browse files Browse the repository at this point in the history
Refactor decoding, part 3
  • Loading branch information
andlaus authored Feb 15, 2024
2 parents 911df59 + 6b49fbb commit c8221bb
Show file tree
Hide file tree
Showing 24 changed files with 166 additions and 184 deletions.
42 changes: 17 additions & 25 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from xml.etree import ElementTree

from .complexdop import ComplexDop
Expand Down Expand Up @@ -225,30 +225,22 @@ def convert_physical_to_bytes(self,
is_end_of_pdu=encode_state.is_end_of_pdu,
)

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
if bit_position != 0:
raise DecodeError("Structures must be aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
# move the origin since positions specified by sub-parameters of
# structures are relative to the beginning of the structure object.
orig_origin = decode_state.origin_byte_position
decode_state.origin_byte_position = decode_state.cursor_byte_position

result = {}
for param in self.parameters:
value, cursor_byte_position = param.decode_from_pdu(decode_state)
value = param.decode_from_pdu(decode_state)

result[param.short_name] = value
decode_state.cursor_byte_position = max(decode_state.cursor_byte_position,
cursor_byte_position)

# decoding of the structure finished. move back the origin.
# decoding of the structure finished. go back the original origin.
decode_state.origin_byte_position = orig_origin

return result, decode_state.cursor_byte_position
return result

def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes:
"""
Expand All @@ -266,20 +258,20 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue
is_end_of_pdu=True)

def decode(self, message: bytes) -> ParameterValueDict:
# dummy decode state to be passed to convert_bytes_to_physical
decode_state = DecodeState(coded_message=message)
param_values, cursor_byte_position = self.convert_bytes_to_physical(decode_state)
param_values = self.decode_from_pdu(decode_state)

if len(message) != decode_state.cursor_byte_position:
odxraise(
f"The message {message.hex()} probably could not be completely parsed:"
f" Expected length of {decode_state.cursor_byte_position} but got {len(message)}.",
DecodeError)
return {}

if not isinstance(param_values, dict):
odxraise(f"Decoding a structure must result in a dictionary of parameter "
f"values (is {type(param_values)})")
if len(message) != cursor_byte_position:
warnings.warn(
f"The message {message.hex()} is longer than could be parsed."
f" Expected {cursor_byte_position} but got {len(message)}.",
DecodeError,
stacklevel=1,
)
return param_values
odxraise("Decoding structures must result in a dictionary")

return cast(ParameterValueDict, param_values)

def parameter_dict(self) -> ParameterDict:
"""
Expand Down
12 changes: 3 additions & 9 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from xml.etree import ElementTree

from .compumethods.compumethod import CompuMethod
Expand Down Expand Up @@ -133,22 +133,16 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta
return self.diag_coded_type.convert_internal_to_bytes(
internal_val, encode_state, bit_position=bit_position)

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[Any, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""
Convert the internal representation of a value into its physical value.
Returns a (physical_value, start_position_of_next_parameter) tuple.
"""
odxassert(0 <= bit_position and bit_position < 8)

decode_state.cursor_bit_position = bit_position
internal = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(internal):
return self.compu_method.convert_internal_to_physical(
internal), decode_state.cursor_byte_position
return self.compu_method.convert_internal_to_physical(internal)
else:
# TODO: How to prevent this?
raise DecodeError(
Expand Down
6 changes: 2 additions & 4 deletions odxtools/dopbase.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from xml.etree import ElementTree

from .admindata import AdminData
Expand Down Expand Up @@ -69,8 +69,6 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state
"""Convert the physical value into bytes."""
raise NotImplementedError

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""Extract the bytes from the PDU and convert them to the physical value."""
raise NotImplementedError
11 changes: 4 additions & 7 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
# from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
from xml.etree import ElementTree

from .compumethods.compumethod import CompuMethod
Expand Down Expand Up @@ -87,11 +87,8 @@ def is_visible(self) -> bool:
def linked_dtc_dops(self) -> NamedItemList["DtcDop"]:
return self._linked_dtc_dops

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

decode_state.cursor_bit_position = bit_position
int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(int_trouble_code):
Expand All @@ -110,7 +107,7 @@ def convert_bytes_to_physical(self,

if len(dtcs) == 1:
# we found exactly one described DTC
return dtcs[0], decode_state.cursor_byte_position
return dtcs[0]

# the DTC was not specified. This probably means that the
# diagnostic description file is incomplete. We do not bail
Expand All @@ -129,7 +126,7 @@ def convert_bytes_to_physical(self,
sdgs=[],
)

return dtc, decode_state.cursor_byte_position
return dtc

def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState,
bit_position: int) -> bytes:
Expand Down
6 changes: 2 additions & 4 deletions odxtools/dynamiclengthfield.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from typing import TYPE_CHECKING, Any, Dict, List
from xml.etree import ElementTree

from .decodestate import DecodeState
Expand Down Expand Up @@ -55,7 +55,5 @@ def convert_physical_to_bytes(
) -> bytes:
raise NotImplementedError()

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError()
21 changes: 8 additions & 13 deletions odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import List, Optional, Tuple
from typing import List, Optional
from xml.etree import ElementTree

from .decodestate import DecodeState
Expand Down Expand Up @@ -58,21 +58,16 @@ def convert_physical_to_bytes(
coded_message += self.structure.convert_physical_to_bytes(value, encode_state)
return coded_message

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
cursor_byte_position = decode_state.cursor_byte_position
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
odxassert(not decode_state.cursor_bit_position,
"No bit position can be specified for end-of-pdu fields!")

value = []
while cursor_byte_position < len(decode_state.coded_message):
result: List[ParameterValue] = []
while decode_state.cursor_byte_position < len(decode_state.coded_message):
# ATTENTION: the ODX specification is very misleading
# here: it says that the item is repeated until the end of
# the PDU, but it means that DOP of the items that are
# repeated are identical, not their values
new_value, cursor_byte_position = self.structure.convert_bytes_to_physical(
decode_state, bit_position=0)
# Update next byte_position
decode_state.cursor_byte_position = cursor_byte_position
value.append(new_value)
result.append(self.structure.decode_from_pdu(decode_state))

return value, cursor_byte_position
return result
4 changes: 3 additions & 1 deletion odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:

return value_bytes

def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
odxassert(decode_state.cursor_bit_position == 0,
"No bit position can be specified for MIN-MAX-LENGTH-TYPE values.")
if decode_state.cursor_byte_position + self.min_length > len(decode_state.coded_message):
raise DecodeError("The PDU ended before minimum length was reached.")

Expand Down
76 changes: 38 additions & 38 deletions odxtools/multiplexer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
# SPDX-License-Identifier: MIT
from copy import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from xml.etree import ElementTree

from .complexdop import ComplexDop
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, odxrequire
from .exceptions import DecodeError, EncodeError, odxraise, odxrequire
from .multiplexercase import MultiplexerCase
from .multiplexerdefaultcase import MultiplexerDefaultCase
from .multiplexerswitchkey import MultiplexerSwitchKey
Expand Down Expand Up @@ -69,6 +68,9 @@ def _get_case_limits(self, case: MultiplexerCase) -> Tuple[AtomicOdxType, Atomic
key_type = self.switch_key.dop.physical_type.base_data_type
lower_limit = key_type.make_from(case.lower_limit)
upper_limit = key_type.make_from(case.upper_limit)
if not isinstance(lower_limit, type(upper_limit)) and not isinstance(
upper_limit, type(lower_limit)):
odxraise("Upper and lower bounds of limits must compareable")
return lower_limit, upper_limit

def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState,
Expand All @@ -85,15 +87,15 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state
case_name, case_value = next(iter(physical_value.items()))
case_pos = self.byte_position

for case in self.cases or []:
if case.short_name == case_name:
if case._structure:
case_bytes = case._structure.convert_physical_to_bytes(
for mux_case in self.cases or []:
if mux_case.short_name == case_name:
if mux_case._structure:
case_bytes = mux_case._structure.convert_physical_to_bytes(
case_value, encode_state, 0)
else:
case_bytes = b''

key_value, _ = self._get_case_limits(case)
key_value, _ = self._get_case_limits(mux_case)
key_bytes = self.switch_key.dop.convert_physical_to_bytes(
key_value, encode_state, bit_position=self.switch_key.bit_position or 0)

Expand All @@ -106,47 +108,45 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state

raise EncodeError(f"The case {case_name} is not found in Multiplexer {self.short_name}")

def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:

if bit_position != 0:
raise DecodeError("Multiplexers must be byte-aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")
key_value, key_next_byte = self.switch_key.dop.convert_bytes_to_physical(decode_state)
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

case_decode_state = copy(decode_state)
# multiplexers are structures and thus the origin position
# must be moved to the start of the multiplexer
orig_origin = decode_state.origin_byte_position
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
case_decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position
else:
case_decode_state.origin_byte_position = decode_state.cursor_byte_position
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position
decode_state.origin_byte_position = decode_state.cursor_byte_position

case_found = False
case_next_byte = 0
case_value = None
key_value = self.switch_key.dop.decode_from_pdu(decode_state)

if not isinstance(key_value, int):
odxraise(f"Multiplexer keys must be integers (is '{type(key_value).__name__}'"
f" for multiplexer '{self.short_name}')")

case_value: Optional[ParameterValue] = None
for case in self.cases or []:
lower, upper = self._get_case_limits(case)
if lower <= key_value and key_value <= upper:
case_found = True
if lower <= key_value and key_value <= upper: # type: ignore[operator]
if case._structure:
case_value, case_next_byte = case._structure.convert_bytes_to_physical(
case_decode_state)
case_value = case._structure.decode_from_pdu(decode_state)
break

if not case_found and self.default_case is not None:
case_found = True
if case_value is None and self.default_case is not None:
if self.default_case._structure:
case_value, case_next_byte = self.default_case._structure.convert_bytes_to_physical(
case_decode_state)
case_value = self.default_case._structure.decode_from_pdu(decode_state)

if case_value is None:
odxraise(f"Failed to find a matching case in {self.short_name} for value {key_value!r}",
DecodeError)

mux_value = (case.short_name, case_value)

if not case_found:
raise DecodeError(
f"Failed to find a matching case in {self.short_name} for value {key_value!r}")
# go back to the original origin
decode_state.origin_byte_position = orig_origin
decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position)

mux_value = {case.short_name: cast(ParameterValue, case_value)}
mux_next_byte = decode_state.cursor_byte_position + max(
key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position)
return mux_value, mux_next_byte
return mux_value

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks = super()._build_odxlinks()
Expand Down
6 changes: 3 additions & 3 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, Optional

from ..decodestate import DecodeState
from ..diagcodedtype import DiagCodedType
Expand Down Expand Up @@ -62,7 +62,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
return self.diag_coded_type.convert_internal_to_bytes(
self.coded_value, encode_state=encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
# Extract coded values
orig_cursor_pos = decode_state.cursor_byte_position
if self.byte_position is not None:
Expand All @@ -85,7 +85,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int

decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position)

return coded_val, decode_state.cursor_byte_position
return coded_val

@property
def _coded_value_str(self) -> str:
Expand Down
3 changes: 1 addition & 2 deletions odxtools/parameters/dynamicparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Tuple

from ..decodestate import DecodeState
from ..encodestate import EncodeState
Expand All @@ -26,5 +25,5 @@ def is_settable(self) -> bool:
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.")

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.")
Loading

0 comments on commit c8221bb

Please sign in to comment.