Skip to content

Commit

Permalink
feat: Import dumps to SPARQL endpoint (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeboer authored Jul 10, 2024
1 parent fcf0adb commit e2f184a
Show file tree
Hide file tree
Showing 17 changed files with 6,353 additions and 2,323 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ A pipeline consists of one or more **stages**. Each stage has:
- one or more **generators**, which generate triples about each URI using SPARQL CONSTRUCT queries.

Stages can be chained together, with the output of one stage becoming the input of the next.
The combined output of all stages becomes the final output of the pipeline.

### Design principles

Expand Down Expand Up @@ -127,6 +128,37 @@ stages:
- query: "CONSTRUCT { $this a <https://schema.org/CreativeWork> } WHERE { $this a <https://schema.org/Book> }"
```
#### Stores
To query large local files, you may need to load them into a SPARQL store first. Do so by starting a SPARQL store,
for example Oxigraph:
```shell
docker run --rm -v $PWD/data:/data -p 7878:7878 oxigraph/oxigraph --location /data serve --bind 0.0.0.0:7878
```
Then configure the store in your pipeline: define at least one store under `stores`,
and set the iterator’s `importTo` parameter to the store’s `queryUrl`
so that the `endpoint`’s data is imported into that store before querying:
```yaml
# config.yml
stores:
- queryUrl: "http://localhost:7878/query" # SPARQL endpoint for read queries.
storeUrl: "http://localhost:7878/store" # SPARQL Graph Store HTTP Protocol endpoint.
stages:
- name: ...
iterator:
query: ...
endpoint: data.nt
importTo: http://localhost:7878/query
generator:
- query: ...
```
The data is loaded into a named graph `<import:filename>`, so in this case `<import:data.nt>`.
#### Example configuration
```yaml
Expand Down
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export default {
coverageReporters: ['json-summary', 'text'],
coverageThreshold: {
global: {
lines: 68.16,
statements: 68.5,
branches: 64.36,
functions: 75,
lines: 70.9,
statements: 71.01,
branches: 66.66,
functions: 76.59,
},
},
transform: {
Expand Down
8,145 changes: 5,911 additions & 2,234 deletions package-lock.json

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,26 @@
"typescript": "~5.4.3"
},
"dependencies": {
"@comunica/query-sparql": "^3.1.2",
"@comunica/query-sparql-file": "^3.1.2",
"@comunica/query-sparql": "^3.2.0",
"@comunica/query-sparql-file": "^3.2.0",
"@triply/triplydb": "^8.0.6",
"ajv": "^8.16.0",
"chalk": "^5.3.0",
"commander": "^12.1.0",
"glob": "^7.2.3",
"got": "^14.4.1",
"inquirer": "^9.3.0",
"js-yaml": "^4.1.0",
"lodash.clonedeep": "^4.5.0",
"lodash.kebabcase": "^4.1.1",
"millify": "^6.1.0",
"n3": "^1.17.4",
"ora": "^7.0.1",
"p-retry": "^6.2.0",
"parse-duration": "^1.1.0",
"pretty-ms": "^8.0.0",
"sparqljs": "^3.7.1"
"rdf-dereference": "^2.2.0",
"sparqljs": "^3.7.1",
"testcontainers": "^10.10.0"
}
}
25 changes: 25 additions & 0 deletions src/configuration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export interface Configuration {
* SPARQL endpoint for the iterator. If it starts with `file://`, a local RDF file is queried. If omitted the result of the previous stage is used.
*/
endpoint?: string;
/**
* Optional name of the store this endpoint should be imported into before querying. If omitted, the endpoint is queried directly.
*/
importTo?: string;
/**
* Number of `$this` bindings retrieved per query. Defaults to the LIMIT value of your iterator query or 10 if no LIMIT is present.
*/
Expand Down Expand Up @@ -107,6 +111,10 @@ export interface Configuration {
* SPARQL endpoint for the iterator. If it starts with `file://`, a local RDF file is queried. If omitted the result of the previous stage is used.
*/
endpoint?: string;
/**
* Optional name of the store this endpoint should be imported into before querying. If omitted, the endpoint is queried directly.
*/
importTo?: string;
/**
* Number of `$this` bindings retrieved per query. Defaults to the LIMIT value of your iterator query or 10 if no LIMIT is present.
*/
Expand Down Expand Up @@ -155,4 +163,21 @@ export interface Configuration {
destination?: string;
}[]
];
/**
* A list of SPARQL 1.1 Graph Stores that can be used in the pipeline.
*/
stores?: {
/**
* The store’s query URL.
*/
queryUrl: string;
/**
* The store’s Graph Store HTTP Protocol URL.
*/
storeUrl?: string;
/**
* The store’s SPARQL Update URL.
*/
updateUrl?: string;
}[];
}
11 changes: 7 additions & 4 deletions src/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ export default class Generator extends EventEmitter<Events> {
)
);

this.endpoint =
stage.configuration.generator[this.index].endpoint === undefined
? stage.iterator.endpoint
: getEndpoint(stage, 'generator', this.index);
if (stage.configuration.generator[this.index].endpoint === undefined) {
// No endpoint configured, so use the iterator’s endpoint.
this.endpoint = stage.iterator.endpoint;
this.query = this.query.withDefaultGraph(stage.importer?.graph);
} else {
this.endpoint = getEndpoint(stage, 'generator', this.index);
}

this.engine = getEngine(this.endpoint);

Expand Down
126 changes: 126 additions & 0 deletions src/import.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import File from './file.js';
import * as querystring from 'node:querystring';
import path from 'path';
import N3, {DataFactory} from 'n3';
import rdfDereferencer from 'rdf-dereference';
import {pipeline} from 'node:stream/promises';
import got, {TimeoutError} from 'got';
import {PassThrough} from 'node:stream';
import {QueryEngine} from '@comunica/query-sparql';
import pRetry from 'p-retry';
import EventEmitter from 'node:events';
import type {NamedNode} from '@rdfjs/types';
import namedNode = DataFactory.namedNode;

// Events emitted by Importer while loading a file into a graph store.
interface Events {
// Progress: emitted every 1,000 triples with the running triple count.
imported: [numOfTriples: number];
// Completion: emitted once after the import finishes, with the total triple count.
end: [numOfTriples: number];
}

/**
 * Imports a local RDF file into a SPARQL graph store so it can be queried
 * through the store’s SPARQL endpoint.
 *
 * The file’s triples are written to a named graph of the form
 * `import:<URL-escaped basename>` (e.g. `import:data.nt`).
 *
 * Emits `imported` every 1,000 triples and `end` when the import completes.
 */
export class Importer extends EventEmitter<Events> {
// Named graph that receives the file’s triples: `import:` + escaped basename.
public readonly graph: NamedNode;
constructor(
private readonly store: GraphStore,
public readonly file: File,
private readonly queryEngine: QueryEngine = new QueryEngine()
) {
super();
this.graph = namedNode(
'import:' + querystring.escape(path.basename(file.toString()))
);
}

/**
 * Runs the import and waits until the data is actually queryable.
 *
 * Only stores with a `storeUrl` (SPARQL Graph Store HTTP Protocol) are
 * currently supported; a store configured with just an `updateUrl` throws.
 */
public async run() {
if (undefined !== this.store.options.storeUrl) {
await this.importWithGraphStoreProtocol(this.store.options.storeUrl);
// The PUT may return before the store has made the data visible, so poll.
await this.dataLoaded();
} else {
throw new Error('Not supported');
}
}

/**
 * Wait for data to be available in the SPARQL store.
 *
 * Polls the store’s query endpoint (up to 3 retries) for at least one triple
 * in the import graph; throws if no data appears.
 */
private async dataLoaded() {
await pRetry(
async () => {
// Probe for any single triple in the named graph this import writes to.
const result = await this.queryEngine.queryBindings(
`SELECT * FROM <${this.graph.value}> WHERE { ?s ?p ?o } LIMIT 1`,
{
sources: [
{
type: 'sparql',
value: this.store.options.queryUrl.toString(),
},
],
}
);
const results = await result.toArray();
if (results.length === 0) {
// Triggers pRetry to back off and try again.
throw new Error('No data loaded');
}
},
{retries: 3}
);
}

/**
 * Streams the file into the store via the SPARQL 1.1 Graph Store HTTP
 * Protocol: parse with rdf-dereference, serialize as N-Triples, and PUT
 * the stream to `<storeUrl>?graph=<import graph>`.
 *
 * @param url - The store’s Graph Store Protocol endpoint.
 */
public async importWithGraphStoreProtocol(url: URL) {
// Copy the URL before mutating its query string.
const graphUrl = new URL(url.toString());
graphUrl.searchParams.set('graph', this.graph.value);

const {data} = await rdfDereferencer.default.dereference(
this.file.toString(),
{localFiles: true}
);

// Count parsed triples as they stream by, reporting progress in batches.
let numOfTriples = 0;
data.on('data', () => {
numOfTriples++;
if (numOfTriples % 1000 === 0) {
this.emit('imported', numOfTriples);
}
});

const writer = new N3.StreamWriter({format: 'N-Triples'});
const request = got.stream.put(graphUrl.toString(), {
headers: {
'Content-Type': 'application/n-triples',
},
maxRedirects: 0,
timeout: {
response: 1000,
},
});

// TimeoutErrors are deliberately swallowed below: some graph stores, such
// as Oxigraph, don’t correctly respond to the streaming request even though
// the data is imported. Everything else is rethrown.
try {
await pipeline(
data,
writer,
request,
new PassThrough() // Catch response errors.
);
} catch (e) {
if (!(e instanceof TimeoutError)) {
throw e;
}
}

this.emit('end', numOfTriples);
}

// The store’s SPARQL query endpoint, where the imported data can be queried.
public get url() {
return this.store.options.queryUrl;
}
}

/**
 * A SPARQL 1.1 Graph Store that pipeline data can be imported into.
 *
 * A store always has a query endpoint; to accept imports it additionally
 * needs a Graph Store Protocol endpoint (`storeUrl`) and/or a SPARQL Update
 * endpoint (`updateUrl`).
 */
export class GraphStore {
  /**
   * @param options - Endpoint URLs for this store.
   * @throws Error when neither `storeUrl` nor `updateUrl` is configured,
   *   because such a store could never receive data.
   */
  constructor(
    public readonly options: {queryUrl: URL; storeUrl?: URL; updateUrl?: URL}
  ) {
    const {storeUrl, updateUrl} = options;
    const writable = storeUrl !== undefined || updateUrl !== undefined;
    if (!writable) {
      throw new Error('Store must have at least one of storeUrl or updateUrl');
    }
  }
}
26 changes: 5 additions & 21 deletions src/iterator.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import EventEmitter from 'node:events';
import sparqljs, {type SelectQuery, type VariableTerm} from 'sparqljs';
import type Stage from './stage.js';
import type {NamedNode} from '@rdfjs/types';
import getSPARQLQuery from './utils/getSPARQLQuery.js';
import {type Bindings} from '@comunica/types';
import getEndpoint from './utils/getEndpoint.js';
import type {Endpoint, QueryEngine, QuerySource} from './types.js';
import getEngine from './utils/getEngine.js';
import getEngineSource from './utils/getEngineSource.js';
import parse from 'parse-duration';
import {BaseQuery} from './sparql.js';

const DEFAULT_LIMIT = 10;
Expand All @@ -20,30 +16,18 @@ interface Events {
}

export default class Iterator extends EventEmitter<Events> {
private readonly query: Query;
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private readonly delay: number = 0;
private source?: QuerySource;
private offset = 0;
public totalResults = 0;

constructor(stage: Stage) {
constructor(
public readonly query: Query,
public readonly endpoint: Endpoint,
private readonly delay: number = 0
) {
super();
this.query = Query.from(
getSPARQLQuery(stage.configuration.iterator.query, 'select'),
stage.configuration.iterator.batchSize
);
this.endpoint = getEndpoint(stage);
this.engine = getEngine(this.endpoint);
if (stage.configuration.iterator.delay !== undefined) {
const delay = parse(stage.configuration.iterator.delay);
if (delay === undefined)
throw new Error(
`Error in stage \`${stage.configuration.name}\`: incorrect delay format was provided.`
);
this.delay = delay;
}
}

public async run(): Promise<void> {
Expand Down
15 changes: 3 additions & 12 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ console.info(
chalk.bold(`Welcome to LD Workbench version ${chalk.cyan(version())}`)
);

async function main(): Promise<void> {
(async () => {
const pipelines = loadPipelines(cli.config ?? './pipelines/');
const names = [...pipelines.keys()];
let configuration: Configuration | undefined;
Expand All @@ -22,7 +22,6 @@ async function main(): Promise<void> {
if (configuration === undefined) {
error(
`No pipeline named “${cli.pipeline}” was found.`,
2,
`Valid pipeline names are: ${names.map(name => `"${name}"`).join(', ')}`
);
}
Expand Down Expand Up @@ -62,14 +61,6 @@ async function main(): Promise<void> {
});
await pipeline.run();
} catch (e) {
error(
`Error in pipeline ${chalk.italic(configuration.name)}`,
5,
e as Error
);
error(`Error in pipeline ${chalk.italic(configuration.name)}`, e as Error);
}
}

main().catch(e => {
error(e as Error);
});
})();
Loading

0 comments on commit e2f184a

Please sign in to comment.