Skip to content

Commit

Permalink
feat: uploader from userspace to os-system
Browse files Browse the repository at this point in the history
  • Loading branch information
lovehunter9 committed Aug 16, 2024
1 parent f025ed1 commit ec54408
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 91 deletions.
170 changes: 106 additions & 64 deletions cmd/upload/app/handler4.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,106 @@ import (
"time"
)

// UploadLink 处理上传链接的 GET 请求
const (
CacheRequestPrefix = "/AppData"
CachePathPrefix = "/appcache"
)

func getPVC(c *fiber.Ctx) (string, string, string, string, error) {
bflName := c.Get("X-Bfl-User")
klog.Info("BFL_NAME: ", bflName)

userPvc, err := PVCs.getUserPVCOrCache(bflName) // appdata.GetAnnotation(p.mainCtx, p.k8sClient, "userspace_pvc", bflName)
if err != nil {
klog.Info(err)
return bflName, "", "", "", err
} else {
klog.Info("user-space pvc: ", userPvc)
}

cachePvc, err := PVCs.getCachePVCOrCache(bflName) // appdata.GetAnnotation(p.mainCtx, p.k8sClient, "appcache_pvc", bflName)
if err != nil {
klog.Info(err)
return bflName, "", "", "", err
} else {
klog.Info("appcache pvc: ", cachePvc)
}

var uploadsDir = ""
if val, ok := fileutils.UploadsDirs4[bflName]; ok {
uploadsDir = val
} else {
uploadsDir = CachePathPrefix + "/" + cachePvc + "/files/uploadstemp"
fileutils.UploadsDirs4[bflName] = uploadsDir
}

return bflName, userPvc, cachePvc, uploadsDir, nil
}

func (a *appController) UploadLink(c *fiber.Ctx) error {
// 从查询参数中获取 path
_, userPvc, cachePvc, uploadsDir, err := getPVC(c)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "bfl header missing or invalid", nil))
}

path := c.Query("p", "")
if path == "" {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "missing path query parameter", nil))
}

// 检查上传目录是否存在,如果不存在则创建
if !utils.PathExists(fileutils.UploadsDir) {
if err := os.MkdirAll(fileutils.UploadsDir, os.ModePerm); err != nil {
if strings.HasPrefix(path, CacheRequestPrefix) {
path = CachePathPrefix + strings.TrimPrefix(path, CacheRequestPrefix)
path = rewriteUrl(path, cachePvc, CachePathPrefix)
} else {
path = rewriteUrl(path, userPvc, "")
}

if !utils.PathExists(uploadsDir) {
if err := os.MkdirAll(uploadsDir, os.ModePerm); err != nil {
klog.Warning("err:", err)
// 如果创建目录失败,返回内部服务器错误
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "failed to create folder", nil))
}
}
klog.Infof("c:%+v", c) // 记录当前请求的上下文信息
klog.Infof("c:%+v", c)

// 检查文件的目录路径是否存在,不存在则创建
if !utils.CheckDirExist(path) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
klog.Warning("err:", err)
// 如果创建目录失败,返回内部服务器错误
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "failed to create folder", nil))
}
}

// 生成唯一的上传ID
uploadID := uid.MakeUid(path)

// 拼接响应字符串
uploadLink := fmt.Sprintf("/upload/upload-link/%s", uploadID)

// 返回生成的链接
return c.SendString(uploadLink)
}

func (a *appController) UploadedBytes(c *fiber.Ctx) error {
_, userPvc, cachePvc, uploadsDir, err := getPVC(c)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "bfl header missing or invalid", nil))
}

parentDir := c.Query("parent_dir", "")
if parentDir == "" {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "missing parent_dir query parameter", nil))
}

if strings.HasPrefix(parentDir, CacheRequestPrefix) {
parentDir = CachePathPrefix + strings.TrimPrefix(parentDir, CacheRequestPrefix)
parentDir = rewriteUrl(parentDir, cachePvc, CachePathPrefix)
} else {
parentDir = rewriteUrl(parentDir, userPvc, "")
}

if !utils.CheckDirExist(parentDir) {
klog.Warningf("Storage path %s is not exist or is not a dir", parentDir)
return c.Status(fiber.StatusBadRequest).JSON(
Expand All @@ -76,7 +129,7 @@ func (a *appController) UploadedBytes(c *fiber.Ctx) error {
responseData := make(map[string]interface{})
responseData["uploadedBytes"] = 0

if !utils.PathExists(fileutils.UploadsDir) {
if !utils.PathExists(uploadsDir) {
return c.JSON(responseData)
}
klog.Infof("c:%+v", c)
Expand All @@ -99,7 +152,7 @@ func (a *appController) UploadedBytes(c *fiber.Ctx) error {
//resumableIdentifier := uid.GenerateUniqueIdentifier(fileName)
innerIdentifier := uid.MakeUid(fullPath)
exist, info := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier)
fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile(innerIdentifier)
fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir)
if exist {
if fileExist {
if info.Offset != fileLen {
Expand All @@ -111,20 +164,26 @@ func (a *appController) UploadedBytes(c *fiber.Ctx) error {
} else if info.Offset == 0 {
klog.Warningf("innerIdentifier:%s, info.Offset:%d", innerIdentifier, info.Offset)
} else {
a.server.fileInfoMgr.DelFileInfo(innerIdentifier)
a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir)
}
}
return c.JSON(responseData)
}

func (a *appController) UploadChunks(c *fiber.Ctx) error {
_, userPvc, cachePvc, uploadsDir, err := getPVC(c)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "bfl header missing or invalid", nil))
}

responseData := make(map[string]interface{})
responseData["success"] = true

uploadID := c.Params("uid")

if !utils.PathExists(fileutils.UploadsDir) {
if err := os.MkdirAll(fileutils.UploadsDir, os.ModePerm); err != nil {
if !utils.PathExists(uploadsDir) {
if err := os.MkdirAll(uploadsDir, os.ModePerm); err != nil {
klog.Warningf("uploadID:%s, err:%v", uploadID, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "failed to create folder", nil))
Expand All @@ -134,19 +193,24 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
klog.Infof("uploadID:%s, c:%+v", uploadID, c)

var resumableInfo models.ResumableInfo
if err := c.BodyParser(&resumableInfo); err != nil {
if err = c.BodyParser(&resumableInfo); err != nil {
klog.Warningf("uploadID:%s, err:%v", uploadID, err)
//todo check info valid
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "param invalid", nil))
}

if uploadID != uid.MakeUid(resumableInfo.ParentDir) {
parentDir := resumableInfo.ParentDir
if strings.HasPrefix(parentDir, CacheRequestPrefix) {
parentDir = CachePathPrefix + strings.TrimPrefix(parentDir, CacheRequestPrefix)
parentDir = rewriteUrl(parentDir, cachePvc, CachePathPrefix)
} else {
parentDir = rewriteUrl(parentDir, userPvc, "")
}
if uploadID != uid.MakeUid(parentDir) {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "invalid upload link", nil))
}

var err error
resumableInfo.File, err = c.FormFile("file")
if err != nil || resumableInfo.File == nil {
klog.Warningf("uploadID:%s, Failed to parse file: %v\n", uploadID, err)
Expand All @@ -157,7 +221,7 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
klog.Infof("uploadID:%s, patchInfo:%+v", uploadID, resumableInfo)

// Get file information based on upload ID
fullPath := filepath.Join(resumableInfo.ParentDir, resumableInfo.ResumableRelativePath)
fullPath := filepath.Join(parentDir, resumableInfo.ResumableRelativePath)
//resumableIdentifier := resumableInfo.ResumableIdentifier
innerIdentifier := uid.MakeUid(fullPath)
exist, info := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier)
Expand All @@ -173,14 +237,14 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {

if !exist || innerIdentifier != info.ID {
//clear temp file and reset info
fileutils.RemoveTempFileAndInfoFile(innerIdentifier)
fileutils.RemoveTempFileAndInfoFile4(innerIdentifier, uploadsDir)
if info.Offset != 0 {
info.Offset = 0
a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info)
}

//do creation when the first chunk
if !utils.CheckDirExist(resumableInfo.ParentDir) {
if !utils.CheckDirExist(parentDir) {
klog.Warningf("Parent dir %s is not exist or is not a dir", resumableInfo.ParentDir)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Parent dir is not exist or is not a dir", nil))
Expand Down Expand Up @@ -228,7 +292,7 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
//Generate unique Upload-ID
//uploadID := uid.MakeUid(uploadInfo.FullPath)
oExist, oInfo := a.server.fileInfoMgr.ExistFileInfo(innerIdentifier)
oFileExist, oFileLen := a.server.fileInfoMgr.CheckTempFile(innerIdentifier)
oFileExist, oFileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir)
if oExist {
if oFileExist {
if oInfo.Offset != oFileLen {
Expand All @@ -245,7 +309,7 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
// models.NewResponse(0, "success", info))
return c.JSON(responseData)
} else {
a.server.fileInfoMgr.DelFileInfo(innerIdentifier)
a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir)
}
}

Expand All @@ -256,7 +320,7 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
FileRelativePath: resumableInfo.ResumableRelativePath,
FileType: resumableInfo.ResumableType,
FileSize: resumableInfo.ResumableTotalSize,
StoragePath: resumableInfo.ParentDir,
StoragePath: parentDir, //resumableInfo.ParentDir,
FullPath: fullPath,
},
}
Expand All @@ -279,16 +343,17 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
info = fileInfo
}

//fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile(innerIdentifier)
//if fileExist {
// klog.Infof("innerIdentifier %s temp file exist, info.Offset:%d, fileLen:%d", uploadID, info.Offset, fileLen)
// if info.Offset != fileLen {
// info.Offset = fileLen
// a.server.fileInfoMgr.UpdateInfo(uploadID, info)
// }
//}
fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile4(innerIdentifier, uploadsDir)
if fileExist {
klog.Infof("innerIdentifier %s temp file exist, info.Offset:%d, fileLen:%d", innerIdentifier, info.Offset, fileLen)
if info.Offset != fileLen {
info.Offset = fileLen
a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info)
}
}

// Check if file size and offset match
// not functional when resumable.js
//if patchInfo.UploadOffset != info.Offset {
// klog.Warningf("uploadID %s, patchInfo.UploadOffset:%d diff from info.Offset:%d, info:%v", uploadID, patchInfo.UploadOffset, info.Offset, info)
// return c.Status(fiber.StatusBadRequest).JSON(
Expand All @@ -306,9 +371,7 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
models.NewResponse(1, "Unsupported file size", nil))
}

// 获取请求头中的Content-Range字段
ranges := c.Get("Content-Range")
// 如果存在Content-Range,则解析并设置fsm的相关字段
var offset int64
var parsed bool
if ranges != "" {
Expand All @@ -319,31 +382,22 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
}
}

// 设置重试次数限制
const maxRetries = 100
for retry := 0; retry < maxRetries; retry++ {
// 假设这里有一个函数或方法来获取最新的info对象
// 这里我们直接使用传入的info对象,但在实际场景中可能需要更新它
// 例如:info = a.server.fileInfoMgr.GetInfo(innerIdentifier)

// 检查info.Offset是否与offset相等
if info.Offset == offset {
// 如果相等,则进行文件保存操作
fileSize, err := fileutils.SaveFile(fileHeader, fileutils.GetTempFilePathById(innerIdentifier))
fileSize, err := fileutils.SaveFile(fileHeader, fileutils.GetTempFilePathById4(innerIdentifier, uploadsDir))
if err != nil {
klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, err.Error(), info))
}
info.Offset = fileSize // 更新info的Offset为文件大小
info.Offset = fileSize
a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info)
break
}

// 如果不相等,则等待一段时间
time.Sleep(500 * time.Millisecond) // 等待0.5秒
time.Sleep(500 * time.Millisecond)

// 可选:在这里可以添加日志记录或其他处理逻辑
klog.Infof("Waiting for info.Offset to match offset (%d != %d), retry %d/%d", info.Offset, offset, retry+1, maxRetries)

if retry < maxRetries-1 {
Expand All @@ -356,24 +410,12 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
continue
}

// 如果达到最大重试次数后仍然不相等,则返回错误
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "Failed to match offset after multiple retries", info))
}

// Write the file contents to the file at the specified path
//fileSize, err := fileutils.SaveFile(fileHeader, fileutils.GetTempFilePathById(innerIdentifier))
//if err != nil {
// klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err)
// return c.Status(fiber.StatusInternalServerError).JSON(
// models.NewResponse(1, err.Error(), info))
//}

//info.Offset = fileSize
//a.server.fileInfoMgr.UpdateInfo(innerIdentifier, info)

// Update file information for debug
err = fileutils.UpdateFileInfo(info)
err = fileutils.UpdateFileInfo4(info, uploadsDir)
if err != nil {
klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
Expand All @@ -383,13 +425,13 @@ func (a *appController) UploadChunks(c *fiber.Ctx) error {
// Check if the file has been written
if info.Offset == info.FileSize {
// Move the file to the specified upload path
err = fileutils.MoveFileByInfo(info)
err = fileutils.MoveFileByInfo4(info, uploadsDir)
if err != nil {
klog.Warningf("innerIdentifier:%s, info:%+v, err:%v", innerIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, err.Error(), info))
}
a.server.fileInfoMgr.DelFileInfo(innerIdentifier)
a.server.fileInfoMgr.DelFileInfo4(innerIdentifier, uploadsDir)

klog.Infof("innerIdentifier:%s File uploaded successfully info:%+v", innerIdentifier, info)
// Return successful response
Expand Down
Loading

0 comments on commit ec54408

Please sign in to comment.