diff --git a/.github/workflows/pat_integration.yml b/.github/workflows/pat_integration.yml index cc002993..84c92a5c 100644 --- a/.github/workflows/pat_integration.yml +++ b/.github/workflows/pat_integration.yml @@ -7,7 +7,9 @@ on: - master - v* jobs: - sdc11073_provider_v1: + sdc11073_provider_v2: + env: + EXPECTED_WAVEFORM_SAMPLES_4F: 100 # set to a low value as we cannot control GitHub ci network latency strategy: matrix: os: [ ubuntu-latest, macos-latest, windows-latest ] @@ -28,10 +30,10 @@ jobs: - name: Run tests with tls enabled if: ${{ matrix.tls_enable }} - run: python -m examples.ReferenceTest.run --tls + run: python -m examples.ReferenceTestV2.run --tls timeout-minutes: 2 - name: Run tests with tls disabled if: ${{ !matrix.tls_enable }} - run: python -m examples.ReferenceTest.run + run: python -m examples.ReferenceTestV2.run timeout-minutes: 2 diff --git a/examples/ReferenceTest/reference_consumer.py b/examples/ReferenceTest/reference_consumer.py index eadd0ffe..c1c552ee 100644 --- a/examples/ReferenceTest/reference_consumer.py +++ b/examples/ReferenceTest/reference_consumer.py @@ -1,18 +1,19 @@ +"""Reference test v1.""" + import dataclasses import enum import os +import pathlib import sys import time import traceback -import typing import uuid from collections import defaultdict from concurrent import futures from decimal import Decimal import sdc11073.certloader -from sdc11073 import commlog, network -from sdc11073 import observableproperties +from sdc11073 import commlog, network, observableproperties from sdc11073.certloader import mk_ssl_contexts_from_folder from sdc11073.consumer import SdcConsumer from sdc11073.definitions_sdc import SdcV1Definitions @@ -24,7 +25,7 @@ ConsumerMdibMethods.DETERMINATIONTIME_WARN_LIMIT = 2.0 # ref_discovery_runs indicates the maximum executions of wsdiscovery search services, "0" -> run until service is found -discovery_runs = int(os.getenv('ref_discovery_runs', 0)) # noqa: SIM112 +discovery_runs = int(os.getenv('ref_discovery_runs', "0")) # noqa: SIM112 ENABLE_COMMLOG = True @@ -54,19 +55,22 @@ def get_epr() -> uuid.UUID: return uuid.UUID(epr) return uuid.UUID('12345678-6f55-11ea-9697-123456789abc') + class TestResult(enum.Enum): - """ - Represents the overall test result. - """ + """Represents the overall test result.""" + PASSED = 'PASSED' FAILED = 'FAILED' @dataclasses.dataclass class TestCollector: + """Test collector.""" + overall_test_result: TestResult = TestResult.PASSED - test_messages: typing.List = dataclasses.field(default_factory=list) + test_messages: list = dataclasses.field(default_factory=list) def add_result(self, test_step_message: str, test_step_result: TestResult): + """Add result to result list.""" if not isinstance(test_step_result, TestResult): raise ValueError("Unexpected parameter") if self.overall_test_result is not TestResult.FAILED: @@ -74,12 +78,13 @@ def add_result(self, test_step_message: str, test_step_result: TestResult): self.test_messages.append(test_step_message) -def run_ref_test() -> TestCollector: +def run_ref_test() -> TestCollector: # noqa: PLR0915,PLR0912,C901 + """Run reference tests.""" test_collector = TestCollector() adapter_ip = get_network_adapter().ip print(f'using adapter address {adapter_ip}') search_epr = get_epr() - print('Test step 1: discover device which endpoint ends with "{}"'.format(search_epr)) + print(f'Test step 1: discover device which endpoint ends with "{search_epr}"') wsd = WSDiscovery(str(adapter_ip)) wsd.start() my_service = None @@ -90,7 +95,7 @@ def run_ref_test() -> TestCollector: for s in services: if s.epr.endswith(str(search_epr)): my_service = s - print('found service {}'.format(s.epr)) + print(f'found service {s.epr}') break discovery_counter += 1 if discovery_runs and discovery_counter >= discovery_runs: @@ -108,7 +113,7 @@ def run_ref_test() -> TestCollector: client.start_all() print('Test step 2 passed: connected to device') test_collector.add_result('### Test 2 ### passed', TestResult.PASSED) - except: + except Exception: # noqa: BLE001 print(traceback.format_exc()) test_collector.add_result('### Test 2 ### failed', TestResult.FAILED) return test_collector @@ -120,7 +125,7 @@ def run_ref_test() -> TestCollector: print('Test step 3&4 passed') test_collector.add_result('### Test 3 ### passed', TestResult.PASSED) test_collector.add_result('### Test 4 ### passed', TestResult.PASSED) - except: + except Exception: # noqa: BLE001 print(traceback.format_exc()) test_collector.add_result('### Test 3 ### failed', TestResult.FAILED) test_collector.add_result('### Test 4 ### failed', TestResult.FAILED) @@ -131,7 +136,7 @@ def run_ref_test() -> TestCollector: print('Test step 5: check that at least one patient context exists') patients = mdib.context_states.NODETYPE.get(pm.PatientContextState, []) if len(patients) > 0: - print('found {} patients, Test step 5 passed'.format(len(patients))) + print(f'found {len(patients)} patients, Test step 5 passed') test_collector.add_result('### Test 5 ### passed', TestResult.PASSED) else: print('found no patients, Test step 5 failed') @@ -141,7 +146,7 @@ def run_ref_test() -> TestCollector: print('Test step 6: check that at least one location context exists') locations = mdib.context_states.NODETYPE.get(pm.LocationContextState, []) if len(locations) > 0: - print('found {} locations, Test step 6 passed'.format(len(locations))) + print(f'found {len(locations)} locations, Test step 6 passed') test_collector.add_result('### Test 6 ### passed', TestResult.PASSED) else: print('found no locations, Test step 6 failed') @@ -151,22 +156,22 @@ def run_ref_test() -> TestCollector: metric_updates = defaultdict(list) alert_updates = defaultdict(list) - def onMetricUpdates(metricsbyhandle): + def _on_metric_updates(metricsbyhandle: dict): print('onMetricUpdates', metricsbyhandle) for k, v in metricsbyhandle.items(): metric_updates[k].append(v) - def onAlertUpdates(alertsbyhandle): + def _on_alert_updates(alertsbyhandle: dict): print('onAlertUpdates', alertsbyhandle) for k, v in alertsbyhandle.items(): alert_updates[k].append(v) - observableproperties.bind(mdib, metrics_by_handle=onMetricUpdates) - observableproperties.bind(mdib, alert_by_handle=onAlertUpdates) + observableproperties.bind(mdib, metrics_by_handle=_on_metric_updates) + observableproperties.bind(mdib, alert_by_handle=_on_alert_updates) sleep_timer = 20 min_updates = sleep_timer // 5 - 1 - print('will wait for {} seconds now, expecting at least {} updates per Handle'.format(sleep_timer, min_updates)) + print(f'will wait for {sleep_timer} seconds now, expecting at least {min_updates} updates per Handle') time.sleep(sleep_timer) print(metric_updates) print(alert_updates) @@ -175,20 +180,20 @@ def onAlertUpdates(alertsbyhandle): else: for k, v in metric_updates.items(): if len(v) < min_updates: - print('found only {} updates for {}, test step 7 failed'.format(len(v), k)) + print(f'found only {len(v)} updates for {k}, test step 7 failed') test_collector.add_result(f'### Test 7 Handle {k} ### failed', TestResult.FAILED) else: - print('found {} updates for {}, test step 7 ok'.format(len(v), k)) + print(f'found {len(v)} updates for {k}, test step 7 ok') test_collector.add_result(f'### Test 7 Handle {k} ### passed', TestResult.PASSED) if len(alert_updates) == 0: test_collector.add_result('### Test 8 ### failed', TestResult.FAILED) else: for k, v in alert_updates.items(): if len(v) < min_updates: - print('found only {} updates for {}, test step 8 failed'.format(len(v), k)) + print(f'found only {len(v)} updates for {k}, test step 8 failed') test_collector.add_result(f'### Test 8 Handle {k} ### failed', TestResult.FAILED) else: - print('found {} updates for {}, test step 8 ok'.format(len(v), k)) + print(f'found {len(v)} updates for {k}, test step 8 ok') test_collector.add_result(f'### Test 8 Handle {k} ### passed', TestResult.PASSED) print('Test step 9: call SetString operation') @@ -201,28 +206,27 @@ def onAlertUpdates(alertsbyhandle): for s in setstring_operations: if s.Handle != setst_handle: continue - print('setString Op ={}'.format(s)) + print(f'setString Op ={s}') try: fut = client.set_service_client.set_string(s.Handle, 'hoppeldipop') try: res = fut.result(timeout=10) print(res) if res.InvocationInfo.InvocationState != InvocationState.FINISHED: - print('set string operation {} did not finish with "Fin":{}'.format(s.Handle, res)) + print(f'set string operation {s.Handle} did not finish with "Fin":{res}') test_collector.add_result('### Test 9(SetString) ### failed', TestResult.FAILED) else: - print('set string operation {} ok:{}'.format(s.Handle, res)) + print(f'set string operation {s.Handle} ok:{res}') test_collector.add_result('### Test 9(SetString) ### passed', TestResult.PASSED) except futures.TimeoutError: print('timeout error') test_collector.add_result('### Test 9(SetString) ### failed', TestResult.FAILED) - except Exception as ex: + except Exception as ex:# noqa: BLE001 print(f'Test 9(SetString): {ex}') test_collector.add_result('### Test 9(SetString) ### failed', TestResult.FAILED) print('Test step 9: call SetValue operation') setvalue_operations = mdib.descriptions.NODETYPE.get(pm.SetValueOperationDescriptor, []) - # print('setvalue_operations', setvalue_operations) setval_handle = 'numeric.ch0.vmd1_sco_0' if len(setvalue_operations) == 0: print('Test step 9 failed, no SetValue operation found') @@ -231,22 +235,22 @@ def onAlertUpdates(alertsbyhandle): for s in setvalue_operations: if s.Handle != setval_handle: continue - print('setNumericValue Op ={}'.format(s)) + print(f'setNumericValue Op ={s}') try: fut = client.set_service_client.set_numeric_value(s.Handle, Decimal('42')) try: res = fut.result(timeout=10) print(res) if res.InvocationInfo.InvocationState != InvocationState.FINISHED: - print('set value operation {} did not finish with "Fin":{}'.format(s.Handle, res)) + print(f'set value operation {s.Handle} did not finish with "Fin":{res}') test_collector.add_result('### Test 9(SetValue) ### failed', TestResult.FAILED) else: - print('set value operation {} ok:{}'.format(s.Handle, res)) + print(f'set value operation {s.Handle} ok:{res}') test_collector.add_result('### Test 9(SetValue) ### passed', TestResult.PASSED) except futures.TimeoutError: print('timeout error') test_collector.add_result('### Test 9(SetValue) ### failed', TestResult.FAILED) - except Exception as ex: + except Exception as ex:# noqa: BLE001 print(f'Test 9(SetValue): {ex}') test_collector.add_result('### Test 9(SetValue) ### failed', TestResult.FAILED) @@ -260,27 +264,27 @@ def onAlertUpdates(alertsbyhandle): for s in activate_operations: if s.Handle != activate_handle: continue - print('activate Op ={}'.format(s)) + print(f'activate Op ={s}') try: fut = client.set_service_client.activate(s.Handle, 'hoppeldipop') try: res = fut.result(timeout=10) print(res) if res.InvocationInfo.InvocationState != InvocationState.FINISHED: - print('activate operation {} did not finish with "Fin":{}'.format(s.Handle, res)) + print(f'activate operation {s.Handle} did not finish with "Fin":{res}') test_collector.add_result('### Test 9(Activate) ### failed', TestResult.FAILED) else: - print('activate operation {} ok:{}'.format(s.Handle, res)) + print(f'activate operation {s.Handle} ok:{res}') test_collector.add_result('### Test 9(Activate) ### passed', TestResult.PASSED) except futures.TimeoutError: print('timeout error') test_collector.add_result('### Test 9(Activate) ### failed', TestResult.FAILED) - except Exception as ex: + except Exception as ex: # noqa: BLE001 print(f'Test 9(Activate): {ex}') test_collector.add_result('### Test 9(Activate) ### failed', TestResult.FAILED) print('Test step 10: cancel all subscriptions') - success = client._subscription_mgr.unsubscribe_all() + success = client._subscription_mgr.unsubscribe_all() # noqa: SLF001 if success: test_collector.add_result('### Test 10(unsubscribe) ### passed', TestResult.PASSED) else: @@ -289,18 +293,17 @@ def onAlertUpdates(alertsbyhandle): return test_collector def main() -> TestCollector: + """Execute reference tests.""" xtra_log_config = os.getenv('ref_xtra_log_cnf') # noqa: SIM112 import json import logging.config - here = os.path.dirname(__file__) - - with open(os.path.join(here, 'logging_default.json')) as f: + with pathlib.Path(__file__).parent.joinpath("logging_default.json").open() as f: logging_setup = json.load(f) logging.config.dictConfig(logging_setup) if xtra_log_config is not None: - with open(xtra_log_config) as f: + with pathlib.Path(xtra_log_config).open() as f: logging_setup2 = json.load(f) logging.config.dictConfig(logging_setup2) comm_logger = commlog.DirectoryLogger(log_folder=r'c:\temp\sdc_refclient_commlog', diff --git a/examples/ReferenceTestV2/__init__.py b/examples/ReferenceTestV2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/ReferenceTestV2/discoproxyclient.py b/examples/ReferenceTestV2/discoproxyclient.py new file mode 100644 index 00000000..4c5b0bec --- /dev/null +++ b/examples/ReferenceTestV2/discoproxyclient.py @@ -0,0 +1,412 @@ +"""Implementation of discovery proxy client. + +A discovery proxy prototype has been implemented based on +https://confluence.hl7.org/display/GP/Topic%3A+Discovery+Proxy+Actors + +This client connects to that proxy. +""" + +from __future__ import annotations + +import os +import pathlib +import random +import time +from typing import TYPE_CHECKING +from uuid import UUID + +from sdc11073.certloader import mk_ssl_contexts_from_folder +from sdc11073.consumer.request_handler_deferred import EmptyResponse +from sdc11073.definitions_sdc import SdcV1Definitions +from sdc11073.dispatch import MessageConverterMiddleware +from sdc11073.httpserver.httpserverimpl import HttpServerThreadBase +from sdc11073.location import SdcLocation +from sdc11073.loghelper import LoggerAdapter, basic_logging_setup, get_logger_adapter +from sdc11073.mdib import ProviderMdib +from sdc11073.namespaces import EventingActions +from sdc11073.namespaces import default_ns_helper as nsh +from sdc11073.provider import SdcProvider +from sdc11073.pysoap.msgfactory import CreatedMessage, MessageFactory +from sdc11073.pysoap.msgreader import MessageReader, ReceivedMessage +from sdc11073.pysoap.soapclient import Fault, SoapClient +from sdc11073.wsdiscovery.wsdimpl import Service +from sdc11073.xml_types import eventing_types, pm_types, wsd_types +from sdc11073.xml_types.addressing_types import HeaderInformationBlock +from sdc11073.xml_types.dpws_types import ThisDeviceType, ThisModelType + +if TYPE_CHECKING: + from collections.abc import Iterable + + from lxml.etree import QName + + from sdc11073.certloader import SSLContextContainer + from sdc11073.dispatch.request import RequestData + from sdc11073.xml_types.basetypes import MessageType + +message_factory = MessageFactory(SdcV1Definitions, None, logger=get_logger_adapter("sdc.disco.msg")) +message_reader = MessageReader(SdcV1Definitions, None, logger=get_logger_adapter("sdc.disco.msg")) + +ADDRESS_ALL = "urn:docs-oasis-open-org:ws-dd:ns:discovery:2009:01" # format acc to RFC 2141 + + +def _mk_wsd_soap_message(header_info: HeaderInformationBlock, payload: MessageType) -> CreatedMessage: + # use discovery specific namespaces + return message_factory.mk_soap_message( + header_info, + payload, + ns_list=[nsh.S12, nsh.WSA, nsh.WSD], + use_defaults=False, + ) + + +class DiscoProxyClient: + """Discovery proxy consumer.""" + + def __init__( + self, + disco_proxy_address: str, + my_address: str, + ssl_context_container: SSLContextContainer | None = None, + ): + self._proxy_address = disco_proxy_address + self._my_address = my_address + self._ssl_context_container = ssl_context_container + self._logger = get_logger_adapter("sdc.disco") + self._local_services: dict[str, Service] = {} + self._remote_services: dict[str, Service] = {} + ssl_context = None if ssl_context_container is None else ssl_context_container.client_context + self._soap_client = SoapClient( + disco_proxy_address, + socket_timeout=5, + logger=get_logger_adapter("sdc.disco.client"), + ssl_context=ssl_context, + sdc_definitions=SdcV1Definitions, + msg_reader=message_reader, + ) + self._http_server = HttpServerThreadBase( + my_address, + ssl_context_container.server_context if ssl_context_container else None, + logger=get_logger_adapter("sdc.disco.httpsrv"), + supported_encodings=["gzip"], + ) + + self._msg_converter = MessageConverterMiddleware(message_reader, message_factory, self._logger, self) + self._my_server_port = None + self.subscribe_response = None + + def start(self, subscribe: bool = True): + """Subscribe.""" + # first start http server, the services need to know the ip port number + self._http_server.start() + + event_is_set = self._http_server.started_evt.wait(timeout=15.0) + if not event_is_set: + self._logger.error("Cannot start device, start event of http server not set.") + raise RuntimeError("Cannot start device, start event of http server not set.") + self._my_server_port = self._http_server.my_port + self._http_server.dispatcher.register_instance("", self._msg_converter) + + if subscribe: + self.send_subscribe() + + def stop(self, unsubscribe: bool = False): + """Unsubscribe.""" + # it seems that unsubscribe is not supported + if unsubscribe: + self.send_unsubscribe() + self._http_server.stop() + + def get_active_addresses(self) -> list[str]: + """Get active addresses.""" + # TODO: do not return list + return [self._my_address] + + def search_services( + self, + types: Iterable[QName] | None = None, + scopes: wsd_types.ScopesType | None = None, + ) -> list[Service]: + """Send a Probe message. + + Update known services with found services. Return list of services found in probe response. + """ + payload = wsd_types.ProbeType() + payload.Types = types + if scopes is not None: + payload.Scopes = scopes + + inf = HeaderInformationBlock(action=payload.action, addr_to=ADDRESS_ALL) + created_message = _mk_wsd_soap_message(inf, payload) + received_message = self._soap_client.post_message_to("", created_message) + probe_response = wsd_types.ProbeMatchesType.from_node(received_message.p_msg.msg_node) + result = [] + for probe_match in probe_response.ProbeMatch: + service = Service( + types=probe_match.Types, + scopes=probe_match.Scopes, + x_addrs=probe_match.XAddrs, + epr=probe_match.EndpointReference.Address, + instance_id="", + metadata_version=probe_match.MetadataVersion, + ) + self._remote_services[service.epr] = service + result.append(service) + return result + + def send_resolve(self, epr: str) -> wsd_types.ResolveMatchesType: + """Send resolve.""" + payload = wsd_types.ResolveType() + payload.EndpointReference.Address = epr + inf = HeaderInformationBlock(action=payload.action, addr_to=ADDRESS_ALL) + created_message = _mk_wsd_soap_message(inf, payload) + received_message = self._soap_client.post_message_to("", created_message) + return wsd_types.ResolveMatchesType.from_node(received_message.p_msg.msg_node) + + def clear_remote_services(self): + """Clear remotely discovered services.""" + self._remote_services.clear() + + def publish_service(self, epr: str, types: list[QName], scopes: wsd_types.ScopesType, x_addrs: list[str]): + """Publish services.""" + metadata_version = 1 + instance_id = str(random.randint(1, 0xFFFFFFFF)) + service = Service(types, scopes, x_addrs, epr, instance_id, metadata_version=metadata_version) + self._logger.info("publishing %r", service) + self._local_services[epr] = service + + service.increment_message_number() + app_sequence = wsd_types.AppSequenceType() + app_sequence.InstanceId = int(service.instance_id) + app_sequence.MessageNumber = service.message_number + + payload = wsd_types.HelloType() + payload.Types = service.types + payload.Scopes = service.scopes + payload.XAddrs = service.x_addrs + payload.EndpointReference.Address = service.epr + + inf = HeaderInformationBlock(action=payload.action, addr_to=ADDRESS_ALL) + + created_message = _mk_wsd_soap_message(inf, payload) + created_message.p_msg.add_header_element( + app_sequence.as_etree_node(nsh.WSD.tag("AppSequence"), ns_map=nsh.partial_map(nsh.WSD)), + ) + created_message = _mk_wsd_soap_message(inf, payload) + self._soap_client.post_message_to("", created_message) + + def send_subscribe(self) -> ReceivedMessage: + """Send subscribe message.""" + subscribe_request = eventing_types.Subscribe() + subscribe_request.Delivery.NotifyTo.Address = f"https://{self._my_address}:{self._my_server_port}" + subscribe_request.Expires = 3600 + subscribe_request.set_filter("", dialect="http://discoproxy") + inf = HeaderInformationBlock(action=subscribe_request.action, addr_to=ADDRESS_ALL) + created_message = _mk_wsd_soap_message(inf, subscribe_request) + received_message = self._soap_client.post_message_to("", created_message) + response_action = received_message.action + if response_action == EventingActions.SubscribeResponse: + self.subscribe_response = eventing_types.SubscribeResponse.from_node(received_message.p_msg.msg_node) + elif response_action == Fault.NODETYPE: + fault = Fault.from_node(received_message.p_msg.msg_node) + self._logger.error( # noqa: PLE1205 + "subscribe: Fault response : {}", + fault, + ) + return received_message + + def send_unsubscribe(self): + """Send an unsubscribe request to the provider and handle the response.""" + if not self.subscribe_response: + return + subscribe_response, self.subscribe_response = self.subscribe_response, None + request = eventing_types.Unsubscribe() + dev_reference_param = subscribe_response.SubscriptionManager.ReferenceParameters + subscription_manager_address = subscribe_response.SubscriptionManager.Address + inf = HeaderInformationBlock( + action=request.action, + addr_to=subscription_manager_address, + reference_parameters=dev_reference_param, + ) + message = message_factory.mk_soap_message(inf, payload=request) + received_message_data = self._soap_client.post_message_to("", message, msg="unsubscribe") + response_action = received_message_data.action + # check response: response does not contain explicit status. If action== UnsubscribeResponse all is fine. + if response_action == EventingActions.UnsubscribeResponse: + self._logger.info( # noqa: PLE1205 + "unsubscribe: end of subscription {} was confirmed.", + self.notification_url, + ) + elif response_action == Fault.NODETYPE: + fault = Fault.from_node(received_message_data.p_msg.msg_node) + self._logger.error( # noqa: PLE1205 + "unsubscribe: Fault response : {}", + fault, + ) + + else: + self._logger.error( # noqa: PLE1205 + "unsubscribe: unexpected response action: {}", + received_message_data.p_msg.raw_data, + ) + raise ValueError(f"unsubscribe: unexpected response action: {received_message_data.p_msg.raw_data}") + + def clear_service(self, epr: str): + """Clear services.""" + service = self._local_services[epr] + self._send_bye(service) + del self._local_services[epr] + + def _send_bye(self, service: Service): + self._logger.debug("sending bye on %s", service) + + bye = wsd_types.ByeType() + bye.EndpointReference.Address = service.epr + + inf = HeaderInformationBlock(action=bye.action, addr_to=ADDRESS_ALL) + + app_sequence = wsd_types.AppSequenceType() + app_sequence.InstanceId = int(service.instance_id) + app_sequence.MessageNumber = service.message_number + + created_message = _mk_wsd_soap_message(inf, bye) + created_message.p_msg.add_header_element( + app_sequence.as_etree_node(nsh.WSD.tag("AppSequence"), ns_map=nsh.partial_map(nsh.WSD)), + ) + created_message = _mk_wsd_soap_message(inf, bye) + received_message = self._soap_client.post_message_to("", created_message) # noqa: F841 + # hello_response = wsd_types.HelloType.from_node(received_message.p_msg.msg_node) # noqa: ERA001 + # print(hello_response)# noqa: ERA001 + + def on_post(self, request_data: RequestData) -> CreatedMessage: + """On post.""" + print("on_post") + if request_data.message_data.action == wsd_types.HelloType.action: + hello = wsd_types.HelloType.from_node(request_data.message_data.p_msg.msg_node) + service = Service( + types=hello.Types, + scopes=hello.Scopes, + x_addrs=hello.XAddrs, + epr=hello.EndpointReference.Address, + instance_id="", # Todo: needed in any way? + metadata_version=hello.MetadataVersion, + ) + self._remote_services[service.epr] = service + self._logger.info("hello epr = %s, xaddrs =%r", service.epr, service.x_addrs) + + elif request_data.message_data.action == wsd_types.ByeType.action: + bye = wsd_types.ByeType.from_node(request_data.message_data.p_msg.msg_node) + epr = bye.EndpointReference.Address + self._logger.info("bye epr = %s, xaddrs =%r", epr, bye.XAddrs) + if epr in self._remote_services: + del self._remote_services[epr] + self._logger.info("removed %s from known remote services") + else: + self._logger.info("unknown remote service %s", epr) + return EmptyResponse() + + def on_get(self, _: RequestData) -> CreatedMessage: + """On get.""" + print("on_get") + return EmptyResponse() + + +if __name__ == "__main__": + + def mk_provider( + wsd: DiscoProxyClient, + mdib_path: str, + uuid_str: str, + ssl_contexts: SSLContextContainer, + ) -> SdcProvider: + """Create sdc provider.""" + my_mdib = ProviderMdib.from_mdib_file(mdib_path) + print(f"UUID for this device is {uuid_str}") + dpws_model = ThisModelType( + manufacturer="sdc11073", + manufacturer_url="www.sdc11073.com", + model_name="TestDevice", + model_number="1.0", + model_url="www.sdc11073.com/model", + presentation_url="www.sdc11073.com/model/presentation", + ) + + dpws_device = ThisDeviceType(friendly_name="TestDevice", firmware_version="Version1", serial_number="12345") + specific_components = None + return SdcProvider( + wsd, + dpws_model, + dpws_device, + my_mdib, + UUID(uuid_str), + ssl_context_container=ssl_contexts, + specific_components=specific_components, + max_subscription_duration=15, + ) + + def log_services(log: LoggerAdapter, the_services: list[Service]): + """Print the found services.""" + log.info("found %d services:", len(the_services)) + for the_service in the_services: + log.info("found service: %r", the_service) + + def main(): + """Execute disco proxy.""" + # example code how to use the DiscoProxyClient. + # It assumes a discovery proxy is reachable on disco_ip address. + basic_logging_setup() + logger = get_logger_adapter("sdc.disco.main") + ca_folder = r"C:\tmp\ORNET_REF_Certificates" + ssl_passwd = "dummypass" # noqa: S105 + disco_ip = "192.168.30.5:33479" + my_ip = "192.168.30.106" + my_uuid_str = "12345678-6f55-11ea-9697-123456789bcd" + mdib_path = os.getenv("ref_mdib") or str( # noqa:SIM112 + pathlib.Path(__file__).parent.joinpath("mdib_test_sequence_2_v4(temp).xml"), + ) + ref_fac = os.getenv("ref_fac") or "r_fac" # noqa:SIM112 + ref_poc = os.getenv("ref_poc") or "r_poc" # noqa:SIM112 + ref_bed = os.getenv("ref_bed") or "r_bed" # noqa:SIM112 + loc = SdcLocation(ref_fac, ref_poc, ref_bed) + + ssl_contexts = mk_ssl_contexts_from_folder( + ca_folder, + cyphers_file=None, + private_key="user_private_key_encrypted.pem", + certificate="user_certificate_root_signed.pem", + ca_public_key="root_certificate.pem", + ssl_passwd=ssl_passwd, + ) + + proxy = DiscoProxyClient(disco_ip, my_ip, ssl_contexts) + proxy.start() + try: + services = proxy.search_services() + log_services(logger, services) + + # now publish a device + logger.info("location for this device is %s", loc) + logger.info("start provider...") + + sdc_provider = mk_provider(proxy, mdib_path, my_uuid_str, ssl_contexts) + sdc_provider.start_all() + + validators = [pm_types.InstanceIdentifier("Validator", extension_string="System")] + sdc_provider.set_location(loc, validators) + + services = proxy.search_services() + log_services(logger, services) + for service in services: + result = proxy.send_resolve(service.epr) + logger.info("resolvematches: %r", result.ResolveMatch) + + time.sleep(5) + logger.info("stop provider...") + sdc_provider.stop_all() + + services = proxy.search_services() + log_services(logger, services) + + finally: + proxy.stop() + + main() diff --git a/examples/ReferenceTestV2/logging_default.json b/examples/ReferenceTestV2/logging_default.json new file mode 100644 index 00000000..3e4ee096 --- /dev/null +++ b/examples/ReferenceTestV2/logging_default.json @@ -0,0 +1,110 @@ +{ + "version": 1, + "formatters":{ + "default":{ + "class": "logging.Formatter", + "format": "{asctime} - {name} - {levelname} - {message}", + "style": "{" + } + }, + "handlers":{ + "file":{ + "class": "logging.FileHandler", + "filename": "sdc_ref_dev.log", + "mode":"w", + "formatter": "default" + }, + "console": { + "class": "logging.StreamHandler", + "formatter": "default" + } + }, + "loggers":{ + "sdc": { + "handlers": ["file", "console"], + "level":"INFO" + }, + "sdc.discover": { + "level": "INFO" + }, + "sdc.discover.monitor": { + "level": null + }, + "sdc.schema_resolver": { + "comment": "logs resolving of schema locations", + "level": null + }, + "sdc.device": { + "comment": "base logger for device", + "level": null + }, + "sdc.device.soap": { + "level": null + }, + "sdc.device.mdib": { + "level": null + }, + "sdc.device.httpsrv": { + "level": null + }, + "sdc.device.op_worker": { + "comment": "logs execution of called operations", + "level": null + }, + "sdc.device.op_reg": { + "comment": "logs operation creation ", + "level": null + }, + "sdc.device.op_mgr": { + "comment": "logs operation calls", + "level": null + }, + "sdc.device.op": { + "comment": "for all operations", + "level": null + }, + "sdc.device.ops": { + "level": null + }, + "sdc.device.subscrMgr": { + "level": null + }, + "sdc.device.player": { + "comment": "if set to DEBUG, it will log all notifications that are replayed", + "level": null + }, + "sdc.client": { + "comment": "base logger for client", + "level": "WARN" + }, + "sdc.client.soap": { + "level": null + }, + "sdc.client.subscr": { + "comment": "logs events per subscription, e.g. renew", + "level": null + }, + "sdc.client.subscrMgr": { + "comment": "logs events of Subscriptions Manager, mainly errors if something goes wrong.", + "level": null + }, + "sdc.client.notif_dispatch": { + "comment": "debug level logs every incoming notification", + "level": null + }, + "sdc.client.mdib": { + "level": null + }, + "sdc.client.mdib.rt": { + "comment": "logs for real time data buffer", + "level": null + }, + "sdc.client.wf": { + "comment": "on debug it logs every incoming waveform", + "level": null + }, + "sdc.client.op_mgr": { + "level": null + } + } +} diff --git a/examples/ReferenceTestV2/mdib_test_sequence_2_v4(temp).xml b/examples/ReferenceTestV2/mdib_test_sequence_2_v4(temp).xml new file mode 100644 index 00000000..60d50f88 --- /dev/null +++ b/examples/ReferenceTestV2/mdib_test_sequence_2_v4(temp).xml @@ -0,0 +1,283 @@ + + + + + + + + + + equipment-label + + + + + SDPi Test MDS + + + + + Alert Condition that is present when the source metric exceeds 80 + + numeric_metric_1.channel_1.vmd_0.mds_0 + + + + + + + + Adds a exactly one Patient Context to the operation target by using the attributes and elements from the SCO Operation's payload + + + + + Sets the value of the targeted Numeric Metric + + + + + Sets the value of the targeted Enum String Metric + + + + + Performs nothing. The operational state will be toggled periodically at least every 5 seconds in order to produce Operational State Reports. + + + + + + + + + + + None + + + NTPv4 + + + EBWW + + + + + + Magnitude ampere(s) hour + + + + + Magnitude volt(s) + + + + + + SDPi Test VMD that contains settings and measurements including waveforms + + + + Channel that contains settings + + + + Numeric setting, externally controllable + + + Dimensionless + + + + + + Enum setting, externally controllable + + + Dimensionless + + + ON + + Enum Value ON + + + + OFF + + Enum Value OFF + + + + STANDBY + + Enum Value STANDBY + + + + + + String setting + + + Dimensionless + + + + + + Channel that contains measurements + + + + Periodically determined intermittent numeric measurement metric + + + Dimensionless + + + + + + Waveform metric 1 + + + Dimensionless + + + + + Waveform metric 2 + + + Dimensionless + + + + + Waveform metric 3 + + + Dimensionless + + + + + + + SDPi Test VMD that contains settings to be externally controlled by bulk operations + + + + + Sets the @Value of 2 metric states at once + + + + + + Channel that contains settings to be externally controlled by bulk operations + + + + Numeric setting, externally controllable by bulk update + + + Dimensionless + + + + + Numeric setting, externally controllable by bulk update + + + Dimensionless + + + + + + + + SDPi Test MDS used for description modification reports. This MDS periodically inserts and deletes a VMD including Channels including Metrics. + + + + SDPi Test VMD that contains a metric and an alarm for which units and cause-remedy information is periodically updated (description updates) + + + + + An alert condition that periodically changes its cause-remedy information at least every 5 seconds + + numeric_metric_0.channel_0.vmd_0.mds_1 + + + Remedy Info + + Cause Info + + + + + + Channel that contains a metric which is periodically changing its unit of measure + + + + Flow Rate: Numeric measurement that periodically changes the unit of measure at least every 5 seconds + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/ReferenceTestV2/reference_consumer_v2.py b/examples/ReferenceTestV2/reference_consumer_v2.py new file mode 100644 index 00000000..d47232d3 --- /dev/null +++ b/examples/ReferenceTestV2/reference_consumer_v2.py @@ -0,0 +1,835 @@ +"""Implementation of reference consumer v2. + +If a value is not provided as environment variable, the default value (see code below) will be used. +""" + +from __future__ import annotations + +import os +import pathlib +import sys +import time +import traceback +import uuid +from collections import defaultdict +from dataclasses import dataclass +from decimal import Decimal +from typing import TYPE_CHECKING + +import sdc11073 +from sdc11073 import network, observableproperties +from sdc11073.certloader import mk_ssl_contexts_from_folder +from sdc11073.consumer import SdcConsumer +from sdc11073.definitions_sdc import SdcV1Definitions +from sdc11073.mdib.consumermdib import ConsumerMdib +from sdc11073.mdib.consumermdibxtra import ConsumerMdibMethods +from sdc11073.wsdiscovery import WSDiscovery +from sdc11073.xml_types import msg_types, pm_qnames + +if TYPE_CHECKING: + from lxml.etree import QName + + from sdc11073.loghelper import LoggerAdapter + from sdc11073.pysoap.msgreader import ReceivedMessage + from sdc11073.wsdiscovery.service import Service + from sdc11073.xml_types.eventing_types import FilterType + +ConsumerMdibMethods.DETERMINATIONTIME_WARN_LIMIT = 2.0 + + +numeric_metric_handle = "numeric_metric_0.channel_0.vmd_0.mds_0" +alert_condition_handle = "alert_condition_0.vmd_0.mds_1" +set_value_handle = "set_value_0.sco.mds_0" +set_string_handle = "set_string_0.sco.mds_0" +set_context_state_handle = "set_context_0.sco.mds_0" + + +def get_network_adapter() -> network.NetworkAdapter: + """Get network adapter from environment or first loopback.""" + if (ip := os.getenv("ref_ip")) is not None: # noqa: SIM112 + return network.get_adapter_containing_ip(ip) + # get next available loopback adapter + return next(adapter for adapter in network.get_adapters() if adapter.is_loopback) + + +def get_ssl_context() -> sdc11073.certloader.SSLContextContainer | None: + """Get ssl context from environment or None.""" + if (ca_folder := os.getenv("ref_ca")) is None: # noqa: SIM112 + return None + return mk_ssl_contexts_from_folder( + ca_folder, + private_key="user_private_key_encrypted.pem", + certificate="user_certificate_root_signed.pem", + ca_public_key="root_certificate.pem", + cyphers_file=None, + ssl_passwd=os.getenv("ref_ssl_passwd"), # noqa:SIM112 + ) + + +def get_epr() -> uuid.UUID: + """Get epr from environment or default.""" + if (epr := os.getenv("ref_search_epr")) is not None: # noqa: SIM112 + return uuid.UUID(epr) + return uuid.UUID("12345678-6f55-11ea-9697-123456789abc") + + +@dataclass +class ResultEntry: + """Represents one result entry.""" + + verdict: bool | None + step: str + info: str + xtra: str + + def __str__(self): + verdict_str = {None: "no result", True: "passed", False: "failed"} + return f"{self.step:6s}:{verdict_str[self.verdict]:10s} {self.info}{self.xtra}" + + +class ResultsCollector: + """Result collector.""" + + def __init__(self): + self._results: list[ResultEntry] = [] + + def log_result(self, is_ok: bool | None, step: str, info: str, extra_info: str | None = None): + """Log the result.""" + xtra = f" ({extra_info}) " if extra_info else "" + self._results.append(ResultEntry(is_ok, step, info, xtra)) + + def print_summary(self): + """Print the summary.""" + print("\n### Summary ###") + for r in self._results: + print(r) + + @property + def failed_count(self) -> int: + """Get the amount of failures.""" + return len([r for r in self._results if r.verdict is False]) + + +class ConsumerMdibMethodsReferenceTest(ConsumerMdibMethods): + """Consumer mdib reference test.""" + + def __init__(self, consumer_mdib: ConsumerMdib, logger: LoggerAdapter): + super().__init__(consumer_mdib, logger) + self.alert_condition_type_concept_updates: list[float] = [] # for test 5a.1 + self._last_alert_condition_type_concept_updates = time.monotonic() # timestamp + + self.alert_condition_cause_remedy_updates: list[float] = [] # for test 5a.2 + self._last_alert_condition_cause_remedy_updates = time.monotonic() # timestamp + + self.unit_of_measure_updates: list[float] = [] # for test 5a.3 + self._last_unit_of_measure_updates = time.monotonic() # timestamp + + def _on_episodic_metric_report(self, received_message_data: ReceivedMessage): + # test 4.1 : count numeric metric updates + # The Reference Provider produces at least 5 numeric metric updates in 30 seconds + super()._on_episodic_metric_report(received_message_data) + + def _on_description_modification_report(self, received_message_data: ReceivedMessage): # noqa: C901 + """For Test 5a.1 check if the concept description of updated alert condition Type changed. + + For Test 5a.2 check if alert condition cause-remedy information changed. + """ + cls = self._mdib.data_model.msg_types.DescriptionModificationReport + report = cls.from_node(received_message_data.p_msg.msg_node) + now = time.monotonic() + dmt = self._mdib.sdc_definitions.data_model.msg_types.DescriptionModificationType + for report_part in report.ReportPart: + modification_type = report_part.ModificationType + if modification_type == dmt.UPDATE: + for descriptor_container in report_part.Descriptor: + if descriptor_container.is_alert_condition_descriptor: + old_descriptor = self._mdib.descriptions.handle.get_one(descriptor_container.Handle) + # test 5a.1 + if descriptor_container.Type.ConceptDescription != old_descriptor.Type.ConceptDescription: + print( + f"concept description {descriptor_container.Type.ConceptDescription} <=> " + f"{old_descriptor.Type.ConceptDescription}", + ) + self.alert_condition_type_concept_updates.append( + now - self._last_alert_condition_type_concept_updates, + ) + self._last_alert_condition_type_concept_updates = now + # test 5a.2 + # (CauseInfo is a list) + detected_5a2 = False + if len(descriptor_container.CauseInfo) != len(old_descriptor.CauseInfo): + print( + f"RemedyInfo no. of CauseInfo {len(descriptor_container.CauseInfo)} <=> " + f"{len(old_descriptor.CauseInfo)}", + ) + detected_5a2 = True + else: + for i, cause_info in enumerate(descriptor_container.CauseInfo): + old_cause_info = old_descriptor.CauseInfo[i] + if cause_info.RemedyInfo != old_cause_info.RemedyInfo: + print(f"RemedyInfo {cause_info.RemedyInfo} <=> {old_cause_info.RemedyInfo}") + detected_5a2 = True + if detected_5a2: + self.alert_condition_cause_remedy_updates.append( + now - self._last_alert_condition_cause_remedy_updates, + ) + self._last_alert_condition_cause_remedy_updates = now + elif descriptor_container.is_metric_descriptor: + # test 5a.3 + old_descriptor = self._mdib.descriptions.handle.get_one(descriptor_container.Handle) + if old_descriptor.Unit != descriptor_container.Unit: + self.unit_of_measure_updates.append(now - self._last_unit_of_measure_updates) + self._last_unit_of_measure_updates = now + + super()._on_description_modification_report(received_message_data) + + +def test_1b_resolve(wsd: WSDiscovery, my_service: Service) -> (bool, str): + """Send resolve and check response.""" + wsd.clear_remote_services() + wsd._send_resolve(my_service.epr) # noqa: SLF001 + time.sleep(3) + if len(wsd._remote_services) == 0: # noqa: SLF001 + return False, "no response" + if len(wsd._remote_services) > 1: # noqa: SLF001 + return False, "multiple response" + service = wsd._remote_services.get(my_service.epr) # noqa: SLF001 + if service.epr != my_service.epr: + return False, "not the same epr" + return True, "resolve answered" + + +def connect_client(my_service: Service) -> SdcConsumer: + """Connect sdc consumer.""" + client = SdcConsumer.from_wsd_service(my_service, ssl_context_container=get_ssl_context(), validate=True) + client.start_all() + return client + + +def test_min_updates_per_handle( + updates_dict: dict, + min_updates: int, + node_type_filter: FilterType = None, +) -> (bool, str): # True ok + """Test minimum updates per handle.""" + results = [] + is_ok = True + if len(updates_dict) == 0: + is_ok = False + results.append("no updates") + else: + for k, v in updates_dict.items(): + if node_type_filter: + v = [n for n in v if node_type_filter == n.NODETYPE] # noqa: PLW2901 + if len(v) < min_updates: + is_ok = False + results.append(f"Handle {k} only {len(v)} updates, expect >= {min_updates}") + return is_ok, "\n".join(results) + + +def test_min_updates_for_type(updates_dict: dict, min_updates: int, q_name_list: list[QName]) -> (bool, str): # True ok + """Verify minimum updates for specified type.""" + flat_list = [] + for v in updates_dict.values(): + flat_list.extend(v) + matches = [x for x in flat_list if x.NODETYPE in q_name_list] + if len(matches) >= min_updates: + return True, "" + return False, f"expect >= {min_updates}, got {len(matches)} out of {len(flat_list)}" + + +def run_ref_test(results_collector: ResultsCollector) -> ResultsCollector: # noqa: PLR0915,PLR0912,C901 + """Run reference test.""" + adapter_ip = get_network_adapter().ip + search_epr = str(get_epr()) + print(f"using adapter address {adapter_ip}") + print(f'Test step 1: discover device which endpoint ends with "{search_epr}"') + wsd = WSDiscovery(adapter_ip) + wsd.start() + + # 1. Device Discovery + # a) The Reference Provider sends Hello messages + # b) The Reference Provider answers to Probe and Resolve messages + + # Remark: 1a) is not testable because provider can't be forced to send a hello while this test is running. + step = "1a" + info = "The Reference Provider sends Hello messages" + results_collector.log_result(None, step, info, extra_info="not testable") + + step = "1b.1" + info = "The Reference Provider answers to Probe messages" + my_service = None + while my_service is None: + services = wsd.search_services(types=SdcV1Definitions.MedicalDeviceTypesFilter) + print("found {} services {}".format(len(services), ", ".join([s.epr for s in services]))) + for s in services: + if s.epr.endswith(search_epr): + my_service = s + print(f"found service {s.epr}") + break + print("Test step 1 successful: device discovered") + results_collector.log_result(True, step, info) + + step = "1b.2" + info = "The Reference Provider answers to Resolve messages" + print("Test step 1b: send resolve and check response") + is_ok, txt = test_1b_resolve(wsd, my_service) + results_collector.log_result(is_ok, step, info, extra_info=txt) + + # 2. BICEPS Services Discovery and binding + # a) The Reference Provider answers to TransferGet + # b) The SDCri Reference Provider grants subscription runtime of at most 15 seconds in order to enforce Reference Consumers to send renew requests + + """2. BICEPS Services Discovery and binding + a) The Reference Provider answers to TransferGet + b) The Reference Consumer renews at least one subscription once during the test phase; + the Reference Provider grants subscriptions of at most 15 seconds + (this allows for the Reference Consumer to verify if auto-renew works)""" + step = "2a" + info = "The Reference Provider answers to TransferGet" + print(step, info) + try: + client = connect_client(my_service) + results_collector.log_result(client.host_description is not None, step, info) + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info) + return None # results + + step = "2b.1" + info = "the Reference Provider grants subscriptions of at most 15 seconds" + now = time.time() + durations = [s.expires_at - now for s in client.subscription_mgr.subscriptions.values()] + print(f"subscription durations = {durations}") + results_collector.log_result(max(durations) <= 15, step, info) # noqa: PLR2004 + step = "2b.2" + info = "the Reference Provider grants subscriptions of at most 15 seconds (renew)" + subscription = list(client.subscription_mgr.subscriptions.values())[0] # noqa: RUF015 + granted = subscription.renew(30000) + print(f"renew granted = {granted}") + results_collector.log_result(max(durations) <= 15, step, info) # noqa: PLR2004 + + # 3. Request Response + # a) The Reference Provider answers to GetMdib + # b) The Reference Provider answers to GetContextStates messages + # b.1) The Reference Provider provides at least one location context state + step = "3a" + info = "The Reference Provider answers to GetMdib" + print(step, info) + try: + mdib = ConsumerMdib(client, extras_cls=ConsumerMdibMethodsReferenceTest) + mdib.init_mdib() # throws an exception if provider did not answer to GetMdib + results_collector.log_result(True, step, info) + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info) + return None # results + + step = "3b" + info = "The Reference Provider answers to GetContextStates messages" + context_service = client.context_service_client + if context_service is None: + results_collector.log_result(False, step, info, extra_info="no context service") + else: + try: + states = context_service.get_context_states().result.ContextState + results_collector.log_result(True, step, info) + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, extra_info="exception") + step = "3b.1" + info = "The Reference Provider provides at least one location context state" + loc_states = [s for s in states if pm_qnames.LocationContextState == s.NODETYPE] + results_collector.log_result(len(loc_states) > 0, step, info) + + # 4 State Reports + # a) The Reference Provider produces at least 5 numeric metric updates in 30 seconds + # b) The Reference Provider produces at least 5 string metric updates (StringMetric or EnumStringMetric) in 30 seconds + # c) The Reference Provider produces at least 5 alert condition updates (AlertCondition or LimitAlertCondition) in 30 seconds + # d) The Reference Provider produces at least 5 alert signal updates in 30 seconds + # e) The Reference Provider provides alert system self checks in accordance to the periodicity defined in the MDIB (at least every 10 seconds) + # f) The Reference Provider provides 3 waveforms (RealTimeSampleArrayMetric) x 10 messages per second x 100 samples per message + # g) The Reference Provider provides changes for the following components: + # * At least 5 Clock or Battery object updates in 30 seconds (Component report) + # * At least 5 MDS or VMD updates in 30 seconds (Component report) + # g) The Reference Provider provides changes for the following operational states: + # At least 5 Operation updates in 30 seconds; enable/disable operations; some different than the ones mentioned above (Operational State Report)""" + + # setup data collectors for next test steps + numeric_metric_updates = defaultdict(list) + string_metric_updates = defaultdict(list) + alert_condition_updates = defaultdict(list) + alert_signal_updates = defaultdict(list) + alert_system_updates = defaultdict(list) + component_updates = defaultdict(list) + waveform_updates = defaultdict(list) + operational_state_updates = defaultdict(list) + description_updates = [] + + def on_metric_updates(metrics_by_handle: dict): + """Write to numeric_metric_updates or string_metric_updates, depending on type of state.""" + for k, v in metrics_by_handle.items(): + print(f"State {v.NODETYPE.localname} {v.DescriptorHandle}") + if pm_qnames.NumericMetricState == v.NODETYPE: + numeric_metric_updates[k].append(v) + elif pm_qnames.StringMetricState == v.NODETYPE: + string_metric_updates[k].append(v) + + def on_alert_updates(alerts_by_handle: dict): + """Write to alert_condition_updates, alert_signal_updates or alert_system_updates, depending on type of state.""" + for k, v in alerts_by_handle.items(): + print(f"State {v.NODETYPE.localname} {v.DescriptorHandle}") + if v.is_alert_condition: + alert_condition_updates[k].append(v) + elif v.is_alert_signal: + alert_signal_updates[k].append(v) + elif pm_qnames.AlertSystemState == v.NODETYPE: + alert_system_updates[k].append(v) + + def on_component_updates(components_by_handle: dict): + """Write to component_updates.""" + for k, v in components_by_handle.items(): + print(f"State {v.NODETYPE.localname} {v.DescriptorHandle}") + component_updates[k].append(v) + + def on_waveform_updates(waveforms_by_handle: dict): + """Write to waveform_updates.""" + for k, v in waveforms_by_handle.items(): + waveform_updates[k].append(v) + + def on_description_modification(description_modification_report: dict): + """Write to description_updates.""" + print("on_description_modification") + description_updates.append(description_modification_report) + + def on_operational_state_updates(operational_states_by_handle: dict): + """Write to operational_state_updates.""" + for k, v in operational_states_by_handle.items(): + print(f"State {v.NODETYPE.localname} {v.DescriptorHandle}") + operational_state_updates[k].append(v) + + observableproperties.bind(mdib, metrics_by_handle=on_metric_updates) + observableproperties.bind(mdib, alert_by_handle=on_alert_updates) + observableproperties.bind(mdib, component_by_handle=on_component_updates) + observableproperties.bind(mdib, waveform_by_handle=on_waveform_updates) + observableproperties.bind(mdib, description_modifications=on_description_modification) + observableproperties.bind(mdib, operation_by_handle=on_operational_state_updates) + + # now collect reports + sleep_timer = 30 + min_updates = 5 + print(f"will wait for {sleep_timer} seconds now, expecting at least {min_updates} updates per Handle") + time.sleep(sleep_timer) + + # now check report count + step = "4a" + info = "count numeric metric state updates" + print(step, info) + is_ok, result = test_min_updates_per_handle(numeric_metric_updates, min_updates) + results_collector.log_result(is_ok, step, info) + + step = "4b" + info = "count string metric state updates" + print(step) + is_ok, result = test_min_updates_per_handle(string_metric_updates, min_updates) + results_collector.log_result(is_ok, step, info) + + step = "4c" + info = "count alert condition updates" + print(step) + is_ok, result = test_min_updates_per_handle(alert_condition_updates, min_updates) + results_collector.log_result(is_ok, step, info) + + step = "4d" + info = " count alert signal updates" + print(step, info) + is_ok, result = test_min_updates_per_handle(alert_signal_updates, min_updates) + results_collector.log_result(is_ok, step, info) + + step = "4e" + info = "count alert system self checks" + is_ok, result = test_min_updates_per_handle(alert_system_updates, min_updates) + results_collector.log_result(is_ok, step, info) + + step = "4f" + info = "count waveform updates" + # 3 waveforms (RealTimeSampleArrayMetric) x 10 messages per second x 100 samples per message + print(step, info) + is_ok, result = test_min_updates_per_handle(waveform_updates, min_updates) + results_collector.log_result(is_ok, step, info + " notifications per second") + results_collector.log_result( + len(waveform_updates) >= 3, # noqa:PLR2004 + step, + info + f" number of waveforms: {len(waveform_updates)}", + ) + + expected_samples = int(os.getenv("EXPECTED_WAVEFORM_SAMPLES_4F", 1000 * sleep_timer * 0.9)) + for handle, reports in waveform_updates.items(): + notifications = [n for n in reports if n.MetricValue is not None] + samples = sum([len(n.MetricValue.Samples) for n in notifications]) + if samples < expected_samples: + results_collector.log_result( + False, + step, + info + f" waveform {handle} has {samples} samples, expecting {expected_samples}", + ) + else: + results_collector.log_result( + True, + step, + info + f" waveform {handle} has more than {expected_samples} samples: {samples}", + ) + + pm = mdib.data_model.pm_names + pm_types = mdib.data_model.pm_types + + step = "4g.1" + info = "count battery or clock updates" + print(step, info) + is_ok, result = test_min_updates_for_type(component_updates, min_updates, [pm.BatteryState, pm.ClockState]) + results_collector.log_result(is_ok, step, info) + + step = "4g.2" + info = "count VMD or MDS updates" + print(step, info) + is_ok, result = test_min_updates_for_type(component_updates, min_updates, [pm.VmdState, pm.MdsState]) + results_collector.log_result(is_ok, step, info) + + step = "4h" + info = "Enable/Disable operations" + print(step, info) + is_ok, result = test_min_updates_for_type( + operational_state_updates, + min_updates, + [ + pm.SetValueOperationState, + pm.SetStringOperationState, + pm.ActivateOperationState, + pm.SetContextStateOperationState, + pm.SetMetricStateOperationState, + pm.SetComponentStateOperationState, + pm.SetAlertStateOperationState, + ], + ) + results_collector.log_result(is_ok, step, info) + + # 5 Description Modifications: + # a) The Reference Provider produces at least 1 update every 10 seconds comprising + # * Update Alert condition concept description of Type + # * Update Alert condition cause-remedy information + # * Update Unit of measure (metrics) + # b) The Reference Provider produces at least 1 insertion followed by a deletion every 10 seconds comprising + # * Insert a VMD including Channels including metrics (inserted VMDs/Channels/Metrics are required to have + # a new handle assigned on each insertion such that containment tree entries are not recycled). + # (Tests for the handling of re-insertion of previously inserted objects should be tested additionally) + # * Remove the VMD + step = "5a.1" + info = "Update Alert condition concept description of Type" + print(step, info) + # verify only that there are Alert Condition Descriptors updated + updates = mdib.xtra.alert_condition_type_concept_updates + if not updates: + results_collector.log_result(False, step, info, "no updates") + else: + max_diff = max(updates) + if max_diff > 10: # noqa:PLR2004 + results_collector.log_result(False, step, info, f"max dt={max_diff}") + else: + results_collector.log_result(True, step, info, f"{len(updates) - 1} updates, max diff= {max_diff:.1f}") + + step = "5a.2" + info = "Update Alert condition cause-remedy information" + print(step, info) + # verify only that there are remedy infos updated + updates = mdib.xtra.alert_condition_cause_remedy_updates + if not updates: + results_collector.log_result(False, step, info, "no updates") + else: + max_diff = max(updates) + if max_diff > 10: # noqa:PLR2004 + results_collector.log_result(False, step, info, f"{updates} => max dt={max_diff}") + else: + results_collector.log_result(True, step, info, f"{len(updates) - 1} updates, max diff= {max_diff:.1f}") + + step = "5a.3" + info = "Update Unit of measure" + print(step, info) + updates = mdib.xtra.unit_of_measure_updates + if not updates: + results_collector.log_result(False, step, info, "no updates") + else: + max_diff = max(updates) + if max_diff > 10: # noqa: PLR2004 + results_collector.log_result(False, step, info, f"max dt={max_diff}") + else: + results_collector.log_result(True, step, info, f"{len(updates) - 1} updates, max diff= {max_diff:.1f}") + + step = "5b" + info = "Add / remove vmd" + print(step, info) + # verify only that there are Alert Condition Descriptors updated + add_found = False + rm_found = False + for report in description_updates: + for report_part in report.ReportPart: + if report_part.ModificationType == msg_types.DescriptionModificationType.CREATE: + for descriptor in report_part.Descriptor: + if pm_qnames.VmdDescriptor == descriptor.NODETYPE: + add_found = True + if report_part.ModificationType == msg_types.DescriptionModificationType.DELETE: + for descriptor in report_part.Descriptor: + if pm_qnames.VmdDescriptor == descriptor.NODETYPE: + rm_found = True + results_collector.log_result(add_found, step, info, "add") + results_collector.log_result(rm_found, step, info, "remove") + + # 6 Operation invocation + # a) (removed) + # b) SetContextState: + # * Payload: 1 Patient Context + # * Context state is added to the MDIB including context association and validation + # * If there is an associated context already, that context shall be disassociated + # * Handle and version information is generated by the provider + # * In order to avoid infinite growth of patient contexts, older contexts are allowed to be removed from the MDIB + # (=ContextAssociation=No) + # c) SetValue: Immediately answers with "finished" + # * Finished has to be sent as a report in addition to the response => + # d) SetString: Initiates a transaction that sends Wait, Start and Finished + # e) SetMetricStates: + # * Payload: 2 Metric States (settings; consider alert limits) + # * Immediately sends finished + # * Action: Alter values of metrics + + step = "6b" + info = "SetContextState" + print(step, info) + # patients = mdib.context_states.NODETYPE.get(pm.PatientContextState, []) # noqa: ERA001 + patient_context_descriptors = mdib.descriptions.NODETYPE.get(pm.PatientContextDescriptor, []) + generated_family_names = [] + if len(patient_context_descriptors) == 0: + results_collector.log_result(False, step, info, extra_info="no PatientContextDescriptor") + else: + try: + for i, p in enumerate(patient_context_descriptors): # noqa: B007 + pat = client.context_service_client.mk_proposed_context_object(p.Handle) + pat.CoreData.Familyname = uuid.uuid4().hex + pat.ContextAssociation = pm_types.ContextAssociation.ASSOCIATED + generated_family_names.append(pat.CoreData.Familyname) + client.context_service_client.set_context_state(set_context_state_handle, [pat]) + time.sleep(1) # allow update notification to arrive + patients = mdib.context_states.NODETYPE.get(pm_qnames.PatientContextState, []) + if len(patients) == 0: + results_collector.log_result(False, step, info, extra_info="no patients found") + else: + all_ok = True + for patient in patients: + if patient.CoreData.Familyname in generated_family_names: + if patient.ContextAssociation != pm_types.ContextAssociation.ASSOCIATED: + results_collector.log_result( + False, + step, + info, + extra_info=f"new patient {patient.CoreData.Familyname} is {patient.ContextAssociation}", + ) + all_ok = False + elif patient.ContextAssociation == pm_types.ContextAssociation.ASSOCIATED: + results_collector.log_result( + False, + step, + info, + extra_info=f"old patient {patient.CoreData.Familyname} is {patient.ContextAssociation}", + ) + all_ok = False + results_collector.log_result(all_ok, step, info) + except Exception as ex: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, ex) + + step = "6c" + info = 'SetValue: Immediately answers with "finished"' + print(step, info) + subscriptions = client.subscription_mgr.subscriptions.values() + operation_invoked_subscriptions = [ + subscr for subscr in subscriptions if "OperationInvokedReport" in subscr.short_filter_string + ] + if len(operation_invoked_subscriptions) == 0: + results_collector.log_result(False, step, info, "OperationInvokedReport not subscribed, cannot test") + elif len(operation_invoked_subscriptions) > 1: + results_collector.log_result( + False, + step, + info, + f"found {len(operation_invoked_subscriptions)} OperationInvokedReport subscribed, cannot test", + ) + else: + try: + operations = client.mdib.descriptions.NODETYPE.get(pm_qnames.SetValueOperationDescriptor, []) + my_ops = [op for op in operations if op.Type.Code == "67108888"] + if len(my_ops) != 1: + results_collector.log_result(False, step, info, f'found {len(my_ops)} operations with code "67108888"') + else: + operation = my_ops[0] + future_object = client.set_service_client.set_numeric_value(operation.Handle, Decimal(42)) + operation_result = future_object.result() + if len(operation_result.report_parts) == 0: + results_collector.log_result(False, step, info, "no notification") + elif len(operation_result.report_parts) > 1: + results_collector.log_result( + False, + step, + info, + f"got {len(operation_result.report_parts)} notifications, expect only one", + ) + else: + results_collector.log_result( + True, + step, + info, + f"got {len(operation_result.report_parts)} notifications", + ) + if operation_result.InvocationInfo.InvocationState != msg_types.InvocationState.FINISHED: + results_collector.log_result( + False, + step, + info, + f"got result {operation_result.InvocationInfo.InvocationState} " + f"{operation_result.InvocationInfo.InvocationError} " + f"{operation_result.InvocationInfo.InvocationErrorMessage}", + ) + except Exception as ex: # noqa:BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, ex) + + step = "6d" + info = "SetString: Initiates a transaction that sends Wait, Start and Finished" + print(step, info) + try: + operations = client.mdib.descriptions.NODETYPE.get(pm_qnames.SetStringOperationDescriptor, []) + my_ops = [op for op in operations if op.Type.Code == "67108889"] + if len(my_ops) != 1: + results_collector.log_result(False, step, info, f'found {len(my_ops)} operations with code "67108889"') + else: + operation = my_ops[0] + future_object = client.set_service_client.set_string(operation.Handle, "STANDBY") + operation_result = future_object.result() + if len(operation_result.report_parts) < 3: # noqa: PLR2004 + results_collector.log_result( + False, + step, + info, + f"only {len(operation_result.report_parts)} notification(s)", + ) + elif len(operation_result.report_parts) >= 3: # noqa: PLR2004 + # check order of operation invoked reports (simple expectation, there could be multiple WAIT in theory) + expectation = [ + msg_types.InvocationState.WAIT, + msg_types.InvocationState.START, + msg_types.InvocationState.FINISHED, + ] + inv_states = [p.InvocationInfo.InvocationState for p in operation_result.report_parts] + if inv_states != expectation: + results_collector.log_result(False, step, info, f"wrong order {inv_states}") + else: + results_collector.log_result( + True, + step, + info, + f"got {len(operation_result.report_parts)} notifications", + ) + if operation_result.InvocationInfo.InvocationState != msg_types.InvocationState.FINISHED: + results_collector.log_result( + False, + step, + info, + f"got result {operation_result.InvocationInfo.InvocationState} " + f"{operation_result.InvocationInfo.InvocationError} " + f"{operation_result.InvocationInfo.InvocationErrorMessage}", + ) + + except Exception as ex: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, ex) + + step = "6e" + info = "SetMetricStates Immediately answers with finished" + print(step, info) + try: + operations = client.mdib.descriptions.NODETYPE.get(pm_qnames.SetMetricStateOperationDescriptor, []) + my_ops = [op for op in operations if op.Type.Code == "67108890"] + if len(my_ops) != 1: + results_collector.log_result(False, step, info, f'found {len(my_ops)} operations with code "67108890"') + else: + operation = my_ops[0] + proposed_metric_state1 = client.mdib.xtra.mk_proposed_state("numeric_metric_0.channel_0.vmd_1.mds_0") + proposed_metric_state2 = client.mdib.xtra.mk_proposed_state("numeric_metric_1.channel_0.vmd_1.mds_0") + for st in (proposed_metric_state1, proposed_metric_state2): + if st.MetricValue is None: + st.mk_metric_value() + st.MetricValue.Value = Decimal(1) + else: + st.MetricValue.Value += Decimal(0.1) + future_object = client.set_service_client.set_metric_state( + operation.Handle, + [proposed_metric_state1, proposed_metric_state2], + ) + operation_result = future_object.result() + if len(operation_result.report_parts) == 0: + results_collector.log_result(False, step, info, "no notification") + elif len(operation_result.report_parts) > 1: + results_collector.log_result( + False, + step, + info, + f"got {len(operation_result.report_parts)} notifications, expect only one", + ) + else: + results_collector.log_result( + True, + step, + info, + f"got {len(operation_result.report_parts)} notifications", + ) + if operation_result.InvocationInfo.InvocationState != msg_types.InvocationState.FINISHED: + results_collector.log_result( + False, + step, + info, + f"got result {operation_result.InvocationInfo.InvocationState} " + f"{operation_result.InvocationInfo.InvocationError} " + f"{operation_result.InvocationInfo.InvocationErrorMessage}", + ) + except Exception as ex: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, ex) + + step = "7" + info = "Graceful shutdown (at least subscriptions are ended; optionally Bye is sent)" + try: + success = client._subscription_mgr.unsubscribe_all() # noqa: SLF001 + results_collector.log_result(success, step, info) + except Exception as ex: # noqa: BLE001 + print(traceback.format_exc()) + results_collector.log_result(False, step, info, ex) + time.sleep(2) + return results_collector + + +if __name__ == "__main__": + xtra_log_config = os.getenv("ref_xtra_log_cnf") # noqa:SIM112 + + import json + import logging.config + + with pathlib.Path(__file__).parent.joinpath("logging_default.json").open() as f: + logging_setup = json.load(f) + logging.config.dictConfig(logging_setup) + if xtra_log_config is not None: + with pathlib.Path(xtra_log_config).open() as f: + logging_setup2 = json.load(f) + logging.config.dictConfig(logging_setup2) + + results = ResultsCollector() + + run_ref_test(results) + results.print_summary() + sys.exit(bool(results.failed_count)) diff --git a/examples/ReferenceTestV2/reference_provider_v2.py b/examples/ReferenceTestV2/reference_provider_v2.py new file mode 100644 index 00000000..19189ed6 --- /dev/null +++ b/examples/ReferenceTestV2/reference_provider_v2.py @@ -0,0 +1,477 @@ +"""Implementation of reference provider. + +The reference provider gets its parameters from environment variables: +- adapter_ip specifies which ip address shall be used +- ca_folder specifies where the communication certificates are located. +- ref_fac, ref_poc and ref_bed specify the location values facility, point of care and bed. +- ssl_passwd specifies an optional password for the certificates. + +If a value is not provided as environment variable, the default value (see code below) will be used. +""" + +from __future__ import annotations + +import datetime +import json +import logging.config +import os +import pathlib +import traceback +import uuid +from decimal import Decimal +from time import sleep +from typing import TYPE_CHECKING + +import sdc11073 +from sdc11073 import location, network +from sdc11073.certloader import mk_ssl_contexts_from_folder +from sdc11073.loghelper import LoggerAdapter +from sdc11073.mdib import ProviderMdib, descriptorcontainers +from sdc11073.provider import SdcProvider, components +from sdc11073.provider.servicesfactory import DPWSHostedService, HostedServices, mk_dpws_hosts +from sdc11073.provider.subscriptionmgr_async import SubscriptionsManagerReferenceParamAsync +from sdc11073.pysoap.soapclient_async import SoapClientAsync +from sdc11073.roles.waveformprovider import waveforms +from sdc11073.wsdiscovery import WSDiscovery +from sdc11073.xml_types import pm_qnames +from sdc11073.xml_types.dpws_types import ThisDeviceType, ThisModelType + +if TYPE_CHECKING: + from sdc11073.provider.components import SdcProviderComponents + + +def get_network_adapter() -> network.NetworkAdapter: + """Get network adapter from environment or first loopback.""" + if (ip := os.getenv("ref_ip")) is not None: # noqa: SIM112 + return network.get_adapter_containing_ip(ip) + # get next available loopback adapter + return next(adapter for adapter in network.get_adapters() if adapter.is_loopback) + + +def get_location() -> location.SdcLocation: + """Get location from environment or default.""" + return location.SdcLocation( + fac=os.getenv("ref_fac", default="r_fac"), # noqa: SIM112 + poc=os.getenv("ref_poc", default="r_poc"), # noqa: SIM112 + bed=os.getenv("ref_bed", default="r_bed"), # noqa: SIM112 + ) + + +def get_ssl_context() -> sdc11073.certloader.SSLContextContainer | None: + """Get ssl context from environment or None.""" + if (ca_folder := os.getenv("ref_ca")) is None: # noqa: SIM112 + return None + return mk_ssl_contexts_from_folder( + ca_folder, + private_key="user_private_key_encrypted.pem", + certificate="user_certificate_root_signed.pem", + ca_public_key="root_certificate.pem", + cyphers_file=None, + ssl_passwd=os.getenv("ref_ssl_passwd"), # noqa: SIM112 + ) + + +def get_epr() -> uuid.UUID: + """Get epr from environment or default.""" + if (epr := os.getenv("ref_search_epr")) is not None: # noqa: SIM112 + return uuid.UUID(epr) + return uuid.UUID("12345678-6f55-11ea-9697-123456789abc") + + +def get_mdib_path() -> pathlib.Path: + """Get mdib from environment or default mdib.""" + if mdib_path := os.getenv("ref_mdib"): # noqa:SIM112 + return pathlib.Path(mdib_path) + return pathlib.Path(__file__).parent.joinpath("mdib_test_sequence_2_v4(temp).xml") + + +numeric_metric_handle = "numeric_metric_0.channel_0.vmd_0.mds_0" +string_metric_handle = "string_metric_0.channel_0.vmd_0.mds_0" +alert_condition_handle = "alert_condition_0.vmd_0.mds_1" +alert_signal_handle = "alert_signal_0.mds_0" +set_value_handle = "set_value_0.sco.mds_0" +set_string_handle = "set_string_0.sco.mds_0" +battery_handle = "battery_0.mds_0" +vmd_handle = "vmd_0.mds_0" +mds_handle = "mds_0" +USE_REFERENCE_PARAMETERS = False + +# some switches to enable/disable some of the provider data updates +# enabling allows to verify that the reference consumer detects missing updates + +# 4 State Reports +# a) The Reference Provider produces at least 5 numeric metric updates in 30 seconds +# b) The Reference Provider produces at least 5 string metric updates (StringMetric or EnumStringMetric) in 30 seconds +# c) The Reference Provider produces at least 5 alert condition updates (AlertCondition or LimitAlertCondition) in 30 seconds +# d) The Reference Provider produces at least 5 alert signal updates in 30 seconds +# e) The Reference Provider provides alert system self checks in accordance to the periodicity defined in the MDIB (at least every 10 seconds) +# f) The Reference Provider provides 3 waveforms (RealTimeSampleArrayMetric) x 10 messages per second x 100 samples per message +# g) The Reference Provider provides changes for the following components: +# * At least 5 Clock or Battery object updates in 30 seconds (Component report) +# * At least 5 MDS or VMD updates in 30 seconds (Component report) +# g) The Reference Provider provides changes for the following operational states: +# At least 5 Operation updates in 30 seconds; enable/disable operations; some different than the ones mentioned above (Operational State Report)""" +enable_4a = True +enable_4b = True +enable_4c = True +enable_4d = True +# switching 4e not implemented +enable_4f = True + +# 5 Description Modifications: +# a) The Reference Provider produces at least 1 update every 10 seconds comprising +# * Update Alert condition concept description of Type +# * Update Alert condition cause-remedy information +# * Update Unit of measure (metrics) +enable_5a1 = True +enable_5a2 = True +enable_5a3 = True + +# 6 Operation invocation +# a) (removed) +# b) SetContextState: +# * Payload: 1 Patient Context +# * Context state is added to the MDIB including context association and validation +# * If there is an associated context already, that context shall be disassociated +# * Handle and version information is generated by the provider +# * In order to avoid infinite growth of patient contexts, older contexts are allowed to be removed from the MDIB +# (=ContextAssociation=No) +# c) SetValue: Immediately answers with "finished" +# * Finished has to be sent as a report in addition to the response => +# d) SetString: Initiates a transaction that sends Wait, Start and Finished +# e) SetMetricStates: +# * Payload: 2 Metric States (settings; consider alert limits) +# * Immediately sends finished +# * Action: Alter values of metrics +enable_6c = True +enable_6d = True +enable_6e = True + + +def mk_all_services_except_localization( + sdc_provider: SdcProvider, + components: SdcProviderComponents, + subscription_managers: dict, +) -> HostedServices: + """Create all services except localization service.""" + # register all services with their endpoint references acc. to structure in components + dpws_services, services_by_name = mk_dpws_hosts(sdc_provider, components, DPWSHostedService, subscription_managers) + return HostedServices( + dpws_services, + services_by_name["GetService"], + set_service=services_by_name.get("SetService"), + context_service=services_by_name.get("ContextService"), + description_event_service=services_by_name.get("DescriptionEventService"), + state_event_service=services_by_name.get("StateEventService"), + waveform_service=services_by_name.get("WaveformService"), + containment_tree_service=services_by_name.get("ContainmentTreeService"), + # localization_service=services_by_name.get('LocalizationService') # noqa: ERA001 + ) + + +def provide_realtime_data(sdc_provider: SdcProvider): + """Provide realtime data.""" + waveform_provider = sdc_provider.waveform_provider + if waveform_provider is None: + return + mdib_waveforms = sdc_provider.mdib.descriptions.NODETYPE.get(pm_qnames.RealTimeSampleArrayMetricDescriptor) + for waveform in mdib_waveforms: + wf_generator = waveforms.SawtoothGenerator(min_value=0, max_value=10, waveform_period=1.1, sample_period=0.001) + waveform_provider.register_waveform_generator(waveform.Handle, wf_generator) + + +def run_provider(): # noqa: PLR0915, PLR0912, C901 + """Run provider until KeyboardError is raised.""" + with pathlib.Path(__file__).parent.joinpath("logging_default.json").open() as f: + logging_setup = json.load(f) + logging.config.dictConfig(logging_setup) + xtra_log_config = os.getenv("ref_xtra_log_cnf") # noqa:SIM112 + if xtra_log_config is not None: + with pathlib.Path(xtra_log_config).open() as f: + logging_setup2 = json.load(f) + logging.config.dictConfig(logging_setup2) + + logger = logging.getLogger("sdc") + logger = LoggerAdapter(logger) + logger.info("%s", "start") + adapter_ip = get_network_adapter().ip + wsd = WSDiscovery(adapter_ip) + wsd.start() + my_mdib = ProviderMdib.from_mdib_file(str(get_mdib_path())) + my_uuid = get_epr() + print(f"UUID for this device is {my_uuid}") + loc = get_location() + print(f"location for this device is {loc}") + dpws_model = ThisModelType( + manufacturer="sdc11073", + manufacturer_url="www.sdc11073.com", + model_name="TestDevice", + model_number="1.0", + model_url="www.sdc11073.com/model", + presentation_url="www.sdc11073.com/model/presentation", + ) + + dpws_device = ThisDeviceType(friendly_name="TestDevice", firmware_version="Version1", serial_number="12345") + ssl_context = get_ssl_context() + if USE_REFERENCE_PARAMETERS: + tmp = {"StateEvent": SubscriptionsManagerReferenceParamAsync} + specific_components = components.SdcProviderComponents( + subscriptions_manager_class=tmp, + hosted_services={ + "Get": [components.GetService], + "StateEvent": [ + components.StateEventService, + components.ContextService, + components.DescriptionEventService, + components.WaveformService, + ], + "Set": [components.SetService], + "ContainmentTree": [components.ContainmentTreeService], + }, + soap_client_class=SoapClientAsync, + ) + else: + specific_components = components.SdcProviderComponents( + hosted_services={ + "Get": [components.GetService], + "StateEvent": [ + components.StateEventService, + components.ContextService, + components.DescriptionEventService, + components.WaveformService, + ], + "Set": [components.SetService], + "ContainmentTree": [components.ContainmentTreeService], + }, + ) + sdc_provider = SdcProvider( + wsd, + dpws_model, + dpws_device, + my_mdib, + my_uuid, + ssl_context_container=ssl_context, + specific_components=specific_components, + max_subscription_duration=15, + ) + sdc_provider.start_all() + + # disable delayed processing for 2 operations + if enable_6c: + sdc_provider.get_operation_by_handle("set_value_0.sco.mds_0").delayed_processing = False + if not enable_6d: + sdc_provider.get_operation_by_handle("set_string_0.sco.mds_0").delayed_processing = False + if enable_6e: + sdc_provider.get_operation_by_handle("set_metric_0.sco.vmd_1.mds_0").delayed_processing = False + + pm = my_mdib.data_model.pm_names + pm_types = my_mdib.data_model.pm_types + validators = [pm_types.InstanceIdentifier("Validator", extension_string="System")] + sdc_provider.set_location(loc, validators) + if enable_4f: + provide_realtime_data(sdc_provider) + patient_descriptor_handle = my_mdib.descriptions.NODETYPE.get(pm.PatientContextDescriptor)[0].Handle + with my_mdib.context_state_transaction() as mgr: + patient_container = mgr.mk_context_state(patient_descriptor_handle) + patient_container.CoreData.Givenname = "Given" + patient_container.CoreData.Middlename = ["Middle"] + patient_container.CoreData.Familyname = "Familiy" + patient_container.CoreData.Birthname = "Birthname" + patient_container.CoreData.Title = "Title" + patient_container.ContextAssociation = pm_types.ContextAssociation.ASSOCIATED + patient_container.Validator.extend(validators) + identifiers = [] + patient_container.Identification = identifiers + + all_descriptors = list(sdc_provider.mdib.descriptions.objects) + all_descriptors.sort(key=lambda x: x.Handle) + numeric_metric = None + string_metric = None + alert_condition = None + alert_signal = None + battery_descriptor = None + string_operation = None + value_operation = None + + # search for descriptors of specific types + for one_descriptor in all_descriptors: + if one_descriptor.Handle == numeric_metric_handle: + numeric_metric = one_descriptor + if one_descriptor.Handle == string_metric_handle: + string_metric = one_descriptor + if one_descriptor.Handle == alert_condition_handle: + alert_condition = one_descriptor + if one_descriptor.Handle == alert_signal_handle: + alert_signal = one_descriptor + if one_descriptor.Handle == battery_handle: + battery_descriptor = one_descriptor + if one_descriptor.Handle == set_value_handle: + value_operation = one_descriptor + if one_descriptor.Handle == set_string_handle: + string_operation = one_descriptor + + with sdc_provider.mdib.metric_state_transaction() as mgr: + state = mgr.get_state(value_operation.OperationTarget) + if not state.MetricValue: + state.mk_metric_value() + state = mgr.get_state(string_operation.OperationTarget) + if not state.MetricValue: + state.mk_metric_value() + print("Running forever, CTRL-C to exit") + try: + str_current_value = 0 + while True: + if numeric_metric: + try: + if enable_4a: + with sdc_provider.mdib.metric_state_transaction() as mgr: + state = mgr.get_state(numeric_metric.Handle) + if not state.MetricValue: + state.mk_metric_value() + if state.MetricValue.Value is None: + state.MetricValue.Value = Decimal("0") + else: + state.MetricValue.Value += Decimal(1) + if enable_5a3: + with sdc_provider.mdib.descriptor_transaction() as mgr: + descriptor: descriptorcontainers.AbstractMetricDescriptorContainer = mgr.get_descriptor( + numeric_metric.Handle, + ) + descriptor.Unit.Code = "code1" if descriptor.Unit.Code == "code2" else "code2" + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + else: + print("Numeric Metric not found in MDIB!") + if string_metric: + try: + if enable_4b: + with sdc_provider.mdib.metric_state_transaction() as mgr: + state = mgr.get_state(string_metric.Handle) + if not state.MetricValue: + state.mk_metric_value() + state.MetricValue.Value = f"my string {str_current_value}" + str_current_value += 1 + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + else: + print("Numeric Metric not found in MDIB!") + + if alert_condition: + try: + if enable_4c: + with sdc_provider.mdib.alert_state_transaction() as mgr: + state = mgr.get_state(alert_condition.Handle) + state.Presence = not state.Presence + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + try: + with sdc_provider.mdib.descriptor_transaction() as mgr: + now = datetime.datetime.now() + text = f"last changed at {now.hour:02d}:{now.minute:02d}:{now.second:02d}" + descriptor: descriptorcontainers.AlertConditionDescriptorContainer = mgr.get_descriptor( + alert_condition.Handle, + ) + if enable_5a1: + if len(descriptor.Type.ConceptDescription) == 0: + descriptor.Type.ConceptDescription.append(pm_types.LocalizedText(text)) + else: + descriptor.Type.ConceptDescription[0].text = text + if enable_5a2: + if len(descriptor.CauseInfo) == 0: + descriptor.CauseInfo.append(pm_types.CauseInfo()) + if len(descriptor.CauseInfo[0].RemedyInfo.Description) == 0: + descriptor.CauseInfo[0].RemedyInfo.Description.append(pm_types.LocalizedText(text)) + else: + descriptor.CauseInfo[0].RemedyInfo.Description[0].text = text + except Exception: # noqa: BLE001 + print(traceback.format_exc()) + + else: + print("Alert condition not found in MDIB") + + if alert_signal: + try: + if enable_4d: + with sdc_provider.mdib.alert_state_transaction() as mgr: + state = mgr.get_state(alert_signal.Handle) + if state.Slot is None: + state.Slot = 1 + else: + state.Slot += 1 + except Exception: # noqa:BLE001 + print(traceback.format_exc()) + else: + print("Alert signal not found in MDIB") + + if battery_descriptor: + try: + with sdc_provider.mdib.component_state_transaction() as mgr: + state = mgr.get_state(battery_descriptor.Handle) + if state.Voltage is None: + state.Voltage = pm_types.Measurement(value=Decimal("14.4"), unit=pm_types.CodedValue("xyz")) + else: + state.Voltage.MeasuredValue += Decimal("0.1") + print(f"battery voltage = {state.Voltage.MeasuredValue}") + except Exception: # noqa:BLE001 + print(traceback.format_exc()) + else: + print("battery state not found in MDIB") + + try: + with sdc_provider.mdib.component_state_transaction() as mgr: + state = mgr.get_state(vmd_handle) + state.OperatingHours = 2 if state.OperatingHours != 2 else 1 # noqa:PLR2004 + print(f"operating hours = {state.OperatingHours}") + except Exception: # noqa:BLE001 + print(traceback.format_exc()) + + try: + with sdc_provider.mdib.component_state_transaction() as mgr: + state = mgr.get_state(mds_handle) + state.Lang = "de" if state.Lang != "de" else "en" + print(f"mds lang = {state.Lang}") + except Exception: # noqa:BLE001 + print(traceback.format_exc()) + + # add or rm vmd + add_rm_metric_handle = "add_rm_metric" + add_rm_channel_handle = "add_rm_channel" + add_rm_vmd_handle = "add_rm_vmd" + add_rm_mds_handle = "mds_0" + vmd_descriptor = sdc_provider.mdib.descriptions.handle.get_one(add_rm_vmd_handle, allow_none=True) + if vmd_descriptor is None: + vmd = descriptorcontainers.VmdDescriptorContainer(add_rm_vmd_handle, add_rm_mds_handle) + channel = descriptorcontainers.ChannelDescriptorContainer(add_rm_channel_handle, add_rm_vmd_handle) + metric = descriptorcontainers.StringMetricDescriptorContainer( + add_rm_metric_handle, + add_rm_channel_handle, + ) + metric.Unit = pm_types.CodedValue("123") + with sdc_provider.mdib.descriptor_transaction() as mgr: + mgr.add_descriptor(vmd) + mgr.add_descriptor(channel) + mgr.add_descriptor(metric) + mgr.add_state(sdc_provider.mdib.data_model.mk_state_container(vmd)) + mgr.add_state(sdc_provider.mdib.data_model.mk_state_container(channel)) + mgr.add_state(sdc_provider.mdib.data_model.mk_state_container(metric)) + else: + with sdc_provider.mdib.descriptor_transaction() as mgr: + mgr.remove_descriptor(add_rm_vmd_handle) + + # enable disable operation + with sdc_provider.mdib.operational_state_transaction() as mgr: + op_state = mgr.get_state("activate_0.sco.mds_0") + op_state.OperatingMode = ( + pm_types.OperatingMode.ENABLED + if op_state.OperatingMode == pm_types.OperatingMode.ENABLED + else pm_types.OperatingMode.DISABLED + ) + print(f"operation activate_0.sco.mds_0 {op_state.OperatingMode}") + + sleep(5) + except KeyboardInterrupt: + print("Exiting...") + + +if __name__ == "__main__": + run_provider() diff --git a/examples/ReferenceTestV2/run.py b/examples/ReferenceTestV2/run.py new file mode 100644 index 00000000..20e323bf --- /dev/null +++ b/examples/ReferenceTestV2/run.py @@ -0,0 +1,49 @@ +"""Script that executes the plug-a-thon tests.""" + +import os +import pathlib +import platform +import sys +import threading +import uuid + +from examples.ReferenceTestV2 import reference_consumer_v2, reference_provider_v2 +from sdc11073 import network + + +def setup(tls: bool): + """Setups the run.""" + os.environ["ref_search_epr"] = str(uuid.uuid4()) # noqa: SIM112 + if platform.system() == "Darwin": + os.environ["ref_ip"] = next(str(adapter.ip) for adapter in network.get_adapters() if not adapter.is_loopback) # noqa: SIM112 + else: + os.environ["ref_ip"] = next(str(adapter.ip) for adapter in network.get_adapters() if adapter.is_loopback) # noqa: SIM112 + if tls: + certs_path = pathlib.Path(__file__).parent.parent.joinpath("certs") + assert certs_path.exists() + os.environ["ref_ca"] = str(certs_path) # noqa: SIM112 + os.environ["ref_ssl_passwd"] = "dummypass" # noqa: S105,SIM112 + + +def run() -> reference_consumer_v2.ResultsCollector: + """Run tests.""" + threading.Thread(target=reference_provider_v2.run_provider, daemon=True).start() + return reference_consumer_v2.run_ref_test(reference_consumer_v2.ResultsCollector()) + + +def main(tls: bool) -> reference_consumer_v2.ResultsCollector: + """Setups and run tests.""" + setup(tls) + return run() + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="run plug-a-thon tests") + parser.add_argument("--tls", action="store_true", help="Indicates whether tls encryption should be enabled.") + + args = parser.parse_args() + run_results = main(tls=args.tls) + run_results.print_summary() + sys.exit(bool(run_results.failed_count))