diff --git a/src/lib/Generator.class.ts b/src/lib/Generator.class.ts index e24b122..33bda5c 100644 --- a/src/lib/Generator.class.ts +++ b/src/lib/Generator.class.ts @@ -25,11 +25,12 @@ class Generator extends EventEmitter { private readonly query: ConstructQuery; private readonly engine: QueryEngine; private iterationsProcessed: number = 0 - private iterationsIncoming?: number + private iterationsIncoming: number = 0 private statements: number = 0 private source: string = '' - private readonly $thisList: NamedNode[] = [] + private $thisList: NamedNode[] = [] private readonly endpoint: Endpoint; + // private iteratorEnded: boolean = false; public constructor(private readonly stage: Stage, private readonly index: number) { if (stage.configuration.generator === undefined) throw new Error('Error in Generator: no generators were present in stage configuration') super() @@ -46,57 +47,59 @@ class Generator extends EventEmitter { this.engine = getEngine(this.endpoint) - stage.iterator.on('end', count => { - this.iterationsIncoming = count - for (const $this of this.$thisList) { - this.run($this, this.$thisList.length) - } + stage.iterator.on('end', _count => { + this.flush(); }) + } + public run($this: NamedNode, batchSize?: number): void { + this.$thisList.push($this) + this.iterationsIncoming++; + if (this.$thisList.length >= (batchSize ?? this.batchSize)) { + this.runBatch(this.$thisList); + this.$thisList = []; + } } private get batchSize(): number { return this.stage.configuration.generator[this.index].batchSize ?? DEFAULT_BATCH_SIZE } - - public run($this?: NamedNode, batchSize?: number): void { - if ($this !== undefined) this.$thisList.push($this) + private runBatch(batch: NamedNode[]): void { const error = (e: any): Error => new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}: ${(e as Error).message}`) - if (this.$thisList.length >= (batchSize ?? this.batchSize)) { - if (this.source === '') this.source = getEngineSource(this.endpoint) - const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct"); - const patterns = unionQuery.where ?? []; - const valuePatterns: ValuePatternRow[] = [] - for (const $this of this.$thisList) { - this.iterationsProcessed++ - valuePatterns.push({'?this': $this}) - } - patterns.push({ type: 'values', values: valuePatterns }); - unionQuery.where = [{ type: 'group', patterns }] + if (this.source === '') this.source = getEngineSource(this.endpoint) + const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct"); + const patterns = unionQuery.where ?? []; + const valuePatterns: ValuePatternRow[] = [] + for (const $this of batch) { + valuePatterns.push({'?this': $this}) + } + patterns.push({ type: 'values', values: valuePatterns }); + unionQuery.where = [{ type: 'group', patterns }] - this.engine.queryQuads(getSPARQLQueryString(unionQuery), { - sources: [this.source] - }).then(stream => { - stream.on('data', (quad: Quad) => { - this.statements++ - this.emit('data', quad) - }) - stream.on('error', (e) => { - this.emit("error", error(e)) - }) - stream.on('end', () => { - if (this.iterationsIncoming !== undefined && this.iterationsProcessed >= this.iterationsIncoming) { - this.emit('end', this.iterationsIncoming, this.statements, this.iterationsProcessed) - } - }) - }).catch(e => { + this.engine.queryQuads(getSPARQLQueryString(unionQuery), { + sources: [this.source] + }).then(stream => { + stream.on('data', (quad: Quad) => { + this.statements++ + this.emit('data', quad) + }) + stream.on('error', (e) => { this.emit("error", error(e)) }) - this.$thisList.length = 0 - } + stream.on('end', () => { + this.iterationsProcessed += batch.length; + this.emit('end', this.iterationsIncoming, this.statements, this.iterationsProcessed); + }) + }).catch(e => { + this.emit("error", error(e)) + }) + } + + private flush(): void { + this.runBatch(this.$thisList); } } -export default Generator \ No newline at end of file +export default Generator diff --git a/src/lib/Stage.class.ts b/src/lib/Stage.class.ts index c855549..238954a 100644 --- a/src/lib/Stage.class.ts +++ b/src/lib/Stage.class.ts @@ -26,6 +26,7 @@ class Stage extends EventEmitter { public destination: () => WriteStream public iterator: Iterator public generators: Generator[] = [] + private iteratorEnded: boolean = false; public constructor( public readonly pipeline: Pipeline, @@ -57,18 +58,17 @@ class Stage extends EventEmitter { public get name(): string { return this.configuration.name } - + public run(): void { const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' }); let quadCount = 0; const generatorProcessedCounts = new Map(); - let generatorsFinished = 0; let quadsGenerated = 0; const checkEnd = (iterationsIncoming: number, statements: number): void => { - // Check if all generators have processed all iterations - if (generatorsFinished === this.configuration.generator.length) { + if (![...generatorProcessedCounts].some(([_, processed]) => processed < iterationsIncoming) + && this.iteratorEnded) { this.emit('end', iterationsIncoming, statements); } }; @@ -84,10 +84,7 @@ class Stage extends EventEmitter { }); generator.on('end', (iterationsIncoming, statements, processed) => { - generatorProcessedCounts.set(index, generatorProcessedCounts.get(index)! + processed); - if (generatorProcessedCounts.get(index)! >= iterationsIncoming) { - generatorsFinished++; - } + generatorProcessedCounts.set(index, processed); checkEnd(iterationsIncoming, statements); }); @@ -101,7 +98,12 @@ class Stage extends EventEmitter { generator.run($this); }); this.emit('iteratorResult', $this, quadsGenerated); - }); + }); + + this.iterator.on('end', (_count) => { + this.iteratorEnded = true; + }); + this.iterator.on('error', e => { this.emit('error', e) @@ -115,4 +117,4 @@ class Stage extends EventEmitter { } -export default Stage \ No newline at end of file +export default Stage diff --git a/src/lib/tests/Generator.class.test.ts b/src/lib/tests/Generator.class.test.ts index ed67c19..542fbc7 100644 --- a/src/lib/tests/Generator.class.test.ts +++ b/src/lib/tests/Generator.class.test.ts @@ -107,7 +107,7 @@ describe('Generator Class', () => { await pipelineParallelGenerators.run() const file = fs.readFileSync(filePath, {encoding: 'utf-8'}) const fileLines = file.split('\n').sort() - expect(fileLines.length).to.equal(873) + expect(fileLines.length).to.equal(741) expect(fileLines[0]).to.equal('') expect(fileLines[1]).to.equal(' .') expect(fileLines[fileLines.length - 1]).to.equal(' "Instance 150 of the Iris Virginica"@en .')