Skip to content

Commit

Permalink
Merge pull request #917 from luxonis/message_groups
Browse files Browse the repository at this point in the history
Message groups
  • Loading branch information
asahtik authored Nov 29, 2023
2 parents 3260c38 + a5b6cbb commit e38023c
Show file tree
Hide file tree
Showing 25 changed files with 680 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ add_library(${TARGET_CORE_NAME}
src/pipeline/node/ColorCamera.cpp
src/pipeline/node/Camera.cpp
src/pipeline/node/ToF.cpp
src/pipeline/node/MessageDemux.cpp
src/pipeline/node/MonoCamera.cpp
src/pipeline/node/StereoDepth.cpp
src/pipeline/node/Sync.cpp
src/pipeline/node/NeuralNetwork.cpp
src/pipeline/node/ImageManip.cpp
src/pipeline/node/Warp.cpp
Expand Down Expand Up @@ -248,6 +250,7 @@ add_library(${TARGET_CORE_NAME}
src/pipeline/datatype/TrackedFeatures.cpp
src/pipeline/datatype/FeatureTrackerConfig.cpp
src/pipeline/datatype/ToFConfig.cpp
src/pipeline/datatype/MessageGroup.cpp
src/utility/H26xParsers.cpp
src/utility/Initialization.cpp
src/utility/Resources.cpp
Expand Down
2 changes: 1 addition & 1 deletion cmake/Depthai/DepthaiDeviceSideConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set(DEPTHAI_DEVICE_SIDE_MATURITY "snapshot")

# "full commit hash of device side binary"
set(DEPTHAI_DEVICE_SIDE_COMMIT "cae51725e78d8128ee7324c13f74648bcc59addc")
set(DEPTHAI_DEVICE_SIDE_COMMIT "36ce766b29927e2b6bd8873d4e3799ccca3cb821")

# "version if applicable"
set(DEPTHAI_DEVICE_SIDE_VERSION "")
6 changes: 3 additions & 3 deletions cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ hunter_config(
# Specific Catch2 version
hunter_config(
Catch2
VERSION "2.13.7"
URL "https://github.com/catchorg/Catch2/archive/refs/tags/v3.2.1.tar.gz"
SHA1 "acfba7f71cbbbbf60bc1bc4c0e3efca4a9c70df7"
VERSION "3.4.0"
URL "https://github.com/catchorg/Catch2/archive/refs/tags/v3.4.0.tar.gz"
SHA1 "4c308576c856a43dc88949a8f64ef90ebf94ae1b"
)

# ZLib - Luxonis fix for alias on imported target for old CMake versions
Expand Down
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,8 @@ target_compile_definitions(detection_parser PRIVATE BLOB_PATH="${mobilenet_blob}

# DetectionParser
dai_add_example(crash_report CrashReport/crash_report.cpp OFF)

# Sync
dai_add_example(sync_scripts Sync/sync_scripts.cpp ON)
dai_add_example(demux_message_group Sync/demux_message_group.cpp ON)
dai_add_example(depth_video_synced Sync/depth_video_synced.cpp ON)
63 changes: 63 additions & 0 deletions examples/Sync/demux_message_group.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include <chrono>
#include <iostream>

#include "depthai/depthai.hpp"

int main() {
dai::Pipeline pipeline;

auto script1 = pipeline.create<dai::node::Script>();
script1->setScript(
R"SCRPT(
from time import sleep
while True:
sleep(1)
b = Buffer(512)
b.setData(bytes(4 * [i for i in range(0, 128)]))
b.setTimestamp(Clock.now())
node.io['out'].send(b)
)SCRPT");

auto script2 = pipeline.create<dai::node::Script>();
script2->setScript(
R"SCRPT(
from time import sleep
while True:
sleep(0.3)
b = Buffer(512)
b.setData(bytes(4 * [i for i in range(128, 256)]))
b.setTimestamp(Clock.now())
node.io['out'].send(b)
)SCRPT");

auto sync = pipeline.create<dai::node::Sync>();
sync->setSyncThreshold(std::chrono::milliseconds(100));

auto demux = pipeline.create<dai::node::MessageDemux>();

auto xout1 = pipeline.create<dai::node::XLinkOut>();
xout1->setStreamName("xout1");
auto xout2 = pipeline.create<dai::node::XLinkOut>();
xout2->setStreamName("xout2");

script1->outputs["out"].link(sync->inputs["s1"]);
script2->outputs["out"].link(sync->inputs["s2"]);
sync->out.link(demux->input);
demux->outputs["s1"].link(xout1->input);
demux->outputs["s2"].link(xout2->input);

dai::Device device(pipeline);
std::cout << "Start" << std::endl;
auto queue1 = device.getOutputQueue("xout1", 10, true);
auto queue2 = device.getOutputQueue("xout2", 10, true);
while(true) {
auto bufS1 = queue1->get<dai::Buffer>();
auto bufS2 = queue2->get<dai::Buffer>();
std::cout << "Buffer 1 timestamp: " << bufS1->getTimestamp().time_since_epoch().count() << std::endl;
std::cout << "Buffer 2 timestamp: " << bufS2->getTimestamp().time_since_epoch().count() << std::endl;
std::cout << "----------" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
68 changes: 68 additions & 0 deletions examples/Sync/depth_video_synced.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include <iostream>

// Includes common necessary includes for development using depthai library
#include "depthai/depthai.hpp"

int main() {
// Create pipeline
dai::Pipeline pipeline;

// Define sources and outputs
auto monoLeft = pipeline.create<dai::node::MonoCamera>();
auto monoRight = pipeline.create<dai::node::MonoCamera>();
auto color = pipeline.create<dai::node::ColorCamera>();
auto stereo = pipeline.create<dai::node::StereoDepth>();
auto sync = pipeline.create<dai::node::Sync>();

auto xoutGrp = pipeline.create<dai::node::XLinkOut>();

// XLinkOut
xoutGrp->setStreamName("xout");

// Properties
monoLeft->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P);
monoLeft->setCamera("left");
monoRight->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P);
monoRight->setCamera("right");

stereo->setDefaultProfilePreset(dai::node::StereoDepth::PresetMode::HIGH_ACCURACY);

color->setCamera("color");

sync->setSyncThreshold(std::chrono::milliseconds(100));

// Linking
monoLeft->out.link(stereo->left);
monoRight->out.link(stereo->right);

stereo->disparity.link(sync->inputs["disparity"]);
color->video.link(sync->inputs["video"]);

sync->out.link(xoutGrp->input);

// Connect to device and start pipeline
dai::Device device(pipeline);

auto queue = device.getOutputQueue("xout", 10, true);

float disparityMultiplier = 255 / stereo->initialConfig.getMaxDisparity();

while(true) {
auto msgGrp = queue->get<dai::MessageGroup>();
for(auto& frm : *msgGrp) {
auto imgFrm = std::dynamic_pointer_cast<dai::ImgFrame>(frm.second);
cv::Mat img = imgFrm->getCvFrame();
if(frm.first == "disparity") {
img.convertTo(img, CV_8UC1, disparityMultiplier); // Extend disparity range
cv::applyColorMap(img, img, cv::COLORMAP_JET);
}
cv::imshow(frm.first, img);
}

int key = cv::waitKey(1);
if(key == 'q' || key == 'Q') {
return 0;
}
}
return 0;
}
56 changes: 56 additions & 0 deletions examples/Sync/sync_scripts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include <chrono>
#include <iostream>

#include "depthai/depthai.hpp"

int main() {
dai::Pipeline pipeline;

auto script1 = pipeline.create<dai::node::Script>();
script1->setScript(
R"SCRPT(
from time import sleep
while True:
sleep(1)
b = Buffer(512)
b.setData(bytes(4 * [i for i in range(0, 128)]))
b.setTimestamp(Clock.now())
node.io['out'].send(b)
)SCRPT");

auto script2 = pipeline.create<dai::node::Script>();
script2->setScript(
R"SCRPT(
from time import sleep
while True:
sleep(0.3)
b = Buffer(512)
b.setData(bytes(4 * [i for i in range(128, 256)]))
b.setTimestamp(Clock.now())
node.io['out'].send(b)
)SCRPT");

auto sync = pipeline.create<dai::node::Sync>();
sync->setSyncThreshold(std::chrono::milliseconds(100));

auto xout = pipeline.create<dai::node::XLinkOut>();
xout->setStreamName("xout");

sync->out.link(xout->input);
script1->outputs["out"].link(sync->inputs["s1"]);
script2->outputs["out"].link(sync->inputs["s2"]);

dai::Device device(pipeline);
std::cout << "Start" << std::endl;
auto queue = device.getOutputQueue("xout", 10, true);
while(true) {
auto grp = queue->get<dai::MessageGroup>();
std::cout << "Buffer 1 timestamp: " << grp->get<dai::Buffer>("s1")->getTimestamp().time_since_epoch().count() << std::endl;
std::cout << "Buffer 2 timestamp: " << grp->get<dai::Buffer>("s2")->getTimestamp().time_since_epoch().count() << std::endl;
std::cout << "Time interval between messages: " << static_cast<double>(grp->getIntervalNs()) / 1e6 << "ms" << std::endl;
std::cout << "----------" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
6 changes: 3 additions & 3 deletions include/depthai/device/DeviceBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

// shared
#include "depthai-shared/common/ChipTemperature.hpp"
#include "depthai-shared/common/Connectivity.hpp"
#include "depthai-shared/common/ConnectionInterface.hpp"
#include "depthai-shared/common/CpuUsage.hpp"
#include "depthai-shared/common/MemoryInfo.hpp"
#include "depthai-shared/datatype/RawIMUData.hpp"
Expand Down Expand Up @@ -596,11 +596,11 @@ class DeviceBase {
std::vector<CameraBoardSocket> getConnectedCameras();

/**
* Get connectivity for device
* Get connection interfaces for device
*
* @returns Vector of connection type
*/
std::vector<Connectivity> getConnectionInterfaces();
std::vector<ConnectionInterface> getConnectionInterfaces();

/**
* Get cameras that are connected to the device with their features/properties
Expand Down
79 changes: 79 additions & 0 deletions include/depthai/pipeline/datatype/MessageGroup.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include <chrono>
#include <memory>
#include <unordered_map>
#include <vector>

#include "depthai-shared/datatype/RawMessageGroup.hpp"
#include "depthai/pipeline/datatype/Buffer.hpp"

namespace dai {

/**
* MessageGroup message. Carries multiple messages in one.
*/
class MessageGroup : public Buffer {
std::shared_ptr<RawBuffer> serialize() const override;
RawMessageGroup& rawGrp;
std::unordered_map<std::string, std::shared_ptr<ADatatype>> group;

public:
/// Construct MessageGroup message
MessageGroup();
explicit MessageGroup(std::shared_ptr<RawMessageGroup> ptr);
virtual ~MessageGroup() = default;

/// Group
std::shared_ptr<ADatatype> operator[](const std::string& name);
template <typename T>
std::shared_ptr<T> get(const std::string& name) {
return std::dynamic_pointer_cast<T>(group[name]);
}
void add(const std::string& name, const std::shared_ptr<ADatatype>& value);
template <typename T>
void add(const std::string& name, const T& value) {
static_assert(std::is_base_of<ADatatype, T>::value, "T must derive from ADatatype");
group[name] = std::make_shared<T>(value);
rawGrp.group[name] = {value.getRaw(), 0};
}

// Iterators
std::unordered_map<std::string, std::shared_ptr<ADatatype>>::iterator begin();
std::unordered_map<std::string, std::shared_ptr<ADatatype>>::iterator end();

/**
* True if all messages in the group are in the interval
* @param thresholdNs Maximal interval between messages
*/
bool isSynced(int64_t thresholdNs) const;

/**
* Retrieves interval between the first and the last message in the group.
*/
int64_t getIntervalNs() const;

int64_t getNumMessages() const;

/**
* Gets the names of messages in the group
*/
std::vector<std::string> getMessageNames() const;

/**
* Sets image timestamp related to dai::Clock::now()
*/
MessageGroup& setTimestamp(std::chrono::time_point<std::chrono::steady_clock, std::chrono::steady_clock::duration> timestamp);

/**
* Sets image timestamp related to dai::Clock::now()
*/
MessageGroup& setTimestampDevice(std::chrono::time_point<std::chrono::steady_clock, std::chrono::steady_clock::duration> timestamp);

/**
* Retrieves image sequence number
*/
MessageGroup& setSequenceNum(int64_t sequenceNum);
};

} // namespace dai
3 changes: 3 additions & 0 deletions include/depthai/pipeline/datatype/StreamMessageParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <XLink/XLinkPublicDefines.h>

// project
#include "depthai-shared/datatype/DatatypeEnum.hpp"
#include "depthai-shared/datatype/RawMessageGroup.hpp"
#include "depthai/pipeline/datatype/ADatatype.hpp"

// shared
Expand All @@ -20,6 +22,7 @@ class StreamMessageParser {
public:
static std::shared_ptr<RawBuffer> parseMessage(streamPacketDesc_t* const packet);
static std::shared_ptr<ADatatype> parseMessageToADatatype(streamPacketDesc_t* const packet);
static std::shared_ptr<ADatatype> parseMessageToADatatype(streamPacketDesc_t* const packet, DatatypeEnum& type);
static std::vector<std::uint8_t> serializeMessage(const std::shared_ptr<const RawBuffer>& data);
static std::vector<std::uint8_t> serializeMessage(const RawBuffer& data);
static std::vector<std::uint8_t> serializeMessage(const std::shared_ptr<const ADatatype>& data);
Expand Down
1 change: 1 addition & 0 deletions include/depthai/pipeline/datatypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "datatype/ImageManipConfig.hpp"
#include "datatype/ImgDetections.hpp"
#include "datatype/ImgFrame.hpp"
#include "datatype/MessageGroup.hpp"
#include "datatype/NNData.hpp"
#include "datatype/SpatialImgDetections.hpp"
#include "datatype/SpatialLocationCalculatorConfig.hpp"
Expand Down
28 changes: 28 additions & 0 deletions include/depthai/pipeline/node/MessageDemux.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include "depthai-shared/properties/MessageDemuxProperties.hpp"
#include "depthai/pipeline/Node.hpp"

namespace dai {
namespace node {

class MessageDemux : public NodeCRTP<Node, MessageDemux, MessageDemuxProperties> {
public:
constexpr static const char* NAME = "MessageDemux";
MessageDemux(const std::shared_ptr<PipelineImpl>& par, int64_t nodeId);

MessageDemux(const std::shared_ptr<PipelineImpl>& par, int64_t nodeId, std::unique_ptr<Properties> props);

/**
* Input message of type MessageGroup
*/
Input input{*this, "input", Input::Type::SReceiver, {{DatatypeEnum::MessageGroup, false}}};

/**
* A map of outputs, where keys are same as in the input MessageGroup
*/
OutputMap outputs;
};

} // namespace node
} // namespace dai
Loading

0 comments on commit e38023c

Please sign in to comment.