-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.go
137 lines (115 loc) · 2.84 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package eventx
import (
"context"
)
// fetchRequest asks the core for a run of stored events. It is handed to the
// core through coreService.fetchChan and answered on respChan.
type fetchRequest[E EventConstraint] struct {
	// from is the sequence of the first event wanted.
	from uint64
	// limit is the maximum number of events to return.
	limit uint64
	// sizeLimit caps the summed GetSize() of the returned events; zero
	// disables the cap.
	sizeLimit uint64
	// placeholder is the slice the returned events are appended to.
	placeholder []E
	// respChan is where the fetchResponse for this request is sent.
	respChan chan<- fetchResponse[E]
}
// fetchResponse is the answer to a fetchRequest.
type fetchResponse[E EventConstraint] struct {
	// existed is false when the requested starting sequence is older than the
	// oldest sequence still stored, and true otherwise.
	existed bool
	// events holds the request's placeholder followed by the events that were
	// appended to it; it is left unset when existed is false.
	events []E
}
// coreService keeps the most recent events in a fixed-size ring buffer and
// answers fetch requests against it.
type coreService[E EventConstraint] struct {
	// coreChan delivers batches of new events to store.
	coreChan <-chan []E
	// fetchChan carries fetch requests submitted through doFetch.
	fetchChan chan fetchRequest[E]

	// storedEvents is the ring buffer; the event with sequence n is kept at
	// index n % len(storedEvents).
	storedEvents []E
	// first is the oldest sequence still available in storedEvents.
	first uint64
	// last is one past the newest stored sequence, so the available range is
	// [first, last).
	last uint64
	// waitList holds requests whose from is not yet stored (from >= last);
	// they are answered once later events arrive.
	waitList []fetchRequest[E]
}
// newCoreService creates a coreService that reads event batches from coreChan.
// The ring buffer gets options.coreStoredEventsSize slots and the fetch
// channel is buffered for 256 requests. The sequence bounds start at zero and
// the wait list starts empty.
func newCoreService[E EventConstraint](coreChan <-chan []E, options eventxOptions) *coreService[E] {
	svc := new(coreService[E])
	svc.coreChan = coreChan
	svc.fetchChan = make(chan fetchRequest[E], 256)
	svc.storedEvents = make([]E, options.coreStoredEventsSize)
	return svc
}
// computeRequestToResponse builds the response to req from the ring buffer
// storedEvents, which holds the events with sequences in [first, last); the
// event with sequence n is read from storedEvents[n%len(storedEvents)].
//
// If req.from is older than first, the response has existed == false and no
// events. Otherwise existed is true and events starting at req.from are
// appended to req.placeholder until one of these happens:
//   - req.limit events have been taken,
//   - the sequence last is reached,
//   - adding the next event would push the summed GetSize() above
//     req.sizeLimit (a sizeLimit of zero disables this check).
//
// The first event in range is always returned, even when it alone is larger
// than req.sizeLimit. storedEvents must be non-empty whenever the requested
// range is non-empty, because the index is taken modulo its length.
func computeRequestToResponse[E EventConstraint](
	req fetchRequest[E], first uint64, last uint64, storedEvents []E,
) fetchResponse[E] {
	if req.from < first {
		return fetchResponse[E]{
			existed: false,
		}
	}

	// Compare the limit against the remaining distance to last instead of
	// computing req.from+req.limit up front: that sum wraps around for very
	// large limits, which would end the range before req.from and silently
	// return nothing.
	end := last
	if req.from < last && req.limit < last-req.from {
		end = req.from + req.limit
	}

	events := req.placeholder
	size := uint64(0)
	storeSize := uint64(len(storedEvents))
	for i := req.from; i < end; i++ {
		event := storedEvents[i%storeSize]
		size += event.GetSize()
		// i > req.from keeps the first event unconditionally, so a single
		// oversized event can still be delivered.
		if i > req.from && req.sizeLimit > 0 && size > req.sizeLimit {
			break
		}
		events = append(events, event)
	}

	return fetchResponse[E]{
		existed: true,
		events:  events,
	}
}
// requestToResponse answers req from the service's current state: the
// response is computed over the stored range [s.first, s.last) and sent on
// req.respChan. The send blocks until the channel accepts the value.
func (s *coreService[E]) requestToResponse(req fetchRequest[E]) {
	req.respChan <- computeRequestToResponse(req, s.first, s.last, s.storedEvents)
}
// waitListRemoveIf partitions waitList in place: requests for which fn
// reports true are swapped to the tail, the others stay at the front. It
// returns the index of the first moved request, so waitList[:n] are the
// requests to keep and waitList[n:] the ones selected by fn. The relative
// order of the requests is not preserved.
func waitListRemoveIf[E EventConstraint](waitList []fetchRequest[E], fn func(req fetchRequest[E]) bool) int {
	boundary := len(waitList)
	pos := 0
	for pos < boundary {
		if fn(waitList[pos]) {
			// Swap the match behind the boundary; the element swapped in is
			// examined next, so pos does not advance.
			boundary--
			waitList[pos], waitList[boundary] = waitList[boundary], waitList[pos]
			continue
		}
		pos++
	}
	return boundary
}
// handleWaitList answers every waiting request whose starting sequence has
// become available (from < s.last) and drops it from the wait list. Requests
// that still point at or beyond s.last stay queued.
func (s *coreService[E]) handleWaitList() {
	offset := waitListRemoveIf(s.waitList, func(req fetchRequest[E]) bool {
		return req.from < s.last
	})

	// ready aliases the tail of the wait list, so indexes into it address the
	// served requests. Indexing s.waitList with the same index would instead
	// wipe requests that are still pending.
	ready := s.waitList[offset:]
	for i := range ready {
		s.requestToResponse(ready[i])
		// Zero the slot so the truncated backing array no longer references
		// the request's placeholder slice or response channel.
		ready[i] = fetchRequest[E]{}
	}
	s.waitList = s.waitList[:offset]
}
// runCore performs a single step of the core: it blocks until a batch of
// events arrives on coreChan, a fetch request arrives on fetchChan, or ctx is
// done, handles that one occurrence and returns. It has no loop of its own,
// so the caller is presumably expected to call it repeatedly (not visible
// from here).
//
// Event batch: the events are written into the ring buffer, the stored range
// [first, last) is updated, and waiting requests that became answerable are
// served. Empty batches are ignored. If coreChan has been closed, the case is
// disabled for all later calls.
//
// Fetch request: a request starting at or beyond last is parked on the wait
// list; any other request is answered immediately.
func (s *coreService[E]) runCore(ctx context.Context) {
	select {
	case events, ok := <-s.coreChan:
		if !ok {
			// A closed channel is always ready to receive. A nil channel
			// never is, so this keeps repeated calls from spinning on it.
			s.coreChan = nil
			return
		}
		if len(events) == 0 {
			// Nothing to store; indexing events[0] below would panic.
			return
		}

		firstSeq := events[0].GetSequence()
		// Restart the stored range when nothing has been stored yet or the
		// batch does not continue directly after the previous one.
		if s.first == 0 || firstSeq != s.last {
			s.first = firstSeq
		}
		s.last = events[len(events)-1].GetSequence() + 1

		// The ring buffer only holds the newest size events; older ones get
		// overwritten, so move first forward accordingly.
		size := uint64(len(s.storedEvents))
		if s.last > s.first+size {
			s.first = s.last - size
		}

		for _, e := range events {
			s.storedEvents[e.GetSequence()%size] = e
		}

		s.handleWaitList()

	case req := <-s.fetchChan:
		if req.from >= s.last {
			s.waitList = append(s.waitList, req)
			return
		}
		s.requestToResponse(req)

	case <-ctx.Done():
	}
}
// doFetch hands req to the core by sending it on fetchChan; the send blocks
// until the channel accepts the request. The answer is sent on req.respChan
// once the core has processed the request.
func (s *coreService[E]) doFetch(req fetchRequest[E]) {
	s.fetchChan <- req
}