Skip to content

Commit

Permalink
refactor: Use async/await pattern (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeboer authored May 31, 2024
1 parent b53c27b commit 4004959
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 72 deletions.
74 changes: 40 additions & 34 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default class Generator extends EventEmitter<Events> {
private $thisList: NamedNode[] = [];
private readonly endpoint: Endpoint;
private source?: QuerySource;

public constructor(
private readonly stage: Stage,
private readonly index: number
Expand All @@ -48,17 +49,19 @@ export default class Generator extends EventEmitter<Events> {

this.engine = getEngine(this.endpoint);

stage.iterator.on('end', () => {
this.flush();
stage.iterator.on('end', async () => {
await this.flush();
});
}

public run($this: NamedNode, batchSize?: number): void {
public async run($this: NamedNode): Promise<void> {
// Prevent duplicates from added to $thisList, but immediately run any query that is batched.
this.$thisList.push($this);
this.iterationsIncoming++;
if (this.$thisList.length >= (batchSize ?? this.batchSize)) {
this.runBatch(this.$thisList);
if (this.$thisList.length >= this.batchSize) {
const batch = this.$thisList;
this.$thisList = [];
await this.runBatch(batch);
}
}

Expand All @@ -69,12 +72,11 @@ export default class Generator extends EventEmitter<Events> {
);
}

private runBatch(batch: NamedNode[]): void {
private async runBatch(batch: NamedNode[]): Promise<void> {
const error = (e: unknown): Error =>
new Error(
`The Generator did not run successfully, it could not get the results from the endpoint ${
this.source
}: ${(e as Error).message}`
`The Generator did not run successfully, it could not get the results from the endpoint ${this
.source?.value}: ${(e as Error).message}`
);
const unionQuery = getSPARQLQuery(
getSPARQLQueryString(this.query),
Expand All @@ -88,34 +90,38 @@ export default class Generator extends EventEmitter<Events> {
patterns.push({type: 'values', values: valuePatterns});
unionQuery.where = [{type: 'group', patterns}];

this.engine
.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (quad: Quad) => {
this.statements++;
this.emit('data', quad);
});
stream.on('error', e => {
this.emit('error', error(e));
});
stream.on('end', () => {
this.iterationsProcessed += batch.length;
this.emit(
'end',
this.iterationsIncoming,
this.statements,
this.iterationsProcessed
);
});
})
.catch(e => {
try {
const stream = await this.engine.queryQuads(
getSPARQLQueryString(unionQuery),
{
sources: [(this.source ??= getEngineSource(this.endpoint))],
}
);

stream.on('data', (quad: Quad) => {
this.statements++;
this.emit('data', quad);
});
stream.on('error', e => {
this.emit('error', error(e));
// reject(e);
});
stream.on('end', () => {
// resolve();
this.iterationsProcessed += batch.length;
this.emit(
'end',
this.iterationsIncoming,
this.statements,
this.iterationsProcessed
);
});
} catch (e) {
this.emit('error', error(e));
}
}

private flush(): void {
this.runBatch(this.$thisList);
private async flush(): Promise<void> {
await this.runBatch(this.$thisList);
}
}
65 changes: 32 additions & 33 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ export default class Iterator extends EventEmitter<Events> {
}
}

public run(): void {
setTimeout(() => {
public async run(): Promise<void> {
setTimeout(async () => {
let resultsPerPage = 0;
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
Expand All @@ -60,41 +60,40 @@ export default class Iterator extends EventEmitter<Events> {
(e as Error).message
}`
);
this.engine
.queryBindings(queryString, {
try {
const stream = await this.engine.queryBindings(queryString, {
sources: [(this.source ??= getEngineSource(this.endpoint))],
})
.then(stream => {
stream.on('data', (binding: Bindings) => {
resultsPerPage++;
if (!binding.has('this'))
throw new Error('Missing binding $this in the Iterator result.');
const $this = binding.get('this')!;
if ($this.termType !== 'NamedNode') {
throw new Error(
`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`
);
} else {
this.emit('data', $this);
}
});
stream.on('end', () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
if (resultsPerPage < this.query.limit!) {
this.emit('end', this.totalResults);
} else {
this.run();
}
});
});

stream.on('data', (binding: Bindings) => {
resultsPerPage++;
if (!binding.has('this'))
throw new Error('Missing binding $this in the Iterator result.');
const $this = binding.get('this')!;
if ($this.termType !== 'NamedNode') {
throw new Error(
`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`
);
} else {
this.emit('data', $this);
}
});
stream.on('end', () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
if (resultsPerPage < this.query.limit!) {
this.emit('end', this.totalResults);
} else {
this.run();
}
});

stream.on('error', e => {
this.emit('error', error(e));
});
})
.catch(e => {
stream.on('error', e => {
this.emit('error', error(e));
});
} catch (e) {
this.emit('error', error(e));
}
}, this.delay);
}
}
8 changes: 3 additions & 5 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export default class Stage extends EventEmitter<Events> {
return this.configuration.name;
}

public run(): void {
public async run(): Promise<void> {
const writer = new Writer(this.destination(), {
end: false,
format: 'N-Triples',
Expand Down Expand Up @@ -106,10 +106,8 @@ export default class Stage extends EventEmitter<Events> {
});
});

this.iterator.on('data', $this => {
this.generators.forEach(generator => {
generator.run($this);
});
this.iterator.on('data', async $this => {
await Promise.all(this.generators.map(generator => generator.run($this)));
this.emit('iteratorResult', $this, quadsGenerated);
});

Expand Down

0 comments on commit 4004959

Please sign in to comment.