Skip to content

Commit

Permalink
Refactored iterator/generator to streams/events, this should improve …
Browse files Browse the repository at this point in the history
…speed and scalability.
  • Loading branch information
mightymax committed Nov 29, 2023
1 parent 4c23e55 commit 2545be5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 108 deletions.
33 changes: 28 additions & 5 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
/* eslint-disable @typescript-eslint/method-signature-style */
import type { ConstructQuery } from "sparqljs";
import type Stage from "./Stage.class.js";
import getSPARQLQuery from "../utils/getSPARQLQuery.js";
import type { Quad, NamedNode, ResultStream } from "@rdfjs/types";
import type { Quad, NamedNode } from "@rdfjs/types";
import getSPARQLQueryString from "../utils/getSPARQLQueryString.js";
import getEndpoint from "../utils/getEndpoint.js";
import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import EventEmitter from 'node:events';

export default class Generator {
/**
 * Typed event signatures for `Generator`.
 *
 * This interface is declaration-merged with the `Generator` class (which
 * extends `EventEmitter`), narrowing `on`/`emit` to the two events the class
 * uses:
 * - `"data"`: carries a single `Quad` statement.
 * - `"end"`: carries the number of results (`numResults`) as a `number`.
 */
declare interface Generator {
/** Subscribe to `"data"`; the listener receives one `Quad` per event. */
on(event: "data", listener: (statement: Quad) => void): this;
/** Subscribe to `"end"`; the listener receives the result count. */
on(event: "end", listener: (numResults: number) => void): this;

/** Emit a single `Quad` statement to `"data"` listeners. */
emit(event: "data", statement: Quad): boolean;
/** Emit the result count to `"end"` listeners. */
emit(event: "end", numResults: number): boolean;
}
class Generator extends EventEmitter {
private readonly query: ConstructQuery;
private readonly engine: QueryEngine;
private source: string = ''
private readonly endpoint: Endpoint;
public constructor(stage: Stage) {
super()
this.query = getSPARQLQuery(
stage.configuration.generator.query,
"construct"
Expand All @@ -27,7 +37,7 @@ export default class Generator {
this.engine = getEngine(this.endpoint)
}

public async loadStatements($this: NamedNode): Promise<ResultStream<Quad>> {
public run($this: NamedNode): void {
// Prebinding, see https://www.w3.org/TR/shacl/#pre-binding
// we know the query is safe to use replacement since we checked it before
const queryString = getSPARQLQueryString(this.query)
Expand All @@ -36,8 +46,21 @@ export default class Generator {
`<${$this.value}>`
);
if (this.source === '') this.source = getEngineSource(this.endpoint)
return this.engine.queryQuads(queryString, {
let numberOfStatements = 0
this.engine.queryQuads(queryString, {
sources: [this.source]
});
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements ++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(e => {
throw e as Error
})
}
}

export default Generator
89 changes: 46 additions & 43 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* eslint-disable @typescript-eslint/method-signature-style */
import EventEmitter from 'node:events';
import type { SelectQuery } from "sparqljs";
import type Stage from "./Stage.class.js";
import type { Term, NamedNode } from "@rdfjs/types";
import { DataFactory } from "n3";
import type { NamedNode } from "@rdfjs/types";
import getSPARQLQuery from "../utils/getSPARQLQuery.js";
import { type Bindings } from "@comunica/types";
import getSPARQLQueryString from "../utils/getSPARQLQueryString.js";
Expand All @@ -12,63 +13,65 @@ import getEngineSource from '../utils/getEngineSource.js';

const DEFAULT_LIMIT = 10;

export default class Iterator
implements AsyncIterator<NamedNode>, AsyncIterable<NamedNode>
/**
 * Typed event signatures for `Iterator`.
 *
 * This interface is declaration-merged with the `Iterator` class (which
 * extends `EventEmitter`), narrowing `on`/`emit` to the two events the class
 * uses:
 * - `"data"`: carries one `NamedNode` (the `$this` binding).
 * - `"end"`: carries the number of results (`numResults`) as a `number`.
 */
declare interface Iterator {
/** Subscribe to `"data"`; the listener receives one `NamedNode` per event. */
on(event: "data", listener: ($this: NamedNode) => void): this;
/** Subscribe to `"end"`; the listener receives the result count. */
on(event: "end", listener: (numResults: number) => void): this;

/** Emit one `NamedNode` to `"data"` listeners. */
emit(event: "data", $this: NamedNode): boolean;
/** Emit the result count to `"end"` listeners. */
emit(event: "end", numResults: number): boolean;
}

class Iterator
extends EventEmitter
{
private readonly query: SelectQuery;
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private source: string = ''
private bindings?: Bindings[];
private $offset = 0;
private $index = 0;
constructor(private readonly stage: Stage) {
private totalResults = 0

constructor(stage: Stage) {
super()
this.query = getSPARQLQuery(stage.configuration.iterator.query, "select");
this.query.limit = stage.configuration.iterator.batchSize ?? DEFAULT_LIMIT;
this.endpoint = getEndpoint(stage)
this.engine = getEngine(this.endpoint)
}

private async loadBindings(): Promise<Bindings[]> {
public run(): void {
let resultsPerPage = 0
if (this.source === '') this.source = getEngineSource(this.endpoint)
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
const bindings = await this.engine.queryBindings(queryString, {
this.engine.queryBindings(queryString, {
sources: [this.source],
});
return bindings.toArray();
}

public async next(): Promise<{ value: NamedNode; done: boolean }> {
if (this.bindings === undefined) this.bindings = await this.loadBindings();
if (this.$index > this.bindings.length - 1) {
this.$offset += this.query.limit ?? DEFAULT_LIMIT;
this.bindings = await this.loadBindings();
this.$index = 0;
}
const done: boolean =
this.bindings === undefined || this.bindings.length === 0;
let value: Term;
if (!done) {
const $this = this.bindings[this.$index].get("this");
if ($this === undefined) {
throw new Error(
`Missing binding \`$this\` in your SPARQL query in \`${this.stage.configuration.iterator.query}\`.`
);
} else if ($this.termType !== "NamedNode") {
throw new Error(
`Binding \`$this\` in your SPARQL query in \`${this.stage.configuration.iterator.query}\` should be an Iri (NamedNode), found a ${$this.termType}.`
);
}
value = $this as NamedNode;
} else {
value = DataFactory.namedNode("");
}
this.$index++;
return { value, done };
}
}).then(stream => {
stream.on('data', (binding: Bindings) => {
resultsPerPage++
if (!binding.has('this')) throw new Error('Missing binding $this in the Iterator result.')
const $this = binding.get('this')!
if ($this.termType !== 'NamedNode') {
throw new Error(`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`)
} else {
this.emit('data', $this)
}
});

public [Symbol.asyncIterator](): Iterator {
return this;
stream.on('end', () => {
this.totalResults += resultsPerPage
this.$offset += this.query.limit!
if (resultsPerPage < this.query.limit!) {
this.emit('end', this.totalResults)
} else {
this.run()
}
});
})
.catch(e => {
throw e
})
}
}

export default Iterator
48 changes: 23 additions & 25 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,35 +99,33 @@ class Pipeline {
spinner.fail((e as Error).message);
this.error(e as Error);
}
let i = -1
for (const name of this.stages.keys()) {
i++
if (i < startFromStage) {
ora().start().info(`skipping stage "${chalk.bold(name)}" as requested`).stop()
} else {
const spinner = ora("Loading results from Iterator").start();
const stage = this.stages.get(name)!;
stage.on("iteratorResult", ($this) => {
spinner.text = $this.value;
});
let count = 0
stage.on("generatorResult", () => {
count++
});

try {
await stage.run()
// @TODO: the # of quads is not correct, should be something in the async loop...
spinner.succeed(`stage "${chalk.bold(name)}" resulted in ${count} quads`)
} catch (e) {
spinner.fail((e as Error).message);
this.error(e as Error);
const stageNames = Array.from(this.stages.keys()).splice(startFromStage)

/**
 * Runs the remaining stages one after another.
 *
 * Takes the next name off the shared `stageNames` queue, starts that stage,
 * and — once the stage emits `"end"` — either recurses for the next stage or
 * reports that the whole pipeline has completed.
 *
 * @param stages - all configured stages, keyed by stage name.
 */
function run(stages: Map<string, Stage>): void {
  const stageName = stageNames.shift()
  const stage = stageName === undefined ? undefined : stages.get(stageName)
  if (stage === undefined) {
    // Nothing (left) to run, e.g. when every stage was skipped on request.
    // Previously this dereferenced `undefined` and crashed with a TypeError.
    console.info(
      chalk.green(`✔ your pipeline was completed in ${duration(now)}`)
    );
    return
  }

  const spinner = ora("Loading results from Iterator").start();
  stage.on("iteratorResult", ($this) => {
    // Show the IRI that is currently being processed.
    spinner.text = $this.value;
  });
  stage.on("end", (iris, statements) => {
    spinner.succeed(`stage "${chalk.bold(stage.name)}" resulted in ${statements} statement${statements === 1 ?'':'s'} in ${iris} iteration${iris === 1 ?'':'s'}.`)
    if (stageNames.length !== 0) {
      run(stages)
    } else {
      console.info(
        chalk.green(`✔ your pipeline was completed in ${duration(now)}`)
      );
    }
  });
  try {
    // NOTE(review): this only catches errors thrown synchronously by
    // `stage.run()`; failures raised later from stream/event callbacks are
    // not seen here — confirm how those should be surfaced.
    stage.run()
  } catch (e) {
    // Narrow instead of asserting: a thrown value is not guaranteed to be an Error.
    spinner.fail(e instanceof Error ? e.message : String(e));
  }
}
console.info(
chalk.green(`✔ your pipeline was completed in ${duration(now)}`)
);

run(this.stages)
}

get name(): string {
Expand Down
62 changes: 27 additions & 35 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/adjacent-overload-signatures */
/* eslint-disable @typescript-eslint/method-signature-style */
import EventEmitter from 'node:events';
import File from './File.class.js';
Expand All @@ -9,25 +8,16 @@ import kebabcase from 'lodash.kebabcase'
import type Pipeline from './Pipeline.class.js';
import path from 'node:path';
import { Writer } from 'n3'
import type { Quad, NamedNode } from '@rdfjs/types'
import type { NamedNode } from '@rdfjs/types'
import type { WriteStream } from 'node:fs';
declare interface Stage {
on(event: "generatorResult", listener: () => void): this;
off(event: "generatorResult", listener: () => void): this;
emit(event: "generatorResult"): boolean;

on(event: "generatorResultFinished", listener: (statements: number) => void): this;
off(event: "generatorResultFinished", listener: (statements: number) => void): this;
emit(event: "generatorResultFinished", statements: number): boolean;

on(event: "finished", listener: (statements: number) => void): this;
off(event: "finished", listener: (statements: number) => void): this;
emit(event: "finished", statements: number): boolean;

on(event: "generatorResult", listener: (count: number) => void): this;
on(event: "end", listener: (iteratorCount: number, statements: number) => void): this;
on(event: "iteratorResult", listener: ($this: NamedNode) => void): this;
off(event: "iteratorResult", listener: ($this: NamedNode) => void): this;
emit(event: "iteratorResult", $this: NamedNode): boolean;

emit(event: "generatorResult", count: number): boolean;
emit(event: "end", iteratorCount: number, statements: number): boolean;
emit(event: "iteratorResult", $this: NamedNode): boolean;
}

class Stage extends EventEmitter {
Expand Down Expand Up @@ -62,27 +52,29 @@ class Stage extends EventEmitter {
return this.configuration.name
}

public async run(): Promise<number> {
public run(): void {
let quadCount = 0
const writeStream = this.destination()
for await (const $this of this.iterator) {
let qc = 0
let iteratorCount = 0
let generatorCount = 0
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' })
this.generator.on('data', quad => {
writer.addQuad(quad)
quadCount ++
})
this.generator.on('end', _ => {
generatorCount++
if (generatorCount === iteratorCount) {
this.emit('end', iteratorCount, quadCount)
}
})
this.iterator.on('data', $this => {
this.generator.run($this)
this.emit('iteratorResult', $this)
const quadStream = await this.generator.loadStatements($this)
const writer = new Writer(writeStream, { end: false, format: 'N-Triples' })
quadStream.on('data', (quad: Quad) => {
this.emit('generatorResult')
quadCount++
qc++
writer.addQuad(quad)
})

quadStream.on('end', () => {
this.emit('generatorResultFinished', qc)
})
}
this.emit('finished', quadCount)
return quadCount
})
this.iterator.on('end', count => {
iteratorCount = count
})
this.iterator.run()
}

}
Expand Down

0 comments on commit 2545be5

Please sign in to comment.