From 0bf4792b3aed9f2b49dcc124228c881d4963bc02 Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Wed, 16 Aug 2023 08:18:35 +0100 Subject: [PATCH] Enable Iceoryx shared memory support in Cyclone DDS (#145) --- .github/workflows/rust.yml | 18 +- Cargo.lock | 105 +++++------ Cargo.toml | 2 +- DEFAULT_CONFIG.json5 | 7 + README.md | 26 +++ zenoh-bridge-dds/Cargo.toml | 3 + zenoh-bridge-dds/src/main.rs | 20 ++- zenoh-plugin-dds/Cargo.toml | 1 + zenoh-plugin-dds/src/config.rs | 3 + zenoh-plugin-dds/src/dds_mgt.rs | 220 +++++++++++++++++++++--- zenoh-plugin-dds/src/lib.rs | 53 ++++-- zenoh-plugin-dds/src/ros_discovery.rs | 4 +- zenoh-plugin-dds/src/route_dds_zenoh.rs | 3 + 13 files changed, 369 insertions(+), 96 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3c26021b..273390ef 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,6 +34,10 @@ jobs: steps: - uses: actions/checkout@v2 + - name: Install ACL + if: startsWith(matrix.os,'ubuntu') + run: sudo apt-get -y install libacl1-dev + - name: Install Rust toolchain uses: actions-rs/toolchain@v1 with: @@ -57,11 +61,23 @@ jobs: command: build args: -p zenoh-plugin-dds --verbose --all-targets + - name: Build zenoh-plugin-dds (with dds_shm) + uses: actions-rs/cargo@v1 + with: + command: build + args: -p zenoh-plugin-dds --features dds_shm --verbose --all-targets + - name: Build zenoh-bridge-dds uses: actions-rs/cargo@v1 with: command: build - args: -p zenoh-bridge-dds --verbose --all-targets + args: -p zenoh-bridge-dds --verbose --all-targets + + - name: Build zenoh-bridge-dds (with dds_shm) + uses: actions-rs/cargo@v1 + with: + command: build + args: -p zenoh-bridge-dds --features dds_shm --verbose --all-targets - name: Run tests uses: actions-rs/cargo@v1 diff --git a/Cargo.lock b/Cargo.lock index 34bd6e0b..1ca0d0a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,7 +338,7 @@ dependencies = [ "log", "memchr", "once_cell", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "pin-utils", "slab", "wasm-bindgen-futures", @@ -1064,7 +1064,7 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "waker-fn", ] @@ -1104,7 +1104,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "pin-utils", "slab", ] @@ -1325,7 +1325,7 @@ dependencies = [ "cookie", "futures-lite", "infer", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "rand 0.7.3", "serde", "serde_json", @@ -1860,18 +1860,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", @@ -1886,9 +1886,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c" [[package]] name = "pin-utils" @@ -2021,7 +2021,7 @@ dependencies = [ "concurrent-queue", "libc", "log", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "windows-sys", ] @@ -2064,7 +2064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" dependencies = [ "bytes", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "quinn-proto", "quinn-udp", "rustc-hash", @@ -2207,9 +2207,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", @@ -2219,9 +2219,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -2377,9 +2377,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -2462,18 +2462,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -2782,7 +2782,7 @@ dependencies = [ "async-channel", "cfg-if 1.0.0", "futures-core", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", ] [[package]] @@ -2939,7 +2939,7 @@ dependencies = [ "http-types", "kv-log-macro", "log", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "route-recognizer", "serde", "serde_json", @@ -3030,7 +3030,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "socket2 0.4.9", "tokio-macros", "windows-sys", @@ -3067,7 +3067,7 @@ checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.10", + "pin-project-lite 0.2.11", "tracing-attributes", "tracing-core", ] @@ -3542,7 +3542,7 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "zenoh" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-global-executor", "async-std", @@ -3606,7 +3606,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "zenoh-collections", ] @@ -3614,7 +3614,7 @@ dependencies = [ [[package]] name = "zenoh-cfg-properties" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "zenoh-result", ] @@ -3622,7 +3622,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "uhlc", "zenoh-buffers", @@ -3632,12 +3632,12 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" [[package]] name = "zenoh-config" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "flume", "json5", @@ -3656,7 +3656,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "lazy_static", @@ -3666,7 +3666,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "aes 0.8.3", "hmac 0.12.1", @@ -3679,7 +3679,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "bincode", @@ -3699,7 +3699,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "hashbrown 0.13.2", "keyed-set", @@ -3712,7 +3712,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3732,7 +3732,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3749,7 +3749,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3773,7 +3773,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3789,13 +3789,14 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-rustls", "async-std", "async-trait", "futures", "log", + "rustls", "rustls-pemfile", "webpki", "webpki-roots", @@ -3812,7 +3813,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3831,7 +3832,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3849,7 +3850,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", @@ -3869,7 +3870,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "proc-macro2", "quote", @@ -3911,7 +3912,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-rest" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "anyhow", "async-std", @@ -3938,7 +3939,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "libloading", "log", @@ -3951,7 +3952,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "hex", "rand 0.8.5", @@ -3966,7 +3967,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "anyhow", ] @@ -3974,7 +3975,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "event-listener", @@ -3989,7 +3990,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-executor", "async-global-executor", @@ -4020,7 +4021,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "0.10.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#7e8808b36da441ff1d29e1da2f70b2d83a7a3979" +source = "git+https://github.com/eclipse-zenoh/zenoh?branch=master#2a08f1ab863d9eb2e734ae55fa3c7cc9f47d364e" dependencies = [ "async-std", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index bdc43569..c2108010 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ async-trait = "0.1.66" bincode = "1.3.3" cdr = "0.2.4" clap = "3.2.23" -cyclors = { git = "https://github.com/kydos/cyclors", branch = "master"} +cyclors = { git = "https://github.com/kydos/cyclors", branch = "master" } derivative = "2.2.0" env_logger = "0.10.0" flume = "0.10.14" diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 6cf35909..c46d2d3a 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -27,6 +27,13 @@ //// // localhost_only: true, + //// + //// shm_enabled: If set to true, the DDS implementation will use Iceoryx shared memory. + //// Requires the bridge to be built with the 'dds_shm' feature for this option to valid. + //// By default set to false. + //// + // shm_enabled: false, + //// //// group_member_id: A custom identifier for the bridge, that will be used in group management //// (if not specified, the zenoh UUID is used). diff --git a/README.md b/README.md index 2fe2c04c..e2a5a112 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,30 @@ $ cargo build --release -p zenoh-bridge-dds The **`zenoh-bridge-dds`** binary will be generated in the `target/release` sub-directory. +### Enabling Cyclone DDS Shared Memory Support + +Cyclone DDS Shared memory support is provided by the [Iceoryx library](https://iceoryx.io/). Iceoryx introduces additional system requirements which are documented [here](https://iceoryx.io/v2.0.1/getting-started/installation/#dependencies). + +To build the zenoh bridge for DDS with support for shared memory the `dds_shm` optional feature must be enabled during the build process as follows: +- plugin library: +```bash +$ cargo build --release -p zenoh-plugin-dds --features dds_shm +``` + +- standalone executable binary: +```bash +$ cargo build --release -p zenoh-bridge-dds --features dds_shm +``` + +**Note:** Iceoryx does not need to be installed to build the bridge when the `dds_shm` feature is enabled. Iceoryx will be automatically downloaded, compiled, and statically linked into the zenoh bridge as part of the cargo build process. + +When the zenoh bridge is configured to use DDS shared memory (see [Configuration](#configuration)) the **Iceoryx RouDi daemon (`iox-roudi`)** must be running in order for the bridge to start successfully. If not started the bridge will wait for a period of time for the daemon to become available before timing out and terminating. + +When building the zenoh bridge with the `dds_shm` feature enabled the `iox-roudi` daemon is also built for convenience. The daemon can be found under `target/debug|release/build/cyclors-/out/iceoryx-build/bin/iox-roudi`. + +See [here](https://cyclonedds.io/docs/cyclonedds/latest/shared_memory/shared_memory.html) for more details of shared memory support in Cyclone DDS. + + ### ROS2 package If you're a ROS2 user, you can also build `zenoh-bridge-dds` as a ROS package running: ```bash @@ -247,6 +271,8 @@ The `"dds"` part of this same configuration file can also be used in the configu - **`-d, --domain `** : The DDS Domain ID. By default set to `0`, or to `"$ROS_DOMAIN_ID"` is this environment variable is defined. - **`--dds-localhost-only`** : If set, the DDS discovery and traffic will occur only on the localhost interface (127.0.0.1). By default set to false, unless the "ROS_LOCALHOST_ONLY=1" environment variable is defined. + - **`--dds-enable-shm`** : If set, DDS will be configured to use shared memory. Requires the bridge to be built with the 'dds_shm' feature for this option to valid. + By default set to false. - **`-f, --fwd-discovery`** : When set, rather than creating a local route when discovering a local DDS entity, this discovery info is forwarded to the remote plugins/bridges. Those will create the routes, including a replica of the discovered entity. More details [here](#full-support-of-ros-graph-and-topic-lists-via-the-forward-discovery-mode) - **`-s, --scope `** : A string used as prefix to scope DDS traffic when mapped to zenoh keys. - **`-a, --allow `** : A regular expression matching the set of 'partition/topic-name' that must be routed via zenoh. diff --git a/zenoh-bridge-dds/Cargo.toml b/zenoh-bridge-dds/Cargo.toml index 404af5ea..d744cf2e 100644 --- a/zenoh-bridge-dds/Cargo.toml +++ b/zenoh-bridge-dds/Cargo.toml @@ -22,6 +22,9 @@ license = { workspace = true } categories = ["network-programming"] description = "Zenoh bridge for ROS2 and DDS in general" +[features] +dds_shm = ["zenoh-plugin-dds/dds_shm"] + [dependencies] async-std = { workspace = true, features = ["unstable", "attributes"] } async-liveliness-monitor = { workspace = true } diff --git a/zenoh-bridge-dds/src/main.rs b/zenoh-bridge-dds/src/main.rs index 0d73809f..5d228bb6 100644 --- a/zenoh-bridge-dds/src/main.rs +++ b/zenoh-bridge-dds/src/main.rs @@ -41,7 +41,7 @@ macro_rules! insert_json5 { } fn parse_args() -> (Config, Option) { - let app = App::new("zenoh bridge for DDS") + let mut app = App::new("zenoh bridge for DDS") .version(zenoh_plugin_dds::GIT_VERSION) .long_version(zenoh_plugin_dds::LONG_VERSION.as_str()) // @@ -98,7 +98,19 @@ r#"-d, --domain=[ID] 'The DDS Domain ID. The default value is "$ROS_DOMAIN_ID" r#"--dds-localhost-only \ 'Configure CycloneDDS to use only the localhost interface. If not set, CycloneDDS will pick the interface defined in "$CYCLONEDDS_URI" configuration, or automatically choose one. This option is not active by default, unless the "ROS_LOCALHOST_ONLY" environment variable is set to "1".'"# - )) + )); + + // Add option to enable DDS SHM if feature is enabled + #[cfg(feature = "dds_shm")] + { + app = app.arg(Arg::from_usage( + r#"--dds-enable-shm \ + 'Configure CycloneDDS to use Iceoryx shared memory. If not set, CycloneDDS will instead use any shared memory settings defined in "$CYCLONEDDS_URI" configuration. + This option is not active by default.'"# + )); + } + + app = app .arg(Arg::from_usage( r#"--group-member-id=[ID] 'A custom identifier for the bridge, that will be used in group management (if not specified, the zenoh UUID is used).'"# )) @@ -193,6 +205,10 @@ r#"--watchdog=[PERIOD] 'Experimental!! Run a watchdog thread that monitors the insert_json5!(config, args, "plugins/dds/scope", if "scope",); insert_json5!(config, args, "plugins/dds/domain", if "domain", .parse::().unwrap()); insert_json5!(config, args, "plugins/dds/localhost_only", if "dds-localhost-only"); + #[cfg(feature = "dds_shm")] + { + insert_json5!(config, args, "plugins/dds/shm_enabled", if "dds-enable-shm"); + } insert_json5!(config, args, "plugins/dds/group_member_id", if "group-member-id", ); insert_json5!(config, args, "plugins/dds/allow", for "allow", .collect::>()); insert_json5!(config, args, "plugins/dds/deny", for "deny", .collect::>()); diff --git a/zenoh-plugin-dds/Cargo.toml b/zenoh-plugin-dds/Cargo.toml index 1717b0cb..226082fe 100644 --- a/zenoh-plugin-dds/Cargo.toml +++ b/zenoh-plugin-dds/Cargo.toml @@ -29,6 +29,7 @@ crate-type = ["cdylib", "rlib"] [features] default = ["no_mangle"] no_mangle = ["zenoh-plugin-trait/no_mangle"] +dds_shm = ["cyclors/iceoryx"] [dependencies] async-std = { workspace = true, features = ["unstable", "attributes"] } diff --git a/zenoh-plugin-dds/src/config.rs b/zenoh-plugin-dds/src/config.rs index 30abbab5..62b112ec 100644 --- a/zenoh-plugin-dds/src/config.rs +++ b/zenoh-plugin-dds/src/config.rs @@ -50,6 +50,9 @@ pub struct Config { pub reliable_routes_blocking: bool, #[serde(default = "default_localhost_only")] pub localhost_only: bool, + #[serde(default)] + #[cfg(feature = "dds_shm")] + pub shm_enabled: bool, #[serde( default = "default_queries_timeout", deserialize_with = "deserialize_duration" diff --git a/zenoh-plugin-dds/src/dds_mgt.rs b/zenoh-plugin-dds/src/dds_mgt.rs index 3b3ae0b3..9a785774 100644 --- a/zenoh-plugin-dds/src/dds_mgt.rs +++ b/zenoh-plugin-dds/src/dds_mgt.rs @@ -25,6 +25,8 @@ use std::os::raw; use std::slice; use std::sync::Arc; use std::time::Duration; +#[cfg(feature = "dds_shm")] +use zenoh::buffers::{ZBuf, ZSlice}; use zenoh::prelude::*; use zenoh::publication::CongestionControl; use zenoh::Session; @@ -40,12 +42,37 @@ pub(crate) enum RouteStatus { _QoSConflict, // A route was already established but with conflicting QoS } +#[derive(Debug)] +pub(crate) struct TypeInfo { + ptr: *mut dds_typeinfo_t, +} + +impl TypeInfo { + pub(crate) unsafe fn new(ptr: *const dds_typeinfo_t) -> TypeInfo { + let ptr = ddsi_typeinfo_dup(ptr); + TypeInfo { ptr } + } +} + +impl Drop for TypeInfo { + fn drop(&mut self) { + unsafe { + ddsi_typeinfo_free(self.ptr); + } + } +} + +unsafe impl Send for TypeInfo {} +unsafe impl Sync for TypeInfo {} + #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DdsEntity { pub(crate) key: String, pub(crate) participant_key: String, pub(crate) topic_name: String, pub(crate) type_name: String, + #[serde(skip)] + pub(crate) type_info: Option, pub(crate) keyless: bool, pub(crate) qos: Qos, pub(crate) routes: HashMap, // map of routes statuses indexed by partition ("*" only if no partition) @@ -84,31 +111,96 @@ impl fmt::Display for DiscoveryType { } } +#[cfg(feature = "dds_shm")] +#[derive(Clone, Copy)] +struct IoxChunk { + ptr: *mut std::ffi::c_void, + header: *mut iceoryx_header_t, +} + +#[cfg(feature = "dds_shm")] +impl IoxChunk { + fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr as *const u8, (*self.header).data_size as usize) } + } +} + pub(crate) struct DDSRawSample { sdref: *mut ddsi_serdata, data: ddsrt_iovec_t, + #[cfg(feature = "dds_shm")] + iox_chunk: Option, } impl DDSRawSample { - pub(crate) fn create(serdata: *const ddsi_serdata) -> DDSRawSample { - unsafe { - let mut data = ddsrt_iovec_t { - iov_base: std::ptr::null_mut(), - iov_len: 0, + pub(crate) unsafe fn create(serdata: *const ddsi_serdata) -> DDSRawSample { + let mut data = ddsrt_iovec_t { + iov_base: std::ptr::null_mut(), + iov_len: 0, + }; + + let size = ddsi_serdata_size(serdata); + let sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data); + + #[cfg(feature = "dds_shm")] + { + let iox_chunk_ptr = (*serdata).iox_chunk; + let iox_chunk = match iox_chunk_ptr.is_null() { + false => { + let header = iceoryx_header_from_chunk(iox_chunk_ptr); + Some(IoxChunk { + ptr: iox_chunk_ptr, + header, + }) + } + true => None, }; - let size = ddsi_serdata_size(serdata); - let sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data); - + DDSRawSample { + sdref, + data, + iox_chunk, + } + } + #[cfg(not(feature = "dds_shm"))] + { DDSRawSample { sdref, data } } } - pub(crate) fn as_slice(&self) -> &[u8] { + fn data_as_slice(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.data.iov_base as *const u8, self.data.iov_len as usize) } } + + pub(crate) fn payload_as_slice(&self) -> &[u8] { + unsafe { + #[cfg(feature = "dds_shm")] + { + if let Some(iox_chunk) = self.iox_chunk.as_ref() { + return iox_chunk.as_slice(); + } + } + &slice::from_raw_parts(self.data.iov_base as *const u8, self.data.iov_len as usize)[4..] + } + } + + pub(crate) fn hex_encode(&self) -> String { + let mut encoded = String::new(); + let data_encoded = hex::encode(self.data_as_slice()); + encoded.push_str(data_encoded.as_str()); + + #[cfg(feature = "dds_shm")] + { + if let Some(iox_chunk) = self.iox_chunk.as_ref() { + let iox_encoded = hex::encode(iox_chunk.as_slice()); + encoded.push_str(iox_encoded.as_str()); + } + } + + encoded + } } impl Drop for DDSRawSample { @@ -119,6 +211,42 @@ impl Drop for DDSRawSample { } } +impl fmt::Debug for DDSRawSample { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[cfg(feature = "dds_shm")] + { + // Where data was received via Iceoryx write both the header (contained in buf.data) and + // payload (contained in buf.iox_chunk) to the formatter. + if let Some(iox_chunk) = self.iox_chunk { + return write!( + f, + "[{:02x?}, {:02x?}]", + self.data_as_slice(), + iox_chunk.as_slice() + ); + } + } + write!(f, "{:02x?}", self.data_as_slice()) + } +} + +impl From for Value { + fn from(buf: DDSRawSample) -> Self { + #[cfg(feature = "dds_shm")] + { + // Where data was received via Iceoryx return both the header (contained in buf.data) and + // payload (contained in buf.iox_chunk) in a buffer. + if let Some(iox_chunk) = buf.iox_chunk { + let mut zbuf = ZBuf::default(); + zbuf.push_zslice(ZSlice::from(buf.data_as_slice().to_vec())); + zbuf.push_zslice(ZSlice::from(iox_chunk.as_slice().to_vec())); + return zbuf.into(); + } + } + buf.data_as_slice().into() + } +} + unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { let btx = Box::from_raw(arg as *mut (DiscoveryType, Sender)); let discovery_type = btx.0; @@ -184,6 +312,28 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { discovery_type, key, participant_key, topic_name, type_name, keyless ); + let mut type_info: *const dds_typeinfo_t = std::ptr::null(); + let ret = dds_builtintopic_get_endpoint_type_info(sample, &mut type_info); + + let type_info = match ret { + 0 => match type_info.is_null() { + false => Some(TypeInfo::new(type_info)), + true => { + debug!("Type information not available for type {}", type_name); + None + } + }, + _ => { + warn!( + "Failed to lookup type information({})", + CStr::from_ptr(dds_strretcode(ret)) + .to_str() + .unwrap_or("unrecoverable DDS retcode") + ); + None + } + }; + // send a DiscoveryEvent let entity = DdsEntity { key: key.clone(), @@ -191,6 +341,7 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { topic_name: String::from(topic_name), type_name: String::from(type_name), keyless, + type_info, qos: Qos::from_qos_native((*sample).qos), routes: HashMap::::new(), }; @@ -310,21 +461,20 @@ unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os let si = si.assume_init(); if si[0].valid_data { let raw_sample = DDSRawSample::create(zp); - let data_in_slice = raw_sample.as_slice(); if *crate::LOG_PAYLOAD { log::trace!( "Route data from DDS {} to zenoh key={} - payload: {:02x?}", &(*pa).0, &(*pa).1, - data_in_slice + raw_sample ); } else { log::trace!("Route data from DDS {} to zenoh key={}", &(*pa).0, &(*pa).1); } let _ = (*pa) .2 - .put(&(*pa).1, data_in_slice) + .put(&(*pa).1, raw_sample) .congestion_control((*pa).3) .res_sync(); } @@ -333,10 +483,11 @@ unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os } #[allow(clippy::too_many_arguments)] -pub fn create_forwarding_dds_reader( +pub(crate) fn create_forwarding_dds_reader( dp: dds_entity_t, topic_name: String, type_name: String, + type_info: &Option, keyless: bool, mut qos: Qos, z_key: KeyExpr, @@ -344,11 +495,8 @@ pub fn create_forwarding_dds_reader( read_period: Option, congestion_ctrl: CongestionControl, ) -> Result { - let cton = CString::new(topic_name.clone()).unwrap().into_raw(); - let ctyn = CString::new(type_name).unwrap().into_raw(); - unsafe { - let t = cdds_create_blob_topic(dp, cton, ctyn, keyless); + let t = create_topic(dp, &topic_name, &type_name, type_info, keyless); match read_period { None => { @@ -421,10 +569,9 @@ pub fn create_forwarding_dds_reader( ); let raw_sample = DDSRawSample::create(zp); - let data_in_slice = raw_sample.as_slice(); let _ = z - .put(&z_key, data_in_slice) + .put(&z_key, raw_sample) .congestion_control(congestion_ctrl) .res_sync(); } @@ -438,6 +585,39 @@ pub fn create_forwarding_dds_reader( } } +unsafe fn create_topic( + dp: dds_entity_t, + topic_name: &str, + type_name: &str, + type_info: &Option, + keyless: bool, +) -> dds_entity_t { + let cton = CString::new(topic_name.to_owned()).unwrap().into_raw(); + let ctyn = CString::new(type_name.to_owned()).unwrap().into_raw(); + + match type_info { + None => cdds_create_blob_topic(dp, cton, ctyn, keyless), + Some(type_info) => { + let mut descriptor: *mut dds_topic_descriptor_t = std::ptr::null_mut(); + + let ret = dds_create_topic_descriptor( + dds_find_scope_DDS_FIND_SCOPE_GLOBAL, + dp, + type_info.ptr, + 500000000, + &mut descriptor, + ); + let mut topic: dds_entity_t = 0; + if ret == (DDS_RETCODE_OK as i32) { + topic = dds_create_topic(dp, descriptor, cton, std::ptr::null(), std::ptr::null()); + assert!(topic >= 0); + dds_delete_topic_descriptor(descriptor); + } + topic + } + } +} + pub fn create_forwarding_dds_writer( dp: dds_entity_t, topic_name: String, @@ -451,7 +631,7 @@ pub fn create_forwarding_dds_writer( unsafe { let t = cdds_create_blob_topic(dp, cton, ctyn, keyless); let qos_native = qos.to_qos_native(); - let writer = dds_create_writer(dp, t, qos_native, std::ptr::null_mut()); + let writer: i32 = dds_create_writer(dp, t, qos_native, std::ptr::null_mut()); Qos::delete_qos_native(qos_native); if writer >= 0 { Ok(writer) diff --git a/zenoh-plugin-dds/src/lib.rs b/zenoh-plugin-dds/src/lib.rs index efacf8fb..705786e8 100644 --- a/zenoh-plugin-dds/src/lib.rs +++ b/zenoh-plugin-dds/src/lib.rs @@ -95,6 +95,10 @@ lazy_static::lazy_static!( // Empty configuration fragments are ignored, so it is safe to unconditionally append a comma. const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#","#; +// CycloneDDS' enable-shm: enable usage of Iceoryx shared memory +#[cfg(feature = "dds_shm")] +const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"true,"#; + const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 500; zenoh_plugin_trait::declare_plugin!(DDSPlugin); @@ -202,6 +206,24 @@ pub async fn run(runtime: Runtime, config: Config) { ); } + // if "enable_shm" is set, configure CycloneDDS to use Iceoryx shared memory + #[cfg(feature = "dds_shm")] + { + if config.shm_enabled { + env::set_var( + "CYCLONEDDS_URI", + format!( + "{}{}", + CYCLONEDDS_CONFIG_ENABLE_SHM, + env::var("CYCLONEDDS_URI").unwrap_or_default() + ), + ); + if config.forward_discovery { + warn!("DDS shared memory support enabled but will not be used as forward discovery mode is active."); + } + } + } + // create DDS Participant debug!( "Create DDS Participant with CYCLONEDDS_URI='{}'", @@ -434,11 +456,13 @@ impl<'a> DdsPluginRuntime<'a> { self.routes_to_dds.insert(ke, r); } + #[allow(clippy::too_many_arguments)] async fn try_add_route_from_dds( &mut self, ke: OwnedKeyExpr, topic_name: &str, topic_type: &str, + type_info: &Option, keyless: bool, reader_qos: Qos, congestion_ctrl: CongestionControl, @@ -465,6 +489,7 @@ impl<'a> DdsPluginRuntime<'a> { self, topic_name.into(), topic_type.into(), + type_info, keyless, reader_qos, ke.clone(), @@ -749,7 +774,7 @@ impl<'a> DdsPluginRuntime<'a> { // create 1 route per partition, or just 1 if no partition if partition_is_empty(&entity.qos.partition) { let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap(); - let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, qos, congestion_ctrl).await; + let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await; if let RouteStatus::Routed(ref route_key) = route_status { if let Some(r) = self.routes_from_dds.get_mut(route_key) { // add Writer's key to the route @@ -760,7 +785,7 @@ impl<'a> DdsPluginRuntime<'a> { } else { for p in entity.qos.partition.as_deref().unwrap() { let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap(); - let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, qos.clone(), congestion_ctrl).await; + let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await; if let RouteStatus::Routed(ref route_key) = route_status { if let Some(r) = self.routes_from_dds.get_mut(route_key) { // if route has been created, add this Writer in its routed_writers list @@ -1247,7 +1272,7 @@ impl<'a> DdsPluginRuntime<'a> { // create 1 'from_dds" route per partition, or just 1 if no partition if partition_is_empty(&entity.qos.partition) { let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap(); - let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, qos, congestion_ctrl).await; + let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await; if let RouteStatus::Routed(ref route_key) = route_status { if let Some(r) = self.routes_from_dds.get_mut(route_key) { // add the reader's admin keyexpr to the list of remote_routed_writers @@ -1264,7 +1289,7 @@ impl<'a> DdsPluginRuntime<'a> { } else { for p in &entity.qos.partition.unwrap() { let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap(); - let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, qos.clone(), congestion_ctrl).await; + let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await; if let RouteStatus::Routed(ref route_key) = route_status { if let Some(r) = self.routes_from_dds.get_mut(route_key) { // add the reader's admin keyexpr to the list of remote_routed_writers @@ -1431,10 +1456,10 @@ impl<'a> DdsPluginRuntime<'a> { _ = ros_disco_timer_rcv.recv_async() => { let infos = ros_disco_mgr.read(); for (gid, buf) in infos { - trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, hex::encode(buf.as_slice())); + trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode()); // forward the payload on zenoh let ke = &fwd_ros_discovery_key_declared / ke_for_sure!(&gid); - if let Err(e) = self.zsession.put(ke, buf.as_slice()).res_sync() { + if let Err(e) = self.zsession.put(ke, buf).res_sync() { error!("Forward ROS discovery info failed: {}", e); } } @@ -1586,11 +1611,7 @@ fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos { // Unset proprietary QoS which shouldn't apply reader_qos.properties = None; reader_qos.entity_name = None; - - // Ignore own messages - reader_qos.ignore_local = Some(IgnoreLocal { - kind: IgnoreLocalKind::PARTICIPANT, - }); + reader_qos.ignore_local = None; // Set default Reliability QoS if not set for writer if reader_qos.reliability.is_none() { @@ -1613,7 +1634,7 @@ fn adapt_writer_qos_for_proxy_writer(qos: &Qos) -> Qos { writer_qos.properties = None; writer_qos.entity_name = None; - // Ignore own messages + // Don't match with readers with the same participant writer_qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT, }); @@ -1631,7 +1652,7 @@ fn adapt_reader_qos_for_writer(qos: &Qos) -> Qos { writer_qos.properties = None; writer_qos.entity_name = None; - // Ignore own messages + // Don't match with readers with the same participant writer_qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT, }); @@ -1676,11 +1697,7 @@ fn adapt_reader_qos_for_proxy_reader(qos: &Qos) -> Qos { // Unset proprietary QoS which shouldn't apply reader_qos.properties = None; reader_qos.entity_name = None; - - // Ignore own messages - reader_qos.ignore_local = Some(IgnoreLocal { - kind: IgnoreLocalKind::PARTICIPANT, - }); + reader_qos.ignore_local = None; reader_qos } diff --git a/zenoh-plugin-dds/src/ros_discovery.rs b/zenoh-plugin-dds/src/ros_discovery.rs index 5b692bd2..3a30ee6a 100644 --- a/zenoh-plugin-dds/src/ros_discovery.rs +++ b/zenoh-plugin-dds/src/ros_discovery.rs @@ -150,8 +150,8 @@ impl RosDiscoveryInfoMgr { if si[0].valid_data { let raw_sample = DDSRawSample::create(zp); - // No need to deserialize the full payload. Just read the Participant gid (16 bytes after the 4 bytes of CDR header) - let gid = hex::encode(&raw_sample.as_slice()[4..20]); + // No need to deserialize the full payload. Just read the Participant gid (first 16 bytes of the payload) + let gid = hex::encode(&raw_sample.payload_as_slice()[0..16]); result.insert(gid, raw_sample); } diff --git a/zenoh-plugin-dds/src/route_dds_zenoh.rs b/zenoh-plugin-dds/src/route_dds_zenoh.rs index 208445e0..d0aa05a1 100644 --- a/zenoh-plugin-dds/src/route_dds_zenoh.rs +++ b/zenoh-plugin-dds/src/route_dds_zenoh.rs @@ -78,10 +78,12 @@ impl fmt::Display for RouteDDSZenoh<'_> { } impl RouteDDSZenoh<'_> { + #[allow(clippy::too_many_arguments)] pub(crate) async fn new<'a>( plugin: &DdsPluginRuntime<'a>, topic_name: String, topic_type: String, + type_info: &Option, keyless: bool, reader_qos: Qos, ke: OwnedKeyExpr, @@ -172,6 +174,7 @@ impl RouteDDSZenoh<'_> { plugin.dp, topic_name.clone(), topic_type.clone(), + type_info, keyless, reader_qos, declared_ke,