Skip to content

Commit

Permalink
Merge pull request #278 from andlaus/positionable_param
Browse files Browse the repository at this point in the history
Add `PositionableParam`
  • Loading branch information
andlaus authored Mar 7, 2024
2 parents 0614e2b + b7d5157 commit cda6e5e
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 107 deletions.
3 changes: 2 additions & 1 deletion odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def get_static_bit_length(self) -> Optional[int]:
cursor += param_bit_length
length = max(length, cursor)

# Round up to account for padding bits
# Round up to account for padding bits (all structures are
# byte aligned)
return ((length + 7) // 8) * 8

def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
Expand Down
13 changes: 4 additions & 9 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional

from typing_extensions import override

from ..decodestate import DecodeState
from ..diagcodedtype import DiagCodedType
from ..encodestate import EncodeState
Expand Down Expand Up @@ -62,13 +64,8 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
return self.diag_coded_type.convert_internal_to_bytes(
self.coded_value, encode_state=encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
# Extract coded values
orig_cursor_pos = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

decode_state.cursor_bit_position = self.bit_position or 0
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_val = self.diag_coded_type.decode_from_pdu(decode_state)

# Check if the coded value in the message is correct.
Expand All @@ -83,8 +80,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
stacklevel=1,
)

decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position)

return coded_val

@property
Expand Down
5 changes: 4 additions & 1 deletion odxtools/parameters/dynamicparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..odxtypes import ParameterValue
Expand All @@ -25,5 +27,6 @@ def is_settable(self) -> bool:
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.")

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.")
7 changes: 5 additions & 2 deletions odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..exceptions import odxraise, odxrequire
Expand Down Expand Up @@ -66,8 +68,9 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
phys_val = super().decode_from_pdu(decode_state)
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
phys_val = super()._decode_positioned_from_pdu(decode_state)

if not isinstance(phys_val, int):
odxraise(f"The pysical type of length keys must be an integer, "
Expand Down
11 changes: 4 additions & 7 deletions odxtools/parameters/matchingrequestparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import Optional

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..exceptions import EncodeError
Expand Down Expand Up @@ -37,16 +39,11 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
.request_byte_position:self.request_byte_position +
self.byte_length]

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
result = decode_state.extract_atomic_value(
bit_length=self.byte_length * 8,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=False)

decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor)

return result
13 changes: 4 additions & 9 deletions odxtools/parameters/nrcconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from typing_extensions import override

from ..decodestate import DecodeState
from ..diagcodedtype import DiagCodedType
from ..encodestate import EncodeState
Expand Down Expand Up @@ -77,14 +79,9 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
return self.diag_coded_type.convert_internal_to_bytes(
coded_value, encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
# Update cursor position
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
# Extract coded values
decode_state.cursor_bit_position = self.bit_position or 0
coded_value = self.diag_coded_type.decode_from_pdu(decode_state)

# Check if the coded value in the message is correct.
Expand All @@ -99,8 +96,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
stacklevel=1,
)

decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor)

return coded_value

def get_description_of_valid_values(self) -> str:
Expand Down
48 changes: 25 additions & 23 deletions odxtools/parameters/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional

from typing_extensions import final

from ..decodestate import DecodeState
from ..element import NamedElement
from ..encodestate import EncodeState
Expand Down Expand Up @@ -31,6 +33,14 @@

@dataclass
class Parameter(NamedElement, abc.ABC):
"""This class corresponds to POSITIONABLE-PARAM in the ODX
specification.
Be aware that, even though the ODX specification seems to make the
distinction of "positionable" and "normal" parameters, it does not
define any non-positionable parameter types.
"""
byte_position: Optional[int]
bit_position: Optional[int]
semantic: Optional[str]
Expand Down Expand Up @@ -90,35 +100,27 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
"""
pass

@abc.abstractmethod
@final
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""Decode the parameter value from the coded message.
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

If the parameter does have a byte position property, the coded bytes the parameter covers are extracted
at this byte position and the function parameter `default_byte_position` is ignored.
decode_state.cursor_bit_position = self.bit_position or 0

If the parameter does not have a byte position and a byte position is passed,
the bytes are extracted at the byte position given by the argument `default_byte_position`.
result = self._decode_positioned_from_pdu(decode_state)

If the parameter does not have a byte position and the argument `default_byte_position` is None,
this function throws a `DecodeError`.
decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor)
decode_state.cursor_bit_position = 0

Parameters
----------
decode_state : DecodeState
The decoding state containing
* the byte message to be decoded
* the parameter values that are already decoded
* the next byte position that is used iff the parameter does not specify a byte position
return result

Returns
-------
ParameterValuePair | List[ParameterValuePair]
the decoded parameter value (the type is defined by the DOP)
int
the next byte position after the extracted parameter
"""
pass
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""Method which actually decodes the parameter
Its location is managed by `Parameter`."""
raise NotImplementedError(
"Required method '_decode_positioned_from_pdu()' not implemented by child class")

def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
"""Encode the value of a parameter into a binary blob and return it
Expand Down
18 changes: 5 additions & 13 deletions odxtools/parameters/parameterwithdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional

from typing_extensions import override

from ..dataobjectproperty import DataObjectProperty
from ..decodestate import DecodeState
from ..dopbase import DopBase
Expand Down Expand Up @@ -75,16 +77,6 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
return dop.convert_physical_to_bytes(
physical_value, encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

decode_state.cursor_bit_position = self.bit_position or 0

# Use DOP to decode
phys_val = self.dop.decode_from_pdu(decode_state)

decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position)

return phys_val
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
return self.dop.decode_from_pdu(decode_state)
7 changes: 5 additions & 2 deletions odxtools/parameters/physicalconstantparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict

from typing_extensions import override

from ..dataobjectproperty import DataObjectProperty
from ..decodestate import DecodeState
from ..encodestate import EncodeState
Expand Down Expand Up @@ -63,9 +65,10 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
return dop.convert_physical_to_bytes(
self.physical_constant_value, encode_state, bit_position=bit_position_int)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
# Decode value
phys_val = super().decode_from_pdu(decode_state)
phys_val = super()._decode_positioned_from_pdu(decode_state)

# Check if decoded value matches expected value
if phys_val != self.physical_constant_value:
Expand Down
26 changes: 11 additions & 15 deletions odxtools/parameters/reservedparameter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Optional, cast
from typing import Optional

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..odxtypes import ParameterValue
from ..odxtypes import DataType, ParameterValue
from .parameter import Parameter, ParameterType


Expand All @@ -28,19 +30,13 @@ def get_static_bit_length(self) -> Optional[int]:
return self.bit_length

def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
bit_position_int = self.bit_position if self.bit_position is not None else 0
return (0).to_bytes((self.bit_length + bit_position_int + 7) // 8, "big")

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
# move the cursor
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position
return (0).to_bytes(((self.bit_position or 0) + self.bit_length + 7) // 8, "big")

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
decode_state.cursor_byte_position += ((self.bit_position or 0) + self.bit_length + 7) // 8

decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position)
decode_state.cursor_bit_position = 0

# ignore the value of the parameter data
return cast(int, None)
return decode_state.extract_atomic_value(
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=False)
7 changes: 5 additions & 2 deletions odxtools/parameters/systemparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..odxtypes import ParameterValue
Expand All @@ -27,5 +29,6 @@ def is_settable(self) -> bool:
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
raise NotImplementedError("Encoding a SystemParameter is not implemented yet.")

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a SystemParameter is not implemented yet.")
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding SystemParameter is not implemented yet.")
13 changes: 8 additions & 5 deletions odxtools/parameters/tableentryparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..odxlink import OdxLinkRef
Expand All @@ -19,14 +21,15 @@ def parameter_type(self) -> ParameterType:

@property
def is_required(self) -> bool:
raise NotImplementedError("TableKeyParameter.is_required is not implemented yet.")
raise NotImplementedError("TableEntryParameter.is_required is not implemented yet.")

@property
def is_settable(self) -> bool:
raise NotImplementedError("TableKeyParameter.is_settable is not implemented yet.")
raise NotImplementedError("TableEntryParameter.is_settable is not implemented yet.")

def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
raise NotImplementedError("Encoding a TableKeyParameter is not implemented yet.")
raise NotImplementedError("Encoding a TableEntryParameter is not implemented yet.")

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a TableKeyParameter is not implemented yet.")
@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a TableEntryParameter is not implemented yet.")
12 changes: 4 additions & 8 deletions odxtools/parameters/tablekeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional

from typing_extensions import override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire
Expand Down Expand Up @@ -135,19 +137,15 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
orig_cursor = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
if self.table_row is not None:
# the table row to be used is statically specified -> no
# need to decode anything!
phys_val = self.table_row.short_name
else:
# Use DOP to decode
key_dop = odxrequire(self.table.key_dop)
decode_state.cursor_bit_position = self.bit_position or 0
key_dop_val = key_dop.decode_from_pdu(decode_state)

table_row_candidates = [x for x in self.table.table_rows if x.key == key_dop_val]
Expand All @@ -162,6 +160,4 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
# update the decode_state's table key
decode_state.table_keys[self.short_name] = table_row

decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor)

return phys_val
Loading

0 comments on commit cda6e5e

Please sign in to comment.