Skip to content

Commit

Permalink
add multierr
Browse files Browse the repository at this point in the history
  • Loading branch information
koyeo committed Aug 29, 2023
1 parent a0f672b commit 86676b6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 55 deletions.
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ module github.com/gozelle/async
go 1.18

require (
github.com/gozelle/multierr v0.0.1
github.com/gozelle/atomic v1.10.10
github.com/gozelle/testify v1.8.11
golang.org/x/sync v0.3.0
)

require (
github.com/gozelle/atomic v1.10.10 // indirect
github.com/gozelle/go-difflib v1.0.0 // indirect
github.com/gozelle/go-spew v1.1.10 // indirect
github.com/gozelle/multierror v1.2.1 // indirect
github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
golang.org/x/sync v0.3.0 // indirect
)
12 changes: 0 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,11 @@ github.com/gozelle/go-difflib v1.0.0/go.mod h1:PcDy306aUy3I8kB5ohutXpbhkgQlygtcB
github.com/gozelle/go-internal v1.9.0 h1:VVobf+WAesfyYtHklxbmzo+2IXdTw3PxiwkmwC6QTTA=
github.com/gozelle/go-spew v1.1.10 h1:kFDq5IDd/Wk3l7UutORhveCt5fUdW8b9afnDYtuHmEc=
github.com/gozelle/go-spew v1.1.10/go.mod h1:qwUFQBhiE5zwtTAfe/m87+73t2jRbNHwGOvs+FgYx+8=
github.com/gozelle/multierr v0.0.1 h1:JvYYvGTGusBS6IEOTOmoroY4SqW5PU2S29Yi8T4UvnE=
github.com/gozelle/multierr v0.0.1/go.mod h1:Kd/mRKyMcPywI5eYDaMeiecSeRhgAsu8nIx35ic5NvE=
github.com/gozelle/multierr v1.9.10 h1:EUU22u5Yx82/mQm55fH1WF6ztp0w4U+cZl9lCzuyUvE=
github.com/gozelle/multierr v1.9.10/go.mod h1:4tC7qdet8CoxU9Q/Ha2bn5obpheqU7bTB9AENXwWzaI=
github.com/gozelle/multierror v1.1.1 h1:melqp8EeukL6chaEBbxu9EOiMRQIjqETkNx0fDSgDqQ=
github.com/gozelle/multierror v1.1.1/go.mod h1:mtEfirV1E5kamBQeek5fNIAjJVHyMhWyNvdZSieeUSY=
github.com/gozelle/multierror v1.2.0 h1:TlPzApJWq2J9gLHqqZ4x+4dcMoxe/qhvls1mclVD+5s=
github.com/gozelle/multierror v1.2.0/go.mod h1:9fIiDnhubQwoHOTroFTeFP/lBTEkq3PyxPpW/GLmtyM=
github.com/gozelle/multierror v1.2.1 h1:5RdVtSqmqNMF0FFXZ2C6z+0au4oOKbZy70LfI+etn+c=
github.com/gozelle/multierror v1.2.1/go.mod h1:9fIiDnhubQwoHOTroFTeFP/lBTEkq3PyxPpW/GLmtyM=
github.com/gozelle/pretty v0.3.1 h1:dtU7yIlzRqiMmB9TcunypDfvAC/QOP0K8y/BKKOrgRg=
github.com/gozelle/testify v1.8.11 h1:HDi4d07tDlTLfAc1ie9yR/ZBr3r4HR9866GA5uHSwdA=
github.com/gozelle/testify v1.8.11/go.mod h1:vYSBvyNev2JwOWCafPzpqdL3xuxy79bECwKkP5a7Y1c=
github.com/gozelle/text v0.2.0 h1:p0p6mIm1aD0lDN798Pfx9T0itU8O/iUoauFvPUtV6Gw=
github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903 h1:Z5bL8fjqThaLRa3MdQoESurwiUh8KZksKRQvLWY+h/c=
github.com/gozelle/yaml v0.0.0-20221214152138-81b78a92d903/go.mod h1:0Pp3KXweJJtxd5jzd6y3qW2DpjEFAlYc7oRDtuhMFjg=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
38 changes: 19 additions & 19 deletions parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package parallel
import (
"context"
"fmt"
"github.com/gozelle/multierror"
"runtime/debug"
"sync"

"github.com/gozelle/async"
"github.com/gozelle/async/multierr"
)

type Null = async.Null
Expand All @@ -20,43 +20,43 @@ type Result[T any] struct {
type Runner[T any] async.Runner[T]

func Run[T any](ctx context.Context, limit uint, runners []Runner[T]) <-chan *Result[T] {

ch := make(chan *Result[T], len(runners))

if limit == 0 {
defer func() {
ch <- &Result[T]{Error: fmt.Errorf("limit expect great than 0")}
close(ch)
}()
return ch
}

if ctx == nil {
ctx = context.Background()
}

go run[T](ctx, limit, runners, ch)

return ch
}
func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *Result[T]) {
errs := multierror.Errors{}

errs := multierr.Errors{}
wg := sync.WaitGroup{}
sem := make(chan struct{}, limit)

defer func() {
close(ch)
close(sem)
}()

for _, v := range runners {

// achieve a blocking effect by sending semaphores to a channel with a specified capacity of "limit"
// when the channel is full, it will block here until a task is completed and frees up channel capacity
sem <- struct{}{}
// if the semaphore is acquired, prioritize checking whether the context has done.

// if the semaphore is acquired, prioritize checking whether the context has done.
// if it has, break out of the for loop.
select {
case <-ctx.Done():
Expand All @@ -81,7 +81,7 @@ func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *R
<-sem
wg.Done()
}()

r, e := runner(ctx)
if e != nil {
errs.AddError(e)
Expand All @@ -91,10 +91,10 @@ func run[T any](ctx context.Context, limit uint, runners []Runner[T], ch chan *R
}(v)
}
}

wg.Wait()
// all tasks have been completed.

// all tasks have been completed.
// check for any errors and ensure that the error is the last result sent to the channel.
if errs.Error() != nil {
ch <- &Result[T]{Error: errs.Error()}
Expand All @@ -113,6 +113,6 @@ func Wait[T any](results <-chan *Result[T], handler func(v T) error) error {
}
}
}

return nil
}
39 changes: 20 additions & 19 deletions race/race.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package race
import (
"context"
"fmt"
"github.com/gozelle/async"
"github.com/gozelle/multierr"
"sync"
"sync/atomic"
"time"

"github.com/gozelle/async"
"github.com/gozelle/async/multierr"
)

type Null = async.Null
Expand All @@ -26,30 +27,30 @@ type Runner[T any] struct {
// 如果全部返回错误,则返回出现的第一个错误
// 配置延迟执行的 Runner,会在等待配置时间后,再开始执行
func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error) {

if len(runners) == 0 {
err = fmt.Errorf("no runners")
return
}

if ctx == nil {
ctx = context.Background()
}

cctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()

vr := async.NewValue[T]()
ve := async.NewValues[error]()
errs := multierr.Errors{}

wg := sync.WaitGroup{}

for _, f := range runners {
wg.Add(1)
go func(f *Runner[T]) {

done := atomic.Value{}
go func() {
select {
Expand All @@ -60,30 +61,30 @@ func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error)
}
}
}()

defer func() {
if done.Load() == nil {
wg.Done()
done.Store(true)
}
}()

if f.Delay > 0 {
time.Sleep(f.Delay)
}

if cctx.Err() != nil {
return
}

r, e := f.Runner(ctx)
if e != nil {
ve.AddValue(e)
errs.AddError(e)
return
}
vr.SetValue(r)
cancel()

return
}(f)
}
Expand All @@ -92,8 +93,8 @@ func Run[T any](ctx context.Context, runners []*Runner[T]) (result T, err error)
result = vr.Value()
return
}
err = multierr.Combine(ve.Values()...)

err = errs.Error()

return
}

0 comments on commit 86676b6

Please sign in to comment.