diff --git a/Cargo.lock b/Cargo.lock index 6e1f538..a8654ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,6 +117,17 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-std" version = "1.11.0" @@ -793,6 +804,7 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" name = "nautirust" version = "0.1.2" dependencies = [ + "async-recursion", "async-std", "clap", "config", diff --git a/Cargo.toml b/Cargo.toml index 061af94..4afc00f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.2" edition = "2021" [dependencies] +async-recursion = "1.0.0" async-std = { version = "1.11.0", features = ["attributes"] } clap = { version = "3.1.18", features = ["derive"] } config = "0.13.1" diff --git a/README.md b/README.md index d060ff7..be2b3b0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Nautirust +If you want to use nautirust but not develop for it, take a look at [nautirust-configs](https://github.com/ajuvercr/nautirust-configs). + Nautirust is a command-line program that helps configuring, and is able to start, a data processing pipeline of processes (called steps). It does not restrict on programming language or start-up sequence. @@ -49,7 +51,7 @@ nautirust run plan.json ## Configuration -Channels and runners have to be defined, this can be done with command line arguments or a config file. +Channels and runners have to be defined, this can be done with command line arguments or a config file (`orchestrator.toml` or specify with the `--config` flag). ```toml channels = "configs/channels/*.json" runners = "configs/runners/*/runner.json" @@ -91,6 +93,9 @@ Example runner configuration: "canUseChannel": [ "file", "ws" ], + "canUseSerialization": [ + "json", "turtle", "plain" + ], "requiredFields": [ "jsFile", "methodName" @@ -101,6 +106,7 @@ Example runner configuration: Here a runner called JsRunner is defined. Required fields are - `runnerScript`: how is the runner started - `canUseChannel`: what channels can this runner provide to the processor +- `canUseSerialization`: what serializations are supported When a runner is configured in a step `jsFile` and `methodName` have to be provided. diff --git a/src/channel.rs b/src/channel.rs index fe0e671..74b4579 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -101,7 +101,6 @@ pub async fn parse_channel(path: PathBuf) -> Result> { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ChannelConfig { - id: String, #[serde(rename = "type")] ty: String, serialization: String, @@ -110,21 +109,14 @@ pub struct ChannelConfig { impl ChannelConfig { pub fn new( - id: String, ty: String, serialization: String, config: Value, ) -> Self { Self { - id, ty, serialization, config, } } - pub fn with_name(&self, name: &str) -> Self { - let mut out = self.clone(); - out.id = name.to_string(); - out - } } diff --git a/src/commands/docker.rs b/src/commands/docker.rs index 4299d73..4291b3d 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -6,6 +6,7 @@ use async_std::fs::{self, read_to_string, write}; use tempdir::TempDir; use super::run::Steps; +use super::OutputConfig; use crate::channel::Channel; use crate::runner::Runner; @@ -13,9 +14,9 @@ use crate::runner::Runner; #[derive(clap::Args, Debug)] pub struct Command { /// Config file - file: String, + file: String, #[clap(short, long)] - output: bool, + output: bool, /// temporary directory to put step configuration files #[clap(short, long)] tmp_dir: Option, @@ -80,7 +81,10 @@ impl Command { location.as_ref(), &mut procs, id, - true, + OutputConfig { + stdout: true, + stderr: false, + }, ) }, ); @@ -118,23 +122,22 @@ impl Command { runner.location.as_ref(), &mut procs, &value.processor_config.id, - true, + OutputConfig { + stdout: true, + stderr: false, + }, ); } let docker_header = "services:\n"; let docker_content: String = [docker_header.to_string()] .into_iter() - .chain( - procs - .into_iter() - .map(|(mut proc, h1, h2)| { - proc.wait().unwrap(); - let output = h1.join().unwrap(); - h2.join().unwrap(); - output - }) - ) + .chain(procs.into_iter().map(|(mut proc, h1, h2)| { + proc.wait().unwrap(); + let output = h1.join().unwrap(); + h2.join().unwrap(); + output + })) .collect(); if self.output { diff --git a/src/commands/generate.rs b/src/commands/generate.rs index 4026748..565de71 100644 --- a/src/commands/generate.rs +++ b/src/commands/generate.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use async_std::fs; @@ -10,7 +10,7 @@ use serde_json::{Map, Value}; use crate::channel::{Channel, ChannelConfig}; use crate::runner::Runner; -use crate::step::{self, Step, StepArguments}; +use crate::step::{self, Output, Step, StepArgument, StepArguments, SubStep}; /// Generate a pipeline of steps #[derive(clap::Args, Debug)] @@ -27,12 +27,11 @@ pub struct Command { automatic: bool, } -pub type Args = HashMap; #[derive(Serialize, Deserialize, Debug)] pub struct RunConfig { #[serde(rename = "processorConfig")] processor: Step, - args: Args, + args: HashMap, } #[derive(Debug)] @@ -128,8 +127,9 @@ impl Command { .collect::>(); let mut open_channels: Vec> = Vec::new(); - let mut all_step_args: HashMap = HashMap::new(); + let mut done_steps = Vec::::new(); + let mut used_steps = HashSet::::new(); for step in &steps { let mut step_args = StepArguments::new(step); @@ -155,12 +155,13 @@ impl Command { arg_style.apply_to(&arg.description), ); } - let ids = extract_string_array(&arg.other, "sourceIds") - .unwrap_or_default(); + let source_ids = + extract_string_array(&arg.other, "sourceIds") + .unwrap_or_default(); - let mut targets = Vec::new(); + let mut fields = HashMap::new(); - for id in &ids { + for id in &source_ids { // todo! make better let (config, tmp_target) = ask_channel_config( id, @@ -178,15 +179,16 @@ impl Command { .unwrap() .use_target( tmp_target.writer_id, - config.with_name(tmp_target.name), + tmp_target.name, + config.clone(), ); } - targets.push(config); + fields.insert(id.to_string(), config); } - let value = serde_json::to_value(targets).unwrap(); - step_args.add_argument(arg.id.to_string(), value); + let argument = StepArgument::StreamReader { fields }; + step_args.add_argument(arg.id.to_string(), argument); } "streamWriter" => { let ids = extract_string_array(&arg.other, "targetIds") @@ -204,38 +206,106 @@ impl Command { } } _ => { - let value; - if arg.default == false { - value = loop { - println!( - "Argument: {} ({})", - arg_style.apply_to(&arg.id), - type_style.apply_to(&arg.ty) + println!( + "Argument: {} ({})", + arg_style.apply_to(&arg.id), + type_style.apply_to(&arg.ty) + ); + if !arg.description.is_empty() { + println!( + "Description: {}", + arg_style.apply_to(&arg.description), + ); + } + let input_options = ["plain", "file", "process"]; + + let input_choice = + ask_user_for("input type", &input_options, false); + + let argument = match input_options[input_choice] { + "plain" => { + let string = if arg.default { + arg.value.clone() + } else { + let mut prompt = Input::::new(); + prompt + .with_prompt(" ") + .with_initial_text(arg.value.clone()) + .completion_with(&Complete); + ask_until_ready(|| prompt.interact_text()) + }; + + let value = serde_json::to_value(&string) + .unwrap_or(Value::String(string)); + StepArgument::Plain { value } + } + "file" => { + let mut prompt = Input::::new(); + prompt + .with_prompt("Path: ") + .with_initial_text(arg.value.clone()) + .completion_with(&Complete); + let path = + ask_until_ready(|| prompt.interact_text()); + + let serialization = ask_user_for_serialization( + serialization_types, ); - if !arg.description.is_empty() { - println!( - "Description: {}", - arg_style.apply_to(&arg.description), - ); - } - if let Ok(inp) = Input::::new() - .with_prompt(" ") - .with_initial_text(arg.value.clone()) - .completion_with(&Complete) - .interact_text() - { - if let Ok(v) = serde_json::from_str(&inp) { - break v; - } else { - break Value::String(inp); - } + StepArgument::File { + path, + serialization, } - }; - } else { - value = Value::String(arg.value.clone()); - } - step_args.add_argument(arg.id.to_string(), value); + } + "process" => { + let process_index = ask_user_for( + "Process Name", + &done_steps, + false, + ); + let output = ask_user_for( + "Process output", + &["stdout", "stderr"], + false, + ); + let output = if output == 0 { + Output::Stdout + } else { + Output::Stderr + }; + + used_steps.insert( + done_steps[process_index].to_string(), + ); + let linked_step = all_step_args + .get(&done_steps[process_index]) + .unwrap(); + let linked_step_ser = serializations_per_runner + [&linked_step.step.runner_id]; + + let possible_sers: Vec<_> = serialization_types + .iter() + .filter(|x| { + linked_step_ser.iter().any(|y| x == &y) + }) + .collect(); + let serialization = + ask_user_for_serialization(&possible_sers); + + StepArgument::Step { + sub: SubStep { + run: linked_step + .clone() + .into_runthing(), + output, + serialization, + }, + } + } + _ => unreachable!(), + }; + + step_args.add_argument(arg.id.to_string(), argument); } } } @@ -246,6 +316,8 @@ impl Command { { panic!("Found multiple steps with the same id '{}'", step.id); } + + done_steps.push(step.id.to_string()); } if !open_channels.is_empty() { @@ -267,22 +339,19 @@ impl Command { let ser = ask_user_for_serialization(target.possible_serializations); - let ch_config = ChannelConfig::new( - target.name.to_string(), - ty.to_string(), - ser, - config, - ); + let ch_config = ChannelConfig::new(ty.to_string(), ser, config); - all_step_args - .get_mut(target.step_id) - .unwrap() - .use_target(target.writer_id, ch_config); + all_step_args.get_mut(target.step_id).unwrap().use_target( + target.writer_id, + target.name, + ch_config, + ); } } let args = all_step_args .into_values() + .filter(|args| !used_steps.contains(&args.step.id)) .map(StepArguments::into_value) .collect::>(); @@ -389,13 +458,10 @@ fn ask_channel_config<'a>( let (config, ty) = ask_user_for_channel(&types, channel_options, automatic); let ser = ask_user_for_serialization(&sers); - Some(( - ChannelConfig::new(id.to_string(), ty.to_string(), ser, config), - target, - )) + Some((ChannelConfig::new(ty.to_string(), ser, config), target)) } -fn ask_user_for_serialization(options: &[String]) -> String { +fn ask_user_for_serialization(options: &[S]) -> String { let ser_index = ask_user_for("What serialization?", options, false); options[ser_index].to_string() @@ -423,6 +489,14 @@ fn ask_user_for_channel<'a>( (options.remove(channel_index), ty) } +fn ask_until_ready Result>(mut f: F) -> T { + loop { + if let Ok(x) = f() { + break x; + } + } +} + fn ask_user_for( name: &str, things: &'_ [T], diff --git a/src/commands/mod.rs b/src/commands/mod.rs index b30900a..57ccc97 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -85,12 +85,18 @@ fn get_used_channels<'a>( channels.iter().filter(move |chan| is_present(&chan.id)) } +#[derive(Default)] +pub struct OutputConfig { + stdout: bool, + stderr: bool, +} + fn start_subproc, S: AsRef>( script: Str, location: Option, name: &str, - output: bool, -) -> Option<(std::process::Child, JoinHandle, JoinHandle<()>)> { + output: OutputConfig, +) -> Option<(std::process::Child, JoinHandle, JoinHandle)> { let location = location.and_then(expand_tilde); let mut proc = std::process::Command::new("sh"); @@ -112,7 +118,7 @@ fn start_subproc, S: AsRef>( BufReader::new(stdout).lines().for_each(|line| { let line = line.unwrap_or_else(|_| String::from("error")); println!("\x1b[32mINFO\x1b[39m {}: {}", id1, line); - if output { + if output.stdout { lines.push(line); } }); @@ -121,13 +127,19 @@ fn start_subproc, S: AsRef>( let id2 = name.to_string(); let h2 = spawn(move || { + let mut lines = Vec::new(); BufReader::new(stderr).lines().for_each(|line| { + let line = line.unwrap_or_else(|_| String::from("error")); println!( "\x1b[31mERRO\x1b[39m {}: {}", id2, - line.unwrap_or_else(|_| String::from("error")) - ) + line, + ); + if output.stderr { + lines.push(line); + } }); + lines.into_iter().collect() }); Some((child, h1, h2)) @@ -136,9 +148,9 @@ fn start_subproc, S: AsRef>( fn add_add_subproc, S: AsRef>( script: &Option, location: Option, - procs: &mut Vec<(Child, JoinHandle, JoinHandle<()>)>, + procs: &mut Vec<(Child, JoinHandle, JoinHandle)>, id: &str, - output: bool, + output: OutputConfig, ) { if let Some(stop_script) = script { let proc = start_subproc(stop_script, location, id, output); diff --git a/src/commands/prepare.rs b/src/commands/prepare.rs index 45a7d14..4e65ee2 100644 --- a/src/commands/prepare.rs +++ b/src/commands/prepare.rs @@ -1,6 +1,7 @@ use async_std::fs::read_to_string; +use super::OutputConfig; use super::run::{RunThing, Steps}; use crate::channel::Channel; use crate::runner::Runner; @@ -27,7 +28,7 @@ impl Command { id, .. }| { - super::add_add_subproc(start, location.as_ref(), &mut procs, id, false) + super::add_add_subproc(start, location.as_ref(), &mut procs, id, OutputConfig::default()) }, ); @@ -50,7 +51,7 @@ impl Command { location.as_ref(), &mut procs, id, - false, + OutputConfig::default(), ) }, ); @@ -71,7 +72,7 @@ impl Command { location.as_ref(), &mut procs, id, - false, + OutputConfig::default(), ); }, ); diff --git a/src/commands/run.rs b/src/commands/run.rs index 2a32b35..9452bd5 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -1,23 +1,25 @@ +use std::collections::HashMap; use std::env; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::process::Child; +use std::thread::JoinHandle; +use async_recursion::async_recursion; use async_std::fs::{self, read_to_string, write}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tempdir::TempDir; +use super::OutputConfig; use crate::channel::Channel; -use crate::commands::generate::Args; use crate::runner::Runner; -use crate::step::Step; +use crate::step::{Output, Step, StepArgument, SubStep}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RunThing { #[serde(rename = "processorConfig")] pub processor_config: Step, - args: Args, - #[serde(flatten)] - rest: Value, + pub(crate) args: HashMap, } #[derive(Serialize, Deserialize, Debug)] @@ -30,78 +32,175 @@ pub struct Steps { #[derive(clap::Args, Debug)] pub struct Command { /// Config file - file: String, + file: String, /// temporary directory to put step configuration files #[clap(short, long)] tmp_dir: Option, } -impl Command { - pub(crate) async fn execute( - self, - _channels: Vec, - runners: Vec, - ) { - let mut tmp_dir = None; - - if let Some(l) = &self.tmp_dir { - fs::create_dir_all(l).await.unwrap(); - } +struct RunHandler<'a> { + tmp_dir: PathBuf, + runners: &'a Vec, + sub_argument_outputs: HashMap, +} - let path = self +impl<'a> RunHandler<'a> { + fn from(command: &Command, runners: &'a Vec) -> Self { + let path = command .tmp_dir .as_ref() .map(|l| Path::new(l).to_owned()) .unwrap_or_else(|| { let tmp = TempDir::new("orchestrator").unwrap(); - let out = tmp.path().to_owned(); - tmp_dir = Some(tmp); - out + tmp.path().to_owned() }); - let content = read_to_string(self.file).await.unwrap(); + Self { + sub_argument_outputs: HashMap::default(), + runners, + tmp_dir: path, + } + } + + fn get_runner(&self, id: &str) -> &'a Runner { + self.runners.iter().find(|r| r.id == id).unwrap() + } + + fn get_tmp_file(&self, id: &str) -> PathBuf { + self.tmp_dir.join(format!("{}.json", id)) + } + + #[async_recursion] + async fn arg_to_value(&mut self, arg: StepArgument) -> Option { + if let StepArgument::Step { + sub: + SubStep { + run, + serialization, + output, + }, + } = arg + { + if let Some(value) = + self.sub_argument_outputs.get(&run.processor_config.id) + { + return value.clone().into(); + } + + let process_config_id = run.processor_config.id.clone(); + let (mut child, stdout, stderr) = run_thing(run, self, OutputConfig { stdout: true, stderr: true }).await?; + + child.wait().ok()?; + + let stdout = stdout.join().ok()?; + let stderr = stderr.join().ok()?; + + let (content, terminator) = match output { + Output::Stdout => (stdout, ".stdout"), + Output::Stderr => (stderr, ".stderr"), + }; + + let path = self.get_tmp_file(&format!("{}{}", process_config_id, terminator)); + write(&path, content).await.ok()?; + + let out = StepArgument::File { + path: path.to_string_lossy().to_string(), + serialization, + }; + + let value = serde_json::to_value(out).ok()?; + self.sub_argument_outputs + .insert(process_config_id, value.clone()); + value.into() + } else { + serde_json::to_value(arg).ok() + } + } +} + +async fn run_thing( + run: RunThing, + handler: &mut RunHandler<'_>, + output: OutputConfig, +) -> Option<(Child, JoinHandle, JoinHandle)> { + #[derive(Serialize)] + struct SimpleRun<'a> { + #[serde(rename = "processorConfig")] + pub processor_config: &'a Step, + args: HashMap, + } + + let mut args = HashMap::new(); + for (k, v) in run.args { + args.insert(k, handler.arg_to_value(v).await?); + } + + let name = &run.processor_config.id; + let runner = handler.get_runner(&run.processor_config.runner_id); + + let run = SimpleRun { + processor_config: &run.processor_config, + args + }; + + let config = serde_json::to_string_pretty(&run).ok()?; + run_value(config, handler.get_tmp_file(name), runner, name, output).await +} + +async fn run_value( + config: String, + file: PathBuf, + runner: &Runner, + name: &str, + output: OutputConfig, +) -> Option<(Child, JoinHandle, JoinHandle)> { + write(file.clone(), config).await.unwrap(); + + let config_path = format!( + "'{}'", + file.canonicalize().expect("canonicalize path").display() + ); + + let current_dir = format!( + "'{}'", + env::current_dir() + .unwrap() + .canonicalize() + .expect("canonicalize path") + .display() + ); + + let command = runner + .script + .clone() + .replace("{config}", &config_path) + .replace("{cwd}", ¤t_dir); + + super::start_subproc( + command, + runner.location.as_ref(), + name, + output + ) +} + +impl Command { + pub(crate) async fn execute( + self, + _channels: Vec, + runners: Vec, + ) { + let content = read_to_string(&self.file).await.unwrap(); let values: Steps = serde_json::from_str(&content).unwrap(); let mut procs = Vec::new(); + let mut handler = RunHandler::from(&self, &runners); + fs::create_dir_all(&handler.tmp_dir).await.unwrap(); + for value in values.steps { - let file = path.join(format!("{}.json", value.processor_config.id)); - let config = serde_json::to_vec_pretty(&value).unwrap(); - - write(file.clone(), config).await.unwrap(); - - let runner = runners - .iter() - .find(|r| r.id == value.processor_config.runner_id) - .unwrap(); - - let config_path = format!( - "'{}'", - file.canonicalize().expect("canonicalize path").display() - ); - let current_dir = format!( - "'{}'", - env::current_dir() - .unwrap() - .canonicalize() - .expect("canonicalize path") - .display() - ); - - let command = runner - .script - .clone() - .replace("{config}", &config_path) - .replace("{cwd}", ¤t_dir); - - let proc = super::start_subproc( - command, - runner.location.as_ref(), - &value.processor_config.id, - false, - ); - - procs.extend(proc); + let proc = run_thing(value, &mut handler, OutputConfig::default()).await.expect(""); + procs.push(proc); } for (mut proc, h1, h2) in procs { diff --git a/src/commands/stop.rs b/src/commands/stop.rs index d95c410..3458191 100644 --- a/src/commands/stop.rs +++ b/src/commands/stop.rs @@ -1,5 +1,6 @@ use async_std::fs::read_to_string; +use super::OutputConfig; use super::run::Steps; use crate::channel::Channel; use crate::runner::Runner; @@ -28,7 +29,7 @@ impl Command { location.as_ref(), &mut procs, id, - false, + OutputConfig::default(), ) }, ); @@ -52,7 +53,7 @@ impl Command { location.as_ref(), &mut procs, id, - false, + OutputConfig::default(), ) }, ); diff --git a/src/step.rs b/src/step.rs index e40efea..9919e71 100644 --- a/src/step.rs +++ b/src/step.rs @@ -8,24 +8,25 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use crate::channel::ChannelConfig; +use crate::commands::run::RunThing; use crate::runner::Runner; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct StepArg { - pub id: String, + pub id: String, #[serde(rename = "type")] - pub ty: String, + pub ty: String, #[serde(flatten)] - pub other: Map, + pub other: Map, #[serde(default)] - pub default: bool, + pub default: bool, #[serde(default)] - pub value: String, + pub value: String, #[serde(default)] pub description: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Step { pub id: String, #[serde(rename = "runnerId")] @@ -89,32 +90,81 @@ pub async fn parse_step>( Ok(channel) } -pub struct StepArguments { - step: Value, - stream_reader: HashMap>, +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Output { + Stdout, + Stderr, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SubStep { + pub run: RunThing, + pub serialization: String, + pub output: Output, +} - arguments: Vec<(String, Value)>, +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum StepArgument { + StreamReader { + fields: HashMap, + }, + StreamWriter { + fields: HashMap, + }, + File { + path: String, + serialization: String, + }, + Plain { + value: Value, + }, + Step { + #[serde(flatten)] + sub: SubStep, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StepArguments { + pub step: Step, + arguments: HashMap, } impl StepArguments { - pub fn new(step: &Step) -> Self { - let value = serde_json::to_value(step).unwrap(); + pub fn new(from: &Step) -> Self { Self { - step: value, - stream_reader: HashMap::new(), - arguments: Vec::new(), + step: from.clone(), + arguments: HashMap::new(), } } - pub fn add_argument(&mut self, id: String, value: Value) { - self.arguments.push((id, value)); + pub fn add_argument(&mut self, id: String, value: StepArgument) { + self.arguments.insert(id, value); } - pub fn use_target(&mut self, id: &str, config: ChannelConfig) { - if let Some(configs) = self.stream_reader.get_mut(id) { - configs.push(config); - } else { - self.stream_reader.insert(id.to_string(), vec![config]); + pub fn use_target(&mut self, id: &str, field: &str, config: ChannelConfig) { + if !self.arguments.contains_key(id) { + self.arguments.insert( + id.to_string(), + StepArgument::StreamWriter { + fields: HashMap::new(), + }, + ); + } + + match self.arguments.get_mut(id) { + Some(StepArgument::StreamWriter { ref mut fields }) => { + fields.insert(field.to_string(), config); + } + _ => panic!("expected a stream writer"), + } + } + + pub fn into_runthing(self) -> RunThing { + RunThing { + processor_config: self.step, + args: self.arguments, } } @@ -125,11 +175,6 @@ impl StepArguments { out.insert(id, arg); }); - self.stream_reader.into_iter().for_each(|(id, reader)| { - let value = serde_json::to_value(reader).unwrap(); - out.insert(id, value); - }); - json!({ "processorConfig": self.step, "args": out