Skip to content

Commit

Permalink
nautirust can use output from step as argument for an other step (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr authored Aug 10, 2022
1 parent ff68ca5 commit 5fb4a7c
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 181 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.2"
edition = "2021"

[dependencies]
async-recursion = "1.0.0"
async-std = { version = "1.11.0", features = ["attributes"] }
clap = { version = "3.1.18", features = ["derive"] }
config = "0.13.1"
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Nautirust

If you want to use nautirust but not develop for it, take a look at [nautirust-configs](https://github.com/ajuvercr/nautirust-configs).

Nautirust is a command-line program that helps configuring, and is able to start, a data processing pipeline of processes (called steps).
It does not restrict on programming language or start-up sequence.

Expand Down Expand Up @@ -49,7 +51,7 @@ nautirust run plan.json

## Configuration

Channels and runners have to be defined, this can be done with command line arguments or a config file.
Channels and runners have to be defined, this can be done with command line arguments or a config file (`orchestrator.toml` or specify with the `--config` flag).
```toml
channels = "configs/channels/*.json"
runners = "configs/runners/*/runner.json"
Expand Down Expand Up @@ -91,6 +93,9 @@ Example runner configuration:
"canUseChannel": [
"file", "ws"
],
"canUseSerialization": [
"json", "turtle", "plain"
],
"requiredFields": [
"jsFile",
"methodName"
Expand All @@ -101,6 +106,7 @@ Example runner configuration:
Here a runner called JsRunner is defined. Required fields are
- `runnerScript`: how is the runner started
- `canUseChannel`: what channels can this runner provide to the processor
- `canUseSerialization`: what serializations are supported

When a runner is configured in a step `jsFile` and `methodName` have to be provided.

Expand Down
8 changes: 0 additions & 8 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ pub async fn parse_channel(path: PathBuf) -> Result<Channel, Box<dyn Error>> {

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelConfig {
id: String,
#[serde(rename = "type")]
ty: String,
serialization: String,
Expand All @@ -110,21 +109,14 @@ pub struct ChannelConfig {

impl ChannelConfig {
pub fn new(
id: String,
ty: String,
serialization: String,
config: Value,
) -> Self {
Self {
id,
ty,
serialization,
config,
}
}
pub fn with_name(&self, name: &str) -> Self {
let mut out = self.clone();
out.id = name.to_string();
out
}
}
31 changes: 17 additions & 14 deletions src/commands/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ use async_std::fs::{self, read_to_string, write};
use tempdir::TempDir;

use super::run::Steps;
use super::OutputConfig;
use crate::channel::Channel;
use crate::runner::Runner;

/// Create a docker-compose file from a nautirust pipeline
#[derive(clap::Args, Debug)]
pub struct Command {
/// Config file
file: String,
file: String,
#[clap(short, long)]
output: bool,
output: bool,
/// temporary directory to put step configuration files
#[clap(short, long)]
tmp_dir: Option<String>,
Expand Down Expand Up @@ -80,7 +81,10 @@ impl Command {
location.as_ref(),
&mut procs,
id,
true,
OutputConfig {
stdout: true,
stderr: false,
},
)
},
);
Expand Down Expand Up @@ -118,23 +122,22 @@ impl Command {
runner.location.as_ref(),
&mut procs,
&value.processor_config.id,
true,
OutputConfig {
stdout: true,
stderr: false,
},
);
}

let docker_header = "services:\n";
let docker_content: String = [docker_header.to_string()]
.into_iter()
.chain(
procs
.into_iter()
.map(|(mut proc, h1, h2)| {
proc.wait().unwrap();
let output = h1.join().unwrap();
h2.join().unwrap();
output
})
)
.chain(procs.into_iter().map(|(mut proc, h1, h2)| {
proc.wait().unwrap();
let output = h1.join().unwrap();
h2.join().unwrap();
output
}))
.collect();

if self.output {
Expand Down
Loading

0 comments on commit 5fb4a7c

Please sign in to comment.