Skip to content

Commit

Permalink
Merge pull request #264 from andlaus/refactor_decoding1
Browse files Browse the repository at this point in the history
Refactor decoding, part 1
  • Loading branch information
andlaus authored Feb 8, 2024
2 parents 76a4c44 + 4e8eceb commit 240718a
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 89 deletions.
26 changes: 14 additions & 12 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,23 @@ def convert_bytes_to_physical(self,
if bit_position != 0:
raise DecodeError("Structures must be aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")
byte_code = decode_state.coded_message[decode_state.cursor_position:]
inner_decode_state = DecodeState(
coded_message=byte_code, parameter_values={}, cursor_position=0)

# move the origin since positions specified by sub-parameters of
# structures are relative to the beginning of the structure object.
orig_origin = decode_state.origin_position
decode_state.origin_position = decode_state.cursor_position

result = {}
for param in self.parameters:
value, cursor_position = param.decode_from_pdu(inner_decode_state)
value, cursor_position = param.decode_from_pdu(decode_state)

inner_decode_state.parameter_values[param.short_name] = value
inner_decode_state = DecodeState(
coded_message=byte_code,
parameter_values=inner_decode_state.parameter_values,
cursor_position=max(inner_decode_state.cursor_position, cursor_position),
)
result[param.short_name] = value
decode_state.cursor_position = max(decode_state.cursor_position, cursor_position)

# decoding of the structure finished. move back the origin.
decode_state.origin_position = orig_origin

return inner_decode_state.parameter_values, decode_state.cursor_position + inner_decode_state.cursor_position
return result, decode_state.cursor_position

def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes:
"""
Expand All @@ -264,7 +266,7 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue

def decode(self, message: bytes) -> ParameterValueDict:
# dummy decode state to be passed to convert_bytes_to_physical
decode_state = DecodeState(parameter_values={}, coded_message=message, cursor_position=0)
decode_state = DecodeState(coded_message=message)
param_values, cursor_position = self.convert_bytes_to_physical(decode_state)
if not isinstance(param_values, dict):
odxraise(f"Decoding a structure must result in a dictionary of parameter "
Expand Down
25 changes: 19 additions & 6 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict

from .odxtypes import ParameterValueDict
if TYPE_CHECKING:
from .tablerow import TableRow


@dataclass
Expand All @@ -11,8 +13,19 @@ class DecodeState:
#: bytes to be decoded
coded_message: bytes

#: values of already decoded parameters
parameter_values: ParameterValueDict
#: Absolute position of the origin
#:
#: i.e., the absolute byte position to which all relative positions
#: refer to, e.g. the position of the first byte of a structure.
origin_position: int = 0

#: Position of the next parameter if its position is not specified in ODX
cursor_position: int
#: Absolute position of the next undecoded byte to be considered
#:
#: (if not explicitly specified by the object to be decoded.)
cursor_position: int = 0

#: values of the length key parameters decoded so far
length_keys: Dict[str, int] = field(default_factory=dict)

#: values of the table key parameters decoded so far
table_keys: Dict[str, "TableRow"] = field(default_factory=dict)
7 changes: 2 additions & 5 deletions odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# SPDX-License-Identifier: MIT
from copy import copy
from dataclasses import dataclass
from typing import List, Optional, Tuple
from xml.etree import ElementTree
Expand Down Expand Up @@ -62,18 +61,16 @@ def convert_physical_to_bytes(
def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
decode_state = copy(decode_state)
cursor_position = decode_state.cursor_position
byte_code = decode_state.coded_message

value = []
while len(byte_code) > cursor_position:
while cursor_position < len(decode_state.coded_message):
# ATTENTION: the ODX specification is very misleading
# here: it says that the item is repeated until the end of
# the PDU, but it means that DOP of the items that are
# repeated are identical, not their values
new_value, cursor_position = self.structure.convert_bytes_to_physical(
decode_state, bit_position=bit_position)
decode_state, bit_position=0)
# Update next byte_position
decode_state.cursor_position = cursor_position
value.append(new_value)
Expand Down
13 changes: 7 additions & 6 deletions odxtools/multiplexer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# SPDX-License-Identifier: MIT
from copy import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
from xml.etree import ElementTree
Expand Down Expand Up @@ -114,12 +115,12 @@ def convert_bytes_to_physical(self,
f"{self.short_name} was passed the bit position {bit_position}")
key_value, key_next_byte = self.switch_key.dop.convert_bytes_to_physical(decode_state)

byte_code = decode_state.coded_message[decode_state.cursor_position:]
case_decode_state = DecodeState(
coded_message=byte_code[self.byte_position:],
parameter_values={},
cursor_position=0,
)
case_decode_state = copy(decode_state)
if self.byte_position is not None:
case_decode_state.origin_position = decode_state.origin_position + self.byte_position
else:
case_decode_state.origin_position = decode_state.cursor_position

case_found = False
case_next_byte = 0
case_value = None
Expand Down
15 changes: 7 additions & 8 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-License-Identifier: MIT
import warnings
from copy import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple

Expand Down Expand Up @@ -64,15 +63,15 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
self.coded_value, encode_state=encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]:
decode_state = copy(decode_state)
if self.byte_position is not None and self.byte_position != decode_state.cursor_position:
# Update byte position
decode_state.cursor_position = self.byte_position

# Extract coded values
bit_position_int = self.bit_position if self.bit_position is not None else 0
orig_cursor_pos = decode_state.cursor_position
if self.byte_position is not None:
decode_state.cursor_position = decode_state.origin_position + self.byte_position

coded_val, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=bit_position_int)
decode_state, bit_position=self.bit_position or 0)

decode_state.cursor_position = orig_cursor_pos

# Check if the coded value in the message is correct.
if self.coded_value != coded_val:
Expand Down
11 changes: 9 additions & 2 deletions odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..exceptions import odxrequire
from ..exceptions import odxraise, odxrequire
from ..odxlink import OdxLinkDatabase, OdxLinkId
from ..odxtypes import ParameterValue
from .parameter import ParameterType
Expand Down Expand Up @@ -67,4 +67,11 @@ def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
return super().decode_from_pdu(decode_state)
phys_val, cursor_position = super().decode_from_pdu(decode_state)

if not isinstance(phys_val, int):
odxraise(f"The pysical type of length keys must be an integer, "
f"(is {type(phys_val).__name__})")
decode_state.length_keys[self.short_name] = phys_val

return phys_val, cursor_position
5 changes: 3 additions & 2 deletions odxtools/parameters/matchingrequestparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
self.byte_length]

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
byte_position = (
self.byte_position if self.byte_position is not None else decode_state.cursor_position)
byte_position = decode_state.cursor_position
if self.byte_position is not None:
byte_position = decode_state.origin_position + self.byte_position
bit_position = self.bit_position or 0
byte_length = (8 * self.byte_length + bit_position + 7) // 8
val_as_bytes = decode_state.coded_message[byte_position:byte_position + byte_length]
Expand Down
14 changes: 8 additions & 6 deletions odxtools/parameters/parameterwithdop.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# SPDX-License-Identifier: MIT
from copy import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple

Expand Down Expand Up @@ -77,13 +76,16 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
physical_value, encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
decode_state = copy(decode_state)
if self.byte_position is not None and self.byte_position != decode_state.cursor_position:
decode_state.cursor_position = self.byte_position
orig_cursor_pos = decode_state.cursor_position
if (pos := getattr(self, "byte_position", None)) is not None:
decode_state.cursor_position = decode_state.origin_position + pos

bit_position = self.bit_position or 0

# Use DOP to decode
bit_position_int = self.bit_position if self.bit_position is not None else 0
phys_val, cursor_position = self.dop.convert_bytes_to_physical(
decode_state, bit_position=bit_position_int)
decode_state, bit_position=bit_position)

decode_state.cursor_position = max(orig_cursor_pos, cursor_position)

return phys_val, cursor_position
7 changes: 5 additions & 2 deletions odxtools/parameters/tablekeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in
if self.byte_position is not None and self.byte_position != decode_state.cursor_position:
cursor_position = self.byte_position

# update the decode_state's table key
if self.table_row is not None:
# the table row to be used is statically specified -> no
# need to decode anything!
Expand All @@ -158,6 +157,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in
elif len(table_row_candidates) > 1:
raise DecodeError(
f"Multiple rows exhibiting key '{str(key_dop_val)}' found in table")
phys_val = table_row_candidates[0].short_name
table_row = table_row_candidates[0]
phys_val = table_row.short_name

# update the decode_state's table key
decode_state.table_keys[self.short_name] = table_row

return phys_val, cursor_position
23 changes: 9 additions & 14 deletions odxtools/parameters/tablestructparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
bit_position = self.bit_position or 0
if tr.structure is not None:
# the selected table row references a structure
if not isinstance(tr_value, dict):
raise EncodeError(f"The value of `{tr_short_name}` must "
f"be a key-value dictionary.")

inner_encode_state = EncodeState(
coded_message=b'',
parameter_values=tr_value,
Expand Down Expand Up @@ -139,24 +135,23 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in
# find the selected table row
key_name = self.table_key.short_name

table = self.table_key.table
tr_short_name = decode_state.parameter_values[key_name]
candidate_trs = [tr for tr in table.table_rows if tr.short_name == tr_short_name]
if len(candidate_trs) != 1:
raise EncodeError(f"Could not uniquely resolve a table row named "
f"'{str(tr_short_name)}' in table '{str(table.short_name)}' ")
table_row = candidate_trs[0]
decode_state.table_keys[key_name]
table_row = decode_state.table_keys.get(key_name)
if table_row is None:
raise odxraise(f"No table key '{key_name}' found when decoding "
f"table struct parameter '{str(self.short_name)}'")
dummy_val = cast(str, None), cast(int, None)
return dummy_val, decode_state.cursor_position

# Use DOP or structure to decode the value
if table_row.dop is not None:
dop = table_row.dop
val, i = dop.convert_bytes_to_physical(decode_state)
return (table_row.short_name, val), i
elif table_row.structure is not None:
structure = table_row.structure
val, i = structure.convert_bytes_to_physical(decode_state)
val, i = table_row.structure.convert_bytes_to_physical(decode_state)
return (table_row.short_name, val), i
else:
# the table row associated with the key neither defines a
# DOP not a structure -> ignore it
# DOP nor a structure -> ignore it
return (table_row.short_name, cast(int, None)), decode_state.cursor_position
22 changes: 11 additions & 11 deletions odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
# Find length key with matching ID.
bit_length = None
for parameter_name, value in decode_state.parameter_values.items():
if parameter_name == self.length_key.short_name:
# The bit length of the parameter to be extracted is given by the length key.
bit_length = value
if not isinstance(bit_length, int):
odxraise(f"The bit length must be an integer, is {type(bit_length)}")
break

if not isinstance(bit_length, int):
odxraise(f"Did not find any length key with short name {self.length_key.short_name}")
bit_length = 0

# The bit length of the parameter to be extracted is given by the length key.
if self.length_key.short_name not in decode_state.length_keys:
odxraise(f"Unspecified mandatory length key parameter "
f"{self.length_key.short_name}")
else:
bit_length = decode_state.length_keys[self.length_key.short_name]
if not isinstance(bit_length, int):
odxraise(f"The bit length must be an integer, is {type(bit_length)}")
bit_length = 0

# Extract the internal value and return.
return self._extract_internal_value(
Expand Down
4 changes: 2 additions & 2 deletions tests/test_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ def test_decode_request_end_of_pdu_field(self) -> None:
diag_layer._resolve_odxlinks(odxlinks)
diag_layer._finalize_init(odxlinks)

coded_message = bytes([0x12, 0x34, 0x34])
coded_message = bytes([0x12, 0x34, 0x54])
expected_message = Message(
coded_message=coded_message,
service=service,
Expand All @@ -903,7 +903,7 @@ def test_decode_request_end_of_pdu_field(self) -> None:
},
{
"struct_param_1": 4,
"struct_param_2": 3
"struct_param_2": 5
},
],
},
Expand Down
Loading

0 comments on commit 240718a

Please sign in to comment.