Skip to content

Commit

Permalink
Adds concat function to sombine all stages into 1 file.
Browse files Browse the repository at this point in the history
  • Loading branch information
mightymax committed Nov 29, 2023
1 parent 2545be5 commit 29843ec
Showing 1 changed file with 78 additions and 31 deletions.
109 changes: 78 additions & 31 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ import chalk from "chalk";
import Stage from "./Stage.class.js";
import duration from "../utils/duration.js";
import path from "node:path";
import { mkdirSync } from "node:fs";
import * as fs from "node:fs";

class Pipeline {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated: boolean = false;
private stageNames: string[] = [];
private now = new Date();

public constructor(
private readonly $configuration: LDWorkbenchConfiguration
) {
// create data folder:
this.dataDir = path.join("data", kebabcase(this.$configuration.name));
mkdirSync(this.dataDir, { recursive: true });
fs.mkdirSync(this.dataDir, { recursive: true });
}

private error(e: Error, stage?: string): void {
Expand Down Expand Up @@ -45,6 +47,11 @@ class Pipeline {
public validate(): void {
if (this.$isValidated) return;
let i = 0;
if (this.$configuration.stages.length === 0) {
throw new Error(
"Your pipeline contains no stages."
);
}
for (const stageConfiguration of this.$configuration.stages) {
if (i === 0 && stageConfiguration.iterator.endpoint === undefined) {
throw new Error(
Expand All @@ -70,28 +77,38 @@ class Pipeline {
}

public async run(startFromStageName?: string): Promise<void> {
const now = new Date();
this.now = new Date();
console.info(chalk.cyan(`🏁 starting pipeline "${chalk.bold(this.name)}"`));
const spinner = ora("validating pipeline").start();
let startFromStage = 0
let startFromStage = 0;
try {
this.validate();
if (startFromStageName !== undefined) {
if(/^\d+$/.test(startFromStageName)) {
const ix = parseInt(startFromStageName)
if (/^\d+$/.test(startFromStageName)) {
const ix = parseInt(startFromStageName);
if (Array.from(this.stages.keys()).length < ix) {
const e = new Error(`Pipeline ${chalk.italic(this.name)} does not have stage #${chalk.italic(startFromStageName)}.`)
const e = new Error(
`Pipeline ${chalk.italic(
this.name
)} does not have stage #${chalk.italic(startFromStageName)}.`
);
spinner.fail(e.message);
this.error(e);
} else {
startFromStage = ix - 1
startFromStage = ix - 1;
}
} else if(!this.stages.has(startFromStageName)) {
const e = new Error(`Pipeline ${chalk.italic(this.name)} does not have stage ${chalk.italic(startFromStageName)}.`)
} else if (!this.stages.has(startFromStageName)) {
const e = new Error(
`Pipeline ${chalk.italic(
this.name
)} does not have stage ${chalk.italic(startFromStageName)}.`
);
spinner.fail(e.message);
this.error(e);
} else {
startFromStage = Array.from(this.stages.keys()).findIndex(value => value === startFromStageName)
startFromStage = Array.from(this.stages.keys()).findIndex(
(value) => value === startFromStageName
);
}
}
spinner.succeed();
Expand All @@ -100,32 +117,62 @@ class Pipeline {
this.error(e as Error);
}

const stageNames = Array.from(this.stages.keys()).splice(startFromStage)
this.stageNames = Array.from(this.stages.keys()).splice(startFromStage);

function run(stages: Map<string, Stage>): void {
const stage = stages.get(stageNames.shift()!)!
const spinner = ora("Loading results from Iterator").start();
stage.on("iteratorResult", ($this) => {
spinner.text = $this.value;
});
stage.on("end", (iris, statements) => {
spinner.succeed(`stage "${chalk.bold(stage.name)}" resulted in ${statements} statement${statements === 1 ?'':'s'} in ${iris} iteration${iris === 1 ?'':'s'}.`)
if (stageNames.length !== 0) {
run(stages)
} else {
Array.from(this.stages.keys()).slice(0, startFromStage).forEach(stagename => {
ora().start().info(`stage "${chalk.bold(stagename)}" was skipped`).stop();
})
this.runRecursive();
}

private runRecursive(): void {
const stage = this.stages.get(this.stageNames.shift()!)!;
const spinner = ora("Loading results from Iterator").start();
stage.on("iteratorResult", ($this) => {
spinner.text = $this.value;
});
stage.on("end", (iris, statements) => {
spinner.succeed(
`stage "${chalk.bold(
stage.name
)}" resulted in ${statements.toLocaleString()} statement${
statements === 1 ? "" : "s"
} in ${iris.toLocaleString()} iteration${iris === 1 ? "" : "s"}.`
);
if (this.stageNames.length !== 0) {
this.runRecursive();
} else {
this.concat()
console.info(
chalk.green(`✔ your pipeline was completed in ${duration(now)}`)
chalk.green(
`✔ your pipeline "${chalk.bold(
this.name
)}" was completed in ${duration(this.now)}}`
)
);
}
});
try {
stage.run()
} catch(e) {
spinner.fail((e as Error).message);
}
});
try {
stage.run();
} catch (e) {
spinner.fail((e as Error).message);
}
}

run(this.stages)
private concat(): void {
const spinner = ora("Combining statements from all stages:").start();
const destinationPath = path.join(this.dataDir, 'statements.nt')
const destinationStream = fs.createWriteStream(destinationPath, {flags:'a'})
const stageNames = Array.from(this.stages.keys())
for (const stageName of stageNames) {
spinner.suffixText = chalk.bold(stageName)
fs.readFile(this.stages.get(stageName)!.destinationPath, (error, buffer) => {
if(error !== null) throw error
destinationStream.write(buffer)
})
}
spinner.suffixText = chalk.bold(destinationPath)
spinner.succeed()
}

get name(): string {
Expand Down

0 comments on commit 29843ec

Please sign in to comment.