diff --git a/src/lib/Pipeline.class.ts b/src/lib/Pipeline.class.ts index 2986e4e..0907ff6 100644 --- a/src/lib/Pipeline.class.ts +++ b/src/lib/Pipeline.class.ts @@ -5,19 +5,21 @@ import chalk from "chalk"; import Stage from "./Stage.class.js"; import duration from "../utils/duration.js"; import path from "node:path"; -import { mkdirSync } from "node:fs"; +import * as fs from "node:fs"; class Pipeline { public readonly stages = new Map(); public dataDir: string; private $isValidated: boolean = false; + private stageNames: string[] = []; + private now = new Date(); public constructor( private readonly $configuration: LDWorkbenchConfiguration ) { // create data folder: this.dataDir = path.join("data", kebabcase(this.$configuration.name)); - mkdirSync(this.dataDir, { recursive: true }); + fs.mkdirSync(this.dataDir, { recursive: true }); } private error(e: Error, stage?: string): void { @@ -45,6 +47,11 @@ class Pipeline { public validate(): void { if (this.$isValidated) return; let i = 0; + if (this.$configuration.stages.length === 0) { + throw new Error( + "Your pipeline contains no stages." + ); + } for (const stageConfiguration of this.$configuration.stages) { if (i === 0 && stageConfiguration.iterator.endpoint === undefined) { throw new Error( @@ -70,28 +77,38 @@ class Pipeline { } public async run(startFromStageName?: string): Promise { - const now = new Date(); + this.now = new Date(); console.info(chalk.cyan(`🏁 starting pipeline "${chalk.bold(this.name)}"`)); const spinner = ora("validating pipeline").start(); - let startFromStage = 0 + let startFromStage = 0; try { this.validate(); if (startFromStageName !== undefined) { - if(/^\d+$/.test(startFromStageName)) { - const ix = parseInt(startFromStageName) + if (/^\d+$/.test(startFromStageName)) { + const ix = parseInt(startFromStageName); if (Array.from(this.stages.keys()).length < ix) { - const e = new Error(`Pipeline ${chalk.italic(this.name)} does not have stage #${chalk.italic(startFromStageName)}.`) + const e = new Error( + `Pipeline ${chalk.italic( + this.name + )} does not have stage #${chalk.italic(startFromStageName)}.` + ); spinner.fail(e.message); this.error(e); } else { - startFromStage = ix - 1 + startFromStage = ix - 1; } - } else if(!this.stages.has(startFromStageName)) { - const e = new Error(`Pipeline ${chalk.italic(this.name)} does not have stage ${chalk.italic(startFromStageName)}.`) + } else if (!this.stages.has(startFromStageName)) { + const e = new Error( + `Pipeline ${chalk.italic( + this.name + )} does not have stage ${chalk.italic(startFromStageName)}.` + ); spinner.fail(e.message); this.error(e); } else { - startFromStage = Array.from(this.stages.keys()).findIndex(value => value === startFromStageName) + startFromStage = Array.from(this.stages.keys()).findIndex( + (value) => value === startFromStageName + ); } } spinner.succeed(); @@ -100,32 +117,62 @@ class Pipeline { this.error(e as Error); } - const stageNames = Array.from(this.stages.keys()).splice(startFromStage) + this.stageNames = Array.from(this.stages.keys()).splice(startFromStage); - function run(stages: Map): void { - const stage = stages.get(stageNames.shift()!)! - const spinner = ora("Loading results from Iterator").start(); - stage.on("iteratorResult", ($this) => { - spinner.text = $this.value; - }); - stage.on("end", (iris, statements) => { - spinner.succeed(`stage "${chalk.bold(stage.name)}" resulted in ${statements} statement${statements === 1 ?'':'s'} in ${iris} iteration${iris === 1 ?'':'s'}.`) - if (stageNames.length !== 0) { - run(stages) - } else { + Array.from(this.stages.keys()).slice(0, startFromStage).forEach(stagename => { + ora().start().info(`stage "${chalk.bold(stagename)}" was skipped`).stop(); + }) + this.runRecursive(); + } + + private runRecursive(): void { + const stage = this.stages.get(this.stageNames.shift()!)!; + const spinner = ora("Loading results from Iterator").start(); + stage.on("iteratorResult", ($this) => { + spinner.text = $this.value; + }); + stage.on("end", (iris, statements) => { + spinner.succeed( + `stage "${chalk.bold( + stage.name + )}" resulted in ${statements.toLocaleString()} statement${ + statements === 1 ? "" : "s" + } in ${iris.toLocaleString()} iteration${iris === 1 ? "" : "s"}.` + ); + if (this.stageNames.length !== 0) { + this.runRecursive(); + } else { + this.concat() console.info( - chalk.green(`✔ your pipeline was completed in ${duration(now)}`) + chalk.green( + `✔ your pipeline "${chalk.bold( + this.name + )}" was completed in ${duration(this.now)}}` + ) ); - } - }); - try { - stage.run() - } catch(e) { - spinner.fail((e as Error).message); } + }); + try { + stage.run(); + } catch (e) { + spinner.fail((e as Error).message); } + } - run(this.stages) + private concat(): void { + const spinner = ora("Combining statements from all stages:").start(); + const destinationPath = path.join(this.dataDir, 'statements.nt') + const destinationStream = fs.createWriteStream(destinationPath, {flags:'a'}) + const stageNames = Array.from(this.stages.keys()) + for (const stageName of stageNames) { + spinner.suffixText = chalk.bold(stageName) + fs.readFile(this.stages.get(stageName)!.destinationPath, (error, buffer) => { + if(error !== null) throw error + destinationStream.write(buffer) + }) + } + spinner.suffixText = chalk.bold(destinationPath) + spinner.succeed() } get name(): string {