From 4004959b0a19485e4e2661ca21accfb6cc98e13d Mon Sep 17 00:00:00 2001 From: David de Boer Date: Fri, 31 May 2024 14:25:20 +0200 Subject: [PATCH] refactor: Use async/await pattern (#69) --- src/lib/Generator.class.ts | 74 ++++++++++++++++++++------------------ src/lib/Iterator.class.ts | 65 +++++++++++++++++---------------- src/lib/Stage.class.ts | 8 ++--- 3 files changed, 75 insertions(+), 72 deletions(-) diff --git a/src/lib/Generator.class.ts b/src/lib/Generator.class.ts index 15b4f07..27b2927 100644 --- a/src/lib/Generator.class.ts +++ b/src/lib/Generator.class.ts @@ -26,6 +26,7 @@ export default class Generator extends EventEmitter { private $thisList: NamedNode[] = []; private readonly endpoint: Endpoint; private source?: QuerySource; + public constructor( private readonly stage: Stage, private readonly index: number @@ -48,17 +49,19 @@ export default class Generator extends EventEmitter { this.engine = getEngine(this.endpoint); - stage.iterator.on('end', () => { - this.flush(); + stage.iterator.on('end', async () => { + await this.flush(); }); } - public run($this: NamedNode, batchSize?: number): void { + public async run($this: NamedNode): Promise { + // Prevent duplicates from added to $thisList, but immediately run any query that is batched. this.$thisList.push($this); this.iterationsIncoming++; - if (this.$thisList.length >= (batchSize ?? this.batchSize)) { - this.runBatch(this.$thisList); + if (this.$thisList.length >= this.batchSize) { + const batch = this.$thisList; this.$thisList = []; + await this.runBatch(batch); } } @@ -69,12 +72,11 @@ export default class Generator extends EventEmitter { ); } - private runBatch(batch: NamedNode[]): void { + private async runBatch(batch: NamedNode[]): Promise { const error = (e: unknown): Error => new Error( - `The Generator did not run successfully, it could not get the results from the endpoint ${ - this.source - }: ${(e as Error).message}` + `The Generator did not run successfully, it could not get the results from the endpoint ${this + .source?.value}: ${(e as Error).message}` ); const unionQuery = getSPARQLQuery( getSPARQLQueryString(this.query), @@ -88,34 +90,38 @@ export default class Generator extends EventEmitter { patterns.push({type: 'values', values: valuePatterns}); unionQuery.where = [{type: 'group', patterns}]; - this.engine - .queryQuads(getSPARQLQueryString(unionQuery), { - sources: [(this.source ??= getEngineSource(this.endpoint))], - }) - .then(stream => { - stream.on('data', (quad: Quad) => { - this.statements++; - this.emit('data', quad); - }); - stream.on('error', e => { - this.emit('error', error(e)); - }); - stream.on('end', () => { - this.iterationsProcessed += batch.length; - this.emit( - 'end', - this.iterationsIncoming, - this.statements, - this.iterationsProcessed - ); - }); - }) - .catch(e => { + try { + const stream = await this.engine.queryQuads( + getSPARQLQueryString(unionQuery), + { + sources: [(this.source ??= getEngineSource(this.endpoint))], + } + ); + + stream.on('data', (quad: Quad) => { + this.statements++; + this.emit('data', quad); + }); + stream.on('error', e => { this.emit('error', error(e)); + // reject(e); + }); + stream.on('end', () => { + // resolve(); + this.iterationsProcessed += batch.length; + this.emit( + 'end', + this.iterationsIncoming, + this.statements, + this.iterationsProcessed + ); }); + } catch (e) { + this.emit('error', error(e)); + } } - private flush(): void { - this.runBatch(this.$thisList); + private async flush(): Promise { + await this.runBatch(this.$thisList); } } diff --git a/src/lib/Iterator.class.ts b/src/lib/Iterator.class.ts index a5c3dbf..2b2a5da 100644 --- a/src/lib/Iterator.class.ts +++ b/src/lib/Iterator.class.ts @@ -47,8 +47,8 @@ export default class Iterator extends EventEmitter { } } - public run(): void { - setTimeout(() => { + public async run(): Promise { + setTimeout(async () => { let resultsPerPage = 0; this.query.offset = this.$offset; const queryString = getSPARQLQueryString(this.query); @@ -60,41 +60,40 @@ export default class Iterator extends EventEmitter { (e as Error).message }` ); - this.engine - .queryBindings(queryString, { + try { + const stream = await this.engine.queryBindings(queryString, { sources: [(this.source ??= getEngineSource(this.endpoint))], - }) - .then(stream => { - stream.on('data', (binding: Bindings) => { - resultsPerPage++; - if (!binding.has('this')) - throw new Error('Missing binding $this in the Iterator result.'); - const $this = binding.get('this')!; - if ($this.termType !== 'NamedNode') { - throw new Error( - `Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.` - ); - } else { - this.emit('data', $this); - } - }); - stream.on('end', () => { - this.totalResults += resultsPerPage; - this.$offset += this.query.limit!; - if (resultsPerPage < this.query.limit!) { - this.emit('end', this.totalResults); - } else { - this.run(); - } - }); + }); + + stream.on('data', (binding: Bindings) => { + resultsPerPage++; + if (!binding.has('this')) + throw new Error('Missing binding $this in the Iterator result.'); + const $this = binding.get('this')!; + if ($this.termType !== 'NamedNode') { + throw new Error( + `Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.` + ); + } else { + this.emit('data', $this); + } + }); + stream.on('end', () => { + this.totalResults += resultsPerPage; + this.$offset += this.query.limit!; + if (resultsPerPage < this.query.limit!) { + this.emit('end', this.totalResults); + } else { + this.run(); + } + }); - stream.on('error', e => { - this.emit('error', error(e)); - }); - }) - .catch(e => { + stream.on('error', e => { this.emit('error', error(e)); }); + } catch (e) { + this.emit('error', error(e)); + } }, this.delay); } } diff --git a/src/lib/Stage.class.ts b/src/lib/Stage.class.ts index 5185cf1..7f1ed30 100644 --- a/src/lib/Stage.class.ts +++ b/src/lib/Stage.class.ts @@ -67,7 +67,7 @@ export default class Stage extends EventEmitter { return this.configuration.name; } - public run(): void { + public async run(): Promise { const writer = new Writer(this.destination(), { end: false, format: 'N-Triples', @@ -106,10 +106,8 @@ export default class Stage extends EventEmitter { }); }); - this.iterator.on('data', $this => { - this.generators.forEach(generator => { - generator.run($this); - }); + this.iterator.on('data', async $this => { + await Promise.all(this.generators.map(generator => generator.run($this))); this.emit('iteratorResult', $this, quadsGenerated); });