Skip to content

Commit

Permalink
Merge pull request #37 from peng225/multipart-upload
Browse files Browse the repository at this point in the history
support multipart upload
  • Loading branch information
peng225 authored Feb 24, 2023
2 parents e0c47fe + 63f4e7f commit cae6975
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 80 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ test: $(OVAL)

.PHONY: run
run: $(OVAL)
$(OVAL) --size 4k-16k --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --save test.json
$(OVAL) --time 3s --load test.json
$(OVAL) --size 4k-12m --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --multipart_thresh 5m --save test.json
$(OVAL) --time 3s --multipart_thresh 6m --load test.json

.PHONY: run-multi-process
run-multi-process: $(OVAL)
Expand All @@ -30,7 +30,7 @@ run-multi-process: $(OVAL)

.PHONY: run-leader
run-leader: $(OVAL)
$(OVAL) leader --follower_list "http://localhost:8080,http://localhost:8081,http://localhost:8082" --size 4k-16k --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000
$(OVAL) leader --follower_list "http://localhost:8080,http://localhost:8081,http://localhost:8082" --size 4k-12m --time $(EXEC_TIME) --num_obj 1024 --num_worker 4 --bucket "test-bucket,test-bucket2" --ope_ratio 8,8,8,1 --endpoint http://localhost:9000 --multipart_thresh 5m

.PHONY: run-followers
run-followers: $(OVAL)
Expand Down
51 changes: 51 additions & 0 deletions argparser/argparser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package argparser

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseMultipartThresh(t *testing.T) {
type testCase struct {
multipartThreshStr string
expectedMultipartThresh int
expectedErr bool
}
testCases := []testCase{
{
multipartThreshStr: "512",
expectedMultipartThresh: 512,
expectedErr: false,
},
{
multipartThreshStr: "2k",
expectedMultipartThresh: 2048,
expectedErr: false,
},
{
multipartThreshStr: "4m",
expectedMultipartThresh: 4 * 1024 * 1024,
expectedErr: false,
},
{
multipartThreshStr: "12m",
expectedMultipartThresh: 12 * 1024 * 1024,
expectedErr: false,
},
{
multipartThreshStr: "8g",
expectedMultipartThresh: 0,
expectedErr: true,
},
}

for _, tc := range testCases {
multipartThresh, err := ParseMultipartThresh(tc.multipartThreshStr)
if tc.expectedErr {
assert.Errorf(t, err, "tc.multipartThreshStr: %s", tc.multipartThreshStr)
continue
}
assert.Equal(t, tc.expectedMultipartThresh, multipartThresh)
}
}
5 changes: 5 additions & 0 deletions argparser/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ func parseSizeUnit(s string) (int, error) {
}
return 0, fmt.Errorf("Illegal size format: %v\n", s)
}

func ParseMultipartThresh(s string) (int, error) {
mpThresh, err := parseSizeUnit(s)
return mpThresh, err
}
2 changes: 1 addition & 1 deletion cmd/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var leaderCmd = &cobra.Command{
log.Fatal(err)
}
err = multiprocess.StartFollower(followerList, execContext,
opeRatio, execTime.Milliseconds())
opeRatio, execTime.Milliseconds(), multipartThresh)
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 20 additions & 13 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@ import (
)

var (
numObj int
numWorker int
sizePattern string
execTime time.Duration
bucketNames []string
opeRatioStr string
endpoint string
profiler bool
saveFileName string
loadFileName string
numObj int
numWorker int
sizePattern string
execTime time.Duration
bucketNames []string
opeRatioStr string
endpoint string
multipartThreshStr string
profiler bool
saveFileName string
loadFileName string

minSize, maxSize int
opeRatio []float64
multipartThresh int
execContext *runner.ExecutionContext
)

Expand Down Expand Up @@ -57,9 +59,9 @@ If no subcommands are specified, Oval runs in the single-process mode.`,

var r *runner.Runner
if loadFileName == "" {
r = runner.NewRunner(execContext, opeRatio, execTime.Milliseconds(), profiler, loadFileName, 0)
r = runner.NewRunner(execContext, opeRatio, execTime.Milliseconds(), profiler, loadFileName, 0, multipartThresh)
} else {
r = runner.NewRunnerFromLoadFile(loadFileName, opeRatio, execTime.Milliseconds(), profiler)
r = runner.NewRunnerFromLoadFile(loadFileName, opeRatio, execTime.Milliseconds(), profiler, multipartThresh)
}
err = r.Run(nil)
if err != nil {
Expand Down Expand Up @@ -113,6 +115,10 @@ func handleCommonFlags() {
if err != nil {
log.Fatal(err)
}
multipartThresh, err = argparser.ParseMultipartThresh(multipartThreshStr)
if err != nil {
log.Fatal(err)
}

if numWorker >= 256 {
log.Fatal("The number of workers must be less than 256.")
Expand Down Expand Up @@ -144,9 +150,10 @@ func handleCommonFlags() {
func defineCommonFlags(cmd *cobra.Command) {
cmd.Flags().IntVar(&numObj, "num_obj", 10, "The maximum number of objects per process.")
cmd.Flags().IntVar(&numWorker, "num_worker", 1, "The number of workers per process.")
cmd.Flags().StringVar(&sizePattern, "size", "4k", "The size of object. Should be in the form like \"8k\" or \"4k-2m\". The unit \"g\" or \"G\" is not allowed.")
cmd.Flags().StringVar(&sizePattern, "size", "4k", "The size of object. Should be in the form like \"8k\" or \"4k-2m\". Only \"k\" and \"m\" is allowed as an unit.")
cmd.Flags().DurationVar(&execTime, "time", time.Second*3, "Time duration for run the workload.")
cmd.Flags().StringSliceVar(&bucketNames, "bucket", nil, "The name list of the buckets. e.g. \"bucket1,bucket2\"")
cmd.Flags().StringVar(&opeRatioStr, "ope_ratio", "1,1,1,0", "The ration of put, get, delete and list operations. e.g. \"2,3,1,1\"")
cmd.Flags().StringVar(&endpoint, "endpoint", "", "The endpoint URL and TCP port number. e.g. \"http://127.0.0.1:9000\"")
cmd.Flags().StringVar(&multipartThreshStr, "multipart_thresh", "100m", "The threshold of the object size to switch to the multipart upload. Only \"k\" and \"m\" is allowed as an unit.")
}
2 changes: 1 addition & 1 deletion multiprocess/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func startHandler(w http.ResponseWriter, r *http.Request) {
state = running

go func() {
run = runner.NewRunner(&param.Context, param.OpeRatio, param.TimeInMs, false, "", param.ID)
run = runner.NewRunner(&param.Context, param.OpeRatio, param.TimeInMs, false, "", param.ID, param.MultipartThresh)
runnerErr = run.Run(cancel)
mu.Lock()
defer mu.Unlock()
Expand Down
20 changes: 11 additions & 9 deletions multiprocess/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ const (
)

type StartFollowerParameter struct {
ID int
Context runner.ExecutionContext
OpeRatio []float64
TimeInMs int64
ID int
Context runner.ExecutionContext
OpeRatio []float64
TimeInMs int64
MultipartThresh int
}

func InitFollower(followerList []string) error {
Expand All @@ -44,13 +45,14 @@ func InitFollower(followerList []string) error {

func StartFollower(followerList []string,
context *runner.ExecutionContext,
opeRatio []float64, timeInMs int64) error {
opeRatio []float64, timeInMs int64, multipartThresh int) error {
for i, follower := range followerList {
param := StartFollowerParameter{
ID: i,
Context: *context,
OpeRatio: opeRatio,
TimeInMs: timeInMs,
ID: i,
Context: *context,
OpeRatio: opeRatio,
TimeInMs: timeInMs,
MultipartThresh: multipartThresh,
}
data, err := json.Marshal(param)
if err != nil {
Expand Down
33 changes: 21 additions & 12 deletions pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,52 @@ const (
dataUnitHeaderSizeWithoutBucketAndKey = 20
)

func Generate(minSize, maxSize, workerID int, bucketName string, obj *object.Object) (io.ReadSeeker, int, error) {
func DecideSize(minSize, maxSize int) (int, error) {
if minSize < dataUnitSize {
return 0, fmt.Errorf("minSize should be larger than or equal to %v.", dataUnitSize)
}
if minSize%dataUnitSize != 0 {
return nil, 0, fmt.Errorf("minSize should be a multiple of %v.", dataUnitSize)
return 0, fmt.Errorf("minSize should be a multiple of %v.", dataUnitSize)
}
if maxSize != 0 && maxSize%dataUnitSize != 0 {
return nil, 0, fmt.Errorf("maxSize should be a multiple of %v.", dataUnitSize)
return 0, fmt.Errorf("maxSize should be a multiple of %v.", dataUnitSize)
}
if maxSize < minSize {
return nil, 0, errors.New("maxSize should be larger than minSize.")
return 0, errors.New("maxSize should be larger than or equal to minSize.")
}

return minSize + dataUnitSize*rand.Intn((maxSize-minSize)/dataUnitSize+1), nil
}

func Generate(dataSize, workerID, offset int, bucketName string, obj *object.Object) (io.ReadSeeker, error) {
if offset%dataUnitSize != 0 {
return nil, fmt.Errorf("invalid offset size: %d", offset)
}
if len(bucketName) > object.MaxBucketNameLength {
bucketName = bucketName[:object.MaxBucketNameLength]
}

var dataSize int
dataSize = minSize + dataUnitSize*rand.Intn((maxSize-minSize)/dataUnitSize+1)

f := memfile.New([]byte{})
// memfile does not implement io.Closer interface.

unitCountOffset := offset / dataUnitSize
for i := 0; i < dataSize/dataUnitSize; i++ {
err := generateDataUnit(i, workerID, bucketName, obj, f)
err := generateDataUnit(i+unitCountOffset, workerID, bucketName, obj, f)
if err != nil {
return nil, 0, err
return nil, err
}
}

if len(f.Bytes()) != dataSize {
return nil, 0, fmt.Errorf("Generated data size is wrong. (expected: %v, actual: %v)", dataSize, f.Bytes())
return nil, fmt.Errorf("Generated data size is wrong. (expected: %v, actual: %v)", dataSize, f.Bytes())
}

_, err := f.Seek(0, 0)
if err != nil {
return nil, 0, err
return nil, err
}

return f, dataSize, nil
return f, nil
}

func generateDataUnit(unitCount, workerID int, bucketName string, obj *object.Object, writer io.Writer) error {
Expand Down
Loading

0 comments on commit cae6975

Please sign in to comment.