From fae66b1fe147db988a353876edab746b9466abef Mon Sep 17 00:00:00 2001 From: Shanith K K Date: Fri, 8 Dec 2023 15:19:13 +0530 Subject: [PATCH] fix:workflow-macro-term --- workflow/workflow_macro/src/lib.rs | 65 ++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/workflow/workflow_macro/src/lib.rs b/workflow/workflow_macro/src/lib.rs index 863ccfeb..ee1c762d 100644 --- a/workflow/workflow_macro/src/lib.rs +++ b/workflow/workflow_macro/src/lib.rs @@ -20,7 +20,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { impl #workflow { - + pub fn node_count(&self) -> usize { self.nodes.len() } @@ -60,27 +60,60 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { } } - pub fn term(&mut self, task_index: Option) -> Result { - + pub fn term(&mut self, task_index: Option) -> Result { match task_index { Some(index) => { - let previous_index = (index - 1).try_into().unwrap(); - let previous_task = self.get_task(previous_index); - let previous_task_output = previous_task.get_task_output(); - let current_task = self.get_task_as_mut(index); - current_task.set_output_to_task(previous_task_output); - match current_task.execute(){ - Ok(()) => Ok(current_task.get_task_output()), - Err(err) => Err(err), + let mut list = Vec::new(); + let edges_list = self.edges.clone(); + edges_list.iter().for_each(|(source, destination)| { + if destination == &index { + list.push(source) + } + }); + let mut res: Vec = Vec::new(); + match list.len() { + 0 => { + let current_task = self.get_task_as_mut(index); + match current_task.execute() { + Ok(()) => return Ok(current_task.get_task_output()), + Err(err) => return Err(err), + } + } + 1 => { + let previous_task_output = self.get_task(*list[0]).get_task_output(); + let current_task = self.get_task_as_mut(index); + current_task.set_output_to_task(previous_task_output); + match current_task.execute() { + Ok(()) => return Ok(current_task.get_task_output()), + Err(err) => return Err(err), + } + } + _ => { + res = list + .iter() + .map(|index| { + let previous_task = self.get_task(**index); + let previous_task_output = previous_task.get_task_output(); + previous_task_output + }) + .collect(); + + let s: Value = res.into(); + let current_task = self.get_task_as_mut(index); + current_task.set_output_to_task(s); + + match current_task.execute() { + Ok(()) => return Ok(current_task.get_task_output()), + Err(err) => return Err(err), + }; + } } - - }, + } None => { let len = self.node_count(); - Ok(self.get_task(len-1).get_task_output()) - }, + Ok(self.get_task(len - 1).get_task_output()) + } } - } pub fn pipe(&mut self, task_index: usize) -> Result<&mut Self,String> {