Skip to content

Commit

Permalink
share the executor pool across Loader calls
Browse files Browse the repository at this point in the history
  • Loading branch information
ajatprabha committed Aug 22, 2023
1 parent 88eba88 commit a8c5bd8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
46 changes: 34 additions & 12 deletions xload/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,30 @@ import (
"github.com/sourcegraph/conc/pool"
)

func processConcurrently(ctx context.Context, v any, opts *options) error {
doneCh := make(chan struct{}, 1)
defer close(doneCh)

p := pool.New().WithContext(ctx).WithMaxGoroutines(opts.concurrency).WithCancelOnError()

err := processAsync(p, opts, opts.loader, v, func() {
doneCh <- struct{}{}
})

select {
case <-ctx.Done():
return ctx.Err()
case <-doneCh:
return err
}
}

//nolint:funlen,nestif
func processAsync(ctx context.Context, obj any, o *options, loader Loader) error {
func processAsync(p *pool.ContextPool, o *options, loader Loader, obj any, cb func()) error {
if cb != nil {
defer cb()
}

v := reflect.ValueOf(obj)

if v.Kind() != reflect.Ptr {
Expand All @@ -22,8 +44,6 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error

typ := value.Type()

p := pool.New().WithErrors().WithMaxGoroutines(o.concurrency)

for i := 0; i < typ.NumField(); i++ {
fTyp := typ.Field(i)
fVal := value.Field(i)
Expand Down Expand Up @@ -79,7 +99,9 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error
// if the struct has a key, load it
// and set the value to the struct
if meta.name != "" && hasDecoder(fVal) {
loadAndSet := func(original reflect.Value, fVal reflect.Value, isNilStructPtr bool) error {
loadAndSet := func(
ctx context.Context, original reflect.Value, fVal reflect.Value, isNilStructPtr bool,
) error {
val, err := loader.Load(ctx, meta.name)
if err != nil {
return err
Expand All @@ -102,9 +124,7 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error

original := value.Field(i)

p.Go(func() error {
return loadAndSet(original, fVal, isNilStructPtr)
})
p.Go(func(ctx context.Context) error { return loadAndSet(ctx, original, fVal, isNilStructPtr) })

continue
}
Expand All @@ -114,7 +134,7 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error
pld = PrefixLoader(meta.prefix, loader)
}

err := processAsync(ctx, fVal.Interface(), o, pld)
err := processAsync(p, o, pld, fVal.Interface(), nil)
if err != nil {
return err
}
Expand All @@ -128,7 +148,7 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error
return ErrInvalidPrefix
}

loadAndSet := func(fVal reflect.Value) error {
loadAndSet := func(ctx context.Context, fVal reflect.Value) error {
// lookup value
val, err := loader.Load(ctx, meta.name)
if err != nil {
Expand All @@ -148,9 +168,11 @@ func processAsync(ctx context.Context, obj any, o *options, loader Loader) error
return nil
}

p.Go(func() error {
return loadAndSet(fVal)
})
p.Go(func(ctx context.Context) error { return loadAndSet(ctx, fVal) })
}

if cb == nil {
return nil
}

return p.Wait()
Expand Down
2 changes: 1 addition & 1 deletion xload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Load(ctx context.Context, v any, opts ...Option) error {
o := newOptions(opts...)

if o.concurrency > 1 {
return processAsync(ctx, v, o, o.loader)
return processConcurrently(ctx, v, o)
}

return process(ctx, v, o.tagName, o.loader)
Expand Down

0 comments on commit a8c5bd8

Please sign in to comment.