-
Notifications
You must be signed in to change notification settings - Fork 5
/
chan.go
73 lines (68 loc) · 1.5 KB
/
chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Package nbc provides a non-blocking channel for Go.
package nbc
import (
"container/list"
"sync"
)
// NonBlockingChan mimics the behavior of a channel but does not block when
// items are sent. Items are stored internally until received. Closing the Send
// channel will cause the Recv channel to be closed after all items have been
// received.
type NonBlockingChan struct {
// mutex is held around every mutation of items in run and while Len reads
// the item count.
mutex sync.Mutex
// Send is the send-only end handed to producers. Closing it tells run that
// no further items will arrive.
Send chan<- interface{}
// Recv is the receive-only end handed to consumers. run closes it once Send
// has been closed and no buffered items remain.
Recv <-chan interface{}
// items holds the values accepted from Send but not yet delivered on Recv.
// run appends at the back and delivers from the front.
items *list.List
}
// run moves items from send to recv, parking them in n.items in between so
// that a sender only ever waits for this loop and never for a receiver.
//
// The loop keeps going while send is still open or buffered items remain.
// Once send has been closed and the buffer is empty, recv is closed and the
// method returns.
func (n *NonBlockingChan) run(send <-chan interface{}, recv chan<- interface{}) {
	for send != nil || n.items.Len() > 0 {
		// A nil channel is never ready in a select. Leaving out nil while
		// the buffer is empty therefore disables the delivery case, and the
		// select waits on send alone.
		var (
			out  chan<- interface{}
			head interface{}
		)
		if front := n.items.Front(); front != nil {
			out, head = recv, front.Value
		}

		select {
		case item, open := <-send:
			if !open {
				// Setting send to nil stops the closed channel from being
				// selected again; the loop now only drains the buffer.
				send = nil
				continue
			}
			n.mutex.Lock()
			n.items.PushBack(item)
			n.mutex.Unlock()
		case out <- head:
			// head has been handed to a receiver; drop it from the buffer.
			n.mutex.Lock()
			n.items.Remove(n.items.Front())
			n.mutex.Unlock()
		}
	}
	close(recv)
}
// New creates a NonBlockingChan with an empty buffer and starts the goroutine
// that carries items from its Send channel to its Recv channel. That goroutine
// finishes after Send has been closed and every buffered item has been
// received.
func New() *NonBlockingChan {
	// Both channels are unbuffered; the queueing happens in the list that
	// run manages. The struct exposes them as directional ends, while run is
	// given the opposite directions.
	in := make(chan interface{})
	out := make(chan interface{})

	n := new(NonBlockingChan)
	n.Send = in
	n.Recv = out
	n.items = list.New()

	go n.run(in, out)
	return n
}
// Len reports how many items are buffered and still waiting to be received.
// The count is read while holding the mutex that run takes around each
// change to the buffer.
func (n *NonBlockingChan) Len() int {
	n.mutex.Lock()
	defer n.mutex.Unlock()

	pending := n.items.Len()
	return pending
}