Skip to content

Commit

Permalink
s3store: Adjust implementation of GetReader
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Aug 3, 2024
1 parent 083d840 commit c9c5c6a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 88 deletions.
47 changes: 12 additions & 35 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,48 +723,25 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul
}

func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) {
// TODO: We can access the file info in here to see if the upload is still incomplete.
store := upload.store
// If the upload is not yet complete, we cannot download the file. There is no way to retrieve
// the content of an incomplete multipart upload.
isComplete := !upload.info.SizeIsDeferred && upload.info.Offset == upload.info.Size
if !isComplete {
return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
}

// Attempt to get upload content
store := upload.store
res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(upload.objectId),
})
if err == nil {
// No error occurred, and we are able to stream the object
return res.Body, nil
}

// If the file cannot be found, we ignore this error and continue since the
// upload may not have been finished yet. In this case we do not want to
// return an ErrNotFound but a more meaningful message.
if !isAwsError[*types.NoSuchKey](err) {
return nil, err
}

// Test whether the multipart upload exists to find out if the upload
// never existed or just has not been finished yet
_, err = store.Service.ListParts(ctx, &s3.ListPartsInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(upload.objectId),
UploadId: aws.String(upload.multipartId),
MaxParts: aws.Int32(0),
})
if err == nil {
// The multipart upload still exists, which means we cannot download it yet
return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
}

// The AWS Go SDK v2 has a bug where types.NoSuchUpload is not returned,
// so we also need to check the error code itself.
// See https://github.com/aws/aws-sdk-go-v2/issues/1635
if isAwsError[*types.NoSuchUpload](err) || isAwsErrorCode(err, "NoSuchUpload") {
// Neither the object nor the multipart upload exists, so we return a 404
return nil, handler.ErrNotFound
if err != nil {
// Note: We do not check for NoSuchKey here on purpose. If the object cannot be found
// but we expect it to be there, then we should error out with a 500, not a 404.
return nil, fmt.Errorf("s3store: failed to fetch object: %w", err)
}

return nil, err
return res.Body, nil
}

func (upload s3Upload) Terminate(ctx context.Context) error {
Expand Down
100 changes: 47 additions & 53 deletions pkg/s3store/s3store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,16 +638,29 @@ func TestGetInfoWithOldIdFormat(t *testing.T) {
}

func TestGetReader(t *testing.T) {
// TODO: Simplify GetReader implementation
t.SkipNow()

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)

s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":12,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))),
}, nil)
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: nil,
}).Return(nil, &types.NoSuchUpload{})
s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, &types.NoSuchKey{})
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
Expand All @@ -663,65 +676,46 @@ func TestGetReader(t *testing.T) {
assert.Equal(io.NopCloser(bytes.NewReader([]byte(`hello world`))), content)
}

func TestGetReaderNotFound(t *testing.T) {
// TODO: Simplify GetReader implementation
t.SkipNow()

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)

gomock.InOrder(
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(nil, &types.NoSuchKey{}),
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
MaxParts: aws.Int32(0),
}).Return(nil, &types.NoSuchUpload{}),
)

upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)

content, err := upload.GetReader(context.Background())
assert.Nil(content)
assert.Equal(handler.ErrNotFound, err)
}

func TestGetReaderNotFinished(t *testing.T) {
// TODO: Simplify GetReader implementation
t.SkipNow()

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)

gomock.InOrder(
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(nil, &types.NoSuchKey{}),
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
MaxParts: aws.Int32(0),
}).Return(&s3.ListPartsOutput{
Parts: []types.Part{},
}, nil),
)
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))),
}, nil)
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: nil,
}).Return(&s3.ListPartsOutput{
Parts: []types.Part{
{
PartNumber: aws.Int32(1),
Size: aws.Int64(100),
ETag: aws.String("etag-1"),
},
{
PartNumber: aws.Int32(2),
Size: aws.Int64(200),
ETag: aws.String("etag-2"),
},
},
IsTruncated: aws.Bool(false),
}, nil)
s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, &types.NoSuchKey{})

upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
upload, err := store.GetUpload(context.Background(), "uploadId")
assert.Nil(err)

content, err := upload.GetReader(context.Background())
Expand Down

0 comments on commit c9c5c6a

Please sign in to comment.