Skip to content

Commit

Permalink
closes issue #8
Browse files Browse the repository at this point in the history
  • Loading branch information
mightymax committed Nov 29, 2023
1 parent 29843ec commit 6e688c8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 14 deletions.
14 changes: 9 additions & 5 deletions src/lib/File.class.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { type WriteStream, createWriteStream, existsSync, statSync } from 'fs'
import { type WriteStream, createWriteStream, existsSync, statSync, mkdirSync } from 'fs'
import { isFile, isFilePathString } from '../utils/guards.js'
import { dirname } from 'path'

export default class File {
public static $id = 'File'
private readonly $isValid?: boolean
public constructor(private $path: string) {}
public constructor(private $path: string, private readonly skipExistsCheck: boolean = false) {}

public validate(): File {
if (this.$isValid !== undefined) return this
Expand All @@ -13,7 +14,7 @@ export default class File {
throw new Error(`The filename \`${wrongFilePath}\` should start with \`file://\``)
}
this.$path = this.$path.replace(/^file:\/\//, '')
if (!existsSync(this.$path) || !statSync(this.$path).isFile()) {
if (!this.skipExistsCheck && (!existsSync(this.$path) || !statSync(this.$path).isFile())) {
throw new Error(`File not found: \`${this.$path}\``)
}
return this
Expand All @@ -23,11 +24,14 @@ export default class File {
return this.$path
}

public getStream(): WriteStream {
public getStream(append: boolean = false): WriteStream {
if (existsSync(this.$path)) {
// throw new Error(`File already exists: \`${this.$path}\``)
}
return createWriteStream(this.$path)
if (!existsSync(dirname(this.$path))) {
mkdirSync(dirname(this.$path), { recursive: true})
}
return createWriteStream(this.$path, append ? {flags: 'a'} : {})
}

public toString(): string {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/LDWorkbenchConfiguration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface LDWorkbenchConfiguration {
/**
* The file where the final result of your pipeline is saved.
*/
destination: string;
destination?: string;
/**
* This is where you define the individual iterator/generator for each step.
*
Expand Down
25 changes: 20 additions & 5 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,33 @@ import type { LDWorkbenchConfiguration } from "./LDWorkbenchConfiguration.js";
import chalk from "chalk";
import Stage from "./Stage.class.js";
import duration from "../utils/duration.js";
import File from './File.class.js'
import path from "node:path";
import * as fs from "node:fs";
import { isFilePathString } from '../utils/guards.js';

class Pipeline {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated: boolean = false;
private stageNames: string[] = [];
private now = new Date();
private readonly destination: File

public constructor(
private readonly $configuration: LDWorkbenchConfiguration
) {
// create data folder:
this.dataDir = path.join("data", kebabcase(this.$configuration.name));
fs.mkdirSync(this.dataDir, { recursive: true });
const destinationFile = this.configuration.destination ?? `file://${path.join(this.dataDir, 'statements.nt')}`
if (!isFilePathString(destinationFile)) {
throw new Error('We currently only allow publishing data to local files.')
}
if(!destinationFile.endsWith('.nt')) {
throw new Error('We currently only writing results in N-Triples format,\nmake sure your destination filename ends with \'.nt\'.')
}
this.destination = new File(destinationFile, true)
}

private error(e: Error, stage?: string): void {
Expand Down Expand Up @@ -142,7 +153,7 @@ class Pipeline {
if (this.stageNames.length !== 0) {
this.runRecursive();
} else {
this.concat()
this.writeResult()
console.info(
chalk.green(
`✔ your pipeline "${chalk.bold(
Expand All @@ -159,10 +170,14 @@ class Pipeline {
}
}

private concat(): void {
private writeResult(): void {
const spinner = ora("Combining statements from all stages:").start();
const destinationPath = path.join(this.dataDir, 'statements.nt')
const destinationStream = fs.createWriteStream(destinationPath, {flags:'a'})

const destinationPathNew = this.configuration.destination
if (!isFilePathString(destinationPathNew)) {
throw new Error('We currently only allow publishing data to local files.')
}
const destinationStream = this.destination.getStream()
const stageNames = Array.from(this.stages.keys())
for (const stageName of stageNames) {
spinner.suffixText = chalk.bold(stageName)
Expand All @@ -171,7 +186,7 @@ class Pipeline {
destinationStream.write(buffer)
})
}
spinner.suffixText = chalk.bold(destinationPath)
spinner.suffixText = chalk.bold(this.destination.toString())
spinner.succeed()
}

Expand Down
3 changes: 1 addition & 2 deletions static/example/config.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# Metadata for your pipeline:
name: Example Pipeline
destination: file://data/example/example.ttl
description: >
This is an example pipeline. It uses files that are available in this repository
and SPARQL endpoints that should work.
# The individual stages for your pipeline
stages:
- name: "Stage 1"
iterator:
iterator:
query: file://static/example/iterator-stage-1.rq
endpoint: https://api.triplydb.com/datasets/Triply/iris/services/demo-service/sparql
generator:
Expand Down
2 changes: 1 addition & 1 deletion static/ld-workbench.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@
}
}
},
"required": ["name", "destination", "stages"]
"required": ["name", "stages"]
}

0 comments on commit 6e688c8

Please sign in to comment.