Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hackdays #32

Merged
merged 53 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
08d719b
feat: implement resolve incident command
vringar Aug 10, 2022
0d0296f
run formatter
pihme Aug 10, 2022
5581013
implement CancelProcessInstance
pihme Aug 10, 2022
ee24228
Implement FailJob command
pihme Aug 10, 2022
eae3ea8
feat: implement create process instance command
lenaschoenburg Aug 10, 2022
de7bb74
Merge branch '29-create-process-instance' into hackdays
lenaschoenburg Aug 10, 2022
f3ec1a5
feat: stub publish message command
lenaschoenburg Aug 10, 2022
952e80d
feat: implement publish message command
lenaschoenburg Aug 10, 2022
2ad87af
feat: implement update-retries command
lenaschoenburg Aug 10, 2022
86641e0
SetVariables implemented
vringar Aug 10, 2022
c728a61
refactor: simplify optional attributes for fail job
pihme Aug 10, 2022
e66ab11
Switched to try_from
vringar Aug 10, 2022
c9a3c2a
feat: implement activate jobs command
lenaschoenburg Aug 10, 2022
8f507ef
feat: ThrowError command in CLI
pihme Aug 10, 2022
ab8dc48
refactor: move cancel process instance logic into own file
pihme Aug 10, 2022
974f887
ci: use clippy for linting
pihme Aug 10, 2022
cac0f0b
refactor: move fail_job into own file
pihme Aug 10, 2022
07df129
Remove one unneeded parse
vringar Aug 10, 2022
a0cc4b5
Remove testfile
vringar Aug 10, 2022
ce96eff
feat: add empty auth interceptor
lenaschoenburg Aug 10, 2022
d37112a
Grand rewrite
vringar Aug 10, 2022
1e91194
ci: try nightly toolchain
pihme Aug 10, 2022
04bfec9
Less generics
vringar Aug 11, 2022
23d2931
No more args in main.rs
vringar Aug 11, 2022
a8b985d
fix: remove unused nightly feature
lenaschoenburg Aug 11, 2022
c77f42c
ci: use stable rust
lenaschoenburg Aug 11, 2022
ba1b659
feat: connect over tls
lenaschoenburg Aug 11, 2022
964a684
fix: allow insecure connections
lenaschoenburg Aug 11, 2022
1c907d5
feat: build oauth client
lenaschoenburg Aug 11, 2022
870a55b
refactor: box activate jobs response in main
pihme Aug 12, 2022
3790680
refactor: box message response in main
pihme Aug 12, 2022
07c6064
refactor: rename deploy to deploy resource
pihme Aug 12, 2022
93c59d3
refactor: rename incident to resolve incident
pihme Aug 12, 2022
b07b455
refactor: rename create to create process instance
pihme Aug 12, 2022
0af837d
refactor: rename publish to publihs message
pihme Aug 12, 2022
c69af6d
fixup refactor: rename publish to publish message
pihme Aug 12, 2022
aa7536c
fixup refactor: rename retries to update retries
pihme Aug 12, 2022
830cbf1
fixup refactor: rename active to activate jobs
pihme Aug 12, 2022
93012c5
fixup refactor: reorder commands
pihme Aug 12, 2022
e7e5313
feat: request oauth tokens
lenaschoenburg Aug 12, 2022
36df6a6
refactor: code cleanup
pihme Aug 12, 2022
2b6fbc7
feat: add complete job command
pihme Aug 12, 2022
bf53115
refactor: use pub(crate) consistently
pihme Aug 12, 2022
32d1466
refactor: harmonize CLI annotations
pihme Aug 12, 2022
9cb1e21
docs: improve readme
pihme Aug 12, 2022
48585f2
docs: add README content for the cli tool
lenaschoenburg Aug 12, 2022
e0dbd8b
fix: remove dead code
lenaschoenburg Oct 12, 2022
e204220
fix: replace deprecated method usage
lenaschoenburg Oct 12, 2022
51f1b97
refactor: client only accepts address based connection
lenaschoenburg Oct 12, 2022
b574a33
fix: set audience based connection
lenaschoenburg Oct 12, 2022
a04b1ad
style: fix formatting
lenaschoenburg Oct 12, 2022
f90b337
fix: use usize for argument to reflect usage
lenaschoenburg Oct 12, 2022
d860b11
Update README.md
pihme Oct 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.checkOnSave.command": "clippy"
}
59 changes: 57 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,54 @@


# Zeebe Rust Client
Rust client for Zeebe
Rust client for Zeebe in early stages. Suitable for early adopters

In very early stages.
Features
* CLI for all commands based on Zeebe Client 8.0
* Support for OAuth authentication

Next Steps
1. Worker implementation
2. Publish crates
3. Build an application that uses this Rust client

## Cli Tool

> **Warning**
> The Cli Tool might leak credentials in log messages

Run `cargo run -- help` to see available commands and options.

**Authentication for Camunda Cloud**

First, [generate and download](https://docs.camunda.io/docs/next/components/console/manage-clusters/manage-api-clients/) client credentials from Console. Let's assume they are in a file called credentials.txt.
pihme marked this conversation as resolved.
Show resolved Hide resolved

The `credentials.txt` file contains environment variables, so let's source them:
```shell
$ source credentials.txt
```

Finally, run the cli tool:

```shell
$ cargo run -- status
TopologyResponse {
brokers: [
BrokerInfo {
node_id: 0,
...
```

Alternatively, you can also provide your credentials as arguments:

```shell
$ cargo run -- --address <ADDRESS> --client-id <CLIENT_ID> --client_secret <CLIENT_SECRET> --authorization-server <AUTH_SERVER> status
TopologyResponse {
brokers: [
BrokerInfo {
node_id: 0,
...
```

## Prior Work/Alternatives

Expand All @@ -18,3 +63,13 @@ These repositories also implement Zeebe clients for Rust. Most of them are more

Your best choice is probably: https://github.com/OutThereLabs/zeebe-rust


# Developer Guide

## CLI

**Conventions for CLI Argument Annotations**
* Named parameters by default
* Use positional parameters only if there is just one parameter, or just one required parameter
* For required parameters use short and long version `#[clap(short, long)]`
* For optional parameters use long version only `#[clap(long, default_value_t = 1)]`
9 changes: 7 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@ edition = "2021"
zeebe-client = { path = "../client" }
clap = { version = "3.2", features = ["derive", "env"] }
color-eyre = "0.6"
tonic = "0.8"
tokio = { version = "1", features=["full"] }
tonic = { version = "0.8", features = ["tls", "tls-roots", "gzip"] }
tokio = { version = "1", features=["full"] }
tracing = { version="0.1.36", features = ["async-await", "log"] }
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-error = { version = "0.2", features = ["traced-error"] }
tracing-tree = { version = "0.2" }
async-trait = "0.1.57"
53 changes: 53 additions & 0 deletions cli/src/activate_jobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::{Debug, ExecuteZeebeCommand};
use async_trait::async_trait;
use clap::Args;
use color_eyre::Result;

use zeebe_client::{
api::{ActivateJobsRequest, ActivateJobsResponse},
ZeebeClient,
};

#[derive(Debug, Args)]
pub(crate) struct ActivateJobsArgs {
job_type: String,

#[clap(short, long, default_value_t = 10)]
max_jobs_to_activate: usize,
#[clap(short= 't', long, default_value_t = 5 * 60 * 1000)]
job_timeout: u64, // todo: should be duration
#[clap(long, required = false, default_value = "worker")]
worker: String,
#[clap(long, required = false)]
variables: Vec<String>,
}

impl From<&ActivateJobsArgs> for ActivateJobsRequest {
fn from(args: &ActivateJobsArgs) -> Self {
ActivateJobsRequest {
r#type: args.job_type.to_owned(),
worker: args.worker.to_owned(),
timeout: args.job_timeout as i64,
max_jobs_to_activate: args.max_jobs_to_activate as i32,
fetch_variable: args.variables.to_owned(),
request_timeout: Default::default(),
}
}
}

#[async_trait]
impl ExecuteZeebeCommand for ActivateJobsArgs {
type Output = Vec<ActivateJobsResponse>;

#[tracing::instrument(skip(client))]
async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
let args = &self;
let request: ActivateJobsRequest = args.into();
let mut stream = client.activate_jobs(request).await?.into_inner();
let mut result = Vec::with_capacity(args.max_jobs_to_activate);
while let Some(response) = stream.message().await? {
result.push(response);
}
Ok(result)
}
}
34 changes: 34 additions & 0 deletions cli/src/cancel_process_instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use async_trait::async_trait;
use color_eyre::eyre::Result;

use clap::Args;
use zeebe_client::{
api::{CancelProcessInstanceRequest, CancelProcessInstanceResponse},
ZeebeClient,
};

use crate::ExecuteZeebeCommand;

#[derive(Args)]
pub(crate) struct CancelProcessInstanceArgs {
process_instance_key: i64,
}

impl From<&CancelProcessInstanceArgs> for CancelProcessInstanceRequest {
fn from(args: &CancelProcessInstanceArgs) -> Self {
CancelProcessInstanceRequest {
process_instance_key: args.process_instance_key,
}
}
}

#[async_trait]
impl ExecuteZeebeCommand for CancelProcessInstanceArgs {
type Output = CancelProcessInstanceResponse;
async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
pihme marked this conversation as resolved.
Show resolved Hide resolved
Ok(client
.cancel_process_instance(CancelProcessInstanceRequest::from(&self))
.await?
.into_inner())
}
}
37 changes: 37 additions & 0 deletions cli/src/complete_job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::{Debug, ExecuteZeebeCommand};
use async_trait::async_trait;
use clap::Args;
use color_eyre::eyre::Result;
use zeebe_client::{
api::{CompleteJobRequest, CompleteJobResponse},
ZeebeClient,
};

#[derive(Args, Clone, Debug)]
pub(crate) struct CompleteJobArgs {
job_key: i64,

#[clap(long, required = false, default_value = "")]
pihme marked this conversation as resolved.
Show resolved Hide resolved
variables: String,
}

impl From<&CompleteJobArgs> for CompleteJobRequest {
fn from(args: &CompleteJobArgs) -> Self {
CompleteJobRequest {
job_key: args.job_key,
variables: args.variables.clone(),
}
}
}

#[async_trait]
impl ExecuteZeebeCommand for CompleteJobArgs {
type Output = CompleteJobResponse;

#[tracing::instrument(skip(client))]
async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
let args = &self;
let request: CompleteJobRequest = args.into();
Ok(client.complete_job(request).await?.into_inner())
}
}
63 changes: 63 additions & 0 deletions cli/src/create_process_instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::{Debug, ExecuteZeebeCommand};
use async_trait::async_trait;
use clap::Args;
use color_eyre::eyre::Result;
use zeebe_client::{
api::{CreateProcessInstanceRequest, CreateProcessInstanceWithResultRequest},
ZeebeClient,
};

#[derive(Args, Clone, Debug)]
pub(crate) struct CreateProcessInstanceArgs {
process_instance_key: i64,

#[clap(long, required = false)]
with_results: bool,
#[clap(long, required = false, default_value = "")]
variables: String,
#[clap(long, required = false, default_value_t = -1)]
version: i32,
}

impl From<&CreateProcessInstanceArgs> for CreateProcessInstanceRequest {
fn from(args: &CreateProcessInstanceArgs) -> Self {
CreateProcessInstanceRequest {
process_definition_key: args.process_instance_key,
bpmn_process_id: String::new(),
version: args.version,
variables: args.variables.clone(),
start_instructions: vec![],
}
}
}

#[async_trait]
impl ExecuteZeebeCommand for CreateProcessInstanceArgs {
type Output = Box<dyn Debug>;

#[tracing::instrument(skip(client))]
async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
handle_create_instance_command(client, &self).await
}
}

async fn handle_create_instance_command(
client: &mut ZeebeClient,
args: &CreateProcessInstanceArgs,
) -> Result<Box<dyn Debug>> {
let request: CreateProcessInstanceRequest = args.into();
match args.with_results {
true => Ok(Box::new(
client
.create_process_instance_with_result(CreateProcessInstanceWithResultRequest {
request: Some(request),
..Default::default()
pihme marked this conversation as resolved.
Show resolved Hide resolved
})
.await?
.into_inner(),
)),
false => Ok(Box::new(
client.create_process_instance(request).await?.into_inner(),
)),
}
}
50 changes: 50 additions & 0 deletions cli/src/deploy_resource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::path::PathBuf;

use async_trait::async_trait;
use clap::Args;

use zeebe_client::{
api::{DeployResourceRequest, DeployResourceResponse, Resource},
ZeebeClient,
};

use crate::ExecuteZeebeCommand;
use color_eyre::Result;

#[derive(Args)]
pub(crate) struct DeployResourceArgs {
#[clap(required = true, value_parser, value_name = "FILE")]
resources: Vec<PathBuf>,
}
#[async_trait]
impl ExecuteZeebeCommand for DeployResourceArgs {
type Output = DeployResourceResponse;

async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
Ok(client
.deploy_resource(DeployResourceRequest::try_from(&self)?)
.await?
.into_inner())
}
}

impl TryFrom<&DeployResourceArgs> for DeployResourceRequest {
type Error = color_eyre::Report;

fn try_from(args: &DeployResourceArgs) -> Result<DeployResourceRequest, Self::Error> {
let mut resources = Vec::with_capacity(args.resources.len());
for path in &args.resources {
let resource = Resource {
name: path
.file_name()
.expect("resource path should point to a file")
.to_str()
.expect("file name should be UTF-8")
.to_string(),
content: std::fs::read(path)?,
};
resources.push(resource);
}
Ok(Self { resources })
}
}
50 changes: 50 additions & 0 deletions cli/src/fail_job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use async_trait::async_trait;
use color_eyre::eyre::Result;

use crate::ExecuteZeebeCommand;
use clap::Args;

use zeebe_client::{
api::{FailJobRequest, FailJobResponse},
ZeebeClient,
};
#[derive(Args)]
pub(crate) struct FailJobArgs {
// the unique job identifier, as obtained when activating the job
pihme marked this conversation as resolved.
Show resolved Hide resolved
#[clap(short, long)]
job_key: i64,
// the amount of retries the job should have left
#[clap(short, long)]
retries: i32,
// an optional message describing why the job failed
// this is particularly useful if a job runs out of retries and an incident is raised,
// as it this message can help explain why an incident was raised
#[clap(long, required = false, default_value = "")]
error_message: String,
// the back off timeout for the next retry
#[clap(long, required = false, default_value_t = 0)]
retry_back_off: i64,
}

impl From<&FailJobArgs> for FailJobRequest {
fn from(args: &FailJobArgs) -> Self {
FailJobRequest {
job_key: args.job_key,
retries: args.retries,
error_message: args.error_message.to_owned(),
retry_back_off: args.retry_back_off,
}
}
}

#[async_trait]
impl ExecuteZeebeCommand for FailJobArgs {
type Output = FailJobResponse;

async fn execute(self, client: &mut ZeebeClient) -> Result<Self::Output> {
Ok(client
.fail_job(FailJobRequest::from(&self))
.await?
.into_inner())
}
}
Loading