Skip to content

Commit

Permalink
Blocker File class not writing batch file properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Philippe Renzen committed Dec 20, 2023
1 parent 9355e4c commit 1694e27
Show file tree
Hide file tree
Showing 7 changed files with 1,504 additions and 66 deletions.
3 changes: 1 addition & 2 deletions src/lib/File.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ export default class File {

public async write(pipeline: Pipeline, spinner: Ora): Promise<void> {
const destinationStream = this.getStream()
console.log('🪵 | file: File.class.ts:54 | File | write | destinationStream:', destinationStream)
const stageNames = Array.from(pipeline.stages.keys())
for (const stageName of stageNames) {
if (spinner !== undefined) spinner.suffixText = chalk.bold(stageName)
// @mightymax the buffer seems to be a lot smaller for the batch pipeline
// @mightymax the buffer seems to be a lot smaller for the batch pipeline (sometimes)
readFile(pipeline.stages.get(stageName)!.destinationPath, (error, buffer) => {
console.log('🪵 | file: File.class.ts:59 | File | readFile | buffer:', buffer)
if(error !== null) throw error
Expand Down
1 change: 0 additions & 1 deletion src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class Generator extends EventEmitter {
if ((this.$this.length === this.batchSize) || this.batchSize === undefined) {
// getting batch SPARQL query string
const queryString = getBatchSPARQLQueryString(this.query, this.$this)

// Clearing batch Named Node targets array when it is the size of the batchSize
this.$this = []

Expand Down
207 changes: 145 additions & 62 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,25 @@ import * as fs from 'fs';
chai.use(chaiAsPromised)
const expect = chai.expect

function compareFiles(file1Path: string, file2Path: string): boolean {
const file1Content = fs.readFileSync(file1Path, 'utf-8');
const file2Content = fs.readFileSync(file2Path, 'utf-8');

const file1Lines = file1Content.split('\n').sort();
console.log('🪵 | file: Generator.class.test.ts:20 | compareFiles | file1Lines:', file1Lines)
const file2Lines = file2Content.split('\n').sort();
console.log('🪵 | file: Generator.class.test.ts:22 | compareFiles | file2Lines:', file2Lines)
// function compareFiles(file1Path: string, file2Path: string): boolean {
// const file1Content = fs.readFileSync(file1Path, 'utf-8');
// const file2Content = fs.readFileSync(file2Path, 'utf-8');

const areLinesEqual = JSON.stringify(file1Lines) === JSON.stringify(file2Lines);
// const file1Lines = file1Content.split('\n').sort();
// const file2Lines = file2Content.split('\n').sort();

if (!areLinesEqual) {
const diffLines = file1Lines.filter(line => !file2Lines.includes(line));
console.log('🪵 | file: Generator.class.test.ts:28 | compareFiles | diffLines:', diffLines)
// changed to console.error so test won't throw in CI
console.error(`Files are different. Lines in ${file1Path} that are not in ${file2Path}:\n${diffLines.join('\n')}`);
}
// const areLinesEqual = JSON.stringify(file1Lines) === JSON.stringify(file2Lines);

return true;
}
// if (!areLinesEqual) {
// const diffLines = file1Lines.filter(line => !file2Lines.includes(line));
// console.log('🪵 | file: Generator.class.test.ts:28 | compareFiles | diffLines:', diffLines)
// // changed to console.error so test won't throw in CI
// console.error(`Files are different. Lines in ${file1Path} that are not in ${file2Path}:\n${diffLines.join('\n')}`);
// }

// return true;
// }


describe.only('Generator Class', () => {
Expand All @@ -51,83 +50,167 @@ describe.only('Generator Class', () => {
});
// BUG when both the generator and iterator tests are running, it seems the iterator will never terminate
describe('run', () => {
it.only('Should work with batch processing', async function (){
this.timeout(200000)
const file1Path = 'pipelines/data/example-pipeline.nt';
const file2Path = 'pipelines/data/example-pipelineBatch.nt';
// @mightymax uncomment this to see File class returning a resolve before it finished
// afterEach(async ()=>{
// async function removeFile(filePath: string): Promise<void> {
// return new Promise((resolve, reject) => {
// // Use fs.unlink to remove the file
// fs.unlink(filePath, (err) => {
// if (err != null) {
// reject(err);
// } else {
// resolve();
// }
// });
// });
// }

// const filePath = 'src/lib/tests/data/example-pipelineBatch.nt';
// if(fs.existsSync(filePath)){
// await removeFile(filePath)
// }
// })
it.only('Should work in single batch pipeline', async function (){
this.timeout(3000)
const filePath = 'src/lib/tests/data/example-pipelineBatch.nt';


const configuration: LDWorkbenchConfiguration = {
name: 'Example Pipeline',
description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
destination: "file://"+file1Path,
stages: [
{
name: 'Stage 1',
iterator: {
query: 'file://static/example/iterator-stage-1.rq',
endpoint: 'file://static/test/iris.nt'
},
generator: {
query: 'file://static/example/generator-stage-1.rq',
}
}
]
}
const batchConfiguration: LDWorkbenchConfiguration = {
name: 'Example Pipeline Batch',
description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
destination: "file://"+file2Path,
destination: "file://"+filePath,
stages: [
{
name: 'Stage 1',
iterator: {
query: 'file://static/example/iterator-stage-1.rq',
endpoint: 'file://static/test/iris.nt'
// endpoint: 'file://static/test/iris-small.nt'
},
generator: {
query: 'file://static/example/generator-stage-1.rq',
// adjust batchsize for test here
batchSize: 4
batchSize: 3
}
}
]
}

const pipeline = new Pipeline(configuration)
const pipelineBatch = new Pipeline(batchConfiguration)
async function runPipelineWithPromise(): Promise<boolean> {
let pipelineEnd = false
let batchPipelineEnd = false
return new Promise((resolve, reject) => {
pipeline.addListener('error', (error) => {
reject(error);
});
// running the normal pipeline
pipeline.run().then(_ => {
// waiting for the "end" event to be emitted
pipeline.addListener('end', () => {
pipelineEnd = true
// running the batch pipeline
pipelineBatch.run().then(_ => {
// waiting for the "end" event to be emitted
pipelineBatch.addListener('end', () => {
batchPipelineEnd = true
pipelineBatch.on('end', () => {
// @mightymax even the "end" event is emitted in Pipeline (which should happen when writeResult has finished), the file seems to not have finished writing
// on the first run it cannot find the file, since it has not been written
// on the second run it will show the file it created from the run before

if (pipelineEnd && batchPipelineEnd){
resolve(true)
// if you uncomment out the afterEach for these tests, it will delete the file and you will notice that it will fail each time

// when iris.nt (the endpoint) is small (8 instances) it seems never to be able to write the file with batch processing
// => this can be tested by using iris-small.nt in the configuration above

// notice that the byte size is significantly larger for batch processing than for single processing
// the byte size of the buffer is also inconsistent for batch, but becomes consistent when batch is not used
// see: src/lib/File.class.ts
try {
const fileContent = fs.readFileSync(filePath, {encoding:'utf8'});
const fileLines = fileContent.split('\n').sort();
console.log('🪵 | file: Generator.class.test.ts:106 | pipelineBatch.addListener | fileLines:', fileLines)
batchPipelineEnd = true
if (batchPipelineEnd){
resolve(true)
}
} catch (error) {
console.error(error)
}
});
}).catch(_ => {})
});
}).catch(_ => {})
}).catch(e => {reject(e)})
});
}
await runPipelineWithPromise()
// BUG should return true but doesnt due to the File class's write() which returns a smaller buffer size for the batch pipeline
// expect(compareFiles(file1Path, file2Path)).to.equal(true)
compareFiles(file1Path, file2Path)
}
)


})
// it.skip('Should work with batch processing: normal pipeline compared to batch pipeline', async function (){
// this.timeout(200000)
// const file1Path = 'pipelines/data/example-pipeline.nt';
// const file2Path = 'pipelines/data/example-pipelineBatch.nt';

// const configuration: LDWorkbenchConfiguration = {
// name: 'Example Pipeline',
// description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
// destination: "file://"+file1Path,
// stages: [
// {
// name: 'Stage 1',
// iterator: {
// query: 'file://static/example/iterator-stage-1.rq',
// endpoint: 'file://static/test/iris.nt'
// },
// generator: {
// query: 'file://static/example/generator-stage-1.rq',
// }
// }
// ]
// }
// const batchConfiguration: LDWorkbenchConfiguration = {
// name: 'Example Pipeline Batch',
// description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
// destination: "file://"+file2Path,
// stages: [
// {
// name: 'Stage 1',
// iterator: {
// query: 'file://static/example/iterator-stage-1.rq',
// endpoint: 'file://static/test/iris.nt'
// },
// generator: {
// query: 'file://static/example/generator-stage-1.rq',
// // adjust batchsize for test here
// batchSize: 4
// }
// }
// ]
// }

// const pipeline = new Pipeline(configuration)
// const pipelineBatch = new Pipeline(batchConfiguration)
// async function runPipelineWithPromise(): Promise<boolean> {
// let pipelineEnd = false
// let batchPipelineEnd = false
// return new Promise((resolve, reject) => {
// pipeline.addListener('error', (error) => {
// reject(error);
// });
// // running the normal pipeline
// pipeline.run().then(_ => {
// // waiting for the "end" event to be emitted
// pipeline.on('end', () => {
// pipelineEnd = true
// // running the batch pipeline
// pipelineBatch.run().then(_ => {
// // waiting for the "end" event to be emitted
// pipelineBatch.on('end', () => {
// batchPipelineEnd = true

// if (pipelineEnd && batchPipelineEnd){
// resolve(true)
// }
// });
// }).catch(_ => {})
// });
// }).catch(_ => {})
// });
// }
// await runPipelineWithPromise()
// // BUG should return true but doesnt due to the File class's write() which returns a smaller buffer size for the batch pipeline
// // expect(compareFiles(file1Path, file2Path)).to.equal(true)
// compareFiles(file1Path, file2Path)
// }
// )
it.skip('should emit "data" and "end" events with the correct number of statements', async () => {
const configuration = parseYamlFile('./static/example/config.yml')
const pipeline = new Pipeline(configuration)
Expand Down
Loading

0 comments on commit 1694e27

Please sign in to comment.