diff --git a/modules/ObjectControl/inc/channel.hpp b/modules/ObjectControl/inc/channel.hpp index 20917d4ed..e1a02fb40 100644 --- a/modules/ObjectControl/inc/channel.hpp +++ b/modules/ObjectControl/inc/channel.hpp @@ -19,16 +19,22 @@ struct MonitorMessage : std::pair {}; class Channel : public Loggable { public: - Channel(const size_t bufferLength, const int type, rclcpp::Logger log) + Channel(const size_t bufferLength, const int type, rclcpp::Logger log, int id = 0, int transmitter = 0) : channelType(type), transmitBuffer(bufferLength, 0), receiveBuffer(bufferLength, 0), - Loggable(log) + Loggable(log), + objectId(id), + transmitterId(transmitter) {} - Channel(int type, rclcpp::Logger log) : Channel(1024, type, log) {} + Channel(int type, rclcpp::Logger log, int id = 0) : Channel(1024, type, log, id) {} struct sockaddr_in addr = {}; int socket = -1; int channelType = 0; //!< SOCK_STREAM or SOCK_DGRAM + int objectId = 0; + int transmitterId = 0; + int sentMessageCounter = 0; + int receivedMessageCounter = 0; std::vector transmitBuffer; std::vector receiveBuffer; @@ -50,4 +56,7 @@ class Channel : public Loggable friend Channel& operator>>(Channel&,MonitorMessage&); friend Channel& operator>>(Channel&,ObjectPropertiesType&); friend Channel& operator>>(Channel&,GeneralResponseMessageType&); + +protected: + MessageHeaderType *populateHeaderType(MessageHeaderType *header); }; \ No newline at end of file diff --git a/modules/ObjectControl/inc/objectconnection.hpp b/modules/ObjectControl/inc/objectconnection.hpp index 460a6515c..000d4c3e3 100644 --- a/modules/ObjectControl/inc/objectconnection.hpp +++ b/modules/ObjectControl/inc/objectconnection.hpp @@ -22,9 +22,9 @@ class ObjectConnection : public Loggable { Channel cmd; Channel mntr; - ObjectConnection(rclcpp::Logger log) - : cmd(SOCK_STREAM, log), - mntr(SOCK_DGRAM, log), + ObjectConnection(rclcpp::Logger log, int id) + : cmd(SOCK_STREAM, log, id), + mntr(SOCK_DGRAM, log, id), Loggable(log) { pipe(interruptionPipeFds); } diff --git a/modules/ObjectControl/src/channel.cpp b/modules/ObjectControl/src/channel.cpp index d25a7f512..fda40a598 100644 --- a/modules/ObjectControl/src/channel.cpp +++ b/modules/ObjectControl/src/channel.cpp @@ -7,11 +7,13 @@ #include "iso22133.h" #include #include "atosTime.h" +#include "header.h" using namespace ROSChannels; Channel& operator<<(Channel& chnl, const HeabMessageDataType& heartbeat) { - auto nBytes = encodeHEABMessage(&heartbeat.dataTimestamp, heartbeat.controlCenterStatus, + MessageHeaderType header; + auto nBytes = encodeHEABMessage(chnl.populateHeaderType(&header), &heartbeat.dataTimestamp, heartbeat.controlCenterStatus, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); if (nBytes < 0) { throw std::invalid_argument(std::string("Failed to encode HEAB message: ") + strerror(errno)); @@ -24,7 +26,8 @@ Channel& operator<<(Channel& chnl, const HeabMessageDataType& heartbeat) { } Channel& operator<<(Channel& chnl, const ObjectSettingsType& settings) { - auto nBytes = encodeOSEMMessage(&settings, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); + MessageHeaderType header; + auto nBytes = encodeOSEMMessage(chnl.populateHeaderType(&header), &settings, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); if (nBytes < 0) { throw std::invalid_argument(std::string("Failed to encode OSEM message: ") + strerror(errno)); } @@ -39,7 +42,8 @@ Channel& operator<<(Channel& chnl, const ATOS::Trajectory& traj) { ssize_t nBytes; // TRAJ header - nBytes = encodeTRAJMessageHeader( + MessageHeaderType header; + nBytes = encodeTRAJMessageHeader(chnl.populateHeaderType(&header), traj.id, TRAJECTORY_INFO_RELATIVE_TO_ORIGIN, traj.name.c_str(),traj.name.length(), static_cast(traj.points.size()), chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); @@ -87,7 +91,8 @@ Channel& operator<<(Channel& chnl, const ATOS::Trajectory& traj) { } Channel& operator<<(Channel& chnl, const ObjectCommandType& cmd) { - auto nBytes = encodeOSTMMessage(cmd, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); + MessageHeaderType header; + auto nBytes = encodeOSTMMessage(chnl.populateHeaderType(&header), cmd, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); if (nBytes < 0) { throw std::invalid_argument(std::string("Failed to encode OSTM message: ") + strerror(errno)); } @@ -99,7 +104,8 @@ Channel& operator<<(Channel& chnl, const ObjectCommandType& cmd) { } Channel& operator<<(Channel& chnl, const StartMessageType& strt) { - auto nBytes = encodeSTRTMessage(&strt, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); + MessageHeaderType header; + auto nBytes = encodeSTRTMessage(chnl.populateHeaderType(&header), &strt, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); if (nBytes < 0) { throw std::invalid_argument(std::string("Failed to encode STRT message: ") + strerror(errno)); } @@ -115,8 +121,11 @@ Channel& operator>>(Channel& chnl, MonitorMessage& monitor) { if (chnl.pendingMessageType() == MESSAGE_ID_MONR) { struct timeval tv; TimeSetToCurrentSystemTime(&tv); + HeaderType header; + decodeISOHeader(chnl.receiveBuffer.data(), chnl.receiveBuffer.size(), &header, false); + monitor.first = header.transmitterID; auto nBytes = decodeMONRMessage(chnl.receiveBuffer.data(), chnl.receiveBuffer.size(), tv, - &monitor.first, &monitor.second, false); + &monitor.second, false); if (nBytes < 0) { throw std::invalid_argument("Failed to decode MONR message"); } @@ -174,7 +183,9 @@ Channel& operator<<(Channel& chnl, const ControlSignal::message_type::SharedPtr rcmm.throttleManoeuvre.pct = csp->throttle; rcmm.brakeManoeuvre.pct = csp->brake; rcmm.steeringManoeuvre.pct = csp->steering_angle; - auto nBytes = encodeRCMMMessage(&rcmm, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); + + MessageHeaderType header; + auto nBytes = encodeRCMMMessage(chnl.populateHeaderType(&header),&rcmm, chnl.transmitBuffer.data(), chnl.transmitBuffer.size(), false); if (nBytes < 0) { throw std::invalid_argument(std::string("Failed to encode RCM message: ") + strerror(errno)); } @@ -220,6 +231,14 @@ ISOMessageID Channel::pendingMessageType(bool awaitNext) { } } +MessageHeaderType *Channel::populateHeaderType(MessageHeaderType *header) { + memset(header, 0, sizeof (MessageHeaderType)); + header->transmitterID = this->transmitterId; + header->receiverID = this->objectId; + header->messageCounter = this->sentMessageCounter++; + return header; +} + void Channel::connect( std::shared_future stopRequest, const std::chrono::milliseconds retryPeriod) { diff --git a/modules/ObjectControl/src/testobject.cpp b/modules/ObjectControl/src/testobject.cpp index 9baee430b..bddb8c6b4 100644 --- a/modules/ObjectControl/src/testobject.cpp +++ b/modules/ObjectControl/src/testobject.cpp @@ -15,7 +15,7 @@ using std::placeholders::_1; TestObject::TestObject(uint32_t id) : Node("object_" + std::to_string(id)), osiChannel(SOCK_STREAM, get_logger()), - comms(get_logger()), + comms(get_logger(), id), conf(get_logger()) { pathSub = std::make_shared(*this, id, std::bind(&TestObject::onPathMessage, this, _1, id)); @@ -29,7 +29,7 @@ void TestObject::onPathMessage(const ROSChannels::Path::message_type::SharedPtr TestObject::TestObject(TestObject&& other) : osiChannel(SOCK_STREAM, other.get_logger()), - comms(other.get_logger()), + comms(other.get_logger(), other.getTransmitterID()), state(other.state), conf(other.conf), lastMonitor(other.lastMonitor), @@ -271,7 +271,7 @@ void TestObject::sendSettings() { objSettings.testMode = TEST_MODE_PREPLANNED; objSettings.desiredID.transmitter = conf.getTransmitterID(); - objSettings.desiredID.controlCentre = ::getTransmitterID(); + objSettings.desiredID.controlCentre = 0; objSettings.desiredID.subTransmitter = 0; objSettings.coordinateSystemOrigin = conf.getOrigin();