Skip to content

Commit

Permalink
Blocker File class not writing batch file properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Philippe Renzen committed Dec 20, 2023
1 parent 41ea315 commit 9355e4c
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 140 deletions.
3 changes: 3 additions & 0 deletions src/lib/File.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ export default class File {

public async write(pipeline: Pipeline, spinner: Ora): Promise<void> {
const destinationStream = this.getStream()
console.log('🪵 | file: File.class.ts:54 | File | write | destinationStream:', destinationStream)
const stageNames = Array.from(pipeline.stages.keys())
for (const stageName of stageNames) {
if (spinner !== undefined) spinner.suffixText = chalk.bold(stageName)
// @mightymax the buffer seems to be a lot smaller for the batch pipeline
readFile(pipeline.stages.get(stageName)!.destinationPath, (error, buffer) => {
console.log('🪵 | file: File.class.ts:59 | File | readFile | buffer:', buffer)
if(error !== null) throw error
destinationStream.write(buffer)
})
Expand Down
123 changes: 44 additions & 79 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import type { ConstructQuery } from "sparqljs";
import type Stage from "./Stage.class.js";
import getSPARQLQuery from "../utils/getSPARQLQuery.js";
import type { Quad, NamedNode } from "@rdfjs/types";
import getSPARQLQueryString from "../utils/getSPARQLQueryString.js";
import getEndpoint from "../utils/getEndpoint.js";
import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from '../utils/getEngine.js';
Expand All @@ -12,33 +11,53 @@ import EventEmitter from 'node:events';
import getBatchSPARQLQueryString from "../utils/getBatchSPARQLQueryString.js";
declare interface Generator {
on(event: "data", listener: (statement: Quad) => void): this;
on(event: "dataCleanup", listener: (statement: Quad) => void): this;
on(event: "end", listener: (numResults: number) => void): this;
on(event: "endCleanup", listener: (numResults: number) => void): this;

emit(event: "data", statement: Quad): boolean;
emit(event: "dataCleanup", statement: Quad): boolean;
emit(event: "end", numResults: number): boolean;
emit(event: "endCleanup", numResults: number): boolean;
}
class Generator extends EventEmitter {
private readonly query: ConstructQuery;
private readonly engine: QueryEngine;
private readonly batchSize: number | undefined
private batchArrayOfNamedNodes: NamedNode[] | undefined
public $this: NamedNode[] = []
private source: string = ''
private readonly endpoint: Endpoint;
private numberOfStatements: number = 0
private generateQuads(generator: Generator, queryString: string, onEvent:'data'|'dataCleanup',endEmit: 'end' | 'endCleanup'): void {
const emitType: any = endEmit;
const onType: any = onEvent;

generator.engine.queryQuads(queryString, {
sources: [generator.source],
}).then((stream) => {
stream.on('data', (quad: Quad) => {
this.numberOfStatements++;
generator.emit(onType, quad);
});


stream.on('end', () => {
generator.emit(emitType, this.numberOfStatements);
});
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${generator.source}`);
});
}


public constructor(stage: Stage) {
super()
this.batchSize = stage.configuration.generator.batchSize
this.query = getSPARQLQuery(
stage.configuration.generator.query,
"construct"
);

if (stage.configuration.generator.batchSize !== undefined) {
this.batchArrayOfNamedNodes = []
this.batchSize = stage.configuration.generator.batchSize
} else {
this.batchArrayOfNamedNodes = undefined
this.batchSize = undefined
}

this.endpoint =
stage.configuration.generator.endpoint === undefined
? stage.iterator.endpoint
Expand All @@ -48,86 +67,32 @@ class Generator extends EventEmitter {
}

public run($this: NamedNode): void {
let numberOfStatements = 0
if (this.source === '') this.source = getEngineSource(this.endpoint)
// batch processing of queries
this.$this.push($this)
// when batchSize is undefined -> treat it as batchSize is one
if ((this.$this.length === this.batchSize) || this.batchSize === undefined) {
// getting batch SPARQL query string
const queryString = getBatchSPARQLQueryString(this.query, this.$this)

// Clearing batch Named Node targets array when it is the size of the batchSize
this.$this = []

if (this.batchArrayOfNamedNodes !== undefined) {
// batch processing of queries
this.batchArrayOfNamedNodes.push($this)
if (this.batchArrayOfNamedNodes.length === this.batchSize) {
// getting batch SPARQL query string
const queryString = getBatchSPARQLQueryString(this.query, this.batchArrayOfNamedNodes)

// Clearing batch Named Node targets array when it is the size of the batchSize
this.batchArrayOfNamedNodes = []

// TODO turn this into a function
this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`)
})
}
} else {
// Prebinding, see https://www.w3.org/TR/shacl/#pre-binding
// we know the query is safe to use replacement since we checked it before
const queryString = getSPARQLQueryString(this.query)
.replaceAll(
/[?$]\bthis\b/g,
`<${$this.value}>`
);
// TODO turn this into a function
this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`)
})
this.generateQuads(this, queryString, "data", "end")
}
}

// clean up function, in case batch processing is used and there are leftovers in batchArrayOfNamedNodes
public end(): void {
let numberOfStatements = 0
if((this.batchArrayOfNamedNodes !== undefined) && (this.batchArrayOfNamedNodes?.length !== 0)){
const queryString = getBatchSPARQLQueryString(this.query, this.batchArrayOfNamedNodes)
if(this.$this?.length !== 0){
const queryString = getBatchSPARQLQueryString(this.query, this.$this)
// Clearing batch Named Node targets array when it is the size of the batchSize
this.batchArrayOfNamedNodes = []

// TODO turn this into a function
this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`)
})
this.$this = []

this.generateQuads(this, queryString, "dataCleanup", "endCleanup")
}
else{
// REVIEW this doesn't seem to work - Stage class is not listening to this event
this.emit('end', numberOfStatements)
this.emit("endCleanup", this.numberOfStatements)
}
}
}
Expand Down
34 changes: 29 additions & 5 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/method-signature-style */
import ora from "ora";
import kebabcase from "lodash.kebabcase";
import type { LDWorkbenchConfiguration } from "./LDWorkbenchConfiguration.js";
Expand All @@ -9,8 +10,17 @@ import path from "node:path";
import * as fs from "node:fs";
import { isFilePathString, isTriplyDBPathString } from '../utils/guards.js';
import TriplyDB from './TriplyDB.class.js';
import EventEmitter from "node:events";

class Pipeline {
declare interface Pipeline {
on(event: "end", listener: () => void): this;
on(event: "error", listener: (error: Error) => void): this;

emit(event: "end"): boolean;
emit(event: "error", error: Error): boolean;
}

class Pipeline extends EventEmitter {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated: boolean = false;
Expand All @@ -21,6 +31,7 @@ class Pipeline {
public constructor(
private readonly $configuration: LDWorkbenchConfiguration
) {
super()
// create data folder:
this.dataDir = path.join("pipelines", "data", kebabcase(this.$configuration.name));
fs.mkdirSync(this.dataDir, { recursive: true });
Expand Down Expand Up @@ -144,6 +155,7 @@ class Pipeline {
stage.on("iteratorResult", ($this) => {
spinner.text = $this.value;
});
// BUG number of statements seems to be inaccurate when using batchProcessing
stage.on("end", (iris, statements) => {
spinner.succeed(
`stage "${chalk.bold(
Expand All @@ -155,6 +167,7 @@ class Pipeline {
if (this.stageNames.length !== 0) {
this.runRecursive();
} else {
// BUG seems the buffer is not always updated and writes the file too soon
this.writeResult()
.then(_ => {
console.info(
Expand All @@ -164,8 +177,10 @@ class Pipeline {
)}" was completed in ${duration(this.now)}`
)
);
this.emit('end')
})
.catch(e => {
this.emit('error', e)
throw new Error('Pipeline failed: ' + (e as Error).message)
})
}
Expand All @@ -179,11 +194,20 @@ class Pipeline {

private async writeResult(): Promise<void> {
const spinner = ora('Writing results to destination').start();
await this.destination.write(this, spinner)
spinner.suffixText = this.destination.path
spinner.succeed()
await new Promise<void>((resolve, reject) => {
this.destination.write(this, spinner)
.then(() => {
spinner.suffixText = this.destination.path;
spinner.succeed();
resolve();
})
.catch((error) => {
spinner.fail();
reject(error);
});
});
}

get name(): string {
return this.$configuration.name;
}
Expand Down
48 changes: 19 additions & 29 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,38 +63,28 @@ class Stage extends EventEmitter {
this.emit('generatorResult', quadCount)
})
this.generator.on('end', _ => {
// when batchsize is used, the number of the generatorCount is the number of batchSize times smaller + the leftover elements that did not fit into the batch
generatorCount++
// eslint-disable-next-line @typescript-eslint/dot-notation
const numberOfLeftoverNamedNodes = this.generator["batchArrayOfNamedNodes"]!.length
console.log('🪵 | file: Stage.class.ts:70 | Stage | run | numberOfLeftoverNamedNodes:', numberOfLeftoverNamedNodes)
const batchGeneratorCount = generatorCount * this.configuration.generator.batchSize! + numberOfLeftoverNamedNodes
console.log('🪵 | file: Stage.class.ts:71 | Stage | run | generatorCount:', generatorCount)
console.log('🪵 | file: Stage.class.ts:71 | Stage | run | this.configuration.generator.batchSize!:', this.configuration.generator.batchSize!)
console.log('🪵 | file: Stage.class.ts:69 | Stage | run | batchGeneratorCount:', batchGeneratorCount)
console.log('🪵 | file: Stage.class.ts:77 | Stage | run | iteratorCount:', iteratorCount)
if (generatorCount === iteratorCount) {
console.log('🪵 | file: Stage.class.ts:70 | Stage | run | quadCount:', quadCount)
this.emit('end', iteratorCount, quadCount)
}
// with batchsize, the number of the generatorCount is the number of batchSize times smaller + the leftover elements that did not fit into the batch
else if ((this.configuration.generator.batchSize !== undefined) && (batchGeneratorCount === iteratorCount)) {
if (numberOfLeftoverNamedNodes !== 0){
// clean up generator and process quads
this.generator.end()
this.generator.on('data', quad => {
writer.addQuad(quad)
quadCount++
this.emit('generatorResult', quadCount)
})
this.generator.on('end', _ => {
console.log('🪵 | file: Stage.class.ts:83 | Stage | run | quadCount:', quadCount)
this.emit('end', iteratorCount, quadCount)
})
}else{
console.log('🪵 | file: Stage.class.ts:83 | Stage | run | quadCount:', quadCount)
const batchGeneratorCount = generatorCount * this.configuration.generator.batchSize! + this.generator.$this.length
if (batchGeneratorCount === iteratorCount){
if (this.generator.$this.length > 0){
// clean up generator and process quad
this.generator.end()
this.generator.on('dataCleanup', quad => {
writer.addQuad(quad)
quadCount++
this.emit('generatorResult', quadCount)
})
this.generator.on('endCleanup', _ => {
this.emit('end', iteratorCount, quadCount)
})
// in case the batchSize exactly results in an empty array
}else{
this.emit('end', iteratorCount, quadCount)
}
}else if (generatorCount === iteratorCount){
this.emit('end', iteratorCount, quadCount)
}
}
})
this.iterator.on('data', $this => {
this.generator.run($this)
Expand Down
Loading

0 comments on commit 9355e4c

Please sign in to comment.