s3store: Adjust tests for ConcatUploads
Acconut committed Aug 6, 2024
1 parent f7f2f50 commit ab7099f
Showing 1 changed file with 104 additions and 56 deletions.
pkg/s3store/s3store_test.go (160 changes: 104 additions & 56 deletions)
@@ -1337,15 +1337,13 @@ func TestTerminateWithErrors(t *testing.T) {
}

func TestConcatUploadsUsingMultipart(t *testing.T) {
- // TODO: Should we have to load the info for the partial uploads when concatenating? Probably, if we decouple upload and objectId
- t.SkipNow()
-
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
+ // All partial uploads have a size (500) larger than the MinPartSize, so an S3 Multipart Upload is used for concatenation.
store.MinPartSize = 100

// Calls from NewUpload
@@ -1359,16 +1357,36 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
- Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["aaa+AAA","bbb+BBB","ccc+CCC"],"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)),
- ContentLength: aws.Int64(254),
+ Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":1500,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["uploadA","uploadB","uploadC"],"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`)),
+ ContentLength: aws.Int64(257),
})

+ // Calls from GetUpload
+ for _, id := range []string{"uploadA", "uploadB", "uploadC"} {
+ s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id + ".info"),
+ }).Return(&s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"` + id + `","Size":500,"Offset":0,"MetaData":null,"IsPartial":true,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"` + id + `","MultipartUpload":"multipart` + id + `","Type":"s3store"}}`))),
+ }, nil)
+ s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id),
+ UploadId: aws.String("multipart" + id),
+ PartNumberMarker: nil,
+ }).Return(nil, &types.NoSuchUpload{})
+ s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id + ".part"),
+ }).Return(nil, &types.NoSuchKey{})
+ }

// Calls from ConcatUploads
s3obj.EXPECT().UploadPartCopy(context.Background(), &s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/aaa"),
CopySource: aws.String("bucket/uploadA"),
PartNumber: aws.Int32(1),
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &types.CopyPartResult{
@@ -1380,7 +1398,7 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/bbb"),
CopySource: aws.String("bucket/uploadB"),
PartNumber: aws.Int32(2),
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &types.CopyPartResult{
@@ -1392,7 +1410,7 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/ccc"),
CopySource: aws.String("bucket/uploadC"),
PartNumber: aws.Int32(3),
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &types.CopyPartResult{
@@ -1425,28 +1443,24 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {

info := handler.FileInfo{
ID: "uploadId",
+ Size: 1500,
IsFinal: true,
PartialUploads: []string{
"aaa+AAA",
"bbb+BBB",
"ccc+CCC",
"uploadA",
"uploadB",
"uploadC",
},
}
upload, err := store.NewUpload(context.Background(), info)
assert.Nil(err)

- uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
+ uploadA, err := store.GetUpload(context.Background(), "uploadA")
assert.Nil(err)
- uploadB, err := store.GetUpload(context.Background(), "bbb+BBB")
+ uploadB, err := store.GetUpload(context.Background(), "uploadB")
assert.Nil(err)
- uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
+ uploadC, err := store.GetUpload(context.Background(), "uploadC")
assert.Nil(err)

- // All uploads have a size larger than the MinPartSize, so a S3 Multipart Upload is used for concatenation.
- uploadA.(*s3Upload).info = handler.FileInfo{Size: 500}
- uploadB.(*s3Upload).info = handler.FileInfo{Size: 500}
- uploadC.(*s3Upload).info = handler.FileInfo{Size: 500}
-
err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
uploadA,
uploadB,
@@ -1456,62 +1470,96 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
}
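
The comment at the top of TestConcatUploadsUsingMultipart states the rule this test pins down: every partial upload is at least MinPartSize, so the store can concatenate them server-side by copying each partial object into the final multipart upload with UploadPartCopy. The following is a minimal sketch of that path, not the actual s3store implementation; it targets the aws-sdk-go-v2 client that the test file already uses, and the names sketch, partial, useMultipart, and concatViaMultipart are hypothetical.

package sketch

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// partial describes one partial upload's object key and size (assumed shape).
type partial struct {
	Key  string
	Size int64
}

// useMultipart reports whether every partial meets the minimum part size;
// only then may each partial object become one part of a multipart upload.
func useMultipart(minPartSize int64, partials []partial) bool {
	for _, p := range partials {
		if p.Size < minPartSize {
			return false
		}
	}
	return true
}

// concatViaMultipart copies each partial object into the final multipart
// upload as one part, mirroring the UploadPartCopy expectations above.
func concatViaMultipart(ctx context.Context, client *s3.Client, bucket, key, uploadID string, partials []partial) error {
	for i, p := range partials {
		// CopySource uses the "bucket/key" form, e.g. "bucket/uploadA".
		_, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
			Bucket:     aws.String(bucket),
			Key:        aws.String(key),
			UploadId:   aws.String(uploadID),
			CopySource: aws.String(bucket + "/" + p.Key),
			PartNumber: aws.Int32(int32(i + 1)),
		})
		if err != nil {
			return fmt.Errorf("concat: copying part %d: %w", i+1, err)
		}
	}
	return nil
}

The sketch stops after the copies; completing the multipart upload is a separate step that is not shown here.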

func TestConcatUploadsUsingDownload(t *testing.T) {
- t.SkipNow() // TODO

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
+ // All partial uploads have a size (3, 4, 5) smaller than the MinPartSize, so the files are downloaded for concatenation.
store.MinPartSize = 100

- gomock.InOrder(
- s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
- Bucket: aws.String("bucket"),
- Key: aws.String("aaa"),
- }).Return(&s3.GetObjectOutput{
- Body: io.NopCloser(bytes.NewReader([]byte("aaa"))),
- }, nil),
- s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
- Bucket: aws.String("bucket"),
- Key: aws.String("bbb"),
- }).Return(&s3.GetObjectOutput{
- Body: io.NopCloser(bytes.NewReader([]byte("bbbb"))),
- }, nil),
- s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
- Bucket: aws.String("bucket"),
- Key: aws.String("ccc"),
- }).Return(&s3.GetObjectOutput{
- Body: io.NopCloser(bytes.NewReader([]byte("ccccc"))),
- }, nil),
- s3obj.EXPECT().PutObject(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
- Bucket: aws.String("bucket"),
- Key: aws.String("uploadId"),
- Body: bytes.NewReader([]byte("aaabbbbccccc")),
- })),
- s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
- Bucket: aws.String("bucket"),
- Key: aws.String("uploadId"),
- UploadId: aws.String("multipartId"),
- }).Return(nil, nil),
- )
+ // Calls from GetUpload for final upload
+ s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String("uploadId.info"),
+ }).Return(&s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":12,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["uploadA","uploadB","uploadC"],"Storage":{"Bucket":"bucket","Key":"uploadId","MultipartUpload":"multipartId","Type":"s3store"}}`))),
+ }, nil)
+ s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String("uploadId"),
+ UploadId: aws.String("multipartId"),
+ PartNumberMarker: nil,
+ }).Return(&s3.ListPartsOutput{
+ Parts: []types.Part{},
+ IsTruncated: aws.Bool(false),
+ }, nil)
+ s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String("uploadId.part"),
+ }).Return(nil, &types.NoSuchKey{})
+
+ // Calls from GetUpload for partial uploads
+ for id, size := range map[string]string{"uploadA": "3", "uploadB": "4", "uploadC": "5"} {
+ s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id + ".info"),
+ }).Return(&s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"` + id + `","Size":` + size + `,"Offset":0,"MetaData":null,"IsPartial":true,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"` + id + `","MultipartUpload":"multipart` + id + `","Type":"s3store"}}`))),
+ }, nil)
+ s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id),
+ UploadId: aws.String("multipart" + id),
+ PartNumberMarker: nil,
+ }).Return(nil, &types.NoSuchUpload{})
+ s3obj.EXPECT().HeadObject(context.Background(), &s3.HeadObjectInput{
+ Bucket: aws.String("bucket"),
+ Key: aws.String(id + ".part"),
+ }).Return(nil, &types.NoSuchKey{})
+ }

upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
// Calls from ConcatUploads
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadA"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("aaa"))),
}, nil)
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadB"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("bbbb"))),
}, nil)
s3obj.EXPECT().GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadC"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("ccccc"))),
}, nil)
s3obj.EXPECT().PutObject(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
Body: bytes.NewReader([]byte("aaabbbbccccc")),
}))
s3obj.EXPECT().AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
}).Return(nil, nil)

upload, err := store.GetUpload(context.Background(), "uploadId")
assert.Nil(err)

- uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
+ uploadA, err := store.GetUpload(context.Background(), "uploadA")
assert.Nil(err)
- uploadB, err := store.GetUpload(context.Background(), "bbb+BBB")
+ uploadB, err := store.GetUpload(context.Background(), "uploadB")
assert.Nil(err)
- uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
+ uploadC, err := store.GetUpload(context.Background(), "uploadC")
assert.Nil(err)

- // All uploads have a size smaller than the MinPartSize, so the files are downloaded for concatenation.
- uploadA.(*s3Upload).info = handler.FileInfo{Size: 3}
- uploadB.(*s3Upload).info = handler.FileInfo{Size: 4}
- uploadC.(*s3Upload).info = handler.FileInfo{Size: 5}

err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
uploadA,
uploadB,
…
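
Here the situation is inverted: the partial uploads (3, 4, and 5 bytes) are all smaller than MinPartSize, too small to serve as S3 multipart parts (S3 enforces a minimum part size for every part but the last), so the expectations instead describe a download path: a GetObject per partial, one PutObject with the joined bytes ("aaabbbbccccc"), and an AbortMultipartUpload for the multipart upload that is no longer needed. Again a minimal sketch under the same assumptions as above, with the hypothetical helper concatViaDownload, not the actual s3store code:

package sketch

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// concatViaDownload fetches each partial object, concatenates the bodies in
// order, uploads the result as a single object, and aborts the multipart
// upload that was created for the final upload but is no longer needed.
func concatViaDownload(ctx context.Context, client *s3.Client, bucket, key, uploadID string, partialKeys []string) error {
	var buf bytes.Buffer
	for _, pk := range partialKeys {
		out, err := client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(pk),
		})
		if err != nil {
			return fmt.Errorf("concat: downloading %q: %w", pk, err)
		}
		_, copyErr := io.Copy(&buf, out.Body)
		out.Body.Close()
		if copyErr != nil {
			return copyErr
		}
	}

	// A single plain PutObject carries the joined content, matching the
	// NewPutObjectInputMatcher expectation with body "aaabbbbccccc".
	if _, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   bytes.NewReader(buf.Bytes()),
	}); err != nil {
		return err
	}

	// The object was written directly, so the pending multipart upload for
	// the final upload can be aborted.
	_, err := client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      aws.String(key),
		UploadId: aws.String(uploadID),
	})
	return err
}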
