diff --git a/go.mod b/go.mod index daee649..950aa39 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,13 @@ module github.com/gozelle/async go 1.18 require ( - github.com/gozelle/multierr v0.0.1 + github.com/gozelle/atomic v1.10.10 github.com/gozelle/testify v1.8.11 + golang.org/x/sync v0.3.0 ) require ( - github.com/gozelle/atomic v1.10.10 // indirect github.com/gozelle/go-difflib v1.0.0 // indirect github.com/gozelle/go-spew v1.1.10 // indirect - github.com/gozelle/multierror v1.2.1 // indirect github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect - golang.org/x/sync v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 7855d24..dcb7442 100644 --- a/go.sum +++ b/go.sum @@ -6,23 +6,11 @@ github.com/gozelle/go-difflib v1.0.0/go.mod h1:PcDy306aUy3I8kB5ohutXpbhkgQlygtcB github.com/gozelle/go-internal v1.9.0 h1:VVobf+WAesfyYtHklxbmzo+2IXdTw3PxiwkmwC6QTTA= github.com/gozelle/go-spew v1.1.10 h1:kFDq5IDd/Wk3l7UutORhveCt5fUdW8b9afnDYtuHmEc= github.com/gozelle/go-spew v1.1.10/go.mod h1:qwUFQBhiE5zwtTAfe/m87+73t2jRbNHwGOvs+FgYx+8= -github.com/gozelle/multierr v0.0.1 h1:JvYYvGTGusBS6IEOTOmoroY4SqW5PU2S29Yi8T4UvnE= -github.com/gozelle/multierr v0.0.1/go.mod h1:Kd/mRKyMcPywI5eYDaMeiecSeRhgAsu8nIx35ic5NvE= -github.com/gozelle/multierr v1.9.10 h1:EUU22u5Yx82/mQm55fH1WF6ztp0w4U+cZl9lCzuyUvE= -github.com/gozelle/multierr v1.9.10/go.mod h1:4tC7qdet8CoxU9Q/Ha2bn5obpheqU7bTB9AENXwWzaI= -github.com/gozelle/multierror v1.1.1 h1:melqp8EeukL6chaEBbxu9EOiMRQIjqETkNx0fDSgDqQ= -github.com/gozelle/multierror v1.1.1/go.mod h1:mtEfirV1E5kamBQeek5fNIAjJVHyMhWyNvdZSieeUSY= -github.com/gozelle/multierror v1.2.0 h1:TlPzApJWq2J9gLHqqZ4x+4dcMoxe/qhvls1mclVD+5s= -github.com/gozelle/multierror v1.2.0/go.mod h1:9fIiDnhubQwoHOTroFTeFP/lBTEkq3PyxPpW/GLmtyM= -github.com/gozelle/multierror v1.2.1 h1:5RdVtSqmqNMF0FFXZ2C6z+0au4oOKbZy70LfI+etn+c= -github.com/gozelle/multierror v1.2.1/go.mod h1:9fIiDnhubQwoHOTroFTeFP/lBTEkq3PyxPpW/GLmtyM= github.com/gozelle/pretty v0.3.1 h1:dtU7yIlzRqiMmB9TcunypDfvAC/QOP0K8y/BKKOrgRg= github.com/gozelle/testify v1.8.11 h1:HDi4d07tDlTLfAc1ie9yR/ZBr3r4HR9866GA5uHSwdA= github.com/gozelle/testify v1.8.11/go.mod h1:vYSBvyNev2JwOWCafPzpqdL3xuxy79bECwKkP5a7Y1c= github.com/gozelle/text v0.2.0 h1:p0p6mIm1aD0lDN798Pfx9T0itU8O/iUoauFvPUtV6Gw= github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903 h1:Z5bL8fjqThaLRa3MdQoESurwiUh8KZksKRQvLWY+h/c= github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903/go.mod h1:0Pp3KXweJJtxd5jzd6y3qW2DpjEFAlYc7oRDtuhMFjg= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= diff --git a/parallel/parallel.go b/parallel/parallel.go index e6b04a7..e16da86 100644 --- a/parallel/parallel.go +++ b/parallel/parallel.go @@ -3,11 +3,11 @@ package parallel import ( "context" "fmt" - "github.com/gozelle/multierror" "runtime/debug" "sync" - + "github.com/gozelle/async" + "github.com/gozelle/async/multierr" ) type Null = async.Null @@ -20,9 +20,9 @@ type Result[T any] struct { type Runner[T any] async.Runner[T] func Run[T any](ctx context.Context, limit uint, runners []Runner[T]) <-chan *Result[T] { - + ch := make(chan *Result[T], len(runners)) - + if limit == 0 { defer func() { ch <- &Result[T]{Error: fmt.Errorf("limit expect great than 0")} @@ -30,33 +30,33 @@ func Run[T any](ctx context.Context, limit uint, runners []Runner[T]) <-chan *Re }() return ch } - + if ctx == nil { ctx = context.Background() } - + go run[T](ctx, limit, runners, ch) - + return ch } func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *Result[T]) { - - errs := multierror.Errors{} + + errs := multierr.Errors{} wg := sync.WaitGroup{} sem := make(chan struct{}, limit) - + defer func() { close(ch) close(sem) }() - + for _, v := range runners { - + // achieve a blocking effect by sending semaphores to a channel with a specified capacity of "limit" // when the channel is full, it will block here until a task is completed and frees up channel capacity sem <- struct{}{} - - // if the semaphore is acquired, prioritize checking whether the context has done. + + // if the semaphore is acquired, prioritize checking whether the context has done. // if it has, break out of the for loop. select { case <-ctx.Done(): @@ -81,7 +81,7 @@ func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *R <-sem wg.Done() }() - + r, e := runner(ctx) if e != nil { errs.AddError(e) @@ -91,10 +91,10 @@ func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *R }(v) } } - + wg.Wait() - - // all tasks have been completed. + + // all tasks have been completed. // check for any errors and ensure that the error is the last result sent to the channel. if errs.Error() != nil { ch <- &Result[T]{Error: errs.Error()} @@ -113,6 +113,6 @@ func Wait[T any](results <-chan *Result[T], handler func(v T) error) error { } } } - + return nil } diff --git a/race/race.go b/race/race.go index 1a03ff2..f247494 100644 --- a/race/race.go +++ b/race/race.go @@ -3,11 +3,12 @@ package race import ( "context" "fmt" - "github.com/gozelle/async" - "github.com/gozelle/multierr" "sync" "sync/atomic" "time" + + "github.com/gozelle/async" + "github.com/gozelle/async/multierr" ) type Null = async.Null @@ -26,30 +27,30 @@ type Runner[T any] struct { // 如果全部返回错误,则返回出现的第一个错误 // 配置延迟执行的 Runner,会在等待配置时间后,再开始执行 func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error) { - + if len(runners) == 0 { err = fmt.Errorf("no runners") return } - + if ctx == nil { ctx = context.Background() } - + cctx, cancel := context.WithCancel(ctx) defer func() { cancel() }() - + vr := async.NewValue[T]() - ve := async.NewValues[error]() - + errs := multierr.Errors{} + wg := sync.WaitGroup{} - + for _, f := range runners { wg.Add(1) go func(f *Runner[T]) { - + done := atomic.Value{} go func() { select { @@ -60,30 +61,30 @@ func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error) } } }() - + defer func() { if done.Load() == nil { wg.Done() done.Store(true) } }() - + if f.Delay > 0 { time.Sleep(f.Delay) } - + if cctx.Err() != nil { return } - + r, e := f.Runner(ctx) if e != nil { - ve.AddValue(e) + errs.AddError(e) return } vr.SetValue(r) cancel() - + return }(f) } @@ -92,8 +93,8 @@ func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error) result = vr.Value() return } - - err = multierr.Combine(ve.Values()...) - + + err = errs.Error() + return }