Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:revise etag #38

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions objectservice/s3api/object_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"strings"
)

//PutObjectHandler Put ObjectHandler
//https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
// PutObjectHandler Put ObjectHandler
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
func (s3a *s3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {

// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s3a *s3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// ----------
// This implementation of the GET operation retrieves object. To use GET,
// you must have READ access to the object.
//https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
func (s3a *s3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
bucket, object, err := getBucketAndObject(r)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s3a *s3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}

// HeadObjectHandler - HEAD Object
//https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
// The HEAD operation retrieves metadata from an object without returning the object itself.
func (s3a *s3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s3a *s3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request

// DeleteObjectHandler - delete an object
// Delete objectAPIHandlers
//https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
func (s3a *s3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
bucket, object, err := getBucketAndObject(r)
Expand Down Expand Up @@ -519,7 +519,13 @@ func (s3a *s3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
metadata[key] = val
}
}
obj, err := s3a.store.StoreObject(ctx, dstBucket, dstObject, srcReader, srcObjInfo.Size, metadata)
hashReader, err := hash.NewReader(srcReader, srcObjInfo.Size, srcObjInfo.ETag, "", srcObjInfo.Size)
if err != nil {
log.Errorf("PutObjectHandler NewReader err:%v", err)
response.WriteErrorResponse(w, r, apierrors.ToApiError(ctx, err))
return
}
obj, err := s3a.store.StoreObject(ctx, dstBucket, dstObject, hashReader, srcObjInfo.Size, metadata)
if err != nil {
response.WriteErrorResponse(w, r, apierrors.ToApiError(ctx, err))
return
Expand Down Expand Up @@ -761,9 +767,9 @@ func trimLeadingSlash(ep string) string {

// Validate all the ListObjects query arguments, returns an APIErrorCode
// if one of the args do not meet the required conditions.
// - delimiter if set should be equal to '/', otherwise the request is rejected.
// - marker if set should have a common prefix with 'prefix' param, otherwise
// the request is rejected.
// - delimiter if set should be equal to '/', otherwise the request is rejected.
// - marker if set should have a common prefix with 'prefix' param, otherwise
// the request is rejected.
func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int) apierrors.ErrorCode {
// Max keys cannot be negative.
if maxKeys < 0 {
Expand Down
39 changes: 22 additions & 17 deletions objectservice/store/object_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ import (
)

// ObjectInfo - represents object metadata.
//{
// Bucket = {string} "test"
// Name = {string} "default.exe"
// ModTime = {time.Time} 2022-03-18 10:54:43.308685163 +0800
// Size = {int64} 11604147
// IsDir = {bool} false
// ETag = {string} "a6b0b7ddb4630832ed47821af59aa125"
// VersionID = {string} ""
// IsLatest = {bool} false
// DeleteMarker = {bool} false
// ContentType = {string} "application/x-msdownload"
// ContentEncoding = {string} ""
// Expires = {time.Time} 0001-01-01 00:00:00 +0000
// Parts = {[]ObjectPartInfo} nil
// AccTime = {time.Time} 0001-01-01 00:00:00 +0000
// SuccessorModTime = {time.Time} 0001-01-01 00:00:00 +0000
//}
//
// {
// Bucket = {string} "test"
// Name = {string} "default.exe"
// ModTime = {time.Time} 2022-03-18 10:54:43.308685163 +0800
// Size = {int64} 11604147
// IsDir = {bool} false
// ETag = {string} "a6b0b7ddb4630832ed47821af59aa125"
// Cid = {string} "QmRP168AQEN9vz8vnjWdEWiiJbNt4BZ5cB81qSRL5FQfGt"
// VersionID = {string} ""
// IsLatest = {bool} false
// DeleteMarker = {bool} false
// ContentType = {string} "application/x-msdownload"
// ContentEncoding = {string} ""
// Expires = {time.Time} 0001-01-01 00:00:00 +0000
// Parts = {[]ObjectPartInfo} nil
// AccTime = {time.Time} 0001-01-01 00:00:00 +0000
// SuccessorModTime = {time.Time} 0001-01-01 00:00:00 +0000
// }
type ObjectInfo struct {
// Name of the bucket.
Bucket string
Expand All @@ -42,6 +44,8 @@ type ObjectInfo struct {
// Hex encoded unique entity tag of the object.
ETag string

// ipfs key
Cid string
// Version ID of this object.
VersionID string

Expand Down Expand Up @@ -75,6 +79,7 @@ type ObjectInfo struct {
// file after CompleteMultipartUpload() is called.
type objectPartInfo struct {
ETag string `json:"etag,omitempty"`
Cid string `json:"cid,omitempty"`
Number int `json:"number"`
Size int64 `json:"size"`
ModTime time.Time `json:"mod_time"`
Expand Down
56 changes: 38 additions & 18 deletions objectservice/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/dustin/go-humanize"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/filedag-project/filedag-storage/objectservice/lock"
"github.com/filedag-project/filedag-storage/objectservice/uleveldb"
"github.com/filedag-project/filedag-storage/objectservice/utils"
"github.com/filedag-project/filedag-storage/objectservice/utils/etag"
"github.com/filedag-project/filedag-storage/objectservice/utils/hash"
"github.com/filedag-project/filedag-storage/objectservice/utils/s3utils"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -60,7 +63,7 @@ const (
var ErrObjectNotFound = errors.New("object not found")
var ErrBucketNotEmpty = errors.New("bucket not empty")

//StorageSys store sys
// StorageSys store sys
type StorageSys struct {
Db *uleveldb.ULevelDB
DagPool ipld.DAGService
Expand All @@ -73,7 +76,7 @@ type StorageSys struct {
gcTimeout time.Duration
}

//NewStorageSys new a storage sys
// NewStorageSys new a storage sys
func NewStorageSys(ctx context.Context, dagService ipld.DAGService, db *uleveldb.ULevelDB) *StorageSys {
cidBuilder, _ := merkledag.PrefixForCidVersion(0)
s := &StorageSys{
Expand Down Expand Up @@ -145,7 +148,7 @@ func (s *StorageSys) store(ctx context.Context, reader io.ReadCloser, size int64

func (s *StorageSys) checkAndDeleteObjectData(ctx context.Context, bucket, object string) {
if oldObjInfo, err := s.getObjectInfo(ctx, bucket, object); err == nil {
c, err := cid.Decode(oldObjInfo.ETag)
c, err := cid.Decode(oldObjInfo.Cid)
if err != nil {
log.Warnw("decode cid error", "cid", oldObjInfo.ETag)
} else {
Expand All @@ -156,8 +159,8 @@ func (s *StorageSys) checkAndDeleteObjectData(ctx context.Context, bucket, objec
}
}

//StoreObject store object
func (s *StorageSys) StoreObject(ctx context.Context, bucket, object string, reader io.ReadCloser, size int64, meta map[string]string) (ObjectInfo, error) {
// StoreObject store object
func (s *StorageSys) StoreObject(ctx context.Context, bucket, object string, reader *hash.Reader, size int64, meta map[string]string) (ObjectInfo, error) {
bktlk := s.newBucketNSLock(bucket)
bktlkCtx, err := bktlk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
Expand All @@ -181,7 +184,8 @@ func (s *StorageSys) StoreObject(ctx context.Context, bucket, object string, rea
ModTime: time.Now().UTC(),
Size: size,
IsDir: false,
ETag: root.String(),
ETag: reader.ETag().String(),
Cid: root.String(),
VersionID: "",
IsLatest: true,
DeleteMarker: false,
Expand Down Expand Up @@ -214,7 +218,7 @@ func (s *StorageSys) StoreObject(ctx context.Context, bucket, object string, rea
return objInfo, nil
}

//GetObject Get object
// GetObject Get object
func (s *StorageSys) GetObject(ctx context.Context, bucket, object string) (ObjectInfo, io.ReadCloser, error) {
lk := s.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
Expand All @@ -228,11 +232,11 @@ func (s *StorageSys) GetObject(ctx context.Context, bucket, object string) (Obje
if err != nil {
return ObjectInfo{}, nil, err
}
cid, err := cid.Decode(meta.ETag)
meatCid, err := cid.Decode(meta.Cid)
if err != nil {
return ObjectInfo{}, nil, err
}
dagNode, err := s.DagPool.Get(ctx, cid)
dagNode, err := s.DagPool.Get(ctx, meatCid)
if err != nil {
return ObjectInfo{}, nil, err
}
Expand Down Expand Up @@ -265,7 +269,7 @@ func (s *StorageSys) GetObjectInfo(ctx context.Context, bucket, object string) (
return s.getObjectInfo(ctx, bucket, object)
}

//DeleteObject delete object
// DeleteObject delete object
func (s *StorageSys) DeleteObject(ctx context.Context, bucket, object string) error {
lk := s.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, deleteOperationTimeout)
Expand All @@ -279,7 +283,7 @@ func (s *StorageSys) DeleteObject(ctx context.Context, bucket, object string) er
if err != nil {
return err
}
cid, err := cid.Decode(meta.ETag)
cid, err := cid.Decode(meta.Cid)
if err != nil {
return err
}
Expand Down Expand Up @@ -337,8 +341,8 @@ type ListObjectsInfo struct {
Prefixes []string
}

//ListObjects list user object
//TODO use more params
// ListObjects list user object
// TODO use more params
func (s *StorageSys) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
if maxKeys == 0 {
return loi, nil
Expand Down Expand Up @@ -508,7 +512,7 @@ func (s *StorageSys) getMultipartInfo(ctx context.Context, bucket string, object
return info, err
}

func (s *StorageSys) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, reader io.ReadCloser, size int64, meta map[string]string) (pi objectPartInfo, err error) {
func (s *StorageSys) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, reader *hash.Reader, size int64, meta map[string]string) (pi objectPartInfo, err error) {
bktlk := s.newBucketNSLock(bucket)
bktlkCtx, err := bktlk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
Expand All @@ -524,7 +528,8 @@ func (s *StorageSys) PutObjectPart(ctx context.Context, bucket string, object st

partInfo := objectPartInfo{
Number: partID,
ETag: root.String(),
ETag: reader.ETag().String(),
Cid: root.String(),
Size: size,
ModTime: time.Now().UTC(),
}
Expand Down Expand Up @@ -634,7 +639,7 @@ func (s *StorageSys) CompleteMultiPartUpload(ctx context.Context, bucket string,
// Save for total object size.
objectSize += gotPart.Size

c, err := cid.Decode(gotPart.ETag)
c, err := cid.Decode(gotPart.Cid)
if err != nil {
return oi, err
}
Expand All @@ -648,13 +653,15 @@ func (s *StorageSys) CompleteMultiPartUpload(ctx context.Context, bucket string,
if err != nil {
return oi, err
}
etag := ComputeCompleteMultipartMD5(parts)
objInfo := ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: time.Now().UTC(),
Size: objectSize,
IsDir: false,
ETag: root.String(),
ETag: etag,
Cid: root.String(),
VersionID: "",
IsLatest: true,
DeleteMarker: false,
Expand Down Expand Up @@ -720,7 +727,7 @@ func (s *StorageSys) AbortMultipartUpload(ctx context.Context, bucket string, ob
}

for _, part := range mi.Parts {
c, err := cid.Decode(part.ETag)
c, err := cid.Decode(part.Cid)
if err != nil {
return err
}
Expand Down Expand Up @@ -1035,3 +1042,16 @@ func checkSystemIdle() bool {
}
return true
}
func ComputeCompleteMultipartMD5(parts []datatypes.CompletePart) string {
var finalMD5Bytes []byte
for _, part := range parts {
md5Bytes, err := hex.DecodeString(canonicalizeETag(part.ETag))
if err != nil {
finalMD5Bytes = append(finalMD5Bytes, []byte(part.ETag)...)
} else {
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}
}
s3MD5 := fmt.Sprintf("%s-%d", etag.Multipart(finalMD5Bytes), len(parts))
return s3MD5
}