Skip to content

Commit

Permalink
cleanup add/rem datawriter connection logic (#1216)
Browse files Browse the repository at this point in the history
Core: Cleanup add/rem datawriter connection logic (no need to cherry pick anywhere)
  • Loading branch information
rex-schilasky authored Oct 24, 2023
1 parent b0b7655 commit 4439f91
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 52 deletions.
29 changes: 13 additions & 16 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,9 @@ namespace eCAL
m_loc_subscribed = true;

// add a new local subscription
m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.udp_mc.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_);
m_writer.tcp.AddLocConnection(local_info_.process_id, local_info_.topic_id, reader_par_);

#ifndef NDEBUG
// log it
Expand All @@ -686,8 +687,9 @@ namespace eCAL
}

// remove a local subscription
m_writer.udp_mc.RemLocConnection (local_info_.process_id);
m_writer.shm.RemLocConnection (local_info_.process_id);
m_writer.udp_mc.RemLocConnection (local_info_.process_id, local_info_.topic_id);
m_writer.shm.RemLocConnection (local_info_.process_id, local_info_.topic_id);
m_writer.tcp.RemLocConnection (local_info_.process_id, local_info_.topic_id);

#ifndef NDEBUG
// log it
Expand All @@ -708,8 +710,9 @@ namespace eCAL
m_ext_subscribed = true;

// add a new external subscription
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);
m_writer.tcp.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);

#ifndef NDEBUG
// log it
Expand All @@ -726,8 +729,9 @@ namespace eCAL
}

// remove external subscription
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
m_writer.tcp.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
}

void CDataWriter::RefreshRegistration()
Expand Down Expand Up @@ -764,22 +768,15 @@ namespace eCAL
Register(false);

// check connection timeouts
// Todo: Why are only Local connections removed, not external connections?
const std::shared_ptr<std::list<SLocalSubscriptionInfo>> loc_timeouts = std::make_shared<std::list<SLocalSubscriptionInfo>>();
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.remove_deprecated(loc_timeouts.get());
m_loc_sub_map.remove_deprecated();
m_ext_sub_map.remove_deprecated();

m_loc_subscribed = !m_loc_sub_map.empty();
m_ext_subscribed = !m_ext_sub_map.empty();
}

for(const auto& loc_sub : *loc_timeouts)
{
m_writer.shm.RemLocConnection(loc_sub.process_id);
}

if (!m_loc_subscribed && !m_ext_subscribed)
{
Disconnect();
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/readwrite/ecal_writer_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ namespace eCAL
virtual bool SetQOS(const QOS::SWriterQOS& qos_) { m_qos = qos_; return true; };
QOS::SWriterQOS GetQOS() { return(m_qos); };

virtual bool AddLocConnection(const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemLocConnection(const std::string& /*process_id_*/) { return false; };
virtual void AddLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) {};
virtual void RemLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/) {};

virtual bool AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/) { return false; };
virtual void AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) {};
virtual void RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/) {};

virtual std::string GetConnectionParameter() { return ""; };

Expand Down
31 changes: 3 additions & 28 deletions ecal/core/src/readwrite/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,39 +177,14 @@ namespace eCAL
return sent;
}

bool CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*conn_par_*/)
void CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/)
{
if (!m_created) return false;
bool ret_state(true);
if (!m_created) return;

for (auto& memory_file : m_memory_file_vec)
{
ret_state &= memory_file->Connect(process_id_);
memory_file->Connect(process_id_);
}

return ret_state;
}

bool CDataWriterSHM::RemLocConnection(const std::string& process_id_)
{
if (!m_created) return false;
bool ret_state(true);

for (auto& memory_file : m_memory_file_vec)
{
// This is not working correctly under POSIX for memory files that are read and written within the same process.
//
// The functions 'CSyncMemoryFile::Disconnect' and 'CDataWriterSHM::RemLocConnection' are now called
// by the new Subscriber Unregistration event logic and were never called in any previous eCAL version.
//
// TODO: Fix this in 'CSyncMemoryFile::Disconnect' to handle event resources properly.
if (std::to_string(eCAL::Process::GetProcessID()) != process_id_)
{
ret_state &= memory_file->Disconnect(process_id_);
}
}

return ret_state;
}

std::string CDataWriterSHM::GetConnectionParameter()
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/readwrite/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ namespace eCAL

bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;

bool AddLocConnection(const std::string& process_id_, const std::string& conn_par_) override;
bool RemLocConnection(const std::string& process_id_) override;
void AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& conn_par_) override;

std::string GetConnectionParameter() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

void OnEvent(const char* topic_name_, const struct eCAL::SSubEventCallbackData* data_)
{
std::cout << "topic name : " << topic_name_ << std::endl;
std::cout << "topic name : " << topic_name_ << std::endl;
switch (data_->type)
{
case sub_event_connected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

void OnEvent(const char* topic_name_, const struct eCAL::SPubEventCallbackData* data_)
{
std::cout << "topic name : " << topic_name_ << std::endl;
std::cout << "topic name : " << topic_name_ << std::endl;
switch (data_->type)
{
case pub_event_connected:
Expand Down

0 comments on commit 4439f91

Please sign in to comment.