The Listener is thread-safe and can be used without restrictions.
But be aware that all provided callbacks are executed concurrently
in the background thread of the Listener. If you access structures
inside this callback you have to either ensure that you are the only
one accessing it or that it is accessed with a guard like a std::mutex
.
For an introduction into the terminology please read the Glossary in the WaitSet C++ example.
The Listener is a completely thread-safe construct that reacts to events by executing registered callbacks in a background thread. Events can be emitted by EventOrigins like a subscriber or a user trigger. Some of the EventOrigins like the subscriber can hereby emit more than one event type.
The interface of a listener consists of two methods: attachEvent
to attach a
new event with a callback and detachEvent
. These two methods can be called
concurrently, even from inside a callback that was triggered by an event!
!!! attention Please be aware of the thread-safety restrictions of the Listener and read the Thread Safety chapter carefully.
Let's say we have an application that offers us two distinct services:
Radar.FrontLeft.Counter
and Rader.FrontRight.Counter
. Every time we have
received a sample from left and right we would like to calculate the sum with
the newest values and print it out. If we have received only one of the samples,
we store it until we received the other side.
The publisher of this example does not contain any new features but if you have some questions take a look at the icedelivery example.
The subscriber main function starts as usual and after registering the runtime we create the listener that starts a background thread.
iox::popo::Listener listener;
Because it is fun, we also create a heartbeat trigger that will be triggered
every 4 seconds so that heartbeat received
can be printed to the console.
Furthermore, we have to create two subscribers to receive samples for the two
services.
iox::popo::UserTrigger heartbeat;
iox::popo::Subscriber<CounterTopic> subscriberLeft({"Radar", "FrontLeft", "Counter"});
iox::popo::Subscriber<CounterTopic> subscriberRight({"Radar", "FrontRight", "Counter"});
Next thing is a heartbeatThread
which will trigger our heartbeat trigger every
4 seconds.
std::thread heartbeatThread([&] {
while (!iox::hasTerminationRequested())
{
heartbeat.trigger();
std::this_thread::sleep_for(std::chrono::seconds(4));
}
});
Now that everything is set up, we can attach the subscribers to the listener so that
every time a new sample (iox::popo::SubscriberEvent::DATA_RECEIVED
) is received our callback
(onSampleReceivedCallback
) will be called. We also attach
our heartbeat
user trigger to print the heartbeat message to the console via another
callback (heartbeatCallback
).
listener.attachEvent(heartbeat, iox::popo::createNotificationCallback(heartbeatCallback)).or_else([](auto) {
std::cerr << "unable to attach heartbeat event" << std::endl;
std::exit(EXIT_FAILURE);
});
// It is possible to attach any c function here with a signature of void(iox::popo::Subscriber<CounterTopic> *).
// But please be aware that the listener does not take ownership of the callback, therefore it has to exist as
// long as the event is attached. Furthermore, it excludes lambdas which are capturing data since they are not
// convertable to a c function pointer.
// to simplify the example we attach the same callback onSampleReceivedCallback again
listener
.attachEvent(subscriberLeft,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberLeft" << std::endl;
std::exit(EXIT_FAILURE);
});
listener
.attachEvent(subscriberRight,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback))
.or_else([](auto) {
std::cerr << "unable to attach subscriberRight" << std::endl;
std::exit(EXIT_FAILURE);
});
Since a user trigger has only one event, we do not have to specify an event when we attach
it to the listener. attachEvent
returns a expected
to inform us if the attachment
succeeded. When this is not the case the error handling is performed in the .or_else([](auto){
part
after each attachEvent
call.
In this example, we choose to attach the same callback twice to make things easier
but you are free to attach any callback with the signature void(iox::popo::Subscriber<CounterTopic> *)
.
The setup is complete, but it would terminate right away since we have no blocker which
waits until SIGINT
or SIGTERM
was send. In the other examples, we had not that problem
since we pulled all the events in a while true loop but working only with callbacks
requires something like our SignalWatcher
which waits until SIGINT
or SIGTERM
was signaled.
iox::waitForTerminationRequest();
When waitForTerminationRequest
unblocks we clean up all resources and terminate the process
gracefully.
listener.detachEvent(heartbeat);
listener.detachEvent(subscriberLeft, iox::popo::SubscriberEvent::DATA_RECEIVED);
listener.detachEvent(subscriberRight, iox::popo::SubscriberEvent::DATA_RECEIVED);
heartbeatThread.join();
Hint: You do not have to detach an EventOrigin like a subscriber or user trigger before it goes out of scope. This also goes for the Listener, the implemented RAII-based design takes care of the resource cleanup.
The callbacks must have a signature like void(PointerToEventOrigin*)
.
Our heartbeatCallback
for instance, just prints the message heartbeat received
.
void heartbeatCallback(iox::popo::UserTrigger*)
{
std::cout << "heartbeat received " << std::endl;
}
The onSampleReceivedCallback
is more complex. We first acquire all the received
samples and check which subscriber signaled the event by acquiring the subscriber's
service description. If the instance is equal to FrontLeft
we store the sample
in the leftCache
otherwise in the rightCache
.
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
// take all samples from the subscriber queue
while (subscriber->take().and_then([subscriber](auto& sample) {
auto instanceString = subscriber->getServiceDescription().getInstanceIDString();
// store the sample in the corresponding cache
if (instanceString == iox::capro::IdString_t("FrontLeft"))
{
leftCache.emplace(*sample);
}
else if (instanceString == iox::capro::IdString_t("FrontRight"))
{
rightCache.emplace(*sample);
}
std::cout << "received: " << sample->counter << std::endl;
}))
{
}
// ...
}
In the next step, we check if both caches are filled. If this is the case, we print an extra message which states the result of the sum of both received values. Afterward, we reset both caches to start fresh again.
void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber)
{
// ...
// if both caches are filled we can process them
if (leftCache && rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << leftCache->counter << " + "
<< rightCache->counter << " = " << leftCache->counter + rightCache->counter << std::endl;
leftCache.reset();
rightCache.reset();
}
}
Here we demonstrate how you can provide virtually everything as an additional argument to the callbacks.
You just have to provide a reference to a value as additional argument in the attachEvent
method
which is then provided as argument in your callback. One of the use cases is to get access
to members and methods of an object inside a static method which we demonstrate here.
This example is identical to the ice_callbacks_subscriber.cpp one, except that we left out the cyclic heartbeat trigger. The key difference is that the listener is now a class member and in every callback we would like to change some member variables. For this we require an additional pointer to the object since the listener requires c function references which do not allow the usage of lambda expressions with capturing. Here we can use the userType feature which allows us to provide the this pointer as additional argument to the callback.
The main function is now pretty short, we instantiate our object of type CounterService
and call waitForTerminationRequest
like in the
previous example to wait for the control c event from the user.
iox::runtime::PoshRuntime::initRuntime(APP_NAME);
CounterService counterService;
iox::waitForTerminationRequest();
Our CounterService
has the following members:
iox::popo::Subscriber<CounterTopic> m_subscriberLeft;
iox::popo::Subscriber<CounterTopic> m_subscriberRight;
iox::optional<CounterTopic> m_leftCache;
iox::optional<CounterTopic> m_rightCache;
iox::popo::Listener m_listener;
And their purposes are the same as in the previous example. In the constructor
we initialize the two subscribers and attach them to our listener. But now we
add an additional parameter in the iox::popo::createNotificationCallback
, the
dereferenced this
pointer. It has to be dereferenced since we require a reference
as argument.
!!! attention
The user has to ensure that the contextData (*this
) in attachEvent
lives as long as the attachment, with its callback, is attached otherwise
the callback context data pointer is dangling.
CounterService()
: m_subscriberLeft({"Radar", "FrontLeft", "Counter"})
, m_subscriberRight({"Radar", "FrontRight", "Counter"})
{
/// Attach the static method onSampleReceivedCallback and provide this as additional argument
/// to the callback to gain access to the object whenever the callback is called.
/// It is not possible to use a lambda with capturing here since they are not convertable to
/// a C function pointer.
/// important: the user has to ensure that the contextData (*this) lives as long as
/// the subscriber with its callback is attached to the listener
m_listener
.attachEvent(m_subscriberLeft,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback, *this))
.or_else([](auto) {
std::cerr << "unable to attach subscriberLeft" << std::endl;
std::exit(EXIT_FAILURE);
});
m_listener
.attachEvent(m_subscriberRight,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback(onSampleReceivedCallback, *this))
.or_else([](auto) {
std::cerr << "unable to attach subscriberRight" << std::endl;
std::exit(EXIT_FAILURE);
});
}
The onSampleReceivedCallback
is now a static method instead of a free function. It
has to be static since we require a C function reference as callback argument and a
static method can be converted into such a type. But in a static method we do not
have access to the members of an object, therefore we have to add an additional
argument, the pointer to the object itself, called self
.
static void onSampleReceivedCallback(iox::popo::Subscriber<CounterTopic>* subscriber, CounterService* self)
{
// take all samples from the subscriber queue
while (subscriber->take().and_then([subscriber, self](auto& sample) {
auto instanceString = subscriber->getServiceDescription().getInstanceIDString();
// store the sample in the corresponding cache
if (instanceString == iox::capro::IdString_t("FrontLeft"))
{
self->m_leftCache.emplace(*sample);
}
else if (instanceString == iox::capro::IdString_t("FrontRight"))
{
self->m_rightCache.emplace(*sample);
}
std::cout << "received: " << sample->counter << std::endl;
}))
{
}
// if both caches are filled we can process them
if (self->m_leftCache && self->m_rightCache)
{
std::cout << "Received samples from FrontLeft and FrontRight. Sum of " << self->m_leftCache->counter
<< " + " << self->m_rightCache->counter << " = "
<< self->m_leftCache->counter + self->m_rightCache->counter << std::endl;
self->m_leftCache.reset();
self->m_rightCache.reset();
}
}