Skip to content

Commit

Permalink
perf: Boost the performance of uploading large files
Browse files Browse the repository at this point in the history
  • Loading branch information
wsy19961129 committed Oct 29, 2024
1 parent cd13c11 commit f006b1a
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 35 deletions.
2 changes: 1 addition & 1 deletion packages/itmat-apis/src/graphql/resolvers/userResolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class UserResolvers {

async validateResetPassword(_parent, args: { token: string, encryptedEmail: string }) {
try {
return await this.userCore.validateResetPassword(args.token, args.encryptedEmail);
return await this.userCore.validateResetPassword(args.encryptedEmail, args.token);
} catch (e) {
return GraphQLErrorDecroator(e as CoreError);
}
Expand Down
46 changes: 46 additions & 0 deletions packages/itmat-cores/src/coreFunc/dataCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,52 @@ export class DataCore {
}
}

public async uploadFileDataWithFileEntry(requester: IUserWithoutToken | undefined, studyId: string, fieldId: string, fileEntry: IFile, properties?: string) {
if (!requester) {
throw new CoreError(
enumCoreErrors.NOT_LOGGED_IN,
enumCoreErrors.NOT_LOGGED_IN
);
}
const roles = await this.permissionCore.getRolesOfUser(requester, requester.id, studyId);
if (roles.length === 0) {
throw new CoreError(
enumCoreErrors.NO_PERMISSION_ERROR,
enumCoreErrors.NO_PERMISSION_ERROR
);
}
const study = await this.db.collections.studies_collection.findOne({ 'id': studyId, 'life.deletedTime': null });
if (!study) {
throw new CoreError(
enumCoreErrors.CLIENT_ACTION_ON_NON_EXISTENT_ENTRY,
'Study does not exist.'
);
}
try {
const parsedProperties = properties ? JSON.parse(properties) : {};
const dataInput: IDataInput[] = [{
fieldId: fieldId,
value: fileEntry.id,
properties: parsedProperties
}];
const res = await this.uploadData(requester, studyId, dataInput);
if (!res[0].successful) {
throw new CoreError(
enumCoreErrors.CLIENT_MALFORMED_INPUT,
res[0].description ?? 'Failed to upload file.'
);
}
// invalidate the cache
await this.db.collections.cache_collection.updateMany({ 'keys.studyId': studyId, 'keys.query': 'getStudyFiles' }, { $set: { status: enumCacheStatus.OUTDATED } });
return fileEntry;
} catch (error) {
throw new CoreError(
enumCoreErrors.CLIENT_MALFORMED_INPUT,
`${(error as Error).message}`
);
}
}

/**
* Get the summary of a study.
* Admins can study managers can access this function.
Expand Down
131 changes: 130 additions & 1 deletion packages/itmat-interface/src/server/commonMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import jwt from 'jsonwebtoken';
import { userRetrieval } from '@itmat-broker/itmat-cores';
import { DataCore, DataTransformationCore, FileCore, PermissionCore, userRetrieval, UtilsCore } from '@itmat-broker/itmat-cores';
import { db } from '../database/database';
import Busboy from 'busboy';
import crypto from 'crypto';
import { v4 as uuid } from 'uuid';
import { enumConfigType, IStudyConfig, defaultSettings, enumFileTypes, IFile, enumFileCategories, CoreError } from '@itmat-broker/itmat-types';
import { objStore } from '../objStore/objStore';
import { PassThrough } from 'stream';

const fileCore = new FileCore(db, objStore);
const permissionCore = new PermissionCore(db);
const utilsCore = new UtilsCore();
const dataTransformationCore = new DataTransformationCore(utilsCore);
const dataCore = new DataCore(db, objStore, fileCore, permissionCore, utilsCore, dataTransformationCore);

export const tokenAuthentication = async (token: string) => {
if (token !== '') {
Expand Down Expand Up @@ -32,3 +44,120 @@ export const tokenAuthentication = async (token: string) => {
return null;
}
};

export async function uploadFileData(req, res) {
const busboy = new Busboy({ headers: req.headers });
const variables: Record<string, string> = {}; // To hold form fields
let fileName: string;
let fileSize = 0;
const config = await db.collections.configs_collection.findOne({ type: enumConfigType.STUDYCONFIG, key: variables['studyId'] });
const fileConfig = config ? config.properties : defaultSettings.studyConfig;
const fileSizeLimit = (fileConfig as IStudyConfig).defaultMaximumFileSize;
const hash_ = crypto.createHash('sha256');

busboy.on('field', (fieldname, val) => {
variables[fieldname] = val; // Capture fields
});
let passThrough: PassThrough;
// Capture file stream and upload it immediately
busboy.on('file', (fieldname, file, filename) => {
fileName = filename; // Store the filename
const fileUri = uuid(); // Generate unique file identifier

passThrough = new PassThrough(); // Create a passthrough stream

// Start the MinIO upload using the PassThrough stream
// const minioUploadPromise = objStore.uploadFile(passThrough, variables['studyId'], fileUri, fileSize);

// Listen for data chunks to calculate size and hash while piping to MinIO
file.on('data', (chunk) => {
fileSize += chunk.length; // Increment file size
hash_.update(chunk); // Update the hash with the chunk of data

if (fileSize > fileSizeLimit) {
file.destroy(); // Stop the stream if the file size exceeds the limit
passThrough.end(); // End the passThrough to stop MinIO upload
res.status(400).json({ message: 'File size exceeds the limit' });
return;
}

// Pass the chunk to MinIO via the PassThrough stream
passThrough.write(chunk);
});

// When the file stream ends
file.on('end', async () => {
try {
// Upload file to MinIO
const minioUploadPromise = objStore.uploadFile(passThrough, variables['studyId'], fileUri, fileSize);
passThrough.end(); // Signal the end of the PassThrough stream
await minioUploadPromise;
} catch (err: unknown) {
// Return a response with the error message
return res.status(500).json({ message: 'Error uploading file to MinIO', error: { message: (err as CoreError).message } });
}

// Hash the file and proceed with the file entry creation
const hashString = hash_.digest('hex');
const fileType = (filename.split('.').pop() as string).toUpperCase();
if (!Object.keys(enumFileTypes).includes(fileType)) {
return res.status(400).json({ error: { message: `File type ${fileType} not supported.` } });
}

// Create the file entry object
const fileEntry: IFile = {
id: uuid(),
studyId: variables['studyId'],
userId: null,
fileName: fileName,
fileSize: fileSize,
description: variables['description'],
uri: fileUri,
hash: hashString,
fileType: fileType as enumFileTypes,
fileCategory: enumFileCategories.STUDY_DATA_FILE,
properties: variables['properties'] ? JSON.parse(variables['properties']) : {},
sharedUsers: [],
life: {
createdTime: Date.now(),
createdUser: req.user.id,
deletedTime: null,
deletedUser: null
},
metadata: {}
};
try {

// Perform any additional processing and insert file data into the database
const response = await dataCore.uploadFileDataWithFileEntry(req.user,
variables['studyId'],
variables['fieldId'],
fileEntry,
JSON.stringify({
...JSON.parse(variables['properties'] ?? '{}'),
FileName: fileName
})
);

await db.collections.files_collection.insertOne(fileEntry);
// Send success response
res.status(200).json({ result: { data: response } });
} catch (err: unknown) {
// Handle any error during processing or insertion
return res.status(400).json({ message: 'Failed to upload file.', error: { message: (err as CoreError).message } });
}
});

file.on('error', (err) => {
return res.status(400).json({ message: 'Failed to upload file.', error: { message: (err as CoreError).message } });
});
});

// When Busboy finishes processing
busboy.on('finish', () => {
// No need to respond here; we already send the response after upload completion
});

// Pipe the request into Busboy to handle the file stream
req.pipe(busboy);
}
8 changes: 7 additions & 1 deletion packages/itmat-interface/src/server/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import { logPluginInstance } from '../log/logPlugin';
import { IConfiguration, spaceFixing } from '@itmat-broker/itmat-cores';
import { userLoginUtils } from '../utils/userLoginUtils';
import * as trpcExpress from '@trpc/server/adapters/express';
import { tokenAuthentication } from './commonMiddleware';
import { tokenAuthentication, uploadFileData } from './commonMiddleware';
import multer from 'multer';
import { Readable } from 'stream';
import { z } from 'zod';
import { ApolloServerContext, DMPContext, createtRPCContext, typeDefs } from '@itmat-broker/itmat-apis';
import { APICalls } from './helper';


export class Router {
private readonly app: Express;
private readonly server: http.Server;
Expand Down Expand Up @@ -317,6 +318,10 @@ export class Router {

this.app.use('/webdav', webdav_proxy as NativeRequestHandler);

this.app.use('/trpc/data.uploadStudyFileData', (req, res, next) => {
uploadFileData(req, res).catch(next); // Ensure any error is passed to next()
});

// trpc
const upload = multer();
this.app.use(
Expand Down Expand Up @@ -376,4 +381,5 @@ export class Router {
public getServer(): http.Server {
return this.server;
}

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import React, { FunctionComponent, useState } from 'react';
import { Button, Table, List, Modal, Upload, Form, Select, Input, notification, message, Typography, Tooltip } from 'antd';
import { Progress, Button, Table, List, Modal, Upload, Form, Select, Input, notification, message, Typography, Tooltip } from 'antd';
import { CloudDownloadOutlined, InboxOutlined, NumberOutlined } from '@ant-design/icons';
import { enumConfigType, IStudyConfig, IStudy, IField, enumDataTypes, IStudyFileBlock, enumUserTypes, IUserWithoutToken, deviceTypes, enumStudyBlockColumnValueType, IFile } from '@itmat-broker/itmat-types';
import LoadSpinner from '../../../reusable/loadSpinner';
import css from './fileRepo.module.css';
import { trpc } from '../../../../utils/trpc';
import { convertFileListToApiFormat, formatBytes, stringCompareFunc, tableColumnRender } from '../../../../utils/tools';
import { formatBytes, stringCompareFunc, tableColumnRender } from '../../../../utils/tools';
import { UploadChangeParam } from 'antd/lib/upload';
import { RcFile, UploadFile } from 'antd/lib/upload/interface';
import axios from 'axios';
import { validate } from '@ideafast/idgen';
import dayjs from 'dayjs';
import Highlighter from 'react-highlight-words';
import ClipLoader from 'react-spinners/ClipLoader';
import { ResponsiveLine } from '@nivo/line';
import { ResponsiveBar } from '@nivo/bar';
import { useQueryClient } from '@tanstack/react-query';

const { Option } = Select;

export const FileRepositoryTabContent: FunctionComponent<{ study: IStudy }> = ({ study }) => {
Expand Down Expand Up @@ -76,15 +74,14 @@ export const FileRepositoryTabContent: FunctionComponent<{ study: IStudy }> = ({
</div >;
};

export const UploadFileComponent: FunctionComponent<{ study: IStudy, fields: IField[], fieldIds: string[] }> = ({ study, fields, fieldIds }) => {
export const UploadFileComponent: FunctionComponent<{ study: IStudy, fields: IField[], fieldIds: string[], setIsUploading: (isUploading: boolean) => void, setProgress: (progress: number) => void }> = ({ study, fields, fieldIds, setIsUploading, setProgress }) => {
const queryClient = useQueryClient();
const [__unused__api, contextHolder] = notification.useNotification();
const [isShowPanel, setIsShowPanel] = React.useState(false);
const [fileList, setFileList] = useState<RcFile[]>([]);
const [fileProperties, setFileProperties] = useState({
fieldId: ''
});
const [isUploading, setIsUploading] = useState(false);
const getCurrentDomain = trpc.domain.getCurrentDomain.useQuery();
const [form] = Form.useForm();
let selectedField = fields.filter(el => el.fieldId === fileProperties.fieldId)[0];
Expand All @@ -101,26 +98,32 @@ export const UploadFileComponent: FunctionComponent<{ study: IStudy, fields: IFi
try {
setIsShowPanel(false);
setIsUploading(true);
const files = await convertFileListToApiFormat(fileList, 'file');
const formData = new FormData();
if (files.length > 0) {
files.forEach(file => {
formData.append('file', file.stream, file.originalname);
});

// Append file
if (fileList.length > 0) {
formData.append('file', fileList[0]);
}

// Append additional fields
formData.append('fieldId', String(variables.fieldId));
formData.append('studyId', String(variables.studyId));
formData.append('properties', JSON.stringify({
...variables,
FileName: fileList[0].name
FileName: fileList[0]?.name || 'unknown'
}));
// Axios request
const response = await axios.post('/trpc/data.uploadStudyFileData', formData, {
headers: {
'Content-Type': 'multipart/form-data'
},
onUploadProgress: (progressEvent) => {
if (progressEvent.total) {
const percentCompleted = Math.round((progressEvent.loaded * 100) / progressEvent.total);
setProgress(percentCompleted);
}
}
});

if (response?.data?.result?.data?.id) {
const queryKey = [['data', 'getFiles'], {
input: {
Expand Down Expand Up @@ -156,6 +159,7 @@ export const UploadFileComponent: FunctionComponent<{ study: IStudy, fields: IFi
}
} finally {
setIsUploading(false);
setProgress(0);
}

};
Expand Down Expand Up @@ -282,28 +286,13 @@ export const UploadFileComponent: FunctionComponent<{ study: IStudy, fields: IFi
}
</Form>
</Modal>
{
isUploading ? (
<div
style={{
position: 'absolute',
top: '10%',
right: '0%',
display: 'flex',
alignItems: 'center',
transform: 'translate(-50%, -50%)'
}}
>
<ClipLoader />
<span style={{ marginLeft: '8px' }}>Uploading...Please wait</span>
</div>
) : null
}
</div >);
};

export const FileBlock: FunctionComponent<{ user: IUserWithoutToken, fields: IField[], study: IStudy, block: IStudyFileBlock }> = ({ user, fields, study, block }) => {
const [isUploading, setIsUploading] = useState(false);
const queryClient = useQueryClient();
const [progress, setProgress] = useState(0);
const [searchedKeyword, setSearchedKeyword] = useState<string | undefined>(undefined);
const [isModalOn, setIsModalOn] = useState(false);
const getFiles = trpc.data.getFiles.useQuery({ studyId: study.id, fieldIds: block.fieldIds, readable: true, useCache: false });
Expand Down Expand Up @@ -449,20 +438,39 @@ export const FileBlock: FunctionComponent<{ user: IUserWithoutToken, fields: IFi
<div>{block.title}</div>
</div>
<div>
<UploadFileComponent study={study} fields={fields.filter(el => el.dataType === enumDataTypes.FILE)} fieldIds={block.fieldIds} />
<UploadFileComponent study={study} fields={fields.filter(el => el.dataType === enumDataTypes.FILE)} fieldIds={block.fieldIds} setIsUploading={setIsUploading} setProgress={setProgress} />
</div>
</div>
}
>
<List.Item>
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', width: '100%' }}>
<div style={{ width: '70%' }}>
<div style={{ width: '50%' }}>
<Input
value={searchedKeyword}
placeholder="Search"
onChange={(e) => setSearchedKeyword(e.target.value)}
/>
</div>
<div style={{ width: '20%', textAlign: 'right' }}>
{
isUploading ? (
<div
style={{
position: 'absolute',
top: '2%',
right: '25%',
display: 'flex',
alignItems: 'center',
transform: 'translate(-50%, -50%)'
}}
>
<Progress type='circle' size={60} percent={Math.min(progress, 99)} />
<span style={{ fontSize: '20px', fontWeight: 'bold', color: 'black', marginRight: '20px' }}>{progress >= 99 ? 'Finishing' : 'Uploading'}</span>
</div>
) : null
}
</div>
<div style={{ width: '30%', textAlign: 'right' }}>
<span style={{ fontSize: '16px', fontWeight: 'bold', color: 'black', marginRight: '20px' }}>
{`Files: ${filteredFiles.length}`}
Expand Down

0 comments on commit f006b1a

Please sign in to comment.