From 4439f9109d2f4d5eaef67d976e56f52fcdd93215 Mon Sep 17 00:00:00 2001 From: Rex Schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:43:52 +0200 Subject: [PATCH] cleanup add/rem datawriter connection logic (#1216) Core: Cleanup add/rem datawriter connection logic (no need to cherry pick anywhere) --- ecal/core/src/readwrite/ecal_writer.cpp | 29 ++++++++--------- ecal/core/src/readwrite/ecal_writer_base.h | 8 ++--- ecal/core/src/readwrite/ecal_writer_shm.cpp | 31 ++----------------- ecal/core/src/readwrite/ecal_writer_shm.h | 3 +- .../src/person_rec_events.cpp | 2 +- .../src/person_snd_events.cpp | 2 +- 6 files changed, 23 insertions(+), 52 deletions(-) diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index ce9c6f13ca..5a18c3d08d 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -668,8 +668,9 @@ namespace eCAL m_loc_subscribed = true; // add a new local subscription - m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_); - m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_); + m_writer.udp_mc.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_); + m_writer.shm.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_); + m_writer.tcp.AddLocConnection(local_info_.process_id, local_info_.topic_id, reader_par_); #ifndef NDEBUG // log it @@ -686,8 +687,9 @@ namespace eCAL } // remove a local subscription - m_writer.udp_mc.RemLocConnection (local_info_.process_id); - m_writer.shm.RemLocConnection (local_info_.process_id); + m_writer.udp_mc.RemLocConnection (local_info_.process_id, local_info_.topic_id); + m_writer.shm.RemLocConnection (local_info_.process_id, local_info_.topic_id); + m_writer.tcp.RemLocConnection (local_info_.process_id, local_info_.topic_id); #ifndef NDEBUG // log it @@ -708,8 +710,9 @@ namespace eCAL m_ext_subscribed = true; // add a new external subscription - m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_); - m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_); + m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_); + m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_); + m_writer.tcp.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_); #ifndef NDEBUG // log it @@ -726,8 +729,9 @@ namespace eCAL } // remove external subscription - m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id); - m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id); + m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id); + m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id); + m_writer.tcp.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id); } void CDataWriter::RefreshRegistration() @@ -764,22 +768,15 @@ namespace eCAL Register(false); // check connection timeouts - // Todo: Why are only Local connections removed, not external connections? - const std::shared_ptr> loc_timeouts = std::make_shared>(); { const std::lock_guard lock(m_sub_map_sync); - m_loc_sub_map.remove_deprecated(loc_timeouts.get()); + m_loc_sub_map.remove_deprecated(); m_ext_sub_map.remove_deprecated(); m_loc_subscribed = !m_loc_sub_map.empty(); m_ext_subscribed = !m_ext_sub_map.empty(); } - for(const auto& loc_sub : *loc_timeouts) - { - m_writer.shm.RemLocConnection(loc_sub.process_id); - } - if (!m_loc_subscribed && !m_ext_subscribed) { Disconnect(); diff --git a/ecal/core/src/readwrite/ecal_writer_base.h b/ecal/core/src/readwrite/ecal_writer_base.h index 8eca724dcc..c2eee1c385 100644 --- a/ecal/core/src/readwrite/ecal_writer_base.h +++ b/ecal/core/src/readwrite/ecal_writer_base.h @@ -48,11 +48,11 @@ namespace eCAL virtual bool SetQOS(const QOS::SWriterQOS& qos_) { m_qos = qos_; return true; }; QOS::SWriterQOS GetQOS() { return(m_qos); }; - virtual bool AddLocConnection(const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; }; - virtual bool RemLocConnection(const std::string& /*process_id_*/) { return false; }; + virtual void AddLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) {}; + virtual void RemLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/) {}; - virtual bool AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; }; - virtual bool RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/) { return false; }; + virtual void AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) {}; + virtual void RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/) {}; virtual std::string GetConnectionParameter() { return ""; }; diff --git a/ecal/core/src/readwrite/ecal_writer_shm.cpp b/ecal/core/src/readwrite/ecal_writer_shm.cpp index eefbb74227..5dbee9cd44 100644 --- a/ecal/core/src/readwrite/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/ecal_writer_shm.cpp @@ -177,39 +177,14 @@ namespace eCAL return sent; } - bool CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*conn_par_*/) + void CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) { - if (!m_created) return false; - bool ret_state(true); + if (!m_created) return; for (auto& memory_file : m_memory_file_vec) { - ret_state &= memory_file->Connect(process_id_); + memory_file->Connect(process_id_); } - - return ret_state; - } - - bool CDataWriterSHM::RemLocConnection(const std::string& process_id_) - { - if (!m_created) return false; - bool ret_state(true); - - for (auto& memory_file : m_memory_file_vec) - { - // This is not working correctly under POSIX for memory files that are read and written within the same process. - // - // The functions 'CSyncMemoryFile::Disconnect' and 'CDataWriterSHM::RemLocConnection' are now called - // by the new Subscriber Unregistration event logic and were never called in any previous eCAL version. - // - // TODO: Fix this in 'CSyncMemoryFile::Disconnect' to handle event resources properly. - if (std::to_string(eCAL::Process::GetProcessID()) != process_id_) - { - ret_state &= memory_file->Disconnect(process_id_); - } - } - - return ret_state; } std::string CDataWriterSHM::GetConnectionParameter() diff --git a/ecal/core/src/readwrite/ecal_writer_shm.h b/ecal/core/src/readwrite/ecal_writer_shm.h index b4dbdbfd64..6dd7e72e40 100644 --- a/ecal/core/src/readwrite/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/ecal_writer_shm.h @@ -51,8 +51,7 @@ namespace eCAL bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override; - bool AddLocConnection(const std::string& process_id_, const std::string& conn_par_) override; - bool RemLocConnection(const std::string& process_id_) override; + void AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& conn_par_) override; std::string GetConnectionParameter() override; diff --git a/samples/cpp/pubsub/protobuf/person_rec_events/src/person_rec_events.cpp b/samples/cpp/pubsub/protobuf/person_rec_events/src/person_rec_events.cpp index dd592e5da9..fbabe467ea 100644 --- a/samples/cpp/pubsub/protobuf/person_rec_events/src/person_rec_events.cpp +++ b/samples/cpp/pubsub/protobuf/person_rec_events/src/person_rec_events.cpp @@ -26,7 +26,7 @@ void OnEvent(const char* topic_name_, const struct eCAL::SSubEventCallbackData* data_) { - std::cout << "topic name : " << topic_name_ << std::endl; + std::cout << "topic name : " << topic_name_ << std::endl; switch (data_->type) { case sub_event_connected: diff --git a/samples/cpp/pubsub/protobuf/person_snd_events/src/person_snd_events.cpp b/samples/cpp/pubsub/protobuf/person_snd_events/src/person_snd_events.cpp index 3c70c7767a..a61356622e 100644 --- a/samples/cpp/pubsub/protobuf/person_snd_events/src/person_snd_events.cpp +++ b/samples/cpp/pubsub/protobuf/person_snd_events/src/person_snd_events.cpp @@ -26,7 +26,7 @@ void OnEvent(const char* topic_name_, const struct eCAL::SPubEventCallbackData* data_) { - std::cout << "topic name : " << topic_name_ << std::endl; + std::cout << "topic name : " << topic_name_ << std::endl; switch (data_->type) { case pub_event_connected: