From ba06944695fc24758856e87962c8d9ea90529ad9 Mon Sep 17 00:00:00 2001
From: a-kleinf <87371692+a-kleinf@users.noreply.github.com>
Date: Thu, 14 Nov 2024 12:07:20 +0100
Subject: [PATCH] Unify lxml.etree import (#397)
Unified import via "from lxml import etree" (No tests or an update of
the changelog is needed since no feature was implemented)
## Types of changes
- [ ] Bug fix (non-breaking change which fixes an issue)
- [] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Checklist:
- [ ] I have updated the [changelog](../CHANGELOG.md) accordingly.
- [ ] I have added tests to cover my changes.
---
src/sdc11073/consumer/consumerimpl.py | 4 +-
.../consumer/serviceclients/contextservice.py | 4 +-
.../serviceclients/serviceclientbase.py | 6 +-
src/sdc11073/consumer/subscription.py | 10 +--
src/sdc11073/definitions_base.py | 14 +--
src/sdc11073/definitions_sdc.py | 6 +-
src/sdc11073/dispatch/dispatchkey.py | 4 +-
src/sdc11073/mdib/containerbase.py | 10 +--
src/sdc11073/mdib/descriptorcontainers.py | 17 ++--
src/sdc11073/mdib/mdibbase.py | 39 ++++-----
src/sdc11073/mdib/statecontainers.py | 10 +--
src/sdc11073/namespaces.py | 20 ++---
src/sdc11073/provider/components.py | 4 +-
src/sdc11073/provider/dpwshostedservice.py | 20 ++---
src/sdc11073/provider/operations.py | 8 +-
.../provider/porttypes/porttypebase.py | 78 ++++++++---------
src/sdc11073/provider/subscriptionmgr.py | 3 +-
.../provider/subscriptionmgr_async.py | 8 +-
src/sdc11073/provider/subscriptionmgr_base.py | 10 +--
src/sdc11073/pysoap/msgfactory.py | 12 +--
src/sdc11073/pysoap/msgreader.py | 43 +++++----
src/sdc11073/pysoap/soapclient.py | 4 +-
src/sdc11073/pysoap/soapenvelope.py | 10 +--
src/sdc11073/roles/contextprovider.py | 4 +-
src/sdc11073/roles/providerbase.py | 4 +-
src/sdc11073/schema_resolver.py | 22 ++---
src/sdc11073/wsdiscovery/networkingthread.py | 4 +-
src/sdc11073/wsdiscovery/service.py | 4 +-
src/sdc11073/wsdiscovery/wsdimpl.py | 22 ++---
src/sdc11073/xml_types/addressing_types.py | 4 +-
src/sdc11073/xml_types/basetypes.py | 8 +-
src/sdc11073/xml_types/pm_types.py | 8 +-
src/sdc11073/xml_types/xml_structure.py | 87 ++++++++++---------
src/sdc11073/xml_utils.py | 8 +-
tests/mockstuff.py | 16 ++--
tests/test_client_device.py | 45 +++++-----
tests/test_client_waveform.py | 9 +-
tests/test_containerproperties.py | 32 +++----
tests/test_descriptorcontainers.py | 26 +++---
tests/test_device_services.py | 6 +-
tests/test_mdib.py | 8 +-
tests/test_pmtypes.py | 78 ++++++++---------
tests/test_xml_utils.py | 10 +--
tests/utils.py | 8 +-
tools/generate_qnames.py | 42 ++++-----
45 files changed, 401 insertions(+), 398 deletions(-)
diff --git a/src/sdc11073/consumer/consumerimpl.py b/src/sdc11073/consumer/consumerimpl.py
index 540a8e9a..33fc4cba 100644
--- a/src/sdc11073/consumer/consumerimpl.py
+++ b/src/sdc11073/consumer/consumerimpl.py
@@ -12,7 +12,7 @@
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
-from lxml import etree as etree_
+from lxml import etree
import sdc11073.certloader
from sdc11073 import commlog, loghelper, network, xml_utils
@@ -100,7 +100,7 @@ def _read_wsdl(self, soap_client: SoapClientProtocol, wsdl_url: str):
encoding = 'UTF-8'
self.wsdl_string = self.wsdl_bytes.decode(encoding)
logging.getLogger(commlog.WSDL).debug(self.wsdl_string)
- except etree_.XMLSyntaxError as ex:
+ except etree.XMLSyntaxError as ex:
self._logger.error( # noqa: PLE1205
'could not read wsdl from {}: error={}, data=\n{}', actual_path, ex, self.wsdl_bytes)
diff --git a/src/sdc11073/consumer/serviceclients/contextservice.py b/src/sdc11073/consumer/serviceclients/contextservice.py
index 5de5709f..c5b95215 100644
--- a/src/sdc11073/consumer/serviceclients/contextservice.py
+++ b/src/sdc11073/consumer/serviceclients/contextservice.py
@@ -14,7 +14,7 @@
if TYPE_CHECKING:
from concurrent.futures import Future
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.consumer.manipulator import RequestManipulatorProtocol
from sdc11073.mdib.statecontainers import AbstractMultiStateProtocol
@@ -92,7 +92,7 @@ def get_context_states(self, handles: list[str] | None = None,
return GetRequestResult(received_message_data, report)
def get_context_state_by_identification(self, identifications: list[InstanceIdentifier],
- context_type: QName | None = None,
+ context_type: etree.QName | None = None,
request_manipulator: RequestManipulatorProtocol | None = None) \
-> GetRequestResult:
"""Send a GetContextStatesByIdentification request.
diff --git a/src/sdc11073/consumer/serviceclients/serviceclientbase.py b/src/sdc11073/consumer/serviceclients/serviceclientbase.py
index e8a5514e..21f9b3df 100644
--- a/src/sdc11073/consumer/serviceclients/serviceclientbase.py
+++ b/src/sdc11073/consumer/serviceclients/serviceclientbase.py
@@ -10,7 +10,7 @@
if TYPE_CHECKING:
from concurrent.futures import Future
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.consumer.consumerimpl import SdcConsumer
from sdc11073.consumer.manipulator import RequestManipulatorProtocol
@@ -55,7 +55,7 @@ def action(self) -> str | None:
return self._received_message.action
@property
- def msg_name(self) -> QName:
+ def msg_name(self) -> etree.QName:
"""Return the QName of message body."""
return self._received_message.q_name.localname
@@ -76,7 +76,7 @@ class HostedServiceClient:
def __init__(self, sdc_consumer: SdcConsumer,
soap_client: SoapClientProtocol,
dpws_hosted: HostedServiceType,
- port_type: QName):
+ port_type: etree.QName):
"""Construct a HostedServiceClient.
:param sdc_consumer:
diff --git a/src/sdc11073/consumer/subscription.py b/src/sdc11073/consumer/subscription.py
index 25d1be05..d673a73c 100644
--- a/src/sdc11073/consumer/subscription.py
+++ b/src/sdc11073/consumer/subscription.py
@@ -8,7 +8,7 @@
from typing import TYPE_CHECKING, Callable, Protocol
from urllib.parse import urlparse
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import loghelper
from sdc11073 import observableproperties as properties
@@ -64,7 +64,7 @@ class ConsumerSubscription:
notification_msg = properties.ObservableProperty()
notification_data = properties.ObservableProperty()
- IDENT_TAG = etree_.QName('http.local.com', 'MyClIdentifier')
+ IDENT_TAG = etree.QName('http.local.com', 'MyClIdentifier')
is_subscribed = properties.ObservableProperty(False)
def __init__(self, msg_factory: MessageFactory, # noqa: PLR0913
@@ -198,7 +198,7 @@ def renew(self, expires: int = 3600) -> float:
return self.granted_expires
self.is_subscribed = False
self._logger.warning('renew failed: {}', # noqa: PLE1205
- etree_.tostring(message_data.p_msg.body_node, pretty_print=True))
+ etree.tostring(message_data.p_msg.body_node, pretty_print=True))
return 0.0
def unsubscribe(self):
@@ -518,9 +518,9 @@ def mk_subscription(self, dpws_hosted: HostedServiceType, filter_type: FilterTyp
dpws_hosted, filter_type,
self._notification_url,
self._end_to_url, self.log_prefix)
- subscription.notify_to_identifier = etree_.Element(ConsumerSubscription.IDENT_TAG)
+ subscription.notify_to_identifier = etree.Element(ConsumerSubscription.IDENT_TAG)
subscription.notify_to_identifier.text = uuid.uuid4().urn
- subscription.end_to_identifier = etree_.Element(ConsumerSubscription.IDENT_TAG)
+ subscription.end_to_identifier = etree.Element(ConsumerSubscription.IDENT_TAG)
subscription.end_to_identifier.text = uuid.uuid4().urn
filter_ = filter_type.text
diff --git a/src/sdc11073/definitions_base.py b/src/sdc11073/definitions_base.py
index f0435a53..7e08bf9b 100644
--- a/src/sdc11073/definitions_base.py
+++ b/src/sdc11073/definitions_base.py
@@ -6,7 +6,7 @@
if TYPE_CHECKING:
from types import ModuleType
- from lxml.etree import QName
+ from lxml import etree
from .mdib.descriptorcontainers import AbstractDescriptorProtocol
from .mdib.statecontainers import AbstractStateProtocol
@@ -30,10 +30,10 @@ class AbstractDataModel(ABC):
"""Abstract base class for DataModelProtocol implementation."""
@abstractmethod
- def get_descriptor_container_class(self, type_qname: QName) -> type[AbstractDescriptorProtocol]:
+ def get_descriptor_container_class(self, type_qname: etree.QName) -> type[AbstractDescriptorProtocol]:
"""Get the class that represents a BICEPS descriptor entity with given QName."""
- def mk_descriptor_container(self, type_qname: QName, handle: str, parent_descriptor: Any) -> Any:
+ def mk_descriptor_container(self, type_qname: etree.QName, handle: str, parent_descriptor: Any) -> Any:
"""Create an instance that represents a BICEPS entity with given QName."""
cls = self.get_descriptor_container_class(type_qname)
if parent_descriptor is not None:
@@ -44,7 +44,7 @@ def mk_descriptor_container(self, type_qname: QName, handle: str, parent_descrip
return ret
@abstractmethod
- def get_state_container_class(self, type_qname: QName) -> type[AbstractStateProtocol]:
+ def get_state_container_class(self, type_qname: etree.QName) -> type[AbstractStateProtocol]:
"""Get the class that represents a BICEPS state entity with given QName."""
def get_state_class_for_descriptor(
@@ -98,14 +98,14 @@ class BaseDefinitions(metaclass=ProtocolsRegistry):
"""
# set the following values in derived classes:
- MedicalDeviceType: QName = None # a QName, needed for types_match method
+ MedicalDeviceType: etree.QName = None # a QName, needed for types_match method
ActionsNamespace: str = None # needed for wsdl generation
PortTypeNamespace: str = None # needed for wsdl generation
- MedicalDeviceTypesFilter: tuple[QName] | None = None # QNames that are used / expected in "types" of wsdiscovery
+ MedicalDeviceTypesFilter: tuple[etree.QName] | None = None # QNames that are used / expected in "types" of wsdiscovery
Actions = None
data_model: AbstractDataModel = None
@classmethod
- def types_match(cls, types: list[QName]) -> bool:
+ def types_match(cls, types: list[etree.QName]) -> bool:
"""Check if this definition can be used for the provided types."""
return cls.MedicalDeviceType in types
diff --git a/src/sdc11073/definitions_sdc.py b/src/sdc11073/definitions_sdc.py
index fe2c1244..d632abbd 100644
--- a/src/sdc11073/definitions_sdc.py
+++ b/src/sdc11073/definitions_sdc.py
@@ -11,7 +11,7 @@
if TYPE_CHECKING:
from types import ModuleType
- from lxml.etree import QName
+ from lxml import etree
from .namespaces import NamespaceHelper
@@ -23,11 +23,11 @@ def __init__(self):
super().__init__()
self._ns_hlp = ns_hlp
- def get_descriptor_container_class(self, type_qname: QName) -> type:
+ def get_descriptor_container_class(self, type_qname: etree.QName) -> type:
"""Get the class that represents a BICEPS descriptor entity with given QName."""
return get_descriptor_container_class(type_qname)
- def get_state_container_class(self, type_qname: QName) -> type:
+ def get_state_container_class(self, type_qname: etree.QName) -> type:
"""Get the class that represents a BICEPS state entity with given QName."""
return get_state_container_class(type_qname)
diff --git a/src/sdc11073/dispatch/dispatchkey.py b/src/sdc11073/dispatch/dispatchkey.py
index 1a0f2a23..142070c5 100644
--- a/src/sdc11073/dispatch/dispatchkey.py
+++ b/src/sdc11073/dispatch/dispatchkey.py
@@ -11,7 +11,7 @@
from sdc11073.pysoap.soapenvelope import Fault, faultcodeEnum
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
@dataclass(frozen=True)
@@ -22,7 +22,7 @@ class DispatchKey:
"""
action: str
- message_tag: QName | None
+ message_tag: etree.QName | None
def __repr__(self) -> str:
return f'{self.__class__.__name__} action={self.action} msg={self.message_tag}'
diff --git a/src/sdc11073/mdib/containerbase.py b/src/sdc11073/mdib/containerbase.py
index fcbe17be..d0fd0d7e 100644
--- a/src/sdc11073/mdib/containerbase.py
+++ b/src/sdc11073/mdib/containerbase.py
@@ -4,7 +4,7 @@
import inspect
from typing import Any
import math
-from lxml.etree import Element, SubElement, QName
+from lxml import etree
from sdc11073 import observableproperties as properties
from sdc11073.namespaces import QN_TYPE, NamespaceHelper
@@ -14,7 +14,7 @@
class ContainerBase:
"""Common base class for descriptors and states."""
- NODETYPE: QName = None # overwrite in derived classes! This is the BICEPS Type.
+ NODETYPE: etree.QName = None # overwrite in derived classes! This is the BICEPS Type.
node = properties.ObservableProperty()
is_state_container = False
is_descriptor_container = False
@@ -35,7 +35,7 @@ def get_actual_value(self, attr_name: str) -> Any:
return getattr(self.__class__, attr_name).get_actual_value(self)
def mk_node(self,
- tag: QName,
+ tag: etree.QName,
ns_helper: NamespaceHelper,
parent_node: xml_utils.LxmlElement | None = None,
set_xsi_type: bool = False) -> xml_utils.LxmlElement:
@@ -51,9 +51,9 @@ def mk_node(self,
ns_helper.MSG,
ns_helper.XSI)
if parent_node is not None:
- node = SubElement(parent_node, tag, nsmap=ns_map)
+ node = etree.SubElement(parent_node, tag, nsmap=ns_map)
else:
- node = Element(tag, nsmap=ns_map)
+ node = etree.Element(tag, nsmap=ns_map)
self.update_node(node, ns_helper, set_xsi_type)
return node
diff --git a/src/sdc11073/mdib/descriptorcontainers.py b/src/sdc11073/mdib/descriptorcontainers.py
index 4858fecc..581707af 100644
--- a/src/sdc11073/mdib/descriptorcontainers.py
+++ b/src/sdc11073/mdib/descriptorcontainers.py
@@ -17,10 +17,9 @@
from collections.abc import Iterable
from decimal import Decimal
- from lxml import etree as etree_
+ from lxml import etree
from sdc11073 import xml_utils
- from sdc11073.namespaces import NamespaceHelper
from sdc11073.xml_types.isoduration import DurationType
from sdc11073.xml_types.xml_structure import ExtensionLocalValue
@@ -34,8 +33,8 @@ class ChildDescriptorMapping:
pm.Metric for all classes derived from AbstractMetricDescriptor.
"""
- child_qname: etree_.QName
- node_types: tuple[etree_.QName, ...] = None
+ child_qname: etree.QName
+ node_types: tuple[etree.QName, ...] = None
def __repr__(self) -> str:
if self.node_types is None:
@@ -59,8 +58,8 @@ def sorted_child_data(obj: Any, member_name: str):
class AbstractDescriptorProtocol(Protocol):
"""The common Interface of all descriptors."""
- NODETYPE: etree_.QName
- STATE_QNAME: etree_.QName
+ NODETYPE: etree.QName
+ STATE_QNAME: etree.QName
is_descriptor_container: bool
is_system_context_descriptor: bool
is_realtime_sample_array_metric_descriptor: bool
@@ -125,7 +124,7 @@ class AbstractDescriptorContainer(ContainerBase):
_props = ('Handle', 'DescriptorVersion', 'SafetyClassification', 'Extension', 'Type')
_child_elements_order = (ext.Extension, pm_qnames.Type) # child elements in BICEPS order
STATE_QNAME = None
- extension_class_lookup: ClassVar[dict[etree_.QName, type[pm_types.PropertyBasedPMType]]] = {
+ extension_class_lookup: ClassVar[dict[etree.QName, type[pm_types.PropertyBasedPMType]]] = {
msg.Retrievability: pm_types.Retrievability
}
@@ -205,7 +204,7 @@ def diff(self, other: AbstractDescriptorContainer, ignore_property_names: list[s
ret.append(f'parent_handle={my_value}, other={other_value}')
return None if len(ret) == 0 else ret
- def tag_name_for_child_descriptor(self, node_type: etree_.QName) -> (etree_.QName, bool):
+ def tag_name_for_child_descriptor(self, node_type: etree.QName) -> (etree.QName, bool):
"""Determine the tag name of a child descriptor.
This isneeded when the xml tree of the descriptor is created.
@@ -796,6 +795,6 @@ class EnsembleContextDescriptorContainer(AbstractContextDescriptorContainer):
_name_class_lookup.update(_name_class_xtra_lookup)
-def get_container_class(qname: etree_.QName) -> type[AbstractDescriptorContainer]:
+def get_container_class(qname: etree.QName) -> type[AbstractDescriptorContainer]:
""":param qname: a QName instance"""
return _name_class_lookup.get(qname)
diff --git a/src/sdc11073/mdib/mdibbase.py b/src/sdc11073/mdib/mdibbase.py
index 1d2af76e..1ecd9c21 100644
--- a/src/sdc11073/mdib/mdibbase.py
+++ b/src/sdc11073/mdib/mdibbase.py
@@ -5,7 +5,7 @@
from threading import Lock
from typing import TYPE_CHECKING, Any
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import multikey
from sdc11073 import observableproperties as properties
@@ -13,7 +13,6 @@
from sdc11073.xml_types.pm_types import Coding, have_matching_codes
if TYPE_CHECKING:
- from lxml.etree import QName
from sdc11073.definitions_base import BaseDefinitions
from sdc11073.loghelper import LoggerAdapter
from sdc11073.xml_types.pm_types import CodedValue
@@ -75,7 +74,7 @@ class DescriptorsLookup(_MultikeyWithVersionLookup):
handle: multikey.UIndexDefinition[str, list[AbstractDescriptorContainer]]
parent_handle: multikey.IndexDefinition[str, list[AbstractDescriptorContainer]]
- NODETYPE: multikey.IndexDefinition[QName, list[AbstractDescriptorContainer]]
+ NODETYPE: multikey.IndexDefinition[etree.QName, list[AbstractDescriptorContainer]]
coding: multikey.IndexDefinition[Coding, list[AbstractDescriptorContainer]]
condition_signaled: multikey.IndexDefinition[str, list[AbstractDescriptorContainer]]
source: multikey.IndexDefinition[str, list[AbstractDescriptorContainer]]
@@ -153,7 +152,7 @@ class StatesLookup(_MultikeyWithVersionLookup):
"""
descriptor_handle: multikey.UIndexDefinition[str, list[AbstractDescriptorContainer]]
- NODETYPE: multikey.IndexDefinition[QName, list[AbstractDescriptorContainer]]
+ NODETYPE: multikey.IndexDefinition[etree.QName, list[AbstractDescriptorContainer]]
def __init__(self):
super().__init__()
@@ -189,7 +188,7 @@ class MultiStatesLookup(_MultikeyWithVersionLookup):
descriptor_handle: multikey.IndexDefinition[str, list[AbstractMultiStateContainer]]
handle: multikey.UIndexDefinition[str, list[AbstractMultiStateContainer]]
- NODETYPE: multikey.IndexDefinition[QName, list[AbstractMultiStateContainer]]
+ NODETYPE: multikey.IndexDefinition[etree.QName, list[AbstractMultiStateContainer]]
def __init__(self):
super().__init__()
@@ -337,7 +336,7 @@ def add_state_containers(self, state_containers: list[AbstractStateContainer | A
self._logger.error('add_state_containers: {}, Handle={}; {}', # noqa: PLE1205
ex, state_container.Handle, traceback.format_exc())
else:
- self._logger.error('add_state_containers: {}, DescriptorHandle={}; {}', # noqa: PLE1205
+ self._logger.error('add_state_containers: {}, DescriptorHandle={}; {}', # noqa: PLE1205
ex, state_container.DescriptorHandle, traceback.format_exc())
def _reconstruct_md_description(self) -> xml_utils.LxmlElement:
@@ -345,9 +344,9 @@ def _reconstruct_md_description(self) -> xml_utils.LxmlElement:
pm = self.data_model.pm_names
doc_nsmap = self.nsmapper.ns_map
root_containers = self.descriptions.parent_handle.get(None) or []
- md_description_node = etree_.Element(pm.MdDescription,
- attrib={'DescriptionVersion': str(self.mddescription_version)},
- nsmap=doc_nsmap)
+ md_description_node = etree.Element(pm.MdDescription,
+ attrib={'DescriptionVersion': str(self.mddescription_version)},
+ nsmap=doc_nsmap)
for root_container in root_containers:
self.make_descriptor_node(root_container, md_description_node, tag=pm.Mds, set_xsi_type=False)
return md_description_node
@@ -355,7 +354,7 @@ def _reconstruct_md_description(self) -> xml_utils.LxmlElement:
def make_descriptor_node(self,
descriptor_container: AbstractDescriptorContainer,
parent_node: xml_utils.LxmlElement,
- tag: etree_.QName,
+ tag: etree.QName,
set_xsi_type: bool = True) -> xml_utils.LxmlElement:
"""Create a lxml etree node with subtree from instance data.
@@ -367,10 +366,10 @@ def make_descriptor_node(self,
"""
ns_map = self.nsmapper.partial_map(self.nsmapper.PM, self.nsmapper.XSI) \
if set_xsi_type else self.nsmapper.partial_map(self.nsmapper.PM)
- node = etree_.SubElement(parent_node,
- tag,
- attrib={'Handle': descriptor_container.Handle},
- nsmap=ns_map)
+ node = etree.SubElement(parent_node,
+ tag,
+ attrib={'Handle': descriptor_container.Handle},
+ nsmap=ns_map)
descriptor_container.update_node(node, self.nsmapper, set_xsi_type) # create all
child_list = self.descriptions.parent_handle.get(descriptor_container.Handle, [])
# append all child containers, then bring all child elements in correct order
@@ -388,7 +387,7 @@ def _reconstruct_mdib(self, add_context_states: bool) -> xml_utils.LxmlElement:
pm = self.data_model.pm_names
msg = self.data_model.msg_names
doc_nsmap = self.nsmapper.ns_map
- mdib_node = etree_.Element(msg.Mdib, nsmap=doc_nsmap)
+ mdib_node = etree.Element(msg.Mdib, nsmap=doc_nsmap)
mdib_node.set('MdibVersion', str(self.mdib_version))
mdib_node.set('SequenceId', self.sequence_id)
if self.instance_id is not None:
@@ -397,9 +396,9 @@ def _reconstruct_mdib(self, add_context_states: bool) -> xml_utils.LxmlElement:
mdib_node.append(md_description_node)
# add a list of states
- md_state_node = etree_.SubElement(mdib_node, pm.MdState,
- attrib={'StateVersion': str(self.mdstate_version)},
- nsmap=doc_nsmap)
+ md_state_node = etree.SubElement(mdib_node, pm.MdState,
+ attrib={'StateVersion': str(self.mdstate_version)},
+ nsmap=doc_nsmap)
tag = pm.State
for state_container in self.states.objects:
md_state_node.append(state_container.mk_state_node(tag, self.nsmapper))
@@ -562,7 +561,7 @@ def rm_descriptors_and_states(self, descriptor_containers: list[AbstractDescript
deleted_descriptors = {}
deleted_states = {}
for descriptor_container in descriptor_containers:
- self._logger.debug('rm Descriptor node {} handle {}', # noqa: PLE1205
+ self._logger.debug('rm Descriptor node {} handle {}', # noqa: PLE1205
descriptor_container.NODETYPE, descriptor_container.Handle)
self.descriptions.remove_object(descriptor_container)
deleted_descriptors[descriptor_container.Handle] = descriptor_container
@@ -571,7 +570,7 @@ def rm_descriptors_and_states(self, descriptor_containers: list[AbstractDescript
if state_containers is not None:
# make a copy, otherwise remove_objects will manipulate same list in place
state_containers = state_containers[:]
- self._logger.debug('rm {} states(s) associated to descriptor {} ', # noqa: PLE1205
+ self._logger.debug('rm {} states(s) associated to descriptor {} ', # noqa: PLE1205
len(state_containers), descriptor_container.Handle)
m_key.remove_objects(state_containers)
deleted_states[descriptor_container.Handle] = state_containers
diff --git a/src/sdc11073/mdib/statecontainers.py b/src/sdc11073/mdib/statecontainers.py
index 9cb11ded..88049b35 100644
--- a/src/sdc11073/mdib/statecontainers.py
+++ b/src/sdc11073/mdib/statecontainers.py
@@ -16,7 +16,7 @@
if TYPE_CHECKING:
from decimal import Decimal
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.location import SdcLocation
from sdc11073.namespaces import NamespaceHelper
@@ -30,7 +30,7 @@
class AbstractStateProtocol(Protocol):
"""The common Interface of all states."""
- NODETYPE: QName
+ NODETYPE: etree.QName
is_state_container: bool
is_realtime_sample_array_metric_state: bool
is_metric_state: bool
@@ -87,7 +87,7 @@ def __init__(self, descriptor_container: AbstractDescriptorProtocol):
self.DescriptorVersion = descriptor_container.DescriptorVersion
# pylint: enable=invalid-name
- def mk_state_node(self, tag: QName,
+ def mk_state_node(self, tag: etree.QName,
nsmapper: NamespaceHelper,
set_xsi_type: bool = True) -> xml_utils.LxmlElement:
"""Create an etree node from instance data."""
@@ -575,7 +575,7 @@ def update_from_other_container(self, other: AbstractMultiStateContainer, skippe
f'Update from a node with different handle is not possible! Have "{self.Handle}", got "{other.Handle}"')
super().update_from_other_container(other, skipped_properties)
- def mk_state_node(self, tag: QName, nsmapper: NamespaceHelper, set_xsi_type: bool = True) -> xml_utils.LxmlElement:
+ def mk_state_node(self, tag: etree.QName, nsmapper: NamespaceHelper, set_xsi_type: bool = True) -> xml_utils.LxmlElement:
"""Create an etree node from instance data."""
if self.Handle is None:
self.Handle = uuid.uuid4().hex
@@ -710,7 +710,7 @@ class EnsembleContextStateContainer(AbstractContextStateContainer):
_state_lookup_by_type = {c.NODETYPE: c for c in classes_with_nodetype}
-def get_container_class(type_qname: QName) -> type[AbstractStateProtocol]:
+def get_container_class(type_qname: etree.QName) -> type[AbstractStateProtocol]:
"""Return class for given type.
:param type_qname: the QName of the expected NODETYPE.
diff --git a/src/sdc11073/namespaces.py b/src/sdc11073/namespaces.py
index c36c0bb5..8207a561 100644
--- a/src/sdc11073/namespaces.py
+++ b/src/sdc11073/namespaces.py
@@ -5,7 +5,7 @@
from enum import Enum
from typing import Optional, Type, NamedTuple
-from lxml import etree as etree_
+from lxml import etree
class PrefixNamespace(NamedTuple):
@@ -14,8 +14,8 @@ class PrefixNamespace(NamedTuple):
schema_location_url: str | None
local_schema_file: pathlib.Path | None
- def tag(self, localname: str) -> etree_.QName:
- return etree_.QName(self.namespace, localname)
+ def tag(self, localname: str) -> etree.QName:
+ return etree.QName(self.namespace, localname)
def doc_name(self, localname: str) -> str:
if self.prefix:
@@ -201,20 +201,20 @@ def partial_map(self, *prefix) -> dict:
ret[p.prefix] = p.namespace
return ret
- def doc_name_from_qname(self, qname: etree_.QName) -> str:
+ def doc_name_from_qname(self, qname: etree.QName) -> str:
""" returns the prefix:name string, or only name (if default namespace is used) """
if qname.namespace is not None and qname.namespace == self._default_ns:
return qname.localname
prefix = self._prefix_map[qname.namespace]
return f'{prefix}:{qname.localname}'
- def text_to_qname(self, text: str, nsmap: dict = None) -> etree_.QName:
+ def text_to_qname(self, text: str, nsmap: dict = None) -> etree.QName:
ns_map = nsmap or self.ns_map
elements = text.split(':')
prefix = None if len(elements) == 1 else elements[0]
name = elements[-1]
try:
- return etree_.QName(ns_map[prefix], name)
+ return etree.QName(ns_map[prefix], name)
except KeyError as ex:
raise KeyError(f'Cannot make QName for {text}, prefix is not in nsmap: {ns_map.keys()}') from ex
@@ -226,7 +226,7 @@ def text_to_qname(self, text: str, nsmap: dict = None) -> etree_.QName:
WSA_NONE = PrefixesEnum.WSA.namespace + '/none'
-def docname_from_qname(qname: etree_.QName, ns_map: dict) -> str:
+def docname_from_qname(qname: etree.QName, ns_map: dict) -> str:
""" returns prefix:name string, or only name (if default namespace is used) """
prefixmap = dict((v, k) for k, v in ns_map.items())
prefix = prefixmap.get(qname.namespace)
@@ -235,17 +235,17 @@ def docname_from_qname(qname: etree_.QName, ns_map: dict) -> str:
return f'{prefix}:{qname.localname}'
-def text_to_qname(text: str, doc_nsmap: dict) -> etree_.QName:
+def text_to_qname(text: str, doc_nsmap: dict) -> etree.QName:
elements = text.split(':')
prefix = None if len(elements) == 1 else elements[0]
name = elements[-1]
try:
- return etree_.QName(doc_nsmap[prefix], name)
+ return etree.QName(doc_nsmap[prefix], name)
except KeyError as ex:
raise KeyError(f'Cannot make QName for {text}, prefix is not in nsmap: {doc_nsmap.keys()}') from ex
-QN_TYPE = etree_.QName(PrefixesEnum.XSI.namespace, 'type') # frequently used QName, central definition
+QN_TYPE = etree.QName(PrefixesEnum.XSI.namespace, 'type') # frequently used QName, central definition
class EventingActions:
diff --git a/src/sdc11073/provider/components.py b/src/sdc11073/provider/components.py
index 77e7d987..4e92df6f 100644
--- a/src/sdc11073/provider/components.py
+++ b/src/sdc11073/provider/components.py
@@ -28,7 +28,7 @@
# pylint: disable=cyclic-import
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073 import provider
from sdc11073.mdib.providermdib import ProviderMdib
@@ -54,7 +54,7 @@ class SdcProviderComponents:
client_msg_reader_class: type[MessageReader] = None # the corresponding reader for client
xml_reader_class: type[MessageReader] = None # needed to read xml based mdib files
services_factory: Callable[[provider.SdcProvider, SdcProviderComponents, dict], HostedServices] = None
- operation_cls_getter: Callable[[QName], type] = None
+ operation_cls_getter: Callable[[etree.QName], type] = None
sco_operations_registry_class: type[AbstractScoOperationsRegistry] = None
subscriptions_manager_class: dict[str, type[SubscriptionManagerProtocol]] = None
role_provider_class: type = None
diff --git a/src/sdc11073/provider/dpwshostedservice.py b/src/sdc11073/provider/dpwshostedservice.py
index b1e9a536..fe6d748d 100644
--- a/src/sdc11073/provider/dpwshostedservice.py
+++ b/src/sdc11073/provider/dpwshostedservice.py
@@ -3,7 +3,7 @@
import typing
from io import BytesIO
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.dispatch import DispatchKey, RequestDispatcher
from sdc11073.namespaces import EventingActions
@@ -28,8 +28,8 @@
def etree_from_file(path: str | pathlib.Path) -> xml_utils.LxmlElement:
- parser = etree_.ETCompatXMLParser(resolve_entities=False)
- doc = etree_.parse(str(path), parser=parser)
+ parser = etree.ETCompatXMLParser(resolve_entities=False)
+ doc = etree.parse(str(path), parser=parser)
return doc.getroot()
@@ -126,11 +126,11 @@ def _mk_wsdl_string(self) -> bytes:
for port_type_impl in self.port_type_impls:
for entry in port_type_impl.additional_namespaces:
my_nsmap[entry.prefix] = entry.namespace
- wsdl_definitions = etree_.Element(etree_.QName(_wsdl_ns, 'definitions'),
- nsmap=my_nsmap,
- attrib={'targetNamespace': sdc_definitions.PortTypeNamespace})
+ wsdl_definitions = etree.Element(etree.QName(_wsdl_ns, 'definitions'),
+ nsmap=my_nsmap,
+ attrib={'targetNamespace': sdc_definitions.PortTypeNamespace})
- types = etree_.SubElement(wsdl_definitions, etree_.QName(_wsdl_ns, 'types'))
+ types = etree.SubElement(wsdl_definitions, etree.QName(_wsdl_ns, 'types'))
# remove annotations from schemas, this reduces wsdl size from 280kb to 100kb!
ext_schema_ = etree_from_file(ns_hlp.EXT.local_schema_file)
ext_schema = self._remove_annotations(ext_schema_)
@@ -148,7 +148,7 @@ def _mk_wsdl_string(self) -> bytes:
_port_type_impl.add_wsdl_port_type(wsdl_definitions)
for _port_type_impl in self.port_type_impls:
_port_type_impl.add_wsdl_binding(wsdl_definitions, porttype_prefix)
- return etree_.tostring(wsdl_definitions, encoding='UTF-8', xml_declaration=True)
+ return etree.tostring(wsdl_definitions, encoding='UTF-8', xml_declaration=True)
@staticmethod
def _remove_annotations(root_node):
@@ -164,8 +164,8 @@ def _remove_annotations(root_node):
"""
- remove_annotations_doc = etree_.parse(BytesIO(remove_annotations_string))
- remove_annotations_xslt = etree_.XSLT(remove_annotations_doc)
+ remove_annotations_doc = etree.parse(BytesIO(remove_annotations_string))
+ remove_annotations_xslt = etree.XSLT(remove_annotations_doc)
return remove_annotations_xslt(root_node).getroot()
def _on_get_metadata(self, request_data):
diff --git a/src/sdc11073/provider/operations.py b/src/sdc11073/provider/operations.py
index 4dfe4365..0b900a25 100644
--- a/src/sdc11073/provider/operations.py
+++ b/src/sdc11073/provider/operations.py
@@ -12,7 +12,7 @@
from sdc11073.xml_types import pm_qnames as pm
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.mdib.descriptorcontainers import AbstractDescriptorProtocol
from sdc11073.mdib.providermdib import ProviderMdib
from sdc11073.pysoap.soapenvelope import ReceivedSoapMessage
@@ -63,8 +63,8 @@ class OperationDefinitionBase:
current_request = properties.ObservableProperty(fire_only_on_changed_value=False)
current_argument = properties.ObservableProperty(fire_only_on_changed_value=False)
on_timeout = properties.ObservableProperty(fire_only_on_changed_value=False)
- OP_DESCR_QNAME: QName | None = None # to be defined in derived classes
- OP_STATE_QNAME: QName | None = None # to be defined in derived classes
+ OP_DESCR_QNAME: etree.QName | None = None # to be defined in derived classes
+ OP_STATE_QNAME: etree.QName | None = None # to be defined in derived classes
def __init__(self, # noqa: PLR0913
handle: str,
@@ -236,6 +236,6 @@ class SetMetricStateOperation(OperationDefinitionBase):
_operation_lookup_by_type = {c.OP_DESCR_QNAME: c for c in _classes_with_qname}
-def get_operation_class(q_name: QName) -> type[OperationDefinitionBase]:
+def get_operation_class(q_name: etree.QName) -> type[OperationDefinitionBase]:
""":param q_name: a QName instance"""
return _operation_lookup_by_type.get(q_name)
diff --git a/src/sdc11073/provider/porttypes/porttypebase.py b/src/sdc11073/provider/porttypes/porttypebase.py
index 54bcc336..82d1e510 100644
--- a/src/sdc11073/provider/porttypes/porttypebase.py
+++ b/src/sdc11073/provider/porttypes/porttypebase.py
@@ -3,7 +3,7 @@
from collections import namedtuple
from typing import TYPE_CHECKING, ClassVar
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import loghelper
from sdc11073.namespaces import PrefixesEnum
@@ -21,9 +21,9 @@
WSDL_S12 = PrefixesEnum.WSDL12.namespace # old soap 12 namespace, used in wsdl 1.1. used only for wsdl
_wsdl_ns = PrefixesEnum.WSDL.namespace
-_wsdl_message = etree_.QName(_wsdl_ns, 'message')
-_wsdl_part = etree_.QName(_wsdl_ns, 'part')
-_wsdl_operation = etree_.QName(_wsdl_ns, 'operation')
+_wsdl_message = etree.QName(_wsdl_ns, 'message')
+_wsdl_part = etree.QName(_wsdl_ns, 'part')
+_wsdl_operation = etree.QName(_wsdl_ns, 'operation')
# WSDL Generation:
# types to allow declaration of a wsdl data per service
@@ -40,7 +40,7 @@ class DPWSPortTypeBase:
Handlers are registered in the hosting service instance.
"""
- port_type_name: etree_.QName | None = None
+ port_type_name: etree.QName | None = None
WSDLOperationBindings = () # overwrite in derived classes
WSDLMessageDescriptions = () # overwrite in derived classes
additional_namespaces: ClassVar[list[PrefixNamespace]] = [] # for special namespaces
@@ -81,12 +81,12 @@ def _mk_port_type_node(self, parent_node: xml_utils.LxmlElement,
raise ValueError('self.port_type_name is not set, cannot create port type node')
if 'dt' in parent_node.nsmap:
- port_type = etree_.SubElement(parent_node, etree_.QName(_wsdl_ns, 'portType'),
- attrib={'name': self.port_type_name.localname,
- PrefixesEnum.DPWS.tag('DiscoveryType'): 'dt:ServiceProvider'})
+ port_type = etree.SubElement(parent_node, etree.QName(_wsdl_ns, 'portType'),
+ attrib={'name': self.port_type_name.localname,
+ PrefixesEnum.DPWS.tag('DiscoveryType'): 'dt:ServiceProvider'})
else:
- port_type = etree_.SubElement(parent_node, etree_.QName(_wsdl_ns, 'portType'),
- attrib={'name': self.port_type_name.localname})
+ port_type = etree.SubElement(parent_node, etree.QName(_wsdl_ns, 'portType'),
+ attrib={'name': self.port_type_name.localname})
if is_event_source:
port_type.attrib[PrefixesEnum.WSE.tag('EventSource')] = 'true'
return port_type
@@ -104,11 +104,11 @@ def add_wsdl_messages(self, parent_node):
:return:
"""
for msg in self.WSDLMessageDescriptions:
- elem = etree_.SubElement(parent_node, _wsdl_message, attrib={'name': msg.name})
+ elem = etree.SubElement(parent_node, _wsdl_message, attrib={'name': msg.name})
for element_name in msg.parameters:
- etree_.SubElement(elem, _wsdl_part,
- attrib={'name': 'parameters',
- 'element': element_name})
+ etree.SubElement(elem, _wsdl_part,
+ attrib={'name': 'parameters',
+ 'element': element_name})
def add_wsdl_binding(self, parent_node, porttype_prefix):
"""Add wsdl:binding node to parent_node.
@@ -132,23 +132,23 @@ def add_wsdl_binding(self, parent_node, porttype_prefix):
"""
v_ref = self._sdc_device.mdib.sdc_definitions
p_type = self.port_type_name.localname
- wsdl_binding = etree_.SubElement(parent_node, etree_.QName(_wsdl_ns, 'binding'),
- attrib={'name': p_type + 'Binding',
- 'type': f'{porttype_prefix}:{p_type}'})
- etree_.SubElement(wsdl_binding, etree_.QName(WSDL_S12, 'binding'),
- attrib={'style': 'document', 'transport': 'http://schemas.xmlsoap.org/soap/http'})
+ wsdl_binding = etree.SubElement(parent_node, etree.QName(_wsdl_ns, 'binding'),
+ attrib={'name': p_type + 'Binding',
+ 'type': f'{porttype_prefix}:{p_type}'})
+ etree.SubElement(wsdl_binding, etree.QName(WSDL_S12, 'binding'),
+ attrib={'style': 'document', 'transport': 'http://schemas.xmlsoap.org/soap/http'})
_add_policy_dpws_profile(wsdl_binding)
for wsdl_op in self.WSDLOperationBindings:
- wsdl_operation = etree_.SubElement(wsdl_binding, etree_.QName(_wsdl_ns, 'operation'),
- attrib={'name': wsdl_op.name})
- etree_.SubElement(wsdl_operation, etree_.QName(WSDL_S12, 'operation'),
- attrib={'soapAction': f'{v_ref.ActionsNamespace}/{p_type}/{wsdl_op.name}'})
+ wsdl_operation = etree.SubElement(wsdl_binding, etree.QName(_wsdl_ns, 'operation'),
+ attrib={'name': wsdl_op.name})
+ etree.SubElement(wsdl_operation, etree.QName(WSDL_S12, 'operation'),
+ attrib={'soapAction': f'{v_ref.ActionsNamespace}/{p_type}/{wsdl_op.name}'})
if wsdl_op.input is not None:
- wsdl_input = etree_.SubElement(wsdl_operation, etree_.QName(_wsdl_ns, 'input'))
- etree_.SubElement(wsdl_input, etree_.QName(WSDL_S12, 'body'), attrib={'use': wsdl_op.input})
+ wsdl_input = etree.SubElement(wsdl_operation, etree.QName(_wsdl_ns, 'input'))
+ etree.SubElement(wsdl_input, etree.QName(WSDL_S12, 'body'), attrib={'use': wsdl_op.input})
if wsdl_op.output is not None:
- wsdl_output = etree_.SubElement(wsdl_operation, etree_.QName(_wsdl_ns, 'output'))
- etree_.SubElement(wsdl_output, etree_.QName(WSDL_S12, 'body'), attrib={'use': wsdl_op.output})
+ wsdl_output = etree.SubElement(wsdl_operation, etree.QName(_wsdl_ns, 'output'))
+ etree.SubElement(wsdl_output, etree.QName(WSDL_S12, 'body'), attrib={'use': wsdl_op.output})
def _mk_offered_subscriptions(self) -> list:
"""Takes action strings from sdc_definitions.Actions.
@@ -195,15 +195,15 @@ def _handle_operation_request(self, request_data: RequestData,
def _mk_wsdl_operation(parent_node, operation_name, input_message_name, output_message_name) -> xml_utils.LxmlElement:
- elem = etree_.SubElement(parent_node, _wsdl_operation, attrib={'name': operation_name})
+ elem = etree.SubElement(parent_node, _wsdl_operation, attrib={'name': operation_name})
if input_message_name is not None:
- etree_.SubElement(elem, etree_.QName(_wsdl_ns, 'input'),
- attrib={'message': f'tns:{input_message_name}',
- })
+ etree.SubElement(elem, etree.QName(_wsdl_ns, 'input'),
+ attrib={'message': f'tns:{input_message_name}',
+ })
if output_message_name is not None:
- etree_.SubElement(elem, etree_.QName(_wsdl_ns, 'output'),
- attrib={'message': f'tns:{output_message_name}',
- })
+ etree.SubElement(elem, etree.QName(_wsdl_ns, 'output'),
+ attrib={'message': f'tns:{output_message_name}',
+ })
return elem
@@ -249,8 +249,8 @@ def _add_policy_dpws_profile(parent_node):
"""
- wsp_policy_node = etree_.SubElement(parent_node, etree_.QName(WSP_NS, 'Policy'), attrib=None)
- _ = etree_.SubElement(wsp_policy_node, PrefixesEnum.DPWS.tag('Profile'),
- attrib={etree_.QName(WSP_NS, 'Optional'): 'true'})
- _ = etree_.SubElement(wsp_policy_node, PrefixesEnum.MDPWS.tag('Profile'),
- attrib={etree_.QName(WSP_NS, 'Optional'): 'true'})
+ wsp_policy_node = etree.SubElement(parent_node, etree.QName(WSP_NS, 'Policy'), attrib=None)
+ _ = etree.SubElement(wsp_policy_node, PrefixesEnum.DPWS.tag('Profile'),
+ attrib={etree.QName(WSP_NS, 'Optional'): 'true'})
+ _ = etree.SubElement(wsp_policy_node, PrefixesEnum.MDPWS.tag('Profile'),
+ attrib={etree.QName(WSP_NS, 'Optional'): 'true'})
diff --git a/src/sdc11073/provider/subscriptionmgr.py b/src/sdc11073/provider/subscriptionmgr.py
index 4a13ab96..c956e1ba 100644
--- a/src/sdc11073/provider/subscriptionmgr.py
+++ b/src/sdc11073/provider/subscriptionmgr.py
@@ -2,7 +2,6 @@
from typing import TYPE_CHECKING
-
from sdc11073.xml_types.addressing_types import HeaderInformationBlock
from .subscriptionmgr_base import ActionBasedSubscription, SubscriptionsManagerBase
from sdc11073 import observableproperties
@@ -12,7 +11,6 @@
from sdc11073.xml_types.dpws_types import DeviceEventingFilterDialectURI
if TYPE_CHECKING:
- from lxml import etree as etree_
from sdc11073.dispatch import RequestData
from sdc11073 import xml_utils
@@ -20,6 +18,7 @@
class BicepsSubscription(ActionBasedSubscription):
""" This extends ActionBasedSubscription with the ability to send notifications.
The class is used by ActionBasedSubscriptionsManager."""
+
def send_notification_report(self, body_node: xml_utils.LxmlElement, action: str):
if not self.is_valid:
return
diff --git a/src/sdc11073/provider/subscriptionmgr_async.py b/src/sdc11073/provider/subscriptionmgr_async.py
index 481c3cb7..98d3b40a 100644
--- a/src/sdc11073/provider/subscriptionmgr_async.py
+++ b/src/sdc11073/provider/subscriptionmgr_async.py
@@ -8,7 +8,7 @@
from typing import TYPE_CHECKING, Any
import aiohttp.client_exceptions
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import observableproperties
from sdc11073.etc import apply_map
@@ -248,11 +248,11 @@ async def _async_send_notification_report(self, subscription: BicepsSubscription
socket.timeout) as ex:
# this is an error related to the connection => log warning and continue
self._logger.warning('could not send notification report {} warning= {!r}: {}', # noqa: PLE1205
- action, ex, subscription)
- except etree_.DocumentInvalid as ex:
+ action, ex, subscription)
+ except etree.DocumentInvalid as ex:
# this is an error related to the document, it cannot be sent to any subscriber => re-raise
self._logger.warning('Invalid Document for action {}: {!r}\n{}', # noqa: PLE1205
- action, ex, etree_.tostring(body_node))
+ action, ex, etree.tostring(body_node))
raise
except Exception:
# this should never happen! => re-raise
diff --git a/src/sdc11073/provider/subscriptionmgr_base.py b/src/sdc11073/provider/subscriptionmgr_base.py
index 50a255ee..3e118102 100644
--- a/src/sdc11073/provider/subscriptionmgr_base.py
+++ b/src/sdc11073/provider/subscriptionmgr_base.py
@@ -10,7 +10,7 @@
from typing import TYPE_CHECKING, Any, Protocol
from urllib.parse import urlparse
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import loghelper, multikey, observableproperties, xml_utils
from sdc11073.etc import apply_map
@@ -64,7 +64,7 @@ def _mk_dispatch_identifier(reference_parameters: list, path_suffix: str):
class SubscriptionBase:
MAX_NOTIFY_ERRORS = 1
- IDENT_TAG = etree_.QName('http.local.com', 'MyDevIdentifier')
+ IDENT_TAG = etree.QName('http.local.com', 'MyDevIdentifier')
def __init__(self, mgr,
subscribe_request: evt_types.Subscribe,
@@ -124,7 +124,7 @@ def __init__(self, mgr,
def set_reference_parameter(self):
"""Create a ReferenceParameters instance with a reference parameter."""
- reference_parameter = etree_.Element(self.IDENT_TAG)
+ reference_parameter = etree.Element(self.IDENT_TAG)
reference_parameter.text = self.identifier_uuid.hex
self.reference_parameters.append(reference_parameter)
@@ -470,9 +470,9 @@ def _send_notification_report(self, subscription, body_node: xml_utils.LxmlEleme
except socket.timeout as ex:
# this is an error related to the connection => log error and continue
self._logger.error('could not send notification report error= {!r}: {}', ex, subscription)
- except etree_.DocumentInvalid as ex:
+ except etree.DocumentInvalid as ex:
# this is an error related to the document, it cannot be sent to any subscriber => re-raise
- self._logger.error('Invalid Document: {!r}\n{}', ex, etree_.tostring(body_node))
+ self._logger.error('Invalid Document: {!r}\n{}', ex, etree.tostring(body_node))
raise
except Exception as ex:
# this should never happen! => re-raise
diff --git a/src/sdc11073/pysoap/msgfactory.py b/src/sdc11073/pysoap/msgfactory.py
index ba07ed44..bd3dccf6 100644
--- a/src/sdc11073/pysoap/msgfactory.py
+++ b/src/sdc11073/pysoap/msgfactory.py
@@ -3,7 +3,7 @@
from io import BytesIO
from typing import Optional, Union, List, Type, TYPE_CHECKING
-from lxml import etree as etree_
+from lxml import etree
from .msgreader import validate_node
from .soapenvelope import Soap12Envelope
@@ -45,7 +45,7 @@ def __init__(self, sdc_definitions: Type[BaseDefinitions],
self._logger = logger
self.ns_hlp = sdc_definitions.data_model.ns_helper
self._validate = validate
- self._xml_schema: etree_.XMLSchema = mk_schema_validator(self.schema_specs, self.ns_hlp)
+ self._xml_schema: etree.XMLSchema = mk_schema_validator(self.schema_specs, self.ns_hlp)
def serialize_message(self, message: CreatedMessage, pretty=False,
request_manipulator=None, validate=True) -> bytes:
@@ -60,14 +60,14 @@ def serialize_message(self, message: CreatedMessage, pretty=False,
p_msg = message.p_msg
nsh = self.ns_hlp
tmp = BytesIO()
- root = etree_.Element(nsh.S12.tag('Envelope'), nsmap=p_msg.nsmap)
+ root = etree.Element(nsh.S12.tag('Envelope'), nsmap=p_msg.nsmap)
- header_node = etree_.SubElement(root, nsh.S12.tag('Header'))
+ header_node = etree.SubElement(root, nsh.S12.tag('Header'))
if p_msg.header_info_block:
info_node = p_msg.header_info_block.as_etree_node('tmp', {})
header_node.extend(info_node[:])
header_node.extend(p_msg.header_nodes)
- body_node = etree_.SubElement(root, nsh.S12.tag('Body'), nsmap=p_msg.nsmap)
+ body_node = etree.SubElement(root, nsh.S12.tag('Body'), nsmap=p_msg.nsmap)
if validate:
self._validate_node(root)
if p_msg.payload_element is not None:
@@ -75,7 +75,7 @@ def serialize_message(self, message: CreatedMessage, pretty=False,
self._validate_node(p_msg.payload_element)
body_node.append(p_msg.payload_element)
- doc = etree_.ElementTree(element=root)
+ doc = etree.ElementTree(element=root)
if hasattr(request_manipulator, 'manipulate_domtree'):
_doc = request_manipulator.manipulate_domtree(doc)
if _doc:
diff --git a/src/sdc11073/pysoap/msgreader.py b/src/sdc11073/pysoap/msgreader.py
index 9ca62917..34540230 100644
--- a/src/sdc11073/pysoap/msgreader.py
+++ b/src/sdc11073/pysoap/msgreader.py
@@ -7,7 +7,7 @@
from io import BytesIO
from typing import TYPE_CHECKING
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.exceptions import ValidationError
from sdc11073.namespaces import QN_TYPE, default_ns_helper, text_to_qname
@@ -27,13 +27,13 @@
from sdc11073 import xml_utils
-def validate_node(node: xml_utils.LxmlElement, xml_schema: etree_.XMLSchema, logger: LoggerAdapter):
+def validate_node(node: xml_utils.LxmlElement, xml_schema: etree.XMLSchema, logger: LoggerAdapter):
"""Let xml_schema instance validate the node."""
try:
xml_schema.assertValid(node)
- except etree_.DocumentInvalid as ex:
+ except etree.DocumentInvalid as ex:
logger.warning(traceback.format_exc())
- logger.warning(etree_.tostring(node, pretty_print=True).decode('utf-8'))
+ logger.warning(etree.tostring(node, pretty_print=True).decode('utf-8'))
fault = Fault()
fault.Code.Value = faultcodeEnum.SENDER
fault.set_sub_code(default_ns_helper.WSE.tag('InvalidMessage'))
@@ -42,7 +42,7 @@ def validate_node(node: xml_utils.LxmlElement, xml_schema: etree_.XMLSchema, log
raise ValidationError(reason='document invalid', soap_fault=fault) from ex
-def _get_text(node: xml_utils.LxmlElement, q_name: etree_.QName) -> str | None:
+def _get_text(node: xml_utils.LxmlElement, q_name: etree.QName) -> str | None:
if node is None:
return None
tmp = node.find(q_name)
@@ -84,11 +84,10 @@ class ReceivedMessage:
msg_reader: MessageReader
p_msg: ReceivedSoapMessage
action: str | None
- q_name: etree_.QName
+ q_name: etree.QName
mdib_version_group: MdibVersionGroupReader
-
class MessageReader:
"""MessageReader does all the conversions from DOM trees (body of SOAP messages) to MDIB objects."""
@@ -103,7 +102,7 @@ def __init__(self, sdc_definitions: type[BaseDefinitions],
self._data_model = sdc_definitions.data_model
self.ns_hlp = sdc_definitions.data_model.ns_helper
self._validate = validate
- self._xml_schema: etree_.XMLSchema = mk_schema_validator(self.schema_specs, self.ns_hlp)
+ self._xml_schema: etree.XMLSchema = mk_schema_validator(self.schema_specs, self.ns_hlp)
@property
def msg_names(self) -> ModuleType:
@@ -125,20 +124,20 @@ def msg_types(self) -> ModuleType:
"""Return a module with message model types."""
return self._data_model.msg_types
- def get_descriptor_container_class(self, qname: etree_.QName) -> type[AbstractDescriptorProtocol]:
+ def get_descriptor_container_class(self, qname: etree.QName) -> type[AbstractDescriptorProtocol]:
"""Get the class that represents a BICEPS descriptor entity with given QName."""
return self._data_model.get_descriptor_container_class(qname)
- def get_state_container_class(self, qname: etree_.QName) -> type[AbstractStateProtocol]:
+ def get_state_container_class(self, qname: etree.QName) -> type[AbstractStateProtocol]:
"""Return the class that represents a BICEPS state entity with given QName."""
return self._data_model.get_state_container_class(qname)
def read_received_message(self, xml_text: bytes, validate: bool = True) -> ReceivedMessage:
"""Read complete message with addressing, message_id, payload,..."""
- parser = etree_.ETCompatXMLParser(resolve_entities=False)
+ parser = etree.ETCompatXMLParser(resolve_entities=False)
try:
- doc_root = etree_.fromstring(xml_text, parser=parser)
- except etree_.XMLSyntaxError as ex:
+ doc_root = etree.fromstring(xml_text, parser=parser)
+ except etree.XMLSyntaxError as ex:
self._logger.warning('Error reading response ex=%r xml=%s', ex, xml_text.decode('utf-8'))
raise
if validate:
@@ -159,13 +158,13 @@ def read_received_message(self, xml_text: bytes, validate: bool = True) -> Recei
message.msg_name, mdib_version_group)
def read_get_mdib_response(self, received_message_data: ReceivedMessage) -> tuple[
- list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]:
+ list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]:
"""Return list of all descriptors and states in mdib of received message."""
mdib_node = received_message_data.p_msg.msg_node[0]
return self.read_get_mdib_payload(mdib_node)
def read_get_mdib_payload(self, mdib_node: xml_utils.LxmlElement) -> tuple[
- list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]:
+ list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]:
"""Return list of all descriptors and states in mdib."""
descriptors = []
states = []
@@ -180,16 +179,16 @@ def read_get_mdib_payload(self, mdib_node: xml_utils.LxmlElement) -> tuple[
def read_mdib_xml(self, xml_text: bytes) -> tuple[list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]:
"""Return list of all descriptors and states in mdib."""
payload = self.read_xml_text(xml_text)
- q_name = etree_.QName(payload.tag)
+ q_name = etree.QName(payload.tag)
if q_name == self.msg_names.GetMdibResponse:
return self.read_get_mdib_payload(payload[0])
return self.read_get_mdib_payload(payload)
def read_xml_text(self, xml_text: bytes) -> xml_utils.LxmlElement:
"""Parse imput, return a node."""
- parser = etree_.ETCompatXMLParser(resolve_entities=False)
+ parser = etree.ETCompatXMLParser(resolve_entities=False)
try:
- node = etree_.fromstring(xml_text, parser=parser)
+ node = etree.fromstring(xml_text, parser=parser)
except Exception as ex:
print(f'load error "{ex}" in "{xml_text}"')
raise
@@ -226,12 +225,12 @@ def _read_md_state_node(self, md_state_node: xml_utils.LxmlElement) -> list[Abst
def _mk_descriptor_container_from_node(self, node: xml_utils.LxmlElement,
parent_handle: str | None) -> AbstractDescriptorProtocol:
node_type = node.get(QN_TYPE)
- node_type = text_to_qname(node_type, node.nsmap) if node_type is not None else etree_.QName(node.tag)
+ node_type = text_to_qname(node_type, node.nsmap) if node_type is not None else etree.QName(node.tag)
descr_cls = self.get_descriptor_container_class(node_type)
return descr_cls.from_node(node, parent_handle)
def _mk_state_container_from_node(self, node: xml_utils.LxmlElement,
- forced_type: etree_.QName | None = None) -> AbstractStateProtocol:
+ forced_type: etree.QName | None = None) -> AbstractStateProtocol:
"""Create a state container from a node.
:param node: an etree node
@@ -262,6 +261,6 @@ def _validate_node(self, node: xml_utils.LxmlElement):
validate_node(node, self._xml_schema, self._logger)
@staticmethod
- def read_wsdl(wsdl_text: bytes) -> etree_.ElementTree:
+ def read_wsdl(wsdl_text: bytes) -> etree.ElementTree:
"""Make am ElementTree instance."""
- return etree_.parse(BytesIO(wsdl_text), parser=etree_.ETCompatXMLParser(resolve_entities=False))
+ return etree.parse(BytesIO(wsdl_text), parser=etree.ETCompatXMLParser(resolve_entities=False))
diff --git a/src/sdc11073/pysoap/soapclient.py b/src/sdc11073/pysoap/soapclient.py
index 5678e29e..49aa5db4 100644
--- a/src/sdc11073/pysoap/soapclient.py
+++ b/src/sdc11073/pysoap/soapclient.py
@@ -15,7 +15,7 @@
from threading import Lock
from typing import TYPE_CHECKING, Protocol
-from lxml.etree import XMLSyntaxError
+from lxml import etree
from sdc11073 import commlog, observableproperties
from sdc11073.httpserver.compression import CompressionHandler
@@ -346,7 +346,7 @@ def _send_soap_request(self, path: str, xml: bytes, log_msg: str) -> tuple[HTTPR
self._netloc, path, response.status, content.decode('utf-8'))
try:
tmp = self._msg_reader.read_received_message(content)
- except XMLSyntaxError as ex:
+ except etree.XMLSyntaxError as ex:
raise HTTPReturnCodeError(response.status, response.reason, None) from ex
else:
soap_fault = Fault.from_node(tmp.p_msg.msg_node)
diff --git a/src/sdc11073/pysoap/soapenvelope.py b/src/sdc11073/pysoap/soapenvelope.py
index 2ae0ba7c..53a74093 100644
--- a/src/sdc11073/pysoap/soapenvelope.py
+++ b/src/sdc11073/pysoap/soapenvelope.py
@@ -3,7 +3,7 @@
from enum import Enum
from typing import TYPE_CHECKING
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.exceptions import ApiUsageError
from sdc11073.namespaces import default_ns_helper as ns_hlp
@@ -25,7 +25,7 @@ def __init__(self, response_envelope: ReceivedSoapMessage):
self.response_envelope = response_envelope
-class ExtendedDocumentInvalid(etree_.DocumentInvalid):
+class ExtendedDocumentInvalid(etree.DocumentInvalid):
"""Exception for invalid document."""
@@ -88,7 +88,7 @@ def __init__(self, xml_text: bytes, doc_root: xml_utils.LxmlElement):
self.header_info_block: HeaderInformationBlock | None = None
try:
self.msg_node = self.body_node[0]
- self.msg_name = etree_.QName(self.msg_node.tag)
+ self.msg_name = etree.QName(self.msg_node.tag)
except IndexError: # body has no content, this can happen
self.msg_node = None
self.msg_name = None
@@ -122,7 +122,7 @@ class faultreason(XMLTypeBase): # noqa: N801
class subcode(XMLTypeBase): # noqa: N801
"""Value is a QName."""
- Value: etree_.QName = struct.NodeTextQNameProperty(ns_hlp.S12.tag('Value'))
+ Value: etree.QName = struct.NodeTextQNameProperty(ns_hlp.S12.tag('Value'))
# optional Subcode Element intentionally omitted, it is of type subcode => recursion, bad idea!
_props = ('Value',)
@@ -157,7 +157,7 @@ def add_reason_text(self, text: str, lang: str = 'en-US'):
txt.text = text
self.Reason.Text.append(txt)
- def set_sub_code(self, sub_code: etree_.QName):
+ def set_sub_code(self, sub_code: etree.QName):
"""Set sub code."""
self.Code.Subcode = subcode()
self.Code.Subcode.Value = sub_code
diff --git a/src/sdc11073/roles/contextprovider.py b/src/sdc11073/roles/contextprovider.py
index 3135283f..ea351989 100644
--- a/src/sdc11073/roles/contextprovider.py
+++ b/src/sdc11073/roles/contextprovider.py
@@ -9,7 +9,7 @@
from . import providerbase
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.mdib.descriptorcontainers import AbstractOperationDescriptorProtocol
from sdc11073.mdib.providermdib import ProviderMdib
@@ -22,7 +22,7 @@ class GenericContextProvider(providerbase.ProviderRole):
"""Handles SetContextState operations."""
def __init__(self, mdib: ProviderMdib,
- op_target_descr_types: list[QName] | None = None,
+ op_target_descr_types: list[etree.QName] | None = None,
log_prefix: str | None = None):
super().__init__(mdib, log_prefix)
self._op_target_descr_types = op_target_descr_types
diff --git a/src/sdc11073/roles/providerbase.py b/src/sdc11073/roles/providerbase.py
index 71c2eb8e..c1ce896c 100644
--- a/src/sdc11073/roles/providerbase.py
+++ b/src/sdc11073/roles/providerbase.py
@@ -2,7 +2,7 @@
from typing import TYPE_CHECKING, Callable
-from lxml.etree import QName
+from lxml import etree
from sdc11073 import loghelper
from sdc11073.provider.operations import OperationDefinitionBase
@@ -15,7 +15,7 @@
from sdc11073.provider.sco import AbstractScoOperationsRegistry
from sdc11073.xml_types.pm_types import CodedValue, SafetyClassification
-OperationClassGetter = Callable[[QName], type[OperationDefinitionBase]]
+OperationClassGetter = Callable[[etree.QName], type[OperationDefinitionBase]]
class ProviderRole:
diff --git a/src/sdc11073/schema_resolver.py b/src/sdc11073/schema_resolver.py
index aacda487..b976d0bd 100644
--- a/src/sdc11073/schema_resolver.py
+++ b/src/sdc11073/schema_resolver.py
@@ -6,7 +6,7 @@
from io import StringIO
from typing import TYPE_CHECKING, Any
-from lxml import etree as etree_
+from lxml import etree
from urllib import parse
from . import loghelper
@@ -15,10 +15,10 @@
from .namespaces import NamespaceHelper, PrefixNamespace
-def mk_schema_validator(namespaces: list[PrefixNamespace], ns_helper: NamespaceHelper) -> etree_.XMLSchema:
+def mk_schema_validator(namespaces: list[PrefixNamespace], ns_helper: NamespaceHelper) -> etree.XMLSchema:
"""Create a schema validator."""
schema_resolver = SchemaResolver(namespaces)
- parser = etree_.XMLParser(resolve_entities=True)
+ parser = etree.XMLParser(resolve_entities=True)
parser.resolvers.add(schema_resolver)
prefix_enum = ns_helper.prefix_enum
not_needed = [prefix_enum.XSD]
@@ -32,20 +32,20 @@ def mk_schema_validator(namespaces: list[PrefixNamespace], ns_helper: NamespaceH
tmp.write('')
all_included = tmp.getvalue().encode('utf-8')
- elem_tree = etree_.fromstring(all_included, parser=parser, base_url='C://')
+ elem_tree = etree.fromstring(all_included, parser=parser, base_url='C://')
# for unknown reason creating the schema fails sometimes. repeat up to 3 times.
try:
- return etree_.XMLSchema(etree=elem_tree)
- except etree_.XMLSchemaParseError:
+ return etree.XMLSchema(etree=elem_tree)
+ except etree.XMLSchemaParseError:
time.sleep(0.1)
try:
- return etree_.XMLSchema(etree=elem_tree)
- except etree_.XMLSchemaParseError:
+ return etree.XMLSchema(etree=elem_tree)
+ except etree.XMLSchemaParseError:
time.sleep(0.5)
- return etree_.XMLSchema(etree=elem_tree)
+ return etree.XMLSchema(etree=elem_tree)
-class SchemaResolver(etree_.Resolver):
+class SchemaResolver(etree.Resolver):
"""A Resolver that uses a list of PrefixNamespace for resolving."""
def __init__(self, namespaces: list[PrefixNamespace], log_prefix: str | None = None):
@@ -53,7 +53,7 @@ def __init__(self, namespaces: list[PrefixNamespace], log_prefix: str | None = N
self.namespaces = namespaces
self._logger = loghelper.get_logger_adapter('sdc.schema_resolver', log_prefix)
- def resolve(self, system_url: str, _: Any, context: Any) -> Any: # return whatever type resolve_string returns
+ def resolve(self, system_url: str, _: Any, context: Any) -> Any: # return whatever type resolve_string returns
"""Look for xml file location in self.namespaces."""
self._logger.debug('try to resolve %s', system_url)
try:
diff --git a/src/sdc11073/wsdiscovery/networkingthread.py b/src/sdc11073/wsdiscovery/networkingthread.py
index 67f21b83..06f34cd0 100644
--- a/src/sdc11073/wsdiscovery/networkingthread.py
+++ b/src/sdc11073/wsdiscovery/networkingthread.py
@@ -16,7 +16,7 @@
import traceback
from typing import TYPE_CHECKING
-from lxml.etree import XMLSyntaxError
+from lxml import etree
from sdc11073 import commlog
from sdc11073.exceptions import ValidationError
@@ -199,7 +199,7 @@ def _run_q_read(self):
try:
try:
received_message = message_reader.read_received_message(data, validate=True)
- except (XMLSyntaxError, ValidationError) as ex:
+ except (etree.XMLSyntaxError, ValidationError) as ex:
self._logger.info('_run_q_read: received invalid message from %r, ignoring it (error=%s)', addr,
ex)
else:
diff --git a/src/sdc11073/wsdiscovery/service.py b/src/sdc11073/wsdiscovery/service.py
index 97811046..6446931d 100644
--- a/src/sdc11073/wsdiscovery/service.py
+++ b/src/sdc11073/wsdiscovery/service.py
@@ -3,7 +3,7 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.xml_types import wsd_types
@@ -15,7 +15,7 @@ class Service:
"""
def __init__(self,
- types: list[QName] | None,
+ types: list[etree.QName] | None,
scopes: wsd_types.ScopesType | None,
x_addrs: list[str] | None,
epr: str,
diff --git a/src/sdc11073/wsdiscovery/wsdimpl.py b/src/sdc11073/wsdiscovery/wsdimpl.py
index 9af60d25..1956b916 100644
--- a/src/sdc11073/wsdiscovery/wsdimpl.py
+++ b/src/sdc11073/wsdiscovery/wsdimpl.py
@@ -25,7 +25,7 @@
from collections.abc import Iterable
from logging import Logger
- from lxml.etree import QName
+ from lxml import etree
from sdc11073.location import SdcLocation
from sdc11073.pysoap.msgfactory import CreatedMessage
@@ -53,7 +53,7 @@ class MatchBy(str, Enum):
strcmp = NS_D + '/strcmp0' # "http://docs.oasis-open.org/ws-dd/ns/discovery/2009/01/strcmp0"
-def types_info(types: Iterable[QName] | None) -> list[str] | None:
+def types_info(types: Iterable[etree.QName] | None) -> list[str] | None:
"""Make printable strings from list of QNames (helper method for logging)."""
return [str(t) for t in types] if types else types
@@ -83,12 +83,12 @@ def match_scope(my_scope: str, other_scope: str, match_by: MatchBy | str | None)
return False
-def match_type(type1: QName, type2: QName) -> bool:
+def match_type(type1: etree.QName, type2: etree.QName) -> bool:
"""Check if namespace and localname are identical."""
return type1.namespace == type2.namespace and type1.localname == type2.localname
-def _is_type_in_list(ttype: QName, types: list[QName]) -> bool:
+def _is_type_in_list(ttype: etree.QName, types: list[etree.QName]) -> bool:
return any(match_type(ttype, entry) for entry in types)
@@ -101,7 +101,7 @@ def _is_scope_in_list(uri: str, match_by: str, srv_sc: wsd_types.ScopesType) ->
def matches_filter(service: Service,
- types: Iterable[QName] | None,
+ types: Iterable[etree.QName] | None,
scopes: wsd_types.ScopesType | None) -> bool:
"""Check if service matches the types and scopes."""
if types is not None:
@@ -116,7 +116,7 @@ def matches_filter(service: Service,
def filter_services(services: Iterable[Service],
- types: Iterable[QName] | None,
+ types: Iterable[etree.QName] | None,
scopes: wsd_types.ScopesType | None) -> list[Service]:
"""Filter services that match types and scopes."""
return [service for service in services if matches_filter(service, types, scopes)]
@@ -182,7 +182,7 @@ def stop(self):
self._server_started = False
def search_services(self,
- types: Iterable[QName] | None = None,
+ types: Iterable[etree.QName] | None = None,
scopes: wsd_types.ScopesType | None = None,
timeout: int | float | None = 5,
repeat_probe_interval: int | None = 3) -> list[Service]:
@@ -224,7 +224,7 @@ def search_sdc_services(self,
return self.search_services(SdcV1Definitions.MedicalDeviceTypesFilter, scopes, timeout, repeat_probe_interval)
def search_multiple_types(self,
- types_list: list[list[QName]],
+ types_list: list[list[etree.QName]],
scopes: wsd_types.ScopesType | None = None,
timeout: int | float | None = 10,
repeat_probe_interval: int | None = 3) -> list[Service]:
@@ -265,7 +265,7 @@ def search_sdc_device_services_in_location(self, sdc_location: SdcLocation, time
return sdc_location.filter_services_inside(services)
def publish_service(self, epr: str,
- types: list[QName],
+ types: list[etree.QName],
scopes: wsd_types.ScopesType,
x_addrs: list[str]):
"""Publish a service with the given TYPES, SCOPES and XAddrs (service addresses).
@@ -305,7 +305,7 @@ def get_active_addresses(self) -> list[str]:
def set_remote_service_hello_callback(self,
callback: Callable,
- types: list[QName] | None = None,
+ types: list[etree.QName] | None = None,
scopes: wsd_types.ScopesType | None = None):
"""Set callback, which will be called when new service appeared online and sent Hello message.
@@ -547,7 +547,7 @@ def _send_probe_match(self, services: list[Service], relates_to: str, addr: str)
self._networking_thread.add_outbound_message(created_message, addr[0], addr[1],
networkingthread.UNICAST_REPEAT_PARAMS)
- def _send_probe(self, types: Iterable[QName] | None = None, scopes: wsd_types.ScopesType | None = None):
+ def _send_probe(self, types: Iterable[etree.QName] | None = None, scopes: wsd_types.ScopesType | None = None):
types = list(types) if types is not None else None # enforce iteration
self._logger.debug('sending probe types=%r scopes=%r', types_info(types), scopes)
payload = wsd_types.ProbeType()
diff --git a/src/sdc11073/xml_types/addressing_types.py b/src/sdc11073/xml_types/addressing_types.py
index 39265811..ee1b280f 100644
--- a/src/sdc11073/xml_types/addressing_types.py
+++ b/src/sdc11073/xml_types/addressing_types.py
@@ -16,7 +16,7 @@
from sdc11073.xml_types.basetypes import ElementWithText, XMLTypeBase
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073 import xml_utils
@@ -122,7 +122,7 @@ def mk_reply_header_block(self,
return reply_address
def as_etree_node(self,
- q_name: QName, ns_map: dict[str, str],
+ q_name: etree.QName, ns_map: dict[str, str],
parent_node: xml_utils.LxmlElement | None = None) -> xml_utils.LxmlElement:
"""Create etree Element form instance data."""
node = super().as_etree_node(q_name, ns_map, parent_node)
diff --git a/src/sdc11073/xml_types/basetypes.py b/src/sdc11073/xml_types/basetypes.py
index 5c96b617..d8a7e17c 100644
--- a/src/sdc11073/xml_types/basetypes.py
+++ b/src/sdc11073/xml_types/basetypes.py
@@ -6,7 +6,7 @@
from math import isclose
from typing import TYPE_CHECKING
-from lxml import etree as etree_
+from lxml import etree
from .xml_structure import NodeStringProperty, NodeTextListProperty
@@ -35,11 +35,11 @@ def __init__(self):
for _, prop in self.sorted_container_properties():
prop.init_instance_data(self)
- def as_etree_node(self, q_name: etree_.QName, ns_map: dict, parent_node: etree_.Element | None = None):
+ def as_etree_node(self, q_name: etree.QName, ns_map: dict, parent_node: etree.Element | None = None):
if parent_node is not None:
- node = etree_.SubElement(parent_node, q_name, nsmap=ns_map)
+ node = etree.SubElement(parent_node, q_name, nsmap=ns_map)
else:
- node = etree_.Element(q_name, nsmap=ns_map)
+ node = etree.Element(q_name, nsmap=ns_map)
self.update_node(node)
return node
diff --git a/src/sdc11073/xml_types/pm_types.py b/src/sdc11073/xml_types/pm_types.py
index 2332dd69..800edeb7 100644
--- a/src/sdc11073/xml_types/pm_types.py
+++ b/src/sdc11073/xml_types/pm_types.py
@@ -16,7 +16,7 @@
from .basetypes import StringEnum, XMLTypeBase
if TYPE_CHECKING:
- from lxml.etree import QName
+ from lxml import etree
from sdc11073 import xml_utils
from sdc11073.xml_types.isoduration import DateTypeUnion, DurationType
@@ -790,10 +790,10 @@ class ActivateOperationDescriptorArgument(PropertyBasedPMType):
"""Represents BICEPS AbstractSetStateOperationDescriptor/Argument."""
ArgName: CodedValue = cp.SubElementProperty(pm.ArgName, value_class=CodedValue, is_optional=False)
- Arg: QName | None = cp.NodeTextQNameProperty(pm.Arg, is_optional=False)
+ Arg: etree.QName | None = cp.NodeTextQNameProperty(pm.Arg, is_optional=False)
_props = ('ArgName', 'Arg')
- def __init__(self, arg_name: CodedValue | None = None, arg: QName | None = None):
+ def __init__(self, arg_name: CodedValue | None = None, arg: etree.QName | None = None):
super().__init__()
self.ArgName = arg_name
self.Arg = arg
@@ -1437,7 +1437,7 @@ def __init__(self, retrievability_info_list: list[RetrievabilityInfo] | None = N
_name_class_lookup = {c.NODETYPE: c for c in _classes}
-def _get_pmtypes_class(qname: QName) -> type[PropertyBasedPMType]:
+def _get_pmtypes_class(qname: etree.QName) -> type[PropertyBasedPMType]:
"""Find class in _name_class_lookup."""
try:
return _name_class_lookup[qname]
diff --git a/src/sdc11073/xml_types/xml_structure.py b/src/sdc11073/xml_types/xml_structure.py
index 05109396..fef292a7 100644
--- a/src/sdc11073/xml_types/xml_structure.py
+++ b/src/sdc11073/xml_types/xml_structure.py
@@ -14,7 +14,7 @@
from datetime import date, datetime
from typing import TYPE_CHECKING, Any, Callable
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.exceptions import ApiUsageError
from sdc11073.namespaces import QN_TYPE, docname_from_qname, text_to_qname
@@ -210,7 +210,7 @@ def __init__(self, attribute_name: str, # noqa: PLR0913
:param implied_py_value: see base class doc.
:param is_optional: see base class doc.
"""
- if isinstance(attribute_name, etree_.QName):
+ if isinstance(attribute_name, etree.QName):
local_var_name = f'_a_{attribute_name.localname}_{_NumberStack.unique_number()}'
else:
local_var_name = f'_a_{attribute_name.lower()}_{_NumberStack.unique_number()}'
@@ -248,7 +248,7 @@ def __str__(self) -> str:
class _ElementBase(_XmlStructureBaseProperty, ABC):
"""_ElementBase represents an XML Element."""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
value_converter: DataConverterProtocol,
default_py_value: Any = None,
implied_py_value: Any = None,
@@ -271,7 +271,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
@staticmethod
def _get_element_by_child_name(node: xml_utils.LxmlElement,
- sub_element_name: etree_.QName | None,
+ sub_element_name: etree.QName | None,
create_missing_nodes: bool) -> xml_utils.LxmlElement:
if sub_element_name is None:
return node
@@ -279,7 +279,7 @@ def _get_element_by_child_name(node: xml_utils.LxmlElement,
if sub_node is None:
if not create_missing_nodes:
raise ElementNotFoundError(f'Element {sub_element_name} not found in {node.tag}')
- sub_node = etree_.SubElement(node, sub_element_name) # create this node
+ sub_node = etree.SubElement(node, sub_element_name) # create this node
return sub_node
def remove_sub_element(self, node: xml_utils.LxmlElement):
@@ -462,10 +462,10 @@ class QNameAttributeProperty(_AttributeBase):
"""
def __init__(self, attribute_name: str,
- default_py_value: etree_.QName | None = None,
- implied_py_value: etree_.QName | None = None,
+ default_py_value: etree.QName | None = None,
+ implied_py_value: etree.QName | None = None,
is_optional: bool = True):
- super().__init__(attribute_name, value_converter=ClassCheckConverter(etree_.QName),
+ super().__init__(attribute_name, value_converter=ClassCheckConverter(etree.QName),
default_py_value=default_py_value, implied_py_value=implied_py_value, is_optional=is_optional)
def get_py_value_from_node(self, instance: Any, # noqa: ARG002
@@ -597,7 +597,7 @@ class NodeTextProperty(_ElementBase):
Python representation depends on value converter.
"""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
value_converter: DataConverterProtocol,
default_py_value: Any | None = None,
implied_py_value: Any | None = None,
@@ -656,7 +656,7 @@ class NodeStringProperty(NodeTextProperty):
if the xml element that should contain the text does not exist, the python value is None.
"""
- def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None = None, # noqa: PLR0913
default_py_value: str | None = None,
implied_py_value: str | None = None,
is_optional: bool = False,
@@ -679,7 +679,7 @@ class NodeEnumTextProperty(NodeTextProperty):
Python representation is an enum.
"""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
enum_cls: Any,
default_py_value: Any | None = None,
implied_py_value: Any | None = None,
@@ -695,7 +695,7 @@ class NodeEnumQNameProperty(NodeTextProperty):
Python representation is an Enum of QName, XML is prefix:localname.
"""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
enum_cls: Any,
default_py_value: Any | None = None,
implied_py_value: Any | None = None,
@@ -710,7 +710,7 @@ def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) ->
sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False)
prefix, localname = sub_node.text.split(':')
namespace = node.nsmap[prefix]
- q_name = etree_.QName(namespace, localname)
+ q_name = etree.QName(namespace, localname)
return self._converter.to_py(q_name)
except ElementNotFoundError:
return self._default_py_value
@@ -748,7 +748,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
class NodeIntProperty(NodeTextProperty):
"""Python representation is an int."""
- def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None = None, # noqa: PLR0913
default_py_value: int | None = None,
implied_py_value: int | None = None,
is_optional: bool = False,
@@ -756,10 +756,11 @@ def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR091
super().__init__(sub_element_name, IntegerConverter, default_py_value, implied_py_value,
is_optional, min_length)
+
class NodeDecimalProperty(NodeTextProperty):
"""Python representation is an int."""
- def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None = None, # noqa: PLR0913
default_py_value: Decimal | None = None,
implied_py_value: Decimal | None = None,
is_optional: bool = False,
@@ -767,10 +768,11 @@ def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR091
super().__init__(sub_element_name, DecimalConverter, default_py_value, implied_py_value,
is_optional, min_length)
+
class NodeDurationProperty(NodeTextProperty):
"""Python representation is an int."""
- def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None = None, # noqa: PLR0913
default_py_value: isoduration.DurationType | None = None,
implied_py_value: isoduration.DurationType | None = None,
is_optional: bool = False,
@@ -782,10 +784,10 @@ def __init__(self, sub_element_name: etree_.QName | None = None, # noqa: PLR091
class NodeTextQNameProperty(_ElementBase):
"""The handled data is a single qualified name in the text of an element in the form prefix:localname."""
- def __init__(self, sub_element_name: etree_.QName | None,
- default_py_value: etree_.QName | None = None,
+ def __init__(self, sub_element_name: etree.QName | None,
+ default_py_value: etree.QName | None = None,
is_optional: bool = False):
- super().__init__(sub_element_name, ClassCheckConverter(etree_.QName), default_py_value,
+ super().__init__(sub_element_name, ClassCheckConverter(etree.QName), default_py_value,
is_optional=is_optional)
def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002
@@ -836,8 +838,8 @@ def _compare_extension(left: xml_utils.LxmlElement, right: xml_utils.LxmlElement
return False
# ignore comments
- left_children = [child for child in left if not isinstance(child, etree_._Comment)]
- right_children = [child for child in right if not isinstance(child, etree_._Comment)]
+ left_children = [child for child in left if not isinstance(child, etree._Comment)]
+ right_children = [child for child in right if not isinstance(child, etree._Comment)]
if len(left_children) != len(right_children): # compare children count
return False
@@ -848,7 +850,6 @@ def _compare_extension(left: xml_utils.LxmlElement, right: xml_utils.LxmlElement
class ExtensionLocalValue(list[xml_utils.LxmlElement]):
-
compare_method: Callable[[xml_utils.LxmlElement, xml_utils.LxmlElement], bool] = _compare_extension
"""may be overwritten by user if a custom comparison behaviour is required"""
@@ -856,7 +857,7 @@ def __eq__(self, other: Sequence) -> bool:
try:
if len(self) != len(other):
return False
- except TypeError: # len of other cannot be determined
+ except TypeError: # len of other cannot be determined
return False
return all(self.__class__.compare_method(left, right) for left, right in zip(self, other))
@@ -870,7 +871,7 @@ class ExtensionNodeProperty(_ElementBase):
The python representation is an ExtensionLocalValue with list of elements.
"""
- def __init__(self, sub_element_name: etree_.QName | None, default_py_value: Any | None = None):
+ def __init__(self, sub_element_name: etree.QName | None, default_py_value: Any | None = None):
super().__init__(sub_element_name, ClassCheckConverter(ExtensionLocalValue), default_py_value,
is_optional=True)
@@ -918,7 +919,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
class AnyEtreeNodeProperty(_ElementBase):
"""Represents an Element that contains xml tree of any kind."""
- def __init__(self, sub_element_name: etree_.QName | None, is_optional: bool = False):
+ def __init__(self, sub_element_name: etree.QName | None, is_optional: bool = False):
super().__init__(sub_element_name, NullConverter, default_py_value=None,
is_optional=is_optional)
@@ -946,7 +947,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
raise ValueError(f'mandatory value {self._sub_element_name} missing')
else:
sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=True)
- if isinstance(py_value, etree_._Element): # noqa: SLF001
+ if isinstance(py_value, etree._Element): # noqa: SLF001
sub_node.append(py_value)
else:
sub_node.extend(py_value)
@@ -955,7 +956,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
class SubElementProperty(_ElementBase):
"""Uses a value that has an "as_etree_node" method."""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
value_class: type[XMLTypeBase],
default_py_value: Any | None = None,
implied_py_value: Any | None = None,
@@ -986,7 +987,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
if not self.is_optional:
if MANDATORY_VALUE_CHECKING and not self.is_optional:
raise ValueError(f'mandatory value {self._sub_element_name} missing')
- etree_.SubElement(node, self._sub_element_name, nsmap=node.nsmap)
+ etree.SubElement(node, self._sub_element_name, nsmap=node.nsmap)
else:
sub_node = py_value.as_etree_node(self._sub_element_name, node.nsmap, node)
if hasattr(py_value, 'NODETYPE') and hasattr(self.value_class, 'NODETYPE') \
@@ -998,9 +999,9 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
class ContainerProperty(_ElementBase):
"""ContainerProperty supports xsi:type information from xml and instantiates value accordingly."""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
value_class: type[ContainerBase],
- cls_getter: Callable[[etree_.QName], type],
+ cls_getter: Callable[[etree.QName], type],
ns_helper: NamespaceHelper,
is_optional: bool = False):
"""Construct a ContainerProperty.
@@ -1043,7 +1044,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
if not self.is_optional:
if MANDATORY_VALUE_CHECKING and not self.is_optional:
raise ValueError(f'mandatory value {self._sub_element_name} missing')
- etree_.SubElement(node, self._sub_element_name, nsmap=node.nsmap)
+ etree.SubElement(node, self._sub_element_name, nsmap=node.nsmap)
else:
self.remove_sub_element(node)
sub_node = py_value.mk_node(self._sub_element_name, self._ns_helper, node)
@@ -1090,7 +1091,7 @@ class SubElementListProperty(_ElementListProperty):
Used if maxOccurs="Unbounded" in BICEPS_ParticipantModel.
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
value_class: type[XMLTypeBase],
is_optional: bool = True):
super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional)
@@ -1134,9 +1135,9 @@ class ContainerListProperty(_ElementListProperty):
Used if maxOccurs="Unbounded" in BICEPS_ParticipantModel.
"""
- def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913
+ def __init__(self, sub_element_name: etree.QName | None, # noqa: PLR0913
value_class: type[ContainerBase],
- cls_getter: Callable[[etree_.QName], type],
+ cls_getter: Callable[[etree.QName], type],
ns_helper: NamespaceHelper,
is_optional: bool = True):
"""Construct a list of Containers.
@@ -1198,7 +1199,7 @@ class SubElementTextListProperty(_ElementListProperty):
On xml side every string is a text of a sub element.
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
value_class: Any,
is_optional: bool = True):
super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional)
@@ -1229,7 +1230,7 @@ def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement):
node.remove(_node)
# ... and create new ones
for val in py_value:
- child = etree_.SubElement(node, self._sub_element_name)
+ child = etree.SubElement(node, self._sub_element_name)
try:
child.text = val
except TypeError as ex:
@@ -1246,7 +1247,7 @@ class SubElementStringListProperty(SubElementTextListProperty):
On xml side every string is a text of a sub element.
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
is_optional: bool = True):
super().__init__(sub_element_name, str, is_optional=is_optional)
@@ -1261,7 +1262,7 @@ class SubElementWithSubElementListProperty(SubElementProperty):
value_class must have an is_empty method.
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
default_py_value: Any,
value_class: type[XMLTypeBase]):
assert hasattr(value_class, 'is_empty')
@@ -1291,7 +1292,7 @@ def __set__(self, instance: Any, py_value: Any):
class AnyEtreeNodeListProperty(_ElementListProperty):
"""class represents a list of lxml elements."""
- def __init__(self, sub_element_name: etree_.QName | None, is_optional: bool = True):
+ def __init__(self, sub_element_name: etree.QName | None, is_optional: bool = True):
super().__init__(sub_element_name,
ListConverter(ClassCheckConverter(xml_utils.LxmlElement)),
is_optional=is_optional)
@@ -1329,7 +1330,7 @@ def __str__(self) -> str:
class NodeTextListProperty(_ElementListProperty):
"""The handled data is a list of words (string without whitespace). The xml text is the joined list of words."""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
value_class: Any,
is_optional: bool = False):
super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)),
@@ -1380,9 +1381,9 @@ class NodeTextQNameListProperty(_ElementListProperty):
The xml text is the joined list of qnames in the form prefix:localname.
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
is_optional: bool = False):
- super().__init__(sub_element_name, ListConverter(ClassCheckConverter(etree_.QName)),
+ super().__init__(sub_element_name, ListConverter(ClassCheckConverter(etree.QName)),
is_optional=is_optional)
def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002
@@ -1452,7 +1453,7 @@ class DateOfBirthProperty(_ElementBase):
or datetime.Datetime (with time point attribute).
"""
- def __init__(self, sub_element_name: etree_.QName | None,
+ def __init__(self, sub_element_name: etree.QName | None,
default_py_value: Any = None,
implied_py_value: Any = None,
is_optional: bool = True):
diff --git a/src/sdc11073/xml_utils.py b/src/sdc11073/xml_utils.py
index f86a7043..ca43f2f1 100644
--- a/src/sdc11073/xml_utils.py
+++ b/src/sdc11073/xml_utils.py
@@ -4,16 +4,16 @@
import sys
from typing import Callable
-from lxml.etree import Element, _Element
+from lxml import etree
if sys.version_info >= (3, 10):
from typing import TypeAlias
- LxmlElement: TypeAlias = _Element
+ LxmlElement: TypeAlias = etree._Element
else:
from typing_extensions import TypeAlias
- LxmlElement: TypeAlias = _Element
+ LxmlElement: TypeAlias = etree._Element
def copy_element(node: LxmlElement, method: Callable[[LxmlElement], LxmlElement] = copy.deepcopy) -> LxmlElement:
@@ -52,7 +52,7 @@ def copy_node_wo_parent(node: LxmlElement, method: Callable[[LxmlElement], LxmlE
:param method: method that copies an etree element
:return: new node
"""
- new_node = Element(node.tag, attrib=node.attrib, nsmap=node.nsmap)
+ new_node = etree.Element(node.tag, attrib=node.attrib, nsmap=node.nsmap)
new_node.text = node.text
new_node.tail = node.tail
new_node.extend(method(child) for child in node)
diff --git a/tests/mockstuff.py b/tests/mockstuff.py
index a81b09a1..6d2cef7e 100644
--- a/tests/mockstuff.py
+++ b/tests/mockstuff.py
@@ -7,7 +7,7 @@
from decimal import Decimal
from typing import TYPE_CHECKING
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.mdib import ProviderMdib
from sdc11073.namespaces import default_ns_helper as ns_hlp
@@ -68,8 +68,8 @@ class TestDevSubscription(BicepsSubscription):
def __init__(self, filter_,
soap_client_pool: SoapClientPool,
msg_factory):
- notify_ref_node = etree_.Element(ns_hlp.WSE.tag('References'))
- identNode = etree_.SubElement(notify_ref_node, ns_hlp.WSE.tag('Identifier'))
+ notify_ref_node = etree.Element(ns_hlp.WSE.tag('References'))
+ identNode = etree.SubElement(notify_ref_node, ns_hlp.WSE.tag('Identifier'))
identNode.text = self.notify_ref
base_urls = [SplitResult('https', 'www.example.com:222', 'no_uuid', query=None, fragment=None)]
accepted_encodings = ['foo'] # not needed here
@@ -80,8 +80,8 @@ def __init__(self, filter_,
subscribe_request.Delivery.NotifyTo.Mode = self.mode
max_subscription_duration = 42
subscr_mgr = None
- super().__init__(subscr_mgr, subscribe_request, accepted_encodings, base_urls, max_subscription_duration, soap_client_pool,
- msg_factory=msg_factory, log_prefix='test')
+ super().__init__(subscr_mgr, subscribe_request, accepted_encodings, base_urls, max_subscription_duration,
+ soap_client_pool, msg_factory=msg_factory, log_prefix='test')
self.reports = []
def send_notification_report(self, body_node, action: str):
@@ -139,7 +139,7 @@ def __init__(self, wsdiscovery: WsDiscoveryProtocol,
mdsDescriptor.MetaData.ModelNumber = '0.99'
super().__init__(wsdiscovery, model, device, device_mdib_container, epr, validate,
ssl_context_container=ssl_context_container,
- max_subscription_duration = max_subscription_duration,
+ max_subscription_duration=max_subscription_duration,
log_prefix=log_prefix,
default_components=default_components,
specific_components=specific_components,
@@ -151,7 +151,7 @@ def from_mdib_file(cls,
wsdiscovery: WsDiscoveryProtocol,
epr: str | uuid.UUID | None,
mdib_xml_path: str | pathlib.Path,
- validate: bool =True,
+ validate: bool = True,
ssl_context_container: sdc11073.certloader.SSLContextContainer | None = None,
max_subscription_duration: int = 15,
log_prefix: str = '',
@@ -164,7 +164,7 @@ def from_mdib_file(cls,
if not mdib_xml_path.is_absolute():
mdib_xml_path = pathlib.Path(__file__).parent.joinpath(mdib_xml_path)
return cls(wsdiscovery, mdib_xml_path.read_bytes(), epr, validate, ssl_context_container,
- max_subscription_duration = max_subscription_duration,
+ max_subscription_duration=max_subscription_duration,
log_prefix=log_prefix,
default_components=default_components, specific_components=specific_components,
chunk_size=chunk_size,
diff --git a/tests/test_client_device.py b/tests/test_client_device.py
index d42a788d..d138648e 100644
--- a/tests/test_client_device.py
+++ b/tests/test_client_device.py
@@ -14,7 +14,7 @@
from itertools import product
from http.client import NotConnected
from threading import Event
-from lxml import etree as etree_
+from lxml import etree
import sdc11073.certloader
import sdc11073.definitions_sdc
@@ -466,6 +466,7 @@ def _read_text():
# this is a comment
secret_ciphers_string
ignored"""
+
read_text_mock.side_effect = _read_text
sdc11073.certloader.mk_ssl_contexts_from_folder(ca_folder=unittest.mock.MagicMock(), cyphers_file='lorem')
mk_ssl_contexts_mock.assert_called_once()
@@ -616,7 +617,6 @@ def test_client_stop(self):
for s in all_subscriptions:
self.assertIsNotNone(s.unsubscribed_at)
-
def test_device_stop(self):
"""Verify that sockets get closed."""
cl_mdib = ConsumerMdib(self.sdc_client)
@@ -725,7 +725,7 @@ def test_get_md_description_parameters(self):
cl_get_service = self.sdc_client.client('Get')
message_data = cl_get_service.get_md_description(['not_existing_handle'])
node = message_data.p_msg.msg_node
- print(etree_.tostring(node, pretty_print=True))
+ print(etree.tostring(node, pretty_print=True))
descriptors = list(node[0]) # that is /m:GetMdDescriptionResponse/m:MdDescription/*
self.assertEqual(len(descriptors), 0)
existing_handle = '0x34F05500'
@@ -1259,7 +1259,6 @@ def test_is_connected_friendly(self):
self.assertFalse(self.sdc_client.is_connected)
self.sdc_client.stop_all(unsubscribe=False) # without unsubscribe, is faster and would make no sense anyway
-
def test_invalid_request(self):
self.log_watcher.setPaused(True)
self.sdc_client.get_service_client._validate = False # want to send an invalid request
@@ -1279,8 +1278,8 @@ def test_invalid_request(self):
else:
self.fail('HTTPReturnCodeError not raised')
- def _mk_get_method_message(self, addr_to, action: str, method: etree_.QName, params=None) -> CreatedMessage:
- get_node = etree_.Element(method)
+ def _mk_get_method_message(self, addr_to, action: str, method: etree.QName, params=None) -> CreatedMessage:
+ get_node = etree.Element(method)
soap_envelope = Soap12Envelope(default_ns_helper.partial_map(default_ns_helper.MSG))
soap_envelope.set_header_info_block(HeaderInformationBlock(action=action, addr_to=addr_to))
if params:
@@ -1291,6 +1290,7 @@ def _mk_get_method_message(self, addr_to, action: str, method: etree_.QName, par
def test_extension(self):
"""Verify that all Extension Elements of descriptors are identical on provider and consumer."""
+
def are_equivalent(node1, node2):
if node1.tag != node2.tag or node1.attrib != node2.attrib or node1.text != node2.text:
return False
@@ -1328,6 +1328,7 @@ def test_sequence_id_change(self):
cl_mdib = ConsumerMdib(self.sdc_client)
cl_mdib.init_mdib()
ev = Event()
+
def on_mdib_id_change(flag: bool):
"""This is a typical handler for changed sequence id or instance id"""
self.logger.info('new sequence_id or instance id')
@@ -1685,10 +1686,11 @@ def setUp(self):
self.sdc_device = SomeDevice.from_mdib_file(self.wsd, None, mdib_70041,
default_components=default_sdc_provider_components_async,
max_subscription_duration=10) # shorter duration for faster tests
- self.sdc_device_ssl = SomeDevice.from_mdib_file(self.wsd, None, mdib_70041,
- default_components=default_sdc_provider_components_async,
- max_subscription_duration=10, # shorter duration for faster tests
- ssl_context_container=self.ssl_context_container)
+ self.sdc_device_ssl = SomeDevice.from_mdib_file(
+ self.wsd, None, mdib_70041,
+ default_components=default_sdc_provider_components_async,
+ max_subscription_duration=10, # shorter duration for faster tests
+ ssl_context_container=self.ssl_context_container)
self.sdc_device.start_all()
self._loc_validators = [pm_types.InstanceIdentifier('Validator', extension_string='System')]
@@ -1753,7 +1755,6 @@ def test_basic_connect(self):
force_ssl_connect=False)
self.assertRaises(NotConnected, consumer.start_all)
-
# verify that connect with certificates works
consumer = SdcConsumer(x_addr,
self.sdc_device.mdib.sdc_definitions,
@@ -1763,6 +1764,7 @@ def test_basic_connect(self):
self.assertTrue(consumer.is_connected)
self.assertTrue(consumer.is_ssl_connection)
+
class TestQualifiedName(unittest.TestCase):
"""Check usage of full qualified name in device and client."""
@@ -1773,10 +1775,11 @@ def setUp(self):
self.wsd = WSDiscovery('127.0.0.1')
self.wsd.start()
- self.sdc_device = SomeDevice.from_mdib_file(self.wsd, None, mdib_70041,
- default_components=default_sdc_provider_components_async,
- max_subscription_duration=10,
- alternative_hostname=socket.getfqdn()) # shorter duration for faster tests
+ self.sdc_device = SomeDevice.from_mdib_file(
+ self.wsd, None, mdib_70041,
+ default_components=default_sdc_provider_components_async,
+ max_subscription_duration=10,
+ alternative_hostname=socket.getfqdn()) # shorter duration for faster tests
self.sdc_device.start_all()
self._loc_validators = [pm_types.InstanceIdentifier('Validator', extension_string='System')]
self.sdc_device.set_location(utils.random_location(), self._loc_validators)
@@ -1808,7 +1811,8 @@ def test_basic_connect(self):
try:
consumer.start_all()
self.assertTrue(consumer.is_connected)
- for subscription in self.sdc_device._hosted_service_dispatcher._instances['StateEvent'].subscriptions_manager._subscriptions._objects:
+ for subscription in (self.sdc_device._hosted_service_dispatcher._instances['StateEvent']
+ .subscriptions_manager._subscriptions._objects):
# now make sure that the subscription also uses no IP
self.assertIn(socket.getfqdn(), subscription.notify_to_address)
if subscription.end_to_address:
@@ -1827,10 +1831,11 @@ def setUp(self):
self.wsd = WSDiscovery('127.0.0.1')
self.wsd.start()
- self.sdc_device = SomeDevice.from_mdib_file(self.wsd, None, mdib_70041,
- default_components=default_sdc_provider_components_async,
- max_subscription_duration=10,
- alternative_hostname="some_random_invalid_hostname") # shorter duration for faster tests
+ self.sdc_device = SomeDevice.from_mdib_file(
+ self.wsd, None, mdib_70041,
+ default_components=default_sdc_provider_components_async,
+ max_subscription_duration=10,
+ alternative_hostname="some_random_invalid_hostname") # shorter duration for faster tests
self.sdc_device.start_all()
self._loc_validators = [pm_types.InstanceIdentifier('Validator', extension_string='System')]
self.sdc_device.set_location(utils.random_location(), self._loc_validators)
diff --git a/tests/test_client_waveform.py b/tests/test_client_waveform.py
index b9321157..d2e6e269 100644
--- a/tests/test_client_waveform.py
+++ b/tests/test_client_waveform.py
@@ -2,7 +2,7 @@
import sys
import unittest
-from lxml import etree as etree_
+from lxml import etree
from sdc11073 import definitions_sdc, loghelper
from sdc11073.consumer import SdcConsumer
@@ -140,16 +140,15 @@ def test_stream_handling(self):
client_mdib = ConsumerMdib(cl)
client_mdib._xtra.bind_to_client_observables()
- client_mdib._state = ConsumerMdibState.initialized # fake it, because we do not call init_mdib()
+ client_mdib._state = ConsumerMdibState.initialized # fake it, because we do not call init_mdib()
client_mdib.MDIB_VERSION_CHECK_DISABLED = True # we have no mdib version incrementing in this test, therefore disable check
# create dummy descriptors
for handle in HANDLES:
attributes = {'SamplePeriod': 'P0Y0M0DT0H0M0.0157S', # use a unique sample period
- etree_.QName(ns_hlp.ns_map['xsi'],
- 'type'): 'dom:RealTimeSampleArrayMetricDescriptor',
+ etree.QName(ns_hlp.ns_map['xsi'], 'type'): 'dom:RealTimeSampleArrayMetricDescriptor',
'Handle': handle,
'DescriptorVersion': '2'}
- element = etree_.Element('Metric', attrib=attributes, nsmap=ns_hlp.ns_map)
+ element = etree.Element('Metric', attrib=attributes, nsmap=ns_hlp.ns_map)
descr = RealTimeSampleArrayMetricDescriptorContainer.from_node(element, None) # None = no parent handle
client_mdib.descriptions.add_object(descr)
state = RealTimeSampleArrayMetricStateContainer(descr)
diff --git a/tests/test_containerproperties.py b/tests/test_containerproperties.py
index 6180168d..069eb47d 100644
--- a/tests/test_containerproperties.py
+++ b/tests/test_containerproperties.py
@@ -3,7 +3,7 @@
from decimal import Decimal
from enum import Enum
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.xml_types.isoduration import UTC
from sdc11073.xml_types.xml_structure import DateOfBirthProperty as DoB
@@ -33,7 +33,7 @@ def props(self):
return []
def mk_node(self):
- node = etree_.Element('test')
+ node = etree.Element('test')
for prop in self.props():
prop.update_xml_value(self, node)
return node
@@ -59,8 +59,8 @@ def props(self):
class DummyNodeText(DummyBase):
- node_text_mand = NodeStringProperty(etree_.QName('pref', 'node_text_mand'), default_py_value='foo', min_length=1)
- node_text_opt = NodeStringProperty(etree_.QName('pref', 'node_text_opt'), implied_py_value='bar', is_optional=True)
+ node_text_mand = NodeStringProperty(etree.QName('pref', 'node_text_mand'), default_py_value='foo', min_length=1)
+ node_text_opt = NodeStringProperty(etree.QName('pref', 'node_text_opt'), implied_py_value='bar', is_optional=True)
def props(self):
yield self.__class__.node_text_mand
@@ -68,10 +68,10 @@ def props(self):
class DummyNodeEnumText(DummyBase):
- #node_text_mand = NodeEnumTextProperty(MyEnum, etree_.QName('pref', 'node_text_mand'), implied_py_value=MyEnum.a)
- node_text_mand = NodeEnumTextProperty(etree_.QName('pref', 'node_text_mand'), MyEnum, default_py_value=MyEnum.a)
- node_text_opt = NodeEnumTextProperty(etree_.QName('pref', 'node_text_opt'), MyEnum, is_optional=True)
- node_text_mand_no_default = NodeEnumTextProperty(etree_.QName('pref', 'node_text_mand_no_default'), MyEnum)
+ # node_text_mand = NodeEnumTextProperty(MyEnum, etree.QName('pref', 'node_text_mand'), implied_py_value=MyEnum.a)
+ node_text_mand = NodeEnumTextProperty(etree.QName('pref', 'node_text_mand'), MyEnum, default_py_value=MyEnum.a)
+ node_text_opt = NodeEnumTextProperty(etree.QName('pref', 'node_text_opt'), MyEnum, is_optional=True)
+ node_text_mand_no_default = NodeEnumTextProperty(etree.QName('pref', 'node_text_mand_no_default'), MyEnum)
def props(self):
yield self.__class__.node_text_mand
@@ -80,10 +80,10 @@ def props(self):
class DummySubElement(DummyBase):
- sub_elem_mand = SubElementProperty(etree_.QName('pref', 'sub_elem_mand'),
+ sub_elem_mand = SubElementProperty(etree.QName('pref', 'sub_elem_mand'),
value_class=CodedValue,
default_py_value=CodedValue('foo'))
- sub_elem_opt = SubElementProperty(etree_.QName('pref', 'sub_elem_opt'),
+ sub_elem_opt = SubElementProperty(etree.QName('pref', 'sub_elem_opt'),
value_class=CodedValue,
is_optional=True)
@@ -93,7 +93,7 @@ def props(self):
class DummySubElementList(DummyBase):
- sub_elem = SubElementWithSubElementListProperty(etree_.QName('pref', 'sub_elem'),
+ sub_elem = SubElementWithSubElementListProperty(etree.QName('pref', 'sub_elem'),
default_py_value=AllowedValuesType(),
value_class=AllowedValuesType)
@@ -279,7 +279,8 @@ def test_NodeTextProperty(self):
self.assertEqual(DummyNodeText.node_text_mand.get_actual_value(dummy), 'foo')
self.assertEqual(DummyNodeText.node_text_opt.get_actual_value(dummy), None)
dummy.node_text_mand = None
- self.assertRaises(ValueError, dummy.mk_node) # implied value does not help here, we need a real value for mand. prop.
+ self.assertRaises(ValueError,
+ dummy.mk_node) # implied value does not help here, we need a real value for mand. prop.
dummy.node_text_mand = 'foo'
node = dummy.mk_node()
self.assertEqual(1, len(node)) # the empty optional node is not added, only the mandatory one
@@ -309,13 +310,13 @@ def test_NodeTextProperty(self):
def test_DummyNodeEnumText(self):
dummy = DummyNodeEnumText()
- #verify that mandatory element without value raises a ValueError
+ # verify that mandatory element without value raises a ValueError
self.assertRaises(ValueError, dummy.mk_node)
dummy.node_text_mand_no_default = MyEnum.c
node = dummy.mk_node()
self.assertEqual(dummy.node_text_mand, MyEnum.a)
self.assertEqual(dummy.node_text_opt, None)
- self.assertEqual(DummyNodeEnumText.node_text_mand.get_actual_value(dummy), MyEnum.a)
+ self.assertEqual(DummyNodeEnumText.node_text_mand.get_actual_value(dummy), MyEnum.a)
self.assertEqual(DummyNodeEnumText.node_text_opt.get_actual_value(dummy), None)
node = dummy.mk_node()
self.assertEqual(2, len(node)) # the empty optional node is not added, only the mandatory ones
@@ -345,7 +346,6 @@ def test_DummyNodeEnumText(self):
else:
raise Exception(f'dummy.node_text_mand = {value} did not raise ValueError!')
-
def test_ExtensionNodeProperty(self):
# ToDo:implement
pass
@@ -359,7 +359,7 @@ def test_SubElementProperty(self):
self.assertEqual(DummySubElement.sub_elem_opt.get_actual_value(dummy), None)
dummy.sub_elem_mand = None
- self.assertRaises(ValueError, dummy.mk_node) # mand. prop has no value
+ self.assertRaises(ValueError, dummy.mk_node) # mand. prop has no value
dummy.sub_elem_mand = CodedValue('hello_again')
node = dummy.mk_node()
diff --git a/tests/test_descriptorcontainers.py b/tests/test_descriptorcontainers.py
index e7df1c67..a8f46f0e 100644
--- a/tests/test_descriptorcontainers.py
+++ b/tests/test_descriptorcontainers.py
@@ -1,17 +1,19 @@
import unittest
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.namespaces import default_ns_helper as ns_hlp
from sdc11073.xml_types import pm_types, msg_qnames as msg
from sdc11073.mdib import descriptorcontainers
from tests.mockstuff import dec_list
+
test_tag = ns_hlp.PM.tag('MyDescriptor')
+
class TestDescriptorContainers(unittest.TestCase):
def setUp(self):
- self.ns_mapper =ns_hlp
+ self.ns_mapper = ns_hlp
def test_AbstractDescriptorContainer(self):
dc = descriptorcontainers.AbstractDescriptorContainer(handle='123', parent_handle='456')
@@ -35,15 +37,15 @@ def test_AbstractDescriptorContainer(self):
dc.SafetyClassification = pm_types.SafetyClassification.MED_A
dc.Type = pm_types.CodedValue('abc', 'def')
- ext_node = etree_.Element(ns_hlp.MSG.tag('Whatever'))
- etree_.SubElement(ext_node, 'foo', attrib={'some_attr': 'some_value'})
- etree_.SubElement(ext_node, 'bar', attrib={'another_attr': 'different_value'})
+ ext_node = etree.Element(ns_hlp.MSG.tag('Whatever'))
+ etree.SubElement(ext_node, 'foo', attrib={'some_attr': 'some_value'})
+ etree.SubElement(ext_node, 'bar', attrib={'another_attr': 'different_value'})
dc.Extension.append(ext_node)
retrievability = pm_types.Retrievability([pm_types.RetrievabilityInfo(pm_types.RetrievabilityMethod.GET),
- pm_types.RetrievabilityInfo(pm_types.RetrievabilityMethod.PERIODIC,
- update_period=42.0),
- ],
- )
+ pm_types.RetrievabilityInfo(pm_types.RetrievabilityMethod.PERIODIC,
+ update_period=42.0),
+ ],
+ )
dc.Extension.append(retrievability.as_etree_node(msg.Retrievability, {}))
dc2.update_from_other_container(dc)
@@ -66,8 +68,6 @@ def test_AbstractDescriptorContainer(self):
self.assertEqual(dc3.Extension[0].tag, ext_node.tag)
self.assertEqual(dc3.get_retrievability(), [retrievability])
-
-
def test_AbstractMetricDescriptorContainer(self):
dc = descriptorcontainers.AbstractMetricDescriptorContainer(handle='123', parent_handle='456')
self.assertEqual(dc.MetricAvailability, pm_types.MetricAvailability.CONTINUOUS) # the default value
@@ -182,14 +182,14 @@ def _cmp_ActivateOperationDescriptorContainer(_dc, _dc2):
self.assertEqual(_dc.Retriggerable, _dc2.Retriggerable)
dc = descriptorcontainers.ActivateOperationDescriptorContainer(handle='123', parent_handle='456')
- dc.OperationTarget= 'my_handle'
+ dc.OperationTarget = 'my_handle'
# create copy with default values
node = dc.mk_node(test_tag, self.ns_mapper)
dc2 = descriptorcontainers.ActivateOperationDescriptorContainer.from_node(node=node, parent_handle='456')
_cmp_ActivateOperationDescriptorContainer(dc, dc2)
dc.Argument = [pm_types.ActivateOperationDescriptorArgument(arg_name=pm_types.CodedValue('abc', 'def'),
- arg=ns_hlp.PM.tag('foo'))]
+ arg=ns_hlp.PM.tag('foo'))]
dc2.update_from_other_container(dc)
_cmp_ActivateOperationDescriptorContainer(dc, dc2)
diff --git a/tests/test_device_services.py b/tests/test_device_services.py
index c076b7c2..bbc054f1 100644
--- a/tests/test_device_services.py
+++ b/tests/test_device_services.py
@@ -1,6 +1,6 @@
import unittest
-from lxml import etree as etree_
+from lxml import etree
from sdc11073.definitions_sdc import SdcV1Definitions
from sdc11073.dispatch.request import RequestData
from sdc11073.loghelper import basic_logging_setup
@@ -44,7 +44,7 @@ def _mk_get_request(self, sdc_device, port_type, method, path) -> CreatedMessage
nsm = self.sdc_device.mdib.nsmapper # shortcut
action = '{}/{}/{}'.format(name_space, port_type, method)
- body_node = etree_.Element(nsm.MSG.tag(method))
+ body_node = etree.Element(nsm.MSG.tag(method))
soap_envelope = Soap12Envelope(nsm.partial_map(nsm.S12, nsm.WSA, nsm.MSG))
soap_envelope.set_header_info_block(HeaderInformationBlock(action=action, addr_to=path))
soap_envelope.payload_element = body_node
@@ -166,7 +166,7 @@ def test_wsdl(self):
dev = self.sdc_device
_ns = dev.mdib.nsmapper # shortcut
for hosted in dev.hosted_services.dpws_hosted_services.values():
- wsdl = etree_.fromstring(hosted._wsdl_string)
+ wsdl = etree.fromstring(hosted._wsdl_string)
inputs = wsdl.xpath(f'//{_ns.WSDL.doc_name("input")}', namespaces=_ns.ns_map)
outputs = wsdl.xpath(f'//{_ns.WSDL.doc_name("output")}', namespaces=_ns.ns_map)
self.assertGreater(len(inputs), 0)
diff --git a/tests/test_mdib.py b/tests/test_mdib.py
index 16654fe4..febdbacc 100644
--- a/tests/test_mdib.py
+++ b/tests/test_mdib.py
@@ -1,7 +1,7 @@
import os
import unittest
from dataclasses import dataclass
-from lxml.etree import QName
+from lxml import etree
from sdc11073.xml_types import pm_qnames as pm
from sdc11073.exceptions import ApiUsageError
from sdc11073.mdib import ProviderMdib
@@ -136,7 +136,7 @@ def test_activate_operation_argument(self):
@dataclass
class TestData:
mdib_text: str
- expected_qname: QName
+ expected_qname: etree.QName
mdib_dummy = """
"""
- node = fromstring(text.format('')) # noqa: S320
+ node = etree.fromstring(text.format('')) # noqa: S320
allowed_value1 = pm_types.AllowedValue.from_node(node)
self.assertEqual(allowed_value1.Value, '')
- generated_node = allowed_value1.as_etree_node(QName('foo', 'bar'), {})
+ generated_node = allowed_value1.as_etree_node(etree.QName('foo', 'bar'), {})
self.assertEqual('', generated_node[0].text)
- node = fromstring(text.format('foobar')) # noqa: S320
+ node = etree.fromstring(text.format('foobar')) # noqa: S320
allowed_value2 = pm_types.AllowedValue.from_node(node)
self.assertEqual(allowed_value2.Value, 'foobar')
@@ -63,13 +63,13 @@ def test_activate_operation_descriptor_argument(self):
dd:Something
"""
- node = fromstring(text.format('')) # noqa: S320
+ node = etree.fromstring(text.format('')) # noqa: S320
arg = pm_types.ActivateOperationDescriptorArgument.from_node(node)
self.assertEqual(arg.ArgName, pm_types.CodedValue("202890"))
- self.assertEqual(arg.Arg, QName("dummy", "Something"))
+ self.assertEqual(arg.Arg, etree.QName("dummy", "Something"))
# verify that as_etree_node -> from_node conversion creates an identical arg
node2 = arg.as_etree_node(
- QName("http://standards.ieee.org/downloads/11073/11073-10207-2017/participant", 'Argument'),
+ etree.QName("http://standards.ieee.org/downloads/11073/11073-10207-2017/participant", 'Argument'),
ns_map={"pm": "http://standards.ieee.org/downloads/11073/11073-10207-2017/participant"})
arg2 = pm_types.ActivateOperationDescriptorArgument.from_node(node2)
self.assertEqual(arg, arg2)
@@ -89,9 +89,9 @@ def test_compare_extensions(self):
"""
- self.assertNotEqual(fromstring(xml), fromstring(xml)) # noqa: S320
- inst1 = pm_types.InstanceIdentifier.from_node(fromstring(xml)) # noqa: S320
- inst2 = pm_types.InstanceIdentifier.from_node(fromstring(xml)) # noqa: S320
+ self.assertNotEqual(etree.fromstring(xml), etree.fromstring(xml)) # noqa: S320
+ inst1 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml)) # noqa: S320
+ inst2 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml)) # noqa: S320
self.assertEqual(inst1.ExtExtension, inst2.ExtExtension)
self.assertEqual(inst1, inst2)
self.assertEqual(inst1.ExtExtension, tuple(inst2.ExtExtension))
@@ -106,7 +106,7 @@ def test_compare_extensions(self):
"""
- inst2 = pm_types.InstanceIdentifier.from_node(fromstring(another_xml)) # noqa: S320
+ inst2 = pm_types.InstanceIdentifier.from_node(etree.fromstring(another_xml)) # noqa: S320
self.assertNotEqual(inst1.ExtExtension, inst2.ExtExtension)
self.assertNotEqual(inst1, inst2)
@@ -116,7 +116,7 @@ def test_compare_extension_with_other_types(self):
"""
- xml1 = fromstring(xml1) # noqa: S320
+ xml1 = etree.fromstring(xml1) # noqa: S320
inst1 = xml_structure.ExtensionLocalValue([xml1])
self.assertFalse(inst1 == 42)
@@ -143,8 +143,8 @@ def test_element_order(self):
"""
- inst1 = pm_types.InstanceIdentifier.from_node(fromstring(xml1)) # noqa: S320
- inst2 = pm_types.InstanceIdentifier.from_node(fromstring(xml2)) # noqa: S320
+ inst1 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml1)) # noqa: S320
+ inst2 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml2)) # noqa: S320
self.assertNotEqual(inst1.ExtExtension, inst2.ExtExtension)
self.assertNotEqual(inst1, inst2)
@@ -167,43 +167,43 @@ def test_attribute_order(self):
"""
- inst1 = pm_types.InstanceIdentifier.from_node(fromstring(xml1)) # noqa: S320
- inst2 = pm_types.InstanceIdentifier.from_node(fromstring(xml2)) # noqa: S320
+ inst1 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml1)) # noqa: S320
+ inst2 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml2)) # noqa: S320
self.assertEqual(inst1.ExtExtension, inst2.ExtExtension)
self.assertEqual(inst1, inst2)
def test_fails_with_qname(self):
- xml1 = fromstring(b"""
+ xml1 = etree.fromstring(b"""
what:lorem
""") # noqa: S320
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
who:lorem
""") # noqa: S320
- self.assertNotEqual(tostring(xml1), tostring(xml2))
+ self.assertNotEqual(etree.tostring(xml1), etree.tostring(xml2))
inst1 = xml_structure.ExtensionLocalValue([xml1])
inst2 = xml_structure.ExtensionLocalValue([xml2])
self.assertNotEqual(inst1, inst2)
def test_ignore_not_needed_namespaces(self):
- xml1 = fromstring(b"""
+ xml1 = etree.fromstring(b"""
What does this mean?
""") # noqa: S320
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
What does this mean?
""") # noqa: S320
- self.assertNotEqual(tostring(xml1), tostring(xml2))
+ self.assertNotEqual(etree.tostring(xml1), etree.tostring(xml2))
inst1 = xml_structure.ExtensionLocalValue([xml1])
inst2 = xml_structure.ExtensionLocalValue([xml2])
self.assertEqual(inst1, inst2)
@@ -214,12 +214,12 @@ def test_different_length(self):
self.assertNotEqual(inst1, inst2)
def test_ignore_comments(self):
- xml1 = fromstring(b"""
+ xml1 = etree.fromstring(b"""
What does this mean?
""") # noqa: S320
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
What does this mean?
""") # noqa: S320
@@ -238,8 +238,8 @@ def test_custom_compare_method(self):
"""
- xml1 = fromstring(xml1) # noqa: S320
- xml2 = fromstring(xml2) # noqa: S320
+ xml1 = etree.fromstring(xml1) # noqa: S320
+ xml2 = etree.fromstring(xml2) # noqa: S320
inst1 = xml_structure.ExtensionLocalValue([xml1])
inst2 = xml_structure.ExtensionLocalValue([xml2])
@@ -266,8 +266,8 @@ def test_cdata(self):
]]>
What does this mean?
"""
- xml1 = fromstring(xml1) # noqa: S320
- xml2 = fromstring(xml2) # noqa: S320
+ xml1 = etree.fromstring(xml1) # noqa: S320
+ xml2 = etree.fromstring(xml2) # noqa: S320
inst1 = xml_structure.ExtensionLocalValue([xml1])
inst2 = xml_structure.ExtensionLocalValue([xml2])
@@ -290,8 +290,8 @@ def test_comparison_subelements(self):
"""
- xml1 = fromstring(xml1) # noqa: S320
- xml2 = fromstring(xml2) # noqa: S320
+ xml1 = etree.fromstring(xml1) # noqa: S320
+ xml2 = etree.fromstring(xml2) # noqa: S320
inst1 = xml_structure.ExtensionLocalValue([xml1])
inst2 = xml_structure.ExtensionLocalValue([xml2])
@@ -323,9 +323,9 @@ def test_assign_iterable(self):
"""
- inst1 = pm_types.InstanceIdentifier.from_node(fromstring(xml1)) # noqa: S320
- inst2 = pm_types.InstanceIdentifier.from_node(fromstring(xml2)) # noqa: S320
- inst3 = pm_types.InstanceIdentifier.from_node(fromstring(xml3)) # noqa: S320
+ inst1 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml1)) # noqa: S320
+ inst2 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml2)) # noqa: S320
+ inst3 = pm_types.InstanceIdentifier.from_node(etree.fromstring(xml3)) # noqa: S320
self.assertNotEqual(inst1.ExtExtension, inst2.ExtExtension)
self.assertEqual(len(inst3.ExtExtension), 0)
# assign a tuple with values from inst1 to inst3
@@ -339,12 +339,12 @@ def my_generator():
self.assertEqual(inst2.ExtExtension, inst3.ExtExtension)
def test_mixed_content_is_ignored(self):
- xml1 = fromstring(b"""
+ xml1 = etree.fromstring(b"""
what:lorem
""") # noqa: S320
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
@@ -357,12 +357,12 @@ def test_mixed_content_is_ignored(self):
self.assertEqual(inst1, inst2)
def test_operators_of_extension_local_value(self):
- xml1 = fromstring(b"""
+ xml1 = etree.fromstring(b"""
""") # noqa: S320
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
@@ -375,7 +375,7 @@ def test_operators_of_extension_local_value(self):
self.assertTrue(inst1 == inst2)
self.assertFalse(inst1 != inst2)
- xml2 = fromstring(b"""
+ xml2 = etree.fromstring(b"""
@@ -388,7 +388,7 @@ def test_operators_of_extension_local_value(self):
def test_element_with_text_list(self):
"""Verify that a ElementWithTextList.text is a list even if text of node is None or element does not exist."""
- node = Element('foo')
+ node = etree.Element('foo')
node.text = 'abc def ghi'
obj = basetypes.ElementWithTextList.from_node(node)
self.assertEqual(obj.text, ['abc', 'def', 'ghi'])
diff --git a/tests/test_xml_utils.py b/tests/test_xml_utils.py
index b05b549d..7a1ead18 100644
--- a/tests/test_xml_utils.py
+++ b/tests/test_xml_utils.py
@@ -1,6 +1,6 @@
import unittest
-import lxml
+from lxml import etree
from sdc11073 import xml_utils
@@ -242,7 +242,7 @@ class TestXmlParsing(unittest.TestCase):
""")
def setUp(self) -> None:
- self.xml_to_be_tested = [lxml.etree.fromstring(raw_xml)[1] for raw_xml in self.xml_to_be_parsed]
+ self.xml_to_be_tested = [etree.fromstring(raw_xml)[1] for raw_xml in self.xml_to_be_parsed]
def _compare_nodes(self, expected: xml_utils.LxmlElement, actual: xml_utils.LxmlElement):
self.assertNotEqual(id(expected), id(actual))
@@ -269,8 +269,8 @@ def test_copy_node_wo_parent(self):
self._compare_nodes(report, new_report)
def test_lxml_element_type(self):
- self.assertEqual(lxml.etree._Element, xml_utils.LxmlElement)
- parsed_xml = lxml.etree.fromstring(self.xml_to_be_parsed[0])
+ self.assertEqual(etree._Element, xml_utils.LxmlElement)
+ parsed_xml = etree.fromstring(self.xml_to_be_parsed[0])
self.assertEqual(type(parsed_xml), xml_utils.LxmlElement)
- self.assertTrue(isinstance(parsed_xml, lxml.etree._Element))
+ self.assertTrue(isinstance(parsed_xml, etree._Element))
self.assertTrue(isinstance(parsed_xml, xml_utils.LxmlElement))
diff --git a/tests/utils.py b/tests/utils.py
index 03dc61c2..2cf3c08e 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -4,7 +4,7 @@
import string
import uuid
-import lxml.etree
+from lxml import etree
from sdc11073 import location
from sdc11073.xml_types import wsd_types
@@ -34,10 +34,10 @@ def random_location() -> location.SdcLocation:
rm=get_random_RFC3986_string_of_length(7))
-def random_qname() -> lxml.etree.QName:
+def random_qname() -> etree.QName:
"""Create random qname."""
- return lxml.etree.QName(f'{"".join(random.choices(list(string.ascii_letters), k=1))}{uuid.uuid4().hex}',
- f'{"".join(random.choices(list(string.ascii_letters), k=1))}{uuid.uuid4().hex}')
+ return etree.QName(f'{"".join(random.choices(list(string.ascii_letters), k=1))}{uuid.uuid4().hex}',
+ f'{"".join(random.choices(list(string.ascii_letters), k=1))}{uuid.uuid4().hex}')
def random_scope() -> wsd_types.ScopesType:
diff --git a/tools/generate_qnames.py b/tools/generate_qnames.py
index a6b999b7..a2f8f26d 100644
--- a/tools/generate_qnames.py
+++ b/tools/generate_qnames.py
@@ -1,20 +1,21 @@
-from lxml import etree as etree_
+from lxml import etree
-parser = etree_.ETCompatXMLParser(resolve_entities=False)
+parser = etree.ETCompatXMLParser(resolve_entities=False)
+
+qn_complex_type = etree.QName('http://www.w3.org/2001/XMLSchema', 'complexType')
+qn_element = etree.QName('http://www.w3.org/2001/XMLSchema', 'element')
-qn_complex_type = etree_.QName('http://www.w3.org/2001/XMLSchema', 'complexType')
-qn_element = etree_.QName('http://www.w3.org/2001/XMLSchema', 'element')
def _generate_qnames(element, names, target_namespace):
- tag = etree_.QName(element.tag)
+ tag = etree.QName(element.tag)
if tag == qn_complex_type:
name = element.attrib.get('name')
if name:
- names[name] = etree_.QName(target_namespace, name)
+ names[name] = etree.QName(target_namespace, name)
elif tag == qn_element:
name = element.attrib.get('name')
if name:
- names[name] = etree_.QName(target_namespace, name)
+ names[name] = etree.QName(target_namespace, name)
for child in element:
_generate_qnames(child, names, target_namespace)
@@ -25,8 +26,9 @@ def generate_qnames(doc):
_generate_qnames(doc, names, target_namespace)
return names
+
def write_names_file(names, ns_helper, ns_name, fd):
- fd.write('# generated by "generate_qnames"\n' )
+ fd.write('# generated by "generate_qnames"\n')
fd.write(f'from ..namespaces import {ns_helper}\n\n')
fd.write('')
names_list = [n.localname for n in names.values()]
@@ -38,21 +40,21 @@ def write_names_file(names, ns_helper, ns_name, fd):
if __name__ == '__main__':
with open('../src/sdc11073/xsd/BICEPS_ParticipantModel.xsd', 'rb') as pm:
xml_string = pm.read()
- doc_root = etree_.fromstring(xml_string, parser=parser)
- names = generate_qnames(doc_root)
- with open('../src/sdc11073/xml_types/pm_qnames.py', 'w') as fd:
- write_names_file(names, 'default_ns_helper', 'PM', fd)
+ doc_root = etree.fromstring(xml_string, parser=parser)
+ qnames = generate_qnames(doc_root)
+ with open('../src/sdc11073/xml_types/pm_qnames.py', 'w') as stream:
+ write_names_file(qnames, 'default_ns_helper', 'PM', stream)
with open('../src/sdc11073/xsd/BICEPS_MessageModel.xsd', 'rb') as pm:
xml_string = pm.read()
- doc_root = etree_.fromstring(xml_string, parser=parser)
- names = generate_qnames(doc_root)
- with open('../src/sdc11073/xml_types/msg_qnames.py', 'w') as fd:
- write_names_file(names, 'default_ns_helper', 'MSG', fd)
+ doc_root = etree.fromstring(xml_string, parser=parser)
+ qnames = generate_qnames(doc_root)
+ with open('../src/sdc11073/xml_types/msg_qnames.py', 'w') as stream:
+ write_names_file(qnames, 'default_ns_helper', 'MSG', stream)
with open('../src/sdc11073/xsd/Extensionpoint.xsd', 'rb') as pm:
xml_string = pm.read()
- doc_root = etree_.fromstring(xml_string, parser=parser)
- names = generate_qnames(doc_root)
- with open('../src/sdc11073/xml_types/ext_qnames.py', 'w') as fd:
- write_names_file(names, 'default_ns_helper', 'EXT', fd)
+ doc_root = etree.fromstring(xml_string, parser=parser)
+ qnames = generate_qnames(doc_root)
+ with open('../src/sdc11073/xml_types/ext_qnames.py', 'w') as stream:
+ write_names_file(qnames, 'default_ns_helper', 'EXT', stream)