-
Notifications
You must be signed in to change notification settings - Fork 7
/
queue_test.go
71 lines (55 loc) · 1.11 KB
/
queue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package goworkqueue
import (
"sync"
"sync/atomic"
"testing"
"time"
)
// func TestCloseSend(t *testing.T) {
//
// c := make(chan struct{})
//
// go func() {
// time.Sleep(time.Microsecond)
// close(c)
// }()
//
// // What happens?
// c <- struct{}{}
//
// }
func TestQueue(t *testing.T) {
// 1000 job queue with 100 workers
workers := 100
queue := NewQueue(1000, workers, func(job interface{}, workerID int) {
// time.Sleep(time.Millisecond)
})
// Pretend we suddenly need to stop the workers.
// This might be a SIGTERM or perhaps the workerFunc() called queue.Close()
go func() {
time.Sleep(5 * time.Millisecond)
queue.Close()
}()
var jobs int64
var group sync.WaitGroup
group.Add(workers)
for j := 0; j < workers; j++ {
go func(id int) {
var i int
for {
i++
atomic.AddInt64(&jobs, 1)
if ok := queue.Add(i); !ok {
break
}
}
group.Done()
// fmt.Printf("%d: %d jobs\n", id, i)
}(j)
}
// Blocks until queue.Close()
queue.Run()
// ensure all goroutines are finished
group.Wait()
// fmt.Printf("%d jobs processed\n", atomic.LoadInt64(&jobs))
}