Skip to content

Commit

Permalink
Merge pull request #2616 from ideafast/perf/upload-large-file
Browse files Browse the repository at this point in the history
Perf/upload large file
  • Loading branch information
wsy19961129 authored Nov 19, 2024
2 parents 7698f21 + a6240cd commit 44c1785
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 40 deletions.
2 changes: 1 addition & 1 deletion packages/itmat-apis/src/graphql/resolvers/userResolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class UserResolvers {

async validateResetPassword(_parent, args: { token: string, encryptedEmail: string }) {
try {
return await this.userCore.validateResetPassword(args.token, args.encryptedEmail);
return await this.userCore.validateResetPassword(args.encryptedEmail, args.token);
} catch (e) {
return GraphQLErrorDecroator(e as CoreError);
}
Expand Down
8 changes: 5 additions & 3 deletions packages/itmat-apis/src/trpc/dataProcedure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const CreateFieldInputSchema = z.object({
unit: z.optional(z.string()),
comments: z.optional(z.string()),
verifier: z.optional(z.array(z.array(ZValueVerifier))),
properties: z.optional(z.array(ZFieldProperty))
properties: z.optional(z.array(ZFieldProperty)),
metadata: z.optional(z.record(z.string(), z.unknown()))
});

const EditFieldInputSchema = CreateFieldInputSchema;
Expand Down Expand Up @@ -108,7 +109,8 @@ export class DataRouter {
unit: opts.input.unit,
comments: opts.input.comments,
verifier: opts.input.verifier,
properties: opts.input.properties
properties: opts.input.properties,
metadata: opts.input.metadata
});
}),
/**
Expand Down Expand Up @@ -288,7 +290,7 @@ export class DataRouter {
*/
getFiles: this.baseProcedure.input(z.object({
studyId: z.string(),
versionId: z.optional(z.string()),
versionId: z.optional(z.union([z.string(), z.null(), z.array(z.union([z.string(), z.null()]))])),
fieldIds: z.optional(z.array(z.string())),
readable: z.optional(z.boolean()),
useCache: z.optional(z.boolean()),
Expand Down
49 changes: 48 additions & 1 deletion packages/itmat-cores/src/coreFunc/dataCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ interface CreateFieldInput {
comments?: string;
verifier?: ValueVerifierInput[][];
properties?: IFieldProperty[];
metadata?: Record<string, unknown>;
}

type EditFieldInput = CreateFieldInput;
Expand Down Expand Up @@ -255,7 +256,7 @@ export class DataCore {
deletedTime: null,
deletedUser: null
},
metadata: {}
metadata: fieldInput.metadata ?? {}
};

await this.db.collections.field_dictionary_collection.insertOne(fieldEntry);
Expand Down Expand Up @@ -1539,6 +1540,52 @@ export class DataCore {
}
}

/**
 * Record an already-constructed file entry as a data point on a field.
 *
 * Unlike the plain upload path, the caller has already built the IFile entry
 * (e.g. from a streamed multipart upload); this method only validates access,
 * writes the data point via uploadData, and invalidates the study-files cache.
 *
 * @param requester - The authenticated user; rejected when undefined.
 * @param studyId - The study the file belongs to.
 * @param fieldId - The field the file value is attached to.
 * @param fileEntry - The pre-built file entry; its id is stored as the data value.
 * @param properties - Optional JSON-encoded properties for the data point.
 * @returns The fileEntry that was passed in, on success.
 * @throws CoreError NOT_LOGGED_IN, NO_PERMISSION_ERROR,
 *         CLIENT_ACTION_ON_NON_EXISTENT_ENTRY, or CLIENT_MALFORMED_INPUT.
 */
public async uploadFileDataWithFileEntry(requester: IUserWithoutToken | undefined, studyId: string, fieldId: string, fileEntry: IFile, properties?: string) {
    if (!requester) {
        throw new CoreError(
            enumCoreErrors.NOT_LOGGED_IN,
            enumCoreErrors.NOT_LOGGED_IN
        );
    }
    const roles = await this.permissionCore.getRolesOfUser(requester, requester.id, studyId);
    if (roles.length === 0) {
        throw new CoreError(
            enumCoreErrors.NO_PERMISSION_ERROR,
            enumCoreErrors.NO_PERMISSION_ERROR
        );
    }
    const study = await this.db.collections.studies_collection.findOne({ 'id': studyId, 'life.deletedTime': null });
    if (!study) {
        throw new CoreError(
            enumCoreErrors.CLIENT_ACTION_ON_NON_EXISTENT_ENTRY,
            'Study does not exist.'
        );
    }
    try {
        // May throw SyntaxError on malformed caller-supplied JSON; handled below.
        const parsedProperties = properties ? JSON.parse(properties) : {};
        const dataInput: IDataInput[] = [{
            fieldId: fieldId,
            value: fileEntry.id,
            properties: parsedProperties
        }];
        const res = await this.uploadData(requester, studyId, dataInput);
        if (!res[0].successful) {
            throw new CoreError(
                enumCoreErrors.CLIENT_MALFORMED_INPUT,
                res[0].description ?? 'Failed to upload file.'
            );
        }
        // Invalidate the cached getStudyFiles result for this study.
        await this.db.collections.cache_collection.updateMany({ 'keys.studyId': studyId, 'keys.query': 'getStudyFiles' }, { $set: { status: enumCacheStatus.OUTDATED } });
        return fileEntry;
    } catch (error) {
        // BUG FIX: rethrow CoreErrors untouched so callers keep the original
        // error code. Previously every failure raised inside the try block
        // (including permission/db errors surfaced by uploadData) was collapsed
        // into CLIENT_MALFORMED_INPUT.
        if (error instanceof CoreError) {
            throw error;
        }
        throw new CoreError(
            enumCoreErrors.CLIENT_MALFORMED_INPUT,
            `${(error as Error).message}`
        );
    }
}

/**
* Get the summary of a study.
Admins and study managers can access this function.
Expand Down
3 changes: 2 additions & 1 deletion packages/itmat-cores/src/utils/GraphQL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ export function convertV2CreateFieldInputToV3(studyId: string, fields: V2CreateF
required: false
}],
unit: field.unit,
comments: field.comments
comments: field.comments,
metadata: field.metadata
};
});
}
Expand Down
131 changes: 130 additions & 1 deletion packages/itmat-interface/src/server/commonMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import jwt from 'jsonwebtoken';
import { userRetrieval } from '@itmat-broker/itmat-cores';
import { DataCore, DataTransformationCore, FileCore, PermissionCore, userRetrieval, UtilsCore } from '@itmat-broker/itmat-cores';
import { db } from '../database/database';
import Busboy from 'busboy';
import crypto from 'crypto';
import { v4 as uuid } from 'uuid';
import { enumConfigType, IStudyConfig, defaultSettings, enumFileTypes, IFile, enumFileCategories, CoreError } from '@itmat-broker/itmat-types';
import { objStore } from '../objStore/objStore';
import { PassThrough } from 'stream';

const fileCore = new FileCore(db, objStore);
const permissionCore = new PermissionCore(db);
const utilsCore = new UtilsCore();
const dataTransformationCore = new DataTransformationCore(utilsCore);
const dataCore = new DataCore(db, objStore, fileCore, permissionCore, utilsCore, dataTransformationCore);

export const tokenAuthentication = async (token: string) => {
if (token !== '') {
Expand Down Expand Up @@ -32,3 +44,120 @@ export const tokenAuthentication = async (token: string) => {
return null;
}
};

/**
 * Express handler that streams a multipart study-file upload.
 *
 * Parses the request with Busboy, hashes and size-checks the file while it
 * streams, uploads the bytes to the object store, then records the file entry
 * and its data point via dataCore.uploadFileDataWithFileEntry.
 *
 * Expected multipart fields: studyId, fieldId, and optionally description and
 * properties (JSON string), followed by the file part.
 */
export async function uploadFileData(req, res) {
    const busboy = new Busboy({ headers: req.headers });
    const variables: Record<string, string> = {}; // multipart form fields, captured as they arrive
    let fileName: string;
    let fileSize = 0;
    let responded = false; // ensures we never attempt to send a second HTTP response
    const hash_ = crypto.createHash('sha256');

    // BUG FIX: guard authentication up front — the file-entry creation below
    // reads req.user.id, which previously threw an uncaught TypeError inside an
    // async stream callback for unauthenticated requests.
    if (!req.user) {
        return res.status(401).json({ error: { message: 'Not logged in.' } });
    }

    busboy.on('field', (fieldname, val) => {
        variables[fieldname] = val; // Capture fields
    });

    let passThrough: PassThrough;
    // Capture the file stream, hash/size it on the fly, and upload it.
    busboy.on('file', async (fieldname, file, filename) => {
        // BUG FIX: the study-config lookup previously ran at function entry,
        // before Busboy had parsed any fields, so variables['studyId'] was
        // always undefined and the per-study size limit silently fell back to
        // the defaults. Resolving it here sees the fields parsed so far.
        // NOTE(review): assumes the client sends studyId before the file part
        // (true for browser FormData ordering — confirm for raw API clients).
        const config = await db.collections.configs_collection.findOne({ type: enumConfigType.STUDYCONFIG, key: variables['studyId'] });
        const fileConfig = config ? config.properties : defaultSettings.studyConfig;
        const fileSizeLimit = (fileConfig as IStudyConfig).defaultMaximumFileSize;

        fileName = filename; // Store the filename
        const fileUri = uuid(); // Unique object-store identifier for this file

        // NOTE(review): nothing consumes the PassThrough until the 'end'
        // handler starts the object-store upload, so the whole file is buffered
        // in memory first. Starting objStore.uploadFile here would give true
        // streaming, but its signature takes the final fileSize up front —
        // confirm the object-store API before changing this.
        passThrough = new PassThrough();

        // Track size and hash while forwarding chunks to the PassThrough stream.
        file.on('data', (chunk) => {
            fileSize += chunk.length; // Increment file size
            hash_.update(chunk); // Update the hash with the chunk of data

            if (fileSize > fileSizeLimit) {
                file.destroy(); // Stop consuming the request stream
                passThrough.end(); // Terminate the buffered upload stream
                // BUG FIX: record that a response was sent so the 'end'/'error'
                // handlers cannot attempt a second response afterwards.
                if (!responded) {
                    responded = true;
                    res.status(400).json({ message: 'File size exceeds the limit' });
                }
                return;
            }

            // Pass the chunk to the object store via the PassThrough stream
            passThrough.write(chunk);
        });

        // When the file stream ends: upload to the object store, then persist.
        file.on('end', async () => {
            if (responded) {
                return; // an error response (e.g. size limit) was already sent
            }
            try {
                const minioUploadPromise = objStore.uploadFile(passThrough, variables['studyId'], fileUri, fileSize);
                passThrough.end(); // Signal the end of the PassThrough stream
                await minioUploadPromise;
            } catch (err: unknown) {
                responded = true;
                return res.status(500).json({ message: 'Error uploading file to MinIO', error: { message: (err as CoreError).message } });
            }

            // Finalise the hash and validate the extension-derived file type.
            const hashString = hash_.digest('hex');
            const fileType = (filename.split('.').pop() as string).toUpperCase();
            if (!Object.keys(enumFileTypes).includes(fileType)) {
                responded = true;
                return res.status(400).json({ error: { message: `File type ${fileType} not supported.` } });
            }

            try {
                // BUG FIX: parse the caller-supplied properties JSON inside the
                // try block — previously it ran unguarded, so malformed JSON
                // caused an unhandled rejection instead of a 400 response.
                const parsedProperties = variables['properties'] ? JSON.parse(variables['properties']) : {};

                // Build the file entry recorded in the database.
                const fileEntry: IFile = {
                    id: uuid(),
                    studyId: variables['studyId'],
                    userId: null,
                    fileName: fileName,
                    fileSize: fileSize,
                    description: variables['description'],
                    uri: fileUri,
                    hash: hashString,
                    fileType: fileType as enumFileTypes,
                    fileCategory: enumFileCategories.STUDY_DATA_FILE,
                    properties: parsedProperties,
                    sharedUsers: [],
                    life: {
                        createdTime: Date.now(),
                        createdUser: req.user.id,
                        deletedTime: null,
                        deletedUser: null
                    },
                    metadata: {}
                };

                // Validate permissions and record the data point referencing the file.
                const response = await dataCore.uploadFileDataWithFileEntry(req.user,
                    variables['studyId'],
                    variables['fieldId'],
                    fileEntry,
                    JSON.stringify({
                        ...parsedProperties,
                        FileName: fileName
                    })
                );

                await db.collections.files_collection.insertOne(fileEntry);
                responded = true;
                res.status(200).json({ result: { data: response } });
            } catch (err: unknown) {
                responded = true;
                return res.status(400).json({ message: 'Failed to upload file.', error: { message: (err as CoreError).message } });
            }
        });

        file.on('error', (err) => {
            if (!responded) {
                responded = true;
                res.status(400).json({ message: 'Failed to upload file.', error: { message: (err as CoreError).message } });
            }
        });
    });

    // All responses are sent from the handlers above; 'finish' needs no reply.
    busboy.on('finish', () => {
        // intentionally empty
    });

    // Pipe the request into Busboy to handle the multipart stream
    req.pipe(busboy);
}
8 changes: 7 additions & 1 deletion packages/itmat-interface/src/server/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import { logPluginInstance } from '../log/logPlugin';
import { IConfiguration, spaceFixing } from '@itmat-broker/itmat-cores';
import { userLoginUtils } from '../utils/userLoginUtils';
import * as trpcExpress from '@trpc/server/adapters/express';
import { tokenAuthentication } from './commonMiddleware';
import { tokenAuthentication, uploadFileData } from './commonMiddleware';
import multer from 'multer';
import { Readable } from 'stream';
import { z } from 'zod';
import { ApolloServerContext, DMPContext, createtRPCContext, typeDefs } from '@itmat-broker/itmat-apis';
import { APICalls } from './helper';


export class Router {
private readonly app: Express;
private readonly server: http.Server;
Expand Down Expand Up @@ -317,6 +318,10 @@ export class Router {

this.app.use('/webdav', webdav_proxy as NativeRequestHandler);

this.app.use('/trpc/data.uploadStudyFileData', (req, res, next) => {
uploadFileData(req, res).catch(next); // Ensure any error is passed to next()
});

// trpc
const upload = multer();
this.app.use(
Expand Down Expand Up @@ -376,4 +381,5 @@ export class Router {
public getServer(): http.Server {
return this.server;
}

}
Loading

0 comments on commit 44c1785

Please sign in to comment.