Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/operation invoked report #266

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Changed
### Added
- added a way to proccess operations sync (directly send FINISHED)

### Fixed
- basic_logging_setup only handles sdc logger, no more side effect due to calling logging.basicConfig.

### Changed
- change python classes of `addressing_types.py` to match ws-addressing standard of 2006 instead of 2004
- The final OperationInvokedReport has OperationTargetRef parameter set.
This required refactoring of Operations handling.

## [2.0.0a6] - 2023-09-11

Expand Down
129 changes: 96 additions & 33 deletions src/sdc11073/consumer/operations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import weakref
from collections import deque
from concurrent.futures import Future
from dataclasses import dataclass
from threading import Lock
from typing import TYPE_CHECKING, Protocol

Expand All @@ -10,14 +12,15 @@
if TYPE_CHECKING:
from sdc11073.consumer.manipulator import RequestManipulatorProtocol
from sdc11073.consumer.serviceclients.serviceclientbase import HostedServiceClient
from sdc11073.pysoap.msgreader import MessageReader, ReceivedMessage
from sdc11073.pysoap.msgfactory import CreatedMessage
from sdc11073.pysoap.msgreader import MessageReader, ReceivedMessage
from sdc11073.xml_types import msg_types, pm_types

class OperationsManagerProtocol(Protocol):
"""OperationsManager calls an operation.

It returns a Future object that will contain the result at some point in time.
"""
It returns a Future object that will contain the result at some point in time.
"""

def __init__(self, msg_reader: MessageReader, log_prefix: str):
"""Construct the OperationsManager."""
Expand All @@ -38,41 +41,95 @@
"""Check operation state and set future result if it is a final state."""


@dataclass
class OperationResult:
"""OperationResult is the result of a Set operation.

Usually only the result is relevant, but for testing all intermediate data is also available.
"""

InvocationInfo: msg_types.InvocationInfo
InvocationSource: pm_types.InstanceIdentifier | None
OperationHandleRef: str | None
OperationTarget: str | None

set_response: msg_types.AbstractSetResponse
report_parts: list[
msg_types.OperationInvokedReportPart] # contains data of all OperationInvokedReportPart for operation


@dataclass
class OperationData:
"""collect all progress data of a transaction."""

future_ref: weakref.ref[Future]
set_response: msg_types.AbstractSetResponse
report_parts: list[
msg_types.OperationInvokedReportPart] # contains data of all OperationInvokedReportPart for operation


class OperationsManager(OperationsManagerProtocol): # inheriting from protocol to help typing
"""OperationsManager handles the multiple messages that are related to an operation.

The complex mechanic is hidden from user, he receives the final result.
"""

def __init__(self, msg_reader: MessageReader, log_prefix: str):
super().__init__(msg_reader, log_prefix)
self._msg_reader = msg_reader
self.log_prefix = log_prefix
self._logger = loghelper.get_logger_adapter('sdc.client.op_mgr', log_prefix)
self._transactions = {}
self._transactions: dict[int, OperationData] = {}
self._transactions_lock = Lock()
# An OperationInvokedReport can be received even before the response of the set operation is received.
# This means we must always store the last n OperationInvokedReportParts, one of them might already be the
# needed one.
self._last_operation_invoked_reports: deque[msg_types.OperationInvokedReportPart] = deque(maxlen=50)
msg_types = msg_reader.msg_types
self.nonFinalOperationStates = (msg_types.InvocationState.WAIT, msg_types.InvocationState.START)

def call_operation(self, hosted_service_client: HostedServiceClient,
def call_operation(self,
hosted_service_client: HostedServiceClient,
message: CreatedMessage,
request_manipulator: RequestManipulatorProtocol | None = None) -> Future:
"""Call an operation."""
ret = Future()
future_object = Future()
with self._transactions_lock:
message_data = hosted_service_client.post_message(message,
msg='call Operation',
request_manipulator=request_manipulator)
msg_types = self._msg_reader.msg_types
abstract_set_response = msg_types.AbstractSetResponse.from_node(message_data.p_msg.msg_node)
invocation_info = abstract_set_response.InvocationInfo
if invocation_info.InvocationState in self.nonFinalOperationStates:
self._transactions[invocation_info.TransactionId] = weakref.ref(ret)
if invocation_info.InvocationState in (msg_types.InvocationState.FAILED,
msg_types.InvocationState.CANCELLED,
msg_types.InvocationState.CANCELLED_MANUALLY):
# do not wait for an OperationInvokedReport
operation_result = OperationResult(abstract_set_response.InvocationInfo,
None,
None,
None,
abstract_set_response,
[])
future_object.set_result(operation_result)
return future_object
transaction_id = invocation_info.TransactionId
# now look for all related report parts and add them to result
parts = [part for part in self._last_operation_invoked_reports if
part.InvocationInfo.TransactionId == transaction_id]
# now look for a final report part
final_parts = [part for part in parts if
part.InvocationInfo.InvocationState not in self.nonFinalOperationStates]
if final_parts:
report_part = final_parts[0] # assuming there is only one
future_object.set_result(self._mk_operation_result(report_part, abstract_set_response, parts))
else:
self._logger.info('call_operation: transaction_id {} registered, state={}', # noqa: PLE1205
invocation_info.TransactionId, invocation_info.InvocationState)
else:
self._logger.debug('Result of Operation: {}', invocation_info) # noqa: PLE1205
ret.set_result(abstract_set_response)
return ret
self._transactions[transaction_id] = OperationData(weakref.ref(future_object),
abstract_set_response,
parts)
return future_object

def on_operation_invoked_report(self, message_data: ReceivedMessage):
"""Check operation state and set future result if it is a final state."""
Expand All @@ -85,25 +142,31 @@
self._logger.debug( # noqa: PLE1205
'{}on_operation_invoked_report: got transaction_id {} state {}',
self.log_prefix, transaction_id, invocation_state)
if invocation_state in self.nonFinalOperationStates:
self._logger.debug('nonFinal state detected, ignoring message...')
continue
with self._transactions_lock:
future_ref = self._transactions.pop(transaction_id, None)
if future_ref is None:
# this was not my transaction
self._logger.debug('transaction_id {} is not registered!', transaction_id) # noqa: PLE1205
continue
future_obj = future_ref()
if future_obj is None:
# client gave up.
self._logger.debug('transaction_id {} given up', transaction_id) # noqa: PLE1205
continue
if invocation_state == msg_types.InvocationState.FAILED:
error_text = ', '.join([err.text for err in report_part.InvocationInfo.InvocationErrorMessage])
self._logger.warning( # noqa: PLE1205
'transaction Id {} finished with error: error={}, error-message={}',
transaction_id, report_part.InvocationInfo.InvocationError, error_text)
if transaction_id in self._transactions:
self._transactions[transaction_id].report_parts.append(report_part)
if invocation_state in self.nonFinalOperationStates:
pass
else:
with self._transactions_lock:
operation_data = self._transactions.pop(transaction_id, None)
future_object = operation_data.future_ref()
if future_object is None:
# client gave up.
self._logger.debug('transaction_id {} given up', transaction_id) # noqa: PLE1205

Check warning on line 155 in src/sdc11073/consumer/operations.py

View check run for this annotation

Codecov / codecov/patch

src/sdc11073/consumer/operations.py#L155

Added line #L155 was not covered by tests
else:
future_object.set_result(self._mk_operation_result(report_part,
operation_data.set_response,
operation_data.report_parts))
else:
self._logger.info('transaction Id {} ok', transaction_id) # noqa: PLE1205
future_obj.set_result(report_part)
self._last_operation_invoked_reports.append(report_part)

def _mk_operation_result(self,
current_report_part: msg_types.OperationInvokedReportPart,
set_response: msg_types.AbstractSetResponse,
all_report_parts: list[msg_types.OperationInvokedReportPart]) -> OperationResult:
return OperationResult(current_report_part.InvocationInfo,
current_report_part.InvocationSource,
current_report_part.OperationHandleRef,
current_report_part.OperationTarget,
set_response,
all_report_parts)
12 changes: 9 additions & 3 deletions src/sdc11073/loghelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,30 @@ def ensure_log_stream():


def reset_log_levels(root_logger_name='sdc'):
sub_logger_name = root_logger_name + '.'
for name in logging.Logger.manager.loggerDict:
if name.startswith(root_logger_name):
if name.startswith(sub_logger_name) or name == root_logger_name:
logging.getLogger(name).setLevel(logging.NOTSET)


def reset_handlers(root_logger_name='sdc'):
sub_logger_name = root_logger_name + '.'
for name in logging.Logger.manager.loggerDict:
if name.startswith(root_logger_name):
if name.startswith(sub_logger_name) or name == root_logger_name:
logger = logging.getLogger(name)
for handler in logger.handlers:
logger.removeHandler(handler)


def basic_logging_setup(root_logger_name='sdc', level=logging.INFO, log_file_name=None):
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=level)
reset_log_levels(root_logger_name)
reset_handlers(root_logger_name)
logger = logging.getLogger(root_logger_name)
logger.setLevel(level)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if log_file_name:
file_handler = logging_handlers.RotatingFileHandler(log_file_name,
maxBytes=5000000,
Expand Down
14 changes: 12 additions & 2 deletions src/sdc11073/mdib/descriptorcontainers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inspect
import sys
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Protocol, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

from sdc11073 import observableproperties as properties
from sdc11073.xml_types import ext_qnames as ext
Expand All @@ -18,8 +18,8 @@
from decimal import Decimal

from lxml import etree as etree_
from sdc11073 import xml_utils

from sdc11073 import xml_utils
from sdc11073.namespaces import NamespaceHelper
from sdc11073.xml_types.isoduration import DurationType
from sdc11073.xml_types.xml_structure import ExtensionLocalValue
Expand Down Expand Up @@ -78,6 +78,7 @@ class AbstractDescriptorProtocol(Protocol):
Type: pm_types.CodedValue | None
source_mds: str
parent_handle: str | None
coding: pm_types.Coding | None

def __init__(self, handle: str, parent_handle: str | None):
...
Expand Down Expand Up @@ -525,6 +526,15 @@ class AbstractOperationDescriptorContainer(AbstractDescriptorContainer):
_props = ('OperationTarget', 'MaxTimeToFinish', 'InvocationEffectiveTimeout', 'Retriggerable', 'AccessLevel')


class AbstractOperationDescriptorProtocol(AbstractDescriptorProtocol):
"""Protocol definition for AbstractOperationDescriptorContainer."""
OperationTarget: str
MaxTimeToFinish: DurationType | None
InvocationEffectiveTimeout: DurationType | None
Retriggerable: bool
AccessLevel: pm_types.AccessLevel


class SetValueOperationDescriptorContainer(AbstractOperationDescriptorContainer):
"""Represents SetValueOperationDescriptor in BICEPS."""

Expand Down
7 changes: 7 additions & 0 deletions src/sdc11073/mdib/statecontainers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@
_props = ('BodySite', 'PhysicalConnector', 'ActivationState', 'ActiveDeterminationPeriod', 'LifeTimePeriod')


class MetricStateProtocol(AbstractStateProtocol):
MetricValue: None | pm_types.NumericMetricValue | pm_types.StringMetricValue | pm_types.SampleArrayValue

def mk_metric_value(self) -> pm_types.NumericMetricValue | pm_types.StringMetricValue | pm_types.SampleArrayValue:
pass

Check warning on line 224 in src/sdc11073/mdib/statecontainers.py

View check run for this annotation

Codecov / codecov/patch

src/sdc11073/mdib/statecontainers.py#L224

Added line #L224 was not covered by tests


class NumericMetricStateContainer(AbstractMetricStateContainer):
"""Represents NumericMetricState in BICEPS."""

Expand Down
28 changes: 13 additions & 15 deletions src/sdc11073/provider/dpwshostedservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

from lxml import etree as etree_

from ..dispatch import RequestDispatcher, DispatchKey
from ..namespaces import EventingActions
from ..namespaces import default_ns_helper as ns_hlp
from ..xml_types import mex_types
from ..xml_types.addressing_types import EndpointReferenceType
from ..xml_types.dpws_types import HostedServiceType

from sdc11073.dispatch import DispatchKey, RequestDispatcher
from sdc11073.namespaces import EventingActions
from sdc11073.namespaces import default_ns_helper as ns_hlp
from sdc11073.xml_types import mex_types
from sdc11073.xml_types.addressing_types import EndpointReferenceType
from sdc11073.xml_types.dpws_types import HostedServiceType

if typing.TYPE_CHECKING:
import pathlib
from sdc11073 import xml_utils

_wsdl_ns = ns_hlp.WSDL.namespace
Expand All @@ -27,14 +27,14 @@
WSDL_S12 = ns_hlp.WSDL12.namespace # old soap 12 namespace, used in wsdl 1.1. used only for wsdl


def etree_from_file(path) -> xml_utils.LxmlElement:
def etree_from_file(path: str | pathlib.Path) -> xml_utils.LxmlElement:
parser = etree_.ETCompatXMLParser(resolve_entities=False)
doc = etree_.parse(path, parser=parser)
doc = etree_.parse(str(path), parser=parser)
return doc.getroot()


class _EventService(RequestDispatcher):
""" A service that offers subscriptions"""
"""A service that offers subscriptions."""

def __init__(self, sdc_device, subscriptions_manager, offered_subscriptions):
super().__init__()
Expand Down Expand Up @@ -72,12 +72,10 @@ def _on_renew_status(self, request_data):


class DPWSHostedService(_EventService):
""" Container for DPWSPortTypeBase instances"""
"""Container for DPWSPortTypeBase instances."""

def __init__(self, sdc_device, subscriptions_manager, path_element, port_type_impls):
"""

:param sdc_device:
""":param sdc_device:
:param path_element:
:param port_type_impls: list of DPWSPortTypeBase
"""
Expand Down Expand Up @@ -111,7 +109,7 @@ def mk_dpws_hosted_instance(self) -> HostedServiceType:
return dpws_hosted

def _on_get_wsdl(self) -> bytes:
""" return wsdl"""
"""Return wsdl."""
self._logger.debug('_onGetWsdl returns {}', self._wsdl_string)
return self._wsdl_string

Expand Down
Loading
Loading