diff --git a/clayer/pysertype.c b/clayer/pysertype.c index 1fcefe6d..01b733b4 100644 --- a/clayer/pysertype.c +++ b/clayer/pysertype.c @@ -89,7 +89,8 @@ static inline const ddspy_serdata_t *cserdata (const ddsi_serdata_t *this) static void typeid_ser (dds_ostream_t *os, const dds_typeid_t *type_id) { - dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops); + if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops)) + abort (); // internally generated data, so should never fail } #ifdef DDS_HAS_TYPE_DISCOVERY @@ -102,7 +103,8 @@ static void typeid_deser (dds_istream_t *is, dds_typeid_t **type_id) static void typeobj_ser (dds_ostream_t *os, const dds_typeobj_t *type_obj) { - dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops); + if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops)) + abort (); // internally generated data, so should never fail } #endif /* DDS_HAS_TYPE_DISCOVERY */ @@ -1413,6 +1415,88 @@ static PyObject *ddspy_take_participant (PyObject *self, PyObject *args) return ddspy_readtake_participant (self, args, dds_take); } +static PyObject *ddspy_construct_endpoint (struct dds_builtintopic_endpoint *endpoint, PyObject *sampleinfo, PyObject *endpoint_constructor, PyObject *cqos_to_qos) +{ + PyObject *type_id_bytes = NULL; + + dds_ostream_t type_obj_stream; + const dds_typeinfo_t *type_info = NULL; + + // Fetch the type id + dds_builtintopic_get_endpoint_type_info (endpoint, &type_info); + + // convert to cdr bytes + if (type_info != NULL) + { + dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2); + const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info); + typeid_ser (&type_obj_stream, type_id); + type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index); + dds_ostream_fini (&type_obj_stream, &cdrstream_allocator); + } + else + { + type_id_bytes = Py_None; + Py_INCREF (type_id_bytes); + } + + PyObject *qos_p, *qos; + if (endpoint->qos != NULL) + { + qos_p = PyLong_FromVoidPtr (endpoint->qos); + if (PyErr_Occurred ()) + { + Py_DECREF (type_id_bytes); + PyErr_Clear (); + PyErr_SetString (PyExc_Exception, "VoidPtr errored."); + return NULL; + } + qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p); + if (PyErr_Occurred ()) + { + Py_DECREF (type_id_bytes); + Py_DECREF (qos_p); + PyErr_Clear (); + PyErr_SetString (PyExc_Exception, "Callfunc cqos errored."); + return NULL; + } + } + else + { + Py_INCREF (Py_None); + Py_INCREF (Py_None); + qos_p = Py_None; + qos = Py_None; + } + + PyObject *item = PyObject_CallFunction ( + endpoint_constructor, "y#y#Ks#s#OOO", + endpoint->key.v, (Py_ssize_t) 16, + endpoint->participant_key.v, (Py_ssize_t) 16, + endpoint->participant_instance_handle, + endpoint->topic_name, + endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name), + endpoint->type_name, + endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name), + qos, + sampleinfo, + type_id_bytes); + if (PyErr_Occurred ()) + { + Py_DECREF (type_id_bytes); + Py_DECREF (qos_p); + Py_DECREF (qos); + PyErr_Clear (); + PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored."); + return NULL; + } + + Py_DECREF (type_id_bytes); + Py_DECREF (qos_p); + Py_DECREF (qos); + return item; +} + static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_return_t (*readtake) (dds_entity_t, void **, dds_sample_info_t *, size_t, uint32_t)) { uint32_t Nu32; @@ -1442,29 +1526,6 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re PyObject *list = PyList_New (sts); for (uint32_t i = 0; i < (uint32_t)sts; ++i) { - PyObject *type_id_bytes = NULL; - - dds_ostream_t type_obj_stream; - const dds_typeinfo_t *type_info = NULL; - - // Fetch the type id - dds_builtintopic_get_endpoint_type_info (rcontainer[i], &type_info); - - // convert to cdr bytes - if (type_info != NULL) - { - dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2); - const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info); - typeid_ser (&type_obj_stream, type_id); - type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index); - dds_ostream_fini (&type_obj_stream, &cdrstream_allocator); - } - else - { - type_id_bytes = Py_None; - Py_INCREF (type_id_bytes); - } - PyObject *sampleinfo = get_sampleinfo_pyobject (&info[i]); if (PyErr_Occurred ()) { @@ -1473,54 +1534,17 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re return NULL; } - PyObject *qos_p, *qos; - if (rcontainer[i]->qos != NULL) - { - qos_p = PyLong_FromVoidPtr (rcontainer[i]->qos); - if (PyErr_Occurred ()) - { - PyErr_Clear (); - PyErr_SetString (PyExc_Exception, "VoidPtr errored."); - return NULL; - } - qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p); - if (PyErr_Occurred ()) - { - PyErr_Clear (); - PyErr_SetString (PyExc_Exception, "Callfunc cqos errored."); - return NULL; - } - } - else - { - Py_INCREF (Py_None); - Py_INCREF (Py_None); - qos_p = Py_None; - qos = Py_None; - } - - PyObject *item = PyObject_CallFunction ( - endpoint_constructor, "y#y#Ks#s#OOO", - rcontainer[i]->key.v, (Py_ssize_t) 16, - rcontainer[i]->participant_key.v, (Py_ssize_t) 16, - rcontainer[i]->participant_instance_handle, - rcontainer[i]->topic_name, - rcontainer[i]->topic_name == NULL ? 0 : strlen(rcontainer[i]->topic_name), - rcontainer[i]->type_name, - rcontainer[i]->type_name == NULL ? 0 : strlen(rcontainer[i]->type_name), - qos, - sampleinfo, - type_id_bytes); + PyObject *item = ddspy_construct_endpoint (rcontainer[i], sampleinfo, endpoint_constructor, cqos_to_qos); if (PyErr_Occurred ()) { + Py_DECREF (sampleinfo); PyErr_Clear (); PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored."); return NULL; } - PyList_SetItem (list, i, item); // steals ref Py_DECREF (sampleinfo); - Py_DECREF (qos_p); - Py_DECREF (qos); + + PyList_SetItem (list, i, item); // steals ref } dds_return_loan (reader, (void **)rcontainer, sts); @@ -1719,6 +1743,58 @@ static PyObject *ddspy_get_typeobj (PyObject *self, PyObject *args) #endif +static PyObject * +ddspy_get_matched_subscription_data(PyObject *self, PyObject *args) +{ + dds_entity_t writer; + dds_instance_handle_t handle; + dds_builtintopic_endpoint_t* endpoint = NULL; + + PyObject* endpoint_constructor; + PyObject* cqos_to_qos; + (void)self; + + if (!PyArg_ParseTuple(args, "iKOO", &writer, &handle, &endpoint_constructor, &cqos_to_qos)) + return NULL; + + endpoint = dds_get_matched_subscription_data(writer, handle); + if (endpoint == NULL) { + Py_INCREF(Py_None); + return Py_None; + } + + PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos); + dds_builtintopic_free_endpoint(endpoint); + return item; +} + + +static PyObject * +ddspy_get_matched_publication_data(PyObject *self, PyObject *args) +{ + dds_entity_t reader; + dds_instance_handle_t handle; + dds_builtintopic_endpoint_t* endpoint = NULL; + + PyObject* endpoint_constructor; + PyObject* cqos_to_qos; + (void)self; + + if (!PyArg_ParseTuple(args, "iKOO", &reader, &handle, &endpoint_constructor, &cqos_to_qos)) + return NULL; + + endpoint = dds_get_matched_publication_data(reader, handle); + if (endpoint == NULL) { + Py_INCREF(Py_None); + return Py_None; + } + + PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos); + dds_builtintopic_free_endpoint(endpoint); + return item; +} + + char ddspy_docs[] = "DDSPY module"; PyMethodDef ddspy_funcs[] = { @@ -1753,6 +1829,8 @@ PyMethodDef ddspy_funcs[] = { #ifdef DDS_HAS_TYPE_DISCOVERY { "ddspy_get_typeobj", (PyCFunction)ddspy_get_typeobj, METH_VARARGS, ddspy_docs }, #endif + { "ddspy_get_matched_subscription_data", (PyCFunction)ddspy_get_matched_subscription_data, METH_VARARGS, ddspy_docs }, + { "ddspy_get_matched_publication_data", (PyCFunction)ddspy_get_matched_publication_data, METH_VARARGS, ddspy_docs }, { NULL } }; diff --git a/cyclonedds/builtin.py b/cyclonedds/builtin.py index 66bb68ca..494ee3fc 100644 --- a/cyclonedds/builtin.py +++ b/cyclonedds/builtin.py @@ -20,6 +20,7 @@ from .sub import DataReader from .internal import dds_c_t from .qos import _CQos +from .builtin_types import DcpsParticipant, DcpsTopic, DcpsEndpoint, endpoint_constructor, participant_constructor, topic_constructor, cqos_to_qos from cyclonedds._clayer import ddspy_read_participant, ddspy_take_participant, ddspy_read_endpoint, ddspy_take_endpoint, ddspy_read_topic, ddspy_take_topic from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier @@ -40,82 +41,6 @@ def __del__(self): pass -@dataclass -class DcpsParticipant: - """ - Data sample as returned when you subscribe to the BuiltinTopicDcpsParticipant topic. - - Attributes - ---------- - key: uuid.UUID - Unique participant identifier - qos: Qos - Qos policies associated with the participant. - """ - - key: uuid.UUID - qos: Qos - - -@dataclass -class DcpsTopic: - """ - Data sample as returned when you subscribe to the BuiltinTopicDcpsTopic topic. - - Attributes - ---------- - key: - Unique identifier for the topic, publication or subscription endpoint. - topic_name: - Name of the associated topic. - type_name: - Name of the type. - qos: - Qos policies associated with the endpoint. - typeid: - Complete XTypes TypeIdentifier of the type, can be None. - """ - - key: uuid.UUID - topic_name: str - type_name: str - qos: Qos - type_id: Optional[TypeIdentifier] - - -@dataclass -class DcpsEndpoint: - """ - Data sample as returned when you subscribe to the BuiltinTopicDcpsPublication or - BuiltinTopicDcpsSubscription topic. - - Attributes - ---------- - key: uuid.UUID - Unique identifier for the topic, publication or subscription endpoint. - participant_key: uuid.UUID - Unique identifier of the participant the endpoint belongs to. - participant_instance_handle: int - Instance handle - topic_name: str - Name of the associated topic. - type_name: str - Name of the type. - qos: Qos - Qos policies associated with the endpoint. - typeid: TypeIdentifier, optional - Complete XTypes TypeIdentifier of the type, can be None. - """ - - key: uuid.UUID - participant_key: uuid.UUID - participant_instance_handle: int - topic_name: str - type_name: str - qos: Qos - type_id: Optional[TypeIdentifier] - - class BuiltinDataReader(DataReader): """ Builtin topics have sligtly different behaviour than normal topics, so you should use this BuiltinDataReader @@ -164,54 +89,6 @@ def __init__(self, self._keepalive_entities = [self.subscriber] def _make_constructors(self): - def participant_constructor(keybytes, qosobject, sampleinfo): - s = DcpsParticipant(uuid.UUID(bytes=keybytes), qos=qosobject) - s.sample_info = sampleinfo - return s - - def endpoint_constructor(keybytes, participant_keybytes, p_instance_handle, topic_name, type_name, - qosobject, sampleinfo, typeid_bytes): - ident = None - if typeid_bytes is not None: - try: - ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) - except Exception: - pass - - s = DcpsEndpoint( - uuid.UUID(bytes=keybytes), - uuid.UUID(bytes=participant_keybytes), - p_instance_handle, - topic_name, - type_name, - qosobject, - ident - ) - s.sample_info = sampleinfo - return s - - def topic_constructor(keybytes, topic_name, type_name, qosobject, sampleinfo, typeid_bytes): - ident = None - if typeid_bytes is not None: - try: - ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) - except Exception: - pass - - s = DcpsTopic( - uuid.UUID(bytes=keybytes), - topic_name, - type_name, - qosobject, - ident - ) - s.sample_info = sampleinfo - return s - - def cqos_to_qos(pointer): - p = ct.cast(pointer, dds_c_t.qos_p) - return _CQos.cqos_to_qos(p) - if self._topic == BuiltinTopicDcpsParticipant: self._readfn = ddspy_read_participant self._takefn = ddspy_take_participant diff --git a/cyclonedds/builtin_types.py b/cyclonedds/builtin_types.py new file mode 100644 index 00000000..ce8a53b3 --- /dev/null +++ b/cyclonedds/builtin_types.py @@ -0,0 +1,145 @@ +""" + * Copyright(c) 2021 to 2022 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +""" + +import uuid +import ctypes as ct +from dataclasses import dataclass +from typing import Optional, Union, TYPE_CHECKING + +from .core import Qos +from .internal import dds_c_t +from .qos import _CQos + +from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier + +@dataclass +class DcpsParticipant: + """ + Data sample as returned when you subscribe to the BuiltinTopicDcpsParticipant topic. + + Attributes + ---------- + key: uuid.UUID + Unique participant identifier + qos: Qos + Qos policies associated with the participant. + """ + + key: uuid.UUID + qos: Qos + + +@dataclass +class DcpsTopic: + """ + Data sample as returned when you subscribe to the BuiltinTopicDcpsTopic topic. + + Attributes + ---------- + key: + Unique identifier for the topic, publication or subscription endpoint. + topic_name: + Name of the associated topic. + type_name: + Name of the type. + qos: + Qos policies associated with the endpoint. + typeid: + Complete XTypes TypeIdentifier of the type, can be None. + """ + + key: uuid.UUID + topic_name: str + type_name: str + qos: Qos + type_id: Optional[TypeIdentifier] + + +@dataclass +class DcpsEndpoint: + """ + Data sample as returned when you subscribe to the BuiltinTopicDcpsPublication or + BuiltinTopicDcpsSubscription topic. + + Attributes + ---------- + key: uuid.UUID + Unique identifier for the topic, publication or subscription endpoint. + participant_key: uuid.UUID + Unique identifier of the participant the endpoint belongs to. + participant_instance_handle: int + Instance handle + topic_name: str + Name of the associated topic. + type_name: str + Name of the type. + qos: Qos + Qos policies associated with the endpoint. + typeid: TypeIdentifier, optional + Complete XTypes TypeIdentifier of the type, can be None. + """ + + key: uuid.UUID + participant_key: uuid.UUID + participant_instance_handle: int + topic_name: str + type_name: str + qos: Qos + type_id: Optional[TypeIdentifier] + + +def cqos_to_qos(pointer): + p = ct.cast(pointer, dds_c_t.qos_p) + return _CQos.cqos_to_qos(p) + +def participant_constructor(keybytes, qosobject, sampleinfo): + s = DcpsParticipant(uuid.UUID(bytes=keybytes), qos=qosobject) + s.sample_info = sampleinfo + return s + +def endpoint_constructor(keybytes, participant_keybytes, p_instance_handle, topic_name, type_name, qosobject, sampleinfo, typeid_bytes): + ident = None + if typeid_bytes is not None: + try: + ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) + except Exception: + pass + + s = DcpsEndpoint( + uuid.UUID(bytes=keybytes), + uuid.UUID(bytes=participant_keybytes), + p_instance_handle, + topic_name, + type_name, + qosobject, + ident + ) + s.sample_info = sampleinfo + return s + +def topic_constructor(keybytes, topic_name, type_name, qosobject, sampleinfo, typeid_bytes): + ident = None + if typeid_bytes is not None: + try: + ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) + except Exception: + pass + + s = DcpsTopic( + uuid.UUID(bytes=keybytes), + topic_name, + type_name, + qosobject, + ident + ) + s.sample_info = sampleinfo + return s diff --git a/cyclonedds/core.py b/cyclonedds/core.py index a13e3c10..5162cd0e 100644 --- a/cyclonedds/core.py +++ b/cyclonedds/core.py @@ -17,8 +17,9 @@ import ctypes as ct from weakref import WeakValueDictionary from typing import Any, Callable, Dict, Optional, List, TYPE_CHECKING +from datetime import datetime, time, timedelta -from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS +from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS, stat_keyvalue, stat_kind from .qos import Qos, Policy, _CQos @@ -1839,6 +1840,7 @@ async def wait_async(self, timeout: Optional[int] = None) -> int: with concurrent.futures.ThreadPoolExecutor() as pool: return await loop.run_in_executor(pool, self.wait, timeout) + @c_call("dds_create_waitset") def _create_waitset(self, domain_participant: dds_c_t.entity) -> dds_c_t.entity: pass @@ -1882,6 +1884,93 @@ def _waitset_set_trigger( pass +class Statistics(DDS): + """Statistics object for entity. + + Attributes + ---------- + entity: Entity + The handle of entity to which this set of statistics applies. + opaque: int + The internal data. + time: datatime + Time stamp of lastest call to `Statistics(entity).refresh()` in nanoseconds since epoch. + count: int + Number of key-value pairs. + data: dict + Data. + + Examples + -------- + >>> Statistics(datawriter) + >>> Statistics(datawriter).refresh() + """ + + entity: Entity + opaque: int + time: datetime + count: int + data: Dict[str, int] + + def __init__(self, entity: Entity): + self.entity = entity + self._c_statistics = self._create_statistics(entity._ref) + if not self._c_statistics: + raise DDSException(DDSException.DDS_RETCODE_ERROR, msg="Could not initialize statistics.") + self._c_statistics = ct.cast( + self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count)) + ) + self._update() + + def __del__(self): + self._delete_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics))) + + def _update(self): + self.data = {} + self.opaque = self._c_statistics[0].opaque + self.time = self._c_statistics[0].time + self.count = self._c_statistics[0].count + self.kv = self._c_statistics[0].kv + + for i in range(self.count): + name = self.kv[i].name.decode('utf8') # ct.c_char_p + value = None + if self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT32: + value = self.kv[i].u.u32 + elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT64: + value = self.kv[i].u.u64 + elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_LENGTHTIME: + value = self.kv[i].u.lengthtime + self.data[name] = value + + def refresh(self): + """Update a previously created statistics structure with current values. + + Only the time stamp and the values (and "opaque") may change. + The set of keys and the types of the values do not change. + """ + self._refresh_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics))) + self._c_statistics = ct.cast(self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count))) + self._update() + + def __str__(self): + return f"Statistics({self.entity}, opaque={self.opaque}, time={self.time}, data={self.data})" + + @c_call("dds_create_statistics") + def _create_statistics(self, entity: dds_c_t.entity) -> ct.POINTER(dds_c_t.statistics): + pass + + @c_call("dds_refresh_statistics") + def _refresh_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> dds_c_t.returnv: + pass + + @c_call("dds_delete_statistics") + def _delete_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> None: + pass + + __repr__ = __str__ + + __all__ = [ "DDSException", "Entity", @@ -1896,4 +1985,5 @@ def _waitset_set_trigger( "QueryCondition", "GuardCondition", "WaitSet", + "Statistics" ] diff --git a/cyclonedds/internal.py b/cyclonedds/internal.py index e3b159d5..958dfd32 100644 --- a/cyclonedds/internal.py +++ b/cyclonedds/internal.py @@ -18,6 +18,7 @@ from ctypes.util import find_library from functools import wraps from dataclasses import dataclass +from enum import IntEnum if 'CYCLONEDDS_PYTHON_NO_IMPORT_LIBS' not in os.environ: @@ -278,6 +279,26 @@ class InvalidSample: sample_info: SampleInfo +class stat_kind(IntEnum): + DDS_STAT_KIND_UINT32 = 0 + DDS_STAT_KIND_UINT64 = 1 + DDS_STAT_KIND_LENGTHTIME = 2 + + +class stat_value(ct.Union): + _fields_ = [ + ('u32', ct.c_uint32), + ('u64', ct.c_uint64), + ('lengthtime', ct.c_uint64) + ] + +class stat_keyvalue(ct.Structure): + _fields_ = [ + ('name', ct.c_char_p), + ('kind', ct.c_int), + ('u', stat_value) + ] + class dds_c_t: # noqa N801 entity = ct.c_int32 time = ct.c_int64 @@ -390,6 +411,26 @@ class sample_buffer(ct.Structure): # noqa N801 ('len', ct.c_size_t) ] + class statistics(ct.Structure): + _fields_ = [ + ('entity', ct.c_int32), + ('opaque', ct.c_uint64), + ('time', ct.c_int64), + ('count', ct.c_size_t), + ('kv', ct.c_void_p) + ] + + def stat_factory(n_kv: int) -> ct.Structure: + class vstatistics(ct.Structure): + _fields_ = [ + ('entity', ct.c_int32), + ('opaque', ct.c_uint64), + ('time', ct.c_int64), + ('count', ct.c_size_t), + ('kv', stat_keyvalue * n_kv) + ] + return vstatistics + try: import cyclonedds._clayer as _clayer # noqa E402 diff --git a/cyclonedds/pub.py b/cyclonedds/pub.py index c040b301..c4ffe2cc 100644 --- a/cyclonedds/pub.py +++ b/cyclonedds/pub.py @@ -10,18 +10,21 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause """ -from typing import Optional, Union, Generic, TypeVar, TYPE_CHECKING +from typing import Optional, Union, Generic, TypeVar, List, TYPE_CHECKING +import ctypes as ct +import uuid from .internal import c_call, dds_c_t from .core import Entity, DDSException, Listener from .domain import DomainParticipant from .topic import Topic from .qos import _CQos, Qos, LimitedScopeQos, PublisherQos, DataWriterQos +from .builtin_types import DcpsEndpoint, endpoint_constructor, cqos_to_qos from cyclonedds._clayer import ddspy_write, ddspy_write_ts, ddspy_dispose, ddspy_writedispose, ddspy_writedispose_ts, \ ddspy_dispose_handle, ddspy_dispose_handle_ts, ddspy_register_instance, ddspy_unregister_instance, \ ddspy_unregister_instance_handle, ddspy_unregister_instance_ts, ddspy_unregister_instance_handle_ts, \ - ddspy_lookup_instance, ddspy_dispose_ts + ddspy_lookup_instance, ddspy_dispose_ts, ddspy_get_matched_subscription_data if TYPE_CHECKING: @@ -151,6 +154,7 @@ def __init__(self, self._topic = topic self.data_type = topic.data_type self._keepalive_entities = [self.publisher, self.topic] + self._constructor = None cqos = _CQos.cqos_create() ret = self._get_qos(self._ref, cqos) @@ -336,6 +340,122 @@ def lookup_instance(self, sample: _T) -> Optional[int]: return None return ret + def get_matched_subscriptions(self) -> List[int]: + """Get instance handles of the data readers matching a writer. + + Raises + ------ + DDSException: When the number of matching readers < 0. + + Returns + ------- + List[int]: + A list of instance handles of the matching data readers. + """ + num_matched_sub = self._get_matched_subscriptions(self._ref, None, 0) + if num_matched_sub < 0: + raise DDSException(num_matched_sub, f"Occurred when getting the number of matched subscriptions of {repr(self)}") + if num_matched_sub == 0: + return [] + + matched_sub_list = (dds_c_t.instance_handle * int(num_matched_sub))() + matched_sub_list_pt = ct.cast(matched_sub_list, ct.POINTER(dds_c_t.instance_handle)) + + ret = self._get_matched_subscriptions(self._ref, matched_sub_list_pt, num_matched_sub) + if ret >= 0: + return [matched_sub_list[i] for i in range(ret)] + + raise DDSException(ret, f"Occurred when getting the matched subscriptions of {repr(self)}") + + matched_sub = property(get_matched_subscriptions) + + def get_matched_subscription_data(self, handle) -> Optional['cyclonedds.builtin.DcpsEndpoint']: + """Get a description of a reader matched with the provided writer + + Parameters + ---------- + handle: Int + The instance handle of a reader. + + Returns + ------- + DcpsEndpoint: + The sample of the DcpsEndpoint built-in topic. + """ + return ddspy_get_matched_subscription_data(self._ref, handle, endpoint_constructor, cqos_to_qos) + + def get_liveliness_lost_status(self): + """Get LIVELINESS_LOST status + + Raises + ------ + DDSException + + Returns + ------- + liveness_lost_status: + The class 'liveness_lost_status' value. + """ + status = dds_c_t.liveliness_lost_status() + ret = self._get_liveliness_lost_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the liveliness lost status for {repr(self)}") + + def get_offered_deadline_missed_status(self): + """Get OFFERED DEADLINE MISSED status + + Raises + ------ + DDSException + + Returns + ------- + offered_deadline_missed_status: + The class 'offered_deadline_missed_status' value. + """ + status = dds_c_t.offered_deadline_missed_status() + ret = self._get_offered_deadline_missed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the offered deadline missed status for {repr(self)}") + + def get_offered_incompatible_qos_status(self): + """Get OFFERED INCOMPATIBLE QOS status + + Raises + ------ + DDSException + + Returns + ------- + offered_incompatible_qos_status: + The class 'offered_incompatible_qos_status' value. + """ + status = dds_c_t.offered_incompatible_qos_status() + ret = self._get_offered_incompatible_qos_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the offered incompatible qos status for {repr(self)}") + + def get_publication_matched_status(self): + """Get PUBLICATION MATCHED status + + Raises + ------ + DDSException + + Returns + ------- + publication_matched_status: + The class 'publication_matched_status' value. + """ + status = dds_c_t.publication_matched_status() + ret = self._get_publication_matched_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the publication matched status for {repr(self)}") + @c_call("dds_create_writer") def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: @@ -344,3 +464,24 @@ def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass + + @c_call("dds_get_matched_subscriptions") + def _get_matched_subscriptions(self, writer: dds_c_t.entity, handle: ct.POINTER(dds_c_t.instance_handle), + size: ct.c_size_t) -> dds_c_t.returnv: + pass + + @c_call("dds_get_liveliness_lost_status") + def _get_liveliness_lost_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.liveliness_lost_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_offered_deadline_missed_status") + def _get_offered_deadline_missed_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.offered_deadline_missed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_offered_incompatible_qos_status") + def _get_offered_incompatible_qos_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.offered_incompatible_qos_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_publication_matched_status") + def _get_publication_matched_status(self, writer: dds_c_t.entity, status: ct.POINTER(dds_c_t.publication_matched_status)) -> dds_c_t.returnv: + pass diff --git a/cyclonedds/sub.py b/cyclonedds/sub.py index b1e2d269..7803d34e 100644 --- a/cyclonedds/sub.py +++ b/cyclonedds/sub.py @@ -10,9 +10,11 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause """ +import ctypes as ct import asyncio import concurrent.futures from typing import AsyncGenerator, List, Optional, TypeVar, Union, Generator, Generic, TYPE_CHECKING +import uuid from .core import Entity, Listener, DDSException, WaitSet, ReadCondition, SampleState, InstanceState, ViewState from .domain import DomainParticipant @@ -20,8 +22,9 @@ from .internal import c_call, dds_c_t, InvalidSample from .qos import _CQos, Qos, LimitedScopeQos, SubscriberQos, DataReaderQos from .util import duration +from .builtin_types import DcpsEndpoint, endpoint_constructor, cqos_to_qos -from cyclonedds._clayer import ddspy_read, ddspy_take, ddspy_read_handle, ddspy_take_handle, ddspy_lookup_instance +from cyclonedds._clayer import ddspy_read, ddspy_take, ddspy_read_handle, ddspy_take_handle, ddspy_lookup_instance, ddspy_get_matched_publication_data if TYPE_CHECKING: @@ -142,6 +145,7 @@ def __init__( self._topic_ref = topic._ref self._next_condition = None self._keepalive_entities = [self.subscriber, topic] + self._constructor = None @property def topic(self) -> Topic[_T]: @@ -378,6 +382,158 @@ def lookup_instance(self, sample: _T) -> Optional[int]: return None return ret + def get_matched_publications(self) -> List[int]: + """Get instance handles of the data writers matching a reader. + + Raises + ------ + DDSException: When the number of matching writers < 0. + + Returns + ------- + List[int]: + A list of instance handles of the matching data writers. + """ + num_matched_pub = self._get_matched_publications(self._ref, None, 0) + if num_matched_pub < 0: + raise DDSException(num_matched_pub, f"Occurred when getting the number of matched publications of {repr(self)}") + if num_matched_pub == 0: + return [] + + matched_pub_list = (dds_c_t.instance_handle * int(num_matched_pub))() + matched_pub_list_pt = ct.cast(matched_pub_list, ct.POINTER(dds_c_t.instance_handle)) + + ret = self._get_matched_publications(self._ref, matched_pub_list_pt, num_matched_pub) + if ret >= 0: + return [matched_pub_list[i] for i in range(ret)] + + raise DDSException(ret, f"Occurred when getting the matched publications of {repr(self)}") + + matched_pub = property(get_matched_publications) + + def get_matched_publication_data(self, handle) -> Optional['cyclonedds.builtin.DcpsEndpoint']: + """Get a description of a writer matched with the provided reader. + + Parameters + ---------- + handle: Int + The instance handle of a writer. + + Returns + ------- + DcpsEndpoint: + The sample of the DcpsEndpoint built-in topic. + """ + return ddspy_get_matched_publication_data(self._ref, handle, endpoint_constructor, cqos_to_qos) + + def get_liveliness_changed_status(self): + """Get LIVELINESS_CHANGED status + + Raises + ------ + DDSException + + Returns + ------- + liveness_changed_status: + The class 'liveness_changed_status' value. + """ + status = dds_c_t.liveliness_changed_status() + ret = self._get_liveliness_changed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the liveliness changed status for {repr(self)}") + + def get_requested_deadline_missed_status(self): + """Get REQUESTED DEALINE MISSED status + + Raises + ------ + DDSException + + Returns + ------- + requested_deadline_missed_status: + The class 'requested_deadline_missed_status' value. + """ + status = dds_c_t.requested_deadline_missed_status() + ret = self._get_requested_deadline_missed_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the requested deadline missed status for {repr(self)}") + + def get_requested_incompatible_qos_status(self): + """Get REQUESTED INCOMPATIBLE QOS status + + Raises + ------ + DDSException + + Returns + ------- + requested_incompatible_qos_status: + The class 'requested_incompatible_qos_status' value. + """ + status = dds_c_t.requested_incompatible_qos_status() + ret = self._get_requested_incompatible_qos_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the requested incompatible qos status for {repr(self)}") + + def get_sample_lost_status(self): + """Get SAMPLE LOST status + + Raises + ------ + DDSException + + Returns + ------- + sample_lost_status: + The class 'sample_lost_status' value. + """ + status = dds_c_t.sample_lost_status() + ret = self._get_sample_lost_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the sample lost status for {repr(self)}") + + def get_sample_rejected_status(self): + """Get SAMPLE REJECTED status + + Raises + ------ + DDSException + + Returns + ------- + sample_rejected_status: + The class 'sample_rejected_status' value. + """ + status = dds_c_t.sample_rejected_status() + ret = self._get_sample_rejected_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the sample rejected status for {repr(self)}") + + def get_subscription_matched_status(self): + """Get SUBSCRIPTION MATCHED status + + Raises + ------ + DDSException + + Returns + ------- + subscription_matched_status: + The class 'subscription_matched_status' value. + """ + status = dds_c_t.subscription_matched_status() + ret = self._get_subscription_matched_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the subscription matched status for {repr(self)}") + @c_call("dds_create_reader") def _create_reader(self, subscriber: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: @@ -387,5 +543,33 @@ def _create_reader(self, subscriber: dds_c_t.entity, topic: dds_c_t.entity, qos: def _wait_for_historical_data(self, reader: dds_c_t.entity, max_wait: dds_c_t.duration) -> dds_c_t.returnv: pass + @c_call("dds_get_matched_publications") + def _get_matched_publications(self, reader: dds_c_t.entity, handle: ct.POINTER(dds_c_t.instance_handle), + size: ct.c_size_t) -> dds_c_t.returnv: + pass + + @c_call("dds_get_liveliness_changed_status") + def _get_liveliness_changed_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.liveliness_changed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_requested_deadline_missed_status") + def _get_requested_deadline_missed_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.requested_deadline_missed_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_requested_incompatible_qos_status") + def _get_requested_incompatible_qos_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.requested_incompatible_qos_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_sample_lost_status") + def _get_sample_lost_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.sample_lost_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_sample_rejected_status") + def _get_sample_rejected_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.sample_rejected_status)) -> dds_c_t.returnv: + pass + + @c_call("dds_get_subscription_matched_status") + def _get_subscription_matched_status(self, reader: dds_c_t.entity, status: ct.POINTER(dds_c_t.subscription_matched_status)) -> dds_c_t.returnv: + pass __all__ = ["Subscriber", "DataReader"] diff --git a/cyclonedds/topic.py b/cyclonedds/topic.py index 418c1ed1..c78f4aba 100644 --- a/cyclonedds/topic.py +++ b/cyclonedds/topic.py @@ -11,9 +11,9 @@ """ import ctypes as ct -from typing import Union, AnyStr, Optional, Generic, Type, TypeVar, TYPE_CHECKING +from typing import Union, AnyStr, Callable, Optional, Generic, Type, TypeVar, TYPE_CHECKING -from .internal import c_call, dds_c_t +from .internal import DDS, c_call, c_callable, dds_c_t from .core import Entity, DDSException, Listener from .qos import _CQos, Qos, LimitedScopeQos, TopicQos from .idl import IdlStruct, IdlUnion @@ -27,6 +27,15 @@ _S = TypeVar("_S", bound=Union[IdlStruct, IdlUnion]) +class Sample(ct.Structure): + _fields_ = [ + ('usample', ct.c_void_p), + ('usample_size', ct.c_size_t) + ] + +_filter_fn = c_callable(ct.c_bool, [ct.POINTER(Sample), ct.c_void_p]) + + class Topic(Entity, Generic[_S]): """Representing a Topic""" @@ -92,6 +101,59 @@ def get_type_name(self, max_size=256) -> str: typename = property(get_type_name, doc="Get topic type name") + def set_topic_filter(self, callable: Callable[['cyclonedds.topic', Sample], bool]): + """Sets a filter and filter argument on a topic. + + Parameters + ---------- + callable : filter + The filter function used to filter topic samples. + topic: Topic + The topic to set the filter function. + Sample: Sample + The sample that needs to be checked whether to be filtered. + + Returns + ------- + bool + Whether this sample is filtered. + """ + if callable is None: + return self._set_topic_filter(self._ref, None, None) + + def call(csample, args): + return callable(self, self.data_type.deserialize( + ct.string_at(csample[0].usample, csample[0].usample_size))) + + self._topic_filter = _filter_fn(call) + self._set_topic_filter(self._ref, self._topic_filter, None) + + def set_c_topic_filter(self, c_callable): + self._c_topic_filter = c_callable + self._set_topic_filter(self._ref, self._c_topic_filter, None) + + def get_inconsistent_topic_status(self): + """Get INCONSISTENT_TOPIC status + + Raises + ------ + DDSException: + If any error code is returned by the DDS API it is converted into an exception. + + Returns + ------- + inconsistent_topic_status: + The class 'inconsistent_topic_status` value. + """ + status = dds_c_t.inconsistent_topic_status() + ret = self._get_inconsistent_topic_status(self._ref, ct.byref(status)) + if ret == 0: + return status + raise DDSException(ret, f"Occurred when getting the inconsistent topic status for {repr(self)}") + + + + @c_call("dds_get_name") def _get_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) -> dds_c_t.returnv: pass @@ -99,3 +161,11 @@ def _get_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) @c_call("dds_get_type_name") def _get_type_name(self, topic: dds_c_t.entity, name: ct.c_char_p, size: ct.c_size_t) -> dds_c_t.returnv: pass + + @c_call("dds_set_topic_filter_and_arg") + def _set_topic_filter(self, topic: dds_c_t.entity, callback: _filter_fn, args: ct.c_void_p) -> dds_c_t.returnv: + pass + + @c_call("dds_get_inconsistent_topic_status") + def _get_inconsistent_topic_status(self, topic: dds_c_t.entity, status: ct.POINTER(dds_c_t.inconsistent_topic_status)) -> dds_c_t.returnv: + pass diff --git a/docs/manual/cyclonedds.core.rst b/docs/manual/cyclonedds.core.rst index aa4f520d..c3d3656e 100644 --- a/docs/manual/cyclonedds.core.rst +++ b/docs/manual/cyclonedds.core.rst @@ -58,3 +58,6 @@ core :undoc-members: :show-inheritance: +.. autoclass:: cyclonedds.core.Statistics + :members: + \ No newline at end of file diff --git a/tests/test_reader.py b/tests/test_reader.py index d6b617bb..f3b40f42 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -1,10 +1,12 @@ import pytest +import random from cyclonedds.domain import Domain, DomainParticipant from cyclonedds.topic import Topic from cyclonedds.sub import Subscriber, DataReader from cyclonedds.pub import Publisher, DataWriter from cyclonedds.util import duration, isgoodentity +from cyclonedds.core import Qos, Policy from support_modules.testtopics import Message, MessageKeyed @@ -181,3 +183,29 @@ def test_reader_wrong_usage_errors(): with pytest.raises(TypeError): DataReader(dp, tp, listener=False) + + +def test_get_matched_publications(): + dp = DomainParticipant(0) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) + dr = DataReader(dp, tp) + + rand_dw = random.randint(0, 20) + dw = [] + for i in range(rand_dw): + dw.append(DataWriter(dp, tp)) + + matched = dr.get_matched_publications() + assert len(matched) == rand_dw + + +def test_get_matched_publication_data(): + dp = DomainParticipant(0) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) + dr = DataReader(dp, tp) + dw = DataWriter(dp, tp) + + matched_handles = dr.get_matched_publications() + for handle in matched_handles: + matched_data = dr.get_matched_publication_data(handle) + assert matched_data is not None diff --git a/tests/test_statistics.py b/tests/test_statistics.py new file mode 100644 index 00000000..ec398b51 --- /dev/null +++ b/tests/test_statistics.py @@ -0,0 +1,30 @@ +import pytest +import time + +from cyclonedds.core import Statistics +from cyclonedds.domain import DomainParticipant +from cyclonedds.topic import Topic +from cyclonedds.sub import Subscriber, DataReader +from cyclonedds.pub import Publisher, DataWriter + +from support_modules.testtopics import Message + + +def test_create_statistics(): + dp = DomainParticipant(0) + tp = Topic(dp, "statistics", Message) + dw = DataWriter(dp, tp) + stat = Statistics(dw) + print(f"stat = {stat}") + assert stat.data + + +def test_refresh_statistics(): + dp = DomainParticipant(0) + tp = Topic(dp, "statistics", Message) + dw = DataWriter(dp, tp) + stat = Statistics(dw) + assert stat.data + time.sleep(0.5) + stat.refresh() + assert stat.time != 0 diff --git a/tests/test_topic.py b/tests/test_topic.py index a00b88ce..80dfc88c 100644 --- a/tests/test_topic.py +++ b/tests/test_topic.py @@ -4,6 +4,10 @@ from cyclonedds.domain import DomainParticipant from cyclonedds.topic import Topic from cyclonedds.util import isgoodentity +from cyclonedds.pub import DataWriter +from cyclonedds.sub import DataReader +from cyclonedds.qos import Policy, Qos +from dataclasses import dataclass from support_modules.testtopics import Message @@ -21,8 +25,32 @@ def test_get_name(): assert tp.name == tp.get_name() == 'MessageTopic' + def test_get_type_name(): dp = DomainParticipant(0) tp = Topic(dp, 'MessageTopic', Message) assert tp.typename == tp.get_type_name() == 'Message' + + +def filter(topic: Topic, sample: Message) -> bool: + if "Filter" in sample.message: + return False + return True + + +def test_topic_filter(): + dp = DomainParticipant(0) + tp = Topic(dp, "MessageTopic", Message, qos=Qos(Policy.History.KeepLast(5))) + tp.set_topic_filter(filter) + dw = DataWriter(dp, tp) + dr = DataReader(dp, tp) + + dw.write(Message("Nice Message")) + dw.write(Message("Test Filtering")) + dw.write(Message("Hello")) + dw.write(Message("lower case filter")) + + data = str(dr.read(5)) + assert "Filter" not in data + assert "filter" and "Hello" and "Nice" in data diff --git a/tests/test_writer.py b/tests/test_writer.py index 9ae3510d..0ee2ae6c 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1,10 +1,13 @@ import pytest +import random from cyclonedds.core import DDSException from cyclonedds.domain import DomainParticipant from cyclonedds.topic import Topic from cyclonedds.pub import Publisher, DataWriter from cyclonedds.util import duration, isgoodentity +from cyclonedds.sub import DataReader +from cyclonedds.core import Qos, Policy from support_modules.testtopics import Message, MessageKeyed @@ -86,3 +89,30 @@ def test_writer_lookup(): assert handle1 > 0 and handle2 > 0 and handle1 != handle2 assert handle1 == dw.lookup_instance(keymsg1) assert handle2 == dw.lookup_instance(keymsg2) + + +def test_get_matched_subscriptions(): + dp = DomainParticipant(0) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) + dw = DataWriter(dp, tp) + + rand_dr = random.randint(0, 20) + dr = [] + for i in range(rand_dr): + dr.append(DataReader(dp, tp)) + + matched = dw.get_matched_subscriptions() + assert len(matched) == rand_dr + + +def test_get_matched_subscription_data(): + dp = DomainParticipant(0) + tp = Topic(dp, f"Message{random.randint(1000000,9999999)}", Message) + dr = DataReader(dp, tp) + dw = DataWriter(dp, tp) + + matched_handles = dw.get_matched_subscriptions() + for handle in matched_handles: + matched_data = dw.get_matched_subscription_data(handle) + print(f"matched data = {matched_data.key}") + assert matched_data is not None