Skip to content

Commit

Permalink
Providing better error reporting is some cases,
Browse files Browse the repository at this point in the history
  • Loading branch information
mightymax committed Dec 6, 2023
1 parent 7fd6af2 commit b08d2da
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
4 changes: 2 additions & 2 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class Generator extends EventEmitter {
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
}).catch(e => {
throw e as Error
}).catch(_ => {
throw new Error(`The Generator did not run succesfully, it could not get the results from the endpoint ${this.source}`)
})
}
}
Expand Down
93 changes: 50 additions & 43 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
/* eslint-disable @typescript-eslint/method-signature-style */
import EventEmitter from 'node:events';
import EventEmitter from "node:events";
import type { SelectQuery } from "sparqljs";
import type Stage from "./Stage.class.js";
import type { NamedNode } from "@rdfjs/types";
import getSPARQLQuery from "../utils/getSPARQLQuery.js";
import { type Bindings } from "@comunica/types";
import getSPARQLQueryString from "../utils/getSPARQLQueryString.js";
import getEndpoint from '../utils/getEndpoint.js';
import type { Endpoint, QueryEngine } from './types.js';
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import getEndpoint from "../utils/getEndpoint.js";
import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from "../utils/getEngine.js";
import getEngineSource from "../utils/getEngineSource.js";

const DEFAULT_LIMIT = 10;

declare interface Iterator {
on(event: "data", listener: ($this: NamedNode) => void): this;
on(event: "end", listener: (numResults: number) => void): this;
Expand All @@ -21,57 +20,65 @@ declare interface Iterator {
emit(event: "end", numResults: number): boolean;
}

class Iterator
extends EventEmitter
{
class Iterator extends EventEmitter {
private readonly query: SelectQuery;
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private source: string = ''
private source: string = "";
private $offset = 0;
private totalResults = 0
private totalResults = 0;

constructor(stage: Stage) {
super()
super();
this.query = getSPARQLQuery(stage.configuration.iterator.query, "select");
this.query.limit = stage.configuration.iterator.batchSize ?? DEFAULT_LIMIT;
this.endpoint = getEndpoint(stage)
this.engine = getEngine(this.endpoint)
this.query.limit =
this.query.limit ??
stage.configuration.iterator.batchSize ??
DEFAULT_LIMIT;
this.endpoint = getEndpoint(stage);
this.engine = getEngine(this.endpoint);
}

public run(): void {
let resultsPerPage = 0
if (this.source === '') this.source = getEngineSource(this.endpoint)
let resultsPerPage = 0;
if (this.source === "") this.source = getEngineSource(this.endpoint);
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
this.engine.queryBindings(queryString, {
sources: [this.source],
}).then(stream => {
stream.on('data', (binding: Bindings) => {
resultsPerPage++
if (!binding.has('this')) throw new Error('Missing binding $this in the Iterator result.')
const $this = binding.get('this')!
if ($this.termType !== 'NamedNode') {
throw new Error(`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`)
} else {
this.emit('data', $this)
}
});
this.engine
.queryBindings(queryString, {
sources: [this.source],
})
.then((stream) => {
stream.on("data", (binding: Bindings) => {
resultsPerPage++;
if (!binding.has("this"))
throw new Error("Missing binding $this in the Iterator result.");
const $this = binding.get("this")!;
if ($this.termType !== "NamedNode") {
throw new Error(
`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`
);
} else {
this.emit("data", $this);
}
});

stream.on('end', () => {
this.totalResults += resultsPerPage
this.$offset += this.query.limit!
if (resultsPerPage < this.query.limit!) {
this.emit('end', this.totalResults)
} else {
this.run()
}
stream.on("end", () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
if (resultsPerPage < this.query.limit!) {
this.emit("end", this.totalResults);
} else {
this.run();
}
});
})
.catch((_) => {
throw new Error(
`The Iterator did not run succesfully, it could not get the results from the endpoint ${this.source} (offset: ${this.$offset}, limit ${this.query.limit})`
);
});
})
.catch(e => {
throw e
})
}
}

export default Iterator
export default Iterator;

0 comments on commit b08d2da

Please sign in to comment.