Skip to content

Commit

Permalink
Merge pull request #68 from quantifyearth/initial-join
Browse files Browse the repository at this point in the history
Initial join implementation
  • Loading branch information
mdales authored Jul 29, 2024
2 parents 405b221 + c913ec6 commit 7fcd309
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 99 deletions.
5 changes: 2 additions & 3 deletions src/bin/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,11 @@ let md ~fs ~net ~domain_mgr ~proc () no_run store conf file port fetcher jobs
(* Now we build the block *)
(* Import block digests need to be mapped to this build hash *)
let hb =
match Shark.Ast.find_hyperblock_from_block ast block with
match Shark.Ast.find_ast_block_from_shark_block ast block with
| Some hb -> hb
| None ->
Logs.info (fun f ->
f "Failed to find the hyperblock for %a" Shark.Block.pp
block);
f "Failed to find the astblock for %a" Shark.Block.pp block);
failwith "Block not found"
in
let res =
Expand Down
28 changes: 15 additions & 13 deletions src/lib/ast/ast.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ open Astring
open Sexplib.Conv
module DatafileSet = Set.Make (Datafile)

module Hyperblock = struct
module Astblock = struct
type t = {
hash : string list ref;
context : string;
Expand Down Expand Up @@ -37,7 +37,7 @@ module Hyperblock = struct
end

module Section = struct
type t = { name : string; blocks : Hyperblock.t list }
type t = { name : string; blocks : Astblock.t list }

let v name blocks = { name; blocks }
let name s = s.name
Expand All @@ -47,7 +47,7 @@ end
type block_id = int [@@deriving sexp]

type t = {
nodes : (block_id * Hyperblock.t) list;
nodes : (block_id * Astblock.t) list;
edges : (block_id * block_id) list;
metadata : Frontmatter.t;
}
Expand Down Expand Up @@ -205,7 +205,7 @@ let pass_one_on_list inputs section_list =
pass_one_process_commands_loop counter superblock.commands
input_map
in
(updated_map, Hyperblock.v name superblock.block leaves))
(updated_map, Astblock.v name superblock.block leaves))
input_map superblocks
in
(updated_map, Section.v name processed_section))
Expand Down Expand Up @@ -304,34 +304,34 @@ let of_sharkdown ?concrete_paths template_markdown =
let pass1 = pass_one_on_list [] expanded_sections in

(* Now I have the global graph implicitly, turn the list into a graph of blocks *)
let all_hyperblocks = List.concat_map Section.blocks pass1 in
let id_all_hyperblocks = List.mapi (fun i h -> (i, h)) all_hyperblocks in
let all_astblocks = List.concat_map Section.blocks pass1 in
let id_all_astblocks = List.mapi (fun i h -> (i, h)) all_astblocks in

(* All files will have one writer and zero or more readers *)
let writers =
List.concat
(List.map
(fun (hbid, h) ->
let _, outputs = Hyperblock.io h in
let _, outputs = Astblock.io h in
List.map (fun o -> (Datafile.id o, (o, hbid))) outputs)
id_all_hyperblocks)
id_all_astblocks)
in

let edges =
List.concat
(List.map
(fun (hbid, h) ->
let inputs, _ = Hyperblock.io h in
let inputs, _ = Astblock.io h in
List.filter_map
(fun i ->
match List.assoc_opt (Datafile.id i) writers with
| None -> None
| Some (_, writerid) -> Some (writerid, hbid))
inputs)
id_all_hyperblocks)
id_all_astblocks)
in

({ nodes = id_all_hyperblocks; edges; metadata }, expanded_markdown)
({ nodes = id_all_astblocks; edges; metadata }, expanded_markdown)

let find_id_of_block ast ib =
let d = Block.digest ib in
Expand All @@ -340,14 +340,14 @@ let find_id_of_block ast ib =
| [] -> None
| hd :: tl ->
let id, hb = hd in
let b = Hyperblock.block hb in
let b = Astblock.block hb in
if Block.digest b = d then Some id else loop tl
in
loop ast.nodes

let block_by_id ast id = List.assoc_opt id ast.nodes

let find_hyperblock_from_block ast block =
let find_ast_block_from_shark_block ast block =
let id = find_id_of_block ast block in
Option.bind id (block_by_id ast)

Expand All @@ -357,6 +357,8 @@ let find_dependencies ast id =
let from, too = edge in
if too = id then Some from else None)
ast.edges
|> List.sort_uniq (fun a b -> a - b)
(* remove duplicates if we take more than one output from a block *)
|> List.map (fun id -> List.assoc id ast.nodes)

let default_container_path ast = Frontmatter.default_container_path ast.metadata
10 changes: 5 additions & 5 deletions src/lib/ast/ast.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The AST is the logical representation of the workflow described in a
sharkdown file, including the structure of groups (aka basic blocks
in PL, but block is an overloaded term in this context). *)

module Hyperblock : sig
module Astblock : sig
type t [@@deriving sexp]

val block : t -> Block.t
Expand Down Expand Up @@ -39,10 +39,10 @@ val of_sharkdown :
being updated for any autogenerated blocks. *)

val find_id_of_block : t -> Block.t -> block_id option
val block_by_id : t -> block_id -> Hyperblock.t option
val find_hyperblock_from_block : t -> Block.t -> Hyperblock.t option
val find_dependencies : t -> block_id -> Hyperblock.t list
val block_by_id : t -> block_id -> Astblock.t option
val find_ast_block_from_shark_block : t -> Block.t -> Astblock.t option
val find_dependencies : t -> block_id -> Astblock.t list
val default_container_path : t -> Fpath.t

val to_list : t -> Hyperblock.t list
val to_list : t -> Astblock.t list
(** Convert the AST to a list of command blocks. *)
14 changes: 7 additions & 7 deletions src/lib/dotrenderer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,28 @@ let datafile_to_dot ppf datafile =
(Datafile.id datafile) shape
(Fpath.to_string (Datafile.path datafile))

let render_ast_to_dot ppf hyperblocks : unit =
let render_ast_to_dot ppf astblocks : unit =
Format.fprintf ppf "digraph{\n";
List.concat_map
(fun hb ->
let commands = Ast.Hyperblock.commands hb in
let commands = Ast.Astblock.commands hb in
List.concat_map
(fun command ->
let inputs = Leaf.inputs command and outputs = Leaf.outputs command in
List.concat [ inputs; outputs ])
commands)
hyperblocks
astblocks
|> DatafileSet.of_list
|> DatafileSet.iter (datafile_to_dot ppf);

List.iteri
(fun i hb ->
let kind = Block.kind (Ast.Hyperblock.block hb) in
let kind = Block.kind (Ast.Astblock.block hb) in
let name, style =
match kind with
| `Publish -> ("Publish", "bold")
| _ -> (Ast.Hyperblock.context hb, "solid")
and commands = Ast.Hyperblock.commands hb in
| _ -> (Ast.Astblock.context hb, "solid")
and commands = Ast.Astblock.commands hb in
Format.fprintf ppf "subgraph \"cluster_%d\" {\n" i;
Format.fprintf ppf "\tstyle = %s\n" style;
Format.fprintf ppf "\tlabel = \"%s\"\n" name;
Expand All @@ -115,7 +115,7 @@ let render_ast_to_dot ppf hyperblocks : unit =
List.iter (renderer ppf) filtered_commands;

Format.fprintf ppf "}\n")
hyperblocks;
astblocks;
Format.fprintf ppf "}\n"

let render ~template_markdown =
Expand Down
Loading

0 comments on commit 7fcd309

Please sign in to comment.