Skip to content

Commit

Permalink
Merge pull request #28 from beclab/uploader4
Browse files Browse the repository at this point in the history
feat: uploader4.0 - same as seafile with resumable.js
  • Loading branch information
eball authored Oct 31, 2024
2 parents d8ff4fb + 6b2e2d0 commit c2d573d
Show file tree
Hide file tree
Showing 9 changed files with 755 additions and 3 deletions.
465 changes: 465 additions & 0 deletions cmd/upload/app/handler4.go

Large diffs are not rendered by default.

128 changes: 128 additions & 0 deletions cmd/upload/app/os_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package app

import (
"context"
"errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"strings"
"sync"
)

var PVCs *PVCCache = nil

func minWithNegativeOne(a, b int, aName, bName string) (int, string) {
if a == -1 && b == -1 {
return -1, ""
}

if a == -1 {
return b, bName
}
if b == -1 {
return a, aName
}

if a < b {
return a, aName
} else {
return b, bName
}
}

func rewriteUrl(path string, pvc string, prefix string) string {
if prefix == "" {
homeIndex := strings.Index(path, "/Home")
applicationIndex := strings.Index(path, "/Application")
splitIndex, splitName := minWithNegativeOne(homeIndex, applicationIndex, "/Home", "/Application")
if splitIndex != -1 {
firstHalf := path[:splitIndex]
secondHalf := path[splitIndex:]
klog.Info("firstHalf=", firstHalf)
klog.Info("secondHalf=", secondHalf)

if strings.HasSuffix(firstHalf, pvc) {
return path
}
if splitName == "/Home" {
return firstHalf + "/" + pvc + secondHalf
} else {
secondHalf = strings.TrimPrefix(path[splitIndex:], splitName)
return firstHalf + "/" + pvc + "/Data" + secondHalf
}
}
} else {
pathSuffix := strings.TrimPrefix(path, prefix)
if strings.HasPrefix(pathSuffix, "/"+pvc) {
return path
}
return prefix + "/" + pvc + pathSuffix
}
return path
}

func GetAnnotation(ctx context.Context, client *kubernetes.Clientset, key string, bflName string) (string, error) {
if bflName == "" {
klog.Error("get Annotation error, bfl-name is empty")
return "", errors.New("bfl-name is emtpty")
}

namespace := "user-space-" + bflName

bfl, err := client.AppsV1().StatefulSets(namespace).Get(ctx, "bfl", metav1.GetOptions{})
if err != nil {
klog.Error("find user's bfl error, ", err, ", ", namespace)
return "", err
}

klog.Infof("bfl.Annotations: %+v", bfl.Annotations)
return bfl.Annotations[key], nil
}

type PVCCache struct {
server *Server
userPvcMap map[string]string
cachePvcMap map[string]string
mu sync.Mutex
}

func NewPVCCache(server *Server) *PVCCache {
return &PVCCache{
server: server,
userPvcMap: make(map[string]string),
cachePvcMap: make(map[string]string),
}
}

func (p *PVCCache) getUserPVCOrCache(bflName string) (string, error) {
p.mu.Lock()
defer p.mu.Unlock()

if val, ok := p.userPvcMap[bflName]; ok {
return val, nil
}

userPvc, err := GetAnnotation(p.server.context, p.server.k8sClient, "userspace_pvc", bflName)
if err != nil {
return "", err
}
p.userPvcMap[bflName] = userPvc
return userPvc, nil
}

func (p *PVCCache) getCachePVCOrCache(bflName string) (string, error) {
p.mu.Lock()
defer p.mu.Unlock()

if val, ok := p.cachePvcMap[bflName]; ok {
return val, nil
}

cachePvc, err := GetAnnotation(p.server.context, p.server.k8sClient, "appcache_pvc", bflName)
if err != nil {
return "", err
}
p.cachePvcMap[bflName] = cachePvc
return cachePvc, nil
}
24 changes: 22 additions & 2 deletions cmd/upload/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package app

import (
"bytetrade.io/web3os/tapr/pkg/constants"
"bytetrade.io/web3os/tapr/pkg/signals"
"bytetrade.io/web3os/tapr/pkg/upload/fileutils"

"context"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"math"
"os"
ctrl "sigs.k8s.io/controller-runtime"
"strconv"
"strings"
)
Expand All @@ -25,6 +30,8 @@ type Server struct {
supportedFileTypes map[string]bool
allowAllFileType bool
limitedSize int64
context context.Context
k8sClient *kubernetes.Clientset
}

func (server *Server) Init() error {
Expand All @@ -36,14 +43,23 @@ func (server *Server) Init() error {
server.app.Use(cors.New(cors.Config{
AllowOrigins: "*",
AllowMethods: "GET,POST,HEAD,PUT,DELETE,PATCH",
AllowHeaders: "Origin, Content-Type, Accept, Content-Length, Upload-Offset, Upload-Metadata, Upload-Length, X-Authorization, x-authorization",
AllowHeaders: "Origin, Content-Type, Accept, Content-Length, Upload-Offset, Upload-Metadata, Upload-Length, X-Authorization, x-authorization, Content-Disposition, Content-Range, Referer, User-Agent",
}))
server.controller = newController(server)
server.fileInfoMgr = fileutils.NewFileInfoMgr()
server.fileInfoMgr.Init()

fileutils.Init()

ctx, cancel := context.WithCancel(context.Background())
_ = signals.SetupSignalHandler(ctx, cancel)
server.context = ctx

config := ctrl.GetConfigOrDie()
server.k8sClient = kubernetes.NewForConfigOrDie(config)

PVCs = NewPVCCache(server)

return nil
}

Expand All @@ -59,6 +75,10 @@ func (server *Server) ServerRun() {
server.app.Patch("/upload/:uid", server.controller.PatchFile)
//server.app.Get("/upload/info/:uid?", server.controller.Info)

server.app.Get("/upload/upload-link", server.controller.UploadLink)
server.app.Get("/upload/file-uploaded-bytes", server.controller.UploadedBytes)
server.app.Post("/upload/upload-link/:uid", server.controller.UploadChunks)

klog.Info("upload server listening on 40030")
klog.Fatal(server.app.Listen(":40030"))
}
Expand Down Expand Up @@ -104,7 +124,7 @@ func (s *Server) checkType(filetype string) bool {
}

func (s *Server) checkSize(filesize int64) bool {
if filesize <= 0 {
if filesize < 0 {
return false
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/upload/fileutils/checkfolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ const (
)

func Init() {
cronDeleteOldfolders(UploadsDir)
//cronDeleteOldfolders(UploadsDir)
for _, uploadsDir := range UploadsDirs4 {
cronDeleteOldfolders(uploadsDir)
}
//checkTempDir(UploadsDir)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/upload/fileutils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ const (
DefaultMaxFileSize = 4 * 1024 * 1024 * 1024 // 4G
UploadsDir = "./uploadstemp"
)

var UploadsDirs4 map[string]string = map[string]string{}
100 changes: 100 additions & 0 deletions pkg/upload/fileutils/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ func GetTempFilePathById(id string) string {
return filepath.Join(UploadsDir, id)
}

func GetTempFilePathById4(id string, uploadsDir string) string {
return filepath.Join(uploadsDir, id)
}

func SaveFile(fileHeader *multipart.FileHeader, filePath string) (int64, error) {
// Open source file
file, err := fileHeader.Open()
Expand Down Expand Up @@ -190,6 +194,42 @@ func SaveFile(fileHeader *multipart.FileHeader, filePath string) (int64, error)
return fileSize, nil
}

func ParseContentRange(ranges string) (int64, bool) {
start := strings.Index(ranges, "bytes")
end := strings.Index(ranges, "-")
slash := strings.Index(ranges, "/")

if start < 0 || end < 0 || slash < 0 {
return -1, false
}

startStr := strings.TrimLeft(ranges[start+len("bytes"):end], " ")
firstByte, err := strconv.ParseInt(startStr, 10, 64)
if err != nil {
return -1, false
}

lastByte, err := strconv.ParseInt(ranges[end+1:slash], 10, 64)
if err != nil {
return -1, false
}

fileSize, err := strconv.ParseInt(ranges[slash+1:], 10, 64)
if err != nil {
return -1, false
}

if firstByte > lastByte || lastByte >= fileSize {
return -1, false
}

//fsm.rstart = firstByte
//fsm.rend = lastByte
//fsm.fsize = fileSize

return firstByte, true
}

func UpdateFileInfo(fileInfo models.FileInfo) error {
// Construct file information path
infoPath := filepath.Join(UploadsDir, fileInfo.ID+".info")
Expand All @@ -209,6 +249,25 @@ func UpdateFileInfo(fileInfo models.FileInfo) error {
return nil
}

func UpdateFileInfo4(fileInfo models.FileInfo, uploadsDir string) error {
// Construct file information path
infoPath := filepath.Join(uploadsDir, fileInfo.ID+".info")

// Convert file information to JSON string
infoJSON, err := json.Marshal(fileInfo)
if err != nil {
return err
}

// Write file information
err = ioutil.WriteFile(infoPath, infoJSON, 0644)
if err != nil {
return err
}

return nil
}

func RemoveTempFileAndInfoFile(uid string) {
removeTempFile(uid)
removeInfoFile(uid)
Expand Down Expand Up @@ -249,3 +308,44 @@ func removeInfoFile(uid string) {
klog.Warningf("remove %s err:%v", infoPath, err)
}
}

func RemoveTempFileAndInfoFile4(uid string, uploadsDir string) {
removeTempFile4(uid, uploadsDir)
removeInfoFile4(uid, uploadsDir)
}

func removeTempFile4(uid string, uploadsDir string) {
filePath := filepath.Join(uploadsDir, uid)
err := os.Remove(filePath)
if err != nil {
klog.Warningf("remove %s err:%v", filePath, err)
}

}

func MoveFileByInfo4(fileInfo models.FileInfo, uploadsDir string) error {
// Construct file path
filePath := filepath.Join(uploadsDir, fileInfo.ID)

// Construct target path
destinationPath := fileInfo.FullPath

// Move files to target path
err := MoveFile(filePath, destinationPath)
if err != nil {
return err
}

// Remove info file
removeInfoFile4(fileInfo.ID, uploadsDir)

return nil
}

func removeInfoFile4(uid string, uploadsDir string) {
infoPath := filepath.Join(uploadsDir, uid+".info")
err := os.Remove(infoPath)
if err != nil {
klog.Warningf("remove %s err:%v", infoPath, err)
}
}
12 changes: 12 additions & 0 deletions pkg/upload/fileutils/fileinfos.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (m *FileInfoMgr) DeleteOldInfos() {
klog.Infof("id %s expire del in map, stack:%s", key, debug.Stack())
InfoSyncMap.Delete(key)
RemoveTempFileAndInfoFile(key.(string))
for _, uploadsDir := range UploadsDirs4 {
RemoveTempFileAndInfoFile4(key.(string), uploadsDir)
}
}
return true
})
Expand Down Expand Up @@ -92,6 +95,11 @@ func (m *FileInfoMgr) DelFileInfo(id string) {
RemoveTempFileAndInfoFile(id)
}

func (m *FileInfoMgr) DelFileInfo4(id, uploadsDir string) {
InfoSyncMap.Delete(id)
RemoveTempFileAndInfoFile4(id, uploadsDir)
}

func (m *FileInfoMgr) ExistFileInfo(id string) (bool, models.FileInfo) {
value, ok := InfoSyncMap.Load(id)
if ok {
Expand All @@ -104,3 +112,7 @@ func (m *FileInfoMgr) ExistFileInfo(id string) (bool, models.FileInfo) {
func (m *FileInfoMgr) CheckTempFile(id string) (bool, int64) {
return utils.PathExistsAndGetLen(filepath.Join(UploadsDir, id))
}

func (m *FileInfoMgr) CheckTempFile4(id, uploadsDir string) (bool, int64) {
return utils.PathExistsAndGetLen(filepath.Join(uploadsDir, id))
}
14 changes: 14 additions & 0 deletions pkg/upload/models/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,17 @@ type FilePatchInfo struct {
File *multipart.FileHeader `json:"file" form:"file" binding:"required"`
UploadOffset int64 `json:"upload_offset" form:"upload_offset" binding:"required"`
}

type ResumableInfo struct {
ResumableChunkNumber int `json:"resumableChunkNumber" form:"resumableChunkNumber"`
ResumableChunkSize int64 `json:"resumableChunkSize" form:"resumableChunkSize"`
ResumableCurrentChunkSize int64 `json:"resumableCurrentChunkSize" form:"resumableCurrentChunkSize"`
ResumableTotalSize int64 `json:"resumableTotalSize" form:"resumableTotalSize"`
ResumableType string `json:"resumableType" form:"resumableType"`
ResumableIdentifier string `json:"resumableIdentifier" form:"resumableIdentifier"`
ResumableFilename string `json:"resumableFilename" form:"resumableFilename"`
ResumableRelativePath string `json:"resumableRelativePath" form:"resumableRelativePath"`
ResumableTotalChunks int `json:"resumableTotalChunks" form:"resumableTotalChunks"`
ParentDir string `json:"parent_dir" form:"parent_dir"`
File *multipart.FileHeader `json:"file" form:"file" binding:"required"`
}
Loading

0 comments on commit c2d573d

Please sign in to comment.