diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..edd1e96 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.checkOnSave.command": "clippy" +} \ No newline at end of file diff --git a/README.md b/README.md index 6dbee2b..83ef066 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,54 @@ # Zeebe Rust Client -Rust client for Zeebe +Rust client for Zeebe in early stages. Suitable for early adopters -In very early stages. +Features +* CLI for all commands based on Zeebe Client 8.0 +* Support for OAuth authentication + +Next Steps +1. Worker implementation +2. Publish crates +3. Build an application that uses this Rust client + +## Cli Tool + +> **Warning** +> The Cli Tool might leak credentials in log messages + +Run `cargo run -- help` to see available commands and options. + +**Authentication for Camunda Cloud** + +First, [generate and download](https://docs.camunda.io/docs/next/components/console/manage-clusters/manage-api-clients/) client credentials for Camunda Cloud. Let's assume they are in a file called credentials.txt. + +The `credentials.txt` file contains environment variables, so let's source them: +```shell +$ source credentials.txt +``` + +Finally, run the cli tool: + +```shell +$ cargo run -- status +TopologyResponse { + brokers: [ + BrokerInfo { + node_id: 0, +... +``` + +Alternatively, you can also provide your credentials as arguments: + +```shell +$ cargo run -- --address
--client-id --client_secret --authorization-server status +TopologyResponse { + brokers: [ + BrokerInfo { + node_id: 0, +... +``` ## Prior Work/Alternatives @@ -18,3 +63,13 @@ These repositories also implement Zeebe clients for Rust. Most of them are more Your best choice is probably: https://github.com/OutThereLabs/zeebe-rust + +# Developer Guide + +## CLI + +**Conventions for CLI Argument Annotations** +* Named parameters by default +* Use positional parameters only if there is just one parameter, or just one required parameter +* For required parameters use short and long version `#[clap(short, long)]` +* For optional parameters use long version only `#[clap(long, default_value_t = 1)]` diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 183f450..e69959c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -7,5 +7,10 @@ edition = "2021" zeebe-client = { path = "../client" } clap = { version = "3.2", features = ["derive", "env"] } color-eyre = "0.6" -tonic = "0.8" -tokio = { version = "1", features=["full"] } \ No newline at end of file +tonic = { version = "0.8", features = ["tls", "tls-roots", "gzip"] } +tokio = { version = "1", features=["full"] } +tracing = { version="0.1.36", features = ["async-await", "log"] } +tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } +tracing-error = { version = "0.2", features = ["traced-error"] } +tracing-tree = { version = "0.2" } +async-trait = "0.1.57" diff --git a/cli/src/activate_jobs.rs b/cli/src/activate_jobs.rs new file mode 100644 index 0000000..6ee708d --- /dev/null +++ b/cli/src/activate_jobs.rs @@ -0,0 +1,53 @@ +use crate::{Debug, ExecuteZeebeCommand}; +use async_trait::async_trait; +use clap::Args; +use color_eyre::Result; + +use zeebe_client::{ + api::{ActivateJobsRequest, ActivateJobsResponse}, + ZeebeClient, +}; + +#[derive(Debug, Args)] +pub(crate) struct ActivateJobsArgs { + job_type: String, + + #[clap(short, long, default_value_t = 10)] + max_jobs_to_activate: usize, + #[clap(short= 't', long, default_value_t = 5 * 60 * 1000)] + job_timeout: u64, // todo: should be duration + #[clap(long, required = false, default_value = "worker")] + worker: String, + #[clap(long, required = false)] + variables: Vec, +} + +impl From<&ActivateJobsArgs> for ActivateJobsRequest { + fn from(args: &ActivateJobsArgs) -> Self { + ActivateJobsRequest { + r#type: args.job_type.to_owned(), + worker: args.worker.to_owned(), + timeout: args.job_timeout as i64, + max_jobs_to_activate: args.max_jobs_to_activate as i32, + fetch_variable: args.variables.to_owned(), + request_timeout: Default::default(), + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for ActivateJobsArgs { + type Output = Vec; + + #[tracing::instrument(skip(client))] + async fn execute(self, client: &mut ZeebeClient) -> Result { + let args = &self; + let request: ActivateJobsRequest = args.into(); + let mut stream = client.activate_jobs(request).await?.into_inner(); + let mut result = Vec::with_capacity(args.max_jobs_to_activate); + while let Some(response) = stream.message().await? { + result.push(response); + } + Ok(result) + } +} diff --git a/cli/src/cancel_process_instance.rs b/cli/src/cancel_process_instance.rs new file mode 100644 index 0000000..98d9149 --- /dev/null +++ b/cli/src/cancel_process_instance.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use color_eyre::eyre::Result; + +use clap::Args; +use zeebe_client::{ + api::{CancelProcessInstanceRequest, CancelProcessInstanceResponse}, + ZeebeClient, +}; + +use crate::ExecuteZeebeCommand; + +#[derive(Args)] +pub(crate) struct CancelProcessInstanceArgs { + process_instance_key: i64, +} + +impl From<&CancelProcessInstanceArgs> for CancelProcessInstanceRequest { + fn from(args: &CancelProcessInstanceArgs) -> Self { + CancelProcessInstanceRequest { + process_instance_key: args.process_instance_key, + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for CancelProcessInstanceArgs { + type Output = CancelProcessInstanceResponse; + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .cancel_process_instance(CancelProcessInstanceRequest::from(&self)) + .await? + .into_inner()) + } +} diff --git a/cli/src/complete_job.rs b/cli/src/complete_job.rs new file mode 100644 index 0000000..7cd72b2 --- /dev/null +++ b/cli/src/complete_job.rs @@ -0,0 +1,37 @@ +use crate::{Debug, ExecuteZeebeCommand}; +use async_trait::async_trait; +use clap::Args; +use color_eyre::eyre::Result; +use zeebe_client::{ + api::{CompleteJobRequest, CompleteJobResponse}, + ZeebeClient, +}; + +#[derive(Args, Clone, Debug)] +pub(crate) struct CompleteJobArgs { + job_key: i64, + + #[clap(long, required = false, default_value = "")] + variables: String, +} + +impl From<&CompleteJobArgs> for CompleteJobRequest { + fn from(args: &CompleteJobArgs) -> Self { + CompleteJobRequest { + job_key: args.job_key, + variables: args.variables.clone(), + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for CompleteJobArgs { + type Output = CompleteJobResponse; + + #[tracing::instrument(skip(client))] + async fn execute(self, client: &mut ZeebeClient) -> Result { + let args = &self; + let request: CompleteJobRequest = args.into(); + Ok(client.complete_job(request).await?.into_inner()) + } +} diff --git a/cli/src/create_process_instance.rs b/cli/src/create_process_instance.rs new file mode 100644 index 0000000..d184e94 --- /dev/null +++ b/cli/src/create_process_instance.rs @@ -0,0 +1,63 @@ +use crate::{Debug, ExecuteZeebeCommand}; +use async_trait::async_trait; +use clap::Args; +use color_eyre::eyre::Result; +use zeebe_client::{ + api::{CreateProcessInstanceRequest, CreateProcessInstanceWithResultRequest}, + ZeebeClient, +}; + +#[derive(Args, Clone, Debug)] +pub(crate) struct CreateProcessInstanceArgs { + process_instance_key: i64, + + #[clap(long, required = false)] + with_results: bool, + #[clap(long, required = false, default_value = "")] + variables: String, + #[clap(long, required = false, default_value_t = -1)] + version: i32, +} + +impl From<&CreateProcessInstanceArgs> for CreateProcessInstanceRequest { + fn from(args: &CreateProcessInstanceArgs) -> Self { + CreateProcessInstanceRequest { + process_definition_key: args.process_instance_key, + bpmn_process_id: String::new(), + version: args.version, + variables: args.variables.clone(), + start_instructions: vec![], + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for CreateProcessInstanceArgs { + type Output = Box; + + #[tracing::instrument(skip(client))] + async fn execute(self, client: &mut ZeebeClient) -> Result { + handle_create_instance_command(client, &self).await + } +} + +async fn handle_create_instance_command( + client: &mut ZeebeClient, + args: &CreateProcessInstanceArgs, +) -> Result> { + let request: CreateProcessInstanceRequest = args.into(); + match args.with_results { + true => Ok(Box::new( + client + .create_process_instance_with_result(CreateProcessInstanceWithResultRequest { + request: Some(request), + ..Default::default() + }) + .await? + .into_inner(), + )), + false => Ok(Box::new( + client.create_process_instance(request).await?.into_inner(), + )), + } +} diff --git a/cli/src/deploy_resource.rs b/cli/src/deploy_resource.rs new file mode 100644 index 0000000..fb3fb72 --- /dev/null +++ b/cli/src/deploy_resource.rs @@ -0,0 +1,50 @@ +use std::path::PathBuf; + +use async_trait::async_trait; +use clap::Args; + +use zeebe_client::{ + api::{DeployResourceRequest, DeployResourceResponse, Resource}, + ZeebeClient, +}; + +use crate::ExecuteZeebeCommand; +use color_eyre::Result; + +#[derive(Args)] +pub(crate) struct DeployResourceArgs { + #[clap(required = true, value_parser, value_name = "FILE")] + resources: Vec, +} +#[async_trait] +impl ExecuteZeebeCommand for DeployResourceArgs { + type Output = DeployResourceResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .deploy_resource(DeployResourceRequest::try_from(&self)?) + .await? + .into_inner()) + } +} + +impl TryFrom<&DeployResourceArgs> for DeployResourceRequest { + type Error = color_eyre::Report; + + fn try_from(args: &DeployResourceArgs) -> Result { + let mut resources = Vec::with_capacity(args.resources.len()); + for path in &args.resources { + let resource = Resource { + name: path + .file_name() + .expect("resource path should point to a file") + .to_str() + .expect("file name should be UTF-8") + .to_string(), + content: std::fs::read(path)?, + }; + resources.push(resource); + } + Ok(Self { resources }) + } +} diff --git a/cli/src/fail_job.rs b/cli/src/fail_job.rs new file mode 100644 index 0000000..1b8ab67 --- /dev/null +++ b/cli/src/fail_job.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use color_eyre::eyre::Result; + +use crate::ExecuteZeebeCommand; +use clap::Args; + +use zeebe_client::{ + api::{FailJobRequest, FailJobResponse}, + ZeebeClient, +}; +#[derive(Args)] +pub(crate) struct FailJobArgs { + // the unique job identifier, as obtained when activating the job + #[clap(short, long)] + job_key: i64, + // the amount of retries the job should have left + #[clap(short, long)] + retries: i32, + // an optional message describing why the job failed + // this is particularly useful if a job runs out of retries and an incident is raised, + // as it this message can help explain why an incident was raised + #[clap(long, required = false, default_value = "")] + error_message: String, + // the back off timeout for the next retry + #[clap(long, required = false, default_value_t = 0)] + retry_back_off: i64, +} + +impl From<&FailJobArgs> for FailJobRequest { + fn from(args: &FailJobArgs) -> Self { + FailJobRequest { + job_key: args.job_key, + retries: args.retries, + error_message: args.error_message.to_owned(), + retry_back_off: args.retry_back_off, + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for FailJobArgs { + type Output = FailJobResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .fail_job(FailJobRequest::from(&self)) + .await? + .into_inner()) + } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 051a1dc..ab8ff95 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,19 +1,50 @@ -use std::{fmt::Debug, path::PathBuf}; +mod activate_jobs; +mod cancel_process_instance; +mod complete_job; +mod create_process_instance; +mod deploy_resource; +mod fail_job; +mod publish_message; +mod resolve_incident; +mod set_variables; +mod status; +mod throw_error; +mod update_retries; -use clap::{AppSettings, Args, Parser, Subcommand}; -use color_eyre::eyre::Result; +use std::fmt::Debug; + +use async_trait::async_trait; +use clap::{AppSettings, Parser, Subcommand}; -use zeebe_client::api::{DeployResourceRequest, Resource, TopologyRequest}; +use color_eyre::eyre::Result; +use zeebe_client::ZeebeClient; #[derive(Parser)] #[clap(global_setting(AppSettings::DeriveDisplayOrder))] struct Cli { #[clap(flatten)] connection: Connection, + #[clap(flatten)] + auth: Authentication, #[clap(subcommand)] command: Commands, } +#[derive(Parser)] +struct Authentication { + #[clap(long, value_parser, group = "authentication", env = "ZEEBE_CLIENT_ID")] + client_id: Option, + #[clap(long, value_parser, env = "ZEEBE_CLIENT_SECRET")] + client_secret: Option, + #[clap( + long, + value_parser, + env = "ZEEBE_AUTHORIZATION_SERVER_URL", + default_value = "https://login.cloud.camunda.io/oauth/token/" + )] + authorization_server: String, +} + #[derive(Parser)] #[clap(group = clap::ArgGroup::new("connection"))] struct Connection { @@ -45,58 +76,127 @@ struct Connection { #[derive(Subcommand)] enum Commands { - Status, - Deploy(DeployArgs), -} + // status + Status(status::StatusArgs), //aka topology + + // deployment + DeployResource(deploy_resource::DeployResourceArgs), + + // process instance + CreateProcessInstance(create_process_instance::CreateProcessInstanceArgs), + CancelProcessInstance(cancel_process_instance::CancelProcessInstanceArgs), + + // message + PublishMessage(publish_message::PublishMessageArgs), + + // incident + ResolveIncident(resolve_incident::ResolveIncidentArgs), + + // variables + SetVariables(set_variables::SetVariablesArgs), -#[derive(Args)] -struct DeployArgs { - #[clap(required = true, value_parser, value_name = "FILE")] - resources: Vec, + //jobs + ActivateJobs(activate_jobs::ActivateJobsArgs), + CompleteJob(complete_job::CompleteJobArgs), + FailJob(fail_job::FailJobArgs), + UpdateRetries(update_retries::UpdateRetriesArgs), + ThrowError(throw_error::ThrowErrorArgs), } impl From for zeebe_client::Connection { fn from(conn: Connection) -> Self { match conn.address { - Some(addr) => zeebe_client::Connection::Address(addr), - None => zeebe_client::Connection::HostPort(conn.host, conn.port), + Some(addr) => zeebe_client::Connection { + insecure: conn.insecure, + addr, + }, + None => zeebe_client::Connection { + insecure: conn.insecure, + addr: format!("{}:{}", conn.host, conn.port), + }, } } } +impl Authentication { + fn for_connection( + &self, + conn: &zeebe_client::Connection, + ) -> Result { + match (&self.client_id, &self.client_secret) { + (None, None) => Ok(zeebe_client::Authentication::Unauthenticated), + (Some(client_id), Some(client_secret)) => { + let audience = conn + .addr + .rsplit_once(':') + .map(|(authority, _port)| authority) + .unwrap_or(&conn.addr) + .to_owned(); + Ok(zeebe_client::Authentication::Oauth2( + zeebe_client::auth::OAuth2Config { + client_id: client_id.clone(), + client_secret: client_secret.clone(), + auth_server: self.authorization_server.clone(), + audience, + }, + )) + } + _ => Err(color_eyre::eyre::eyre!( + "Partial authentication info, needs at least client id and client secret" + )), + } + } +} + +#[async_trait] +trait ExecuteZeebeCommand { + type Output: Debug; + async fn execute(self, client: &mut ZeebeClient) -> Result; +} + +fn install_tracing() { + use tracing_error::ErrorLayer; + use tracing_subscriber::prelude::*; + use tracing_subscriber::EnvFilter; + use tracing_tree::HierarchicalLayer; + + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + + tracing_subscriber::registry() + .with(filter_layer) + .with(HierarchicalLayer::new(2)) + .with(ErrorLayer::default()) + .init(); +} + #[tokio::main] async fn main() -> Result<()> { + install_tracing(); + color_eyre::install()?; + let cli: Cli = Cli::parse(); - let mut client = zeebe_client::connect(cli.connection.into()).await?; + let conn: zeebe_client::Connection = cli.connection.into(); + let mut client: ZeebeClient = + zeebe_client::connect(conn.clone(), cli.auth.for_connection(&conn)?).await?; let response: Box = match cli.command { - Commands::Status => Box::new(client.topology(TopologyRequest {}).await?.into_inner()), - Commands::Deploy(args) => Box::new( - client - .deploy_resource(build_deploy_request(args)?) - .await? - .into_inner(), - ), + Commands::ActivateJobs(args) => Box::new(args.execute(&mut client).await?), + Commands::CancelProcessInstance(args) => Box::new(args.execute(&mut client).await?), + Commands::CompleteJob(args) => Box::new(args.execute(&mut client).await?), + Commands::CreateProcessInstance(args) => args.execute(&mut client).await?, // Already boxed, because it could be one of two results + Commands::DeployResource(args) => Box::new(args.execute(&mut client).await?), + Commands::FailJob(args) => Box::new(args.execute(&mut client).await?), + Commands::PublishMessage(args) => Box::new(args.execute(&mut client).await?), + Commands::ResolveIncident(args) => Box::new(args.execute(&mut client).await?), + Commands::SetVariables(args) => Box::new(args.execute(&mut client).await?), + Commands::Status(args) => Box::new(args.execute(&mut client).await?), + Commands::ThrowError(args) => Box::new(args.execute(&mut client).await?), + Commands::UpdateRetries(args) => Box::new(args.execute(&mut client).await?), }; println!("{:#?}", response); Ok(()) } - -fn build_deploy_request(args: DeployArgs) -> Result { - let mut resources = Vec::with_capacity(args.resources.len()); - for path in &args.resources { - let resource = Resource { - name: path - .file_name() - .expect("resource path should point to a file") - .to_str() - .expect("file name should be UTF-8") - .to_string(), - content: std::fs::read(path)?, - }; - resources.push(resource); - } - Ok(DeployResourceRequest { resources }) -} diff --git a/cli/src/publish_message.rs b/cli/src/publish_message.rs new file mode 100644 index 0000000..770a787 --- /dev/null +++ b/cli/src/publish_message.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use color_eyre::eyre::Result; +use std::fmt::Debug; + +use clap::Args; + +use zeebe_client::{ + api::{PublishMessageRequest, PublishMessageResponse}, + ZeebeClient, +}; + +use crate::ExecuteZeebeCommand; + +#[derive(Args, Clone, Debug)] +pub(crate) struct PublishMessageArgs { + #[clap(short, long)] + name: String, + #[clap(short, long)] + correlation_key: String, + #[clap(long, required = false)] + message_id: String, + #[clap(long, required = false, default_value = "")] + variables: String, + #[clap(long, required = false, default_value_t = -1)] + ttl: i64, // todo: should be duration +} + +impl From<&PublishMessageArgs> for PublishMessageRequest { + fn from(args: &PublishMessageArgs) -> Self { + PublishMessageRequest { + name: args.name.to_owned(), + correlation_key: args.correlation_key.to_owned(), + time_to_live: args.ttl, + message_id: args.message_id.to_owned(), + variables: args.variables.to_owned(), + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for PublishMessageArgs { + type Output = PublishMessageResponse; + + #[tracing::instrument(skip(client))] + async fn execute(self, client: &mut ZeebeClient) -> Result { + let args = &self; + let request: PublishMessageRequest = args.into(); + Ok(client.publish_message(request).await?.into_inner()) + } +} diff --git a/cli/src/resolve_incident.rs b/cli/src/resolve_incident.rs new file mode 100644 index 0000000..5fca9ce --- /dev/null +++ b/cli/src/resolve_incident.rs @@ -0,0 +1,29 @@ +use async_trait::async_trait; +use clap::Args; +use color_eyre::Result; + +use zeebe_client::{ + api::{ResolveIncidentRequest, ResolveIncidentResponse}, + ZeebeClient, +}; + +use crate::ExecuteZeebeCommand; + +#[derive(Args)] +pub(crate) struct ResolveIncidentArgs { + incident_key: i64, +} + +#[async_trait] +impl ExecuteZeebeCommand for ResolveIncidentArgs { + type Output = ResolveIncidentResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .resolve_incident(ResolveIncidentRequest { + incident_key: self.incident_key, + }) + .await? + .into_inner()) + } +} diff --git a/cli/src/set_variables.rs b/cli/src/set_variables.rs new file mode 100644 index 0000000..447e1e3 --- /dev/null +++ b/cli/src/set_variables.rs @@ -0,0 +1,55 @@ +use std::path::PathBuf; + +use crate::ExecuteZeebeCommand; +use async_trait::async_trait; +use clap::Args; +use color_eyre::Result; + +use zeebe_client::{ + api::{SetVariablesRequest, SetVariablesResponse}, + ZeebeClient, +}; + +#[derive(Args)] + +pub(crate) struct SetVariablesArgs { + #[clap(short, long)] + element_instance_key: i64, + #[clap(short, long)] + local: bool, + #[clap(long, value_parser, group = "value")] + path: Option, + #[clap(long, group = "value")] + json: Option, +} + +impl TryFrom for SetVariablesRequest { + type Error = color_eyre::Report; + + fn try_from(args: SetVariablesArgs) -> Result { + let variables = if let Some(path) = &args.path { + std::fs::read_to_string(path)? + } else if let Some(json) = args.json { + json + } else { + unreachable!() + }; + Ok(Self { + element_instance_key: args.element_instance_key, + variables, + local: args.local, + }) + } +} + +#[async_trait] +impl ExecuteZeebeCommand for SetVariablesArgs { + type Output = SetVariablesResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .set_variables(SetVariablesRequest::try_from(self)?) + .await? + .into_inner()) + } +} diff --git a/cli/src/status.rs b/cli/src/status.rs new file mode 100644 index 0000000..936ec7d --- /dev/null +++ b/cli/src/status.rs @@ -0,0 +1,21 @@ +use crate::ExecuteZeebeCommand; +use async_trait::async_trait; +use clap::Args; +use color_eyre::Result; + +use zeebe_client::{ + api::{TopologyRequest, TopologyResponse}, + ZeebeClient, +}; + +#[derive(Args)] +pub(crate) struct StatusArgs {} + +#[async_trait] +impl ExecuteZeebeCommand for StatusArgs { + type Output = TopologyResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client.topology(TopologyRequest {}).await?.into_inner()) + } +} diff --git a/cli/src/throw_error.rs b/cli/src/throw_error.rs new file mode 100644 index 0000000..4047c20 --- /dev/null +++ b/cli/src/throw_error.rs @@ -0,0 +1,45 @@ +use async_trait::async_trait; +use color_eyre::eyre::Result; + +use clap::Args; +use zeebe_client::{ + api::{ThrowErrorRequest, ThrowErrorResponse}, + ZeebeClient, +}; + +use crate::ExecuteZeebeCommand; + +#[derive(Args)] +pub(crate) struct ThrowErrorArgs { + // the unique job identifier, as obtained when activating the job + #[clap(short, long)] + job_key: i64, + // the error code that will be matched with an error catch event + #[clap(short = 'c', long)] + error_code: String, + // an optional error message that provides additional context + #[clap(long, default_value = "")] + error_message: String, +} + +impl From<&ThrowErrorArgs> for ThrowErrorRequest { + fn from(args: &ThrowErrorArgs) -> Self { + ThrowErrorRequest { + job_key: args.job_key, + error_code: args.error_code.to_owned(), + error_message: args.error_message.to_owned(), + } + } +} + +#[async_trait] +impl ExecuteZeebeCommand for ThrowErrorArgs { + type Output = ThrowErrorResponse; + + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .throw_error(ThrowErrorRequest::from(&self)) + .await? + .into_inner()) + } +} diff --git a/cli/src/update_retries.rs b/cli/src/update_retries.rs new file mode 100644 index 0000000..ae882f2 --- /dev/null +++ b/cli/src/update_retries.rs @@ -0,0 +1,42 @@ +use crate::{Debug, ExecuteZeebeCommand}; + +use async_trait::async_trait; +use clap::Args; +use color_eyre::Result; +use zeebe_client::{ + api::{UpdateJobRetriesRequest, UpdateJobRetriesResponse}, + ZeebeClient, +}; + +#[derive(Debug, Args)] +pub(crate) struct UpdateRetriesArgs { + #[clap(short, long)] + job_key: u64, + #[clap(short, long)] + retries: u32, +} + +impl TryFrom<&UpdateRetriesArgs> for UpdateJobRetriesRequest { + type Error = std::num::TryFromIntError; + fn try_from( + args: &UpdateRetriesArgs, + ) -> Result { + Ok(UpdateJobRetriesRequest { + job_key: args.job_key.try_into()?, + retries: args.retries.try_into()?, + }) + } +} + +#[async_trait] +impl ExecuteZeebeCommand for UpdateRetriesArgs { + type Output = UpdateJobRetriesResponse; + + #[tracing::instrument(skip(client))] + async fn execute(self, client: &mut ZeebeClient) -> Result { + Ok(client + .update_job_retries(UpdateJobRetriesRequest::try_from(&self)?) + .await? + .into_inner()) + } +} diff --git a/client/Cargo.toml b/client/Cargo.toml index 1603769..56e815b 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,10 +4,12 @@ version = "0.1.0" edition = "2021" [dependencies] -tonic = "0.8" +tonic = { version = "0.8", features = ["tls", "tls-roots", "gzip"] } prost = "0.11" thiserror = "1.0" +tracing = { version="0.1", features = ["async-await"] } +oauth2 = { version = "4.2.3", features = ["ureq"] } [build-dependencies] -tonic-build = "0.8" -prost-build = "0.11" \ No newline at end of file +tonic-build = { version = "0.8" } +prost-build = "0.11" diff --git a/client/src/auth.rs b/client/src/auth.rs new file mode 100644 index 0000000..d4fa9d8 --- /dev/null +++ b/client/src/auth.rs @@ -0,0 +1,96 @@ +use oauth2::{ + basic::{BasicClient, BasicTokenResponse}, + url::ParseError, + AuthUrl, ClientId, ClientSecret, TokenResponse, TokenUrl, +}; +use thiserror::Error; +use tonic::{metadata::MetadataValue, service::Interceptor}; +use tracing::instrument; + +#[derive(Debug)] +pub struct OAuth2Config { + pub client_id: String, + pub client_secret: String, + pub auth_server: String, + pub audience: String, +} + +#[derive(Debug)] +pub struct OAuth2Provider { + client: BasicClient, + config: OAuth2Config, +} + +#[derive(Error, Debug)] +pub enum AuthError { + #[error("Token request failed")] + TokenRequestFailed, +} + +impl OAuth2Provider { + fn from_config(config: OAuth2Config) -> Result { + let client = BasicClient::new( + ClientId::new(config.client_id.clone()), + Some(ClientSecret::new(config.client_secret.clone())), + AuthUrl::new(config.auth_server.clone())?, + Some(TokenUrl::new(config.auth_server.clone())?), + ) + .set_auth_type(oauth2::AuthType::RequestBody); + Ok(OAuth2Provider { config, client }) + } + + #[instrument] + fn get_token(&mut self) -> Result { + let request = self + .client + .exchange_client_credentials() + .add_extra_param("audience", &self.config.audience); + tracing::debug!(request = ?request, "requesting token"); + request.request(oauth2::ureq::http_client).map_err(|e| { + tracing::error!(error = ?e, "request to get token failed"); + AuthError::TokenRequestFailed + }) + } +} + +pub struct AuthInterceptor { + auth: Option, +} + +impl AuthInterceptor { + pub fn none() -> AuthInterceptor { + AuthInterceptor { auth: None } + } + pub fn oauth2(config: OAuth2Config) -> Result { + Ok(AuthInterceptor { + auth: Some(OAuth2Provider::from_config(config)?), + }) + } +} + +impl Interceptor for AuthInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> Result, tonic::Status> { + // TODO: Don't request a new token for every request. Look into tower to handle this + // elegantly. + if let Some(provider) = &mut self.auth { + let token = match provider.get_token() { + Ok(token) => token.access_token().secret().to_owned(), + Err(e) => { + tracing::error!(error = ?e, "failed to get token"); + return Err(tonic::Status::unauthenticated("failed to get token")); + } + }; + let header_value = format!("Bearer {}", token); + request.metadata_mut().insert( + "authorization", + MetadataValue::try_from(&header_value).map_err(|_| { + tonic::Status::unauthenticated("token is not a valid header value") + })?, + ); + } + Ok(request) + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 4a2aef2..8fa6d30 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,10 +1,14 @@ -use std::str::FromStr; +pub mod auth; -use api::gateway_client::GatewayClient; +use auth::{AuthInterceptor, OAuth2Config}; +use generated_api::gateway_client::GatewayClient; +use oauth2::url::ParseError; use thiserror::Error; +use tracing::instrument; + use tonic::{ - codegen::http::uri::InvalidUri, - transport::{self, Channel, Uri}, + codegen::http::{self}, + transport::{self, Channel, ClientTlsConfig, Uri}, }; mod generated_api { @@ -15,9 +19,16 @@ pub mod api { pub use super::generated_api::*; } -pub enum Connection { - Address(String), - HostPort(String, u16), +#[derive(Debug, Clone)] +pub struct Connection { + pub insecure: bool, + pub addr: String, +} + +#[derive(Debug)] +pub enum Authentication { + Unauthenticated, + Oauth2(OAuth2Config), } #[derive(Error, Debug)] @@ -25,14 +36,39 @@ pub enum ConnectionError { #[error(transparent)] Transport(#[from] transport::Error), #[error(transparent)] - Uri(#[from] InvalidUri), + Http(#[from] http::Error), + #[error(transparent)] + Oauth2(#[from] ParseError), } -pub async fn connect(conn: Connection) -> Result, ConnectionError> { - let uri = match conn { - Connection::Address(addr) => Uri::from_str(&addr), - Connection::HostPort(host, port) => Uri::from_str(&format!("http://{}:{}", host, port)), - }?; - let channel = Channel::builder(uri); - Ok(GatewayClient::new(channel.connect().await?)) +pub type ZeebeClient = + GatewayClient>; + +#[instrument(level = "debug")] +pub async fn connect( + conn: Connection, + auth: Authentication, +) -> Result { + let uri = Uri::builder() + .scheme(match conn.insecure { + true => "http", + false => "https", + }) + .authority(conn.addr) + .path_and_query("") + .build()?; + let interceptor = match auth { + Authentication::Unauthenticated => AuthInterceptor::none(), + Authentication::Oauth2(oauth_config) => AuthInterceptor::oauth2(oauth_config)?, + }; + tracing::debug!("Connecting to {}", uri); + let channel = if conn.insecure { + Channel::builder(uri) + } else { + Channel::builder(uri).tls_config(ClientTlsConfig::new())? + }; + Ok(api::gateway_client::GatewayClient::with_interceptor( + channel.connect().await?, + interceptor, + )) }