From 6b2e2d016a057ce08713a4425775d0d6fc983528 Mon Sep 17 00:00:00 2001 From: lovehunter9 Date: Thu, 8 Aug 2024 14:55:06 +0800 Subject: [PATCH] feat: uploader4.0 - same as seafile with resumable.js --- cmd/upload/app/handler4.go | 465 ++++++++++++++++++++++++++++ cmd/upload/app/os_system.go | 128 ++++++++ cmd/upload/app/server.go | 24 +- pkg/upload/fileutils/checkfolder.go | 5 +- pkg/upload/fileutils/constants.go | 2 + pkg/upload/fileutils/file.go | 100 ++++++ pkg/upload/fileutils/fileinfos.go | 12 + pkg/upload/models/upload.go | 14 + pkg/upload/uid/uid.go | 8 + 9 files changed, 755 insertions(+), 3 deletions(-) create mode 100644 cmd/upload/app/handler4.go create mode 100644 cmd/upload/app/os_system.go diff --git a/cmd/upload/app/handler4.go b/cmd/upload/app/handler4.go new file mode 100644 index 0000000..3d8c7f4 --- /dev/null +++ b/cmd/upload/app/handler4.go @@ -0,0 +1,465 @@ +package app + +import ( + "bytetrade.io/web3os/tapr/pkg/upload/fileutils" + "bytetrade.io/web3os/tapr/pkg/upload/models" + "bytetrade.io/web3os/tapr/pkg/upload/uid" + "bytetrade.io/web3os/tapr/pkg/utils" + "fmt" + "github.com/gofiber/fiber/v2" + "k8s.io/klog/v2" + "os" + "path/filepath" + "strings" + "time" +) + +const ( + CacheRequestPrefix = "/AppData" + CachePathPrefix = "/appcache" +) + +func getPVC(c *fiber.Ctx) (string, string, string, string, error) { + bflName := c.Get("X-Bfl-User") + klog.Info("BFL_NAME: ", bflName) + + userPvc, err := PVCs.getUserPVCOrCache(bflName) // appdata.GetAnnotation(p.mainCtx, p.k8sClient, "userspace_pvc", bflName) + if err != nil { + klog.Info(err) + return bflName, "", "", "", err + } else { + klog.Info("user-space pvc: ", userPvc) + } + + cachePvc, err := PVCs.getCachePVCOrCache(bflName) // appdata.GetAnnotation(p.mainCtx, p.k8sClient, "appcache_pvc", bflName) + if err != nil { + klog.Info(err) + return bflName, "", "", "", err + } else { + klog.Info("appcache pvc: ", cachePvc) + } + + var uploadsDir = "" + if val, ok := fileutils.UploadsDirs4[bflName]; ok { + uploadsDir = val + } else { + uploadsDir = CachePathPrefix + "/" + cachePvc + "/files/uploadstemp" + fileutils.UploadsDirs4[bflName] = uploadsDir + } + + return bflName, userPvc, cachePvc, uploadsDir, nil +} + +func (a *appController) UploadLink(c *fiber.Ctx) error { + _, userPvc, cachePvc, uploadsDir, err := getPVC(c) + if err != nil { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "bfl header missing or invalid", nil)) + } + + path := c.Query("p", "") + if path == "" { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "missing path query parameter", nil)) + } + + if !strings.HasSuffix(path, "/") { + path = path + "/" + } + if strings.HasPrefix(path, CacheRequestPrefix) { + path = CachePathPrefix + strings.TrimPrefix(path, CacheRequestPrefix) + path = rewriteUrl(path, cachePvc, CachePathPrefix) + } else { + path = rewriteUrl(path, userPvc, "") + } + + if !utils.PathExists(uploadsDir) { + if err := os.MkdirAll(uploadsDir, os.ModePerm); err != nil { + klog.Warning("err:", err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "failed to create folder", nil)) + } + } + klog.Infof("c:%+v", c) + + if !utils.CheckDirExist(path) { + if err := os.MkdirAll(path, os.ModePerm); err != nil { + klog.Warning("err:", err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "failed to create folder", nil)) + } + } + + uploadID := uid.MakeUid(path) + + uploadLink := fmt.Sprintf("/upload/upload-link/%s", uploadID) + + return 
c.SendString(uploadLink) +} + +func (a *appController) UploadedBytes(c *fiber.Ctx) error { + _, userPvc, cachePvc, uploadsDir, err := getPVC(c) + if err != nil { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "bfl header missing or invalid", nil)) + } + + parentDir := c.Query("parent_dir", "") + if parentDir == "" { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "missing parent_dir query parameter", nil)) + } + + if !strings.HasSuffix(parentDir, "/") { + parentDir = parentDir + "/" + } + if strings.HasPrefix(parentDir, CacheRequestPrefix) { + parentDir = CachePathPrefix + strings.TrimPrefix(parentDir, CacheRequestPrefix) + parentDir = rewriteUrl(parentDir, cachePvc, CachePathPrefix) + } else { + parentDir = rewriteUrl(parentDir, userPvc, "") + } + + if !utils.CheckDirExist(parentDir) { + klog.Warningf("Storage path %s is not exist or is not a dir", parentDir) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Storage path is not exist or is not a dir", nil)) + } + + fileName := c.Query("file_name", "") + if fileName == "" { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "file_relative_path invalid", nil)) + } + + responseData := make(map[string]interface{}) + responseData["uploadedBytes"] = 0 + + if !utils.PathExists(uploadsDir) { + return c.JSON(responseData) + } + klog.Infof("c:%+v", c) + + fullPath := filepath.Join(parentDir, fileName) + + dirPath := filepath.Dir(fullPath) + + if !utils.CheckDirExist(dirPath) { + return c.JSON(responseData) + } + + if strings.HasSuffix(fileName, "/") { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, fmt.Sprintf("full path %s is a dir", fullPath), nil)) + } + + //Generate unique Upload-ID + //uploadID := uid.MakeUid(fullPath) + //resumableIdentifier := uid.GenerateUniqueIdentifier(fileName) + innerIdentifier := uid.MakeUid(fullPath) + exist, info := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier) + fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir) + if exist { + if fileExist { + if info.Offset != fileLen { + info.Offset = fileLen + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info) + } + klog.Infof("innerIdentifier:%s, info.Offset:%d", innerIdentifier, info.Offset) + responseData["uploadedBytes"] = info.Offset + } else if info.Offset == 0 { + klog.Warningf("innerIdentifier:%s, info.Offset:%d", innerIdentifier, info.Offset) + } else { + a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir) + } + } + return c.JSON(responseData) +} + +func (a *appController) UploadChunks(c *fiber.Ctx) error { + _, userPvc, cachePvc, uploadsDir, err := getPVC(c) + if err != nil { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "bfl header missing or invalid", nil)) + } + + responseData := make(map[string]interface{}) + responseData["success"] = true + + uploadID := c.Params("uid") + + if !utils.PathExists(uploadsDir) { + if err := os.MkdirAll(uploadsDir, os.ModePerm); err != nil { + klog.Warningf("uploadID:%s, err:%v", uploadID, err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "failed to create folder", nil)) + } + } + + klog.Infof("uploadID:%s, c:%+v", uploadID, c) + + var resumableInfo models.ResumableInfo + if err = c.BodyParser(&resumableInfo); err != nil { + klog.Warningf("uploadID:%s, err:%v", uploadID, err) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "param invalid", nil)) + } + + parentDir := 
resumableInfo.ParentDir + if !strings.HasSuffix(parentDir, "/") { + parentDir = parentDir + "/" + } + if strings.HasPrefix(parentDir, CacheRequestPrefix) { + parentDir = CachePathPrefix + strings.TrimPrefix(parentDir, CacheRequestPrefix) + parentDir = rewriteUrl(parentDir, cachePvc, CachePathPrefix) + } else { + parentDir = rewriteUrl(parentDir, userPvc, "") + } + if uploadID != uid.MakeUid(parentDir) { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "invalid upload link", nil)) + } + + resumableInfo.File, err = c.FormFile("file") + if err != nil || resumableInfo.File == nil { + klog.Warningf("uploadID:%s, Failed to parse file: %v\n", uploadID, err) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "param invalid", nil)) + } + + klog.Infof("uploadID:%s, patchInfo:%+v", uploadID, resumableInfo) + + // Get file information based on upload ID + fullPath := filepath.Join(parentDir, resumableInfo.ResumableRelativePath) + //resumableIdentifier := resumableInfo.ResumableIdentifier + innerIdentifier := uid.MakeUid(fullPath) + exist, info := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier) + if !exist { + klog.Warningf("innerIdentifier %s not exist", innerIdentifier) + //return c.Status(fiber.StatusBadRequest).JSON( + // models.NewResponse(1, "Invalid innerIdentifier", nil)) + } + klog.Infof("innerIdentifier:%s, info:%+v", innerIdentifier, info) + if innerIdentifier != info.ID { + klog.Warningf("innerIdentifier:%s diff from info:%+v", innerIdentifier, info) + } + + if !exist || innerIdentifier != info.ID { + //clear temp file and reset info + fileutils.RemoveTempFileAndInfoFile4(innerIdentifier, uploadsDir) + if info.Offset != 0 { + info.Offset = 0 + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info) + } + + //do creation when the first chunk + if !utils.CheckDirExist(parentDir) { + klog.Warningf("Parent dir %s is not exist or is not a dir", resumableInfo.ParentDir) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Parent dir is not exist or is not a dir", nil)) + } + + //fullPath := filepath.Join(resumableInfo.ParentDir, resumableInfo.ResumableRelativePath) + + dirPath := filepath.Dir(fullPath) + + if !utils.CheckDirExist(dirPath) { + if err := os.MkdirAll(dirPath, os.ModePerm); err != nil { + klog.Warning("err:", err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "failed to create folder", nil)) + } + } + + if resumableInfo.ResumableRelativePath == "" { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "file_relative_path invalid", nil)) + } + + if strings.HasSuffix(resumableInfo.ResumableRelativePath, "/") { + klog.Warningf("full path %s is a dir", fullPath) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, fmt.Sprintf("full path %s is a dir", fullPath), nil)) + } + + // Make support judgment after parsing the file type + if !a.server.checkType(resumableInfo.ResumableType) { + klog.Warningf("unsupported filetype:%s", resumableInfo.ResumableType) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Unsupported file type", nil)) + } + + if !a.server.checkSize(resumableInfo.ResumableTotalSize) { + klog.Warningf("Unsupported file size uploadSize:%d", resumableInfo.ResumableTotalSize) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Unsupported file size", nil)) + } + + info.FileSize = resumableInfo.ResumableTotalSize + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info) + + //Generate unique Upload-ID 
+ //uploadID := uid.MakeUid(uploadInfo.FullPath) + oExist, oInfo := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier) + oFileExist, oFileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir) + if oExist { + if oFileExist { + if oInfo.Offset != oFileLen { + oInfo.Offset = oFileLen + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, oInfo) + } + klog.Infof("innerIdentifier:%s, info.Offset:%d", innerIdentifier, oInfo.Offset) + //return c.Status(fiber.StatusOK).JSON( + // models.NewResponse(0, "success", info)) + return c.JSON(responseData) + } else if oInfo.Offset == 0 { + klog.Warningf("innerIdentifier:%s, info.Offset:%d", innerIdentifier, oInfo.Offset) + //return c.Status(fiber.StatusOK).JSON( + // models.NewResponse(0, "success", info)) + return c.JSON(responseData) + } else { + a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir) + } + } + + fileInfo := models.FileInfo{ + ID: innerIdentifier, + Offset: 0, + FileMetaData: models.FileMetaData{ + FileRelativePath: resumableInfo.ResumableRelativePath, + FileType: resumableInfo.ResumableType, + FileSize: resumableInfo.ResumableTotalSize, + StoragePath: parentDir, //resumableInfo.ParentDir, + FullPath: fullPath, + }, + } + + if oFileExist { + fileInfo.Offset = oFileLen + } + + err = a.server.fileInfoMgr.AddFileInfo(innerIdentifier, fileInfo) + if err != nil { + klog.Warningf("innerIdentifier:%s, err:%v", innerIdentifier, err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "Error save file info", nil)) + } + + klog.Infof("innerIdentifier:%s, fileInfo:%+v", innerIdentifier, fileInfo) + //return c.Status(fiber.StatusOK).JSON( + // models.NewResponse(0, "success", fileInfo)) + // can't return here + info = fileInfo + } + + fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir) + if fileExist { + klog.Infof("innerIdentifier %s temp file exist, info.Offset:%d, fileLen:%d", innerIdentifier, info.Offset, fileLen) + if info.Offset != fileLen { + info.Offset = fileLen + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info) + } + } + + // Check if file size and offset match + // not functional when resumable.js + //if patchInfo.UploadOffset != info.Offset { + // klog.Warningf("uploadID %s, patchInfo.UploadOffset:%d diff from info.Offset:%d, info:%v", uploadID, patchInfo.UploadOffset, info.Offset, info) + // return c.Status(fiber.StatusBadRequest).JSON( + // models.NewResponse(1, "Invalid offset", nil)) + //} + + fileHeader := resumableInfo.File + size := fileHeader.Size + + klog.Infof("fileHeader.Size:%d, info.Offset:%d, info.FileSize:%d", + fileHeader.Size, info.Offset, info.FileSize) + if !a.server.checkSize(size) || size+info.Offset > info.FileSize { + klog.Warningf("Unsupported file size uploadSize:%d", size) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Unsupported file size", nil)) + } + + ranges := c.Get("Content-Range") + var offset int64 + var parsed bool + if ranges != "" { + offset, parsed = fileutils.ParseContentRange(ranges) + if !parsed { + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Invalid content range", nil)) + } + } + + const maxRetries = 100 + for retry := 0; retry < maxRetries; retry++ { + if info.Offset == offset { + fileSize, err := fileutils.SaveFile(fileHeader, fileutils.GetTempFilePathById4(innerIdentifier, uploadsDir)) + if err != nil { + klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err) + return c.Status(fiber.StatusInternalServerError).JSON( + 
models.NewResponse(1, err.Error(), info)) + } + info.Offset = fileSize + a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info) + break + } + + time.Sleep(500 * time.Millisecond) + + klog.Infof("Waiting for info.Offset to match offset (%d != %d), retry %d/%d", info.Offset, offset, retry+1, maxRetries) + + if retry < maxRetries-1 { + exist, info = a.server.fileInfoMgr.ExistFileInfo(innerIdentifier) + if !exist { + klog.Warningf("innerIdentifier %s not exist", innerIdentifier) + return c.Status(fiber.StatusBadRequest).JSON( + models.NewResponse(1, "Invalid innerIdentifier", nil)) + } + continue + } + + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, "Failed to match offset after multiple retries", info)) + } + + // Update file information for debug + err = fileutils.UpdateFileInfo4(info, uploadsDir) + if err != nil { + klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, err.Error(), info)) + } + + // Check if the file has been written + if info.Offset == info.FileSize { + // Move the file to the specified upload path + err = fileutils.MoveFileByInfo4(info, uploadsDir) + if err != nil { + klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err) + return c.Status(fiber.StatusInternalServerError).JSON( + models.NewResponse(1, err.Error(), info)) + } + a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir) + + klog.Infof("innerIdentifier:%s File uploaded successfully info:%+v", innerIdentifier, info) + // Return successful response + + finishData := []map[string]interface{}{ + { + "name": resumableInfo.ResumableFilename, + "id": uid.MakeUid(info.FullPath), + "size": info.FileSize, + }, + } + return c.JSON(finishData) + //return c.Status(fiber.StatusOK).JSON( + // models.NewResponse(0, "File uploaded successfully", info)) + } + + klog.Infof("innerIdentifier:%s File Continue uploading info:%+v", innerIdentifier, info) + + //return c.Status(fiber.StatusOK).JSON( + // models.NewResponse(0, "Continue uploading", info)) + return c.JSON(responseData) +} diff --git a/cmd/upload/app/os_system.go b/cmd/upload/app/os_system.go new file mode 100644 index 0000000..65fdb19 --- /dev/null +++ b/cmd/upload/app/os_system.go @@ -0,0 +1,128 @@ +package app + +import ( + "context" + "errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "strings" + "sync" +) + +var PVCs *PVCCache = nil + +func minWithNegativeOne(a, b int, aName, bName string) (int, string) { + if a == -1 && b == -1 { + return -1, "" + } + + if a == -1 { + return b, bName + } + if b == -1 { + return a, aName + } + + if a < b { + return a, aName + } else { + return b, bName + } +} + +func rewriteUrl(path string, pvc string, prefix string) string { + if prefix == "" { + homeIndex := strings.Index(path, "/Home") + applicationIndex := strings.Index(path, "/Application") + splitIndex, splitName := minWithNegativeOne(homeIndex, applicationIndex, "/Home", "/Application") + if splitIndex != -1 { + firstHalf := path[:splitIndex] + secondHalf := path[splitIndex:] + klog.Info("firstHalf=", firstHalf) + klog.Info("secondHalf=", secondHalf) + + if strings.HasSuffix(firstHalf, pvc) { + return path + } + if splitName == "/Home" { + return firstHalf + "/" + pvc + secondHalf + } else { + secondHalf = strings.TrimPrefix(path[splitIndex:], splitName) + return firstHalf + "/" + pvc + "/Data" + secondHalf + } + } + } else { + pathSuffix := 
strings.TrimPrefix(path, prefix) + if strings.HasPrefix(pathSuffix, "/"+pvc) { + return path + } + return prefix + "/" + pvc + pathSuffix + } + return path +} + +func GetAnnotation(ctx context.Context, client *kubernetes.Clientset, key string, bflName string) (string, error) { + if bflName == "" { + klog.Error("get Annotation error, bfl-name is empty") + return "", errors.New("bfl-name is empty") + } + + namespace := "user-space-" + bflName + + bfl, err := client.AppsV1().StatefulSets(namespace).Get(ctx, "bfl", metav1.GetOptions{}) + if err != nil { + klog.Error("find user's bfl error, ", err, ", ", namespace) + return "", err + } + + klog.Infof("bfl.Annotations: %+v", bfl.Annotations) + return bfl.Annotations[key], nil +} + +type PVCCache struct { + server *Server + userPvcMap map[string]string + cachePvcMap map[string]string + mu sync.Mutex +} + +func NewPVCCache(server *Server) *PVCCache { + return &PVCCache{ + server: server, + userPvcMap: make(map[string]string), + cachePvcMap: make(map[string]string), + } +} + +func (p *PVCCache) getUserPVCOrCache(bflName string) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if val, ok := p.userPvcMap[bflName]; ok { + return val, nil + } + + userPvc, err := GetAnnotation(p.server.context, p.server.k8sClient, "userspace_pvc", bflName) + if err != nil { + return "", err + } + p.userPvcMap[bflName] = userPvc + return userPvc, nil +} + +func (p *PVCCache) getCachePVCOrCache(bflName string) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if val, ok := p.cachePvcMap[bflName]; ok { + return val, nil + } + + cachePvc, err := GetAnnotation(p.server.context, p.server.k8sClient, "appcache_pvc", bflName) + if err != nil { + return "", err + } + p.cachePvcMap[bflName] = cachePvc + return cachePvc, nil +} diff --git a/cmd/upload/app/server.go b/cmd/upload/app/server.go index 3c656d1..272de87 100644 --- a/cmd/upload/app/server.go +++ b/cmd/upload/app/server.go @@ -2,12 +2,17 @@ package app import ( "bytetrade.io/web3os/tapr/pkg/constants" + "bytetrade.io/web3os/tapr/pkg/signals" "bytetrade.io/web3os/tapr/pkg/upload/fileutils" + + "context" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "math" "os" + ctrl "sigs.k8s.io/controller-runtime" "strconv" "strings" ) @@ -25,6 +30,8 @@ type Server struct { supportedFileTypes map[string]bool allowAllFileType bool limitedSize int64 + context context.Context + k8sClient *kubernetes.Clientset } func (server *Server) Init() error { @@ -36,7 +43,7 @@ func (server *Server) Init() error { server.app.Use(cors.New(cors.Config{ AllowOrigins: "*", AllowMethods: "GET,POST,HEAD,PUT,DELETE,PATCH", - AllowHeaders: "Origin, Content-Type, Accept, Content-Length, Upload-Offset, Upload-Metadata, Upload-Length, X-Authorization, x-authorization", + AllowHeaders: "Origin, Content-Type, Accept, Content-Length, Upload-Offset, Upload-Metadata, Upload-Length, X-Authorization, x-authorization, Content-Disposition, Content-Range, Referer, User-Agent", })) server.controller = newController(server) server.fileInfoMgr = fileutils.NewFileInfoMgr() @@ -44,6 +51,15 @@ func (server *Server) Init() error { fileutils.Init() + ctx, cancel := context.WithCancel(context.Background()) + _ = signals.SetupSignalHandler(ctx, cancel) + server.context = ctx + + config := ctrl.GetConfigOrDie() + server.k8sClient = kubernetes.NewForConfigOrDie(config) + + PVCs = NewPVCCache(server) + return nil } @@ -59,6 +75,10 @@ func (server *Server) ServerRun() { 
server.app.Patch("/upload/:uid", server.controller.PatchFile) //server.app.Get("/upload/info/:uid?", server.controller.Info) + server.app.Get("/upload/upload-link", server.controller.UploadLink) + server.app.Get("/upload/file-uploaded-bytes", server.controller.UploadedBytes) + server.app.Post("/upload/upload-link/:uid", server.controller.UploadChunks) + klog.Info("upload server listening on 40030") klog.Fatal(server.app.Listen(":40030")) } @@ -104,7 +124,7 @@ func (s *Server) checkType(filetype string) bool { } func (s *Server) checkSize(filesize int64) bool { - if filesize <= 0 { + if filesize < 0 { return false } diff --git a/pkg/upload/fileutils/checkfolder.go b/pkg/upload/fileutils/checkfolder.go index 2ebe1db..a647d58 100644 --- a/pkg/upload/fileutils/checkfolder.go +++ b/pkg/upload/fileutils/checkfolder.go @@ -15,7 +15,10 @@ const ( ) func Init() { - cronDeleteOldfolders(UploadsDir) + //cronDeleteOldfolders(UploadsDir) + for _, uploadsDir := range UploadsDirs4 { + cronDeleteOldfolders(uploadsDir) + } //checkTempDir(UploadsDir) } diff --git a/pkg/upload/fileutils/constants.go b/pkg/upload/fileutils/constants.go index 1388e69..e16f813 100644 --- a/pkg/upload/fileutils/constants.go +++ b/pkg/upload/fileutils/constants.go @@ -4,3 +4,5 @@ const ( DefaultMaxFileSize = 4 * 1024 * 1024 * 1024 // 4G UploadsDir = "./uploadstemp" ) + +var UploadsDirs4 map[string]string = map[string]string{} diff --git a/pkg/upload/fileutils/file.go b/pkg/upload/fileutils/file.go index 536e3e3..fb7e7f9 100644 --- a/pkg/upload/fileutils/file.go +++ b/pkg/upload/fileutils/file.go @@ -159,6 +159,10 @@ func GetTempFilePathById(id string) string { return filepath.Join(UploadsDir, id) } +func GetTempFilePathById4(id string, uploadsDir string) string { + return filepath.Join(uploadsDir, id) +} + func SaveFile(fileHeader *multipart.FileHeader, filePath string) (int64, error) { // Open source file file, err := fileHeader.Open() @@ -190,6 +194,42 @@ func SaveFile(fileHeader *multipart.FileHeader, filePath string) (int64, error) return fileSize, nil } +func ParseContentRange(ranges string) (int64, bool) { + start := strings.Index(ranges, "bytes") + end := strings.Index(ranges, "-") + slash := strings.Index(ranges, "/") + + if start < 0 || end < 0 || slash < 0 { + return -1, false + } + + startStr := strings.TrimLeft(ranges[start+len("bytes"):end], " ") + firstByte, err := strconv.ParseInt(startStr, 10, 64) + if err != nil { + return -1, false + } + + lastByte, err := strconv.ParseInt(ranges[end+1:slash], 10, 64) + if err != nil { + return -1, false + } + + fileSize, err := strconv.ParseInt(ranges[slash+1:], 10, 64) + if err != nil { + return -1, false + } + + if firstByte > lastByte || lastByte >= fileSize { + return -1, false + } + + //fsm.rstart = firstByte + //fsm.rend = lastByte + //fsm.fsize = fileSize + + return firstByte, true +} + func UpdateFileInfo(fileInfo models.FileInfo) error { // Construct file information path infoPath := filepath.Join(UploadsDir, fileInfo.ID+".info") @@ -209,6 +249,25 @@ func UpdateFileInfo(fileInfo models.FileInfo) error { return nil } +func UpdateFileInfo4(fileInfo models.FileInfo, uploadsDir string) error { + // Construct file information path + infoPath := filepath.Join(uploadsDir, fileInfo.ID+".info") + + // Convert file information to JSON string + infoJSON, err := json.Marshal(fileInfo) + if err != nil { + return err + } + + // Write file information + err = ioutil.WriteFile(infoPath, infoJSON, 0644) + if err != nil { + return err + } + + return nil +} + func 
RemoveTempFileAndInfoFile(uid string) { removeTempFile(uid) removeInfoFile(uid) @@ -249,3 +308,44 @@ func removeInfoFile(uid string) { klog.Warningf("remove %s err:%v", infoPath, err) } } + +func RemoveTempFileAndInfoFile4(uid string, uploadsDir string) { + removeTempFile4(uid, uploadsDir) + removeInfoFile4(uid, uploadsDir) +} + +func removeTempFile4(uid string, uploadsDir string) { + filePath := filepath.Join(uploadsDir, uid) + err := os.Remove(filePath) + if err != nil { + klog.Warningf("remove %s err:%v", filePath, err) + } + +} + +func MoveFileByInfo4(fileInfo models.FileInfo, uploadsDir string) error { + // Construct file path + filePath := filepath.Join(uploadsDir, fileInfo.ID) + + // Construct target path + destinationPath := fileInfo.FullPath + + // Move files to target path + err := MoveFile(filePath, destinationPath) + if err != nil { + return err + } + + // Remove info file + removeInfoFile4(fileInfo.ID, uploadsDir) + + return nil +} + +func removeInfoFile4(uid string, uploadsDir string) { + infoPath := filepath.Join(uploadsDir, uid+".info") + err := os.Remove(infoPath) + if err != nil { + klog.Warningf("remove %s err:%v", infoPath, err) + } +} diff --git a/pkg/upload/fileutils/fileinfos.go b/pkg/upload/fileutils/fileinfos.go index dca6a6b..7053510 100644 --- a/pkg/upload/fileutils/fileinfos.go +++ b/pkg/upload/fileutils/fileinfos.go @@ -49,6 +49,9 @@ func (m *FileInfoMgr) DeleteOldInfos() { klog.Infof("id %s expire del in map, stack:%s", key, debug.Stack()) InfoSyncMap.Delete(key) RemoveTempFileAndInfoFile(key.(string)) + for _, uploadsDir := range UploadsDirs4 { + RemoveTempFileAndInfoFile4(key.(string), uploadsDir) + } } return true }) @@ -92,6 +95,11 @@ func (m *FileInfoMgr) DelFileInfo(id string) { RemoveTempFileAndInfoFile(id) } +func (m *FileInfoMgr) DelFileInfo4(id, uploadsDir string) { + InfoSyncMap.Delete(id) + RemoveTempFileAndInfoFile4(id, uploadsDir) +} + func (m *FileInfoMgr) ExistFileInfo(id string) (bool, models.FileInfo) { value, ok := InfoSyncMap.Load(id) if ok { @@ -104,3 +112,7 @@ func (m *FileInfoMgr) ExistFileInfo(id string) (bool, models.FileInfo) { func (m *FileInfoMgr) CheckTempFile(id string) (bool, int64) { return utils.PathExistsAndGetLen(filepath.Join(UploadsDir, id)) } + +func (m *FileInfoMgr) CheckTempFile4(id, uploadsDir string) (bool, int64) { + return utils.PathExistsAndGetLen(filepath.Join(uploadsDir, id)) +} diff --git a/pkg/upload/models/upload.go b/pkg/upload/models/upload.go index 6cbd73c..bcdcd33 100644 --- a/pkg/upload/models/upload.go +++ b/pkg/upload/models/upload.go @@ -39,3 +39,17 @@ type FilePatchInfo struct { File *multipart.FileHeader `json:"file" form:"file" binding:"required"` UploadOffset int64 `json:"upload_offset" form:"upload_offset" binding:"required"` } + +type ResumableInfo struct { + ResumableChunkNumber int `json:"resumableChunkNumber" form:"resumableChunkNumber"` + ResumableChunkSize int64 `json:"resumableChunkSize" form:"resumableChunkSize"` + ResumableCurrentChunkSize int64 `json:"resumableCurrentChunkSize" form:"resumableCurrentChunkSize"` + ResumableTotalSize int64 `json:"resumableTotalSize" form:"resumableTotalSize"` + ResumableType string `json:"resumableType" form:"resumableType"` + ResumableIdentifier string `json:"resumableIdentifier" form:"resumableIdentifier"` + ResumableFilename string `json:"resumableFilename" form:"resumableFilename"` + ResumableRelativePath string `json:"resumableRelativePath" form:"resumableRelativePath"` + ResumableTotalChunks int `json:"resumableTotalChunks" 
form:"resumableTotalChunks"` + ParentDir string `json:"parent_dir" form:"parent_dir"` + File *multipart.FileHeader `json:"file" form:"file" binding:"required"` +} diff --git a/pkg/upload/uid/uid.go b/pkg/upload/uid/uid.go index 954fb8b..9cb432b 100644 --- a/pkg/upload/uid/uid.go +++ b/pkg/upload/uid/uid.go @@ -4,8 +4,10 @@ import ( "crypto/md5" "crypto/rand" "encoding/hex" + "fmt" "io" "k8s.io/klog/v2" + "time" ) // uid returns a unique id. These ids consist of 128 bits from a @@ -30,3 +32,9 @@ func MakeUid(filePath string) string { klog.Infof("filePath:%s, uid:%s", filePath, md5String) return md5String } + +func GenerateUniqueIdentifier(relativePath string) string { + h := md5.New() + io.WriteString(h, relativePath+time.Now().String()) + return fmt.Sprintf("%x%s", h.Sum(nil), relativePath) +}