diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index abc2e0be..eab37fca 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -49,3 +49,5 @@ install( COMPONENT dev) add_subdirectory(roundtrip) + +add_subdirectory(hop) diff --git a/examples/hop/CMakeLists.txt b/examples/hop/CMakeLists.txt new file mode 100644 index 00000000..9a00ed0a --- /dev/null +++ b/examples/hop/CMakeLists.txt @@ -0,0 +1,24 @@ +# +# Copyright(c) 2024 ZettaScale Technology and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +project(helloworld LANGUAGES C CXX) +cmake_minimum_required(VERSION 3.16) + +set(CMAKE_CXX_STANDARD 17) + +find_package(CycloneDDS REQUIRED) +if(NOT TARGET CycloneDDS-CXX::ddscxx) + find_package(CycloneDDS-CXX REQUIRED) +endif() + +idlcxx_generate(TARGET hop_type FILES hop_type.idl) +add_executable(hop hop.cpp) +target_link_libraries(hop CycloneDDS-CXX::ddscxx hop_type) diff --git a/examples/hop/hop.cpp b/examples/hop/hop.cpp new file mode 100644 index 00000000..5c5cd7bb --- /dev/null +++ b/examples/hop/hop.cpp @@ -0,0 +1,355 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "dds/dds.hpp" +#include "hop_type.hpp" + +using namespace org::eclipse::cyclonedds; +using namespace std::chrono_literals; + +static bool use_listener = true; +static double junkrate = 0.0; + +template +static dds::core::Time mkDDSTime (const std::chrono::time_point x) +{ + int64_t t = std::chrono::duration_cast(x.time_since_epoch()).count(); + return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); +} + +static volatile std::atomic interrupted = false; +static void sigh(int sig) +{ + static_cast(sig); + interrupted = true; +} + +static const dds::sub::status::DataState not_read() +{ + return dds::sub::status::DataState(dds::sub::status::SampleState::not_read(), + dds::sub::status::ViewState::any(), + dds::sub::status::InstanceState::any()); +} + +template +static dds::sub::DataReader make_reader(dds::topic::Topic tp, int stage) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector spart{"P" + std::to_string(stage)}; + dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart); + dds::sub::Subscriber sub{dp, sqos}; + return dds::sub::DataReader{sub, tp, tp.qos()}; +} + +template +static dds::pub::DataWriter make_writer(dds::topic::Topic tp, int stage) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector ppart{"P" + std::to_string(stage)}; + dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart); + dds::pub::Publisher pub{dp, pqos}; + return dds::pub::DataWriter{pub, tp, tp.qos()}; +} + +// to be run on a separate thread +template +static void junksource(dds::topic::Topic tp) +{ + std::random_device ran_dev; + std::mt19937 prng(ran_dev()); + //std::uniform_int_distribution<> wrdist(0, tps.size() - 1); + std::exponential_distribution intvdist(junkrate); + std::vector> wrs; + wrs.push_back(make_writer(tp, -1)); + T sample{}; + auto now = std::chrono::high_resolution_clock::now(); + while (!interrupted) + { + //wrs[wrdist(prng)] << sample; + wrs[0] << sample; + ++sample.seq(); + auto delay = std::chrono::duration(intvdist(prng)); + if (delay > 1s) + delay = 1s; + now += std::chrono::duration_cast(delay); + std::this_thread::sleep_until(now); + } + std::cout << "wrote " << sample.seq() << " junk samples" << std::endl; +} + +template +static dds::sub::DataReader make_junkreader(dds::topic::Topic tp) +{ + return make_reader(tp, -1); +} + +template +static void source(dds::topic::Topic tp, int stage, const std::optional) +{ + auto wr = make_writer(tp, stage); + signal(SIGINT, sigh); + T sample{}; + auto now = std::chrono::high_resolution_clock::now(); + // give forwarders and sink time to start & discovery to run + std::cout << "starting in 1s" << std::endl; + now += 1s; + std::this_thread::sleep_until(now); + while (!interrupted) + { + wr.write(sample, mkDDSTime(now)); + ++sample.seq(); + now += 10ms; + std::this_thread::sleep_until(now); + } + std::cout << "wrote " << sample.seq() << " samples" << std::endl; +} + +template +static void run_reader(dds::sub::DataReaderListener *list, dds::sub::DataReader rd, std::function action) +{ + if (use_listener) + { + rd.listener(list, dds::core::status::StatusMask::data_available()); + while (!interrupted) + std::this_thread::sleep_for(103ms); + } + else + { + dds::core::cond::WaitSet ws; + dds::sub::cond::ReadCondition rc{rd, not_read()}; + ws += rc; + while (!interrupted) + { + ws.wait(); + action(); + } + } +} + +template +class Forward : public dds::sub::NoOpDataReaderListener { +public: + Forward() = delete; + Forward(dds::sub::DataReader rd, dds::pub::DataWriter wr) : rd_{rd}, wr_{wr} { } + + void run() + { + run_reader(this, rd_, [this](){action();}); + } + +private: + void action() + { + auto xs = rd_.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + wr_.write (x.data(), x.info().timestamp()); + } else { + interrupted = true; + } + }; + } + + void on_data_available(dds::sub::DataReader&) + { + action(); + } + + dds::sub::DataReader rd_; + dds::pub::DataWriter wr_; +}; + +template +static void forward(dds::topic::Topic tp, int stage, const std::optional) +{ + auto rd = make_reader(tp, stage); + auto wr = make_writer(tp, stage + 1); + Forward x{rd, wr}; + x.run(); +} + +template +class Sink : public dds::sub::NoOpDataReaderListener { +public: + Sink() = delete; + Sink(dds::sub::DataReader rd, std::vector& lats) : rd_{rd}, lats_{lats} { } + + void run() + { + run_reader(this, rd_, [this](){action();}); + } + +private: + void action() + { + const auto now = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + auto xs = rd_.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); + lats_.push_back(lat / 1e3); + } else { + interrupted = true; + } + }; + } + + void on_data_available(dds::sub::DataReader&) + { + action(); + } + + dds::sub::DataReader rd_; + std::vector& lats_; +}; + +template +static void sink(dds::topic::Topic tp, int stage, const std::optional datafile) +{ + // latencies in microseconds + std::vector lats; + // read until source disappears + // always create the "junk reader": it costs us nothing if no junk data is being published + { + auto rd = make_reader(tp, stage); + Sink x{rd, lats}; + x.run(); + } + // destructors will have run, latencies are ours now + if (datafile.has_value()) + { + std::ofstream f; + f.open(datafile.value()); + for (const auto l : lats) + f << l << std::endl; + f.close(); + } + const size_t n = lats.size(); + if (n < 2) { + std::cout << "insufficient data" << std::endl; + } else { + std::sort(lats.begin(), lats.end()); + std::cout << "received " << n << " samples; min " << lats[0] << " max-1 " << lats[n-2] << " max " << lats[n-1] << std::endl; + } +} + +enum class Mode { Source, Forward, Sink }; + +template +static void run(const Mode mode, int stage, const std::optional datafile) +{ + dds::domain::DomainParticipant dp{0}; + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::History::KeepLast(1); + dds::topic::Topic tp(dp, "Hop", tpqos); + std::thread junkthr; + if (junkrate > 0) + junkthr = std::thread(junksource, tp); + auto junkrd = make_junkreader(tp); + switch (mode) + { + case Mode::Source: source(tp, stage, datafile); break; + case Mode::Forward: forward(tp, stage, datafile); break; + case Mode::Sink: sink(tp, stage, datafile); break; + } + if (junkthr.joinable()) + junkthr.join(); +} + +// type=128 n=1 bash -c 'bin/hop sink -ohop-result.$n.txt $n $type & i=0;while [[ i -lt n ]]; do bin/hop forward $i $type & i=$((i+1)) ; done ; bin/hop source 0 $type' +// for n in {8..10} ; do n=$n type=128 bash -c 'bin/hop sink -ohop-result.$n.txt $n $type & i=0;while [[ i -lt n ]]; do bin/hop forward $i $type & i=$((i+1)) ; done ; bin/hop source 0 $type & p=$! ; sleep 10 ; kill -INT $p ; wait' ; done + +[[noreturn]] +static void usage() +{ + std::cout + << "usage: hop {source|forward|sink} [OPTIONS] STAGE TYPE" << std::endl + << "OPTIONS:" << std::endl + << "-jRATE write junk at RATE Hz" << std::endl + << "-w: use waitset instead of listener (forward, sink)" << std::endl + << "-oFILE write latencies to FILE (sink)" << std::endl + << "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); +} + +int main (int argc, char **argv) +{ + if (argc < 2) + usage(); + const std::string modestr = std::string(argv[1]); + Mode mode; + if (modestr == "source") { + mode = Mode::Source; + } else if (modestr == "forward") { + mode = Mode::Forward; + } else if (modestr == "sink") { + mode = Mode::Sink; + } else { + std::cout << "invalid mode, should be source, forward or sink" << std::endl; + return 1; + } + + std::optional datafile; + optind = 2; + int opt; + while ((opt = getopt (argc, argv, "j:o:w")) != EOF) + { + switch (opt) + { + case 'j': + junkrate = std::atof(optarg); + break; + case 'o': + datafile = std::string(optarg); + break; + case 'w': + use_listener = false; + break; + default: + usage(); + } + } + + if (argc - optind != 2) + usage(); + const int stage = std::atoi(argv[optind]); + const std::string typestr = std::string(argv[optind + 1]); + if (typestr == "8") { + run(mode, stage, datafile); + } else if (typestr == "128") { + run(mode, stage, datafile); + } else if (typestr == "1k") { + run(mode, stage, datafile); + } else if (typestr == "8k") { + run(mode, stage, datafile); + } else if (typestr == "128k") { + run(mode, stage, datafile); + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + return 1; + } + return 0; +} diff --git a/examples/hop/hop_type.idl b/examples/hop/hop_type.idl new file mode 100644 index 00000000..004abf96 --- /dev/null +++ b/examples/hop/hop_type.idl @@ -0,0 +1,37 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +@final @topic +struct Hop8 { + uint32 seq; + octet z[8 - 4]; +}; +@final @topic +struct Hop128 { + uint32 seq; + octet z[128 - 4]; +}; +@final @topic +struct Hop1k { + uint32 seq; + octet z[1024 - 4]; +}; +@final @topic +struct Hop8k { + uint32 seq; + octet z[8*1024 - 4]; +}; +@final @topic +struct Hop128k { + uint32 seq; + octet z[128*1024 - 4]; +};