From 1ab80f338eadb4a211aa009ccd135b5fc4daa3d6 Mon Sep 17 00:00:00 2001 From: nabbar Date: Mon, 22 May 2023 16:23:23 +0200 Subject: [PATCH] Package AWS: - rework MultipartUpload process & helper - update test to use lib size - update object multipart to use new helper Package IO Utils : - add truncate & sync to FileProgress - fix error on open file mode for FileProgress Package Console : - fix interface used for color buffer Package Cobra : - add function to print message on write config to use custom message instead of internal message. If the function is not set, the default message will be print. Other: - fix golangci-lint config to remove crazy linter (use only golang group compliance linter) - bump dependencies --- .golangci.yml | 18 +-- aws/aws_suite_test.go | 5 +- aws/bucket_test.go | 3 +- aws/helper/partSize.go | 121 --------------- aws/multipart/errors.go | 37 +++++ aws/multipart/interface.go | 114 ++++++++++++++ aws/multipart/io.go | 34 +++++ aws/multipart/model.go | 286 ++++++++++++++++++++++++++++++++++ aws/multipart/part.go | 305 +++++++++++++++++++++++++++++++++++++ aws/multipart/start.go | 92 +++++++++++ aws/multipart/stop.go | 189 +++++++++++++++++++++++ aws/multipart/trigger.go | 87 +++++++++++ aws/object/interface.go | 3 + aws/object/multipart.go | 158 ++++--------------- aws/object_test.go | 4 +- cobra/configure.go | 15 +- cobra/interface.go | 2 +- console/color.go | 4 +- go.mod | 65 ++++---- httpcli/network.go | 9 +- ioutils/error.go | 6 + ioutils/fileProgess.go | 34 ++++- 22 files changed, 1286 insertions(+), 305 deletions(-) delete mode 100644 aws/helper/partSize.go create mode 100644 aws/multipart/errors.go create mode 100644 aws/multipart/interface.go create mode 100644 aws/multipart/io.go create mode 100644 aws/multipart/model.go create mode 100644 aws/multipart/part.go create mode 100644 aws/multipart/start.go create mode 100644 aws/multipart/stop.go create mode 100644 aws/multipart/trigger.go diff --git a/.golangci.yml b/.golangci.yml index 27d7942e..0e786ed6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -82,7 +82,7 @@ linters: disable: # - bodyclose # - contextcheck -# - cyclop + - cyclop - deadcode # - errname # - errorlint @@ -90,12 +90,12 @@ linters: - exhaustivestruct - exhaustruct # - forbidigo -# - funlen + - funlen # - gci # - gochecknoglobals # - gochecknoinits # - gocognit -# - gocritic + - gocritic # - gocyclo # - godot # - godox @@ -106,16 +106,16 @@ linters: - ifshort # - interfacebloat - interfacer -# - ireturn + - ireturn # - lll - maligned # - nakedret -# - nestif + - nestif # - nilerr # - nlreturn # - noctx -# - nolintlint -# - nonamedreturns + - nolintlint + - nonamedreturns - nosnakecase # - revive # - rowserrcheck @@ -129,5 +129,5 @@ linters: - varnamelen # - wastedassign # - whitespace -# - wrapcheck -# - wsl + - wrapcheck + - wsl diff --git a/aws/aws_suite_test.go b/aws/aws_suite_test.go index e4fb575e..b4174e79 100644 --- a/aws/aws_suite_test.go +++ b/aws/aws_suite_test.go @@ -31,6 +31,7 @@ import ( "crypto/rand" "errors" "fmt" + libsiz "github.com/nabbar/golib/size" "io/ioutil" "net" "net/http" @@ -240,8 +241,8 @@ func WaitMinio(host string) bool { return err == nil } -func randContent(size int) *bytes.Buffer { - p := make([]byte, size) +func randContent(size libsiz.Size) *bytes.Buffer { + p := make([]byte, size.Int64()) _, err := rand.Read(p) diff --git a/aws/bucket_test.go b/aws/bucket_test.go index 8ae8ea79..9bf3ee4d 100644 --- a/aws/bucket_test.go +++ b/aws/bucket_test.go @@ -26,6 +26,7 @@ package aws_test import ( + libsiz "github.com/nabbar/golib/size" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -52,7 +53,7 @@ var _ = Describe("Bucket", func() { It("Must succeed", func() { var ( err error - rnd = randContent(64 * 1024) + rnd = randContent(10 * libsiz.SizeMega) ) err = cli.Object().MultipartPut("object", rnd) diff --git a/aws/helper/partSize.go b/aws/helper/partSize.go deleted file mode 100644 index d5ead004..00000000 --- a/aws/helper/partSize.go +++ /dev/null @@ -1,121 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Nicolas JUHEL - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - */ - -package helper - -import ( - "errors" - "io" - "strings" - - libsiz "github.com/nabbar/golib/size" - - sdkaws "github.com/aws/aws-sdk-go-v2/aws" - sdktps "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -type ReaderPartSize interface { - io.Reader - NextPart(eTag *string) - CurrPart() int32 - CompPart() *sdktps.CompletedMultipartUpload - IeOEF() bool -} - -func NewReaderPartSize(rd io.Reader, p libsiz.Size) ReaderPartSize { - return &readerPartSize{ - b: rd, - p: p.Int64(), - i: 1, - j: 0, - e: false, - c: nil, - } -} - -type readerPartSize struct { - // buffer - b io.Reader - // partsize - p int64 - // partNumber - i int64 - // current part counter - j int64 - // Is EOF - e bool - // complete part slice - c *sdktps.CompletedMultipartUpload -} - -func (r *readerPartSize) NextPart(eTag *string) { - if r.c == nil { - r.c = &sdktps.CompletedMultipartUpload{ - Parts: nil, - } - } - - if r.c.Parts == nil { - r.c.Parts = make([]sdktps.CompletedPart, 0) - } - - r.c.Parts = append(r.c.Parts, sdktps.CompletedPart{ - ETag: sdkaws.String(strings.Replace(*eTag, "\"", "", -1)), - PartNumber: int32(r.i), - }) - - r.i++ - r.j = 0 -} - -func (r readerPartSize) CurrPart() int32 { - return int32(r.i) -} - -func (r readerPartSize) CompPart() *sdktps.CompletedMultipartUpload { - return r.c -} - -func (r readerPartSize) IeOEF() bool { - return r.e -} - -func (r *readerPartSize) Read(p []byte) (n int, err error) { - if r.e || r.j >= r.p { - return 0, io.EOF - } - - if len(p) > int(r.p-r.j) { - p = make([]byte, int(r.p-r.j)) - } - - n, e := r.b.Read(p) - - if errors.Is(e, io.EOF) { - r.e = true - } - - return n, e -} diff --git a/aws/multipart/errors.go b/aws/multipart/errors.go new file mode 100644 index 00000000..fbf625f0 --- /dev/null +++ b/aws/multipart/errors.go @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import "fmt" + +var ( + ErrInvalidInstance = fmt.Errorf("invalid instance") + ErrInvalidClient = fmt.Errorf("invalid aws S3 client") + ErrInvalidResponse = fmt.Errorf("invalid aws S3 response") + ErrInvalidUploadID = fmt.Errorf("invalid aws s3 MPU Upload ID") + ErrInvalidTMPFile = fmt.Errorf("invalid working or temporary file") + ErrWorkingPartFileExceedSize = fmt.Errorf("working or temporary file used exceed the aws S3 size limits") +) diff --git a/aws/multipart/interface.go b/aws/multipart/interface.go new file mode 100644 index 00000000..48b026cf --- /dev/null +++ b/aws/multipart/interface.go @@ -0,0 +1,114 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "fmt" + "io" + "math" + "sync" + + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + libctx "github.com/nabbar/golib/context" + libsiz "github.com/nabbar/golib/size" +) + +const ( + DefaultPartSize = 5 * libsiz.SizeMega + MaxPartSize = 5 * libsiz.SizeGiga + MaxObjectSize = 5 * libsiz.SizeTera + MaxNumberPart int32 = 10000 +) + +type FuncClientS3 func() *sdksss.Client + +type MultiPart interface { + io.WriteCloser + + RegisterContext(fct libctx.FuncContext) + RegisterClientS3(fct FuncClientS3) + RegisterMultipartID(id string) + RegisterWorkingFile(file string, truncate bool) error + RegisterFuncOnPushPart(fct func(eTag string, e error)) + RegisterFuncOnAbort(fct func(nPart int, obj string, e error)) + RegisterFuncOnComplete(fct func(nPart int, obj string, e error)) + + StartMPU() error + StopMPU(abort bool) error + + AddPart(r io.Reader) (n int64, e error) + SendPart() error + CurrentSizePart() int64 + AddToPart(p []byte) (n int, e error) + RegisterPart(etag string) + + IsStarted() bool + Counter() int32 + CounterLeft() int32 +} + +func New(partSize libsiz.Size, object string, bucket string) MultiPart { + return &mpu{ + m: sync.RWMutex{}, + c: nil, + s: partSize, + i: "", + o: object, + b: bucket, + n: 0, + } +} + +func GetOptimalPartSize(objectSize, partSize libsiz.Size) (libsiz.Size, error) { + var ( + lim = math.MaxFloat64 + nbr int64 + prt int64 + obj int64 + ) + + if partSize <= DefaultPartSize { + prt = DefaultPartSize.Int64() + } else { + prt = partSize.Int64() + } + + obj = objectSize.Int64() + + if obj > (int64(MaxNumberPart) * MaxPartSize.Int64()) { + return 0, fmt.Errorf("object size need exceed the maximum number of part with the maximum size of part") + } else if objectSize > MaxObjectSize { + return 0, fmt.Errorf("object size is over allowed maximum size of object") + } else if uint64(obj) > uint64(lim) || uint64(prt) > uint64(lim) || uint64(obj/prt) > uint64(lim) { + return GetOptimalPartSize(objectSize, libsiz.SizeFromInt64(prt*2)) + } else if nbr = int64(math.Ceil(float64(obj) / float64(prt))); nbr > int64(MaxNumberPart) { + return GetOptimalPartSize(objectSize, libsiz.SizeFromInt64(prt*2)) + } else if prt > MaxPartSize.Int64() { + return GetOptimalPartSize(objectSize, libsiz.SizeFromInt64((prt/4)*3)) + } + + return libsiz.SizeFromInt64(prt), nil +} diff --git a/aws/multipart/io.go b/aws/multipart/io.go new file mode 100644 index 00000000..3ce4b859 --- /dev/null +++ b/aws/multipart/io.go @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +func (m *mpu) Write(p []byte) (n int, err error) { + return m.AddToPart(p) +} + +func (m *mpu) Close() error { + return m.StopMPU(false) +} diff --git a/aws/multipart/model.go b/aws/multipart/model.go new file mode 100644 index 00000000..06d7b701 --- /dev/null +++ b/aws/multipart/model.go @@ -0,0 +1,286 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "context" + "fmt" + "path/filepath" + "sync" + + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" + libctx "github.com/nabbar/golib/context" + libiot "github.com/nabbar/golib/ioutils" + libsiz "github.com/nabbar/golib/size" +) + +type mpu struct { + m sync.RWMutex + x libctx.FuncContext + c FuncClientS3 + s libsiz.Size // part size + i string // upload id + b string // bucket name + o string // object name + n int32 // part counter + l []sdktyp.CompletedPart // slice of sent part to prepare complete MPU + w libiot.FileProgress // working file or temporary file + + // trigger function + fc func(nPart int, obj string, e error) // on complete + fp func(eTag string, e error) // on push part + fa func(nPart int, obj string, e error) // on abort +} + +func (m *mpu) RegisterContext(fct libctx.FuncContext) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.x = fct +} + +func (m *mpu) getContext() context.Context { + if m == nil { + return context.Background() + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.x == nil { + return context.Background() + } else if x := m.x(); x == nil { + return context.Background() + } else { + return x + } +} + +func (m *mpu) RegisterClientS3(fct FuncClientS3) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.c = fct +} + +func (m *mpu) getClient() *sdksss.Client { + if m == nil { + return nil + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.c == nil { + return nil + } else if c := m.c(); c == nil { + return nil + } else { + return c + } +} + +func (m *mpu) RegisterMultipartID(id string) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.i = id +} + +func (m *mpu) getMultipartID() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.i +} + +func (m *mpu) RegisterWorkingFile(file string, truncate bool) error { + if m == nil { + return ErrInvalidInstance + } + + m.m.Lock() + defer m.m.Unlock() + + var e error + + if m.w != nil { + m.m.Unlock() + + if e = m.CheckSend(true, false); e != nil { + return e + } + + m.m.Lock() + _ = m.w.Close() + m.w = nil + } + + m.w, e = libiot.NewFileProgressPathWrite(filepath.Clean(file), true, truncate, 0600) + + if e != nil { + return e + } else if truncate { + return m.w.Truncate(0) + } + + return nil +} + +func (m *mpu) getWorkingFile() (libiot.FileProgress, error) { + if m == nil { + return nil, ErrInvalidInstance + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.w != nil { + return m.w, nil + } + + m.m.RUnlock() + e := m.setTempWorkingFile() + m.m.RLock() + + if e != nil { + return nil, e + } else if m.w == nil { + return nil, ErrInvalidTMPFile + } + + return m.w, nil +} + +func (m *mpu) setTempWorkingFile() error { + if m == nil { + return ErrInvalidInstance + } + + m.m.Lock() + defer m.m.Unlock() + + var e error + m.w, e = libiot.NewFileProgressTemp() + return e +} + +func (m *mpu) closeWorkingFile() error { + if m == nil { + return nil + } + + m.m.Lock() + defer m.m.Unlock() + + if m.w == nil { + return nil + } + + var e error + + e = m.w.Truncate(0) + + if er := m.w.Close(); er != nil { + if e != nil { + e = fmt.Errorf("%v, %v", e, er) + } else { + e = er + } + } + + m.w = nil + return e +} + +func (m *mpu) getPartSize() libsiz.Size { + if m == nil { + return DefaultPartSize + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.s < 1 { + return DefaultPartSize + } + + return m.s +} + +func (m *mpu) setPartSize(s libsiz.Size) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + if s < 1 { + s = DefaultPartSize + } + + m.s = s +} + +func (m *mpu) getObject() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.o +} + +func (m *mpu) getBucket() string { + if m == nil { + return "" + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.b +} diff --git a/aws/multipart/part.go b/aws/multipart/part.go new file mode 100644 index 00000000..d126cea1 --- /dev/null +++ b/aws/multipart/part.go @@ -0,0 +1,305 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + /* #nosec */ + //nolint #nosec + "crypto/md5" + "encoding/base64" + "fmt" + "io" + "strings" + + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" + libiot "github.com/nabbar/golib/ioutils" + libsiz "github.com/nabbar/golib/size" +) + +func (m *mpu) getPartList() []sdktyp.CompletedPart { + if m == nil { + return make([]sdktyp.CompletedPart, 0) + } + + m.m.RLock() + defer m.m.RUnlock() + + if len(m.l) < 1 { + return make([]sdktyp.CompletedPart, 0) + } + + return m.l +} + +func (m *mpu) Counter() int32 { + if m == nil { + return 0 + } + + m.m.RLock() + defer m.m.RUnlock() + + return m.n +} + +func (m *mpu) CounterLeft() int32 { + if m == nil { + return 0 + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.n >= MaxNumberPart { + return 0 + } + + return MaxNumberPart - m.n +} + +func (m *mpu) RegisterPart(etag string) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + if len(m.l) < 1 { + m.l = make([]sdktyp.CompletedPart, 0) + } + + m.n++ + m.l = append(m.l, sdktyp.CompletedPart{ + ETag: sdkaws.String(strings.Replace(etag, "\"", "", -1)), + PartNumber: m.n, + }) +} + +func (m *mpu) AddPart(r io.Reader) (n int64, e error) { + if m == nil { + return 0, ErrInvalidInstance + } + + var ( + cli *sdksss.Client + res *sdksss.UploadPartOutput + tmp libiot.FileProgress + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + mid = m.getMultipartID() + + /* #nosec */ + //nolint #nosec + hsh = md5.New() + ) + + if cli = m.getClient(); cli == nil { + return 0, ErrInvalidClient + } else if tmp, e = libiot.NewFileProgressTemp(); e != nil { + return 0, e + } else if tmp == nil { + return 0, ErrInvalidTMPFile + } else { + defer func() { + if tmp != nil { + _ = tmp.Close() + } + }() + } + + if n, e = io.Copy(tmp, r); e != nil || n < 1 { + return n, e + } else if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return 0, e + } else if _, e = tmp.WriteTo(hsh); e != nil { + return 0, e + } else if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return 0, e + } + + res, e = cli.UploadPart(ctx, &sdksss.UploadPartInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + PartNumber: m.Counter() + 1, + ContentLength: n, + Body: tmp, + RequestPayer: sdktyp.RequestPayerRequester, + ContentMD5: sdkaws.String(base64.StdEncoding.EncodeToString(hsh.Sum(nil))), + }) + + if e != nil { + m.callFuncOnPushPart("", e) + return 0, e + } else if res == nil || res.ETag == nil || len(*res.ETag) < 1 { + m.callFuncOnPushPart("", ErrInvalidResponse) + return 0, ErrInvalidResponse + } else { + t := *res.ETag + m.callFuncOnPushPart(t, nil) + m.RegisterPart(t) + } + + return n, nil +} + +func (m *mpu) AddToPart(p []byte) (n int, e error) { + var ( + tmp libiot.FileProgress + ) + + if tmp, e = m.getWorkingFile(); e != nil { + return 0, e + } else if tmp == nil { + return 0, ErrInvalidTMPFile + } + + for len(p) > 0 { + var ( + r []byte + i int + s int64 + siz = m.getPartSize().Int64() + ) + + if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return n, e + } else if s, e = tmp.SizeToEOF(); e != nil { + return n, e + } else if _, e = tmp.Seek(0, io.SeekEnd); e != nil { + return n, e + } else if s > 0 && s >= siz { + if e = m.CheckSend(false, false); e != nil { + return n, e + } + continue + } else if s > 0 && s < siz { + siz -= s + } + + if int64(len(p)) > siz { + r = p[:siz] + p = p[siz:] + } else { + r = p + p = nil + } + + if i, e = tmp.Write(r); e != nil { + return n, e + } else if i != len(r) { + return n, fmt.Errorf("write a wrong number of byte") + } else if e = m.CheckSend(false, false); e != nil { + return n, e + } else { + n += len(r) + } + } + + return n, nil +} + +func (m *mpu) SendPart() error { + return m.CheckSend(true, false) +} + +func (m *mpu) CurrentSizePart() int64 { + var ( + e error + s int64 + tmp libiot.FileProgress + ) + + if tmp, e = m.getWorkingFile(); e != nil { + return 0 + } else if tmp == nil { + return 0 + } else if _, e = tmp.Seek(0, io.SeekStart); e != nil { + return 0 + } else { + s, e = tmp.SizeToEOF() + _, _ = tmp.Seek(0, io.SeekEnd) + return s + } +} + +func (m *mpu) CheckSend(force, close bool) error { + var ( + err error + siz int64 + prt = m.getPartSize() + tmp libiot.FileProgress + ) + + if tmp, err = m.getWorkingFile(); err != nil { + return err + } else if tmp == nil { + return ErrInvalidTMPFile + } else if _, err = tmp.Seek(0, io.SeekStart); err != nil { + return err + } else if siz, err = tmp.SizeToEOF(); err != nil { + return err + } else if siz < prt.Int64() && !force { + return nil + } else if siz == 0 { + return nil + } else if siz > int64(MaxObjectSize) { + return ErrWorkingPartFileExceedSize + } else if close && m.Counter() < 1 && siz < DefaultPartSize.Int64() { + return nil + } else if _, err = m.sendPart(siz, tmp); err != nil { + return err + } else if err = tmp.Truncate(0); err != nil { + return err + } else if err = tmp.Sync(); err != nil { + return err + } else { + return nil + } +} + +func (m *mpu) sendPart(siz int64, body io.Reader) (int64, error) { + var ( + err error + prt = m.getPartSize() + ) + + if prt, err = GetOptimalPartSize(libsiz.SizeFromInt64(siz), prt); err != nil { + return 0, err + } else if prt != m.getPartSize() { + old := m.getPartSize() + m.setPartSize(prt) + defer func() { + m.setPartSize(old) + }() + } + + return m.AddPart(body) +} diff --git a/aws/multipart/start.go b/aws/multipart/start.go new file mode 100644 index 00000000..389d2177 --- /dev/null +++ b/aws/multipart/start.go @@ -0,0 +1,92 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "mime" + "path/filepath" + + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func (m *mpu) IsStarted() bool { + if cli := m.getClient(); cli == nil { + return false + } else if len(m.getMultipartID()) < 1 { + return false + } + + return true +} + +func (m *mpu) getMimeType() string { + if t := mime.TypeByExtension(filepath.Ext(m.getObject())); t == "" { + return "application/octet-stream" + } else { + return t + } +} + +func (m *mpu) StartMPU() error { + if m == nil { + return ErrInvalidInstance + } + + var ( + cli *sdksss.Client + res *sdksss.CreateMultipartUploadOutput + err error + tpe = m.getMimeType() + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } + + res, err = cli.CreateMultipartUpload(ctx, &sdksss.CreateMultipartUploadInput{ + Key: sdkaws.String(obj), + Bucket: sdkaws.String(bck), + ContentType: sdkaws.String(tpe), + }) + + if err != nil { + return err + } else if res == nil { + return ErrInvalidResponse + } else if res.UploadId == nil || len(*res.UploadId) < 1 { + return ErrInvalidResponse + } + + m.m.Lock() + defer m.m.Unlock() + m.i = *res.UploadId + + return nil +} diff --git a/aws/multipart/stop.go b/aws/multipart/stop.go new file mode 100644 index 00000000..7eb76946 --- /dev/null +++ b/aws/multipart/stop.go @@ -0,0 +1,189 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +import ( + "io" + + sdkaws "github.com/aws/aws-sdk-go-v2/aws" + sdksss "github.com/aws/aws-sdk-go-v2/service/s3" + sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" + libiot "github.com/nabbar/golib/ioutils" +) + +func (m *mpu) StopMPU(abort bool) error { + if m == nil { + return ErrInvalidInstance + } + + var ( + err error + lst = m.getPartList() + ) + + if !abort { + if err = m.CheckSend(true, true); err != nil { + return err + } + } + + if abort || len(lst) < 1 { + err = m.abortMPU() + } else { + err = m.completeMPU() + } + + if !abort && err == nil && len(lst) < 1 && m.CurrentSizePart() > 0 { + err = m.SendObject() + } + + m.callFuncOnComplete(abort, len(lst), m.getObject(), err) + + if err != nil { + _ = m.closeWorkingFile() + } + + m.cleanMPU() + return nil +} + +func (m *mpu) cleanMPU() { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.i = "" + m.o = "" + m.b = "" + m.l = nil + m.n = 0 +} + +func (m *mpu) SendObject() error { + var ( + err error + cli *sdksss.Client + res *sdksss.PutObjectOutput + tmp libiot.FileProgress + + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + tpe = m.getMimeType() + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } else if m.CurrentSizePart() < 1 { + return nil + } else if tmp, err = m.getWorkingFile(); err != nil { + return err + } else if tmp == nil { + return ErrInvalidTMPFile + } else if _, err = tmp.Seek(0, io.SeekStart); err != nil { + return err + } + + res, err = cli.PutObject(ctx, &sdksss.PutObjectInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + Body: tmp, + ContentType: sdkaws.String(tpe), + }) + + if err == nil { + if res == nil { + err = ErrInvalidResponse + } else if res.ETag == nil || len(*res.ETag) < 1 { + err = ErrInvalidResponse + } + } + + return err +} + +func (m *mpu) abortMPU() error { + var ( + cli *sdksss.Client + err error + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + mid = m.getMultipartID() + mod = &sdksss.AbortMultipartUploadInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + } + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } else if len(mid) < 1 { + return nil + } else if _, err = cli.AbortMultipartUpload(ctx, mod); err != nil { + return err + } + + return nil +} + +func (m *mpu) completeMPU() error { + var ( + cli *sdksss.Client + res *sdksss.CompleteMultipartUploadOutput + err error + ctx = m.getContext() + obj = m.getObject() + bck = m.getBucket() + mid = m.getMultipartID() + lst = m.getPartList() + mod = &sdksss.CompleteMultipartUploadInput{ + Bucket: sdkaws.String(bck), + Key: sdkaws.String(obj), + UploadId: sdkaws.String(mid), + MultipartUpload: &sdktyp.CompletedMultipartUpload{ + Parts: lst, + }, + RequestPayer: sdktyp.RequestPayerRequester, + } + ) + + if cli = m.getClient(); cli == nil { + return ErrInvalidClient + } else if len(mid) < 1 { + return ErrInvalidUploadID + } else if res, err = cli.CompleteMultipartUpload(ctx, mod); err != nil { + return err + } else if res.Key == nil || len(*res.Key) < 1 { + return ErrInvalidResponse + } + + return nil +} diff --git a/aws/multipart/trigger.go b/aws/multipart/trigger.go new file mode 100644 index 00000000..5c39e9b5 --- /dev/null +++ b/aws/multipart/trigger.go @@ -0,0 +1,87 @@ +/* + * MIT License + * + * Copyright (c) 2020 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +package multipart + +func (m *mpu) RegisterFuncOnPushPart(fct func(eTag string, e error)) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.fp = fct +} + +func (m *mpu) callFuncOnPushPart(eTag string, e error) { + if m == nil { + return + } + + m.m.RLock() + defer m.m.RUnlock() + + if m.fp != nil { + m.fp(eTag, e) + } +} + +func (m *mpu) RegisterFuncOnAbort(fct func(nPart int, obj string, e error)) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.fa = fct +} + +func (m *mpu) RegisterFuncOnComplete(fct func(nPart int, obj string, e error)) { + if m == nil { + return + } + + m.m.Lock() + defer m.m.Unlock() + + m.fc = fct +} + +func (m *mpu) callFuncOnComplete(abort bool, nPart int, obj string, e error) { + if m == nil { + return + } + + m.m.RLock() + defer m.m.RUnlock() + + if !abort && m.fc != nil { + m.fc(nPart, obj, e) + } else if abort && m.fa != nil { + m.fa(nPart, obj, e) + } +} diff --git a/aws/object/interface.go b/aws/object/interface.go index 9e79985e..59e88569 100644 --- a/aws/object/interface.go +++ b/aws/object/interface.go @@ -30,6 +30,8 @@ import ( "io" "time" + libmpu "github.com/nabbar/golib/aws/multipart" + libsiz "github.com/nabbar/golib/size" sdkiam "github.com/aws/aws-sdk-go-v2/service/iam" @@ -67,6 +69,7 @@ type Object interface { GetAttributes(object, version string) (*sdksss.GetObjectAttributesOutput, liberr.Error) MultipartList(keyMarker, markerId string) (uploads []sdktps.MultipartUpload, nextKeyMarker string, nextIdMarker string, count int64, e liberr.Error) + MultipartNew(partSize libsiz.Size, object string) libmpu.MultiPart MultipartPut(object string, body io.Reader) liberr.Error MultipartPutCustom(partSize libsiz.Size, object string, body io.Reader) liberr.Error MultipartCancel(uploadId, key string) liberr.Error diff --git a/aws/object/multipart.go b/aws/object/multipart.go index ce6df4e8..5747e5b0 100644 --- a/aws/object/multipart.go +++ b/aws/object/multipart.go @@ -26,29 +26,17 @@ package object import ( - //nolint #nosec - /* #nosec */ - "crypto/md5" - "encoding/base64" "io" - "mime" - "path/filepath" - - libsiz "github.com/nabbar/golib/size" - - //nolint #gci - "os" sdkaws "github.com/aws/aws-sdk-go-v2/aws" sdksss "github.com/aws/aws-sdk-go-v2/service/s3" sdktyp "github.com/aws/aws-sdk-go-v2/service/s3/types" libhlp "github.com/nabbar/golib/aws/helper" + libmpu "github.com/nabbar/golib/aws/multipart" liberr "github.com/nabbar/golib/errors" - libiou "github.com/nabbar/golib/ioutils" + libsiz "github.com/nabbar/golib/size" ) -const DefaultPartSize = 5 * libsiz.SizeMega - // MultipartList implement the ListMultipartUploads. // See docs for more infos : https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html func (cli *client) MultipartList(keyMarker, markerId string) (uploads []sdktyp.MultipartUpload, nextKeyMarker string, nextIdMarker string, count int64, e liberr.Error) { @@ -73,141 +61,57 @@ func (cli *client) MultipartList(keyMarker, markerId string) (uploads []sdktyp.M } } +func (cli *client) MultipartNew(partSize libsiz.Size, object string) libmpu.MultiPart { + m := libmpu.New(partSize, object, cli.GetBucketName()) + m.RegisterContext(cli.GetContext) + m.RegisterClientS3(func() *sdksss.Client { + return cli.s3 + }) + + return m +} + func (cli *client) MultipartPut(object string, body io.Reader) liberr.Error { - return cli.MultipartPutCustom(DefaultPartSize, object, body) + return cli.MultipartPutCustom(libmpu.DefaultPartSize, object, body) } func (cli *client) MultipartPutCustom(partSize libsiz.Size, object string, body io.Reader) liberr.Error { var ( - tmp libiou.FileProgress - rio libhlp.ReaderPartSize - upl *sdksss.CreateMultipartUploadOutput - err error - tpe *string + e error + m = cli.MultipartNew(partSize, object) ) defer func() { - if tmp != nil { - _ = tmp.Close() + if m != nil { + _ = m.Close() } }() - if t := mime.TypeByExtension(filepath.Ext(object)); t == "" { - tpe = sdkaws.String("application/octet-stream") + if e = m.StartMPU(); e != nil { + return cli.GetError(e) + } else if _, e = io.Copy(m, body); e != nil { + return cli.GetError(e) + } else if e = m.StopMPU(false); e != nil { + return cli.GetError(e) } else { - tpe = sdkaws.String(t) - } - - upl, err = cli.s3.CreateMultipartUpload(cli.GetContext(), &sdksss.CreateMultipartUploadInput{ - Key: sdkaws.String(object), - Bucket: sdkaws.String(cli.GetBucketName()), - ContentType: tpe, - }) - - if err != nil { - return cli.GetError(err) - } else if upl == nil { - return libhlp.ErrorResponse.Error(nil) - } - - rio = libhlp.NewReaderPartSize(body, partSize) - - for !rio.IeOEF() { - var ( - inf os.FileInfo - prt *sdksss.UploadPartOutput - ) - - tmp, err = libiou.NewFileProgressTemp() - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - _, err = io.Copy(tmp, rio) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - _, err = tmp.Seek(0, io.SeekStart) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - inf, err = tmp.FileStat() - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - /* #nosec */ - h := md5.New() - if _, e := tmp.WriteTo(h); e != nil { - return cli._MultipartCancel(e, upl.UploadId, object) - } - - _, err = tmp.Seek(0, io.SeekStart) - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } - - prt, err = cli.s3.UploadPart(cli.GetContext(), &sdksss.UploadPartInput{ - Bucket: sdkaws.String(cli.GetBucketName()), - Body: tmp, - PartNumber: rio.CurrPart(), - UploadId: upl.UploadId, - Key: sdkaws.String(object), - ContentLength: inf.Size(), - RequestPayer: sdktyp.RequestPayerRequester, - ContentMD5: sdkaws.String(base64.StdEncoding.EncodeToString(h.Sum(nil))), - }) - - _ = tmp.Close() - tmp = nil - - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } else if prt == nil || prt.ETag == nil || len(*prt.ETag) == 0 { - return cli._MultipartCancel(libhlp.ErrorResponse.Error(nil), upl.UploadId, object) - } - - rio.NextPart(prt.ETag) - } - - var prt *sdksss.CompleteMultipartUploadOutput - prt, err = cli.s3.CompleteMultipartUpload(cli.GetContext(), &sdksss.CompleteMultipartUploadInput{ - Bucket: sdkaws.String(cli.GetBucketName()), - Key: sdkaws.String(object), - UploadId: upl.UploadId, - MultipartUpload: rio.CompPart(), - RequestPayer: sdktyp.RequestPayerRequester, - }) - - if err != nil { - return cli._MultipartCancel(err, upl.UploadId, object) - } else if prt == nil || prt.ETag == nil || len(*prt.ETag) == 0 { - return cli._MultipartCancel(libhlp.ErrorResponse.Error(nil), upl.UploadId, object) + m = nil } return nil } -func (cli *client) _MultipartCancel(err error, updIp *string, object string) liberr.Error { - cnl, e := cli.s3.AbortMultipartUpload(cli.GetContext(), &sdksss.AbortMultipartUploadInput{ +func (cli *client) MultipartCancel(uploadId, key string) liberr.Error { + res, err := cli.s3.AbortMultipartUpload(cli.GetContext(), &sdksss.AbortMultipartUploadInput{ Bucket: sdkaws.String(cli.GetBucketName()), - UploadId: updIp, - Key: sdkaws.String(object), + UploadId: sdkaws.String(uploadId), + Key: sdkaws.String(key), }) - if e != nil { - return cli.GetError(e, err) - } else if cnl == nil { - return libhlp.ErrorResponse.Error(cli.GetError(err)) - } else if err != nil { + if err != nil { return cli.GetError(err) + } else if res == nil { + return libhlp.ErrorResponse.Error(nil) } else { return nil } } - -func (cli *client) MultipartCancel(uploadId, key string) liberr.Error { - return cli._MultipartCancel(nil, sdkaws.String(uploadId), key) -} diff --git a/aws/object_test.go b/aws/object_test.go index a78923ab..6ae4ece2 100644 --- a/aws/object_test.go +++ b/aws/object_test.go @@ -28,6 +28,8 @@ package aws_test import ( "bytes" + libsiz "github.com/nabbar/golib/size" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -70,7 +72,7 @@ var _ = Describe("Object", func() { Context("Multipart Put object", func() { It("Must fail as the bucket doesn't exists - 5", func() { - err := cli.Object().MultipartPut("object", randContent(4*1024)) + err := cli.Object().MultipartPut("object", randContent(10*libsiz.SizeMega)) Expect(err).To(HaveOccurred()) }) }) diff --git a/cobra/configure.go b/cobra/configure.go index f033fba0..39425dd1 100644 --- a/cobra/configure.go +++ b/cobra/configure.go @@ -79,7 +79,7 @@ func (c *cobra) ConfigureCheckArgs(basename string, args []string) error { return nil } -func (c *cobra) ConfigureWriteConfig(basename string, defaultConfig func() io.Reader) error { +func (c *cobra) ConfigureWriteConfig(basename string, defaultConfig func() io.Reader, printMsg func(pkg, file string)) error { pkg := c.getPackageName() if basename == "" && pkg != "" { @@ -137,9 +137,14 @@ func (c *cobra) ConfigureWriteConfig(basename string, defaultConfig func() io.Re return err } - println(fmt.Sprintf("\n\t>> Config File '%s' has been created and file permission have been set.", cfgFile)) - println("\t>> To explicitly specify this config file when you call this tool, use the '-c' flag like this: ") - println(fmt.Sprintf("\t\t\t %s -c %s ...\n", pkg, cfgFile)) + if printMsg == nil { + println(fmt.Sprintf("\n\t>> Config File '%s' has been created and file permission have been set.", cfgFile)) + println("\t>> To explicitly specify this config file when you call this tool, use the '-c' flag like this: ") + println(fmt.Sprintf("\t\t\t %s -c %s ...\n", pkg, cfgFile)) + } else { + printMsg(pkg, cfgFile) + } + return nil } @@ -198,7 +203,7 @@ func (c *cobra) AddCommandConfigure(basename string, defaultConfig func() io.Rea override by passed flag in command line and completed with default for non existing values.`, RunE: func(cmd *spfcbr.Command, args []string) error { - return c.ConfigureWriteConfig(basename, defaultConfig) + return c.ConfigureWriteConfig(basename, defaultConfig, nil) }, Args: func(cmd *spfcbr.Command, args []string) error { diff --git a/cobra/interface.go b/cobra/interface.go index b7dea798..8ab96774 100644 --- a/cobra/interface.go +++ b/cobra/interface.go @@ -65,7 +65,7 @@ type Cobra interface { Cobra() *spfcbr.Command ConfigureCheckArgs(basename string, args []string) error - ConfigureWriteConfig(basename string, defaultConfig func() io.Reader) error + ConfigureWriteConfig(basename string, defaultConfig func() io.Reader, printMsg func(pkg, file string)) error } func New() Cobra { diff --git a/console/color.go b/console/color.go index f59209c9..074f4b6d 100644 --- a/console/color.go +++ b/console/color.go @@ -26,8 +26,8 @@ package console import ( - "bufio" "fmt" + "io" "github.com/fatih/color" "github.com/nabbar/golib/errors" @@ -89,7 +89,7 @@ func (c colorType) Print(text string) { } } -func (c colorType) BuffPrintf(buff *bufio.ReadWriter, format string, args ...interface{}) (n int, err errors.Error) { +func (c colorType) BuffPrintf(buff io.Writer, format string, args ...interface{}) (n int, err errors.Error) { if colorList[c] != nil && buff != nil { //nolint #nosec diff --git a/go.mod b/go.mod index d31e826e..0a7b28e6 100644 --- a/go.mod +++ b/go.mod @@ -13,39 +13,39 @@ require ( github.com/fatih/color v1.15.0 github.com/fsnotify/fsnotify v1.6.0 github.com/fxamacker/cbor/v2 v2.4.0 - github.com/gin-gonic/gin v1.9.0 + github.com/gin-gonic/gin v1.9.1 github.com/go-ldap/ldap/v3 v3.4.4 - github.com/go-playground/validator/v10 v10.13.0 + github.com/go-playground/validator/v10 v10.14.0 github.com/google/go-github/v33 v33.0.0 github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/go-retryablehttp v0.7.2 github.com/hashicorp/go-uuid v1.0.3 github.com/hashicorp/go-version v1.6.0 - github.com/jlaffaye/ftp v0.1.0 + github.com/jlaffaye/ftp v0.2.0 github.com/lni/dragonboat/v3 v3.3.6 github.com/matcornic/hermes/v2 v2.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 github.com/nats-io/jwt/v2 v2.4.1 - github.com/nats-io/nats-server/v2 v2.9.16 - github.com/nats-io/nats.go v1.25.0 - github.com/nutsdb/nutsdb v0.12.0 - github.com/onsi/ginkgo/v2 v2.9.4 - github.com/onsi/gomega v1.27.6 + github.com/nats-io/nats-server/v2 v2.9.17 + github.com/nats-io/nats.go v1.26.0 + github.com/nutsdb/nutsdb v0.12.2 + github.com/onsi/ginkgo/v2 v2.9.7 + github.com/onsi/gomega v1.27.7 github.com/pelletier/go-toml v1.9.5 github.com/prometheus/client_golang v1.15.1 github.com/shirou/gopsutil v3.21.11+incompatible - github.com/sirupsen/logrus v1.9.0 + github.com/sirupsen/logrus v1.9.2 github.com/spf13/cobra v1.7.0 github.com/spf13/jwalterweatherman v1.1.0 - github.com/spf13/viper v1.15.0 + github.com/spf13/viper v1.16.0 github.com/ugorji/go/codec v1.2.11 github.com/vbauerster/mpb/v5 v5.4.0 - github.com/xanzy/go-gitlab v0.83.0 + github.com/xanzy/go-gitlab v0.84.0 github.com/xhit/go-simple-mail v2.2.2+incompatible github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 - golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 golang.org/x/net v0.10.0 golang.org/x/oauth2 v0.8.0 golang.org/x/sync v0.2.0 @@ -53,22 +53,22 @@ require ( golang.org/x/term v0.8.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/clickhouse v0.5.1 - gorm.io/driver/mysql v1.5.0 - gorm.io/driver/postgres v1.5.0 - gorm.io/driver/sqlite v1.5.0 - gorm.io/driver/sqlserver v1.4.3 + gorm.io/driver/mysql v1.5.1 + gorm.io/driver/postgres v1.5.2 + gorm.io/driver/sqlite v1.5.1 + gorm.io/driver/sqlserver v1.5.0 gorm.io/gorm v1.25.1 ) require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/ClickHouse/ch-go v0.55.0 // indirect - github.com/ClickHouse/clickhouse-go/v2 v2.9.3 // indirect + github.com/ClickHouse/ch-go v0.56.0 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.10.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/PuerkitoBio/goquery v1.8.1 // indirect - github.com/VictoriaMetrics/metrics v1.23.1 // indirect + github.com/VictoriaMetrics/metrics v1.24.0 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/andybalholm/brotli v1.0.5 // indirect @@ -90,13 +90,14 @@ require ( github.com/aws/smithy-go v1.13.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect - github.com/bytedance/sonic v1.8.8 // indirect + github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cockroachdb/errors v1.9.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/pebble v0.0.0-20210331181633-27fc006b8bfb // indirect github.com/cockroachdb/redact v1.1.4 // indirect + github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/getsentry/sentry-go v0.21.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect @@ -117,7 +118,7 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 // indirect + github.com/google/pprof v0.0.0-20230602010524-ada837c32108 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/css v1.0.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -130,7 +131,7 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/memberlist v0.5.0 // indirect github.com/huandu/xstrings v1.4.0 // indirect - github.com/imdario/mergo v0.3.15 // indirect + github.com/imdario/mergo v0.3.16 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect @@ -147,12 +148,12 @@ require ( github.com/leodido/go-urn v1.2.4 // indirect github.com/lni/goutils v1.3.0 // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/mattn/go-isatty v0.0.18 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect - github.com/mattn/go-sqlite3 v1.14.16 // indirect + github.com/mattn/go-sqlite3 v1.14.17 // indirect github.com/mattn/go-tty v0.0.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/microsoft/go-mssqldb v0.21.0 // indirect + github.com/microsoft/go-mssqldb v1.1.0 // indirect github.com/miekg/dns v1.1.54 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -163,13 +164,13 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/paulmach/orb v0.9.2 // indirect - github.com/pelletier/go-toml/v2 v2.0.7 // indirect + github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkg/term v1.2.0-beta.2 // indirect github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/common v0.43.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -177,7 +178,7 @@ require ( github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/spf13/afero v1.9.5 // indirect - github.com/spf13/cast v1.5.0 // indirect + github.com/spf13/cast v1.5.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf // indirect github.com/subosito/gotenv v1.4.2 // indirect @@ -189,14 +190,14 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xujiajun/mmap-go v1.0.1 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect - go.opentelemetry.io/otel v1.15.1 // indirect - go.opentelemetry.io/otel/trace v1.15.1 // indirect + go.opentelemetry.io/otel v1.16.0 // indirect + go.opentelemetry.io/otel/trace v1.16.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.9.1 // indirect + golang.org/x/tools v0.9.3 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/httpcli/network.go b/httpcli/network.go index 9f4796b5..90396b06 100644 --- a/httpcli/network.go +++ b/httpcli/network.go @@ -33,12 +33,15 @@ type Network uint8 const ( NetworkTCP Network = iota NetworkUDP + NetworkUnix ) func GetNetworkFromString(str string) Network { - switch strings.ToLower(str) { - case NetworkUDP.Code(): + switch { + case strings.EqualFold(NetworkUDP.Code(), str): return NetworkUDP + case strings.EqualFold(NetworkUnix.Code(), str): + return NetworkUnix default: return NetworkTCP } @@ -48,6 +51,8 @@ func (n Network) String() string { switch n { case NetworkUDP: return "UDP" + case NetworkUnix: + return "unix" default: return "TCP" } diff --git a/ioutils/error.go b/ioutils/error.go index 184ba2e8..5a49b706 100644 --- a/ioutils/error.go +++ b/ioutils/error.go @@ -38,6 +38,8 @@ const ( ErrorSyscallRLimitSet ErrorIOFileStat ErrorIOFileSeek + ErrorIOFileTruncate + ErrorIOFileSync ErrorIOFileOpen ErrorIOFileTempNew ErrorIOFileTempClose @@ -64,6 +66,10 @@ func getMessage(code liberr.CodeError) (message string) { return "error occur while trying to get stat of file" case ErrorIOFileSeek: return "error occur while trying seek into file" + case ErrorIOFileTruncate: + return "error occur while trying truncate file" + case ErrorIOFileSync: + return "error occur while trying to sync file" case ErrorIOFileOpen: return "error occur while trying to open file" case ErrorIOFileTempNew: diff --git a/ioutils/fileProgess.go b/ioutils/fileProgess.go index dfd536e5..f64c7160 100644 --- a/ioutils/fileProgess.go +++ b/ioutils/fileProgess.go @@ -67,6 +67,8 @@ type FileProgress interface { FileStat() (os.FileInfo, liberr.Error) SizeToEOF() (size int64, err liberr.Error) + Truncate(size int64) liberr.Error + Sync() liberr.Error NewFilePathMode(filepath string, mode int, perm os.FileMode) (FileProgress, liberr.Error) NewFilePathWrite(filepath string, create, overwrite bool, perm os.FileMode) (FileProgress, liberr.Error) @@ -123,12 +125,14 @@ func NewFileProgressPathMode(filepath string, mode int, perm os.FileMode) (FileP } func NewFileProgressPathWrite(filepath string, create, overwrite bool, perm os.FileMode) (FileProgress, liberr.Error) { - mode := os.O_RDWR | os.O_TRUNC + var mode = os.O_RDWR if _, err := os.Stat(filepath); err != nil && os.IsNotExist(err) && create { - mode = os.O_RDWR | os.O_CREATE | os.O_TRUNC + mode = os.O_RDWR | os.O_CREATE } else if err != nil { return nil, ErrorIOFileStat.ErrorParent(err) + } else if err == nil && overwrite { + mode = os.O_RDWR | os.O_TRUNC } return NewFileProgressPathMode(filepath, mode, perm) @@ -271,6 +275,32 @@ func (f *fileProgress) SizeToEOF() (size int64, err liberr.Error) { } } +func (f *fileProgress) Truncate(size int64) liberr.Error { + if f == nil { + return ErrorNilPointer.Error(nil) + } + + if e := f.fs.Truncate(size); e != nil { + return ErrorIOFileTruncate.ErrorParent(e) + } + + f.reset(0) + + return nil +} + +func (f *fileProgress) Sync() liberr.Error { + if f == nil { + return ErrorNilPointer.Error(nil) + } + + if e := f.fs.Sync(); e != nil { + return ErrorIOFileSync.ErrorParent(e) + } + + return nil +} + func (f *fileProgress) SetIncrement(increment func(size int64)) { if f != nil { f.fc = increment