Skip to content

Commit

Permalink
feat(rust): wip add influx service to the launch config arg of node c…
Browse files Browse the repository at this point in the history
…reate
  • Loading branch information
adrianbenavides committed Sep 10, 2024
1 parent 9546fd0 commit f4000dc
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 49 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_abac/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ockam_vault = { path = "../ockam_vault", default-features = false, features = ["
quickcheck = "1.0.3"
rand = "0.8.5"
tempfile = "3.10.1"
serde_json = "1.0.128"

[[bin]]
name = "repl"
Expand Down
49 changes: 49 additions & 0 deletions implementations/rust/ockam/ockam_abac/src/policy_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use minicbor::{CborLen, Decode, Encode};
use ockam_core::compat::fmt::{Display, Formatter};
#[cfg(feature = "std")]
use ockam_core::Result;
use serde::{Deserialize, Serialize};

/// A Policy expression can either be represented with
/// - A full expression with string valued attributes, contain operator, etc...
Expand Down Expand Up @@ -93,6 +94,25 @@ impl TryFrom<String> for PolicyExpression {
}
}

impl Serialize for PolicyExpression {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.to_string().serialize(serializer)
}
}

impl<'d> Deserialize<'d> for PolicyExpression {
fn deserialize<D>(deserializer: D) -> Result<PolicyExpression, D::Error>
where
D: serde::Deserializer<'d>,
{
let s = String::deserialize(deserializer)?;
PolicyExpression::from_str(s.as_str()).map_err(serde::de::Error::custom)
}
}

#[cfg(test)]
mod tests {
use crate::PolicyExpression::{BooleanExpression, FullExpression};
Expand All @@ -113,4 +133,33 @@ mod tests {
BooleanExpression(BooleanExpr::from_str(boolean_expression).unwrap())
);
}

#[test]
fn serde_json() {
let full_expression = "(= subject.test = \"true\")";
let full_expression = FullExpression(Expr::from_str(full_expression).unwrap());
let full_expression_str = serde_json::to_string(&full_expression).unwrap();
assert_eq!(full_expression_str, "\"(= subject.test = \\\"true\\\")\"");
let full_expression: PolicyExpression = serde_json::from_str(&full_expression_str).unwrap();
assert_eq!(
full_expression,
FullExpression(Expr::from_str("(= subject.test = \"true\")").unwrap())
);

let boolean_expression = "test";
let boolean_expression =
BooleanExpression(BooleanExpr::from_str(boolean_expression).unwrap());
let boolean_expression_str = serde_json::to_string(&boolean_expression).unwrap();
assert_eq!(boolean_expression_str, "\"test\"");
let boolean_expression: PolicyExpression =
serde_json::from_str(&boolean_expression_str).unwrap();
assert_eq!(
boolean_expression,
BooleanExpression(BooleanExpr::from_str("test").unwrap())
);

let invalid_expression = "\"( test = \"true\")\"";
let result: Result<PolicyExpression, _> = serde_json::from_str(invalid_expression);
assert!(result.is_err());
}
}
2 changes: 2 additions & 0 deletions implementations/rust/ockam/ockam_api/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod node_services;
mod token_lease_manager;

pub use node_services::StartInfluxDbLeaseManagerRequest;
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ockam_abac::{Action, PolicyExpression, Resource, ResourceType};
use ockam_core::api::{Error, Response};
use ockam_core::Address;
use ockam_node::{Context, WorkerBuilder};
use serde::{Deserialize, Serialize};
use std::time::Duration;

impl NodeManagerWorker {
Expand Down Expand Up @@ -56,6 +57,7 @@ impl InMemoryNode {
address: Address,
req: StartInfluxDbLeaseManagerRequest,
) -> Result<(), Error> {
debug!(address = %address.address(), "Starting influxdb token lease manager service");
let (incoming_ac, outgoing_ac) = self
.node_manager
.access_control(
Expand All @@ -79,7 +81,7 @@ impl InMemoryNode {
.start(context)
.await?;
self.registry.influxdb_services.insert(address, ()).await;
todo!()
Ok(())
}

async fn delete_influxdb_token_lease_manager_service(
Expand All @@ -99,13 +101,13 @@ impl InMemoryNode {
}
}

#[derive(Debug, Clone, Encode, Decode, CborLen)]
#[derive(Debug, Clone, Encode, Decode, CborLen, Serialize, Deserialize, PartialEq)]
#[rustfmt::skip]
#[cbor(map)]
pub(crate) struct StartInfluxDbLeaseManagerRequest {
pub struct StartInfluxDbLeaseManagerRequest {
#[n(1)] influxdb_org_id: String,
#[n(2)] influxdb_token: String,
#[n(3)] token_permissions: String,
#[n(4)] token_ttl: Duration,
#[n(4)] token_ttl: i32,
#[n(5)] policy_expression: Option<PolicyExpression>,
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::cloud::lease_manager::models::influxdb::Token;
use crate::nodes::service::encode_response;
use minicbor::{Decoder, Encode};
use minicbor::Decoder;
use ockam_core::api::Method::{Delete, Get, Post};
use ockam_core::api::{RequestHeader, Response};
use ockam_core::{async_trait, Address, Routed, Worker};
use ockam_node::Context;
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing_core::field::debug;

#[derive(Clone)]
pub(crate) struct InfluxDbTokenLeaseManagerWorker {
Expand All @@ -20,8 +21,9 @@ impl InfluxDbTokenLeaseManagerWorker {
influxdb_org_id: String,
influxdb_token: String,
token_permissions: String,
token_ttl: Duration,
token_ttl: i32,
) -> Self {
debug!("Creating InfluxDbTokenLeaseManagerWorker");
Self {
inner: Arc::new(Mutex::new(InfluxDbTokenLeaseManagerInner {
address,
Expand Down Expand Up @@ -128,7 +130,7 @@ pub(crate) struct InfluxDbTokenLeaseManagerInner {
influxdb_org_id: String,
influxdb_token: String,
token_permissions: String,
token_ttl: Duration,
token_ttl: i32,
}

#[async_trait]
Expand Down Expand Up @@ -162,29 +164,54 @@ impl InfluxDbTokenLeaseManagerTrait for InfluxDbTokenLeaseManagerWorker {
&self,
ctx: &Context,
) -> Result<Response<Token>, Response<ockam_core::api::Error>> {
todo!()
debug!("Creating token");
Ok(Response::ok().body(Token {
id: "token_id".to_string(),
issued_for: "".to_string(),
created_at: "".to_string(),
expires: "".to_string(),
token: "".to_string(),
status: "".to_string(),
}))
}

async fn get_token(
&self,
ctx: &Context,
token_id: &str,
) -> Result<Response<Token>, Response<ockam_core::api::Error>> {
todo!()
debug!("Getting token");
Ok(Response::ok().body(Token {
id: "token_id".to_string(),
issued_for: "".to_string(),
created_at: "".to_string(),
expires: "".to_string(),
token: "".to_string(),
status: "".to_string(),
}))
}

async fn revoke_token(
&self,
ctx: &Context,
token_id: &str,
) -> Result<Response, Response<ockam_core::api::Error>> {
todo!()
debug!("Revoking token");
Ok(Response::ok())
}

async fn list_tokens(
&self,
ctx: &Context,
) -> Result<Response<Vec<Token>>, Response<ockam_core::api::Error>> {
todo!()
debug!("Listing tokens");
Ok(Response::ok().body(vec![Token {
id: "token_id".to_string(),
issued_for: "".to_string(),
created_at: "".to_string(),
expires: "".to_string(),
token: "".to_string(),
status: "".to_string(),
}]))
}
}
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_command/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod flow_control;
mod global_args;
pub mod identity;
mod kafka;
mod lease;
// mod lease;
mod manpages;
mod markdown;
mod message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl NodeConfig {
if let Some(project) = &cmd.trust_opts.project_name {
self.node.project = Some(project.clone().into());
}
if let Some(launch_config) = &cmd.launch_config {
self.node.launch_config = Some(
serde_json::to_string(launch_config)
.into_diagnostic()?
.into(),
);
}
if let Some(context) = &cmd.opentelemetry_context {
self.node.opentelemetry_context =
Some(serde_json::to_string(&context).into_diagnostic()?.into());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use std::convert::identity;
use std::process::exit;
use std::sync::Arc;

use colorful::Colorful;
use miette::{miette, IntoDiagnostic};
use minicbor::{Decoder, Encode};
use tokio::time::{sleep, Duration};
use tracing::{debug, info, instrument};

use crate::node::CreateCommand;
use crate::secure_channel::listener::create as secure_channel_listener;
use crate::util::exitcode;
use crate::util::foreground_args::wait_for_exit_signal;
use crate::CommandGlobalOpts;
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam::udp::UdpTransport;
use ockam::{Address, Context};
use ockam::{node, Address, Context};
use ockam_api::colors::color_primary;
use ockam_api::nodes::models::services::StartServiceRequest;
use ockam_api::nodes::InMemoryNode;
use ockam_api::nodes::{
service::{NodeManagerGeneralOptions, NodeManagerTransportOptions},
NodeManagerWorker, NODEMANAGER_ADDR,
};
use ockam_api::terminal::notification::NotificationHandler;
use ockam_api::{fmt_log, fmt_ok};
use ockam_api::{fmt_log, fmt_ok, DefaultAddress};
use ockam_core::api::{Request, ResponseHeader};
use ockam_core::{route, LOCAL};

use crate::node::CreateCommand;
use crate::secure_channel::listener::create as secure_channel_listener;
use crate::util::foreground_args::wait_for_exit_signal;
use crate::CommandGlobalOpts;

impl CreateCommand {
#[instrument(skip_all, fields(node_name = self.name))]
pub(super) async fn foreground_mode(
Expand Down Expand Up @@ -188,6 +192,29 @@ impl CreateCommand {
.await?;
}
}
if let Some(cfg) = startup_services.influxdb_token_lessor.clone() {
opts.terminal
.write_line(fmt_log!("Starting InfluxDB token lease manager ..."))?;
let service_name = DefaultAddress::INFLUXDB_TOKEN_LEASE_MANAGER.to_string(); // TODO: should be configurable
let mut req = vec![];
Request::post(format!("/node/services/{service_name}"))
.body(StartServiceRequest::new(cfg, service_name))
.encode(&mut req)
.into_diagnostic()?;

let resp: Vec<u8> = ctx
.send_and_receive(NODEMANAGER_ADDR, req)
.await
.into_diagnostic()?;
let mut dec = Decoder::new(&resp);
let response = dec.decode::<ResponseHeader>().into_diagnostic()?;
if response.is_ok() {
opts.terminal
.write_line(fmt_ok!("InfluxDB token lease manager started"))?;
} else {
return Err(miette!("Failed to start InfluxDB token lease manager"));
}
}
}
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct Node {
pub http_server_port: Option<ArgValue>,
pub identity: Option<ArgValue>,
pub project: Option<ArgValue>,
#[serde(alias = "launch-config")]
pub launch_config: Option<ArgValue>,
#[serde(alias = "opentelemetry-context")]
pub opentelemetry_context: Option<ArgValue>,
}
Expand Down Expand Up @@ -68,6 +70,9 @@ impl Resource<CreateCommand> for Node {
if let Some(project) = self.project {
args.insert("project".into(), project);
}
if let Some(launch_config) = self.launch_config {
args.insert("launch-config".into(), launch_config);
}
if let Some(opentelemetry_context) = self.opentelemetry_context {
args.insert("opentelemetry-context".into(), opentelemetry_context);
}
Expand Down
Loading

0 comments on commit f4000dc

Please sign in to comment.