Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add set_topic_filter & matched_pub, sub & status getters & statistics #281

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 144 additions & 66 deletions clayer/pysertype.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ static inline const ddspy_serdata_t *cserdata (const ddsi_serdata_t *this)

static void typeid_ser (dds_ostream_t *os, const dds_typeid_t *type_id)
{
dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops);
if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_id, DDS_XTypes_TypeIdentifier_desc.m_ops))
abort (); // internally generated data, so should never fail
}

#ifdef DDS_HAS_TYPE_DISCOVERY
Expand All @@ -102,7 +103,8 @@ static void typeid_deser (dds_istream_t *is, dds_typeid_t **type_id)

static void typeobj_ser (dds_ostream_t *os, const dds_typeobj_t *type_obj)
{
dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops);
if (!dds_stream_write (os, &cdrstream_allocator, (char *)type_obj, DDS_XTypes_TypeObject_desc.m_ops))
abort (); // internally generated data, so should never fail
}

#endif /* DDS_HAS_TYPE_DISCOVERY */
Expand Down Expand Up @@ -1413,6 +1415,88 @@ static PyObject *ddspy_take_participant (PyObject *self, PyObject *args)
return ddspy_readtake_participant (self, args, dds_take);
}

static PyObject *ddspy_construct_endpoint (struct dds_builtintopic_endpoint *endpoint, PyObject *sampleinfo, PyObject *endpoint_constructor, PyObject *cqos_to_qos)
{
PyObject *type_id_bytes = NULL;

dds_ostream_t type_obj_stream;
const dds_typeinfo_t *type_info = NULL;

// Fetch the type id
dds_builtintopic_get_endpoint_type_info (endpoint, &type_info);

// convert to cdr bytes
if (type_info != NULL)
{
dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2);
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info);
typeid_ser (&type_obj_stream, type_id);
type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
dds_ostream_fini (&type_obj_stream, &cdrstream_allocator);
}
else
{
type_id_bytes = Py_None;
Py_INCREF (type_id_bytes);
}

PyObject *qos_p, *qos;
if (endpoint->qos != NULL)
{
qos_p = PyLong_FromVoidPtr (endpoint->qos);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
}
else
{
Py_INCREF (Py_None);
Py_INCREF (Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject *item = PyObject_CallFunction (
endpoint_constructor, "y#y#Ks#s#OOO",
endpoint->key.v, (Py_ssize_t) 16,
endpoint->participant_key.v, (Py_ssize_t) 16,
endpoint->participant_instance_handle,
endpoint->topic_name,
endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name),
endpoint->type_name,
endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name),
qos,
sampleinfo,
type_id_bytes);
if (PyErr_Occurred ())
{
Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
Py_DECREF (qos);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}

Py_DECREF (type_id_bytes);
Py_DECREF (qos_p);
Py_DECREF (qos);
return item;
}

static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_return_t (*readtake) (dds_entity_t, void **, dds_sample_info_t *, size_t, uint32_t))
{
uint32_t Nu32;
Expand Down Expand Up @@ -1442,29 +1526,6 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re
PyObject *list = PyList_New (sts);
for (uint32_t i = 0; i < (uint32_t)sts; ++i)
{
PyObject *type_id_bytes = NULL;

dds_ostream_t type_obj_stream;
const dds_typeinfo_t *type_info = NULL;

// Fetch the type id
dds_builtintopic_get_endpoint_type_info (rcontainer[i], &type_info);

// convert to cdr bytes
if (type_info != NULL)
{
dds_ostream_init (&type_obj_stream, &cdrstream_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2);
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid (type_info);
typeid_ser (&type_obj_stream, type_id);
type_id_bytes = Py_BuildValue ("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
dds_ostream_fini (&type_obj_stream, &cdrstream_allocator);
}
else
{
type_id_bytes = Py_None;
Py_INCREF (type_id_bytes);
}

PyObject *sampleinfo = get_sampleinfo_pyobject (&info[i]);
if (PyErr_Occurred ())
{
Expand All @@ -1473,54 +1534,17 @@ static PyObject *ddspy_readtake_endpoint (PyObject *self, PyObject *args, dds_re
return NULL;
}

PyObject *qos_p, *qos;
if (rcontainer[i]->qos != NULL)
{
qos_p = PyLong_FromVoidPtr (rcontainer[i]->qos);
if (PyErr_Occurred ())
{
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction (cqos_to_qos, "O", qos_p);
if (PyErr_Occurred ())
{
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
}
else
{
Py_INCREF (Py_None);
Py_INCREF (Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject *item = PyObject_CallFunction (
endpoint_constructor, "y#y#Ks#s#OOO",
rcontainer[i]->key.v, (Py_ssize_t) 16,
rcontainer[i]->participant_key.v, (Py_ssize_t) 16,
rcontainer[i]->participant_instance_handle,
rcontainer[i]->topic_name,
rcontainer[i]->topic_name == NULL ? 0 : strlen(rcontainer[i]->topic_name),
rcontainer[i]->type_name,
rcontainer[i]->type_name == NULL ? 0 : strlen(rcontainer[i]->type_name),
qos,
sampleinfo,
type_id_bytes);
PyObject *item = ddspy_construct_endpoint (rcontainer[i], sampleinfo, endpoint_constructor, cqos_to_qos);
if (PyErr_Occurred ())
{
Py_DECREF (sampleinfo);
PyErr_Clear ();
PyErr_SetString (PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}
PyList_SetItem (list, i, item); // steals ref
Py_DECREF (sampleinfo);
Py_DECREF (qos_p);
Py_DECREF (qos);

PyList_SetItem (list, i, item); // steals ref
}

dds_return_loan (reader, (void **)rcontainer, sts);
Expand Down Expand Up @@ -1719,6 +1743,58 @@ static PyObject *ddspy_get_typeobj (PyObject *self, PyObject *args)

#endif

static PyObject *
ddspy_get_matched_subscription_data(PyObject *self, PyObject *args)
{
dds_entity_t writer;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &writer, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_subscription_data(writer, handle);
if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos);
dds_builtintopic_free_endpoint(endpoint);
return item;
}


static PyObject *
ddspy_get_matched_publication_data(PyObject *self, PyObject *args)
{
dds_entity_t reader;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &reader, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_publication_data(reader, handle);
if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject *item = ddspy_construct_endpoint (endpoint, Py_None, endpoint_constructor, cqos_to_qos);
dds_builtintopic_free_endpoint(endpoint);
return item;
}


char ddspy_docs[] = "DDSPY module";

PyMethodDef ddspy_funcs[] = {
Expand Down Expand Up @@ -1753,6 +1829,8 @@ PyMethodDef ddspy_funcs[] = {
#ifdef DDS_HAS_TYPE_DISCOVERY
{ "ddspy_get_typeobj", (PyCFunction)ddspy_get_typeobj, METH_VARARGS, ddspy_docs },
#endif
{ "ddspy_get_matched_subscription_data", (PyCFunction)ddspy_get_matched_subscription_data, METH_VARARGS, ddspy_docs },
{ "ddspy_get_matched_publication_data", (PyCFunction)ddspy_get_matched_publication_data, METH_VARARGS, ddspy_docs },
{ NULL }
};

Expand Down
Loading