diff --git a/Makefile b/Makefile
index 2bb068d..8ca5625 100644
--- a/Makefile
+++ b/Makefile
@@ -19,8 +19,8 @@ test: $(OVAL)
 
 .PHONY: run
 run: $(OVAL)
-	$(OVAL) --size 4k-16k --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --save test.json
-	$(OVAL) --time 3s --load test.json
+	$(OVAL) --size 4k-12m --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --multipart_thresh 5m --save test.json
+	$(OVAL) --time 3s --multipart_thresh 6m --load test.json
 
 .PHONY: run-multi-process
 run-multi-process: $(OVAL)
@@ -30,7 +30,7 @@ run-multi-process: $(OVAL)
 
 .PHONY: run-leader
 run-leader: $(OVAL)
-	$(OVAL) leader --follower_list "http://localhost:8080,http://localhost:8081,http://localhost:8082" --size 4k-16k --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000
+	$(OVAL) leader --follower_list "http://localhost:8080,http://localhost:8081,http://localhost:8082" --size 4k-12m --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --multipart_thresh 5m
 
 .PHONY: run-followers
 run-followers: $(OVAL)
diff --git a/argparser/argparser_test.go b/argparser/argparser_test.go
new file mode 100644
index 0000000..7ca9f86
--- /dev/null
+++ b/argparser/argparser_test.go
@@ -0,0 +1,51 @@
+package argparser
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseMultipartThresh(t *testing.T) {
+	type testCase struct {
+		multipartThreshStr      string
+		expectedMultipartThresh int
+		expectedErr             bool
+	}
+	testCases := []testCase{
+		{
+			multipartThreshStr:      "512",
+			expectedMultipartThresh: 512,
+			expectedErr:             false,
+		},
+		{
+			multipartThreshStr:      "2k",
+			expectedMultipartThresh: 2048,
+			expectedErr:             false,
+		},
+		{
+			multipartThreshStr:      "4m",
+			expectedMultipartThresh: 4 * 1024 * 1024,
+			expectedErr:             false,
+		},
+		{
+			multipartThreshStr:      "12m",
+			expectedMultipartThresh: 12 * 1024 * 1024,
+			expectedErr:             false,
+		},
+		{
+			multipartThreshStr:      "8g",
+			expectedMultipartThresh: 0,
+			expectedErr:             true,
+		},
+	}
+
+	for _, tc := range testCases {
+		multipartThresh, err := ParseMultipartThresh(tc.multipartThreshStr)
+		if tc.expectedErr {
+			assert.Errorf(t, err, "tc.multipartThreshStr: %s", tc.multipartThreshStr)
+			continue
+		}
+		assert.Equal(t, tc.expectedMultipartThresh, multipartThresh)
+	}
+}
diff --git a/argparser/size.go b/argparser/size.go
index 1ddc23e..ebd1e37 100644
--- a/argparser/size.go
+++ b/argparser/size.go
@@ -53,3 +53,8 @@ func parseSizeUnit(s string) (int, error) {
 	}
 	return 0, fmt.Errorf("Illegal size format: %v\n", s)
 }
+
+func ParseMultipartThresh(s string) (int, error) {
+	mpThresh, err := parseSizeUnit(s)
+	return mpThresh, err
+}
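Note: ParseMultipartThresh is a thin wrapper over the existing parseSizeUnit, so it accepts the same formats as --size. A minimal sketch of the expected behavior, written as a testable example that could live in a hypothetical example_test.go next to argparser_test.go (not part of the patch; the printed values follow from the test cases above):

    package argparser

    import "fmt"

    // go test runs this example and checks stdout against the Output block.
    func ExampleParseMultipartThresh() {
        for _, s := range []string{"512", "2k", "4m", "8g"} {
            n, err := ParseMultipartThresh(s)
            fmt.Println(s, n, err != nil)
        }
        // Output:
        // 512 512 false
        // 2k 2048 false
        // 4m 4194304 false
        // 8g 0 true
    }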
diff --git a/cmd/leader.go b/cmd/leader.go
index 02ec8d7..eaee540 100644
--- a/cmd/leader.go
+++ b/cmd/leader.go
@@ -24,7 +24,7 @@ var leaderCmd = &cobra.Command{
 			log.Fatal(err)
 		}
 		err = multiprocess.StartFollower(followerList, execContext,
-			opeRatio, execTime.Milliseconds())
+			opeRatio, execTime.Milliseconds(), multipartThresh)
 		if err != nil {
 			log.Fatal(err)
 		}
diff --git a/cmd/root.go b/cmd/root.go
index c7dedc2..294c95a 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -12,19 +12,21 @@ import (
 )
 
 var (
-	numObj       int
-	numWorker    int
-	sizePattern  string
-	execTime     time.Duration
-	bucketNames  []string
-	opeRatioStr  string
-	endpoint     string
-	profiler     bool
-	saveFileName string
-	loadFileName string
+	numObj             int
+	numWorker          int
+	sizePattern        string
+	execTime           time.Duration
+	bucketNames        []string
+	opeRatioStr        string
+	endpoint           string
+	multipartThreshStr string
+	profiler           bool
+	saveFileName       string
+	loadFileName       string
 
 	minSize, maxSize int
 	opeRatio         []float64
+	multipartThresh  int
 	execContext      *runner.ExecutionContext
 )
@@ -57,9 +59,9 @@ If no subcommands are specified, Oval runs in the single-process mode.`,
 
 		var r *runner.Runner
 		if loadFileName == "" {
-			r = runner.NewRunner(execContext, opeRatio, execTime.Milliseconds(), profiler, loadFileName, 0)
+			r = runner.NewRunner(execContext, opeRatio, execTime.Milliseconds(), profiler, loadFileName, 0, multipartThresh)
 		} else {
-			r = runner.NewRunnerFromLoadFile(loadFileName, opeRatio, execTime.Milliseconds(), profiler)
+			r = runner.NewRunnerFromLoadFile(loadFileName, opeRatio, execTime.Milliseconds(), profiler, multipartThresh)
 		}
 		err = r.Run(nil)
 		if err != nil {
@@ -113,6 +115,10 @@ func handleCommonFlags() {
 	if err != nil {
 		log.Fatal(err)
 	}
+	multipartThresh, err = argparser.ParseMultipartThresh(multipartThreshStr)
+	if err != nil {
+		log.Fatal(err)
+	}
 
 	if numWorker >= 256 {
 		log.Fatal("The number of workers must be less than 256.")
@@ -144,9 +150,10 @@ func handleCommonFlags() {
 func defineCommonFlags(cmd *cobra.Command) {
 	cmd.Flags().IntVar(&numObj, "num_obj", 10, "The maximum number of objects per process.")
 	cmd.Flags().IntVar(&numWorker, "num_worker", 1, "The number of workers per process.")
-	cmd.Flags().StringVar(&sizePattern, "size", "4k", "The size of object. Should be in the form like \"8k\" or \"4k-2m\". The unit \"g\" or \"G\" is not allowed.")
+	cmd.Flags().StringVar(&sizePattern, "size", "4k", "The size of object. Should be in the form like \"8k\" or \"4k-2m\". Only \"k\" and \"m\" are allowed as a unit.")
 	cmd.Flags().DurationVar(&execTime, "time", time.Second*3, "Time duration for run the workload.")
 	cmd.Flags().StringSliceVar(&bucketNames, "bucket", nil, "The name list of the buckets. e.g. \"bucket1,bucket2\"")
 	cmd.Flags().StringVar(&opeRatioStr, "ope_ratio", "1,1,1,0", "The ration of put, get, delete and list operations. e.g. \"2,3,1,1\"")
 	cmd.Flags().StringVar(&endpoint, "endpoint", "", "The endpoint URL and TCP port number. e.g. \"http://127.0.0.1:9000\"")
+	cmd.Flags().StringVar(&multipartThreshStr, "multipart_thresh", "100m", "The threshold of the object size to switch to the multipart upload. Only \"k\" and \"m\" are allowed as a unit.")
 }
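Note: the threshold travels as a string (--multipart_thresh, default "100m", i.e. 100 * 1024 * 1024 = 104857600 bytes) and is converted once in handleCommonFlags. It is passed fresh into NewRunnerFromLoadFile rather than read from the save file, which is consistent with the Makefile passing 5m on --save and 6m on --load. A minimal flag-level sketch (hypothetical wiring; mirrors defineCommonFlags, not part of the patch):

    package main

    import (
        "fmt"
        "log"

        "github.com/spf13/cobra"
    )

    func main() {
        var multipartThreshStr string
        cmd := &cobra.Command{Use: "oval"}
        cmd.Flags().StringVar(&multipartThreshStr, "multipart_thresh", "100m",
            "The threshold of the object size to switch to the multipart upload.")
        // Simulate a command line; left alone, the default "100m" applies.
        if err := cmd.Flags().Parse([]string{"--multipart_thresh", "6m"}); err != nil {
            log.Fatal(err)
        }
        fmt.Println(multipartThreshStr) // "6m"
    }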
diff --git a/multiprocess/follower.go b/multiprocess/follower.go
index 395970c..6bae3b8 100644
--- a/multiprocess/follower.go
+++ b/multiprocess/follower.go
@@ -88,7 +88,7 @@ func startHandler(w http.ResponseWriter, r *http.Request) {
 	state = running
 
 	go func() {
-		run = runner.NewRunner(&param.Context, param.OpeRatio, param.TimeInMs, false, "", param.ID)
+		run = runner.NewRunner(&param.Context, param.OpeRatio, param.TimeInMs, false, "", param.ID, param.MultipartThresh)
 		runnerErr = run.Run(cancel)
 		mu.Lock()
 		defer mu.Unlock()
diff --git a/multiprocess/leader.go b/multiprocess/leader.go
index 92a11a0..e325089 100644
--- a/multiprocess/leader.go
+++ b/multiprocess/leader.go
@@ -19,10 +19,11 @@ const (
 )
 
 type StartFollowerParameter struct {
-	ID       int
-	Context  runner.ExecutionContext
-	OpeRatio []float64
-	TimeInMs int64
+	ID              int
+	Context         runner.ExecutionContext
+	OpeRatio        []float64
+	TimeInMs        int64
+	MultipartThresh int
 }
 
 func InitFollower(followerList []string) error {
@@ -44,13 +45,14 @@ func InitFollower(followerList []string) error {
 
 func StartFollower(followerList []string, context *runner.ExecutionContext,
-	opeRatio []float64, timeInMs int64) error {
+	opeRatio []float64, timeInMs int64, multipartThresh int) error {
 	for i, follower := range followerList {
 		param := StartFollowerParameter{
-			ID:       i,
-			Context:  *context,
-			OpeRatio: opeRatio,
-			TimeInMs: timeInMs,
+			ID:              i,
+			Context:         *context,
+			OpeRatio:        opeRatio,
+			TimeInMs:        timeInMs,
+			MultipartThresh: multipartThresh,
 		}
 		data, err := json.Marshal(param)
 		if err != nil {
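Note: followers receive the threshold through the existing JSON control message. A self-contained sketch of the wire shape (field set copied from StartFollowerParameter above; the ExecutionContext payload is elided for brevity):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Mirrors multiprocess.StartFollowerParameter, minus the ExecutionContext.
    type StartFollowerParameter struct {
        ID              int
        OpeRatio        []float64
        TimeInMs        int64
        MultipartThresh int
    }

    func main() {
        b, _ := json.Marshal(StartFollowerParameter{
            ID:              0,
            OpeRatio:        []float64{8, 8, 8, 1},
            TimeInMs:        3000,
            MultipartThresh: 5 * 1024 * 1024, // --multipart_thresh 5m
        })
        fmt.Println(string(b))
        // {"ID":0,"OpeRatio":[8,8,8,1],"TimeInMs":3000,"MultipartThresh":5242880}
    }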
diff --git a/pattern/pattern.go b/pattern/pattern.go
index 4a86ad0..a61b460 100644
--- a/pattern/pattern.go
+++ b/pattern/pattern.go
@@ -19,43 +19,52 @@ const (
 	dataUnitHeaderSizeWithoutBucketAndKey = 20
 )
 
-func Generate(minSize, maxSize, workerID int, bucketName string, obj *object.Object) (io.ReadSeeker, int, error) {
+func DecideSize(minSize, maxSize int) (int, error) {
+	if minSize < dataUnitSize {
+		return 0, fmt.Errorf("minSize should be larger than or equal to %v.", dataUnitSize)
+	}
 	if minSize%dataUnitSize != 0 {
-		return nil, 0, fmt.Errorf("minSize should be a multiple of %v.", dataUnitSize)
+		return 0, fmt.Errorf("minSize should be a multiple of %v.", dataUnitSize)
 	}
 	if maxSize != 0 && maxSize%dataUnitSize != 0 {
-		return nil, 0, fmt.Errorf("maxSize should be a multiple of %v.", dataUnitSize)
+		return 0, fmt.Errorf("maxSize should be a multiple of %v.", dataUnitSize)
 	}
 	if maxSize < minSize {
-		return nil, 0, errors.New("maxSize should be larger than minSize.")
+		return 0, errors.New("maxSize should be larger than or equal to minSize.")
+	}
+
+	return minSize + dataUnitSize*rand.Intn((maxSize-minSize)/dataUnitSize+1), nil
+}
+
+func Generate(dataSize, workerID, offset int, bucketName string, obj *object.Object) (io.ReadSeeker, error) {
+	if offset%dataUnitSize != 0 {
+		return nil, fmt.Errorf("invalid offset size: %d", offset)
 	}
 
 	if len(bucketName) > object.MaxBucketNameLength {
 		bucketName = bucketName[:object.MaxBucketNameLength]
 	}
 
-	var dataSize int
-	dataSize = minSize + dataUnitSize*rand.Intn((maxSize-minSize)/dataUnitSize+1)
-
 	f := memfile.New([]byte{}) // memfile does not implement io.Closer interface.
+	unitCountOffset := offset / dataUnitSize
 	for i := 0; i < dataSize/dataUnitSize; i++ {
-		err := generateDataUnit(i, workerID, bucketName, obj, f)
+		err := generateDataUnit(i+unitCountOffset, workerID, bucketName, obj, f)
 		if err != nil {
-			return nil, 0, err
+			return nil, err
 		}
 	}
 	if len(f.Bytes()) != dataSize {
-		return nil, 0, fmt.Errorf("Generated data size is wrong. (expected: %v, actual: %v)", dataSize, f.Bytes())
+		return nil, fmt.Errorf("Generated data size is wrong. (expected: %v, actual: %v)", dataSize, f.Bytes())
 	}
 	_, err := f.Seek(0, 0)
 	if err != nil {
-		return nil, 0, err
+		return nil, err
 	}
-	return f, dataSize, nil
+	return f, nil
 }
 
 func generateDataUnit(unitCount, workerID int, bucketName string, obj *object.Object, writer io.Writer) error {
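Note: splitting the old Generate into DecideSize plus Generate(dataSize, workerID, offset, ...) lets a caller produce one object in several reads while the embedded data-unit numbering stays continuous, so a multipart object validates exactly like a single-shot one. A standalone sketch of the offset arithmetic (dataUnitSize is assumed to be 256 here, consistent with the 256-byte granularity in the tests; not part of the patch):

    package main

    import "fmt"

    const dataUnitSize = 256 // assumption for this sketch

    func main() {
        size, thresh := 12*1024, 5*1024 // whole object, per-part cap (both unit-aligned)
        remaining, offset := size, 0
        for remaining > 0 {
            part := remaining
            if part > thresh {
                part = thresh
            }
            first := offset / dataUnitSize // unitCountOffset inside Generate
            fmt.Printf("part at offset %5d: units %2d..%2d\n",
                offset, first, first+part/dataUnitSize-1)
            offset += part
            remaining -= part
        }
        // Prints units 0..19, 20..39, 40..47: no gaps, matching a single
        // Generate(size, workerID, 0, ...) call over the same object.
    }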
diff --git a/pattern/pattern_test.go b/pattern/pattern_test.go
index 3b5a63a..b36e2ff 100644
--- a/pattern/pattern_test.go
+++ b/pattern/pattern_test.go
@@ -18,19 +18,19 @@ const (
 /*******************************/
 /*         Test set up         */
 /*******************************/
-type GeneratorSuite struct {
+type PatternSuite struct {
 	suite.Suite
 	f io.ReadWriteSeeker
 }
 
-func (suite *GeneratorSuite) SetupTest() {
+func (suite *PatternSuite) SetupTest() {
 	suite.f = memfile.New([]byte{})
 }
 
 /*******************************/
 /*         Test cases          */
 /*******************************/
-func (suite *GeneratorSuite) TestGenerateDataUnitSuccess() {
+func (suite *PatternSuite) TestGenerateDataUnitSuccess() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		Size:       256,
@@ -61,7 +61,7 @@ func (suite *GeneratorSuite) TestGenerateDataUnitSuccess() {
 	suite.Equal([]byte{0x64, 0x00, 0x00, 0x00}, data[current:current+4]) // hex(100) = 0x64
 }
 
-func (suite *GeneratorSuite) TestGenerateSuccess() {
+func (suite *PatternSuite) TestGenerateSuccess() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		Size:       256,
@@ -69,7 +69,8 @@ func (suite *GeneratorSuite) TestGenerateSuccess() {
 	}
 	workerID := 100
 
-	readSeeker, size, err := Generate(512, 512, workerID, testBucketName, obj)
+	size := 512
+	readSeeker, err := Generate(size, workerID, 0, testBucketName, obj)
 	suite.NoError(err)
 	suite.Equal(512, size)
 	data, err := io.ReadAll(readSeeker)
@@ -111,7 +112,7 @@ func (suite *GeneratorSuite) TestGenerateSuccess() {
 	suite.Equal([]byte{0x64, 0x00, 0x00, 0x00}, data[current:current+4]) // hex(100) = 0x64
 }
 
-func (suite *GeneratorSuite) TestGenerateLongBucketName() {
+func (suite *PatternSuite) TestGenerateLongBucketName() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		Size:       256,
@@ -119,7 +120,8 @@ func (suite *GeneratorSuite) TestGenerateLongBucketName() {
 	}
 	workerID := 100
 
-	readSeeker, size, err := Generate(512, 512, workerID, testLongBucketName, obj)
+	size := 512
+	readSeeker, err := Generate(size, workerID, 0, testLongBucketName, obj)
 	suite.NoError(err)
 	suite.Equal(512, size)
 	data, err := io.ReadAll(readSeeker)
@@ -161,7 +163,7 @@ func (suite *GeneratorSuite) TestGenerateLongBucketName() {
 	suite.Equal([]byte{0x64, 0x00, 0x00, 0x00}, data[current:current+4]) // hex(100) = 0x64
 }
 
-func (suite *GeneratorSuite) TestValidDataUnitSuccess() {
+func (suite *PatternSuite) TestValidDataUnitSuccess() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		Size:       256,
@@ -178,14 +180,15 @@ func (suite *GeneratorSuite) TestValidDataUnitSuccess() {
 	suite.Equal(nil, validDataUnit(4, workerID, testBucketName, obj, data))
 }
 
-func (suite *GeneratorSuite) TestValidSuccess() {
+func (suite *PatternSuite) TestValidSuccess() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		WriteCount: 300,
 	}
 	workerID := 100
 
-	readSeeker, size, err := Generate(1024, 1024, workerID, testBucketName, obj)
+	size := 1024
+	readSeeker, err := Generate(size, workerID, 0, testBucketName, obj)
 	suite.NoError(err)
 
 	obj.Size = size
@@ -193,14 +196,15 @@ func (suite *GeneratorSuite) TestValidSuccess() {
 	suite.NoError(err)
 }
 
-func (suite *GeneratorSuite) TestValidLongBucketName() {
+func (suite *PatternSuite) TestValidLongBucketName() {
 	obj := &object.Object{
 		Key:        testKeyName,
 		WriteCount: 300,
 	}
 	workerID := 100
 
-	readSeeker, size, err := Generate(1024, 1024, workerID, testLongBucketName, obj)
+	size := 1024
+	readSeeker, err := Generate(size, workerID, 0, testLongBucketName, obj)
 	suite.NoError(err)
 
 	obj.Size = size
@@ -208,9 +212,64 @@ func (suite *GeneratorSuite) TestValidLongBucketName() {
 	suite.NoError(err)
 }
 
+func (suite *PatternSuite) TestDecideSize() {
+	type testCase struct {
+		minSize     int
+		maxSize     int
+		expectedErr bool
+	}
+	testCases := []testCase{
+		{
+			minSize:     512,
+			maxSize:     512,
+			expectedErr: false,
+		},
+		{
+			minSize:     1024,
+			maxSize:     5 * 1024 * 1024,
+			expectedErr: false,
+		},
+		{
+			minSize:     256,
+			maxSize:     1024,
+			expectedErr: false,
+		},
+		{
+			minSize:     513,
+			maxSize:     1024,
+			expectedErr: true,
+		},
+		{
+			minSize:     512,
+			maxSize:     513,
+			expectedErr: true,
+		},
+		{
+			minSize:     0,
+			maxSize:     512,
+			expectedErr: true,
+		},
+		{
+			minSize:     1024,
+			maxSize:     512,
+			expectedErr: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		size, err := DecideSize(tc.minSize, tc.maxSize)
+		if tc.expectedErr {
+			suite.Errorf(err, "tc.minSize: %d, tc.maxSize: %d", tc.minSize, tc.maxSize)
+			continue
+		}
+		suite.GreaterOrEqual(size, tc.minSize)
+		suite.LessOrEqual(size, tc.maxSize)
+	}
+}
+
 /*******************************/
 /*          Run tests          */
 /*******************************/
-func TestGenerateSuite(t *testing.T) {
-	suite.Run(t, new(GeneratorSuite))
+func TestPatternSuite(t *testing.T) {
+	suite.Run(t, new(PatternSuite))
 }
diff --git a/runner/runner.go b/runner/runner.go
index 9e87b1e..0ccac38 100644
--- a/runner/runner.go
+++ b/runner/runner.go
@@ -33,31 +33,33 @@ type ExecutionContext struct {
 }
 
 type Runner struct {
-	execContext  *ExecutionContext
-	opeRatio     []float64
-	timeInMs     int64
-	profiler     bool
-	loadFileName string
-	client       *s3_client.S3Client
-	st           stat.Stat
-	processID    int
+	execContext     *ExecutionContext
+	opeRatio        []float64
+	timeInMs        int64
+	profiler        bool
+	loadFileName    string
+	client          *s3_client.S3Client
+	st              stat.Stat
+	processID       int
+	multipartThresh int
 }
 
 func NewRunner(execContext *ExecutionContext, opeRatio []float64, timeInMs int64,
-	profiler bool, loadFileName string, processID int) *Runner {
+	profiler bool, loadFileName string, processID, multipartThresh int) *Runner {
 	runner := &Runner{
-		execContext:  execContext,
-		opeRatio:     opeRatio,
-		timeInMs:     timeInMs,
-		profiler:     profiler,
-		loadFileName: loadFileName,
-		processID:    processID,
+		execContext:     execContext,
+		opeRatio:        opeRatio,
+		timeInMs:        timeInMs,
+		profiler:        profiler,
+		loadFileName:    loadFileName,
+		processID:       processID,
+		multipartThresh: multipartThresh,
 	}
 	runner.init()
 	return runner
 }
 
-func NewRunnerFromLoadFile(loadFileName string, opeRatio []float64, timeInMs int64, profiler bool) *Runner {
+func NewRunnerFromLoadFile(loadFileName string, opeRatio []float64, timeInMs int64, profiler bool, multipartThresh int) *Runner {
 	if loadFileName == "" {
 		log.Fatal("loadFileName is empty.")
 	}
@@ -66,7 +68,7 @@ func NewRunnerFromLoadFile(loadFileName string, opeRatio []float64, timeInMs int
 		log.Fatal(err)
 	}
 	ec := loadSavedContext(loadFileName)
-	return NewRunner(ec, opeRatio, timeInMs, profiler, loadFileName, 0)
+	return NewRunner(ec, opeRatio, timeInMs, profiler, loadFileName, 0, multipartThresh)
 }
 
 func loadSavedContext(loadFileName string) *ExecutionContext {
@@ -180,7 +182,7 @@ func (r *Runner) Run(cancel chan struct{}) error {
 			operation := r.selectOperation()
 			switch operation {
 			case Put:
-				err = r.execContext.Workers[workerID].Put()
+				err = r.execContext.Workers[workerID].Put(r.multipartThresh)
 			case Get:
 				err = r.execContext.Workers[workerID].Get()
 			case Delete:
diff --git a/runner/worker.go b/runner/worker.go
index 96d5cf9..d2b66d3 100644
--- a/runner/worker.go
+++ b/runner/worker.go
@@ -33,7 +33,7 @@ func (w *Worker) ShowInfo() {
 	log.Printf("Worker ID = %#x, Key = [%s, %s]\n", w.id, head, tail)
 }
 
-func (w *Worker) Put() error {
+func (w *Worker) Put(multipartThresh int) error {
 	bucketWithObj := w.selectBucketWithObject()
 	obj := bucketWithObj.ObjectMeta.GetRandomObject()
 
@@ -69,18 +69,50 @@ func (w *Worker) Put() error {
 		w.st.AddGetForValidCount()
 	}
 
-	bucketWithObj.ObjectMeta.RegisterToExistingList(obj.Key)
-	obj.WriteCount++
-	body, size, err := pattern.Generate(w.minSize, w.maxSize, w.id, bucketWithObj.BucketName, obj)
-	obj.Size = size
+	size, err := pattern.DecideSize(w.minSize, w.maxSize)
 	if err != nil {
 		log.Println(err.Error())
 		return err
 	}
-	err = w.client.PutObject(bucketWithObj.BucketName, obj.Key, body)
-	if err != nil {
-		log.Println(err.Error())
-		return err
+	obj.Size = size
+	bucketWithObj.ObjectMeta.RegisterToExistingList(obj.Key)
+	obj.WriteCount++
+	if size <= multipartThresh {
+		body, err := pattern.Generate(size, w.id, 0, bucketWithObj.BucketName, obj)
+		if err != nil {
+			log.Println(err.Error())
+			return err
+		}
+		err = w.client.PutObject(bucketWithObj.BucketName, obj.Key, body)
+		if err != nil {
+			log.Println(err.Error())
+			return err
+		}
+	} else {
+		// multipart upload
+		partBodies := make([]s3_client.PartBody, 0)
+		remainingSize := size
+		for remainingSize > 0 {
+			partSize := remainingSize
+			if multipartThresh < partSize {
+				partSize = multipartThresh
+			}
+			body, err := pattern.Generate(partSize, w.id, size-remainingSize, bucketWithObj.BucketName, obj)
+			if err != nil {
+				log.Println(err.Error())
+				return err
+			}
+			partBodies = append(partBodies, s3_client.PartBody{
+				Body: body,
+				Size: partSize,
+			})
+			remainingSize -= partSize
+		}
+		err = w.client.MultipartUpload(bucketWithObj.BucketName, obj.Key, partBodies)
+		if err != nil {
+			log.Println(err.Error())
+			return err
+		}
 	}
 
 	w.st.AddPutCount()
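Note: Worker.Put keeps the ordinary PutObject path for sizes up to the threshold and caps every part at multipartThresh bytes, so only the final part can be smaller. For the Makefile's --size 4k-12m with --multipart_thresh 5m, the largest objects split as sketched below. One caveat: AWS S3 rejects parts under 5 MiB (except the last one), so thresholds below 5m can fail against real endpoints.

    package main

    import "fmt"

    func main() {
        size := 12 * 1024 * 1024           // a 12 MiB object
        multipartThresh := 5 * 1024 * 1024 // --multipart_thresh 5m
        // Same splitting loop shape as Worker.Put's multipart branch.
        for remaining := size; remaining > 0; {
            partSize := remaining
            if multipartThresh < partSize {
                partSize = multipartThresh
            }
            fmt.Println(partSize) // 5242880, 5242880, 2097152
            remaining -= partSize
        }
    }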
diff --git a/s3_client/s3_client.go b/s3_client/s3_client.go
index b48b828..eef60cb 100644
--- a/s3_client/s3_client.go
+++ b/s3_client/s3_client.go
@@ -109,6 +109,70 @@ func (s *S3Client) PutObject(bucketName, key string, body io.ReadSeeker) error {
 	return nil
 }
 
+type PartBody struct {
+	Body io.ReadSeeker
+	Size int
+}
+
+func (s *S3Client) MultipartUpload(bucketName, key string, partBodies []PartBody) error {
+	ctx := context.Background()
+	cmuOutput, err := s.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
+		Bucket: &bucketName,
+		Key:    &key,
+	})
+	if err != nil {
+		return err
+	}
+
+	partList := make([]types.CompletedPart, 0)
+	for i, partBody := range partBodies {
+		upOutput, err := s.client.UploadPart(ctx, &s3.UploadPartInput{
+			Bucket:        &bucketName,
+			Key:           &key,
+			Body:          partBody.Body,
+			PartNumber:    int32(i + 1),
+			UploadId:      cmuOutput.UploadId,
+			ContentLength: int64(partBody.Size),
+		})
+		if err != nil {
+			_, abortErr := s.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
+				Bucket:   &bucketName,
+				Key:      &key,
+				UploadId: cmuOutput.UploadId,
+			})
+			if abortErr != nil {
+				log.Fatalf("UploadPart err: %v, AbortMultipartUpload: %v", err, abortErr)
+			}
+			return err
+		}
+		partList = append(partList, types.CompletedPart{
+			PartNumber: int32(i + 1),
+			ETag:       upOutput.ETag,
+		})
+	}
+
+	_, err = s.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
+		Bucket:   &bucketName,
+		Key:      &key,
+		UploadId: cmuOutput.UploadId,
+		MultipartUpload: &types.CompletedMultipartUpload{
+			Parts: partList,
+		},
+	})
+	if err != nil {
+		_, abortErr := s.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
+			Bucket:   &bucketName,
+			Key:      &key,
+			UploadId: cmuOutput.UploadId,
+		})
+		if abortErr != nil {
+			log.Fatalf("CompleteMultipartUpload err: %v, AbortMultipartUpload: %v", err, abortErr)
+		}
+		return err
+	}
+	return nil
+}
+
 func (s *S3Client) GetObject(bucketName, key string) (io.ReadCloser, error) {
 	res, err := s.client.GetObject(context.Background(), &s3.GetObjectInput{
 		Bucket: &bucketName,
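Note: a usage sketch for the new client API. The PartBody type is from this patch; the import path and NewS3Client constructor are hypothetical stand-ins for however a caller obtains an *S3Client. On a part or completion failure the method aborts the upload server-side, as shown in the patch, so failed runs do not leave dangling multipart uploads behind.

    package main

    import (
        "bytes"
        "log"

        "example.com/oval/s3_client" // hypothetical import path
    )

    func main() {
        s, err := s3_client.NewS3Client("http://localhost:9000") // hypothetical constructor
        if err != nil {
            log.Fatal(err)
        }
        first := make([]byte, 5*1024*1024) // >= 5 MiB, as S3 requires for non-final parts
        last := make([]byte, 1024)         // only the final part may be short
        parts := []s3_client.PartBody{
            {Body: bytes.NewReader(first), Size: len(first)},
            {Body: bytes.NewReader(last), Size: len(last)},
        }
        if err := s.MultipartUpload("test-bucket", "obj-key", parts); err != nil {
            log.Fatal(err)
        }
    }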