From c9c5c6abdba997f6e064ee682d6c84deeb07b0a2 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Sat, 3 Aug 2024 14:49:42 +0200 Subject: [PATCH] s3store: Adjust implementation of GetReader --- pkg/s3store/s3store.go | 47 +++++------------ pkg/s3store/s3store_test.go | 100 +++++++++++++++++------------------- 2 files changed, 59 insertions(+), 88 deletions(-) diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 975a1605..4974bd1b 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -723,48 +723,25 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul } func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) { - // TODO: We can access the file info in here to see if the upload is still incomplete. - store := upload.store + // If the upload is not yet complete, we cannot download the file. There is no way to retrieve + // the content of an incomplete multipart upload. + isComplete := !upload.info.SizeIsDeferred && upload.info.Offset == upload.info.Size + if !isComplete { + return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest) + } - // Attempt to get upload content + store := upload.store res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(upload.objectId), }) - if err == nil { - // No error occurred, and we are able to stream the object - return res.Body, nil - } - - // If the file cannot be found, we ignore this error and continue since the - // upload may not have been finished yet. In this case we do not want to - // return a ErrNotFound but a more meaning-full message. 
- if !isAwsError[*types.NoSuchKey](err) { - return nil, err - } - - // Test whether the multipart upload exists to find out if the upload - // never existsted or just has not been finished yet - _, err = store.Service.ListParts(ctx, &s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(upload.objectId), - UploadId: aws.String(upload.multipartId), - MaxParts: aws.Int32(0), - }) - if err == nil { - // The multipart upload still exists, which means we cannot download it yet - return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest) - } - - // The AWS Go SDK v2 has a bug where types.NoSuchUpload is not returned, - // so we also need to check the error code itself. - // See https://github.com/aws/aws-sdk-go-v2/issues/1635 - if isAwsError[*types.NoSuchUpload](err) || isAwsErrorCode(err, "NoSuchUpload") { - // Neither the object nor the multipart upload exists, so we return a 404 - return nil, handler.ErrNotFound + if err != nil { + // Note: We do not check for NoSuchKey here on purpose. If the object cannot be found + // but we expect it to be there, then we should error out with a 500, not a 404. 
+ return nil, fmt.Errorf("s3store: failed to fetch object: %w", err) } - return nil, err + return res.Body, nil } func (upload s3Upload) Terminate(ctx context.Context) error { diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 11e5ddc0..d4dca862 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -638,9 +638,6 @@ func TestGetInfoWithOldIdFormat(t *testing.T) { } func TestGetReader(t *testing.T) { - // TODO: Simplify GetReader implementation - t.SkipNow() - mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -648,6 +645,22 @@ func TestGetReader(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":12,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(nil, &types.NoSuchUpload{}) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -663,42 +676,7 @@ func TestGetReader(t *testing.T) { assert.Equal(io.NopCloser(bytes.NewReader([]byte(`hello world`))), content) } -func TestGetReaderNotFound(t *testing.T) { - // TODO: Simplify GetReader implementation - t.SkipNow() - - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - 
assert := assert.New(t) - - s3obj := NewMockS3API(mockCtrl) - store := New("bucket", s3obj) - - gomock.InOrder( - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - }).Return(nil, &types.NoSuchKey{}), - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MaxParts: aws.Int32(0), - }).Return(nil, &types.NoSuchUpload{}), - ) - - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") - assert.Nil(err) - - content, err := upload.GetReader(context.Background()) - assert.Nil(content) - assert.Equal(handler.ErrNotFound, err) -} - func TestGetReaderNotFinished(t *testing.T) { - // TODO: Simplify GetReader implementation - t.SkipNow() - mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -706,22 +684,38 @@ func TestGetReaderNotFinished(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - }).Return(nil, &types.NoSuchKey{}), - s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MaxParts: aws.Int32(0), - }).Return(&s3.ListPartsOutput{ - Parts: []types.Part{}, - }, nil), - ) + s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menĂ¼","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))), + }, nil) + 
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: nil, + }).Return(&s3.ListPartsOutput{ + Parts: []types.Part{ + { + PartNumber: aws.Int32(1), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + }, + { + PartNumber: aws.Int32(2), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + }, + }, + IsTruncated: aws.Bool(false), + }, nil) + s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, &types.NoSuchKey{}) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + upload, err := store.GetUpload(context.Background(), "uploadId") assert.Nil(err) content, err := upload.GetReader(context.Background())