Skip to content

Commit

Permalink
fix: Race condition between iterator and generators (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeboer authored May 27, 2024
1 parent 0117c0b commit 0c7d07d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 51 deletions.
83 changes: 43 additions & 40 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ class Generator extends EventEmitter {
private readonly query: ConstructQuery;
private readonly engine: QueryEngine;
private iterationsProcessed: number = 0
private iterationsIncoming?: number
private iterationsIncoming: number = 0
private statements: number = 0
private source: string = ''
private readonly $thisList: NamedNode[] = []
private $thisList: NamedNode[] = []
private readonly endpoint: Endpoint;
// private iteratorEnded: boolean = false;
public constructor(private readonly stage: Stage, private readonly index: number) {
if (stage.configuration.generator === undefined) throw new Error('Error in Generator: no generators were present in stage configuration')
super()
Expand All @@ -46,57 +47,59 @@ class Generator extends EventEmitter {

this.engine = getEngine(this.endpoint)

stage.iterator.on('end', count => {
this.iterationsIncoming = count
for (const $this of this.$thisList) {
this.run($this, this.$thisList.length)
}
stage.iterator.on('end', _count => {
this.flush();
})
}

public run($this: NamedNode, batchSize?: number): void {
this.$thisList.push($this)
this.iterationsIncoming++;
if (this.$thisList.length >= (batchSize ?? this.batchSize)) {
this.runBatch(this.$thisList);
this.$thisList = [];
}
}

private get batchSize(): number {
return this.stage.configuration.generator[this.index].batchSize ?? DEFAULT_BATCH_SIZE
}


public run($this?: NamedNode, batchSize?: number): void {
if ($this !== undefined) this.$thisList.push($this)
private runBatch(batch: NamedNode[]): void {
const error = (e: any): Error => new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}: ${(e as Error).message}`)
if (this.$thisList.length >= (batchSize ?? this.batchSize)) {
if (this.source === '') this.source = getEngineSource(this.endpoint)
const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct");
const patterns = unionQuery.where ?? [];
const valuePatterns: ValuePatternRow[] = []
for (const $this of this.$thisList) {
this.iterationsProcessed++
valuePatterns.push({'?this': $this})
}
patterns.push({ type: 'values', values: valuePatterns });
unionQuery.where = [{ type: 'group', patterns }]
if (this.source === '') this.source = getEngineSource(this.endpoint)
const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct");
const patterns = unionQuery.where ?? [];
const valuePatterns: ValuePatternRow[] = []
for (const $this of batch) {
valuePatterns.push({'?this': $this})
}
patterns.push({ type: 'values', values: valuePatterns });
unionQuery.where = [{ type: 'group', patterns }]

this.engine.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
this.statements++
this.emit('data', quad)
})
stream.on('error', (e) => {
this.emit("error", error(e))
})
stream.on('end', () => {
if (this.iterationsIncoming !== undefined && this.iterationsProcessed >= this.iterationsIncoming) {
this.emit('end', this.iterationsIncoming, this.statements, this.iterationsProcessed)
}
})
}).catch(e => {
this.engine.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
this.statements++
this.emit('data', quad)
})
stream.on('error', (e) => {
this.emit("error", error(e))
})
this.$thisList.length = 0
}
stream.on('end', () => {
this.iterationsProcessed += batch.length;
this.emit('end', this.iterationsIncoming, this.statements, this.iterationsProcessed);
})
}).catch(e => {
this.emit("error", error(e))
})
}

private flush(): void {
this.runBatch(this.$thisList);
}
}


export default Generator
export default Generator
22 changes: 12 additions & 10 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Stage extends EventEmitter {
public destination: () => WriteStream
public iterator: Iterator
public generators: Generator[] = []
private iteratorEnded: boolean = false;

public constructor(
public readonly pipeline: Pipeline,
Expand Down Expand Up @@ -57,18 +58,17 @@ class Stage extends EventEmitter {
public get name(): string {
return this.configuration.name
}

public run(): void {
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' });
let quadCount = 0;

const generatorProcessedCounts = new Map<number, number>();
let generatorsFinished = 0;
let quadsGenerated = 0;

const checkEnd = (iterationsIncoming: number, statements: number): void => {
// Check if all generators have processed all iterations
if (generatorsFinished === this.configuration.generator.length) {
if (![...generatorProcessedCounts].some(([_, processed]) => processed < iterationsIncoming)
&& this.iteratorEnded) {
this.emit('end', iterationsIncoming, statements);
}
};
Expand All @@ -84,10 +84,7 @@ class Stage extends EventEmitter {
});

generator.on('end', (iterationsIncoming, statements, processed) => {
generatorProcessedCounts.set(index, generatorProcessedCounts.get(index)! + processed);
if (generatorProcessedCounts.get(index)! >= iterationsIncoming) {
generatorsFinished++;
}
generatorProcessedCounts.set(index, processed);
checkEnd(iterationsIncoming, statements);
});

Expand All @@ -101,7 +98,12 @@ class Stage extends EventEmitter {
generator.run($this);
});
this.emit('iteratorResult', $this, quadsGenerated);
});
});

this.iterator.on('end', (_count) => {
this.iteratorEnded = true;
});


this.iterator.on('error', e => {
this.emit('error', e)
Expand All @@ -115,4 +117,4 @@ class Stage extends EventEmitter {

}

export default Stage
export default Stage
2 changes: 1 addition & 1 deletion src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ describe('Generator Class', () => {
await pipelineParallelGenerators.run()
const file = fs.readFileSync(filePath, {encoding: 'utf-8'})
const fileLines = file.split('\n').sort()
expect(fileLines.length).to.equal(873)
expect(fileLines.length).to.equal(741)
expect(fileLines[0]).to.equal('')
expect(fileLines[1]).to.equal('<http://dbpedia.org/resource/Iris_setosa> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Thing> .')
expect(fileLines[fileLines.length - 1]).to.equal('<https://triplydb.com/triply/iris/id/floweringPlant/00150> <https://schema.org/name> "Instance 150 of the Iris Virginica"@en .')
Expand Down

0 comments on commit 0c7d07d

Please sign in to comment.