Skip to content

Commit

Permalink
Fixes Implement paralel generators for each Iterator #4
Browse files Browse the repository at this point in the history
Adds parallel approach to LDWorkbench with base of feature/3/mightymax branch
  • Loading branch information
Philippe Renzen committed Dec 29, 2023
1 parent cca0d0d commit 09edbf7
Show file tree
Hide file tree
Showing 19 changed files with 428 additions and 246 deletions.
20 changes: 11 additions & 9 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ const DEFAULT_BATCH_SIZE = 10

declare interface Generator {
on(event: "data", listener: (statement: Quad) => void): this;
on(event: "end", listener: (iterations: number, statements: number) => void): this;
on(event: "end", listener: (iterations: number, statements: number, processed: number) => void): this;
on(event: "error", listener: (e: Error) => void): this;

emit(event: "data", statement: Quad): boolean;
emit(event: "end", iterations: number, statements: number): boolean;
emit(event: "end", iterations: number, statements: number, processed: number): boolean;
emit(event: "error", e: Error): boolean;
}
class Generator extends EventEmitter {
Expand All @@ -31,17 +31,19 @@ class Generator extends EventEmitter {
private source: string = ''
private readonly $thisList: NamedNode[] = []
private readonly endpoint: Endpoint;
public constructor(private readonly stage: Stage) {
public constructor(private readonly stage: Stage, private readonly index: number) {
if (stage.configuration.generator === undefined) throw new Error('Error in Generator: no generators were present in stage configuration')
super()
this.index = index
this.query = getSPARQLQuery(
stage.configuration.generator.query,
stage.configuration.generator[this.index].query,
"construct"
);

this.endpoint =
stage.configuration.generator.endpoint === undefined
stage.configuration.generator[this.index].endpoint === undefined
? stage.iterator.endpoint
: getEndpoint(stage, "generator");
: getEndpoint(stage, "generator", this.index);

this.engine = getEngine(this.endpoint)

Expand All @@ -57,7 +59,7 @@ class Generator extends EventEmitter {
}

private get batchSize(): number {
return this.stage.configuration.generator.batchSize ?? DEFAULT_BATCH_SIZE
return this.stage.configuration.generator[this.index].batchSize ?? DEFAULT_BATCH_SIZE
}

private $thisToFilter($this: NamedNode): FilterPattern {
Expand All @@ -72,7 +74,7 @@ class Generator extends EventEmitter {
]
}
}
}
}

public run($this?: NamedNode, batchSize?: number): void {
if ($this !== undefined) this.$thisList.push($this)
Expand Down Expand Up @@ -101,7 +103,7 @@ class Generator extends EventEmitter {
})
stream.on('end', () => {
if (this.iterationsIncoming !== undefined && this.iterationsProcessed >= this.iterationsIncoming) {
this.emit('end', this.iterationsIncoming, this.statements)
this.emit('end', this.iterationsIncoming, this.statements, this.iterationsProcessed)
}
})
}).catch(e => {
Expand Down
112 changes: 78 additions & 34 deletions src/lib/LDWorkbenchConfiguration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,45 @@ export interface LDWorkbenchConfiguration {
*/
batchSize?: number;
};
generator: {
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
};
/**
* @minItems 1
*/
generator: [
{
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
},
...{
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
}[]
];
/**
* The file where the results are saved.
* This is not a required property,
Expand Down Expand Up @@ -96,23 +118,45 @@ export interface LDWorkbenchConfiguration {
*/
batchSize?: number;
};
generator: {
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
};
/**
* @minItems 1
*/
generator: [
{
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
},
...{
/**
* Path (prefixed with "file://") or SPARQL Query
* that makes the generator using SPARQL construct.
*/
query: string;
/**
* The SPARQL endpoint for the generator.
* If it starts with "file://", a local RDF file is queried.
* If ommmitted the endpoint of the Iterator is used.
*/
endpoint?: string;
/**
* Overrule the generator's behaviour of fetching results for 10 bindings of $this per request.
*/
batchSize?: number;
}[]
];
/**
* The file where the results are saved.
* This is not a required property,
Expand Down
78 changes: 49 additions & 29 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,29 @@ declare interface Stage {
class Stage extends EventEmitter {
public destination: () => WriteStream
public iterator: Iterator
public generator: Generator
public generators: Generator[] = []
private totalProcessed: number

public constructor(
public readonly pipeline: Pipeline,
public readonly configuration: LDWorkbenchConfiguration['stages'][0]
) {
super()
this.totalProcessed = 0
try {
this.iterator = new Iterator(this)
} catch(e) {
throw new Error(`Error in the iterator of stage \`${configuration.name}\`: ${(e as Error).message}`)
}

try {
this.generator = new Generator(this)
} catch(e) {
throw new Error(`Error in the generator of stage \`${configuration.name}\`: ${(e as Error).message}`)
// Handle both single generator and array of generators
for (let index = 0; index < this.configuration.generator.length; index++) {
const generatorConfig = this.configuration.generator[index];
try {
this.generators.push(new Generator({...this, generators: [generatorConfig]}, index))
} catch(e) {
throw new Error(`Error in the generator of stage \`${configuration.name}\`: ${(e as Error).message}`)
}
}
this.destination = () => new File(this.destinationPath).getStream()
}
Expand All @@ -55,30 +61,44 @@ class Stage extends EventEmitter {
}

public run(): void {
let quadCount = 0
// let iteratorCount = 0
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' })

this.generator.on('data', quad => {
writer.addQuad(quad)
quadCount ++
this.emit('generatorResult', quadCount)
})

this.generator.on('error', e => {
this.emit('error', e)
})

this.iterator.on('data', $this => {
this.generator.run($this)
this.emit('iteratorResult', $this)
})

this.iterator.on('error', e => {
this.emit('error', e)
})
this.iterator.run()
}
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' });
let quadCount = 0;

this.generators.forEach(generator => {
generator.on('data', (quad) => {
writer.addQuad(quad);
quadCount++;
this.emit('generatorResult', quadCount);
});

generator.on('end', (iterationsIncoming, statements, processed) => {
this.totalProcessed += processed
if (this.totalProcessed >= (iterationsIncoming * this.configuration.generator.length)){
this.emit('end', iterationsIncoming, statements);
}
});

generator.on('error', e => {
this.emit('error', e)
})
});

this.iterator.on('data', ($this) => {
this.generators.forEach(generator => {
generator.run($this);
});
this.emit('iteratorResult', $this);
});

this.iterator.on('error', e => {
this.emit('error', e)
})

// Start the iterator
this.iterator.run();


}

}

Expand Down
Loading

0 comments on commit 09edbf7

Please sign in to comment.