Skip to content

Commit

Permalink
separate daemon and runtime commands + cargo clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
Hennzau committed Nov 22, 2024
1 parent 5862371 commit 1a3efed
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 113 deletions.
2 changes: 1 addition & 1 deletion zenoh-flow-nodes/src/io/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Outputs {
/// Insert the `flume::Sender` in the [Outputs], creating the entry if needed in the internal
/// `HashMap`.
pub fn insert(&mut self, port_id: PortId, tx: Sender<LinkMessage>) {
self.hmap.entry(port_id).or_insert_with(Vec::new).push(tx)
self.hmap.entry(port_id).or_default().push(tx)
}

/// Returns an Output builder for the provided `port_id`, if an output was declared with this exact name in the
Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-records/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ Caused by:
);
additional_mappings
.entry(runtime_from.clone())
.or_insert_with(HashSet::default)
.or_default()
.insert(sender_id);

receivers.insert(
Expand All @@ -220,7 +220,7 @@ Caused by:
);
additional_mappings
.entry(runtime_to.clone())
.or_insert_with(HashSet::default)
.or_default()
.insert(receiver_id);
}
}
Expand Down
136 changes: 136 additions & 0 deletions zfctl/src/daemon_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//
// Copyright © 2021 ZettaScale Technology <[email protected]>
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::path::PathBuf;

use async_std::stream::StreamExt;
use clap::{ArgGroup, Subcommand};

use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook_async_std::Signals;
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{try_parse_from_file, Result, Vars};
use zenoh_flow_daemon::daemon::{Daemon, ZenohFlowConfiguration};

use zenoh_flow_runtime::Runtime;

#[derive(Subcommand)]
pub(crate) enum DaemonCommand {
/// Launch a Zenoh-Flow Daemon.
#[command(verbatim_doc_comment)]
#[command(group(
ArgGroup::new("exclusive")
.args(&["name", "configuration"])
.required(true)
.multiple(false)
))]
Run {
/// The human-readable name to give the Zenoh-Flow Runtime wrapped by this
/// Daemon.
///
/// To start a Zenoh-Flow Daemon, at least a name is required.
name: Option<String>,
/// The path of the configuration of the Zenoh-Flow Daemon.
///
/// This configuration allows setting extensions supported by the Runtime
/// and its name.
#[arg(short, long, verbatim_doc_comment)]
configuration: Option<PathBuf>,
/// The path to a Zenoh configuration to manage the connection to the Zenoh
/// network.
///
/// If no configuration is provided, `zfctl` will default to connecting as
/// a peer with multicast scouting enabled.
#[arg(short = 'z', long, verbatim_doc_comment)]
zenoh_configuration: Option<PathBuf>,
},
}

impl DaemonCommand {
pub async fn run(self, _session: Session) -> Result<()> {
match self {
DaemonCommand::Run {
name,
configuration,
zenoh_configuration,
} => {
let zenoh_config = match zenoh_configuration {
Some(path) => {
zenoh::prelude::Config::from_file(path.clone()).unwrap_or_else(|e| {
panic!(
"Failed to parse the Zenoh configuration from < {} >:\n{e:?}",
path.display()
)
})
}
None => zenoh::config::peer(),
};

let zenoh_session = zenoh::open(zenoh_config)
.res_async()
.await
.unwrap_or_else(|e| panic!("Failed to open Zenoh session:\n{e:?}"))
.into_arc();

let daemon = match configuration {
Some(path) => {
let (zenoh_flow_configuration, _) =
try_parse_from_file::<ZenohFlowConfiguration>(&path, Vars::default())
.unwrap_or_else(|e| {
panic!(
"Failed to parse a Zenoh-Flow Configuration from < {} >:\n{e:?}",
path.display()
)
});

Daemon::spawn_from_config(zenoh_session, zenoh_flow_configuration)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon")
}
None => Daemon::spawn(
Runtime::builder(name.unwrap())
.session(zenoh_session)
.build()
.await
.expect("Failed to build the Zenoh-Flow Runtime"),
)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon"),
};

async_std::task::spawn(async move {
let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT])
.expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]");

while let Some(signal) = signals.next().await {
match signal {
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("Received termination signal, shutting down.");
daemon.stop().await;
break;
}

signal => {
tracing::warn!("Ignoring signal ({signal})");
}
}
}
})
.await;
}
}

Ok(())
}
}
8 changes: 8 additions & 0 deletions zfctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use instance_command::InstanceCommand;
mod runtime_command;
use runtime_command::RuntimeCommand;

mod daemon_command;
use daemon_command::DaemonCommand;

mod utils;
use std::path::PathBuf;

Expand Down Expand Up @@ -82,6 +85,10 @@ enum Command {
/// To interact with a Zenoh-Flow runtime.
#[command(subcommand)]
Runtime(RuntimeCommand),

/// To interact with the Zenoh-Flow daemon.
#[command(subcommand)]
Daemon(DaemonCommand),
}

#[async_std::main]
Expand Down Expand Up @@ -124,5 +131,6 @@ async fn main() -> Result<()> {
command.run(session, orchestrator_id).await
}
Command::Runtime(command) => command.run(session).await,
Command::Daemon(command) => command.run(session).await,
}
}
118 changes: 8 additions & 110 deletions zfctl/src/runtime_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
use std::{path::PathBuf, time::Duration};

use anyhow::{anyhow, Context};
use async_std::{io::ReadExt, stream::StreamExt};
use clap::{ArgGroup, Subcommand};
use async_std::io::ReadExt;
use clap::Subcommand;
use comfy_table::{Row, Table};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook_async_std::Signals;

use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{parse_vars, try_parse_from_file, Result, RuntimeId, Vars};
use zenoh_flow_daemon::{
daemon::{Daemon, ZenohFlowConfiguration},
queries::*,
};
use zenoh_flow_commons::{parse_vars, Result, RuntimeId, Vars};
use zenoh_flow_daemon::queries::*;
use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor};
use zenoh_flow_records::DataFlowRecord;
use zenoh_flow_runtime::{Extensions, Runtime};
Expand Down Expand Up @@ -67,9 +63,9 @@ pub(crate) enum RuntimeCommand {
runtime_name: Option<String>,
},

/// Launch a Zenoh-Flow runtime with a Data Flow in standalone mode.
/// Launch a Zenoh-Flow runtime with a DataFlow in standalone mode.
#[command(verbatim_doc_comment)]
RunStandalone {
Run {
/// The data flow to execute.
flow: PathBuf,
/// The, optional, location of the configuration to load nodes implemented not in Rust.
Expand All @@ -90,35 +86,6 @@ pub(crate) enum RuntimeCommand {
#[arg(short = 'z', long, verbatim_doc_comment)]
zenoh_configuration: Option<PathBuf>,
},

/// Launch a Zenoh-Flow runtime with a Data Flow through a Zenoh-Flow daemon.
#[command(verbatim_doc_comment)]
#[command(group(
ArgGroup::new("exclusive")
.args(&["name", "configuration"])
.required(true)
.multiple(false)
))]
Run {
/// The human-readable name to give the Zenoh-Flow Runtime wrapped by this
/// Daemon.
///
/// To start a Zenoh-Flow Daemon, at least a name is required.
name: Option<String>,
/// The path of the configuration of the Zenoh-Flow Daemon.
///
/// This configuration allows setting extensions supported by the Runtime
/// and its name.
#[arg(short, long, verbatim_doc_comment)]
configuration: Option<PathBuf>,
/// The path to a Zenoh configuration to manage the connection to the Zenoh
/// network.
///
/// If no configuration is provided, `zfctl` will default to connecting as
/// a peer with multicast scouting enabled.
#[arg(short = 'z', long, verbatim_doc_comment)]
zenoh_configuration: Option<PathBuf>,
},
}

impl RuntimeCommand {
Expand Down Expand Up @@ -236,7 +203,7 @@ impl RuntimeCommand {
}
}
}
RuntimeCommand::RunStandalone {
RuntimeCommand::Run {
flow,
extensions,
vars,
Expand Down Expand Up @@ -354,75 +321,6 @@ impl RuntimeCommand {
panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)
});
}
RuntimeCommand::Run {
name,
configuration,
zenoh_configuration,
} => {
let zenoh_config = match zenoh_configuration {
Some(path) => {
zenoh::prelude::Config::from_file(path.clone()).unwrap_or_else(|e| {
panic!(
"Failed to parse the Zenoh configuration from < {} >:\n{e:?}",
path.display()
)
})
}
None => zenoh::config::peer(),
};

let zenoh_session = zenoh::open(zenoh_config)
.res_async()
.await
.unwrap_or_else(|e| panic!("Failed to open Zenoh session:\n{e:?}"))
.into_arc();

let daemon = match configuration {
Some(path) => {
let (zenoh_flow_configuration, _) =
try_parse_from_file::<ZenohFlowConfiguration>(&path, Vars::default())
.unwrap_or_else(|e| {
panic!(
"Failed to parse a Zenoh-Flow Configuration from < {} >:\n{e:?}",
path.display()
)
});

Daemon::spawn_from_config(zenoh_session, zenoh_flow_configuration)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon")
}
None => Daemon::spawn(
Runtime::builder(name.unwrap())
.session(zenoh_session)
.build()
.await
.expect("Failed to build the Zenoh-Flow Runtime"),
)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon"),
};

async_std::task::spawn(async move {
let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT])
.expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]");

while let Some(signal) = signals.next().await {
match signal {
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("Received termination signal, shutting down.");
daemon.stop().await;
break;
}

signal => {
tracing::warn!("Ignoring signal ({signal})");
}
}
}
})
.await;
}
}

Ok(())
Expand Down

0 comments on commit 1a3efed

Please sign in to comment.