diff --git a/public/scripts/graph-util.js b/public/scripts/graph-util.js
index 8f44aab..d0a54aa 100644
--- a/public/scripts/graph-util.js
+++ b/public/scripts/graph-util.js
@@ -30,7 +30,7 @@ var GraphUtil = (function($, d3, dagreD3, ViewUtil, ChartUtil, StateUtil) {
}
graph.renderFlowGraph = function(graph_data, step_map, edges) {
- addAllVertices(graph_data, step_map);
+ addAllVertices(graph_data, step_map, edges);
addAllEdges(graph_data, step_map, edges);
renderD3Graph(graph_data);
@@ -51,20 +51,96 @@ var GraphUtil = (function($, d3, dagreD3, ViewUtil, ChartUtil, StateUtil) {
item.src_stage.toString(),
item.dest_stage.toString(),
{
- label: '' +
- ViewUtil.prettyPrintBytes(step.hdfs_bytes_written) + ''
+ label: getLabelWithSize(step.hdfs_bytes_written)
}
);
});
+
+ for (var key in step_map) {
+ var step = step_map[key];
+ if (hasAdditionalInput(step)) {
+ graph_data.addEdge(
+ null,
+ getInputVertex(key),
+ key,
+ {
+ label: getLabelWithSize(Math.max(step.hdfs_bytes_read - getReadFromIntemediate(key, step_map, rows), 0))
+ }
+ );
+ }
+ if (!isIntermediate(key, rows)) {
+ graph_data.addEdge(
+ null,
+ key,
+ getOutputVertex(key),
+ {
+ label: getLabelWithSize(step.hdfs_bytes_written)
+ }
+ );
+ }
+ }
+ }
+
+ function getLabelWithSize(size) {
+ return '' +
+ ViewUtil.prettyPrintBytes(size) + ''
}
- function addAllVertices(graph_data, step_map) {
+ function addAllVertices(graph_data, step_map, edges) {
for (var key in step_map) {
var step = step_map[key];
graph_data.addNode(key, { label: addElephant(step) });
+
+ if (hasAdditionalInput(step)) {
+ graph_data.addNode(getInputVertex(key), { label: "I" + key + ""})
+ }
+ if (!isIntermediate(key, edges)) {
+ graph_data.addNode(getOutputVertex(key), { label: "O" + key + ""})
+ }
+ }
+ }
+
+ function getInputVertex(step_number) {
+ return step_number + "_input"
+ }
+
+ function getOutputVertex(step_number) {
+ return step_number + "_output"
+ }
+
+ function isIntermediate(step_number, edges) {
+ for (var key in edges) {
+ var edge = edges[key]
+ if (edge.src_stage == step_number) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ function hasAdditionalInput(step) {
+ var hasAdditionalInput = false;
+ for (tap in step.sources) {
+ if (!tap.match(/\/tmp\/hadoop-/)) {
+ hasAdditionalInput = true;
+ }
}
+ return hasAdditionalInput;
}
+ function getReadFromIntemediate(step_number, steps, edges) {
+ var readFromIntemediate = 0
+ for (var key in edges) {
+ var edge = edges[key]
+ if (edge.dest_stage == step_number) {
+ var src_step = steps[edge.src_stage]
+ readFromIntemediate += src_step.hdfs_bytes_written
+ }
+ }
+ return readFromIntemediate
+ }
+
+
function renderD3Graph(graph_data) {
theG = d3.select("#flowgraph > g");
renderViewLayout(graph_data);