Skip to content

Commit

Permalink
Merge pull request #273 from andlaus/refactor_limits
Browse files Browse the repository at this point in the history
Refactor limit handling
  • Loading branch information
andlaus authored Mar 4, 2024
2 parents aa5f0a1 + 7b13385 commit 385561b
Show file tree
Hide file tree
Showing 24 changed files with 560 additions and 301 deletions.
30 changes: 22 additions & 8 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,23 +424,33 @@ class SomersaultSID(IntEnum):
upper_limit=None,
short_label=None,
description=None,
internal_type=DataType.A_UINT32,
physical_type=DataType.A_UNICODE2STRING,
compu_inverse_value=None,
compu_rational_coeffs=None),
internal_to_phys=[
CompuScale(
compu_const="false",
lower_limit=Limit(0),
upper_limit=Limit(0),
lower_limit=Limit(
value_raw="0", value_type=DataType.A_UINT32, interval_type=None),
upper_limit=Limit(
value_raw="0", value_type=DataType.A_UINT32, interval_type=None),
short_label=None,
description=None,
internal_type=DataType.A_UINT32,
physical_type=DataType.A_UNICODE2STRING,
compu_inverse_value=None,
compu_rational_coeffs=None),
CompuScale(
compu_const="true",
lower_limit=Limit(1),
upper_limit=Limit(1),
lower_limit=Limit(
value_raw="1", value_type=DataType.A_UINT32, interval_type=None),
upper_limit=Limit(
value_raw="1", value_type=DataType.A_UINT32, interval_type=None),
short_label=None,
description=None,
internal_type=DataType.A_UINT32,
physical_type=DataType.A_UNICODE2STRING,
compu_inverse_value=None,
compu_rational_coeffs=None),
],
Expand Down Expand Up @@ -1252,17 +1262,21 @@ class SomersaultSID(IntEnum):
short_name="forward_flip",
long_name="Forward Flip",
description=None,
lower_limit="1",
upper_limit="3",
lower_limit=Limit(
value_raw="1", value_type=DataType.A_INT32, interval_type=None),
upper_limit=Limit(
value_raw="3", value_type=DataType.A_INT32, interval_type=None),
structure_ref=OdxLinkRef.from_id(somersault_dops["num_flips"].odx_id),
structure_snref=None,
),
MultiplexerCase(
short_name="backward_flip",
long_name="Backward Flip",
description=None,
lower_limit="1",
upper_limit="3",
lower_limit=Limit(
value_raw="1", value_type=DataType.A_INT32, interval_type=None),
upper_limit=Limit(
value_raw="3", value_type=DataType.A_INT32, interval_type=None),
structure_ref=OdxLinkRef.from_id(somersault_dops["num_flips"].odx_id),
structure_snref=None,
),
Expand Down
46 changes: 43 additions & 3 deletions odxtools/compumethods/compuscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,22 @@ class CompuScale:
compu_const: Optional[AtomicOdxType]
compu_rational_coeffs: Optional[CompuRationalCoeffs]

# the following two attributes are not specified for COMPU-SCALE
# tags in the XML, but they are required to do anything useful
# with it.
internal_type: DataType
physical_type: DataType

@staticmethod
def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment], *,
internal_type: DataType, physical_type: DataType) -> "CompuScale":
short_label = et_element.findtext("SHORT-LABEL")
description = create_description_from_et(et_element.find("DESC"))
lower_limit = Limit.from_et(et_element.find("LOWER-LIMIT"), internal_type=internal_type)
upper_limit = Limit.from_et(et_element.find("UPPER-LIMIT"), internal_type=internal_type)

lower_limit = Limit.from_et(
et_element.find("LOWER-LIMIT"), doc_frags, value_type=internal_type)
upper_limit = Limit.from_et(
et_element.find("UPPER-LIMIT"), doc_frags, value_type=internal_type)

compu_inverse_value = internal_type.create_from_et(et_element.find("COMPU-INVERSE-VALUE"))
compu_const = physical_type.create_from_et(et_element.find("COMPU-CONST"))
Expand All @@ -64,4 +73,35 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment], *,
upper_limit=upper_limit,
compu_inverse_value=compu_inverse_value,
compu_const=compu_const,
compu_rational_coeffs=compu_rational_coeffs)
compu_rational_coeffs=compu_rational_coeffs,
internal_type=internal_type,
physical_type=physical_type)

def applies(self, internal_value: AtomicOdxType) -> bool:

if self.lower_limit is None and self.upper_limit is None:
# Everything is allowed: No limits have been specified
return True
elif self.upper_limit is None:
# no upper limit has been specified. the spec says that
# the value specified by the lower limit is the only one
# which is allowed (cf section 7.3.6.6.1)
assert self.lower_limit is not None

return internal_value == self.lower_limit._value
elif self.lower_limit is None:
# only the upper limit has been specified. the spec is
# ambiguous: it only says that if no upper limit is
# defined, the lower limit shall also be used as the upper
# limit and a closed interval type ought to be assumed,
# but it does not say what happens if the lower limit is
# not defined (which is allowed by the XSD). We thus
# assume that if only the upper limit is defined, is
# treated the same way as if only the lower limit is
# specified.
assert self.upper_limit is not None

return internal_value == self.upper_limit._value

return self.lower_limit.complies_to_lower(internal_value) and \
self.upper_limit.complies_to_upper(internal_value)
37 changes: 16 additions & 21 deletions odxtools/compumethods/createanycompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
from .compumethod import CompuMethod
from .compuscale import CompuScale
from .identicalcompumethod import IdenticalCompuMethod
from .limit import IntervalType, Limit
from .limit import Limit
from .linearcompumethod import LinearCompuMethod
from .scalelinearcompumethod import ScaleLinearCompuMethod
from .tabintpcompumethod import TabIntpCompuMethod
from .texttablecompumethod import TexttableCompuMethod


def _parse_compu_scale_to_linear_compu_method(
et_element: ElementTree.Element,
doc_frags: List[OdxDocFragment],
*,
scale_element: ElementTree.Element,
internal_type: DataType,
physical_type: DataType,
is_scale_linear: bool = False,
Expand Down Expand Up @@ -47,7 +48,7 @@ def _parse_compu_scale_to_linear_compu_method(
kwargs["internal_type"] = internal_type
kwargs["physical_type"] = physical_type

coeffs = odxrequire(scale_element.find("COMPU-RATIONAL-COEFFS"))
coeffs = odxrequire(et_element.find("COMPU-RATIONAL-COEFFS"))
nums = coeffs.iterfind("COMPU-NUMERATOR/V")

offset = computation_python_type(odxrequire(next(nums).text))
Expand All @@ -63,26 +64,20 @@ def _parse_compu_scale_to_linear_compu_method(
stacklevel=1)
# Read lower limit
internal_lower_limit = Limit.from_et(
scale_element.find("LOWER-LIMIT"),
internal_type=internal_type,
et_element.find("LOWER-LIMIT"),
doc_frags,
value_type=internal_type,
)
if internal_lower_limit is None:
internal_lower_limit = Limit(0, IntervalType.INFINITE)

kwargs["internal_lower_limit"] = internal_lower_limit

# Read upper limit
internal_upper_limit = Limit.from_et(
scale_element.find("UPPER-LIMIT"),
internal_type=internal_type,
et_element.find("UPPER-LIMIT"),
doc_frags,
value_type=internal_type,
)
if internal_upper_limit is None:
if not is_scale_linear:
internal_upper_limit = Limit(0, IntervalType.INFINITE)
else:
odxassert(internal_lower_limit is not None and
internal_lower_limit.interval_type == IntervalType.CLOSED)
logger.info("Scale linear without UPPER-LIMIT")
internal_upper_limit = internal_lower_limit

kwargs["internal_upper_limit"] = internal_upper_limit
kwargs["denominator"] = denominator
kwargs["factor"] = factor
Expand All @@ -92,7 +87,7 @@ def _parse_compu_scale_to_linear_compu_method(


def create_compu_default_value(et_element: Optional[ElementTree.Element],
doc_frags: List[OdxDocFragment], internal_type: DataType,
doc_frags: List[OdxDocFragment], internal_type: DataType, *,
physical_type: DataType) -> Optional[CompuScale]:
if et_element is None:
return None
Expand All @@ -104,7 +99,7 @@ def create_compu_default_value(et_element: Optional[ElementTree.Element],


def create_any_compu_method_from_et(et_element: ElementTree.Element,
doc_frags: List[OdxDocFragment], internal_type: DataType,
doc_frags: List[OdxDocFragment], *, internal_type: DataType,
physical_type: DataType) -> CompuMethod:
compu_category = et_element.findtext("CATEGORY")
odxassert(compu_category in [
Expand Down Expand Up @@ -158,13 +153,13 @@ def create_any_compu_method_from_et(et_element: ElementTree.Element,
# Compu method can be described by the function f(x) = (offset + factor * x) / denominator

scale_elem = odxrequire(et_element.find("COMPU-INTERNAL-TO-PHYS/COMPU-SCALES/COMPU-SCALE"))
return _parse_compu_scale_to_linear_compu_method(scale_element=scale_elem, **kwargs)
return _parse_compu_scale_to_linear_compu_method(scale_elem, doc_frags, **kwargs)

elif compu_category == "SCALE-LINEAR":

scale_elems = et_element.iterfind("COMPU-INTERNAL-TO-PHYS/COMPU-SCALES/COMPU-SCALE")
linear_methods = [
_parse_compu_scale_to_linear_compu_method(scale_element=scale_elem, **kwargs)
_parse_compu_scale_to_linear_compu_method(scale_elem, doc_frags, **kwargs)
for scale_elem in scale_elems
]
return ScaleLinearCompuMethod(linear_methods=linear_methods, **kwargs)
Expand Down
102 changes: 66 additions & 36 deletions odxtools/compumethods/limit.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from typing import List, Optional, overload
from xml.etree import ElementTree

from ..exceptions import odxassert, odxraise, odxrequire
from ..odxtypes import AtomicOdxType, DataType
from ..exceptions import odxraise
from ..odxlink import OdxDocFragment
from ..odxtypes import AtomicOdxType, DataType, compare_odx_values


class IntervalType(Enum):
Expand All @@ -16,42 +17,50 @@ class IntervalType(Enum):

@dataclass
class Limit:
value: AtomicOdxType
interval_type: IntervalType = IntervalType.CLOSED
value_raw: Optional[str]
value_type: Optional[DataType]
interval_type: Optional[IntervalType]

def __post_init__(self) -> None:
if self.interval_type == IntervalType.INFINITE:
self.value = 0
self._value: Optional[AtomicOdxType] = None

if self.value_type is not None:
self.set_value_type(self.value_type)

@staticmethod
@overload
def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment],
value_type: Optional[DataType]) -> "Limit":
...

@staticmethod
@overload
def from_et(et_element: None, doc_frags: List[OdxDocFragment],
value_type: Optional[DataType]) -> None:
...

@staticmethod
def from_et(et_element: Optional[ElementTree.Element], *,
internal_type: DataType) -> Optional["Limit"]:
def from_et(et_element: Optional[ElementTree.Element], doc_frags: List[OdxDocFragment],
value_type: Optional[DataType]) -> Optional["Limit"]:

if et_element is None:
return None

interval_type = None
if (interval_type_str := et_element.get("INTERVAL-TYPE")) is not None:
try:
interval_type = IntervalType(interval_type_str)
except ValueError:
interval_type = IntervalType.CLOSED
odxraise(f"Encountered unknown interval type '{interval_type_str}'")
else:
interval_type = IntervalType.CLOSED

if interval_type == IntervalType.INFINITE:
if et_element.tag == "LOWER-LIMIT":
return Limit(float("-inf"), interval_type)
else:
odxassert(et_element.tag == "UPPER-LIMIT")
return Limit(float("inf"), interval_type)
elif internal_type == DataType.A_BYTEFIELD:
hex_text = odxrequire(et_element.text)
if len(hex_text) % 2 == 1:
hex_text = "0" + hex_text
return Limit(bytes.fromhex(hex_text), interval_type)
else:
return Limit(internal_type.from_string(odxrequire(et_element.text)), interval_type)

value_raw = et_element.text

return Limit(value_raw=value_raw, interval_type=interval_type, value_type=value_type)

def set_value_type(self, value_type: DataType) -> None:
self.value_type = value_type
if self.value_raw is not None:
self._value = value_type.from_string(self.value_raw)

def complies_to_upper(self, value: AtomicOdxType) -> bool:
"""Checks if the value is in the range w.r.t. the upper limit.
Expand All @@ -60,23 +69,44 @@ def complies_to_upper(self, value: AtomicOdxType) -> bool:
* If the interval type is open, return `value < limit.value`.
* If the interval type is infinite, return `True`.
"""
if self.interval_type == IntervalType.CLOSED:
return value <= self.value # type: ignore[operator]
elif self.interval_type == IntervalType.OPEN:
return value < self.value # type: ignore[operator]
elif self.interval_type == IntervalType.INFINITE:
if self._value is None:
# if no value is specified, assume interval type INFINITE
# (what are we supposed to compare against?)
return True

if self.interval_type is None or self.interval_type == IntervalType.CLOSED:
# assume interval type CLOSED if a value was specified,
# but no interval type
return compare_odx_values(value, self._value) <= 0
elif self.interval_type == IntervalType.OPEN:
return compare_odx_values(value, self._value) < 0

if self.interval_type != IntervalType.INFINITE:
odxraise("Unhandled interval type {self.interval_type}")

return True

def complies_to_lower(self, value: AtomicOdxType) -> bool:
"""Checks if the value is in the range w.r.t. the lower limit.
* If the interval type is closed, return `limit.value <= value`.
* If the interval type is open, return `limit.value < value`.
* If the interval type is infinite, return `True`.
"""
if self.interval_type == IntervalType.CLOSED:
return self.value <= value # type: ignore[operator]
elif self.interval_type == IntervalType.OPEN:
return self.value < value # type: ignore[operator]
elif self.interval_type == IntervalType.INFINITE:

if self._value is None:
# if no value is specified, assume interval type INFINITE
# (what are we supposed to compare against?)
return True

if self.interval_type is None or self.interval_type == IntervalType.CLOSED:
# assume interval type CLOSED if a value was specified,
# but no interval type
return compare_odx_values(value, self._value) >= 0
elif self.interval_type == IntervalType.OPEN:
return compare_odx_values(value, self._value) > 0

if self.interval_type != IntervalType.INFINITE:
odxraise("Unhandled interval type {self.interval_type}")

return True
Loading

0 comments on commit 385561b

Please sign in to comment.