export: refactor zip stream to work with backpressure
wacky6 committed Jul 23, 2021
1 parent f107453 commit 959a72d
Showing 1 changed file with 123 additions and 66 deletions.

route/export.js
@@ -803,83 +803,140 @@ const addExtensionFromMime = (name, mime) => {
return `${name}.${getExtension(mime)}`
}

const archiverAppendDbImage = (archiver, db, imageId, name) => {
return db.collection('image').findOne({ _id: imageId }).then(
image => image
? archiver.append(image.buffer.buffer, { name: addExtensionFromMime(name, image.mime), date: image.created })
: null
// `generateFn` is called with the following object so it can
// maintain backpressure:
//
// {
//   shouldStop(): returns true once the output stream has ended
//                 (e.g. the client connection was lost); generateFn
//                 should stop appending entries after that.
//
//   append():     same as archiver's append(), except it returns a
//                 promise that resolves once the entry has been processed.
//
//   finalize():   same as archiver's finalize().
// }
function streamZipArchive(generateFn) {
const archiver = Archiver('zip', {store: true})
const outStream = archiver.pipe(new PassThrough())

// Defer the following execution until after we return, so Koa can
// start streaming the response instead of holding it all in memory.
;(async function() {
let streamEnded = false
outStream.once('close', () => streamEnded = true)

generateFn({
shouldStop() { return streamEnded },
append(...args) {
archiver.append(...args)
return new Promise(resolve => archiver.once('entry', resolve))
},
finalize() { return archiver.finalize() }
})
})()

return outStream
}
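
For reference, a minimal sketch of how a caller is expected to drive `streamZipArchive`. The entry names and buffers below are illustrative only, and piping to a local file via `fs` is just for demonstration; the real routes assign the returned stream to Koa's `ctx.body`:

const fs = require('fs')

const zip = streamZipArchive(async (zipStream) => {
  for (const i of [1, 2, 3]) {
    // stop generating entries if the consumer has gone away
    if (zipStream.shouldStop())
      break

    // awaiting append() paces generation: the next entry is only
    // produced after the archiver has processed the previous one
    await zipStream.append(Buffer.from(`hello #${i}`), { name: `hello-${i}.txt` })
  }
  zipStream.finalize()
})

zip.pipe(fs.createWriteStream('/tmp/example.zip'))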

const appendDbImageToZipStream = async (zipStream, db, imageId, name) => {
const image = await db.collection('image').findOne({ _id: imageId })

if (!image)
return

return zipStream.append(
image.buffer.buffer,
{ name: addExtensionFromMime(name, image.mime), date: image.created }
)
}

route.get('/export/committees/photos',
TokenAccessFilter(AccessFilter('finance', 'admin')),
async ctx => {
let archiver = Archiver('zip', {store: true})
ctx.status = 200
ctx.set('content-type', 'application/zip;charset=utf-8')
ctx.body = archiver.pipe(new PassThrough())
const committees = await ctx.db.collection('committee').aggregate(LOOKUP_COMMITTEE).toArray()
const createName = NameCreator()
for (let committee of committees) {
const name = createName(GV(committee, 'role') + '-' + GV(committee, 'contact.name'))
await archiverAppendDbImage(archiver, ctx.db, committee.photoId, name)
}
archiver.finalize()
ctx.body = streamZipArchive(async (zipStream) => {
const committees = await ctx.db.collection('committee').aggregate(LOOKUP_COMMITTEE).toArray()
const createName = NameCreator()
for (let committee of committees) {
if (zipStream.shouldStop())
break

const name = createName(GV(committee, 'role') + '-' + GV(committee, 'contact.name'))
await appendDbImageToZipStream(zipStream, ctx.db, committee.photoId, name)
}
zipStream.finalize()
})
}
)

route.get('/export/daises/photos',
TokenAccessFilter(AccessFilter('finance', 'admin')),
async ctx => {
let archiver = Archiver('zip', {store: true})
ctx.status = 200
ctx.set('content-type', 'application/zip;charset=utf-8')
ctx.body = archiver.pipe(new PassThrough())
const daises = await ctx.db.collection('dais').aggregate(LOOKUP_DAIS).toArray()
const createName = NameCreator()
for (let dais of daises) {
const name = createName(GV(dais, 'role') + '-' + GV(dais, 'contact.name'))
await archiverAppendDbImage(archiver, ctx.db, dais.photoId, name)
}
archiver.finalize()
ctx.body = streamZipArchive(async (zipStream) => {
const daises = await ctx.db.collection('dais').aggregate(LOOKUP_DAIS).toArray()
const createName = NameCreator()
for (let dais of daises) {
if (zipStream.shouldStop())
break

const name = createName(GV(dais, 'role') + '-' + GV(dais, 'contact.name'))
await appendDbImageToZipStream(zipStream, ctx.db, dais.photoId, name)
}
zipStream.finalize()
})
}
)

route.get('/export/representatives/photos',
TokenAccessFilter(AccessFilter('finance', 'admin')),
async ctx => {
let archiver = Archiver('zip', {store: true})
ctx.status = 200
ctx.set('content-type', 'application/zip;charset=utf-8')
ctx.body = archiver.pipe(new PassThrough())
const representatives = await ctx.db.collection('representative').aggregate(LOOKUP_REPRESENTATIVE, AGGREGATE_OPTS).toArray()
const createName = NameCreator()
for (let representative of representatives) {
const name = createName(GV(representative, 'school.school.name') + '-' + GV(representative, 'contact.name') + '-' + GV(representative, 'identification.number'))
await archiverAppendDbImage(archiver, ctx.db, representative.avatar_image, name)
}
archiver.finalize()
ctx.body = streamZipArchive(async (zipStream) => {
const representatives = await ctx.db.collection('representative').aggregate(LOOKUP_REPRESENTATIVE, AGGREGATE_OPTS).toArray()
const createName = NameCreator()
for (let representative of representatives) {
if (zipStream.shouldStop())
break

const name = createName(GV(representative, 'school.school.name') + '-' + GV(representative, 'contact.name') + '-' + GV(representative, 'identification.number'))
await appendDbImageToZipStream(zipStream, ctx.db, representative.avatar_image, name)
}
zipStream.finalize()
})
}
)

route.get('/export/representatives/disclaimers',
TokenAccessFilter(AccessFilter('finance', 'admin')),
async ctx => {
let archiver = Archiver('zip', {store: true})
ctx.status = 200
ctx.set('content-type', 'application/zip;charset=utf-8')
ctx.body = archiver.pipe(new PassThrough())
const representatives = await ctx.db.collection('representative').aggregate(LOOKUP_REPRESENTATIVE, AGGREGATE_OPTS).toArray()
const createName = NameCreator()
for (let representative of representatives) {
const name = createName(
GV(representative, 'school.school.name') + '-'
+ (GV(representative, 'contact.name') || GV(representative, '_id')) + '-'
+ GV(representative, 'identification.number')
)
await archiverAppendDbImage(archiver, ctx.db, representative.disclaimer_image, name)
}
archiver.finalize()
ctx.body = streamZipArchive(async (zipStream) => {
const representatives = await ctx.db.collection('representative')
.aggregate(LOOKUP_REPRESENTATIVE, AGGREGATE_OPTS)
.toArray()
const createName = NameCreator()

for (let representative of representatives) {
if (zipStream.shouldStop())
break

const name = createName(
GV(representative, 'school.school.name') + '-'
+ (GV(representative, 'contact.name') || GV(representative, '_id')) + '-'
+ GV(representative, 'identification.number')
)

await appendDbImageToZipStream(zipStream, ctx.db, representative.disclaimer_image, name)
}

zipStream.finalize()
})
}
)

@@ -899,30 +956,30 @@ route.get('/export/daises/reimbursements',
route.get('/export/daises/reimbursement-credentials',
TokenAccessFilter(AccessFilter('finance', 'admin')),
async ctx => {
const STATES_TO_EXPORT = ['approved', 'completed']
let archiver = Archiver('zip', {store: true})
ctx.status = 200
ctx.set('content-type', 'application/zip;charset=utf-8')
ctx.body = archiver.pipe(new PassThrough())
const daises = await ctx.db.collection('dais').aggregate(LOOKUP_DAIS_REIMBURSEMENT, AGGREGATE_OPTS).toArray()
const createName = NameCreator()
for (let dais of daises) {
const inboundState = GV(dais, 'reimbursement.inbound.state')
if (STATES_TO_EXPORT.includes(inboundState)) {
const inboundCreds = GV(dais, 'reimbursement.inbound.credential') || []
const name = createName(GV(dais, 'contact.name') + '-来程')
for (let photoId of inboundCreds)
await archiverAppendDbImage(archiver, ctx.db, photoId, name)
}
const outboundState = GV(dais, 'reimbursement.outbound.state')
if (STATES_TO_EXPORT.includes(outboundState)) {
const outboundCreds = GV(dais, 'reimbursement.outbound.credential') || []
const name = createName(GV(dais, 'contact.name') + '-回程')
for (let photoId of outboundCreds)
await archiverAppendDbImage(archiver, ctx.db, photoId, name)
ctx.body = streamZipArchive(async (zipStream) => {
const STATES_TO_EXPORT = ['approved', 'completed']
const daises = await ctx.db.collection('dais').aggregate(LOOKUP_DAIS_REIMBURSEMENT, AGGREGATE_OPTS).toArray()
const createName = NameCreator()
for (let dais of daises) {
const inboundState = GV(dais, 'reimbursement.inbound.state')
if (STATES_TO_EXPORT.includes(inboundState)) {
const inboundCreds = GV(dais, 'reimbursement.inbound.credential') || []
const name = createName(GV(dais, 'contact.name') + '-来程')
for (let photoId of inboundCreds)
await appendDbImageToZipStream(zipStream, ctx.db, photoId, name)
}
const outboundState = GV(dais, 'reimbursement.outbound.state')
if (STATES_TO_EXPORT.includes(outboundState)) {
const outboundCreds = GV(dais, 'reimbursement.outbound.credential') || []
const name = createName(GV(dais, 'contact.name') + '-回程')
for (let photoId of outboundCreds)
await appendDbImageToZipStream(zipStream, ctx.db, photoId, name)
}
}
}
archiver.finalize()
zipStream.finalize()
})
}
)

