-
Notifications
You must be signed in to change notification settings - Fork 9
/
event_stream.go
70 lines (58 loc) · 1.38 KB
/
event_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package events
import (
"context"
"runtime"
"time"
"github.com/OneOfOne/xxhash"
"github.com/valyala/fastrand"
)
// eventStream distributes events over a fixed set of shard channels.
type eventStream struct {
// ctx is the context handed to NewStream; Send stops waiting once it is done.
ctx context.Context
// shards holds one channel per worker goroutine started by NewStream;
// Send picks exactly one of them for every event.
shards []chan Event
}
// Send wraps value into an Event stamped with the current time and hands it
// to one of the stream's shard channels.
//
// Shard selection:
//   - an empty shardKey picks a shard at random via fastrand;
//   - a non-empty shardKey is hashed with xxhash and reduced modulo the
//     number of shards, so equal keys always map to the same shard index.
//
// The call blocks until the chosen shard accepts the event, or until either
// ctx or the stream's own context is done; in the latter two cases the event
// is dropped and nothing is reported to the caller.
func (e *eventStream) Send(ctx context.Context, eventType EventType, value interface{}, shardKey string) {
	shardCount := len(e.shards)

	var idx int
	switch shardKey {
	case "":
		idx = int(fastrand.Uint32n(uint32(shardCount)))
	default:
		idx = int(xxhash.ChecksumString64(shardKey) % uint64(shardCount))
	}

	target := e.shards[idx]
	event := Event{Type: eventType, Time: time.Now(), Value: value}

	select {
	case <-ctx.Done():
	case <-e.ctx.Done():
	case target <- event:
	}
}
// NewStream creates, initializes and returns a ready-to-use Stream.
//
// It allocates runtime.NumCPU() shard channels (each buffering a single
// event) and starts one worker goroutine per shard. Every worker obtains its
// own processor by calling factory from inside the goroutine, which is why a
// factory is passed here rather than a processor, and feeds that processor
// the events received on its shard. Once ctx is done the worker returns and
// the processor's Shutdown is called; an event still sitting in the shard
// buffer at that point may be left unprocessed.
func NewStream(ctx context.Context, factory ProcessorFactory) Stream {
	stream := &eventStream{
		ctx:    ctx,
		shards: make([]chan Event, runtime.NumCPU()),
	}

	// worker owns exactly one processor for the lifetime of its goroutine.
	worker := func(inbox <-chan Event) {
		processor := factory()
		defer processor.Shutdown()

		for {
			select {
			case <-ctx.Done():
				return
			case evt := <-inbox:
				processor.Process(evt)
			}
		}
	}

	for i := range stream.shards {
		shard := make(chan Event, 1)
		stream.shards[i] = shard
		go worker(shard)
	}

	return stream
}