Skip to content

Commit

Permalink
fix: Set source type for endpoints (#68)
Browse files Browse the repository at this point in the history
* Refactor Pipeline to validate in constructor instead of requiring a
  separate call to validate().
  • Loading branch information
ddeboer authored May 30, 2024
1 parent 46e094f commit b53c27b
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 102 deletions.
8 changes: 3 additions & 5 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js';
import type {Quad, NamedNode} from '@rdfjs/types';
import getSPARQLQueryString from '../utils/getSPARQLQueryString.js';
import getEndpoint from '../utils/getEndpoint.js';
import type {Endpoint, QueryEngine} from './types.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import EventEmitter from 'node:events';
Expand All @@ -23,10 +23,9 @@ export default class Generator extends EventEmitter<Events> {
private iterationsProcessed = 0;
private iterationsIncoming = 0;
private statements = 0;
private source = '';
private $thisList: NamedNode[] = [];
private readonly endpoint: Endpoint;
// private iteratorEnded: boolean = false;
private source?: QuerySource;
public constructor(
private readonly stage: Stage,
private readonly index: number
Expand Down Expand Up @@ -77,7 +76,6 @@ export default class Generator extends EventEmitter<Events> {
this.source
}: ${(e as Error).message}`
);
if (this.source === '') this.source = getEngineSource(this.endpoint);
const unionQuery = getSPARQLQuery(
getSPARQLQueryString(this.query),
'construct'
Expand All @@ -92,7 +90,7 @@ export default class Generator extends EventEmitter<Events> {

this.engine
.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [this.source],
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (quad: Quad) => {
Expand Down
7 changes: 3 additions & 4 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js';
import {type Bindings} from '@comunica/types';
import getSPARQLQueryString from '../utils/getSPARQLQueryString.js';
import getEndpoint from '../utils/getEndpoint.js';
import type {Endpoint, QueryEngine} from './types.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import parse from 'parse-duration';
Expand All @@ -24,7 +24,7 @@ export default class Iterator extends EventEmitter<Events> {
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private readonly delay: number = 0;
private source = '';
private source?: QuerySource;
private $offset = 0;
public totalResults = 0;

Expand All @@ -50,7 +50,6 @@ export default class Iterator extends EventEmitter<Events> {
public run(): void {
setTimeout(() => {
let resultsPerPage = 0;
if (this.source === '') this.source = getEngineSource(this.endpoint);
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
const error = (e: unknown): Error =>
Expand All @@ -63,7 +62,7 @@ export default class Iterator extends EventEmitter<Events> {
);
this.engine
.queryBindings(queryString, {
sources: [this.source],
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (binding: Bindings) => {
Expand Down
22 changes: 9 additions & 13 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ let spinner: Ora;
class Pipeline {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated = false;
private stageNames: string[] = [];
private startTime = performance.now();
private readonly destination: File | TriplyDB;
Expand Down Expand Up @@ -64,6 +63,7 @@ class Pipeline {
this.destination = isTriplyDBPathString(destinationFile)
? new TriplyDB(destinationFile).validate()
: new File(destinationFile, true).validate();
this.validate();
}

private error(e: Error, stage?: string): void {
Expand All @@ -76,7 +76,6 @@ class Pipeline {
}

public getPreviousStage(stage: Stage): Stage | undefined {
this.validate();
if (!this.stages.has(stage.name)) {
throw new Error(
`This is unexpected: missing stage "${stage.name}" in stages.`
Expand All @@ -88,18 +87,18 @@ class Pipeline {
else return this.stages.get(names[ix - 1]);
}

public validate(): void {
if (this.$isValidated) return;
let i = 0;
private validate(): void {
if (this.$configuration.stages.length === 0) {
throw new Error('Your pipeline contains no stages.');
}

if (this.$configuration.stages[0].iterator.endpoint === undefined) {
throw new Error(
'The first stage of your pipeline must have an endpoint defined for the Iterator.'
);
}

for (const stageConfiguration of this.$configuration.stages) {
if (i === 0 && stageConfiguration.iterator.endpoint === undefined) {
throw new Error(
'The first stage of your pipeline must have an endpoint defined for the Iterator.'
);
}
if (this.stages.has(stageConfiguration.name)) {
throw new Error(
`Detected a duplicate name for stage \`${stageConfiguration.name}\` in your pipeline: each stage must have a unique name.`
Expand All @@ -109,9 +108,7 @@ class Pipeline {
stageConfiguration.name,
new Stage(this, stageConfiguration)
);
i++;
}
this.$isValidated = true;
}

public get configuration(): LDWorkbenchConfiguration {
Expand All @@ -128,7 +125,6 @@ class Pipeline {
if (!(this.opts?.silent === true)) spinner.start();
let startFromStage = 0;
try {
this.validate();
if (this.opts?.startFromStageName !== undefined) {
if (/^\d+$/.test(this.opts.startFromStageName)) {
const ix = parseInt(this.opts.startFromStageName);
Expand Down
4 changes: 0 additions & 4 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ describe('Generator Class', () => {
expect(generator).to.have.property('query');
expect(generator).to.have.property('engine');
expect(generator).to.have.property('endpoint');
expect(generator).to.have.property('source');
});
});
describe('run', () => {
Expand Down Expand Up @@ -108,8 +107,6 @@ describe('Generator Class', () => {
};
// read file after pipeline has finished
const pipelineParallelGenerators = new Pipeline(config, {silent: true});
pipelineParallelGenerators.validate();

await pipelineParallelGenerators.run();
const file = fs.readFileSync(filePath, {encoding: 'utf-8'});
const fileLines = file.split('\n').sort();
Expand Down Expand Up @@ -148,7 +145,6 @@ describe('Generator Class', () => {
],
};
const pipelineBatch = new Pipeline(batchConfiguration, {silent: true});
pipelineBatch.validate();
pipelineBatch
.run()
.then(() => {
Expand Down
1 change: 0 additions & 1 deletion src/lib/tests/Iterator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ describe('Iterator Class', () => {
expect(iterator).to.have.property('query');
expect(iterator).to.have.property('endpoint');
expect(iterator).to.have.property('engine');
expect(iterator).to.have.property('source');
expect(iterator).to.have.property('$offset', 0);
expect(iterator).to.have.property('totalResults', 0);
});
Expand Down
14 changes: 4 additions & 10 deletions src/lib/tests/Pipeline.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ describe('Pipeline Class', () => {
expect(pipeline).to.be.an.instanceOf(Pipeline);
expect(pipeline).to.have.property('stages').that.is.a('Map');
expect(pipeline).to.have.property('dataDir').that.is.a('string');
expect(pipeline).to.have.property('$isValidated', false);
expect(pipeline).to.have.property('stageNames').that.is.an('array');
expect(pipeline).to.have.property('startTime').that.is.an('number');
expect(pipeline)
Expand Down Expand Up @@ -99,7 +98,6 @@ describe('Pipeline Class', () => {
],
};
const pipeline = new Pipeline(configuration, {silent: true});
pipeline.validate();

const stage1 = pipeline.stages.get('Stage 1')!;
const stage2 = pipeline.stages.get('Stage 2')!;
Expand Down Expand Up @@ -155,10 +153,9 @@ describe('Pipeline Class', () => {
destination: 'file://pipelines/data/example-pipeline.nt',
stages: [],
} as unknown as LDWorkbenchConfiguration;
const pipeline = new Pipeline(invalidConfiguration, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(invalidConfiguration, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (error.message === 'Your pipeline contains no stages.') {
Expand Down Expand Up @@ -204,10 +201,9 @@ describe('Pipeline Class', () => {
},
],
} as unknown as LDWorkbenchConfiguration;
const pipeline = new Pipeline(invalidConfiguration, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(invalidConfiguration, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (
Expand Down Expand Up @@ -259,10 +255,9 @@ describe('Pipeline Class', () => {
},
],
};
const pipeline = new Pipeline(configDuplicateStageName, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(configDuplicateStageName, {silent: true});
} catch (error) {
if (error instanceof Error) {
if (
Expand Down Expand Up @@ -315,10 +310,9 @@ describe('Pipeline Class', () => {
},
],
};
const pipeline = new Pipeline(configDuplicateStageName, {silent: true});
let failed = false;
try {
pipeline.validate();
new Pipeline(configDuplicateStageName, {silent: true});
} catch (error) {
failed = true;
if (error instanceof Error) {
Expand Down
4 changes: 0 additions & 4 deletions src/lib/tests/PreviousStage.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down Expand Up @@ -88,7 +87,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[0]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down Expand Up @@ -132,7 +130,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stageTwo: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stageTwo.pipeline.stages.keys());
const previousStage = new PreviousStage(stageTwo, stagesSoFar.pop()!); // should be stage one
Expand Down Expand Up @@ -178,7 +175,6 @@ describe('PreviousStage Class', () => {
],
};
const pipeline = new Pipeline(config, {silent: true});
pipeline.validate();
const stage: Stage = new Stage(pipeline, config.stages[1]);
const stagesSoFar = Array.from(stage.pipeline.stages.keys());
const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);
Expand Down
1 change: 1 addition & 0 deletions src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ import {type QueryEngine as QueryEngineFile} from '@comunica/query-sparql-file';

export type Endpoint = File | URL | PreviousStage;
export type QueryEngine = QueryEngineSparql | QueryEngineFile;
export type QuerySource = {type?: string; value: string};
9 changes: 4 additions & 5 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,11 @@ async function main(): Promise<void> {
);
}

const pipeline = new Pipeline(configuration, {
startFromStageName: cliArgs.stage,
silent: cliArgs.silent,
});

try {
const pipeline = new Pipeline(configuration, {
startFromStageName: cliArgs.stage,
silent: cliArgs.silent,
});
await pipeline.run();
} catch (e) {
error(
Expand Down
3 changes: 1 addition & 2 deletions src/utils/getEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ export default function getEndpoint(
return new File(endpoint);
} else if (endpoint !== undefined) {
try {
// fix for GraphDB, see https://github.com/comunica/comunica/issues/962
return new URL((endpoint as string).replace(/^sparql@/, ''));
return new URL(endpoint);
} catch (e) {
throw new Error(`"${endpoint as string}" is not a valid URL`);
}
Expand Down
22 changes: 15 additions & 7 deletions src/utils/getEngineSource.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
import {isPreviousStage} from './guards.js';
import {existsSync} from 'fs';
import path from 'path';
import type {Endpoint} from '../lib/types.js';
import type {Endpoint, QuerySource} from '../lib/types.js';

export default function getEngineSource(endpoint: Endpoint): string {
let source: string;
export default function getEngineSource(endpoint: Endpoint): QuerySource {
if (isPreviousStage(endpoint)) {
const previousStage = endpoint.load();
if (!existsSync(previousStage.destinationPath)) {
throw new Error(
`The result from stage "${previousStage.name}" (${previousStage.destinationPath}) is not available, make sure to run that stage first`
);
}
source = path.resolve(previousStage.destinationPath);
} else {
source = endpoint.toString();
return {
type: 'file',
value: path.resolve(previousStage.destinationPath),
};
} else if (endpoint instanceof URL) {
return {
type: 'sparql',
value: endpoint.toString(),
};
}
return source;

return {
value: endpoint.toString(),
};
}
Loading

0 comments on commit b53c27b

Please sign in to comment.