Skip to content

Commit

Permalink
Reusable pools (#129)
Browse files Browse the repository at this point in the history
This updates the pool types that collect results and errors to reset on
`Wait` so they are reusable once waited on. Previously, if a pool was
reused, the returned values of `Wait()` would contain the aggregated set
of all previous uses. This wasn't explicitly a guarantee of the library
before, but it does make it operate more like `sync.WaitGroup` and it's
easy to do, so I think it's a positive change.
  • Loading branch information
camdencheek authored Jan 19, 2024
1 parent 8427ccd commit 4c5c70a
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 6 deletions.
10 changes: 7 additions & 3 deletions pool/error_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ func (p *ErrorPool) Go(f func() error) {
// returning any errors from tasks.
func (p *ErrorPool) Wait() error {
p.pool.Wait()
if len(p.errs) == 0 {

errs := p.errs
p.errs = nil // reset errs

if len(errs) == 0 {
return nil
} else if p.onlyFirstError {
return p.errs[0]
return errs[0]
} else {
return multierror.Join(p.errs...)
return multierror.Join(errs...)
}
}

Expand Down
15 changes: 15 additions & 0 deletions pool/error_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,19 @@ func TestErrorPool(t *testing.T) {
})
}
})

t.Run("reuse", func(t *testing.T) {
// Test for https://github.com/sourcegraph/conc/issues/128
p := pool.New().WithErrors()

p.Go(func() error { return err1 })
wait1 := p.Wait()
require.ErrorIs(t, wait1, err1)

p.Go(func() error { return err2 })
wait2 := p.Wait()
// On reuse, only the new error should be returned
require.ErrorIs(t, wait2, err2)
require.NotErrorIs(t, wait1, err2)
})
}
4 changes: 3 additions & 1 deletion pool/result_context_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
// returns an error if any of the tasks errored.
func (p *ResultContextPool[T]) Wait() ([]T, error) {
err := p.contextPool.Wait()
return p.agg.collect(p.collectErrored), err
results := p.agg.collect(p.collectErrored)
p.agg = resultAggregator[T]{}
return results, err
}

// WithCollectErrored configures the pool to still collect the result of a task
Expand Down
16 changes: 16 additions & 0 deletions pool/result_context_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,20 @@ func TestResultContextPool(t *testing.T) {
})
}
})

t.Run("reuse", func(t *testing.T) {
// Test for https://github.com/sourcegraph/conc/issues/128
p := pool.NewWithResults[int]().WithContext(context.Background())

p.Go(func(context.Context) (int, error) { return 1, err1 })
results1, errs1 := p.Wait()
require.Empty(t, results1)
require.ErrorIs(t, errs1, err1)

p.Go(func(context.Context) (int, error) { return 2, err2 })
results2, errs2 := p.Wait()
require.Empty(t, results2)
require.ErrorIs(t, errs2, err2)
require.NotErrorIs(t, errs2, err1)
})
}
4 changes: 3 additions & 1 deletion pool/result_error_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
// returning the results and any errors from tasks.
func (p *ResultErrorPool[T]) Wait() ([]T, error) {
err := p.errorPool.Wait()
return p.agg.collect(p.collectErrored), err
results := p.agg.collect(p.collectErrored)
p.agg = resultAggregator[T]{} // reset for reuse
return results, err
}

// WithCollectErrored configures the pool to still collect the result of a task
Expand Down
16 changes: 16 additions & 0 deletions pool/result_error_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,20 @@ func TestResultErrorPool(t *testing.T) {
})
}
})

t.Run("reuse", func(t *testing.T) {
// Test for https://github.com/sourcegraph/conc/issues/128
p := pool.NewWithResults[int]().WithErrors()

p.Go(func() (int, error) { return 1, err1 })
results1, errs1 := p.Wait()
require.Empty(t, results1)
require.ErrorIs(t, errs1, err1)

p.Go(func() (int, error) { return 2, err2 })
results2, errs2 := p.Wait()
require.Empty(t, results2)
require.ErrorIs(t, errs2, err2)
require.NotErrorIs(t, errs2, err1)
})
}
4 changes: 3 additions & 1 deletion pool/result_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func (p *ResultPool[T]) Go(f func() T) {
// a slice of results from tasks that did not panic.
func (p *ResultPool[T]) Wait() []T {
p.pool.Wait()
return p.agg.collect(true)
results := p.agg.collect(true)
p.agg = resultAggregator[T]{} // reset for reuse
return results
}

// MaxGoroutines returns the maximum size of the pool.
Expand Down
13 changes: 13 additions & 0 deletions pool/result_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,17 @@ func TestResultGroup(t *testing.T) {
})
}
})

t.Run("reuse", func(t *testing.T) {
// Test for https://github.com/sourcegraph/conc/issues/128
p := pool.NewWithResults[int]()

p.Go(func() int { return 1 })
results1 := p.Wait()
require.Equal(t, []int{1}, results1)

p.Go(func() int { return 2 })
results2 := p.Wait()
require.Equal(t, []int{2}, results2)
})
}

0 comments on commit 4c5c70a

Please sign in to comment.