Skip to content

Commit

Permalink
WIP batch processing
Browse files Browse the repository at this point in the history
BUG with quadCount
  • Loading branch information
Philippe Renzen committed Dec 14, 2023
1 parent 26c6bbd commit 6917487
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 288 deletions.
59 changes: 30 additions & 29 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,6 @@ class Generator extends EventEmitter {
this.engine = getEngine(this.endpoint)
}

/**
 * Clean-up step for batch processing: flushes the named nodes that are still
 * waiting in `batchArrayOfNamedNodes`.
 *
 * When leftovers exist, one batch SPARQL query is built from them, the buffer
 * is reset, and the query is run against `this.source`. Every resulting quad
 * is re-emitted as a `'data'` event; when the result stream ends, `'end'` is
 * emitted with the number of quads seen by this call.
 *
 * Failures while obtaining the result stream are reported through the
 * `'error'` event. When there are no leftovers, nothing is emitted.
 */
public end(): void {
  let numberOfStatements = 0
  if ((this.batchArrayOfNamedNodes !== undefined) && (this.batchArrayOfNamedNodes?.length > 0)) {
    console.log('TEST - leftovers!')
    const queryString = getBatchSPARQLQueryString(this.query, this.batchArrayOfNamedNodes)
    // The leftovers are now captured in queryString, so the buffer can be reset
    this.batchArrayOfNamedNodes = []

    // TODO turn this into a function
    this.engine.queryQuads(queryString, {
      sources: [this.source]
    }).then(stream => {
      stream.on('data', (quad: Quad) => {
        numberOfStatements++
        this.emit('data', quad)
      })
      stream.on('end', () => {
        this.emit('end', numberOfStatements)
      })
    }).catch(_ => {
      // This promise chain is neither returned nor awaited, so a `throw` here
      // could never be caught by the caller and would only surface as an
      // unhandled rejection. Report the failure through the emitter instead.
      // With no 'error' listener attached, emit() still throws this same Error.
      this.emit('error', new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`))
    })
  }
}

public run($this: NamedNode): void {
let numberOfStatements = 0
if (this.source === '') this.source = getEngineSource(this.endpoint)
Expand All @@ -94,7 +66,6 @@ class Generator extends EventEmitter {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
console.log('🪵 | file: Generator.class.ts:103 | Generator | stream.on | quad:', quad)
numberOfStatements++
this.emit('data', quad)
})
Expand Down Expand Up @@ -129,6 +100,36 @@ class Generator extends EventEmitter {
})
}
}

/**
 * Clean-up step for batch processing: flushes the named nodes that are still
 * waiting in `batchArrayOfNamedNodes`.
 *
 * When leftovers exist, one batch SPARQL query is built from them, the buffer
 * is reset, and the query is run against `this.source`. Every resulting quad
 * is re-emitted as a `'data'` event; when the result stream ends, `'end'` is
 * emitted with the number of quads seen by this call.
 *
 * When there are no leftovers, `'end'` is emitted immediately with `0`.
 * Failures while obtaining the result stream are reported through the
 * `'error'` event.
 */
public end(): void {
  let numberOfStatements = 0
  if ((this.batchArrayOfNamedNodes !== undefined) && (this.batchArrayOfNamedNodes?.length !== 0)) {
    const queryString = getBatchSPARQLQueryString(this.query, this.batchArrayOfNamedNodes)
    // The leftovers are now captured in queryString, so the buffer can be reset
    this.batchArrayOfNamedNodes = []

    // TODO turn this into a function
    this.engine.queryQuads(queryString, {
      sources: [this.source]
    }).then(stream => {
      stream.on('data', (quad: Quad) => {
        numberOfStatements++
        this.emit('data', quad)
      })
      stream.on('end', () => {
        this.emit('end', numberOfStatements)
      })
    }).catch(_ => {
      // This promise chain is neither returned nor awaited, so a `throw` here
      // could never be caught by the caller and would only surface as an
      // unhandled rejection. Report the failure through the emitter instead.
      // With no 'error' listener attached, emit() still throws this same Error.
      this.emit('error', new Error(`The Generator did not run successfully, it could not get the results from the endpoint ${this.source}`))
    })
  } else {
    // REVIEW this doesn't seem to work - Stage class is not listening to this event
    this.emit('end', numberOfStatements)
  }
}
}

export default Generator
36 changes: 31 additions & 5 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class Stage extends EventEmitter {
super()
try {
this.iterator = new Iterator(this)
} catch(e) {
} catch (e) {
throw new Error(`Error in the iterator of stage \`${configuration.name}\`: ${(e as Error).message}`)
}

try {
this.generator = new Generator(this)
} catch(e) {
} catch (e) {
throw new Error(`Error in the generator of stage \`${configuration.name}\`: ${(e as Error).message}`)
}
this.destination = () => new File(this.destinationPath).getStream()
Expand All @@ -59,22 +59,48 @@ class Stage extends EventEmitter {
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' })
this.generator.on('data', quad => {
writer.addQuad(quad)
console.log('🪵 | file: Stage.class.ts:62 | Stage | run | quad:', quad)
quadCount ++
quadCount++
this.emit('generatorResult', quadCount)
})
this.generator.on('end', _ => {
generatorCount++
// eslint-disable-next-line @typescript-eslint/dot-notation
const numberOfLeftoverNamedNodes = this.generator["batchArrayOfNamedNodes"]!.length
console.log('🪵 | file: Stage.class.ts:70 | Stage | run | numberOfLeftoverNamedNodes:', numberOfLeftoverNamedNodes)
const batchGeneratorCount = generatorCount * this.configuration.generator.batchSize! + numberOfLeftoverNamedNodes
console.log('🪵 | file: Stage.class.ts:71 | Stage | run | generatorCount:', generatorCount)
console.log('🪵 | file: Stage.class.ts:71 | Stage | run | this.configuration.generator.batchSize!:', this.configuration.generator.batchSize!)
console.log('🪵 | file: Stage.class.ts:69 | Stage | run | batchGeneratorCount:', batchGeneratorCount)
console.log('🪵 | file: Stage.class.ts:77 | Stage | run | iteratorCount:', iteratorCount)
if (generatorCount === iteratorCount) {
console.log('🪵 | file: Stage.class.ts:70 | Stage | run | quadCount:', quadCount)
this.emit('end', iteratorCount, quadCount)
}
// With batching, generatorCount counts finished batches rather than single nodes, so the number of processed nodes is generatorCount * batchSize plus the leftover nodes that did not fill a batch
else if (batchGeneratorCount === iteratorCount) {
if (numberOfLeftoverNamedNodes !== 0){
// clean up generator and process quads
this.generator.end()
this.generator.on('data', quad => {
writer.addQuad(quad)
quadCount++
this.emit('generatorResult', quadCount)
})
this.generator.on('end', _ => {
console.log('🪵 | file: Stage.class.ts:83 | Stage | run | quadCount:', quadCount)
this.emit('end', iteratorCount, quadCount)
})
}else{
console.log('🪵 | file: Stage.class.ts:83 | Stage | run | quadCount:', quadCount)
this.emit('end', iteratorCount, quadCount)
}
}
})
this.iterator.on('data', $this => {
this.generator.run($this)
this.emit('iteratorResult', $this)
})
this.iterator.on('end', count => {
this.generator.end()
iteratorCount = count
})
this.iterator.run()
Expand Down
16 changes: 2 additions & 14 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe.only('Generator Class', () => {
});
// BUG when both the generator and iterator tests are running, it seems the iterator will never terminate
describe('run', () => {
it.skip('Should work with batch processing', async function (){
it.only('Should work with batch processing', async function (){
this.timeout(500000)
const configuration: LDWorkbenchConfiguration = {
name: 'Example Pipeline',
Expand All @@ -43,19 +43,9 @@ describe.only('Generator Class', () => {
},
generator: {
query: 'file://static/example/generator-stage-1.rq',
// adjust batchsize for test here
batchSize: 10
}
},
{
name: 'Stage 2',
iterator: {
query: 'file://static/example/iterator-stage-2.rq',
},
generator: {
query: 'file://static/example/generator-stage-2.rq',
endpoint: 'https://query.wikidata.org/sparql',
batchSize: 1
}
}
]
}
Expand All @@ -66,10 +56,8 @@ describe.only('Generator Class', () => {
async function runGeneratorWithPromise(): Promise<boolean> {
return new Promise((resolve, reject) => {
stage.addListener('generatorResult', (quad) => {

Check failure on line 58 in src/lib/tests/Generator.class.test.ts

View workflow job for this annotation

GitHub Actions / install-build-test (20.10.0)

'quad' is declared but its value is never read.
console.log('🪵 | file: Generator.class.test.ts:70 | stage.addListener | quad:', quad)
});
stage.addListener('end', (numResults) => {

Check failure on line 60 in src/lib/tests/Generator.class.test.ts

View workflow job for this annotation

GitHub Actions / install-build-test (20.10.0)

'numResults' is declared but its value is never read.
console.log('🪵 | file: Generator.class.test.ts:74 | stage.addListener | numResults:', numResults)
resolve(true)
});
stage.addListener('error', (error) => {
Expand Down
Loading

0 comments on commit 6917487

Please sign in to comment.