diff --git a/Cargo.toml b/Cargo.toml index d957d5b..17763fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,6 @@ resolver = "2" members = ["zrpc", "zrpc-derive"] -exclude = ["zenoh-typed", "zrpc-perf"] - [workspace.package] authors = ["gabrik "] categories = ["network-programming"] @@ -27,7 +25,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc" license = " EPL-2.0 OR Apache-2.0" readme = "README.md" repository = "https://github.com/ZettaScaleLabs/zenoh-rpc" -version = "0.8.6-dev" +version = "0.8.6" [profile.release] codegen-units = 1 @@ -62,11 +60,11 @@ tokio = { version = "1.35.1", default-features = false, features = [ "time", ] } tracing = "0.1" -zenoh = { version = "1.0.3", default-features = false } -zenoh-codec = { version = "1.0.3" } -zenoh-core = { version = "1.0.3" } -zenoh-ext = { version = "1.0.3" } -zenoh-macros = { version = "1.0.3" } -zenoh-protocol = { version = "1.0.3" } -zenoh-util = { version = "1.0.3" } -zrpc = { version = "0.8.6-dev", path = "./zrpc" } +zenoh = { version = "1.0.4", default-features = false } +zenoh-codec = { version = "1.0.4" } +zenoh-core = { version = "1.0.4" } +zenoh-ext = { version = "1.0.4" } +zenoh-macros = { version = "1.0.4" } +zenoh-protocol = { version = "1.0.4" } +zenoh-util = { version = "1.0.4" } +zrpc = { version = "0.8.6", path = "./zrpc" } diff --git a/zenoh-typed/Cargo.toml b/zenoh-typed/Cargo.toml deleted file mode 100644 index 41563d8..0000000 --- a/zenoh-typed/Cargo.toml +++ /dev/null @@ -1,52 +0,0 @@ -[package] -authors.workspace = true -categories.workspace = true -description.workspace = true -edition.workspace = true -homepage.workspace = true -license.workspace = true -name = "zenoh-typed" -readme.workspace = true -repository.workspace = true -version.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -base64 = { workspace = true } -bincode = { workspace = true, optional = true } -flume = { workspace = true } -futures = { workspace = true } -log = { workspace = true } -serde = { workspace = true } -serde_cbor = { workspace = true } -serde_derive = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true, features = ["io-std","io-util", "rt-multi-thread"]} -zenoh = { workspace = true } -zenoh-codec = { workspace = true } -zenoh-core = { workspace = true } -zenoh-macros = { workspace = true } -zenoh-protocol = { workspace = true } -zenoh-util = { workspace = true } - -[dev-dependencies] -clap = { workspace = true } -env_logger = { workspace = true } - -[features] -#zenoh feature re-exports -auth_pubkey = ["zenoh/auth_pubkey"] -auth_usrpwd = ["zenoh/auth_usrpwd"] -shared-memory = ["zenoh/shared-memory"] -stats = ["zenoh/stats"] -transport_quic = ["zenoh/transport_quic"] -transport_tcp = ["zenoh/transport_tcp"] -transport_tls = ["zenoh/transport_tls"] -transport_udp = ["zenoh/transport_udp"] -transport_unixsock-stream = ["zenoh/transport_unixsock-stream"] -transport_ws = ["zenoh/transport_ws"] - -query_payload = ["zenoh/unstable"] - -default = ["query_payload", "transport_tcp", "transport_udp"] diff --git a/zenoh-typed/examples/z_pub.rs b/zenoh-typed/examples/z_pub.rs deleted file mode 100644 index 5901db0..0000000 --- a/zenoh-typed/examples/z_pub.rs +++ /dev/null @@ -1,157 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use tokio::time::sleep; -use clap::Parser; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use zenoh::config::Config; -// use zenoh::payload::Serialize; -use zenoh::prelude::r#async::*; -use zenoh_typed::prelude::*; - -#[derive(Serialize, Deserialize, Clone, Debug)] -struct MyData { - pub name: String, - pub id: u64, -} - -#[tokio::main] -async fn main() { - // Initiate logging - env_logger::init(); - - let (config, key_expr, value, _attachment) = parse_args(); - - println!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); - let session = SerdeSession::new(session, CBOR); - - println!("Declaring Publisher on '{key_expr}'..."); - let publisher = session - .declare_publisher::<_, MyData>(key_expr) - .res() - .await - .unwrap(); - - for idx in 0..u32::MAX { - sleep(Duration::from_secs(1)).await; - let value: MyData = MyData { - name: value.clone(), - id: idx as u64, - }; - println!("Putting Data ('{value:?}')..."); - - publisher.put(value).res().await.unwrap(); - - // TypedSession::::put(&session, &key_expr, &value) - // .res() - // .await - // .unwrap(); - } -} - -#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] -struct Args { - #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] - /// The key expression to write to. - key: KeyExpr<'static>, - #[arg(short, long, default_value = "Pub from Rust!")] - /// The value to write. - value: String, - #[arg(short, long)] - /// The attachments to add to each put. - /// - /// The key-value pairs are &-separated, and = serves as the separator between key and value. - attach: Option, - #[command(flatten)] - common: CommonArgs, -} - -fn parse_args() -> (Config, KeyExpr<'static>, String, Option) { - let args = Args::parse(); - (args.common.into(), args.key, args.value, args.attach) -} - -#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub enum Wai { - Peer, - Client, - Router, -} -impl core::fmt::Display for Wai { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - core::fmt::Debug::fmt(&self, f) - } -} -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] -pub struct CommonArgs { - #[arg(short, long)] - /// A configuration file. - config: Option, - #[arg(short, long)] - /// The Zenoh session mode [default: peer]. - mode: Option, - #[arg(short = 'e', long)] - /// Endpoints to connect to. - connect: Vec, - #[arg(short, long)] - /// Endpoints to listen on. - listen: Vec, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - no_multicast_scouting: bool, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - enable_shm: bool, -} - -impl From for Config { - fn from(value: CommonArgs) -> Self { - (&value).into() - } -} -impl From<&CommonArgs> for Config { - fn from(value: &CommonArgs) -> Self { - let mut config = match &value.config { - Some(path) => Config::from_file(path).unwrap(), - None => Config::default(), - }; - match value.mode { - Some(Wai::Peer) => config.set_mode(Some(zenoh::scouting::WhatAmI::Peer)), - Some(Wai::Client) => config.set_mode(Some(zenoh::scouting::WhatAmI::Client)), - Some(Wai::Router) => config.set_mode(Some(zenoh::scouting::WhatAmI::Router)), - None => Ok(None), - } - .unwrap(); - if !value.connect.is_empty() { - config.connect.endpoints = value.connect.iter().map(|v| v.parse().unwrap()).collect(); - } - if !value.listen.is_empty() { - config.listen.endpoints = value.listen.iter().map(|v| v.parse().unwrap()).collect(); - } - if value.no_multicast_scouting { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - if value.enable_shm { - #[cfg(feature = "shared-memory")] - config.transport.shared_memory.set_enabled(true).unwrap(); - #[cfg(not(feature = "shared-memory"))] - { - println!("enable-shm argument: SHM cannot be enabled, because Zenoh is compiled without shared-memory feature!"); - std::process::exit(-1); - } - } - config - } -} diff --git a/zenoh-typed/examples/z_put.rs b/zenoh-typed/examples/z_put.rs deleted file mode 100644 index 2e9d089..0000000 --- a/zenoh-typed/examples/z_put.rs +++ /dev/null @@ -1,154 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use tokio::time::sleep; -use clap::Parser; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use zenoh::config::Config; -// use zenoh::payload::Serialize; -use zenoh::prelude::r#async::*; -use zenoh_typed::prelude::*; - -#[derive(Serialize, Deserialize, Clone, Debug)] -struct MyData { - pub name: String, - pub id: u64, -} - - -#[tokio::main] -async fn main() { - // Initiate logging - env_logger::init(); - - let (config, key_expr, value, _attachment) = parse_args(); - - println!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); - let session = SerdeSession::new(session, CBOR); - - println!("Declaring Publisher on '{key_expr}'..."); - // let publisher = session.declare_publisher(&key_expr).res().await.unwrap(); - - for idx in 0..u32::MAX { - sleep(Duration::from_secs(1)).await; - let value: MyData = MyData { - name: value.clone(), - id: idx as u64, - }; - println!("Putting Data ('{key_expr}': '{value:?}')..."); - - session.put(&key_expr, value).res().await.unwrap(); - - // TypedSession::::put(&session, &key_expr, &value) - // .res() - // .await - // .unwrap(); - } -} - -#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] -struct Args { - #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] - /// The key expression to write to. - key: KeyExpr<'static>, - #[arg(short, long, default_value = "Pub from Rust!")] - /// The value to write. - value: String, - #[arg(short, long)] - /// The attachments to add to each put. - /// - /// The key-value pairs are &-separated, and = serves as the separator between key and value. - attach: Option, - #[command(flatten)] - common: CommonArgs, -} - -fn parse_args() -> (Config, KeyExpr<'static>, String, Option) { - let args = Args::parse(); - (args.common.into(), args.key, args.value, args.attach) -} - -#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub enum Wai { - Peer, - Client, - Router, -} -impl core::fmt::Display for Wai { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - core::fmt::Debug::fmt(&self, f) - } -} -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] -pub struct CommonArgs { - #[arg(short, long)] - /// A configuration file. - config: Option, - #[arg(short, long)] - /// The Zenoh session mode [default: peer]. - mode: Option, - #[arg(short = 'e', long)] - /// Endpoints to connect to. - connect: Vec, - #[arg(short, long)] - /// Endpoints to listen on. - listen: Vec, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - no_multicast_scouting: bool, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - enable_shm: bool, -} - -impl From for Config { - fn from(value: CommonArgs) -> Self { - (&value).into() - } -} -impl From<&CommonArgs> for Config { - fn from(value: &CommonArgs) -> Self { - let mut config = match &value.config { - Some(path) => Config::from_file(path).unwrap(), - None => Config::default(), - }; - match value.mode { - Some(Wai::Peer) => config.set_mode(Some(zenoh::scouting::WhatAmI::Peer)), - Some(Wai::Client) => config.set_mode(Some(zenoh::scouting::WhatAmI::Client)), - Some(Wai::Router) => config.set_mode(Some(zenoh::scouting::WhatAmI::Router)), - None => Ok(None), - } - .unwrap(); - if !value.connect.is_empty() { - config.connect.endpoints = value.connect.iter().map(|v| v.parse().unwrap()).collect(); - } - if !value.listen.is_empty() { - config.listen.endpoints = value.listen.iter().map(|v| v.parse().unwrap()).collect(); - } - if value.no_multicast_scouting { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - if value.enable_shm { - #[cfg(feature = "shared-memory")] - config.transport.shared_memory.set_enabled(true).unwrap(); - #[cfg(not(feature = "shared-memory"))] - { - println!("enable-shm argument: SHM cannot be enabled, because Zenoh is compiled without shared-memory feature!"); - std::process::exit(-1); - } - } - config - } -} diff --git a/zenoh-typed/examples/z_sub.rs b/zenoh-typed/examples/z_sub.rs deleted file mode 100644 index 4fa8351..0000000 --- a/zenoh-typed/examples/z_sub.rs +++ /dev/null @@ -1,142 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use clap::Parser; -use serde::{Deserialize, Serialize}; -use zenoh::config::Config; -use zenoh::prelude::r#async::*; -use zenoh_typed::prelude::*; - -#[derive(Serialize, Deserialize, Clone, Debug)] -struct MyData { - pub name: String, - pub id: u64, -} - -#[tokio::main] -async fn main() { - // Initiate logging - env_logger::init(); - - let (mut config, key_expr) = parse_args(); - - // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate - // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the - // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. - config.transport.shared_memory.set_enabled(true).unwrap(); - - println!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); - let session = SerdeSession::new(session, CBOR); - - println!("Declaring Subscriber on '{}'...", &key_expr); - - let subscriber = session - .declare_subscriber::<_, MyData>(&key_expr) - .res() - .await - .unwrap(); - - println!("Enter 'q' to quit..."); - while let Ok(data) = subscriber.recv_async().await { - println!(">> [Subscriber] Received {} - {}", data.id, data.name); - } -} - -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] -struct SubArgs { - #[arg(short, long, default_value = "demo/example/**")] - /// The Key Expression to subscribe to. - key: KeyExpr<'static>, - #[command(flatten)] - common: CommonArgs, -} - -fn parse_args() -> (Config, KeyExpr<'static>) { - let args = SubArgs::parse(); - (args.common.into(), args.key) -} - -#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub enum Wai { - Peer, - Client, - Router, -} -impl core::fmt::Display for Wai { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - core::fmt::Debug::fmt(&self, f) - } -} -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] -pub struct CommonArgs { - #[arg(short, long)] - /// A configuration file. - config: Option, - #[arg(short, long)] - /// The Zenoh session mode [default: peer]. - mode: Option, - #[arg(short = 'e', long)] - /// Endpoints to connect to. - connect: Vec, - #[arg(short, long)] - /// Endpoints to listen on. - listen: Vec, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - no_multicast_scouting: bool, - #[arg(long)] - /// Disable the multicast-based scouting mechanism. - enable_shm: bool, -} - -impl From for Config { - fn from(value: CommonArgs) -> Self { - (&value).into() - } -} -impl From<&CommonArgs> for Config { - fn from(value: &CommonArgs) -> Self { - let mut config = match &value.config { - Some(path) => Config::from_file(path).unwrap(), - None => Config::default(), - }; - match value.mode { - Some(Wai::Peer) => config.set_mode(Some(zenoh::scouting::WhatAmI::Peer)), - Some(Wai::Client) => config.set_mode(Some(zenoh::scouting::WhatAmI::Client)), - Some(Wai::Router) => config.set_mode(Some(zenoh::scouting::WhatAmI::Router)), - None => Ok(None), - } - .unwrap(); - if !value.connect.is_empty() { - config.connect.endpoints = value.connect.iter().map(|v| v.parse().unwrap()).collect(); - } - if !value.listen.is_empty() { - config.listen.endpoints = value.listen.iter().map(|v| v.parse().unwrap()).collect(); - } - if value.no_multicast_scouting { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - if value.enable_shm { - #[cfg(feature = "shared-memory")] - config.transport.shared_memory.set_enabled(true).unwrap(); - #[cfg(not(feature = "shared-memory"))] - { - println!("enable-shm argument: SHM cannot be enabled, because Zenoh is compiled without shared-memory feature!"); - std::process::exit(-1); - } - } - config - } -} diff --git a/zenoh-typed/src/handlers.rs b/zenoh-typed/src/handlers.rs deleted file mode 100644 index df5ea13..0000000 --- a/zenoh-typed/src/handlers.rs +++ /dev/null @@ -1,68 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2023 ZettaScale Technology -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ZettaScale PaaS Team, -*********************************************************************************/ - -use std::marker::PhantomData; - -use zenoh::{ - handlers::{Callback, Dyn}, - payload::Deserialize, - prelude::IntoHandler, - sample::Sample, -}; - -// Pierre's magic -pub struct Typer(pub PhantomData, pub S, pub Handler); - -impl<'a, T, S, Handler> IntoHandler<'a, Sample> for Typer -where - Handler: IntoHandler<'a, T>, - for<'b> S: Send + Sync + Clone + Deserialize<'b, T> + 'a, - T: 'a + Send + Sync, - for<'b> >::Error: std::fmt::Debug, -{ - type Handler = Handler::Handler; - fn into_handler(self) -> (Callback<'a, Sample>, Self::Handler) { - let (cb, receiver) = self.2.into_handler(); - ( - Dyn::new( - move |z: Sample| match self.1.clone().deserialize(z.payload()) { - Ok(d) => cb(d), - Err(e) => log::error!("Cannot deserialize: {e:?}"), - }, - ), - receiver, - ) - } -} - -// impl IntoHandler<'static, Sample> for Typer -// where -// Handler: IntoHandler<'static, T>, -// for<'b> S: Send + Sync + Clone + Deserialize<'b, T>, -// T: Send + Sync + 'static, -// { -// type Handler = Handler::Handler; -// fn into_handler(self) -> (Callback<'static, Sample>, Self::Handler) { -// let (cb, receiver) = self.2.into_handler(); -// let de = self.1.clone(); -// ( -// Dyn::new( -// move |z: Sample| match de.deserialize(z.payload()) { -// Ok(d) => cb(d), -// Err(_) => (), -// }, -// ), -// receiver, -// ) -// } -// } diff --git a/zenoh-typed/src/lib.rs b/zenoh-typed/src/lib.rs deleted file mode 100644 index a55559f..0000000 --- a/zenoh-typed/src/lib.rs +++ /dev/null @@ -1,22 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2023 ZettaScale Technology -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ZettaScale PaaS Team, -*********************************************************************************/ - -pub mod handlers; -pub mod publication; -pub mod session; -pub mod subscription; - -pub mod prelude { - pub use crate::session::SerdeSession; - pub use crate::session::CBOR; -} diff --git a/zenoh-typed/src/publication.rs b/zenoh-typed/src/publication.rs deleted file mode 100644 index 3a0e3e2..0000000 --- a/zenoh-typed/src/publication.rs +++ /dev/null @@ -1,111 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2023 ZettaScale Technology -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ZettaScale PaaS Team, -*********************************************************************************/ - -use std::future::Ready; -use std::marker::PhantomData; - -use zenoh::payload::{Payload, Serialize as ZSerialize}; -use zenoh::prelude::QoSBuilderTrait; -use zenoh::publication::PublisherPutBuilder; -use zenoh::Result as ZResult; -use zenoh::{ - prelude::sync::SyncResolve, - publication::{CongestionControl, Priority, Publisher, PublisherBuilder}, -}; -use zenoh_core::{AsyncResolve, Resolvable}; - -pub struct TypePublisherBuilder<'a, 'b: 'a, T, S> { - pub(crate) inner: PublisherBuilder<'a, 'b>, - pub(crate) phantom_data: PhantomData, - pub(crate) serde: S, -} - -impl<'a, 'b, T, S> TypePublisherBuilder<'a, 'b, T, S> -where - S: ZSerialize + Clone, -{ - #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.inner = self.inner.congestion_control(congestion_control); - self - } - - /// Change the priority of the written data. - #[inline] - pub fn priority(mut self, priority: Priority) -> Self { - self.inner = self.inner.priority(priority); - self - } - - // /// Restrict the matching subscribers that will receive the published data - // /// to the ones that have the given [`Locality`](crate::prelude::Locality). - // #[zenoh_macros::unstable] - // #[inline] - // pub fn allowed_destination(mut self, destination: Locality) -> Self { - // self.inner = sel.finner.destination(destination); - // self - // } -} - -impl<'a, 'b, T, S> Resolvable for TypePublisherBuilder<'a, 'b, T, S> -where - S: Send + Sync, - T: Send + Sync, -{ - type To = ZResult>; -} - -impl<'a, 'b, T, S> SyncResolve for TypePublisherBuilder<'a, 'b, T, S> -where - S: Send + Sync, - T: Send + Sync, -{ - fn res_sync(self) -> ::To { - let inner = self.inner.res_sync()?; - let publisher = TypedPubliser { - inner, - phantom_data: self.phantom_data, - serde: self.serde, - }; - Ok(publisher) - } -} - -impl<'a, 'b, T, S> AsyncResolve for TypePublisherBuilder<'a, 'b, T, S> -where - S: Send + Sync, - T: Send + Sync, -{ - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} - -pub struct TypedPubliser<'a, T, S> { - inner: Publisher<'a>, - phantom_data: PhantomData, - serde: S, -} - -impl<'a, T, S> TypedPubliser<'a, T, S> -where - S: ZSerialize + Send + Sync + Clone, - T: Send + Sync, -{ - pub fn put(&self, payload: T) -> PublisherPutBuilder<'_> { - let payload = self.serde.clone().serialize(payload); - self.inner.put(payload) - } -} diff --git a/zenoh-typed/src/session.rs b/zenoh-typed/src/session.rs deleted file mode 100644 index ee5ed84..0000000 --- a/zenoh-typed/src/session.rs +++ /dev/null @@ -1,125 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2023 ZettaScale Technology -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ZettaScale PaaS Team, -*********************************************************************************/ - -use crate::handlers::Typer; -use crate::publication::TypePublisherBuilder; -use crate::subscription::TypedSubscriberBuilder; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::marker::PhantomData; -use std::sync::Arc; -use zenoh::publication::SessionPutBuilder; - -use zenoh::payload::{Deserialize as ZDeserialize, Serialize as ZSerialize}; -use zenoh::prelude::Encoding; - -use zenoh::handlers::DefaultHandler; -use zenoh::prelude::r#async::*; -use zenoh::Session; - -#[derive(Clone)] -pub struct CBOR; - -impl ZSerialize for CBOR -where - T: Serialize, -{ - type Output = Payload; - - fn serialize(self, t: T) -> Self::Output { - let data = serde_cbor::to_vec(&t).unwrap(); - Payload::new(data) - } -} - -impl<'a, T> ZDeserialize<'a, T> for CBOR -where - T: DeserializeOwned, -{ - type Error = serde_cbor::Error; - - fn deserialize(self, v: &'a Payload) -> Result { - let data = serde_cbor::from_reader(v.reader())?; - Ok(data) - } -} - -#[derive(Clone)] -pub struct SerdeSession -where - S: Send + Sync + Clone, -{ - session: Arc, - serde: S, -} - -impl<'a, S: Send + Sync + Clone> SerdeSession { - pub fn new(session: Session, serde: S) -> Self { - Self { - session: Arc::new(session), - serde, - } - } - - pub fn put<'b: 'a, TryIntoKeyExpr, T>( - &'a self, - key_expr: TryIntoKeyExpr, - value: T, - ) -> SessionPutBuilder<'a, 'b> - where - S: ZSerialize, - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - let payload = self.serde.clone().serialize(value); - self.session - .put(key_expr, payload) - .encoding(Encoding::APPLICATION_CBOR) - } - - pub fn declare_publisher<'b: 'static, TryIntoKeyExpr, T>( - &'a self, - key_expr: TryIntoKeyExpr, - ) -> TypePublisherBuilder<'a, 'b, T, S> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - S: ZSerialize, - { - TypePublisherBuilder { - inner: zenoh::SessionDeclarations::declare_publisher(&self.session, key_expr), - phantom_data: PhantomData, - serde: self.serde.clone(), - } - } - - pub fn declare_subscriber<'b: 'a, TryIntoKeyExpr, T>( - &'a self, - key_expr: TryIntoKeyExpr, - ) -> TypedSubscriberBuilder<'a, 'b, Typer, T> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - S: ZDeserialize<'a, T>, - T: Send + Sync + 'static, - Typer: IntoHandler<'static, Sample>, - { - let handler = Typer(PhantomData, self.serde.clone(), DefaultHandler); - let inner = - zenoh::SessionDeclarations::declare_subscriber(&self.session, key_expr).with(handler); - TypedSubscriberBuilder { - inner, - phantom_data: PhantomData, - } - } -} diff --git a/zenoh-typed/src/subscription.rs b/zenoh-typed/src/subscription.rs deleted file mode 100644 index ce04066..0000000 --- a/zenoh-typed/src/subscription.rs +++ /dev/null @@ -1,58 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2023 ZettaScale Technology -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ZettaScale PaaS Team, -*********************************************************************************/ - -use std::{future::Ready, marker::PhantomData}; - -use zenoh::{ - handlers::IntoHandler, - sample::Sample, - subscriber::{Subscriber, SubscriberBuilder}, - Result as ZResult, -}; -use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; - -#[derive(Debug)] -pub struct TypedSubscriberBuilder<'a, 'b, Handler, T> { - pub inner: SubscriberBuilder<'a, 'b, Handler>, - pub phantom_data: PhantomData, -} - -impl<'a, T, Handler> Resolvable for TypedSubscriberBuilder<'a, '_, Handler, T> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl<'a, Handler, T> SyncResolve for TypedSubscriberBuilder<'a, '_, Handler, T> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - fn res_sync(self) -> ::To { - self.inner.res_sync() - } -} - -impl<'a, Handler, T> AsyncResolve for TypedSubscriberBuilder<'a, '_, Handler, T> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} diff --git a/zrpc-perf/Cargo.toml b/zrpc-perf/Cargo.toml deleted file mode 100644 index 5c4ff20..0000000 --- a/zrpc-perf/Cargo.toml +++ /dev/null @@ -1,51 +0,0 @@ -[package] -authors = ["gabrik "] -categories = ["network-programming"] -description = "Framework for decentralized and distributed microservices over Zenoh." -edition = "2021" -homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc" -license = " EPL-2.0 OR Apache-2.0" -name = "zrpc-perf" -readme = "README.md" -repository = "https://github.com/ZettaScaleLabs/zenoh-rpc" -version = "0.6.0-alpha1" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -# To build with debug on macros: RUSTFLAGS="-Z macro-backtrace" - -[dependencies] -Inflector = "0.11.4" -async-std = { version = "=1.12.0", features = ["attributes"] } -base64 = "0.20.0" -bincode = "1.3.1" -clap = "4.0" -darling = "0.14" -env_logger = "0.10.0" -futures = "0.3.5" -log = "0.4.11" -proc-macro2 = "1.0.6" -prost = "0.11" -quote = "1.0.2" -serde = { version = "1.0.55", features = ["derive"] } -serde_derive = "1.0.55" -serde_json = "1.0.55" -structopt = "0.3.13" -syn = { version = "1.0.11", features = ["full"] } -tokio = { version = "1.2.0", features = ["macros", "rt", "rt-multi-thread"] } -tonic = "0.8" -uhlc = "0.5" -uuid = { version = "1.1", features = ["serde", "v4"] } -zenoh = { version = "=0.7.0-rc", default-features = false } -zenoh-util = { version = "=0.7.0-rc" } -zrpc = { version = "0.6.0-alpha1", path = "../zrpc" } -zrpc-macros = { version = "0.6.0-alpha1", path = "../zrpc-macros" } - -[build-dependencies] -tonic-build = "0.8" - -[profile.release] -codegen-units = 1 -debug = false -lto = "fat" -opt-level = 3 -panic = "abort" diff --git a/zrpc-perf/build.rs b/zrpc-perf/build.rs deleted file mode 100644 index 3ffa963..0000000 --- a/zrpc-perf/build.rs +++ /dev/null @@ -1,4 +0,0 @@ -fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/bench.proto")?; - Ok(()) -} diff --git a/zrpc-perf/plots.r b/zrpc-perf/plots.r deleted file mode 100644 index ec7f6c6..0000000 --- a/zrpc-perf/plots.r +++ /dev/null @@ -1,144 +0,0 @@ -library(ggplot2) -library(cowplot) -library(tidyverse) -library(cowplot) -library(hrbrthemes) -library(viridis) -library(readr) - -setwd(".") - -# The logs to read for 8K batches -read_dir <- function(file_name) { - read_csv(file_name) -} - -lower_ci <- function(mean, se, n, conf_level = 0.95){ - lower_ci <- mean - qt(1 - ((1 - conf_level) / 2), n - 1) * se -} - -upper_ci <- function(mean, se, n, conf_level = 0.95){ - upper_ci <- mean + qt(1 - ((1 - conf_level) / 2), n - 1) * se -} - -logdir <- c("./results"); - -data_files <- c(); - for(ld in logdir) { - data_files <- c(data_files, list.files(ld, pattern = "*.csv", full.names=TRUE)); -} - -raw_data <- data_files %>% - map_df(~ read_dir(.)) - -raw_data <- raw_data %>% filter(MSGS>0) %>% filter(RTT_US>0) %>% mutate(MBIT_THR = THR/1024/1024) - -sizes = c(8,16,32, 64, 128,256,512,1024,2048,4096,8192,16384,32768,65536) - - -## Getting statistics -data <- raw_data %>% group_by(SIZE,KIND) %>% summarise( MEAN_MSGS = mean(MSGS), - SD_MSGS=sd(MSGS), - MEAN_THR= mean(MBIT_THR), - SD_THR=sd(MBIT_THR), - MEAN_RTT= mean(RTT_US), - MEDIAN_RTT=median(RTT_US), - COUNT = n() - ) %>% mutate ( - SE_MSGS= (SD_MSGS) / sqrt(COUNT), - SE_THR = (SD_THR) / sqrt(COUNT), - LCI_MSGS = lower_ci(MEAN_MSGS, SE_MSGS, COUNT), - UCI_MSGS = upper_ci(MEAN_MSGS, SE_MSGS, COUNT), - LCI_THR = lower_ci(MEAN_THR, SE_THR, COUNT), - UCI_THR = upper_ci(MEAN_THR, SE_THR, COUNT), - ) - - -data <- data %>% filter(KIND!="ZRPC-RESP-SER" & KIND!="ZRPC-RESP-DE") - -# p_msgs <- ggplot(data=data, aes(x=factor(SIZE), y=MEAN_MSGS, fill=KIND)) + -# geom_bar(stat="identity", position="dodge") + scale_y_log10() -# -# plot(p_msgs) -# -# p_rtt <- ggplot(data=data, aes(x=factor(SIZE), y=MEAN_RTT, fill=KIND)) + -# geom_bar(stat="identity", position="dodge") + scale_y_log10() -# -# plot(p_rtt) -# -# ggsave("zrpc-msgs.pdf",plot = p_msgs, width = 10, height = 10, limitsize = TRUE); -# ggsave("zrpc-rtt.pdf",plot = p_rtt, width = 10, height = 10, limitsize = TRUE); - - -p_msgs<-ggplot(data=data, aes(x=factor(SIZE), y=MEAN_MSGS, colour=KIND, group=KIND)) + - geom_point(size=2) + - geom_line() + - #scale_y_log10() + - geom_errorbar(aes(ymin=MEAN_MSGS-SE_MSGS, ymax=MEAN_MSGS+SE_MSGS), colour="black", width=.2) + - scale_x_discrete(breaks = sizes, labels = sizes) + - ggtitle("zrpc zenoh comparison msg/s localhost") + - xlab("Payload size") + ylab("msg/s") -plot(p_msgs) - - -p_thr<-ggplot(data=data, aes(x=factor(SIZE), y=MEAN_THR, colour=KIND, group=KIND)) + - geom_point(size=2) + - geom_line() + - scale_y_log10() + - geom_errorbar(aes(ymin=MEAN_THR-SE_THR, ymax=MEAN_THR+SE_THR), colour="black", width=.2) + - scale_x_discrete(breaks = sizes, labels = sizes) + - ggtitle("zrpc zenoh comparison throughput localhost") + - xlab("Payload size") + ylab("Mbit/s") -plot(p_thr) - - -p_rtt<-ggplot(data=data, aes(x=factor(SIZE), y=MEAN_RTT, colour=KIND, group=KIND)) + - geom_point(size=2) + - geom_line() + - #scale_y_log10() + - scale_x_discrete(breaks = sizes, labels = sizes) + - ggtitle("zrpc zenoh comparison rtt localhost") + - xlab("Payload size") + ylab("RTT µS") -plot(p_rtt) - -#p_rtt_median<-ggplot(data=data, aes(x=factor(SIZE), y=MEDIAN_RTT, colour=KIND, group=KIND)) -# geom_point(size=2) + -# geom_line() + -# scale_y_log10() + -# scale_x_discrete(breaks = sizes, labels = sizes) + -# ggtitle("zrpc zenoh comparison median rtt localhost") + -# xlab("Payload size") + ylab("RTT µS") -#plot(p_rtt_median) - - -#dist_data <- raw_data %>% filter(SIZE==8) %>% filter(KIND=="QUERY" | KIND=="QUERY-EVAL" | KIND=="P2P-QUERY-EVAL") - -# dist_data <- raw_data %>% filter(SIZE==8) -# -# -# p_rtt_dist<-ggplot(data=dist_data, aes(x=RTT_US, colour=KIND, group=KIND)) + -# geom_density() + -# #scale_y_log10() + -# ggtitle("zenoh rtt density plot 8 byte") + -# xlab("RTT µS") + ylab("Probability") -# plot(p_rtt_dist) -# -# cmp_data <- data %>% filter(KIND=="PP-ZRPC" | KIND=="PP-QUERY-EVAL" | KIND=="GRPC-CLIENT" ) -# p_msgs_cmp<-ggplot(data=cmp_data, aes(x=factor(SIZE), y=MEAN_MSGS, colour=KIND, group=KIND)) + -# geom_point(size=2) + -# geom_line() + -# # scale_y_log10() + -# geom_errorbar(aes(ymin=MEAN_MSGS-SE_MSGS, ymax=MEAN_MSGS+SE_MSGS), colour="black", width=.2) + -# scale_x_discrete(breaks = sizes, labels = sizes) + -# ggtitle("zrpc zenoh comparison msg/s localhost") + -# xlab("Payload size") + ylab("msg/s") -# plot(p_msgs_cmp) -# -# - -ggsave("msgs-comparison.pdf",plot = p_msgs, width = 10, height = 10, limitsize = TRUE); -ggsave("thr-comparison.pdf",plot = p_thr, width = 10, height = 10, limitsize = TRUE); -ggsave("rtt-comparison.pdf",plot = p_rtt, width = 10, height = 10, limitsize = TRUE); -#ggsave("rtt-median-comparison.pdf",plot = p_rtt_median, width = 10, height = 10, limitsize = TRUE); - -print("Done") \ No newline at end of file diff --git a/zrpc-perf/proto/bench.proto b/zrpc-perf/proto/bench.proto deleted file mode 100644 index cee3cd4..0000000 --- a/zrpc-perf/proto/bench.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; -package bench; - -service Bencher { - rpc Bench (BenchRequest) returns (BenchReply); -} - -message BenchRequest {} - -message BenchReply { - bytes data = 1; -} \ No newline at end of file diff --git a/zrpc-perf/run-test.sh b/zrpc-perf/run-test.sh deleted file mode 100644 index c45e7f2..0000000 --- a/zrpc-perf/run-test.sh +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env bash - - -plog () { -TS=`eval date "+%F-%T"` - echo "[$TS]: $1" -} - -TS=$(date "+%F-%T") - -INITIAL_SIZE=8 -END_SIZE=65536 - -BIN_DIR="./target/release" - -WD=$(pwd) - -SERVER_BIN="gserver" -CLIENT_BIN="gclient" - -GET_BIN="get" - - - -NCALL_BIN="zrpc_call" - -EVAL_BIN="eval" -GET_EVAL_BIN="get_eval" - - -SER_BIN="serialization" -DE_BIN="deserialization" - -ZENOH_REPO="https://github.com/eclipse-zenoh/zenoh" -ZENOH_BRANCH="master" -ZENOH_DIR="$WD/zenoh" - -OUT_DIR="results" - -DURATION=20 -ZENOHD_PATH="$ZENOH_DIR/target/release/zenohd" - -if [[ ! -d $ZENOH_DIR ]]; -then - plog "Cloning and building zenoh from $ZENOH_REPO branch $ZENOH_BRANCH" - git clone $ZENOH_REPO -b $ZENOH_BRANCH $ZENOH_DIR - cd $ZENOH_DIR - cargo build --release - cd $WD -else - cd $ZENOH_DIR - git pull - cargo build --release - cd $WD -fi - - -mkdir -p $OUT_DIR - -plog "Running baseline gRPC bench" - -x=8 -while [ $x -le $END_SIZE ] -do - nohup $BIN_DIR/$SERVER_BIN -a 127.0.0.1:50001 -s $x > /dev/null 2>&1 & - SERVER_PID=$! - plog "Server PID $SERVER_PID" - plog "Running gRPC bench with $x size" - $BIN_DIR/$CLIENT_BIN -d $DURATION -i 1 -a 127.0.0.1:50001 -s $x > $OUT_DIR/grpc-$x-$TS.csv - plog "Done gRPC bench with $x size" - kill -9 $SERVER_PID - x=$(( $x * 2 )) -done - -plog "Running baseline get bench" - -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - nohup $ZENOHD_PATH --mem-storage "test/**" -l tcp/127.0.0.1:7447 > /dev/null 2>&1 & - ZENOHD_PID=$! - plog "Zenohd running PID $ZENOHD_PID" - plog "Running GET bench with $x size" - $BIN_DIR/$GET_BIN -d $DURATION -i 1 -m client -p tcp/127.0.0.1:7447 -s $x > $OUT_DIR/get-$x-$TS.csv - plog "Done GET bench with $x size" - kill -9 $ZENOHD_PID - x=$(( $x * 2 )) -done - - - - - -plog "Running baseline get queryable bench" - -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - nohup $ZENOHD_PATH --mem-storage "test/**" -l tcp/127.0.0.1:7447 > /dev/null 2>&1 & - ZENOHD_PID=$! - plog "Zenohd running PID $ZENOHD_PID" - nohup $BIN_DIR/$NET_EVAL_BIN -m client -p tcp/127.0.0.1:7447 -s $x > /dev/null 2>&1 & - EV_PID=$! - plog "Queryable PID $EV_PID" - plog "Running Queryable bench with $x size" - $BIN_DIR/$NET_GET_EVAL_BIN -d $DURATION -i 1 -m client -p tcp/127.0.0.1:7447 -s $x > $OUT_DIR/queryable-$x-$TS.csv - plog "Done Queryable bench with $x size" - kill -9 $ZENOHD_PID - kill -9 $EV_PID - x=$(( $x * 2 )) -done - - -plog "Running baseline get p2p queryable bench" - -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - nohup $BIN_DIR/$NET_EVAL_BIN -s $x > /dev/null 2>&1 & - EV_PID=$! - plog "Queryable PID $EV_PID" - plog "Running Queryable P2P bench with $x size" - $BIN_DIR/$NET_GET_EVAL_BIN -d $DURATION -i 1 -s $x > $OUT_DIR/p2p-queryable-$x-$TS.csv - plog "Done Queryable P2P bench with $x size" - kill -9 $EV_PID - x=$(( $x * 2 )) -done - -plog "Running baseline eval bench" - -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - nohup $ZENOHD_PATH --mem-storage "test/**" -l tcp/127.0.0.1:7447 > /dev/null 2>&1 & - ZENOHD_PID=$! - plog "Zenohd running PID $ZENOHD_PID" - nohup $BIN_DIR/$EVAL_BIN -m client -p tcp/127.0.0.1:7447 -s $x > /dev/null 2>&1 & - EV_PID=$! - plog "Eval PID $EV_PID" - plog "Running EVAL bench with $x size" - $BIN_DIR/$GET_EVAL_BIN -d $DURATION -i 1 -m client -p tcp/127.0.0.1:7447 -s $x > $OUT_DIR/eval-$x-$TS.csv - plog "Done EVAL bench with $x size" - kill -9 $ZENOHD_PID - kill -9 $EV_PID - x=$(( $x * 2 )) -done - - -plog "Running ZRPC Call bench" -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - plog "Starting zenohd..." - nohup $ZENOHD_PATH --mem-storage "test/**" -l tcp/127.0.0.1:7447 > /dev/null 2>&1 & - ZENOHD_PID=$! - plog "Zenohd running PID $ZENOHD_PID" - sleep 2 - nohup $BIN_DIR/$NCALL_BIN -z client -m server -r tcp/127.0.0.1:7447 -s $x > /tmp/server.out 2>&1 & - SERVER_PID=$! - plog "ZRPC Server running $SERVER_PID" - sleep 6 - plog "Running ZRPC call bench with $x size" - $BIN_DIR/$NCALL_BIN -d $DURATION -z client -i 1 -m client -r tcp/127.0.0.1:7447 -s $x > $OUT_DIR/zcall-$x-$TS.csv - plog "Done ZRPC Call bench, killing server and zenoh" - kill -9 $SERVER_PID - kill -9 $ZENOHD_PID - x=$(( $x * 2 )) -done - - -plog "Running P2P ZRPC Call bench" -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - nohup $BIN_DIR/$NCALL_BIN -z peer -m server -s $x > /tmp/server.out 2>&1 & - SERVER_PID=$! - plog "P2P ZRPC Server running $SERVER_PID" - sleep 5 - plog "Running P2P ZRPC call bench with $x size" - $BIN_DIR/$NCALL_BIN -d $DURATION -z peer -i 1 -m client -s $x > $OUT_DIR/p2p-zcall-$x-$TS.csv - plog "Done P2P ZRPC Call bench, killing server and zenoh" - kill -9 $SERVER_PID - x=$(( $x * 2 )) -done - - -plog "Running ZRPC Response Serialization bench" -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - plog "Running ZRPC Response Serialization bench with $x size" - $BIN_DIR/$SER_BIN -d $DURATION -i 1 -s $x > $OUT_DIR/serialize-$x-$TS.csv - plog "Done ZRPC Response Serialization" - x=$(( $x * 2 )) -done - -plog "Running ZRPC Response Deserialization bench" -x=$INITIAL_SIZE -while [ $x -le $END_SIZE ] -do - plog "Running ZRPC Response Deserialization bench with $x size" - $BIN_DIR/$DE_BIN -d $DURATION -i 1 -s $x > $OUT_DIR/deserialize-$x-$TS.csv - plog "Done ZRPC Response Deserialization" - x=$(( $x * 2 )) -done - - - -plog "Done Test results stored in $OUT_DIR, killing zenohd" -plog "Bye!" - - diff --git a/zrpc-perf/src/bin/deserialization.rs b/zrpc-perf/src/bin/deserialization.rs deleted file mode 100644 index 89d7e33..0000000 --- a/zrpc-perf/src/bin/deserialization.rs +++ /dev/null @@ -1,89 +0,0 @@ -#![allow(clippy::manual_async_fn)] -#![allow(clippy::large_enum_variant)] -#[macro_use] -extern crate std; - -use async_std::sync::Arc; -use async_std::task; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; - -use std::str; -use uuid::Uuid; -use zrpc::zrpcresult::{ZRPCError, ZRPCResult}; -use zrpc_macros::zservice; - -static DEFAULT_INT: &str = "5"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct CallArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[zservice( - timeout_s = 60, - prefix = "test", - service_uuid = "00000000-0000-0000-0000-000000000001" -)] -pub trait Bench { - async fn bench(&self) -> Vec; -} - -#[async_std::main] -async fn main() { - // initiate logging - env_logger::init(); - - let args = CallArgs::from_args(); - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let kind = "ZRPC-RESP-DE"; - - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { - 0f64 - } else { - (r as f64) / (n as f64) - } as f64; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let d = args.duration; - task::spawn(async move { - task::sleep(Duration::from_secs(d)).await; - std::process::exit(0); - }); - - let data = vec![0; args.size as usize]; - let resp = BenchResponse::Bench(data); - let serialized = zrpc::serialize::serialize_response(&resp).unwrap(); - - loop { - let now_q = Instant::now(); - let _d: BenchResponse = zrpc::serialize::deserialize_response(&serialized.clone()).unwrap(); - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } -} diff --git a/zrpc-perf/src/bin/eval.rs b/zrpc-perf/src/bin/eval.rs deleted file mode 100644 index e994699..0000000 --- a/zrpc-perf/src/bin/eval.rs +++ /dev/null @@ -1,45 +0,0 @@ -use futures::prelude::*; -use structopt::StructOpt; -use zenoh::{prelude::*, queryable::EVAL}; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_SIZE: &str = "8"; - -#[derive(StructOpt, Debug)] -struct GetArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, -} - -#[async_std::main] -async fn main() { - let args = GetArgs::from_args(); - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - let peers: Vec = vec![peer.parse().unwrap()]; - config.set_peers(peers).unwrap(); - } - None => (), - }; - - let zenoh = zenoh::open(config).await.unwrap(); - - let path = String::from("test/eval"); - - let data: Vec = vec![0; args.size as usize]; - let mut query_stream = zenoh.queryable(&path).kind(EVAL).await.unwrap(); - while let Some(query) = query_stream.receiver().next().await { - let value = Value::new(data.clone().into()); - let sample = Sample::new(path.clone(), value); - query.reply_async(sample).await; - } -} diff --git a/zrpc-perf/src/bin/gclient.rs b/zrpc-perf/src/bin/gclient.rs deleted file mode 100644 index 4477270..0000000 --- a/zrpc-perf/src/bin/gclient.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::sync::Arc; -use tokio::task; - -pub mod bench { - tonic::include_proto!("bench"); -} - -use bench::bencher_client::BencherClient; -use bench::BenchRequest; - -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; - -static DEFAULT_ADDRESS: &str = "127.0.0.1:50001"; -static DEFAULT_INT: &str = "5"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct ClientArgs { - #[structopt(short, long, default_value = DEFAULT_ADDRESS)] - address: String, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - let args = ClientArgs::from_args(); - - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let kind = "GRPC-CLIENT"; - - let mut client = BencherClient::connect(format!("http://{}", args.address)).await?; - - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - task::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { 0 } else { r / n }; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let start = Instant::now(); - - while start.elapsed() < Duration::from_secs(args.duration) { - let now_q = Instant::now(); - let request = tonic::Request::new(BenchRequest {}); - let _r = client.bench(request).await.unwrap(); - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } - Ok(()) -} diff --git a/zrpc-perf/src/bin/get.rs b/zrpc-perf/src/bin/get.rs deleted file mode 100644 index d7d33f6..0000000 --- a/zrpc-perf/src/bin/get.rs +++ /dev/null @@ -1,89 +0,0 @@ -use async_std::sync::Arc; -use async_std::task; -use futures::prelude::*; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; -use zenoh::prelude::*; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_INT: &str = "5"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct GetArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[async_std::main] -async fn main() { - let args = GetArgs::from_args(); - - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - let peers: Vec = vec![peer.clone().parse().unwrap()]; - config.set_peers(peers).unwrap(); - } - None => (), - }; - let zenoh = zenoh::open(config).await.unwrap(); - - let selector = format!("test/{}", args.size); - - let kind = if args.mode == "peer" { - "PR-GET" - } else { - "CR-GET" - }; - - let path = format!("test/{}", args.size); - let data = vec![0; args.size as usize]; - let value = Value::new(data.into()); - zenoh.put(&path, value).await.unwrap(); - - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { 0 } else { r / n }; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let start = Instant::now(); - - while start.elapsed() < Duration::from_secs(args.duration) { - let now_q = Instant::now(); - let mut data_stream = zenoh.get(&selector).await.unwrap(); - while data_stream.next().await.is_some() {} - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } - - zenoh.close().await.unwrap(); -} diff --git a/zrpc-perf/src/bin/get_eval.rs b/zrpc-perf/src/bin/get_eval.rs deleted file mode 100644 index 830404f..0000000 --- a/zrpc-perf/src/bin/get_eval.rs +++ /dev/null @@ -1,84 +0,0 @@ -use async_std::sync::Arc; -use async_std::task; -use futures::prelude::*; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; -use zenoh::prelude::*; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_INT: &str = "5"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct GetArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[async_std::main] -async fn main() { - let args = GetArgs::from_args(); - - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - let peers: Vec = vec![peer.clone().parse().unwrap()]; - config.set_peers(peers).unwrap(); - } - None => (), - }; - - let kind = if args.mode == "peer" { - "PP-GET-EVAL" - } else { - "CRC-GET-EVAL" - }; - - let zenoh = zenoh::open(config).await.unwrap(); - - let path = String::from("test/eval"); - - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { 0 } else { r / n }; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let start = Instant::now(); - - while start.elapsed() < Duration::from_secs(args.duration) { - let now_q = Instant::now(); - let mut data_stream = zenoh.get(&path).await.unwrap(); - while data_stream.next().await.is_some() {} - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } - - zenoh.close().await.unwrap(); -} diff --git a/zrpc-perf/src/bin/gserver.rs b/zrpc-perf/src/bin/gserver.rs deleted file mode 100644 index 4f8193e..0000000 --- a/zrpc-perf/src/bin/gserver.rs +++ /dev/null @@ -1,54 +0,0 @@ -use tonic::{transport::Server, Request, Response, Status}; - -pub mod bench { - tonic::include_proto!("bench"); -} - -use bench::bencher_server::{Bencher, BencherServer}; -use bench::{BenchReply, BenchRequest}; -use structopt::StructOpt; - -static DEFAULT_ADDRESS: &str = "127.0.0.1:50001"; -static DEFAULT_SIZE: &str = "8"; - -#[derive(StructOpt, Debug)] -struct ServerArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_ADDRESS)] - address: String, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, -} - -#[derive(Debug, Default)] -pub struct MyBench { - data: Vec, -} - -#[tonic::async_trait] -impl Bencher for MyBench { - async fn bench(&self, _request: Request) -> Result, Status> { - let reply = bench::BenchReply { - data: self.data.clone(), - }; - Ok(Response::new(reply)) - } -} - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - let args = ServerArgs::from_args(); - - let addr = args.address.parse()?; - - let data = vec![0; args.size as usize]; - - let greeter = MyBench { data }; - - Server::builder() - .add_service(BencherServer::new(greeter)) - .serve(addr) - .await?; - - Ok(()) -} diff --git a/zrpc-perf/src/bin/put_thr.rs b/zrpc-perf/src/bin/put_thr.rs deleted file mode 100644 index fdaa927..0000000 --- a/zrpc-perf/src/bin/put_thr.rs +++ /dev/null @@ -1,42 +0,0 @@ -use structopt::StructOpt; -use zenoh::prelude::r#async::*; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_SIZE: &str = "8"; - -#[derive(StructOpt, Debug)] -struct PutArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, -} - -#[async_std::main] -async fn main() { - let args = PutArgs::from_args(); - - //println!("Args {:?}", args); - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - config.connect.endpoints.extend(vec![peer.parse().unwrap()]); - } - None => (), - }; - let zenoh = zenoh::open(config).res().await.unwrap(); - - let path = String::from("test/thr"); - let data = vec![0; args.size as usize]; - let value = Value::new(data.into()); - - loop { - zenoh.put(&path, value.clone()).res().await.unwrap(); - } -} diff --git a/zrpc-perf/src/bin/serialization.rs b/zrpc-perf/src/bin/serialization.rs deleted file mode 100644 index ebd3691..0000000 --- a/zrpc-perf/src/bin/serialization.rs +++ /dev/null @@ -1,87 +0,0 @@ -#![allow(clippy::manual_async_fn)] -#![allow(clippy::large_enum_variant)] -#[macro_use] -extern crate std; - -use async_std::sync::Arc; -use async_std::task; -use std::str; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; -use uuid::Uuid; -use zrpc::zrpcresult::{ZRPCError, ZRPCResult}; -use zrpc_macros::zservice; - -static DEFAULT_INT: &str = "5"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct CallArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[zservice( - timeout_s = 60, - prefix = "test", - service_uuid = "00000000-0000-0000-0000-000000000001" -)] -pub trait Bench { - async fn bench(&self) -> Vec; -} - -#[async_std::main] -async fn main() { - // initiate logging - env_logger::init(); - - let args = CallArgs::from_args(); - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let kind = "ZRPC-RESP-SER"; - - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { - 0f64 - } else { - (r as f64) / (n as f64) - } as f64; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let d = args.duration; - task::spawn(async move { - task::sleep(Duration::from_secs(d)).await; - std::process::exit(0); - }); - - let data = vec![0; args.size as usize]; - let resp = BenchResponse::Bench(data); - - loop { - let now_q = Instant::now(); - let _d = zrpc::serialize::serialize_response(&resp.clone()).unwrap(); - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } -} diff --git a/zrpc-perf/src/bin/sub_thr.rs b/zrpc-perf/src/bin/sub_thr.rs deleted file mode 100644 index 435c7eb..0000000 --- a/zrpc-perf/src/bin/sub_thr.rs +++ /dev/null @@ -1,76 +0,0 @@ -use async_std::sync::Arc; -use async_std::task; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Duration; -use structopt::StructOpt; - -use zenoh::prelude::*; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_INT: &str = "5"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct GetArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[async_std::main] -async fn main() { - let args = GetArgs::from_args(); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - let peers: Vec = vec![peer.clone().parse().unwrap()]; - config.set_peers(peers).unwrap(); - } - None => (), - }; - let zenoh = zenoh::open(config).await.unwrap(); - - let reskey = String::from("test/thr"); - - let kind = if args.mode == "peer" { - "PP-NET-SUB" - } else { - "CRC-NET-SUB" - }; - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - let c = count.clone(); - let s = args.size; - let i = args.interveal; - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - println!("{},{},{},{},{},{}", msgs, s, thr, i, 0, kind); - } - }); - - let _subscriber = zenoh - .subscribe(&reskey) - .callback(move |_sample| { - count.fetch_add(1, Ordering::AcqRel); - }) - .await - .unwrap(); - - task::sleep(Duration::from_secs(args.duration)).await; -} diff --git a/zrpc-perf/src/bin/z_ping.rs b/zrpc-perf/src/bin/z_ping.rs deleted file mode 100644 index b6af75c..0000000 --- a/zrpc-perf/src/bin/z_ping.rs +++ /dev/null @@ -1,148 +0,0 @@ -// -// Copyright (c) 2017, 2020 ADLINK Technology Inc. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ADLINK zenoh team, -// -use async_std::channel::unbounded; -use async_std::stream::StreamExt; -use async_std::sync::{Arc, Barrier, Mutex}; -use async_std::task; -use std::collections::HashMap; -use std::time::{Duration, Instant}; -use structopt::StructOpt; -use zenoh::prelude::r#async::*; - -static DEFAULT_MODE: &str = "peer"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_INT: &str = "1"; -static DEFAULT_DURATION: &str = "60"; -#[derive(StructOpt, Debug)] -struct PingArgs { - /// Zenoh mode, client or peer - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long)] - peer: Option, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: f64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -type PingInfo = (u64, usize, u128); - -#[async_std::main] -async fn main() { - // initiate logging - env_logger::init(); - - let args = PingArgs::from_args(); - - let scenario = if args.mode == "peer" { - "PP-PING" - } else { - "CRC-PING" - }; - - let mut config = zenoh::config::Config::default(); - config.set_mode(Some(args.mode.parse().unwrap())).unwrap(); - - match args.peer { - Some(peer) => { - config.connect.endpoints.extend(vec![peer.parse().unwrap()]); - } - None => (), - }; - let session = Arc::new(zenoh::open(config).res().await.unwrap()); - - let (s, r) = unbounded::(); - - // The hashmap with the pings - let pending = Arc::new(Mutex::new(HashMap::::new())); - let barrier = Arc::new(Barrier::new(2)); - - let c_pending = pending.clone(); - let c_barrier = barrier.clone(); - let c_session = session.clone(); - task::spawn(async move { - // The resource to wait the response back - let reskey_pong = String::from("test/pong"); - - let sub = c_session - .declare_subscriber(&reskey_pong) - .res() - .await - .unwrap(); - - // Wait for the both publishers and subscribers to be declared - c_barrier.wait().await; - println!("SQ_NUMBER,SIZE,RTT_US,SCENARIO"); - while let Ok(mut sample) = sub.recv_async().await { - let mut count_bytes = [0u8; 8]; - sample.value.payload.read_bytes(&mut count_bytes); - let count = u64::from_le_bytes(count_bytes); - let instant = c_pending.lock().await.remove(&count).unwrap(); - s.send(( - count, - sample.value.payload.len(), - instant.elapsed().as_micros(), - )) - .await - .unwrap(); - //print!("{},{},{},{}\n", count,sample.payload.len(),instant.elapsed().as_micros(),scenario); - } - }); - - task::spawn(async move { - loop { - while let Ok(pi) = r.recv().await { - let (c, s, rtt) = pi; - print!("{},{},{},{}\n", c, s, rtt, scenario); - } - } - }); - - let d = args.duration; - task::spawn(async move { - task::sleep(Duration::from_secs(d)).await; - std::process::exit(0); - }); - - // The resource to publish data on - let reskey_ping = String::from("test/ping"); - - // Wait for the both publishers and subscribers to be declared - barrier.wait().await; - - let payload = vec![0u8; args.size as usize - 8]; - let mut count: u64 = 0; - let i = args.interveal; - loop { - let mut data: WBuf = WBuf::new(args.size as usize, true); - let count_bytes: [u8; 8] = count.to_le_bytes(); - data.write_bytes(&count_bytes); - data.write_bytes(&payload); - - let value = Value::new(data.into()); - - pending.lock().await.insert(count, Instant::now()); - session - .put(&reskey_ping, value) - .congestion_control(zenoh::publication::CongestionControl::Block) // Make sure to not drop messages because of congestion control - .await - .unwrap(); - - task::sleep(Duration::from_secs_f64(i)).await; - count += 1; - } -} diff --git a/zrpc-perf/src/bin/z_pong.rs b/zrpc-perf/src/bin/z_pong.rs deleted file mode 100644 index b1c4f15..0000000 --- a/zrpc-perf/src/bin/z_pong.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::io::{stdin, Read}; - -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use clap::{App, Arg}; -use zenoh::config::Config; -use zenoh::prelude::keyexpr; -use zenoh::prelude::sync::{SessionDeclarations, SyncResolve}; -use zenoh::publication::CongestionControl; - -fn main() { - // initiate logging - env_logger::init(); - - let config = parse_args(); - - let session = zenoh::open(config).res().unwrap().into_arc(); - - // The key expression to read the data from - let key_expr_ping = keyexpr::new("test/ping").unwrap(); - - // The key expression to echo the data back - let key_expr_pong = keyexpr::new("test/pong").unwrap(); - - let publisher = session - .declare_publisher(key_expr_pong) - .congestion_control(CongestionControl::Block) - .res() - .unwrap(); - - let _sub = session - .declare_subscriber(key_expr_ping) - .callback(move |sample| publisher.put(sample.value).res().unwrap()) - .res() - .unwrap(); - for _ in stdin().bytes().take_while(|b| !matches!(b, Ok(b'q'))) {} -} - -fn parse_args() -> Config { - let args = App::new("zenoh roundtrip pong example") - .arg( - Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode (peer by default).") - .possible_values(&["peer", "client"]), - ) - .arg(Arg::from_usage( - "-e, --connect=[ENDPOINT]... 'Endpoints to connect to.'", - )) - .arg(Arg::from_usage( - "-l, --listen=[ENDPOINT]... 'Endpoints to listen on.'", - )) - .arg(Arg::from_usage( - "--no-multicast-scouting 'Disable the multicast-based scouting mechanism.'", - )) - .get_matches(); - - let mut config = if let Some(conf_file) = args.value_of("config") { - Config::from_file(conf_file).unwrap() - } else { - Config::default() - }; - if let Some(Ok(mode)) = args.value_of("mode").map(|mode| mode.parse()) { - config.set_mode(Some(mode)).unwrap(); - } - if let Some(values) = args.values_of("connect") { - config - .connect - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if let Some(values) = args.values_of("listen") { - config - .listen - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if args.is_present("no-multicast-scouting") { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - - config -} diff --git a/zrpc-perf/src/bin/znrpc_call.rs b/zrpc-perf/src/bin/znrpc_call.rs deleted file mode 100644 index 2982e5f..0000000 --- a/zrpc-perf/src/bin/znrpc_call.rs +++ /dev/null @@ -1,167 +0,0 @@ -#![allow(clippy::manual_async_fn)] -#![allow(clippy::large_enum_variant)] -#[macro_use] -extern crate std; - -use async_std::sync::Arc; -use async_std::task; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, Instant}; -use structopt::StructOpt; -use zenoh::prelude::r#async::*; - -use std::str; -use uuid::Uuid; -use zrpc::zrpcresult::{ZRPCError, ZRPCResult}; -use zrpc::ZServe; -use zrpc_macros::{zserver, zservice}; - -static DEFAULT_MODE: &str = "client"; -static DEFAULT_ZMODE: &str = "peer"; -static DEFAULT_ROUTER: &str = "tcp/127.0.0.1:7447"; -static DEFAULT_INT: &str = "5"; -static DEFAULT_SIZE: &str = "8"; -static DEFAULT_DURATION: &str = "60"; - -#[derive(StructOpt, Debug)] -struct CallArgs { - /// Config file - #[structopt(short, long, default_value = DEFAULT_MODE)] - mode: String, - #[structopt(short, long, default_value = DEFAULT_ZMODE)] - zenoh_mode: String, - #[structopt(short, long, default_value = DEFAULT_ROUTER)] - router: String, - #[structopt(short, long, default_value = DEFAULT_SIZE)] - size: u64, - #[structopt(short, long, default_value = DEFAULT_INT)] - interveal: u64, - #[structopt(short, long, default_value = DEFAULT_DURATION)] - duration: u64, -} - -#[zservice( - timeout_s = 60, - prefix = "test", - service_uuid = "00000000-0000-0000-0000-000000000001" -)] -pub trait Bench { - async fn bench(&self) -> Vec; -} - -#[derive(Clone)] -struct BenchZService { - pub data: Vec, -} - -#[zserver] -impl Bench for BenchZService { - async fn bench(&self) -> Vec { - self.data.clone() - } -} - -#[async_std::main] -async fn main() { - // initiate logging - env_logger::init(); - - let args = CallArgs::from_args(); - - if args.mode == "server" { - server(args).await; - } else if args.mode == "client" { - client(args).await; - } else { - panic!("Mode can be only one of [client|server]") - } -} - -async fn client(args: CallArgs) { - let rtts = Arc::new(AtomicU64::new(0)); - let count: Arc = Arc::new(AtomicU64::new(0)); - - let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(args.zenoh_mode.parse().unwrap())) - .unwrap(); - - if args.zenoh_mode == "client" { - config - .connect - .endpoints - .extend(vec![args.router.parse().unwrap()]); - } - let zenoh = Arc::new(zenoh::open(config).res().await.unwrap()); - - task::sleep(std::time::Duration::from_secs(1)).await; - - let local_servers = BenchClient::find_servers(zenoh.clone()).await.unwrap(); - log::trace!("Servers: {:?}", local_servers); - - let client = BenchClient::new(zenoh.clone(), local_servers[0]); - - let kind = if args.zenoh_mode == "client" { - "CRC-ZRPC" - } else { - "PP-ZRPC" - }; - - let c = count.clone(); - let s = args.size; - let i = args.interveal; - let rt = rtts.clone(); - println!("MSGS,SIZE,THR,INTERVEAL,RTT_US,KIND"); - task::spawn(async move { - loop { - task::sleep(Duration::from_secs(i)).await; - let n = c.swap(0, Ordering::AcqRel); - let r = rt.swap(0, Ordering::AcqRel); - let msgs = n / i; - let thr = (n * s * 8) / i; - let rtt = if n == 0 { 0 } else { r / n }; - println!("{},{},{},{},{},{}", msgs, s, thr, i, rtt, kind); - } - }); - - let start = Instant::now(); - - while start.elapsed() < Duration::from_secs(args.duration) { - let now_q = Instant::now(); - let _r = client.bench().await; - count.fetch_add(1, Ordering::AcqRel); - rtts.fetch_add(now_q.elapsed().as_micros() as u64, Ordering::AcqRel); - } - - //zenoh.close().await.unwrap(); -} - -async fn server(args: CallArgs) { - let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(args.zenoh_mode.parse().unwrap())) - .unwrap(); - - if args.zenoh_mode == "client" { - config - .connect - .endpoints - .extend(vec![args.router.parse().unwrap()]); - } - - let zenoh = Arc::new(zenoh::open(config).res().await.unwrap()); - - let data = vec![0; args.size as usize]; - - let service = BenchZService { data }; - - let server = service.get_bench_server(zenoh.clone(), None); - - println!("Instance ID {}", server.instance_uuid()); - let (_ss, _h) = server.connect().await.unwrap(); - server.initialize().await.unwrap(); - server.register().await.unwrap(); - let (_s, handle) = server.start().await.unwrap(); - - let _ = handle.await; -} diff --git a/zrpc-perf/src/lib.rs b/zrpc-perf/src/lib.rs deleted file mode 100644 index 4ed4e7f..0000000 --- a/zrpc-perf/src/lib.rs +++ /dev/null @@ -1,12 +0,0 @@ -/********************************************************************************* -* Copyright (c) 2018,2020 ADLINK Technology Inc. -* -* This program and the accompanying materials are made available under the -* terms of the Eclipse Public License 2.0 which is available at -* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0 -* which is available at https://www.apache.org/licenses/LICENSE-2.0. -* -* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -* Contributors: -* ADLINK fog05 team, -*********************************************************************************/