diff --git a/compiler/preprocessor/preprocessor.py b/compiler/preprocessor/preprocessor.py index d44a5e0fd..e13e21ea6 100644 --- a/compiler/preprocessor/preprocessor.py +++ b/compiler/preprocessor/preprocessor.py @@ -3,7 +3,7 @@ import os import config -from shell_ast import ast_to_ast +from shell_ast import transformation_options, ast_to_ast from ir import FileIdGen from parse import parse_shell_to_asts, from_ast_objects_to_shell from util import * @@ -30,20 +30,21 @@ def preprocess(input_script_path, args): ## 3. Translate the new AST back to shell syntax preprocessing_unparsing_start_time = datetime.now() preprocessed_shell_script = from_ast_objects_to_shell(preprocessed_asts) - + preprocessing_unparsing_end_time = datetime.now() print_time_delta("Preprocessing -- Unparsing", preprocessing_unparsing_start_time, preprocessing_unparsing_end_time) return preprocessed_shell_script def preprocess_asts(ast_objects, args): - trans_mode = ast_to_ast.TransformationType(args.preprocess_mode) - if trans_mode is ast_to_ast.TransformationType.SPECULATIVE: - trans_options = ast_to_ast.SpeculativeTransformationState(mode=trans_mode, - po_file=args.partial_order_file) + trans_mode = transformation_options.TransformationType(args.preprocess_mode) + if trans_mode is transformation_options.TransformationType.SPECULATIVE: + trans_options = transformation_options.SpeculativeTransformationState(po_file=args.partial_order_file) util_spec.initialize(trans_options) + elif trans_mode is transformation_options.TransformationType.AIRFLOW: + trans_options = transformation_options.AirflowTransformationState() else: - trans_options = ast_to_ast.TransformationState(mode=trans_mode) + trans_options = transformation_options.TransformationState() ## Preprocess ASTs by replacing AST regions with calls to PaSh's runtime. ## Then the runtime will do the compilation and optimization with additional @@ -52,7 +53,7 @@ def preprocess_asts(ast_objects, args): ## Let the scheduler know that we are done with the partial_order file ## TODO: We could stream the partial_order_file to the scheduler - if trans_mode is ast_to_ast.TransformationType.SPECULATIVE: + if trans_mode is transformation_options.TransformationType.SPECULATIVE: ## First complete the partial_order file util_spec.serialize_partial_order(trans_options) @@ -85,7 +86,7 @@ def main(): ## TODO: When we better integrate, this should be automatically set. parser_spec.add_argument("partial_order_file", help="the file to store the partial order (currently just a sequence)") parser_spec.set_defaults(preprocess_mode='spec') - + args = parser.parse_args() config.set_config_globals_from_pash_args(args) diff --git a/compiler/shell_ast/ast_to_ast.py b/compiler/shell_ast/ast_to_ast.py index 7af0828c7..b1fe71054 100644 --- a/compiler/shell_ast/ast_to_ast.py +++ b/compiler/shell_ast/ast_to_ast.py @@ -1,146 +1,26 @@ -from enum import Enum -import copy -import pickle +""" +AST to AST transformation -import config -from env_var_names import * -from shell_ast.ast_util import * -from shasta.ast_node import ast_match -from shasta.json_to_ast import to_ast_node -from parse import from_ast_objects_to_shell -from speculative import util_spec - -## There are two types of ast_to_ast transformations -class TransformationType(Enum): - PASH = 'pash' - SPECULATIVE = 'spec' - -## Use this object to pass options inside the preprocessing -## trasnformation. 
-class TransformationState: - def __init__(self, mode: TransformationType): - self.mode = mode - self.node_counter = 0 - self.loop_counter = 0 - self.loop_contexts = [] - - def get_mode(self): - return self.mode - - ## Node id related - def get_next_id(self): - new_id = self.node_counter - self.node_counter += 1 - return new_id - - def get_current_id(self): - return self.node_counter - 1 - - def get_number_of_ids(self): - return self.node_counter - - ## Loop id related - def get_next_loop_id(self): - new_id = self.loop_counter - self.loop_counter += 1 - return new_id - - def get_current_loop_context(self): - ## We want to copy that - return self.loop_contexts[:] - - def get_current_loop_id(self): - if len(self.loop_contexts) == 0: - return None - else: - return self.loop_contexts[0] - - def enter_loop(self): - new_loop_id = self.get_next_loop_id() - self.loop_contexts.insert(0, new_loop_id) - return new_loop_id - - def exit_loop(self): - self.loop_contexts.pop(0) - - -## TODO: Turn it into a Transformation State class, and make a subclass for -## each of the two transformations. It is important for it to be state, because -## it will need to be passed around while traversing the tree. -class SpeculativeTransformationState(TransformationState): - def __init__(self, mode: TransformationType, po_file: str): - super().__init__(mode) - assert(self.mode is TransformationType.SPECULATIVE) - self.partial_order_file = po_file - self.partial_order_edges = [] - self.partial_order_node_loop_contexts = {} - - def get_partial_order_file(self): - assert(self.mode is TransformationType.SPECULATIVE) - return self.partial_order_file - - def add_edge(self, from_id: int, to_id: int): - self.partial_order_edges.append((from_id, to_id)) - - def get_all_edges(self): - return self.partial_order_edges - - def add_node_loop_context(self, node_id: int, loop_contexts): - self.partial_order_node_loop_contexts[node_id] = loop_contexts - - def get_all_loop_contexts(self): - return self.partial_order_node_loop_contexts - - -## -## Preprocessing -## - -## The preprocessing pass replaces all _candidate_ dataflow regions with -## calls to PaSh's runtime to let it establish if they are actually dataflow -## regions. The pass serializes all candidate dataflow regions: -## - A list of ASTs if at the top level or -## - an AST subtree if at a lower level -## -## The PaSh runtime then deserializes the(m, compiles them (if safe) and optimizes them. 
-
-preprocess_cases = {
-    "Pipe": (lambda trans_options, last_object:
-             lambda ast_node: preprocess_node_pipe(ast_node, trans_options, last_object=last_object)),
-    "Command": (lambda trans_options, last_object:
-                lambda ast_node: preprocess_node_command(ast_node, trans_options, last_object=last_object)),
-    "Redir": (lambda trans_options, last_object:
-              lambda ast_node: preprocess_node_redir(ast_node, trans_options, last_object=last_object)),
-    "Background": (lambda trans_options, last_object:
-                   lambda ast_node: preprocess_node_background(ast_node, trans_options, last_object=last_object)),
-    "Subshell": (lambda trans_options, last_object:
-                 lambda ast_node: preprocess_node_subshell(ast_node, trans_options, last_object=last_object)),
-    "For": (lambda trans_options, last_object:
-            lambda ast_node: preprocess_node_for(ast_node, trans_options, last_object=last_object)),
-    "While": (lambda trans_options, last_object:
-              lambda ast_node: preprocess_node_while(ast_node, trans_options, last_object=last_object)),
-    "Defun": (lambda trans_options, last_object:
-              lambda ast_node: preprocess_node_defun(ast_node, trans_options, last_object=last_object)),
-    "Semi": (lambda trans_options, last_object:
-             lambda ast_node: preprocess_node_semi(ast_node, trans_options, last_object=last_object)),
-    "Or": (lambda trans_options, last_object:
-           lambda ast_node: preprocess_node_or(ast_node, trans_options, last_object=last_object)),
-    "And": (lambda trans_options, last_object:
-            lambda ast_node: preprocess_node_and(ast_node, trans_options, last_object=last_object)),
-    "Not": (lambda trans_options, last_object:
-            lambda ast_node: preprocess_node_not(ast_node, trans_options, last_object=last_object)),
-    "If": (lambda trans_options, last_object:
-           lambda ast_node: preprocess_node_if(ast_node, trans_options, last_object=last_object)),
-    "Case": (lambda trans_options, last_object:
-             lambda ast_node: preprocess_node_case(ast_node, trans_options, last_object=last_object))
-}
+The preprocessing pass replaces all _candidate_ dataflow regions with
+calls to PaSh's runtime to let it establish if they are actually dataflow
+regions. The pass serializes all candidate dataflow regions:
+- A list of ASTs if at the top level or
+- an AST subtree if at a lower level
+The PaSh runtime then deserializes them, compiles them (if safe), and optimizes them.
+"""
+from env_var_names import *
+from shell_ast.ast_util import *
+from shell_ast.preprocess_ast_cases import preprocess_node
+from shell_ast.transformation_options import AbstractTransformationState
+
-## Replace candidate dataflow AST regions with calls to PaSh's runtime.
-def replace_ast_regions(ast_objects, trans_options):
+def replace_ast_regions(ast_objects, trans_options: AbstractTransformationState):
+    """
+    Replace candidate dataflow AST regions with calls to PaSh's runtime.
+    """
     preprocessed_asts = []
     candidate_dataflow_region = []
     last_object = False
@@ -150,12 +30,12 @@ def replace_ast_regions(ast_objects, trans_options):
         ## If we are working on the last object we need to keep that in mind when replacing.
         ##
         ## The last df-region should not be executed in parallel no matter what (to not lose its exit code.)
-        if (i == len(ast_objects) - 1):
+        if i == len(ast_objects) - 1:
            # log("Last object")
            last_object = True
        ast, original_text, _linno_before, _linno_after = ast_object
-        assert(isinstance(ast, AstNode))
+        assert isinstance(ast, AstNode)
        ## Goals: This transformation can approximate in several directions.
        ## 1. Not replacing a candidate dataflow region.
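
Note on the refactor in this hunk: the mode-enum branch that `replace_df_region` used to perform is now expressed through subclass polymorphism, so `replace_ast_regions` only calls `trans_options.replace_df_region(...)` and each output target supplies its own replacement logic. A minimal, self-contained sketch of the pattern (all names below are illustrative stand-ins, not PaSh's real API):

from abc import ABC, abstractmethod

class DemoTransformationState(ABC):
    ## Each concrete target overrides this; callers never branch on a mode enum.
    @abstractmethod
    def replace_df_region(self, asts, disable_parallel_pipelines=False, ast_text=None):
        ...

class DemoPashState(DemoTransformationState):
    def replace_df_region(self, asts, disable_parallel_pipelines=False, ast_text=None):
        ## A real implementation would serialize the asts and emit a runtime call.
        return ("pash-runtime-call", len(asts), disable_parallel_pipelines)

class DemoAirflowState(DemoPashState):
    ## A new target can start by inheriting an existing behavior wholesale,
    ## mirroring AirflowTransformationState(TransformationState) in this diff.
    pass

print(DemoAirflowState().replace_df_region(["ast1", "ast2"]))

Adding a target then touches only the state classes and the CLI mode mapping, not the AST traversal itself.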
@@ -174,487 +54,87 @@ def replace_ast_regions(ast_objects, trans_options): ## then the second output is true. ## - If the next AST needs to be replaced too (e.g. if the current one is a background) ## then the third output is true - preprocessed_ast_object = preprocess_node(ast, trans_options, last_object=last_object) + preprocessed_ast_object = preprocess_node( + ast, trans_options, last_object=last_object + ) ## If the dataflow region is not maximal then it implies that the whole ## AST should be replaced. - assert(not preprocessed_ast_object.is_non_maximal() - or preprocessed_ast_object.should_replace_whole_ast()) - + assert ( + not preprocessed_ast_object.is_non_maximal() + or preprocessed_ast_object.should_replace_whole_ast() + ) + ## If the whole AST needs to be replaced then it implies that ## something will be replaced - assert(not preprocessed_ast_object.should_replace_whole_ast() - or preprocessed_ast_object.will_anything_be_replaced()) + assert ( + not preprocessed_ast_object.should_replace_whole_ast() + or preprocessed_ast_object.will_anything_be_replaced() + ) ## If it isn't maximal then we just add it to the candidate - if(preprocessed_ast_object.is_non_maximal()): - candidate_dataflow_region.append((preprocessed_ast_object.ast, - original_text)) + if preprocessed_ast_object.is_non_maximal(): + candidate_dataflow_region.append( + (preprocessed_ast_object.ast, original_text) + ) else: ## If the current candidate dataflow region is non-empty ## it means that the previous AST was in the background so ## the current one has to be included in the process no matter what - if (len(candidate_dataflow_region) > 0): - candidate_dataflow_region.append((preprocessed_ast_object.ast, - original_text)) + if len(candidate_dataflow_region) > 0: + candidate_dataflow_region.append( + (preprocessed_ast_object.ast, original_text) + ) ## Since the current one is maximal (or not wholy replaced) ## we close the candidate. - dataflow_region_asts, dataflow_region_lines = unzip(candidate_dataflow_region) + dataflow_region_asts, dataflow_region_lines = unzip( + candidate_dataflow_region + ) dataflow_region_text = join_original_text_lines(dataflow_region_lines) - replaced_ast = replace_df_region(dataflow_region_asts, trans_options, - ast_text=dataflow_region_text, disable_parallel_pipelines=last_object) + replaced_ast = trans_options.replace_df_region( + dataflow_region_asts, + ast_text=dataflow_region_text, + disable_parallel_pipelines=last_object, + ) candidate_dataflow_region = [] preprocessed_asts.append(replaced_ast) else: - if(preprocessed_ast_object.should_replace_whole_ast()): - replaced_ast = replace_df_region([preprocessed_ast_object.ast], trans_options, - ast_text=original_text, disable_parallel_pipelines=last_object) + if preprocessed_ast_object.should_replace_whole_ast(): + replaced_ast = trans_options.replace_df_region( + [preprocessed_ast_object.ast], + ast_text=original_text, + disable_parallel_pipelines=last_object, + ) preprocessed_asts.append(replaced_ast) else: ## In this case, it is possible that no replacement happened, ## meaning that we can simply return the original parsed text as it was. 
- if(preprocessed_ast_object.will_anything_be_replaced() or original_text is None): + if ( + preprocessed_ast_object.will_anything_be_replaced() + or original_text is None + ): preprocessed_asts.append(preprocessed_ast_object.ast) else: preprocessed_asts.append(UnparsedScript(original_text)) ## Close the final dataflow region - if(len(candidate_dataflow_region) > 0): + if len(candidate_dataflow_region) > 0: dataflow_region_asts, dataflow_region_lines = unzip(candidate_dataflow_region) dataflow_region_text = join_original_text_lines(dataflow_region_lines) - replaced_ast = replace_df_region(dataflow_region_asts, trans_options, - ast_text=dataflow_region_text, disable_parallel_pipelines=True) + replaced_ast = trans_options.replace_df_region( + dataflow_region_asts, + ast_text=dataflow_region_text, + disable_parallel_pipelines=True, + ) candidate_dataflow_region = [] preprocessed_asts.append(replaced_ast) return preprocessed_asts -## This function joins original unparsed shell source in a safe way + +## This function joins original unparsed shell source in a safe way ## so as to deal with the case where some of the text is None (e.g., in case of stdin parsing). def join_original_text_lines(shell_source_lines_or_none): if any([text_or_none is None for text_or_none in shell_source_lines_or_none]): return None else: return "\n".join(shell_source_lines_or_none) - -def preprocess_node(ast_object, trans_options, last_object=False): - global preprocess_cases - return ast_match(ast_object, preprocess_cases, trans_options, last_object) - -## This preprocesses the AST node and also replaces it if it needs replacement . -## It is called by constructs that cannot be included in a dataflow region. -def preprocess_close_node(ast_object, trans_options, last_object=False): - preprocessed_ast_object = preprocess_node(ast_object, trans_options, last_object=last_object) - preprocessed_ast = preprocessed_ast_object.ast - should_replace_whole_ast = preprocessed_ast_object.should_replace_whole_ast() - if(should_replace_whole_ast): - final_ast = replace_df_region([preprocessed_ast], trans_options, - disable_parallel_pipelines=last_object) - something_replaced = True - else: - final_ast = preprocessed_ast - something_replaced = preprocessed_ast_object.will_anything_be_replaced() - return final_ast, something_replaced - -def preprocess_node_pipe(ast_node, trans_options, last_object=False): - ## A pipeline is *always* a candidate dataflow region. - ## Q: Is that true? - - ## TODO: Preprocess the internals of the pipe to allow - ## for mutually recursive calls to PaSh. - ## - ## For example, if a command in the pipe has a command substitution - ## in one of its arguments then we would like to call our runtime - ## there instead of - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=True, - non_maximal=ast_node.is_background, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: Complete this -def preprocess_node_command(ast_node, trans_options, last_object=False): - ## TODO: Preprocess the internals of the pipe to allow - ## for mutually recursive calls to PaSh. - ## - ## For example, if a command in the pipe has a command substitution - ## in one of its arguments then we would like to call our runtime - ## there instead of - - ## If there are no arguments, the command is just an - ## assignment (Q: or just redirections?) 
- if(len(ast_node.arguments) == 0): - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=False, - last_ast=last_object) - return preprocessed_ast_object - - ## This means we have a command. Commands are always candidate dataflow - ## regions. - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=True, - non_maximal=False, - last_ast=last_object) - return preprocessed_ast_object - -# Background of (linno * t * redirection list) -## TODO: It might be possible to actually not close the inner node but rather apply the redirections on it -def preprocess_node_redir(ast_node, trans_options, last_object=False): - preprocessed_node, something_replaced = preprocess_close_node(ast_node.node, trans_options, - last_object=last_object) - ast_node.node = preprocessed_node - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=something_replaced, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: Is that correct? Also, this should probably affect `semi`, `and`, and `or` -def preprocess_node_background(ast_node, trans_options, last_object=False): - ## A background node is *always* a candidate dataflow region. - ## Q: Is that true? - - ## TODO: Preprocess the internals of the background to allow - ## for mutually recursive calls to PaSh. - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=True, - non_maximal=True, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: We can actually preprocess the underlying node and then -## return its characteristics above. However, we would need -## to add a field in the IR that a node runs in a subshell -## (which would have implications on how the backend outputs it). -## -## e.g. a subshell node should also be output as a subshell in the backend. -## FIXME: This might not just be suboptimal, but also wrong. -def preprocess_node_subshell(ast_node, trans_options, last_object=False): - preprocessed_body, something_replaced = preprocess_close_node(ast_node.body, trans_options, - last_object=last_object) - ast_node.body = preprocessed_body - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=something_replaced, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: For all of the constructs below, think whether we are being too conservative - -## TODO: This is not efficient at all since it calls the PaSh runtime everytime the loop is entered. -## We have to find a way to improve that. -def preprocess_node_for(ast_node, trans_options, last_object=False): - ## If we are in a loop, we push the loop identifier into the loop context - loop_id = trans_options.enter_loop() - preprocessed_body, something_replaced = preprocess_close_node(ast_node.body, trans_options, last_object=last_object) - - ## TODO: Then send this iteration identifier when talking to the spec scheduler - ## TODO: After running checks put this behind a check to only run under speculation - - ## Create a new variable that tracks loop iterations - var_name = loop_iter_var(loop_id) - export_node = make_export_var_constant_string(var_name, '0') - increment_node = make_increment_var(var_name) - - ## Also store the whole sequence of loop iters in a file - all_loop_ids = trans_options.get_current_loop_context() - - ## export pash_loop_iters="$pash_loop_XXX_iter $pash_loop_YYY_iter ..." 
- save_loop_iters_node = export_pash_loop_iters_for_current_context(all_loop_ids) - - ## Prepend the increment in the body - ast_node.body = make_typed_semi_sequence( - [to_ast_node(increment_node), - to_ast_node(save_loop_iters_node), - copy.deepcopy(preprocessed_body)]) - - ## We pop the loop identifier from the loop context. - ## - ## KK 2023-04-27: Could this exit happen before the replacement leading to wrong - ## results? I think not because we use the _close_node preprocessing variant. - ## A similar issue might happen for while - trans_options.exit_loop() - - ## reset the loop iters after we exit the loop - out_of_loop_loop_ids = trans_options.get_current_loop_context() - reset_loop_iters_node = export_pash_loop_iters_for_current_context(out_of_loop_loop_ids) - - ## Prepend the export in front of the loop - # new_node = ast_node - new_node = make_typed_semi_sequence( - [to_ast_node(export_node), - ast_node, - to_ast_node(reset_loop_iters_node)]) - # print(new_node) - - preprocessed_ast_object = PreprocessedAST(new_node, - replace_whole=False, - non_maximal=False, - something_replaced=something_replaced, - last_ast=last_object) - - return preprocessed_ast_object - -def preprocess_node_while(ast_node, trans_options, last_object=False): - ## If we are in a loop, we push the loop identifier into the loop context - trans_options.enter_loop() - - preprocessed_test, sth_replaced_test = preprocess_close_node(ast_node.test, trans_options, last_object=last_object) - preprocessed_body, sth_replaced_body = preprocess_close_node(ast_node.body, trans_options, last_object=last_object) - ast_node.test = preprocessed_test - ast_node.body = preprocessed_body - something_replaced = sth_replaced_test or sth_replaced_body - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=something_replaced, - last_ast=last_object) - - ## We pop the loop identifier from the loop context. - trans_options.exit_loop() - return preprocessed_ast_object - -## This is the same as the one for `For` -def preprocess_node_defun(ast_node, trans_options, last_object=False): - ## TODO: For now we don't want to compile function bodies - # preprocessed_body = preprocess_close_node(ast_node.body) - # ast_node.body = preprocessed_body - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=False, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: If the preprocessed is not maximal we actually need to combine it with the one on the right. -def preprocess_node_semi(ast_node, trans_options, last_object=False): - # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) - ## - ## TODO: Is it valid that only the right one is considered the last command? 
- preprocessed_left, sth_replaced_left = preprocess_close_node(ast_node.left_operand, trans_options, last_object=False) - preprocessed_right, sth_replaced_right = preprocess_close_node(ast_node.right_operand, trans_options, last_object=last_object) - ast_node.left_operand = preprocessed_left - ast_node.right_operand = preprocessed_right - sth_replaced = sth_replaced_left or sth_replaced_right - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=sth_replaced, - last_ast=last_object) - return preprocessed_ast_object - -## TODO: Make sure that what is inside an `&&`, `||`, `!` (and others) does not run in parallel_pipelines -## since we need its exit code. -def preprocess_node_and(ast_node, trans_options, last_object=False): - # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) - preprocessed_left, sth_replaced_left = preprocess_close_node(ast_node.left_operand, trans_options, last_object=last_object) - preprocessed_right, sth_replaced_right = preprocess_close_node(ast_node.right_operand, trans_options, last_object=last_object) - ast_node.left_operand = preprocessed_left - ast_node.right_operand = preprocessed_right - sth_replaced = sth_replaced_left or sth_replaced_right - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=sth_replaced, - last_ast=last_object) - return preprocessed_ast_object - -def preprocess_node_or(ast_node, trans_options, last_object=False): - # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) - preprocessed_left, sth_replaced_left = preprocess_close_node(ast_node.left_operand, trans_options, last_object=last_object) - preprocessed_right, sth_replaced_right = preprocess_close_node(ast_node.right_operand, trans_options, last_object=last_object) - ast_node.left_operand = preprocessed_left - ast_node.right_operand = preprocessed_right - sth_replaced = sth_replaced_left or sth_replaced_right - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=sth_replaced, - last_ast=last_object) - return preprocessed_ast_object - -def preprocess_node_not(ast_node, trans_options, last_object=False): - # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left) - preprocessed_body, sth_replaced = preprocess_close_node(ast_node.body, trans_options, last_object=last_object) - ast_node.body = preprocessed_body - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=sth_replaced, - last_ast=last_object) - return preprocessed_ast_object - - -def preprocess_node_if(ast_node, trans_options, last_object=False): - # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) - preprocessed_cond, sth_replaced_cond = preprocess_close_node(ast_node.cond, trans_options, last_object=last_object) - preprocessed_then, sth_replaced_then = preprocess_close_node(ast_node.then_b, trans_options, last_object=last_object) - preprocessed_else, sth_replaced_else = preprocess_close_node(ast_node.else_b, trans_options, last_object=last_object) - ast_node.cond = preprocessed_cond - ast_node.then_b = preprocessed_then - ast_node.else_b = preprocessed_else - sth_replaced = sth_replaced_cond or sth_replaced_then or sth_replaced_else - preprocessed_ast_object 
= PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=sth_replaced, - last_ast=last_object) - return preprocessed_ast_object - -def preprocess_case(case, trans_options, last_object=False): - preprocessed_body, sth_replaced = preprocess_close_node(case["cbody"], trans_options, last_object=last_object) - case["cbody"] = preprocessed_body - return case, sth_replaced - -def preprocess_node_case(ast_node, trans_options, last_object=False): - preprocessed_cases_replaced = [preprocess_case(case, trans_options, last_object=last_object) for case in ast_node.cases] - preprocessed_cases, sth_replaced_cases = list(zip(*preprocessed_cases_replaced)) - ast_node.cases = preprocessed_cases - preprocessed_ast_object = PreprocessedAST(ast_node, - replace_whole=False, - non_maximal=False, - something_replaced=any(sth_replaced_cases), - last_ast=last_object) - return preprocessed_ast_object - - -## TODO: I am a little bit confused about how compilation happens. -## Does it happen bottom up or top down: i.e. when we first encounter an occurence -## do we recurse in it and then compile from the leaf, or just compile the surface? - - - -## Replaces IR subtrees with a command that calls them (more -## precisely, a command that calls a python script to call them). -## -## Note: The traversal that replace_irs does, is exactly the same as -## the one that is done by compile_node. Both of these functions -## transform nodes of type t to something else. -## -## TODO: For now this just replaces the IRs starting from the ourside -## one first, but it should start from the bottom up to handle -## recursive IRs. - -## This function serializes a candidate df_region in a file, and in its place, -## it adds a command that calls our distribution planner with the name of the -## saved file. -## -## If we are need to disable parallel pipelines, e.g., if we are in the context of an if, -## or if we are in the end of a script, then we set a variable. -def replace_df_region(asts, trans_options, disable_parallel_pipelines=False, ast_text=None) -> AstNode: - transformation_mode = trans_options.get_mode() - if transformation_mode is TransformationType.PASH: - ir_filename = ptempfile() - - ## Serialize the node in a file - with open(ir_filename, "wb") as ir_file: - pickle.dump(asts, ir_file) - - ## Serialize the candidate df_region asts back to shell - ## so that the sequential script can be run in parallel to the compilation. - sequential_script_file_name = ptempfile() - text_to_output = get_shell_from_ast(asts, ast_text=ast_text) - ## However, if we have the original ast text, then we can simply output that. - with open(sequential_script_file_name, "w") as script_file: - script_file.write(text_to_output) - replaced_node = make_call_to_pash_runtime(ir_filename, sequential_script_file_name, disable_parallel_pipelines) - elif transformation_mode is TransformationType.SPECULATIVE: - text_to_output = get_shell_from_ast(asts, ast_text=ast_text) - ## Generate an ID - df_region_id = trans_options.get_next_id() - - ## Get the current loop id and save it so that the runtime knows - ## which loop it is in. - loop_id = trans_options.get_current_loop_id() - - ## Determine its predecessors - ## TODO: To make this properly work, we should keep some state - ## in the AST traversal to be able to determine predecessors. 
- if df_region_id == 0: - predecessors = [] - else: - predecessors = [df_region_id - 1] - ## Write to a file indexed by its ID - util_spec.save_df_region(text_to_output, trans_options, df_region_id, predecessors) - ## TODO: Add an entry point to spec through normal PaSh - replaced_node = make_call_to_spec_runtime(df_region_id, loop_id) - else: - ## Unreachable - assert(False) - - return to_ast_node(replaced_node) - - -def get_shell_from_ast(asts, ast_text=None) -> str: - ## If we don't have the original ast text, we need to unparse the ast - if (ast_text is None): - text_to_output = from_ast_objects_to_shell(asts) - else: - text_to_output = ast_text - return text_to_output - - -## -## Code that constructs the preprocessed ASTs -## - - -## This function makes a command that calls the pash runtime -## together with the name of the file containing an IR. Then the -## pash runtime should read from this file and continue -## execution. -## -## TODO: At the moment this is written in python but it is in essense a simple shell script. -## Is it possible to make it be a simple string instead of manually creating the AST? -## -## (MAYBE) TODO: The way I did it, is by calling the parser once, and seeing -## what it returns. Maybe it would make sense to call the parser on -## the fly to have a cleaner implementation here? -def make_call_to_pash_runtime(ir_filename, sequential_script_file_name, - disable_parallel_pipelines) -> AstNode: - - ## Disable parallel pipelines if we are in the last command of the script. - ## ``` - ## pash_disable_parallel_pipelines=1 - ## ``` - if(disable_parallel_pipelines): - assignments = [["pash_disable_parallel_pipelines", - string_to_argument("1")]] - else: - assignments = [["pash_disable_parallel_pipelines", - string_to_argument("0")]] - assignments.append(["pash_sequential_script_file", - string_to_argument(sequential_script_file_name)]) - assignments.append(["pash_input_ir_file", - string_to_argument(ir_filename)]) - - ## Call the runtime - arguments = [string_to_argument("source"), - string_to_argument(config.RUNTIME_EXECUTABLE)] - runtime_node = make_command(arguments, - assignments=assignments) - return runtime_node - -## TODO: Make that an actual call to the spec runtime -def make_call_to_spec_runtime(command_id: int, loop_id) -> AstNode: - assignments = [["pash_spec_command_id", - string_to_argument(str(command_id))]] - if loop_id is None: - loop_id_str = "" - else: - loop_id_str = str(loop_id) - - assignments.append(["pash_spec_loop_id", - string_to_argument(loop_id_str)]) - - ## Call the runtime - arguments = [string_to_argument("source"), - string_to_argument(config.RUNTIME_EXECUTABLE)] - ## Pass all relevant argument to the planner - runtime_node = make_command(arguments, - assignments=assignments) - - return runtime_node diff --git a/compiler/shell_ast/ast_util.py b/compiler/shell_ast/ast_util.py index 57529904f..3abc5ddbb 100644 --- a/compiler/shell_ast/ast_util.py +++ b/compiler/shell_ast/ast_util.py @@ -1,4 +1,3 @@ - from env_var_names import * from shasta.ast_node import * from shasta.json_to_ast import * @@ -20,7 +19,7 @@ def should_replace_whole_ast(self): def is_non_maximal(self): return self.non_maximal - + def will_anything_be_replaced(self): return self.something_replaced @@ -224,4 +223,4 @@ def make_echo_ast(argument, var_file_path): line_number = 0 node = make_kv('Command', [line_number, [], arguments, []]) nodes.append(node) - return nodes \ No newline at end of file + return nodes diff --git a/compiler/shell_ast/preprocess_ast_cases.py 
b/compiler/shell_ast/preprocess_ast_cases.py
new file mode 100644
index 000000000..3ba32f584
--- /dev/null
+++ b/compiler/shell_ast/preprocess_ast_cases.py
@@ -0,0 +1,479 @@
+import copy
+
+from shell_ast.ast_util import *
+from shell_ast.transformation_options import AbstractTransformationState
+from shasta.ast_node import AstNode
+
+
+def preprocess_node(
+    ast_node: AstNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool,
+) -> PreprocessedAST:
+    """
+    Preprocesses an AstNode. Given an AstNode of any type, it will appropriately
+    dispatch a preprocessor for the specific node type.
+
+    Parameters:
+        ast_node (AstNode): The AstNode to preprocess
+        trans_options (AbstractTransformationState):
+            A concrete transformation state instance corresponding to the output target
+        last_object (bool): Flag for whether this is the last AstNode
+
+    Returns:
+        PreprocessedAST: the preprocessed version of the original AstNode
+
+    Note:
+        For preprocess_node to dispatch the right function, the function must
+        follow the naming convention "preprocess_node_<lowercased NodeName>"
+    """
+    node_name = type(ast_node).NodeName.lower()
+    preprocess_fn = globals().get(f"preprocess_node_{node_name}")
+    if preprocess_fn is None:
+        raise KeyError(f"Could not find appropriate preprocessor for {node_name}")
+    return preprocess_fn(ast_node, trans_options, last_object)
+
+
+## This preprocesses the AST node and also replaces it if it needs replacement.
+## It is called by constructs that cannot be included in a dataflow region.
+def preprocess_close_node(
+    ast_node: AstNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    preprocessed_ast_object = preprocess_node(
+        ast_node, trans_options, last_object=last_object
+    )
+    preprocessed_ast = preprocessed_ast_object.ast
+    should_replace_whole_ast = preprocessed_ast_object.should_replace_whole_ast()
+    if should_replace_whole_ast:
+        final_ast = trans_options.replace_df_region(
+            asts=[preprocessed_ast], disable_parallel_pipelines=last_object
+        )
+        something_replaced = True
+    else:
+        final_ast = preprocessed_ast
+        something_replaced = preprocessed_ast_object.will_anything_be_replaced()
+    return final_ast, something_replaced
+
+
+## TODO: I am a little bit confused about how compilation happens.
+## Does it happen bottom up or top down: i.e. when we first encounter an occurrence
+## do we recurse in it and then compile from the leaf, or just compile the surface?
+
+## Replaces IR subtrees with a command that calls them (more
+## precisely, a command that calls a python script to call them).
+##
+## Note: The traversal that replace_irs does, is exactly the same as
+## the one that is done by compile_node. Both of these functions
+## transform nodes of type t to something else.
+##
+## TODO: For now this just replaces the IRs starting from the outside
+## one first, but it should start from the bottom up to handle
+## recursive IRs.
+
+## This function serializes a candidate df_region in a file, and in its place,
+## it adds a command that calls our distribution planner with the name of the
+## saved file.
+##
+## If we need to disable parallel pipelines, e.g., if we are in the context of an if,
+## or if we are at the end of a script, then we set a variable.
+
+
+def preprocess_node_pipe(
+    ast_node: PipeNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    ## A pipeline is *always* a candidate dataflow region.
+    ## Q: Is that true?
+ + ## TODO: Preprocess the internals of the pipe to allow + ## for mutually recursive calls to PaSh. + ## + ## For example, if a command in the pipe has a command substitution + ## in one of its arguments then we would like to call our runtime + ## there instead of + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=True, + non_maximal=ast_node.is_background, + last_ast=last_object, + ) + return preprocessed_ast_object + + +## TODO: Complete this +def preprocess_node_command( + ast_node: CommandNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + ## TODO: Preprocess the internals of the pipe to allow + ## for mutually recursive calls to PaSh. + ## + ## For example, if a command in the pipe has a command substitution + ## in one of its arguments then we would like to call our runtime + ## there instead of + + ## If there are no arguments, the command is just an + ## assignment (Q: or just redirections?) + if len(ast_node.arguments) == 0: + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=False, + last_ast=last_object, + ) + return preprocessed_ast_object + + ## This means we have a command. Commands are always candidate dataflow + ## regions. + preprocessed_ast_object = PreprocessedAST( + ast_node, replace_whole=True, non_maximal=False, last_ast=last_object + ) + return preprocessed_ast_object + + +# Background of (linno * t * redirection list) +## TODO: It might be possible to actually not close the inner node but rather apply the redirections on it +def preprocess_node_redir( + ast_node: RedirNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + preprocessed_node, something_replaced = preprocess_close_node( + ast_node.node, trans_options, last_object=last_object + ) + ast_node.node = preprocessed_node + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=something_replaced, + last_ast=last_object, + ) + return preprocessed_ast_object + + +## TODO: Is that correct? Also, this should probably affect `semi`, `and`, and `or` +def preprocess_node_background( + ast_node: BackgroundNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + ## A background node is *always* a candidate dataflow region. + ## Q: Is that true? + + ## TODO: Preprocess the internals of the background to allow + ## for mutually recursive calls to PaSh. + preprocessed_ast_object = PreprocessedAST( + ast_node, replace_whole=True, non_maximal=True, last_ast=last_object + ) + return preprocessed_ast_object + + +## TODO: We can actually preprocess the underlying node and then +## return its characteristics above. However, we would need +## to add a field in the IR that a node runs in a subshell +## (which would have implications on how the backend outputs it). +## +## e.g. a subshell node should also be output as a subshell in the backend. +## FIXME: This might not just be suboptimal, but also wrong. 
+def preprocess_node_subshell(
+    ast_node: SubshellNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    preprocessed_body, something_replaced = preprocess_close_node(
+        ast_node.body, trans_options, last_object=last_object
+    )
+    ast_node.body = preprocessed_body
+    preprocessed_ast_object = PreprocessedAST(
+        ast_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=something_replaced,
+        last_ast=last_object,
+    )
+    return preprocessed_ast_object
+
+
+## TODO: For all of the constructs below, think whether we are being too conservative
+
+
+## TODO: This is not efficient at all since it calls the PaSh runtime every time the loop is entered.
+## We have to find a way to improve that.
+def preprocess_node_for(
+    ast_node: ForNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    ## If we are in a loop, we push the loop identifier into the loop context
+    loop_id = trans_options.enter_loop()
+    preprocessed_body, something_replaced = preprocess_close_node(
+        ast_node.body, trans_options, last_object=last_object
+    )
+
+    ## TODO: Then send this iteration identifier when talking to the spec scheduler
+    ## TODO: After running checks put this behind a check to only run under speculation
+
+    ## Create a new variable that tracks loop iterations
+    var_name = loop_iter_var(loop_id)
+    export_node = make_export_var_constant_string(var_name, "0")
+    increment_node = make_increment_var(var_name)
+
+    ## Also store the whole sequence of loop iters in a file
+    all_loop_ids = trans_options.get_current_loop_context()
+
+    ## export pash_loop_iters="$pash_loop_XXX_iter $pash_loop_YYY_iter ..."
+    save_loop_iters_node = export_pash_loop_iters_for_current_context(all_loop_ids)
+
+    ## Prepend the increment in the body
+    ast_node.body = make_typed_semi_sequence(
+        [
+            to_ast_node(increment_node),
+            to_ast_node(save_loop_iters_node),
+            copy.deepcopy(preprocessed_body),
+        ]
+    )
+
+    ## We pop the loop identifier from the loop context.
+    ##
+    ## KK 2023-04-27: Could this exit happen before the replacement leading to wrong
+    ## results? I think not because we use the _close_node preprocessing variant.
+    ## A similar issue might happen for while
+    trans_options.exit_loop()
+
+    ## reset the loop iters after we exit the loop
+    out_of_loop_loop_ids = trans_options.get_current_loop_context()
+    reset_loop_iters_node = export_pash_loop_iters_for_current_context(
+        out_of_loop_loop_ids
+    )
+
+    ## Prepend the export in front of the loop
+    # new_node = ast_node
+    new_node = make_typed_semi_sequence(
+        [to_ast_node(export_node), ast_node, to_ast_node(reset_loop_iters_node)]
+    )
+    # print(new_node)
+
+    preprocessed_ast_object = PreprocessedAST(
+        new_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=something_replaced,
+        last_ast=last_object,
+    )
+
+    return preprocessed_ast_object
+
+
+def preprocess_node_while(
+    ast_node: WhileNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    ## If we are in a loop, we push the loop identifier into the loop context
+    trans_options.enter_loop()
+
+    preprocessed_test, sth_replaced_test = preprocess_close_node(
+        ast_node.test, trans_options, last_object=last_object
+    )
+    preprocessed_body, sth_replaced_body = preprocess_close_node(
+        ast_node.body, trans_options, last_object=last_object
+    )
+    ast_node.test = preprocessed_test
+    ast_node.body = preprocessed_body
+    something_replaced = sth_replaced_test or sth_replaced_body
+    preprocessed_ast_object = PreprocessedAST(
+        ast_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=something_replaced,
+        last_ast=last_object,
+    )
+
+    ## We pop the loop identifier from the loop context.
+    trans_options.exit_loop()
+    return preprocessed_ast_object
+
+
+## This is the same as the one for `For`
+def preprocess_node_defun(
+    ast_node: DefunNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    ## TODO: For now we don't want to compile function bodies
+    # preprocessed_body = preprocess_close_node(ast_node.body)
+    # ast_node.body = preprocessed_body
+    preprocessed_ast_object = PreprocessedAST(
+        ast_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=False,
+        last_ast=last_object,
+    )
+    return preprocessed_ast_object
+
+
+## TODO: If the preprocessed is not maximal we actually need to combine it with the one on the right.
+def preprocess_node_semi(
+    ast_node: SemiNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config)
+    ##
+    ## TODO: Is it valid that only the right one is considered the last command?
+    preprocessed_left, sth_replaced_left = preprocess_close_node(
+        ast_node.left_operand, trans_options, last_object=False
+    )
+    preprocessed_right, sth_replaced_right = preprocess_close_node(
+        ast_node.right_operand, trans_options, last_object=last_object
+    )
+    ast_node.left_operand = preprocessed_left
+    ast_node.right_operand = preprocessed_right
+    sth_replaced = sth_replaced_left or sth_replaced_right
+    preprocessed_ast_object = PreprocessedAST(
+        ast_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=sth_replaced,
+        last_ast=last_object,
+    )
+    return preprocessed_ast_object
+
+
+## TODO: Make sure that what is inside an `&&`, `||`, `!` (and others) does not run in parallel_pipelines
+## since we need its exit code.
+def preprocess_node_and( + ast_node: AndNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) + preprocessed_left, sth_replaced_left = preprocess_close_node( + ast_node.left_operand, trans_options, last_object=last_object + ) + preprocessed_right, sth_replaced_right = preprocess_close_node( + ast_node.right_operand, trans_options, last_object=last_object + ) + ast_node.left_operand = preprocessed_left + ast_node.right_operand = preprocessed_right + sth_replaced = sth_replaced_left or sth_replaced_right + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=sth_replaced, + last_ast=last_object, + ) + return preprocessed_ast_object + + +def preprocess_node_or( + ast_node: OrNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) + preprocessed_left, sth_replaced_left = preprocess_close_node( + ast_node.left_operand, trans_options, last_object=last_object + ) + preprocessed_right, sth_replaced_right = preprocess_close_node( + ast_node.right_operand, trans_options, last_object=last_object + ) + ast_node.left_operand = preprocessed_left + ast_node.right_operand = preprocessed_right + sth_replaced = sth_replaced_left or sth_replaced_right + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=sth_replaced, + last_ast=last_object, + ) + return preprocessed_ast_object + + +def preprocess_node_not( + ast_node: NotNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left) + preprocessed_body, sth_replaced = preprocess_close_node( + ast_node.body, trans_options, last_object=last_object + ) + ast_node.body = preprocessed_body + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=sth_replaced, + last_ast=last_object, + ) + return preprocessed_ast_object + + +def preprocess_node_if( + ast_node: IfNode, + trans_options: AbstractTransformationState, + last_object: bool = False, +): + # preprocessed_left, should_replace_whole_ast, is_non_maximal = preprocess_node(ast_node.left, irFileGen, config) + preprocessed_cond, sth_replaced_cond = preprocess_close_node( + ast_node.cond, trans_options, last_object=last_object + ) + preprocessed_then, sth_replaced_then = preprocess_close_node( + ast_node.then_b, trans_options, last_object=last_object + ) + preprocessed_else, sth_replaced_else = preprocess_close_node( + ast_node.else_b, trans_options, last_object=last_object + ) + ast_node.cond = preprocessed_cond + ast_node.then_b = preprocessed_then + ast_node.else_b = preprocessed_else + sth_replaced = sth_replaced_cond or sth_replaced_then or sth_replaced_else + preprocessed_ast_object = PreprocessedAST( + ast_node, + replace_whole=False, + non_maximal=False, + something_replaced=sth_replaced, + last_ast=last_object, + ) + return preprocessed_ast_object + + +def preprocess_case( + case, trans_options: AbstractTransformationState, last_object: bool +): + preprocessed_body, sth_replaced = preprocess_close_node( + case["cbody"], trans_options, last_object=last_object + ) + case["cbody"] = preprocessed_body + 
return case, sth_replaced
+
+
+def preprocess_node_case(
+    ast_node: CaseNode,
+    trans_options: AbstractTransformationState,
+    last_object: bool = False,
+):
+    preprocessed_cases_replaced = [
+        preprocess_case(case, trans_options, last_object=last_object)
+        for case in ast_node.cases
+    ]
+    preprocessed_cases, sth_replaced_cases = list(zip(*preprocessed_cases_replaced))
+    ast_node.cases = preprocessed_cases
+    preprocessed_ast_object = PreprocessedAST(
+        ast_node,
+        replace_whole=False,
+        non_maximal=False,
+        something_replaced=any(sth_replaced_cases),
+        last_ast=last_object,
+    )
+    return preprocessed_ast_object
diff --git a/compiler/shell_ast/transformation_options.py b/compiler/shell_ast/transformation_options.py
new file mode 100644
index 000000000..0d5221c2c
--- /dev/null
+++ b/compiler/shell_ast/transformation_options.py
@@ -0,0 +1,219 @@
+from abc import ABC, abstractmethod
+from enum import Enum
+import pickle
+
+import config  ## used below for config.RUNTIME_EXECUTABLE
+from shell_ast.ast_util import *
+from shasta.json_to_ast import to_ast_node
+from speculative import util_spec
+from parse import from_ast_objects_to_shell
+
+
+## There are three types of ast_to_ast transformations
+class TransformationType(Enum):
+    PASH = "pash"
+    SPECULATIVE = "spec"
+    AIRFLOW = "airflow"
+
+
+class AbstractTransformationState(ABC):
+    def __init__(self):
+        self._node_counter = 0
+        self._loop_counter = 0
+        self._loop_contexts = []
+
+    def get_mode(self):
+        return TransformationType.PASH
+
+    ## Node id related
+    def get_next_id(self):
+        new_id = self._node_counter
+        self._node_counter += 1
+        return new_id
+
+    def get_current_id(self):
+        return self._node_counter - 1
+
+    def get_number_of_ids(self):
+        return self._node_counter
+
+    ## Loop id related
+    def get_next_loop_id(self):
+        new_id = self._loop_counter
+        self._loop_counter += 1
+        return new_id
+
+    def get_current_loop_context(self):
+        ## We want to copy that
+        return self._loop_contexts[:]
+
+    def get_current_loop_id(self):
+        if len(self._loop_contexts) == 0:
+            return None
+        else:
+            return self._loop_contexts[0]
+
+    def enter_loop(self):
+        new_loop_id = self.get_next_loop_id()
+        self._loop_contexts.insert(0, new_loop_id)
+        return new_loop_id
+
+    def exit_loop(self):
+        self._loop_contexts.pop(0)
+
+    @abstractmethod
+    def replace_df_region(
+        self, asts, disable_parallel_pipelines=False, ast_text=None
+    ) -> AstNode:
+        pass
+
+
+## Use this object to pass options inside the preprocessing
+## transformation.
+class TransformationState(AbstractTransformationState):
+    def replace_df_region(
+        self, asts, disable_parallel_pipelines=False, ast_text=None
+    ) -> AstNode:
+        ir_filename = ptempfile()
+
+        ## Serialize the node in a file
+        with open(ir_filename, "wb") as ir_file:
+            pickle.dump(asts, ir_file)
+
+        ## Serialize the candidate df_region asts back to shell
+        ## so that the sequential script can be run in parallel to the compilation.
+        sequential_script_file_name = ptempfile()
+        text_to_output = get_shell_from_ast(asts, ast_text=ast_text)
+        ## However, if we have the original ast text, then we can simply output that.
+        with open(sequential_script_file_name, "w") as script_file:
+            script_file.write(text_to_output)
+        replaced_node = TransformationState.make_call_to_pash_runtime(
+            ir_filename, sequential_script_file_name, disable_parallel_pipelines
+        )
+
+        return to_ast_node(replaced_node)
+
+    ## This function makes a command that calls the pash runtime
+    ## together with the name of the file containing an IR. Then the
+    ## pash runtime should read from this file and continue
+    ## execution.
+    ##
+    ## TODO: At the moment this is written in python but it is in essence a simple shell script.
+    ## Is it possible to make it be a simple string instead of manually creating the AST?
+    ##
+    ## (MAYBE) TODO: The way I did it, is by calling the parser once, and seeing
+    ## what it returns. Maybe it would make sense to call the parser on
+    ## the fly to have a cleaner implementation here?
+    @staticmethod
+    def make_call_to_pash_runtime(
+        ir_filename, sequential_script_file_name, disable_parallel_pipelines
+    ) -> AstNode:
+        ## Disable parallel pipelines if we are in the last command of the script.
+        ## ```
+        ## pash_disable_parallel_pipelines=1
+        ## ```
+        if disable_parallel_pipelines:
+            assignments = [["pash_disable_parallel_pipelines", string_to_argument("1")]]
+        else:
+            assignments = [["pash_disable_parallel_pipelines", string_to_argument("0")]]
+        assignments.append(
+            [
+                "pash_sequential_script_file",
+                string_to_argument(sequential_script_file_name),
+            ]
+        )
+        assignments.append(["pash_input_ir_file", string_to_argument(ir_filename)])
+
+        ## Call the runtime
+        arguments = [
+            string_to_argument("source"),
+            string_to_argument(config.RUNTIME_EXECUTABLE),
+        ]
+        runtime_node = make_command(arguments, assignments=assignments)
+        return runtime_node
+
+
+## State for the speculative transformation. It is important for this to be
+## state, because it needs to be passed around while traversing the tree.
+class SpeculativeTransformationState(AbstractTransformationState):
+    def __init__(self, po_file: str):
+        super().__init__()  ## initialize the node/loop counters of the base class
+        self.partial_order_file = po_file
+        self.partial_order_edges = []
+        self.partial_order_node_loop_contexts = {}
+
+    def replace_df_region(
+        self, asts, disable_parallel_pipelines=False, ast_text=None
+    ) -> AstNode:
+        text_to_output = get_shell_from_ast(asts, ast_text=ast_text)
+        ## Generate an ID
+        df_region_id = self.get_next_id()
+
+        ## Get the current loop id and save it so that the runtime knows
+        ## which loop it is in.
+        loop_id = self.get_current_loop_id()
+
+        ## Determine its predecessors
+        ## TODO: To make this properly work, we should keep some state
+        ## in the AST traversal to be able to determine predecessors.
+        if df_region_id == 0:
+            predecessors = []
+        else:
+            predecessors = [df_region_id - 1]
+        ## Write to a file indexed by its ID
+        util_spec.save_df_region(text_to_output, self, df_region_id, predecessors)
+        ## TODO: Add an entry point to spec through normal PaSh
+        replaced_node = SpeculativeTransformationState.make_call_to_spec_runtime(
+            df_region_id, loop_id
+        )
+        return to_ast_node(replaced_node)
+
+    def get_partial_order_file(self):
+        return self.partial_order_file
+
+    def add_edge(self, from_id: int, to_id: int):
+        self.partial_order_edges.append((from_id, to_id))
+
+    def get_all_edges(self):
+        return self.partial_order_edges
+
+    def add_node_loop_context(self, node_id: int, loop_contexts):
+        self.partial_order_node_loop_contexts[node_id] = loop_contexts
+
+    def get_all_loop_contexts(self):
+        return self.partial_order_node_loop_contexts
+
+    ## TODO: Make that an actual call to the spec runtime
+    @staticmethod
+    def make_call_to_spec_runtime(command_id: int, loop_id) -> AstNode:
+        assignments = [["pash_spec_command_id", string_to_argument(str(command_id))]]
+        if loop_id is None:
+            loop_id_str = ""
+        else:
+            loop_id_str = str(loop_id)
+
+        assignments.append(["pash_spec_loop_id", string_to_argument(loop_id_str)])
+
+        ## Call the runtime
+        arguments = [
+            string_to_argument("source"),
+            string_to_argument(config.RUNTIME_EXECUTABLE),
+        ]
+        ## Pass all relevant arguments to the planner
+        runtime_node = make_command(arguments, assignments=assignments)
+
+        return runtime_node
+
+
+## The Airflow target currently reuses the PaSh replacement behavior unchanged.
+class AirflowTransformationState(TransformationState):
+    pass
+
+
+def get_shell_from_ast(asts, ast_text=None) -> str:
+    ## If we don't have the original ast text, we need to unparse the ast
+    if ast_text is None:
+        text_to_output = from_ast_objects_to_shell(asts)
+    else:
+        text_to_output = ast_text
+    return text_to_output
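
For reference, the convention-based dispatch introduced in preprocess_ast_cases.py can be exercised in isolation. The sketch below mirrors only the globals() lookup convention from the diff; the node class is a stand-in, not shasta's real AstNode hierarchy:

class PipeDemo:
    NodeName = "Pipe"  ## shasta node classes carry a NodeName class attribute

def preprocess_node_pipe(ast_node, trans_options, last_object):
    return f"preprocessed a {type(ast_node).NodeName} node"

def preprocess_node(ast_node, trans_options, last_object):
    ## Resolve preprocess_node_<lowercased NodeName> among the module globals.
    node_name = type(ast_node).NodeName.lower()
    preprocess_fn = globals().get(f"preprocess_node_{node_name}")
    if preprocess_fn is None:
        raise KeyError(f"Could not find appropriate preprocessor for {node_name}")
    return preprocess_fn(ast_node, trans_options, last_object)

print(preprocess_node(PipeDemo(), None, False))  ## -> preprocessed a Pipe node

The trade-off of this convention is that a misnamed handler only surfaces at dispatch time, which is why the KeyError message carries the node name.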