From ce0c095bbff56a7e2809f6bde61a5b6a14997e8d Mon Sep 17 00:00:00 2001 From: yhostc Date: Sun, 5 May 2024 18:01:19 +0800 Subject: [PATCH] chore: update --- .../src/modules/files/dto/queue.dto.ts | 5 +- .../src/modules/files/files.controller.ts | 29 ++++---- .../src/modules/files/files.service.ts | 35 +++++---- .../modules/files/process/clean.processor.ts | 35 ++++++--- .../files/process/download.processor.ts | 74 +++++++++---------- .../files/process/extract.processor.ts | 70 +++++++----------- .../src/modules/files/service/file.service.ts | 37 +++++----- 7 files changed, 139 insertions(+), 146 deletions(-) diff --git a/apps/bodhi-service/src/modules/files/dto/queue.dto.ts b/apps/bodhi-service/src/modules/files/dto/queue.dto.ts index d3d495b..42fec95 100644 --- a/apps/bodhi-service/src/modules/files/dto/queue.dto.ts +++ b/apps/bodhi-service/src/modules/files/dto/queue.dto.ts @@ -18,7 +18,7 @@ export class ExtractQueueDto { id: number; @ApiProperty() - mimetype: string; + mimeType: string; @ApiProperty() folderPath: string; @@ -30,7 +30,4 @@ export class ExtractQueueDto { export class CleanQueueDto { @ApiProperty() id: number; - - @ApiProperty() - user_id: number; } diff --git a/apps/bodhi-service/src/modules/files/files.controller.ts b/apps/bodhi-service/src/modules/files/files.controller.ts index d9fbef0..ccae280 100644 --- a/apps/bodhi-service/src/modules/files/files.controller.ts +++ b/apps/bodhi-service/src/modules/files/files.controller.ts @@ -11,7 +11,6 @@ import { RequestWithUser } from '@/core/common/request.interface'; import { JwtOrApiKeyGuard } from '../auth/guard/mixed.guard'; import { FileDto, UploadFileReq } from './dto/upload.dto'; import { FilesService } from './files.service'; -import { FileService } from './service'; @ApiTags('files') @ApiBearerAuth() @@ -19,10 +18,7 @@ import { FileService } from './service'; @ApiSecurity('api-key', []) @Controller('files') export class FilesController { - constructor( - private readonly file: FileService, - private readonly service: FilesService, - ) {} + constructor(private readonly service: FilesService) {} @Get() @ApiOperation({ summary: 'Get Files', description: 'Get Files' }) @@ -31,11 +27,11 @@ export class FilesController { const { user_id, client_user_id = '' } = req.user; // from jwt or apikey try { - const rows = await this.service.findActiveFilesByUserId(user_id, client_user_id); + const rows = await this.service.findActiveByUserId(user_id, client_user_id); return rows.map((item) => { const url = `https://s.alidraft.com${item.path}`; const { name, size, mimetype, expires_at } = item; - const id = this.file.encodeId(item.id); + const id = this.service.encodeId(item.id); return { id, name, size, mimetype, url, expires_at }; }); } catch (err) { @@ -52,12 +48,12 @@ export class FilesController { async upload(@Req() req: RequestWithUser, @UploadedFiles() files, @Body() body: UploadFileReq) { const { user_id, client_user_id = '' } = req.user; // from jwt or apikey const { purpose } = body; - const expires_at = new Date(Date.now() + 1000 * 60 * 60 * 24 * 30); // 30 days + const expires_at = new Date(Date.now() + 1000 * 60 * 60 * 24 * 15); // 15 days try { // 计算并检查hash return Promise.all( - files.map((file) => { + files.map((file: Express.Multer.File) => { const hashhex = createHash('md5'); hashhex.update(file.buffer); const hash = hashhex.digest('hex'); @@ -75,14 +71,14 @@ export class FilesController { } @Get(':id') - @ApiOperation({ summary: 'Get file detail', description: 'Get file detail' }) + @ApiOperation({ summary: 'Find file detail', description: 'Find file detail' }) @ApiResponse({ status: 200, description: 'success', type: FileDto }) - async get(@Req() req: RequestWithUser, @Param('id') file_id: string): Promise { + async find(@Req() req: RequestWithUser, @Param('id') file_id: string): Promise { const { user_id, client_user_id = '' } = req.user; // from jwt or apikey try { - const id = this.file.decodeId(file_id); - const file = await this.service.findActiveById(id, user_id, client_user_id); + const id = this.service.decodeId(file_id); + const file = await this.service.findById(id, user_id, client_user_id); const url = `https://s.alidraft.com${file.path}`; delete file.path; @@ -99,10 +95,11 @@ export class FilesController { const { user_id, client_user_id = '' } = req.user; // from jwt or apikey try { - const id = this.file.decodeId(file_id); - const file = await this.service.findActiveById(id, user_id, client_user_id); + const id = this.service.decodeId(file_id); + const file = await this.service.findById(id, user_id, client_user_id); if (file && ['active', 'expired', 'created'].includes(file.state)) { - return this.service.delete(id, user_id, client_user_id); + this.service.delete(id, user_id); + return; } throw new NotFoundException(); } catch (err) { diff --git a/apps/bodhi-service/src/modules/files/files.service.ts b/apps/bodhi-service/src/modules/files/files.service.ts index 0970a9b..3563f02 100644 --- a/apps/bodhi-service/src/modules/files/files.service.ts +++ b/apps/bodhi-service/src/modules/files/files.service.ts @@ -3,6 +3,7 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Queue } from 'bull'; +import Hashids from 'hashids'; import * as mime from 'mime-types'; import * as moment from 'moment-timezone'; @@ -12,6 +13,7 @@ import { FileService } from './service'; @Injectable() export class FilesService { + private readonly hashids: Hashids; private readonly storage: Storage; constructor( @@ -20,20 +22,29 @@ export class FilesService { private readonly file: FileService, private readonly config: ConfigService, ) { + this.hashids = new Hashids('bodhi-files', 10); this.storage = new Storage({ credentials: this.config.get('gcloud') }); } - async findActiveFilesByUserId(user_id: number, client_user_id?: string) { + encodeId(id: number) { + return this.hashids.encode(id); + } + + decodeId(id: string) { + return this.hashids.decode(id)[0] as number; + } + + async findActiveByUserId(user_id: number, client_user_id?: string) { return this.file.findActiveByUserId(user_id, client_user_id); } - async findActiveById(id: number, user_id: number, client_user_id?: string) { - return this.file.findActive(id, user_id, client_user_id); + async findById(id: number, user_id: number, client_user_id?: string) { + return this.file.find(id, user_id, client_user_id); } - async delete(id: number, user_id: number, client_user_id?: string) { - this.queue.add('clean', { id, user_id }); - return this.file.delete(id, user_id, client_user_id); + async delete(id: number, user_id: number) { + this.queue.add('file-clean', { id }); + return this.file.delete(id, user_id); } async uploadFile(file: Express.Multer.File, opts: Partial, purpose: string): Promise { @@ -41,9 +52,8 @@ export class FilesService { // 检查是否已经上传 let f = await this.file.findActiveByHash(hash); if (f) { - const id = this.file.encodeId(f.id); const url = `https://s.alidraft.com${f.path}`; - return { id, name, size, mimetype, url, expires_at: f.expires_at }; + return { id: this.encodeId(f.id), name, size, mimetype, url, expires_at: f.expires_at }; } // 初次上传 @@ -58,16 +68,15 @@ export class FilesService { await this.storage.bucket(bucket).file(filePath).save(file.buffer); // file extract - if (mimetype.includes('pdf') && purpose === 'file-extract') { - this.queue.add('extract', { id: f.id, mimetype, folderPath, filePath }); + if (purpose === 'file-extract') { + this.queue.add('file-extract', { id: f.id, mimeType: mimetype, folderPath, filePath }); } // 更新文件 this.file.update(f.id, { path: filePath, state: FileState.ACTIVE }); - const id = this.file.encodeId(f.id); const url = `https://s.alidraft.com/${filePath}`; - return { id, name, url, size, mimetype, expires_at: f.expires_at } as FileDto; + return { id: this.encodeId(f.id), name, url, size, mimetype, expires_at: f.expires_at } as FileDto; } catch (err) { console.warn(err); @@ -94,7 +103,7 @@ export class FilesService { // 新建下载任务 file = await this.file.create({ file_id, name }); - this.queue.add('download', { id: file.id, file_id, name, url }); + this.queue.add('file-download', { id: file.id, file_id, name, url }); return { id: file.id, name, url }; } diff --git a/apps/bodhi-service/src/modules/files/process/clean.processor.ts b/apps/bodhi-service/src/modules/files/process/clean.processor.ts index adfc885..d8c6e4d 100644 --- a/apps/bodhi-service/src/modules/files/process/clean.processor.ts +++ b/apps/bodhi-service/src/modules/files/process/clean.processor.ts @@ -1,8 +1,8 @@ import { Storage } from '@google-cloud/storage'; -import { Process, Processor } from '@nestjs/bull'; +import { InjectQueue, Process, Processor } from '@nestjs/bull'; import { ConfigService } from '@nestjs/config'; -import { Job } from 'bull'; -import { GoogleAuth } from 'google-auth-library'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { Job, Queue } from 'bull'; import path from 'path'; import { CleanQueueDto } from '../dto/queue.dto'; @@ -11,26 +11,41 @@ import { FileService } from '../service/file.service'; @Processor('bodhi') export class CleanProcessor { private readonly storage: Storage; - private readonly auth: GoogleAuth; constructor( + @InjectQueue('bodhi') + private readonly queue: Queue, private readonly config: ConfigService, private readonly file: FileService, ) { this.storage = new Storage({ credentials: this.config.get('gcloud') }); } - @Process('clean') + /** + * Auto clean expired files + */ + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async handleDailyCleanExpired() { + const files = await this.file.findExpired(); + for (const file of files) { + this.queue.add('file-clean', { id: file.id }); + } + } + + /** + * Clean file from GCP Storage + * @param job + */ + @Process('file-clean') async clean(job: Job) { - console.log(`[file]progress:clean`, job.data); - const { id, user_id } = job.data; - const file = await this.file.findActive(id, user_id); + console.log(`[files]progress:clean`, job.data); + const { id } = job.data; try { - if (file.mimetype === 'application/pdf') { + const file = await this.file.find(id); + if (file && file.mimetype === 'application/pdf') { const { bucket } = this.config.get('gcloud'); const prefix = path.dirname(file.path); this.storage.bucket(bucket).deleteFiles({ prefix, force: true }); - console.log(`->deleted`, prefix); } } catch (err) { console.warn(`[file]progress:clean`, err.message); diff --git a/apps/bodhi-service/src/modules/files/process/download.processor.ts b/apps/bodhi-service/src/modules/files/process/download.processor.ts index 1561801..78521fe 100644 --- a/apps/bodhi-service/src/modules/files/process/download.processor.ts +++ b/apps/bodhi-service/src/modules/files/process/download.processor.ts @@ -18,48 +18,44 @@ export class DownloadProcessor { private readonly file: FileService, ) {} - @Process('download') + @Process('file-download') async expired(job: Job) { - console.log(`[file]process`, job.data); + console.log(`[file]processs`, job.data); const { id, url } = job.data; - /* eslint no-async-promise-executor: */ - return new Promise(async (resolve, reject) => { - try { - // download & upload - const { buffer, mimetype, size } = await fetch(url, { - headers: { - 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', - 'sec-ch-ua-mobile': '?0', - 'User-Agent': - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', - 'sec-ch-ua-platform': '"macOS"', - }, - agent: new HttpsProxyAgent(this.config.get('proxy')), - }).then((res) => { - const mimetype = res.headers.get('Content-Type') as string; - return res.buffer().then((buffer) => ({ buffer, mimetype, size: buffer.length })); - }); - console.log(`[file]process`, mimetype, size, url); + try { + // download & upload + const { buffer, mimetype, size } = await fetch(url, { + headers: { + 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', + 'sec-ch-ua-mobile': '?0', + 'User-Agent': + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'sec-ch-ua-platform': '"macOS"', + }, + agent: new HttpsProxyAgent(this.config.get('proxy')), + }).then((res) => { + const mimetype = res.headers.get('Content-Type') as string; + return res.buffer().then((buffer) => ({ buffer, mimetype, size: buffer.length })); + }); + console.log(`[files]process`, mimetype, size, url); - // Upload to oss - const hashhex = createHash('md5'); - hashhex.update(buffer); - const hash = hashhex.digest('hex'); - const ext = mime.extension(mimetype); - const path = `/attachments/${moment.tz('Asia/Shanghai').format('YYYYMM')}/${uuidv4()}.${ext}`; - this.file.update(id, { hash, path, size, mimetype }); + // Upload to oss + const hashhex = createHash('md5'); + hashhex.update(buffer); + const hash = hashhex.digest('hex'); + const ext = mime.extension(mimetype); + const path = `/attachments/${moment.tz('Asia/Shanghai').format('YYYYMM')}/${uuidv4()}.${ext}`; + this.file.update(id, { hash, path, size, mimetype }); - // const { res }: any = await putStream(path, { buffer, size }); - // console.log(`[file]process`, res.statusMessage); - // if (res.statusMessage === 'OK') { - // this.file.updateState(id, FileState.ACTIVE); - // resolve(res); - // } - // reject(res.statusMessage); - } catch (err) { - console.warn(`[file]process`, err.message); - reject(err); - } - }); + // const { res }: any = await putStream(path, { buffer, size }); + // console.log(`[file]process`, res.statusMessage); + // if (res.statusMessage === 'OK') { + // this.file.updateState(id, FileState.ACTIVE); + // resolve(res); + // } + // reject(res.statusMessage); + } catch (err) { + console.warn(`[files]process`, err.message); + } } } diff --git a/apps/bodhi-service/src/modules/files/process/extract.processor.ts b/apps/bodhi-service/src/modules/files/process/extract.processor.ts index 73b9b64..ec56c90 100644 --- a/apps/bodhi-service/src/modules/files/process/extract.processor.ts +++ b/apps/bodhi-service/src/modules/files/process/extract.processor.ts @@ -25,51 +25,31 @@ export class ExtractProcessor { this.storage = new Storage({ credentials: this.config.get('gcloud') }); } - @Process('extract') + @Process('file-extract') async extract(job: Job) { - const { id, folderPath, filePath } = job.data; - console.log(`[file]extract`, id, job.data); - - const { bucket, processor } = this.config.get('gcloud'); - const token = await this.auth.getAccessToken(); - const res = await fetch(`${processor}:process`, { - headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` }, - agent: new HttpsProxyAgent(process.env.HTTP_PROXY as string), - body: JSON.stringify({ - skipHumanReview: true, - gcsDocument: { mimeType: 'application/pdf', gcsUri: `gs://${bucket}/${filePath}` }, - // inputDocuments: { - // gcsDocuments: { - // documents: [{ mimeType: 'application/pdf', gcsUri: `gs://${bucket}/${path}` }], - // }, - // }, - // documentOutputConfig: { - // gcsOutputConfig: { gcsUri: `gs://${bucket}/extract/` }, - // }, - }), - method: 'POST', - }).then((res) => res.json()); - - // 存储原始JSON - await this.storage - .bucket(bucket) - .file(`${folderPath}/1.json`) - .save(JSON.stringify(res, null, 2)); - - // 存储文本 - this.file.update(id, { extract: res?.document?.text }); - - // { - // name: 'projects/844941471694/locations/us/operations/15081559040310862204', - // metadata: { - // '@type': 'type.googleapis.com/google.cloud.documentai.v1.BatchProcessMetadata', - // state: 'RUNNING', - // createTime: '2024-05-04T15:04:57.750101Z', - // updateTime: '2024-05-04T15:04:57.750101Z' - // } - // } - - console.log(`->res`, res); - // 更新状态 + const { id, mimeType, folderPath, filePath } = job.data; + console.log(`[files]extract`, job.data); + + if (['application/pdf'].includes(mimeType)) { + const token = await this.auth.getAccessToken(); + const { bucket, processor } = this.config.get('gcloud'); + const res = await fetch(`${processor}:process`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` }, + agent: new HttpsProxyAgent(process.env.HTTP_PROXY as string), + body: JSON.stringify({ + skipHumanReview: true, + gcsDocument: { mimeType, gcsUri: `gs://${bucket}/${filePath}` }, + }), + }).then((res) => res.json()); + + // 存储原始JSON + await this.storage.bucket(bucket).file(`${folderPath}/1.json`).save(JSON.stringify(res)); + + // 存储文本 + this.file.update(id, { extract: res?.document?.text }); + } else { + console.warn(`[files]extract:unsupported`, mimeType); + } } } diff --git a/apps/bodhi-service/src/modules/files/service/file.service.ts b/apps/bodhi-service/src/modules/files/service/file.service.ts index 5236eb7..d54a8aa 100644 --- a/apps/bodhi-service/src/modules/files/service/file.service.ts +++ b/apps/bodhi-service/src/modules/files/service/file.service.ts @@ -1,37 +1,31 @@ import { Injectable } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { InjectRepository } from '@nestjs/typeorm'; -import Hashids from 'hashids'; -import { IsNull, MoreThan, Repository } from 'typeorm'; +import { In, IsNull, MoreThan, Repository } from 'typeorm'; import { File, FileState } from '../entity/file.entity'; @Injectable() export class FileService { - private readonly hashids: Hashids; - constructor( @InjectRepository(File) private readonly repository: Repository, - ) { - this.hashids = new Hashids('bodhi-files', 10); - } - - encodeId(id: number) { - return this.hashids.encode(id); - } - - decodeId(id: string) { - return this.hashids.decode(id)[0] as number; + ) {} + + @Cron(CronExpression.EVERY_HOUR) + async handleExpired() { + const files = await this.repository.find({ where: { expires_at: MoreThan(new Date()), state: FileState.ACTIVE } }); + for (const file of files) { + this.repository.update(file.id, { state: FileState.EXPIRED }); + } } async create(opts: Partial) { return this.repository.save(this.repository.create(opts)); } - async delete(id: number, user_id: number, client_user_id?: string) { + async delete(id: number, user_id: number) { const query = { id, user_id }; - client_user_id && (query['client_user_id'] = client_user_id); - return this.repository.update(query, { state: FileState.DELETED }); } @@ -48,8 +42,9 @@ export class FileService { }); } - async findActive(id: number, user_id: number, client_user_id?: string) { - const query = { id, user_id, state: FileState.ACTIVE }; + async find(id: number, user_id?: number, client_user_id?: string) { + const query = { id }; + user_id && (query['user_id'] = user_id); client_user_id && (query['client_user_id'] = client_user_id); return this.repository.findOne({ @@ -83,4 +78,8 @@ export class FileService { async findActiveByFileID(file_id: string) { return this.repository.findOne({ where: { file_id, state: FileState.ACTIVE } }); } + + async findExpired() { + return this.repository.find({ where: { state: FileState.EXPIRED } }); + } }