Skip to content

Commit

Permalink
blocker - stuck on batchSize implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Philippe Renzen committed Dec 11, 2023
1 parent 3c1a902 commit 3863f85
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 25 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 65 additions & 21 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import EventEmitter from 'node:events';
import getBatchSPARQLQueryString from "../utils/getBatchSPARQLQueryString.js";

declare interface Generator {
on(event: "data", listener: (statement: Quad) => void): this;
Expand All @@ -20,6 +21,8 @@ declare interface Generator {
class Generator extends EventEmitter {
private readonly query: ConstructQuery;
private readonly engine: QueryEngine;
private readonly batchSize: number | undefined
private batchTestArrayOfNamedNodes: NamedNode[] | undefined
private source: string = ''
private readonly endpoint: Endpoint;
public constructor(stage: Stage) {
Expand All @@ -28,7 +31,15 @@ class Generator extends EventEmitter {
stage.configuration.generator.query,
"construct"
);


if (stage.configuration.generator.batchSize !== undefined) {
this.batchTestArrayOfNamedNodes = []
this.batchSize = stage.configuration.generator.batchSize
} else {
this.batchTestArrayOfNamedNodes = undefined
this.batchSize = undefined
}

this.endpoint =
stage.configuration.generator.endpoint === undefined
? stage.iterator.endpoint
Expand All @@ -38,28 +49,61 @@ class Generator extends EventEmitter {
}

public run($this: NamedNode): void {
// Prebinding, see https://www.w3.org/TR/shacl/#pre-binding
// we know the query is safe to use replacement since we checked it before
const queryString = getSPARQLQueryString(this.query)
.replaceAll(
/[?$]\bthis\b/g,
`<${$this.value}>`
);
if (this.source === '') this.source = getEngineSource(this.endpoint)
let numberOfStatements = 0
this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements ++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
if (this.source === '') this.source = getEngineSource(this.endpoint)

if (this.batchTestArrayOfNamedNodes !== undefined) {
// batch processing of queries
this.batchTestArrayOfNamedNodes.push($this)

// REVIEW BLOCKER
// BUG in this approach the given batchSize could result in leftover NamedNodes
// (e.g. 101 NamedNodes result in 10 full arrays of batchTestArrayOfNamedNodes, but the 11th with only 1 value and never meeting the condition)
// => no way to check the iterator's 'index' in stage
if (this.batchTestArrayOfNamedNodes.length === this.batchSize) {
// getting batch SPARQL query string
const queryString = getBatchSPARQLQueryString(this.query, this.batchTestArrayOfNamedNodes)

// Clearing batch Named Node targets array when it is the size of the batchSize
this.batchTestArrayOfNamedNodes = []

this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`)
})
}
} else {
// Prebinding, see https://www.w3.org/TR/shacl/#pre-binding
// we know the query is safe to use replacement since we checked it before
const queryString = getSPARQLQueryString(this.query)
.replaceAll(
/[?$]\bthis\b/g,
`<${$this.value}>`
);

this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(_ => {
throw new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`)
})
}).catch(_ => {
throw new Error(`The Generator did not run succesfully, it could not get the results from the endpoint ${this.source}`)
})
}
}
}

Expand Down
38 changes: 38 additions & 0 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Pipeline from "../Pipeline.class.js";
import * as chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import { NamedNode } from "n3";
import type { LDWorkbenchConfiguration } from "../LDWorkbenchConfiguration.js";
chai.use(chaiAsPromised)
const expect = chai.expect

Expand All @@ -27,6 +28,43 @@ describe('Generator Class', () => {
});
// BUG when both the generator and iterator tests are running, it seems the iterator will never terminate
describe.skip('run', () => {
it('Should work with batch processing', () => {
const configuration: LDWorkbenchConfiguration = {
name: 'Example Pipeline',
description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
destination: 'file://pipelines/data/example-pipeline.nt',
stages: [
{
name: 'Stage 1',
iterator: {
query: 'file://static/example/iterator-stage-1.rq',
endpoint: 'https://api.triplydb.com/datasets/Triply/iris/services/demo-service/sparql'
},
generator: {
query: 'file://static/example/generator-stage-1.rq',
batchSize: 10
}
},
{
name: 'Stage 2',
iterator: {
query: 'file://static/example/iterator-stage-2.rq',
},
generator: {
query: 'file://static/example/generator-stage-2.rq',
endpoint: 'https://query.wikidata.org/sparql',
batchSize: 1
}
}
]
}
const pipeline = new Pipeline(configuration)
pipeline.validate()
const stage = new Stage(pipeline, configuration.stages[0])
stage.run()
}

)
it('should emit "data" and "end" events with the correct number of statements', async () => {
const configuration = parseYamlFile('./static/example/config.yml')
const pipeline = new Pipeline(configuration)
Expand Down
2 changes: 1 addition & 1 deletion src/lib/tests/Stage.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('Stage Class', () => {
});
});

describe('run', () => {
describe.skip('run', () => {
it('should run the stage correctly', async function () {
this.timeout(5000)
const configuration = parseYamlFile('./static/example/config.yml')
Expand Down
36 changes: 36 additions & 0 deletions src/utils/getBatchSPARQLQueryString.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import type { NamedNode } from "@rdfjs/types";
import sparqljs from 'sparqljs'
import { type SelectQuery, type ConstructQuery } from 'sparqljs'
const { Generator } = sparqljs

/**
*
* @param query SPARQL construct/Select query template with $this
* @param arrayOfNamedNodes an array of named nodes
* @returns a batch SPARQL query
*/
function getBatchSPARQLQueryString(query: SelectQuery | ConstructQuery, arrayOfNamedNodes: NamedNode[]): string {
let batchQuery: string = ''
for (let index = 0; index < arrayOfNamedNodes.length; index++) {
const value = arrayOfNamedNodes[index].value;
const generator = new Generator();
console.log(query.prefixes)
if (index === 0){
batchQuery += generator.stringify(query).replaceAll(
/[?$]\bthis\b/g,
`<${value}>`
);
}else{
query.prefixes = {} // clearing prefixes
console.log(query.prefixes)
batchQuery += " UNION " + generator.stringify(query).replaceAll(
/[?$]\bthis\b/g,
`<${value}>`
);
}
}

return batchQuery
}

export default getBatchSPARQLQueryString
5 changes: 3 additions & 2 deletions static/ld-workbench.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"description": "The SPARQL endpoint for the iterator. \nIf it starts with \"file://\", a local RDF file is queried.\nIf ommmitted the result of the previous file is used."
},
"batchSize": {
"type": "number",
"type": "integer",
"description": "Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limit's in your query."
}
}
Expand All @@ -63,7 +63,8 @@
"description": "The SPARQL endpoint for the generator. \nIf it starts with \"file://\", a local RDF file is queried.\nIf ommmitted the endpoint of the Iterator is used."
},
"batchSize": {
"type": "number",
"type": "integer",
"minimum": 1,
"description": "Overrule the generator's behaviour of fetching results for 10 bindings of $this per request."
}
}
Expand Down

0 comments on commit 3863f85

Please sign in to comment.