Skip to content

Commit

Permalink
refactor: Extract Query classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeboer committed Jun 17, 2024
1 parent f156140 commit 0160547
Show file tree
Hide file tree
Showing 12 changed files with 410 additions and 402 deletions.
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export default {
coverageReporters: ['json-summary', 'text'],
coverageThreshold: {
global: {
lines: 65.49,
statements: 64.98,
branches: 56.08,
functions: 71.15,
lines: 67.89,
statements: 68.24,
branches: 64.36,
functions: 74.4,
},
},
transform: {
Expand Down
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"clean": "gts clean",
"compile": "tsc",
"fix": "gts fix",
"posttest": "npm run fix"
"posttest": "jest-coverage-thresholds-bumper --silent"
},
"repository": {
"type": "git",
Expand Down Expand Up @@ -67,6 +67,7 @@
"@types/inquirer": "^9.0.7",
"@types/jest": "^29.5.12",
"@types/js-yaml": "^4.0.9",
"@types/lodash.clonedeep": "^4.5.9",
"@types/lodash.kebabcase": "^4.1.9",
"@types/n3": "^1.16.4",
"@types/node": "^20.12.12",
Expand All @@ -92,6 +93,7 @@
"glob": "^7.2.3",
"inquirer": "^9.2.12",
"js-yaml": "^4.1.0",
"lodash.clonedeep": "^4.5.0",
"lodash.kebabcase": "^4.1.1",
"millify": "^6.1.0",
"n3": "^1.17.2",
Expand Down
87 changes: 69 additions & 18 deletions src/generator.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import type {ConstructQuery} from 'sparqljs';
import type {ConstructQuery, Pattern} from 'sparqljs';
import type Stage from './stage.js';
import getSPARQLQuery from './utils/getSPARQLQuery.js';
import type {Quad, NamedNode} from '@rdfjs/types';
import getSPARQLQueryString from './utils/getSPARQLQueryString.js';
import type {NamedNode, Quad} from '@rdfjs/types';
import getEndpoint from './utils/getEndpoint.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from './utils/getEngine.js';
import getEngineSource from './utils/getEngineSource.js';
import EventEmitter from 'node:events';
import {BaseQuery} from './sparql.js';
import clonedeep from 'lodash.clonedeep';

const DEFAULT_BATCH_SIZE = 10;

Expand All @@ -18,7 +19,7 @@ interface Events {
}

export default class Generator extends EventEmitter<Events> {
private readonly query: ConstructQuery;
private readonly query: Query;
private readonly engine: QueryEngine;
private iterationsProcessed = 0;
private iterationsIncoming = 0;
Expand All @@ -37,9 +38,11 @@ export default class Generator extends EventEmitter<Events> {
);
super();
this.index = index;
this.query = getSPARQLQuery(
stage.configuration.generator[this.index].query,
'construct'
this.query = Query.from(
getSPARQLQuery(
stage.configuration.generator[this.index].query,
'construct'
)
);

this.endpoint =
Expand Down Expand Up @@ -78,20 +81,10 @@ export default class Generator extends EventEmitter<Events> {
`The Generator did not run successfully, it could not get the results from the endpoint ${this
.source?.value}: ${(e as Error).message}`
);
const unionQuery = getSPARQLQuery(
getSPARQLQueryString(this.query),
'construct'
);
const patterns = unionQuery.where ?? [];
patterns.push({
type: 'values',
values: batch.map($this => ({'?this': $this})),
});
unionQuery.where = [{type: 'group', patterns}];

try {
const stream = await this.engine.queryQuads(
getSPARQLQueryString(unionQuery),
this.query.withIris(batch).toString(),
{
sources: [(this.source ??= getEngineSource(this.endpoint))],
}
Expand Down Expand Up @@ -124,3 +117,61 @@ export default class Generator extends EventEmitter<Events> {
await this.runBatch(this.$thisList);
}
}

/**
 * A SPARQL CONSTRUCT generator query into which the `?this` IRIs of a batch
 * are injected through a VALUES clause.
 *
 * Instances are obtained through {@link Query.from}, which validates the query
 * against the SHACL pre-binding rules before handing it out.
 */
export class Query extends BaseQuery {
  /** Pattern types that must not occur anywhere in a generator query. */
  private static readonly forbiddenClauses: ReadonlySet<string> = new Set([
    'minus',
    'service',
    'values',
  ]);

  /**
   * Wraps a parsed CONSTRUCT query and validates it.
   *
   * @param query - The parsed CONSTRUCT query.
   * @returns The validated query wrapper.
   * @throws Error when the query violates one of the pre-binding rules.
   */
  public static from(query: ConstructQuery): Query {
    const self = new this(query);
    self.validate();
    return self;
  }

  private constructor(protected readonly query: ConstructQuery) {
    super(query);
  }

  /**
   * Returns a copy of this query whose WHERE patterns are wrapped in a single
   * group together with a VALUES clause that binds `?this` to each given IRI.
   *
   * @param iris - The IRIs to bind to `?this`.
   * @returns A new query; this instance is left untouched.
   */
  public withIris(iris: NamedNode[]): Query {
    // Work on a deep clone so the wrapped query is never mutated.
    const query = clonedeep(this.query);
    const patterns: Pattern[] = [
      ...(query.where ?? []),
      {
        type: 'values',
        values: iris.map($this => ({'?this': $this})),
      },
    ];
    query.where = [{type: 'group', patterns}];

    // Not re-validated: the VALUES clause injected above would itself be
    // rejected by validate().
    return new Query(query);
  }

  /**
   * Checks the query's WHERE patterns against the pre-binding rules.
   *
   * @throws Error when a rule is violated.
   */
  protected validate(): void {
    this.validatePreBinding(this.query.where ?? []);
  }

  /**
   * Because we use pre-binding, the query must follow the rules as specified by https://www.w3.org/TR/shacl/#pre-binding:
   * - SPARQL queries must not contain a MINUS clause
   * - SPARQL queries must not contain a federated query (SERVICE)
   * - SPARQL queries must not contain a VALUES clause
   * - SPARQL queries must not use the syntax form `AS ?var` for any potentially pre-bound variable
   *
   * The check recurses into OPTIONAL, UNION, group and GRAPH patterns and into
   * the WHERE patterns of sub-SELECT queries, so a forbidden clause cannot be
   * hidden inside a nested pattern.
   *
   * @param patterns - The patterns to check.
   * @throws Error when one of the rules above is violated.
   */
  private validatePreBinding(patterns: Pattern[]): void {
    for (const pattern of patterns) {
      if (pattern.type === 'bind' && pattern.variable.value === 'this') {
        throw new Error(
          'SPARQL CONSTRUCT generator query must not use the syntax form `AS ?this` because it is a pre-bound variable'
        );
      }

      if (Query.forbiddenClauses.has(pattern.type)) {
        throw new Error(
          `SPARQL CONSTRUCT generator query must not contain a ${pattern.type.toUpperCase()} clause`
        );
      }

      if (
        pattern.type === 'optional' ||
        pattern.type === 'union' ||
        pattern.type === 'group' ||
        pattern.type === 'graph'
      ) {
        this.validatePreBinding(pattern.patterns);
      } else if (pattern.type === 'query') {
        // Sub-SELECT: its own WHERE patterns are subject to the same rules.
        this.validatePreBinding(pattern.where ?? []);
      }
    }
  }
}
50 changes: 34 additions & 16 deletions src/iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import type Stage from './stage.js';
import type {NamedNode} from '@rdfjs/types';
import getSPARQLQuery from './utils/getSPARQLQuery.js';
import {type Bindings} from '@comunica/types';
import getSPARQLQueryString from './utils/getSPARQLQueryString.js';
import getEndpoint from './utils/getEndpoint.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from './utils/getEngine.js';
import getEngineSource from './utils/getEngineSource.js';
import parse from 'parse-duration';
import {BaseQuery} from './sparql.js';

const DEFAULT_LIMIT = 10;

Expand All @@ -20,22 +20,20 @@ interface Events {
}

export default class Iterator extends EventEmitter<Events> {
private readonly query: SelectQuery;
private readonly query: Query;
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private readonly delay: number = 0;
private source?: QuerySource;
private $offset = 0;
private offset = 0;
public totalResults = 0;

constructor(stage: Stage) {
super();
this.query = getSPARQLQuery(stage.configuration.iterator.query, 'select');
this.query.limit =
stage.configuration.iterator.batchSize ??
this.query.limit ??
DEFAULT_LIMIT;
this.validateQuery();
this.query = Query.from(
getSPARQLQuery(stage.configuration.iterator.query, 'select'),
stage.configuration.iterator.batchSize
);
this.endpoint = getEndpoint(stage);
this.engine = getEngine(this.endpoint);
if (stage.configuration.iterator.delay !== undefined) {
Expand All @@ -51,18 +49,17 @@ export default class Iterator extends EventEmitter<Events> {
public async run(): Promise<void> {
setTimeout(async () => {
let resultsPerPage = 0;
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
this.query.offset = this.offset;
const error = (e: unknown): Error =>
new Error(
`The Iterator did not run successfully, it could not get the results from the endpoint ${
this.source
} (offset: ${this.$offset}, limit ${this.query.limit}): ${
} (offset: ${this.offset}, limit ${this.query.limit}): ${
(e as Error).message
}`
);
try {
const stream = await this.engine.queryBindings(queryString, {
const stream = await this.engine.queryBindings(this.query.toString(), {
sources: [(this.source ??= getEngineSource(this.endpoint))],
});

Expand All @@ -81,7 +78,7 @@ export default class Iterator extends EventEmitter<Events> {
});
stream.on('end', () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
this.offset += this.query.limit;
if (resultsPerPage < this.query.limit!) {
this.emit('end', this.totalResults);
} else {
Expand All @@ -97,16 +94,37 @@ export default class Iterator extends EventEmitter<Events> {
}
}, this.delay);
}
}

export class Query extends BaseQuery {
public static from(query: SelectQuery, limit?: number) {
const self = new Query(query);
self.query.limit = limit ?? self.query.limit ?? DEFAULT_LIMIT;
self.validate();
return self;
}

private constructor(protected readonly query: SelectQuery) {
super(query);
}

get limit(): number {
return this.query.limit!;
}

set offset(offset: number) {
this.query.offset = offset;
}

private validateQuery() {
protected validate() {
if (
!this.query.variables.find(
v =>
v instanceof sparqljs.Wildcard || (v as VariableTerm).value === 'this'
)
) {
throw new Error(
'The SPARQL query must select either a variable $this or a wildcard *'
'The SPARQL iterator query must select either a variable $this or a wildcard *'
);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ class Pipeline {

Array.from(this.stages.keys())
.slice(0, startFromStage)
.forEach(stagename => {
new Progress({silent: this.opts?.silent === true})
.start(`stage "${chalk.bold(stagename)}" was skipped`)
.stop();
.forEach(stage => {
new Progress({silent: false}).info(
`Skipping stage ${chalk.bold(stage)}”`
);
});
await this.runStage();
}
Expand Down
12 changes: 12 additions & 0 deletions src/sparql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import sparqljs, {type SparqlQuery} from 'sparqljs';
// Module-level sparqljs generator instance, used below to turn parsed queries
// back into SPARQL strings.
const generator = new sparqljs.Generator();

/**
 * Base class for wrappers around a parsed SPARQL query.
 *
 * Subclasses provide their own validation rules through `validate()`.
 */
export abstract class BaseQuery {
  /**
   * @param query - The parsed SPARQL query to wrap.
   */
  protected constructor(protected readonly query: SparqlQuery) {}

  /**
   * Serializes the wrapped query to a SPARQL string.
   *
   * Declared as an arrow-function property, so it stays bound to the instance
   * when passed around as a callback.
   */
  public toString = (): string => {
    return generator.stringify(this.query);
  };

  /** Validates the wrapped query; implemented by each subclass. */
  protected abstract validate(): void;
}
Loading

0 comments on commit 0160547

Please sign in to comment.