Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #31 from catmullet/fix-no-error
Browse files Browse the repository at this point in the history
Fix for issue #30 Error not bubbling up.
  • Loading branch information
catmullet authored Jun 22, 2021
2 parents ae53f26 + 361f319 commit 0a8e61e
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 57 deletions.
43 changes: 20 additions & 23 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ func (r *runner) SetTimeout(duration time.Duration) Runner {

// Wait calls stop on workers and waits for the channel to drain.
// !!Should only be called when certain nothing will send to worker.
func (r *runner) Wait() (err error) {
func (r *runner) Wait() error {
r.waitForDrain()
if err = <-r.Stop(); err != nil || !errors.Is(err, context.Canceled) {
return
if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
Expand Down Expand Up @@ -216,6 +216,8 @@ func (r *runner) startWork() {
go func() {
var workerWG = new(sync.WaitGroup)
var closeOnce = new(sync.Once)

// write out error if not nil on exit.
defer func() {
workerWG.Wait()
r.errChan <- err
Expand All @@ -227,27 +229,22 @@ func (r *runner) startWork() {
r.wg.Done()
}()
for in := range r.inChan {
select {
case <-r.ctx.Done():
err = context.Canceled
continue
default:
r.limiter <- struct{}{}
workerWG.Add(1)
go func() {
defer func() {
<-r.limiter
workerWG.Done()
}()
if workErr := r.workFunc(in, r.outChan); workErr != nil {
r.once.Do(func() {
errors.As(err, &workErr)
r.cancel()
return
})
}
input := in
r.limiter <- struct{}{}
workerWG.Add(1)
go func() {
defer func() {
<-r.limiter
workerWG.Done()
}()
}
if err := r.workFunc(input, r.outChan); err != nil {
r.once.Do(func() {
r.errChan <- err
r.cancel()
return
})
}
}()
}
}()
}
181 changes: 147 additions & 34 deletions workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,58 @@ import (
)

const (
workerCount = 1000
workerCount = 100000
workerTimeout = time.Millisecond * 300
runTimes = 100000
)

type WorkerOne struct {
Count int
sync.Mutex
}
type WorkerTwo struct {
Count int
sync.Mutex
}

func NewWorkerOne() Worker {
func NewWorkerOne() *WorkerOne {
return &WorkerOne{}
}

func NewWorkerTwo() Worker {
func NewWorkerTwo() *WorkerTwo {
return &WorkerTwo{}
}

func (wo *WorkerOne) CurrentCount() int {
wo.Lock()
defer wo.Unlock()
return wo.Count
}

func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
var workerOne = "worker_one"
mut.Lock()
if val, ok := count[workerOne]; ok {
count[workerOne] = val + 1
} else {
count[workerOne] = 1
}
wo.Count = wo.Count + 1
mut.Unlock()

total := in.(int) * 2
out <- total
return nil
}

func (wt *WorkerTwo) CurrentCount() int {
wt.Lock()
defer wt.Unlock()
return wt.Count
}

func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
var workerTwo = "worker_two"
mut.Lock()
if val, ok := count[workerTwo]; ok {
count[workerTwo] = val + 1
} else {
count[workerTwo] = 1
}
wt.Count = wt.Count + 1
mut.Unlock()
return nil
}

var (
count = make(map[string]int)
mut = sync.RWMutex{}
err = errors.New("test error")
deadline = func() time.Time { return time.Now().Add(workerTimeout) }
Expand Down Expand Up @@ -184,24 +189,57 @@ func TestWorkers(t *testing.T) {
workerOne.Send(i)
}

if err := workerOne.Wait(); err != nil && !tt.errExpected {
fmt.Println(err)
t.Fail()
if err := workerOne.Wait(); err != nil && (!tt.errExpected) {
t.Error(err)
}
if err := workerTwo.Wait(); err != nil && !tt.errExpected {
fmt.Println(err)
t.Fail()
t.Error(err)
}
})
}
}

func TestWorkersFinish(t *testing.T) {
func TestWorkersFinish100(t *testing.T) {
const workCount = 100
ctx := context.Background()
w1 := NewWorkerOne()
w2 := NewWorkerTwo()
workerOne := NewRunner(ctx, w1, 1000).Start()
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()

for i := 0; i < workCount; i++ {
workerOne.Send(rand.Intn(100))
}

if err := workerOne.Wait(); err != nil {
fmt.Println(err)
}

if err := workerTwo.Wait(); err != nil {
fmt.Println(err)
}

if w1.CurrentCount() != workCount {
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
t.Fail()
}
if w2.CurrentCount() != workCount {
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
t.Fail()
}

t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
}

func TestWorkersFinish100000(t *testing.T) {
const workCount = 100000
ctx := context.Background()
workerOne := NewRunner(ctx, NewWorkerOne(), 1000).Start()
workerTwo := NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start()
w1 := NewWorkerOne()
w2 := NewWorkerTwo()
workerOne := NewRunner(ctx, w1, 1000).Start()
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()

for i := 0; i < 100000; i++ {
for i := 0; i < workCount; i++ {
workerOne.Send(rand.Intn(100))
}

Expand All @@ -213,28 +251,103 @@ func TestWorkersFinish(t *testing.T) {
fmt.Println(err)
}

if count["worker_one"] != 100000 {
fmt.Println("worker one failed to finish,", "worker_one count", count["worker_one"], "/ 100000")
if w1.CurrentCount() != workCount {
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
t.Fail()
}
if count["worker_two"] != 100000 {
fmt.Println("worker two failed to finish,", "worker_two count", count["worker_two"], "/ 100000")
if w2.CurrentCount() != workCount {
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
t.Fail()
}

t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
}

func BenchmarkGoWorkers(b *testing.B) {
func TestWorkersFinish1000000(t *testing.T) {
const workCount = 1000000
ctx := context.Background()
worker := NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount).Start()
w1 := NewWorkerOne()
w2 := NewWorkerTwo()
workerOne := NewRunner(ctx, w1, 1000).Start()
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()

for i := 0; i < workCount; i++ {
workerOne.Send(rand.Intn(100))
}

if err := workerOne.Wait(); err != nil {
fmt.Println(err)
}

if err := workerTwo.Wait(); err != nil {
fmt.Println(err)
}

b.StartTimer()
if w1.CurrentCount() != workCount {
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
t.Fail()
}
if w2.CurrentCount() != workCount {
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
t.Fail()
}

t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
}

func BenchmarkGoWorkers1to1(b *testing.B) {
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start()

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < runTimes; j++ {
for j := 0; j < 1000; j++ {
worker.Send(j)
}
}

b.StopTimer()

if err := worker.Wait(); err != nil {
b.Error(err)
}
}

func Benchmark100GoWorkers(b *testing.B) {
b.ReportAllocs()
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100).Start()

b.ResetTimer()
for i := 0; i < b.N; i++ {
worker.Send(i)
}

if err := worker.Wait(); err != nil {
b.Error(err)
}
}

func Benchmark1000GoWorkers(b *testing.B) {
b.ReportAllocs()
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start()

b.ResetTimer()
for i := 0; i < b.N; i++ {
worker.Send(i)
}

if err := worker.Wait(); err != nil {
b.Error(err)
}
}

func Benchmark10000GoWorkers(b *testing.B) {
b.ReportAllocs()
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000).Start()

b.ResetTimer()
for i := 0; i < b.N; i++ {
worker.Send(i)
}

if err := worker.Wait(); err != nil {
b.Error(err)
}
Expand Down

0 comments on commit 0a8e61e

Please sign in to comment.