Skip to content

Commit

Permalink
Fix/operation invoked report (#266)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the title above -->
<!--- Link the corresponding issues after you created the pull request
-->

## Types of changes
<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
- [x] I have updated the [changelog](../CHANGELOG.md) accordingly.
- [x] I have added tests to cover my changes.
  • Loading branch information
deichmab-draeger authored Oct 4, 2023
1 parent cc4857c commit 26bd75b
Show file tree
Hide file tree
Showing 39 changed files with 3,004 additions and 2,509 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Changed
### Added
- added a way to proccess operations sync (directly send FINISHED)

### Fixed
- basic_logging_setup only handles sdc logger, no more side effect due to calling logging.basicConfig.

### Changed
- change python classes of `addressing_types.py` to match ws-addressing standard of 2006 instead of 2004
- The final OperationInvokedReport has OperationTargetRef parameter set.
This required refactoring of Operations handling.

## [2.0.0a6] - 2023-09-11

Expand Down
2 changes: 1 addition & 1 deletion src/sdc11073/consumer/consumerimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _mk_lookup(self) -> dict[str, str]:
actions.EpisodicMetricReport: 'episodic_metric_report',
actions.EpisodicAlertReport: 'episodic_alert_report',
actions.EpisodicComponentReport: 'episodic_component_report',
actions.EpisodicOperationalStateReport: 'operational_state_report',
actions.EpisodicOperationalStateReport: 'episodic_operational_state_report',
actions.EpisodicContextReport: 'episodic_context_report',
actions.PeriodicMetricReport: 'periodic_metric_report',
actions.PeriodicAlertReport: 'periodic_alert_report',
Expand Down
129 changes: 96 additions & 33 deletions src/sdc11073/consumer/operations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import weakref
from collections import deque
from concurrent.futures import Future
from dataclasses import dataclass
from threading import Lock
from typing import TYPE_CHECKING, Protocol

Expand All @@ -10,14 +12,15 @@
if TYPE_CHECKING:
from sdc11073.consumer.manipulator import RequestManipulatorProtocol
from sdc11073.consumer.serviceclients.serviceclientbase import HostedServiceClient
from sdc11073.pysoap.msgreader import MessageReader, ReceivedMessage
from sdc11073.pysoap.msgfactory import CreatedMessage
from sdc11073.pysoap.msgreader import MessageReader, ReceivedMessage
from sdc11073.xml_types import msg_types, pm_types

class OperationsManagerProtocol(Protocol):
"""OperationsManager calls an operation.
It returns a Future object that will contain the result at some point in time.
"""
It returns a Future object that will contain the result at some point in time.
"""

def __init__(self, msg_reader: MessageReader, log_prefix: str):
"""Construct the OperationsManager."""
Expand All @@ -38,41 +41,95 @@ def on_operation_invoked_report(self, message_data: ReceivedMessage):
"""Check operation state and set future result if it is a final state."""


@dataclass
class OperationResult:
"""OperationResult is the result of a Set operation.
Usually only the result is relevant, but for testing all intermediate data is also available.
"""

InvocationInfo: msg_types.InvocationInfo
InvocationSource: pm_types.InstanceIdentifier | None
OperationHandleRef: str | None
OperationTarget: str | None

set_response: msg_types.AbstractSetResponse
report_parts: list[
msg_types.OperationInvokedReportPart] # contains data of all OperationInvokedReportPart for operation


@dataclass
class OperationData:
"""collect all progress data of a transaction."""

future_ref: weakref.ref[Future]
set_response: msg_types.AbstractSetResponse
report_parts: list[
msg_types.OperationInvokedReportPart] # contains data of all OperationInvokedReportPart for operation


class OperationsManager(OperationsManagerProtocol): # inheriting from protocol to help typing
"""OperationsManager handles the multiple messages that are related to an operation.
The complex mechanic is hidden from user, he receives the final result.
"""

def __init__(self, msg_reader: MessageReader, log_prefix: str):
super().__init__(msg_reader, log_prefix)
self._msg_reader = msg_reader
self.log_prefix = log_prefix
self._logger = loghelper.get_logger_adapter('sdc.client.op_mgr', log_prefix)
self._transactions = {}
self._transactions: dict[int, OperationData] = {}
self._transactions_lock = Lock()
# An OperationInvokedReport can be received even before the response of the set operation is received.
# This means we must always store the last n OperationInvokedReportParts, one of them might already be the
# needed one.
self._last_operation_invoked_reports: deque[msg_types.OperationInvokedReportPart] = deque(maxlen=50)
msg_types = msg_reader.msg_types
self.nonFinalOperationStates = (msg_types.InvocationState.WAIT, msg_types.InvocationState.START)

def call_operation(self, hosted_service_client: HostedServiceClient,
def call_operation(self,
hosted_service_client: HostedServiceClient,
message: CreatedMessage,
request_manipulator: RequestManipulatorProtocol | None = None) -> Future:
"""Call an operation."""
ret = Future()
future_object = Future()
with self._transactions_lock:
message_data = hosted_service_client.post_message(message,
msg='call Operation',
request_manipulator=request_manipulator)
msg_types = self._msg_reader.msg_types
abstract_set_response = msg_types.AbstractSetResponse.from_node(message_data.p_msg.msg_node)
invocation_info = abstract_set_response.InvocationInfo
if invocation_info.InvocationState in self.nonFinalOperationStates:
self._transactions[invocation_info.TransactionId] = weakref.ref(ret)
if invocation_info.InvocationState in (msg_types.InvocationState.FAILED,
msg_types.InvocationState.CANCELLED,
msg_types.InvocationState.CANCELLED_MANUALLY):
# do not wait for an OperationInvokedReport
operation_result = OperationResult(abstract_set_response.InvocationInfo,
None,
None,
None,
abstract_set_response,
[])
future_object.set_result(operation_result)
return future_object
transaction_id = invocation_info.TransactionId
# now look for all related report parts and add them to result
parts = [part for part in self._last_operation_invoked_reports if
part.InvocationInfo.TransactionId == transaction_id]
# now look for a final report part
final_parts = [part for part in parts if
part.InvocationInfo.InvocationState not in self.nonFinalOperationStates]
if final_parts:
report_part = final_parts[0] # assuming there is only one
future_object.set_result(self._mk_operation_result(report_part, abstract_set_response, parts))
else:
self._logger.info('call_operation: transaction_id {} registered, state={}', # noqa: PLE1205
invocation_info.TransactionId, invocation_info.InvocationState)
else:
self._logger.debug('Result of Operation: {}', invocation_info) # noqa: PLE1205
ret.set_result(abstract_set_response)
return ret
self._transactions[transaction_id] = OperationData(weakref.ref(future_object),
abstract_set_response,
parts)
return future_object

def on_operation_invoked_report(self, message_data: ReceivedMessage):
"""Check operation state and set future result if it is a final state."""
Expand All @@ -85,25 +142,31 @@ def on_operation_invoked_report(self, message_data: ReceivedMessage):
self._logger.debug( # noqa: PLE1205
'{}on_operation_invoked_report: got transaction_id {} state {}',
self.log_prefix, transaction_id, invocation_state)
if invocation_state in self.nonFinalOperationStates:
self._logger.debug('nonFinal state detected, ignoring message...')
continue
with self._transactions_lock:
future_ref = self._transactions.pop(transaction_id, None)
if future_ref is None:
# this was not my transaction
self._logger.debug('transaction_id {} is not registered!', transaction_id) # noqa: PLE1205
continue
future_obj = future_ref()
if future_obj is None:
# client gave up.
self._logger.debug('transaction_id {} given up', transaction_id) # noqa: PLE1205
continue
if invocation_state == msg_types.InvocationState.FAILED:
error_text = ', '.join([err.text for err in report_part.InvocationInfo.InvocationErrorMessage])
self._logger.warning( # noqa: PLE1205
'transaction Id {} finished with error: error={}, error-message={}',
transaction_id, report_part.InvocationInfo.InvocationError, error_text)
if transaction_id in self._transactions:
self._transactions[transaction_id].report_parts.append(report_part)
if invocation_state in self.nonFinalOperationStates:
pass
else:
with self._transactions_lock:
operation_data = self._transactions.pop(transaction_id, None)
future_object = operation_data.future_ref()
if future_object is None:
# client gave up.
self._logger.debug('transaction_id {} given up', transaction_id) # noqa: PLE1205
else:
future_object.set_result(self._mk_operation_result(report_part,
operation_data.set_response,
operation_data.report_parts))
else:
self._logger.info('transaction Id {} ok', transaction_id) # noqa: PLE1205
future_obj.set_result(report_part)
self._last_operation_invoked_reports.append(report_part)

def _mk_operation_result(self,
current_report_part: msg_types.OperationInvokedReportPart,
set_response: msg_types.AbstractSetResponse,
all_report_parts: list[msg_types.OperationInvokedReportPart]) -> OperationResult:
return OperationResult(current_report_part.InvocationInfo,
current_report_part.InvocationSource,
current_report_part.OperationHandleRef,
current_report_part.OperationTarget,
set_response,
all_report_parts)
12 changes: 9 additions & 3 deletions src/sdc11073/loghelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,30 @@ def ensure_log_stream():


def reset_log_levels(root_logger_name='sdc'):
sub_logger_name = root_logger_name + '.'
for name in logging.Logger.manager.loggerDict:
if name.startswith(root_logger_name):
if name.startswith(sub_logger_name) or name == root_logger_name:
logging.getLogger(name).setLevel(logging.NOTSET)


def reset_handlers(root_logger_name='sdc'):
sub_logger_name = root_logger_name + '.'
for name in logging.Logger.manager.loggerDict:
if name.startswith(root_logger_name):
if name.startswith(sub_logger_name) or name == root_logger_name:
logger = logging.getLogger(name)
for handler in logger.handlers:
logger.removeHandler(handler)


def basic_logging_setup(root_logger_name='sdc', level=logging.INFO, log_file_name=None):
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=level)
reset_log_levels(root_logger_name)
reset_handlers(root_logger_name)
logger = logging.getLogger(root_logger_name)
logger.setLevel(level)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if log_file_name:
file_handler = logging_handlers.RotatingFileHandler(log_file_name,
maxBytes=5000000,
Expand Down
3 changes: 2 additions & 1 deletion src/sdc11073/mdib/consumermdibxtra.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ def _on_episodic_alert_report(self, received_message_data: ReceivedMessage):
self._mdib.process_incoming_alert_states_report(received_message_data.mdib_version_group, report)

def _on_operational_state_report(self, received_message_data: ReceivedMessage):
report = self._msg_reader.process_incoming_states_report(received_message_data)
cls = self._mdib.data_model.msg_types.EpisodicOperationalStateReport
report = cls.from_node(received_message_data.p_msg.msg_node)
self._mdib.process_incoming_operational_states_report(received_message_data.mdib_version_group, report)

def _on_waveform_report_profiled(self, received_message_data: ReceivedMessage):
Expand Down
14 changes: 12 additions & 2 deletions src/sdc11073/mdib/descriptorcontainers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inspect
import sys
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Protocol, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

from sdc11073 import observableproperties as properties
from sdc11073.xml_types import ext_qnames as ext
Expand All @@ -18,8 +18,8 @@
from decimal import Decimal

from lxml import etree as etree_
from sdc11073 import xml_utils

from sdc11073 import xml_utils
from sdc11073.namespaces import NamespaceHelper
from sdc11073.xml_types.isoduration import DurationType
from sdc11073.xml_types.xml_structure import ExtensionLocalValue
Expand Down Expand Up @@ -78,6 +78,7 @@ class AbstractDescriptorProtocol(Protocol):
Type: pm_types.CodedValue | None
source_mds: str
parent_handle: str | None
coding: pm_types.Coding | None

def __init__(self, handle: str, parent_handle: str | None):
...
Expand Down Expand Up @@ -525,6 +526,15 @@ class AbstractOperationDescriptorContainer(AbstractDescriptorContainer):
_props = ('OperationTarget', 'MaxTimeToFinish', 'InvocationEffectiveTimeout', 'Retriggerable', 'AccessLevel')


class AbstractOperationDescriptorProtocol(AbstractDescriptorProtocol):
"""Protocol definition for AbstractOperationDescriptorContainer."""
OperationTarget: str
MaxTimeToFinish: DurationType | None
InvocationEffectiveTimeout: DurationType | None
Retriggerable: bool
AccessLevel: pm_types.AccessLevel


class SetValueOperationDescriptorContainer(AbstractOperationDescriptorContainer):
"""Represents SetValueOperationDescriptor in BICEPS."""

Expand Down
7 changes: 7 additions & 0 deletions src/sdc11073/mdib/statecontainers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ class AbstractMetricStateContainer(AbstractStateContainer):
_props = ('BodySite', 'PhysicalConnector', 'ActivationState', 'ActiveDeterminationPeriod', 'LifeTimePeriod')


class MetricStateProtocol(AbstractStateProtocol):
MetricValue: None | pm_types.NumericMetricValue | pm_types.StringMetricValue | pm_types.SampleArrayValue

def mk_metric_value(self) -> pm_types.NumericMetricValue | pm_types.StringMetricValue | pm_types.SampleArrayValue:
...


class NumericMetricStateContainer(AbstractMetricStateContainer):
"""Represents NumericMetricState in BICEPS."""

Expand Down
28 changes: 13 additions & 15 deletions src/sdc11073/provider/dpwshostedservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

from lxml import etree as etree_

from ..dispatch import RequestDispatcher, DispatchKey
from ..namespaces import EventingActions
from ..namespaces import default_ns_helper as ns_hlp
from ..xml_types import mex_types
from ..xml_types.addressing_types import EndpointReferenceType
from ..xml_types.dpws_types import HostedServiceType

from sdc11073.dispatch import DispatchKey, RequestDispatcher
from sdc11073.namespaces import EventingActions
from sdc11073.namespaces import default_ns_helper as ns_hlp
from sdc11073.xml_types import mex_types
from sdc11073.xml_types.addressing_types import EndpointReferenceType
from sdc11073.xml_types.dpws_types import HostedServiceType

if typing.TYPE_CHECKING:
import pathlib
from sdc11073 import xml_utils

_wsdl_ns = ns_hlp.WSDL.namespace
Expand All @@ -27,14 +27,14 @@
WSDL_S12 = ns_hlp.WSDL12.namespace # old soap 12 namespace, used in wsdl 1.1. used only for wsdl


def etree_from_file(path) -> xml_utils.LxmlElement:
def etree_from_file(path: str | pathlib.Path) -> xml_utils.LxmlElement:
parser = etree_.ETCompatXMLParser(resolve_entities=False)
doc = etree_.parse(path, parser=parser)
doc = etree_.parse(str(path), parser=parser)
return doc.getroot()


class _EventService(RequestDispatcher):
""" A service that offers subscriptions"""
"""A service that offers subscriptions."""

def __init__(self, sdc_device, subscriptions_manager, offered_subscriptions):
super().__init__()
Expand Down Expand Up @@ -72,12 +72,10 @@ def _on_renew_status(self, request_data):


class DPWSHostedService(_EventService):
""" Container for DPWSPortTypeBase instances"""
"""Container for DPWSPortTypeBase instances."""

def __init__(self, sdc_device, subscriptions_manager, path_element, port_type_impls):
"""
:param sdc_device:
""":param sdc_device:
:param path_element:
:param port_type_impls: list of DPWSPortTypeBase
"""
Expand Down Expand Up @@ -111,7 +109,7 @@ def mk_dpws_hosted_instance(self) -> HostedServiceType:
return dpws_hosted

def _on_get_wsdl(self) -> bytes:
""" return wsdl"""
"""Return wsdl."""
self._logger.debug('_onGetWsdl returns {}', self._wsdl_string)
return self._wsdl_string

Expand Down
Loading

0 comments on commit 26bd75b

Please sign in to comment.