-
Notifications
You must be signed in to change notification settings - Fork 0
/
repo.go
98 lines (79 loc) · 1.96 KB
/
repo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package eventx
import "context"
// sizeLimitedRepo sits in front of a Repository and hands out events in
// batches that are capped both by event count and by the accumulated
// GetSize of the events. Events that were fetched but did not fit into the
// batch just returned are kept in memory for the next request.
type sizeLimitedRepo[E EventConstraint] struct {
	// repo is the backing store queried when the in-memory buffer cannot
	// produce a full batch on its own.
	repo Repository[E]

	// limit is the maximum number of events in one returned batch; it is
	// also the fetch size passed to repo.GetEventsFrom.
	limit uint64

	// sizeLimit caps the summed GetSize of one batch. A single event whose
	// size alone exceeds it is still returned, as a batch of one.
	sizeLimit uint64

	// lastEvents holds fetched events that have not been returned yet.
	lastEvents []E
}
// newSizeLimitedRepo builds a sizeLimitedRepo over repo with an empty buffer.
//
// limit is the per-batch event count cap and sizeLimit the per-batch cap on
// the summed GetSize of the events.
func newSizeLimitedRepo[E EventConstraint](repo Repository[E], limit uint64, sizeLimit uint64) *sizeLimitedRepo[E] {
	wrapped := new(sizeLimitedRepo[E])
	wrapped.repo = repo
	wrapped.limit = limit
	wrapped.sizeLimit = sizeLimit
	return wrapped
}
// splitLastEvents removes the first index events from the in-memory buffer
// and returns them; the remaining events stay buffered in r.lastEvents.
//
// The returned slice has its capacity clipped to index. Both slices still
// share one backing array, so without the clip an append performed by the
// caller on the returned batch would overwrite events that are still
// buffered; with it, such an append reallocates instead.
//
// index must lie in [0, len(r.lastEvents)], otherwise the slice expressions
// panic.
func (r *sizeLimitedRepo[E]) splitLastEvents(index int) []E {
	head := r.lastEvents[:index:index]
	r.lastEvents = r.lastEvents[index:]
	return head
}
// getEventsFrom returns the next batch of events for a reader positioned at
// sequence from.
//
// The batch is served from the in-memory buffer when the buffer alone can
// produce one. Otherwise up to r.limit further events are fetched from the
// underlying repository, starting at the position reported by getFromMem,
// appended to the buffer, and a batch is then cut from the buffer
// unconditionally. An error from the repository is returned as-is together
// with a nil batch.
func (r *sizeLimitedRepo[E]) getEventsFrom(ctx context.Context, from uint64) ([]E, error) {
	batch, fetchFrom, served := r.getFromMem(from)
	if served {
		return batch, nil
	}

	fetched, fetchErr := r.repo.GetEventsFrom(ctx, fetchFrom, r.limit)
	if fetchErr != nil {
		return nil, fetchErr
	}

	r.setDBResult(fetched)
	batch = r.forceGetFromMem()
	return batch, nil
}
// tryToGetFromMem tries to cut one complete batch off the front of the
// in-memory buffer.
//
// It walks the buffer in order and cuts as soon as one of the caps is hit:
//   - r.limit events have been collected and at least one more is buffered;
//   - the accumulated GetSize equals r.sizeLimit (the current event is included);
//   - the accumulated GetSize exceeds r.sizeLimit (the current event is left
//     for the next batch, unless it is the very first one, in which case it
//     is returned alone).
//
// When the walk reaches the end of the buffer without hitting a cap, the
// buffer is left untouched and (nil, false) is returned.
func (r *sizeLimitedRepo[E]) tryToGetFromMem() ([]E, bool) {
	var total uint64
	for idx, ev := range r.lastEvents {
		// Count cap: idx events are already part of the batch.
		if uint64(idx) >= r.limit {
			return r.splitLastEvents(idx), true
		}

		total += ev.GetSize()
		switch {
		case total < r.sizeLimit:
			// Still below the size cap; keep scanning.
		case total > r.sizeLimit && idx > 0:
			// ev would overflow a non-empty batch: cut just before it.
			return r.splitLastEvents(idx), true
		default:
			// ev either fills the batch exactly or is an oversized first
			// event; in both cases it belongs to this batch.
			return r.splitLastEvents(idx + 1), true
		}
	}
	return nil, false
}
// getFromMem tries to serve a batch starting at sequence from out of the
// in-memory buffer.
//
// Results:
//   - (batch, 0, true) when the buffer starts at from and a complete batch
//     could be cut from it;
//   - (nil, from, false) when the buffer is empty, or when its first event
//     does not carry sequence from, in which case the buffer is discarded;
//   - (nil, last+1, false) when the buffer starts at from but is too short to
//     cut a batch; last is the sequence of the final buffered event, so the
//     second value is where an additional fetch should begin.
//     NOTE(review): this presumes consecutive sequence numbers; confirm
//     against the Repository contract.
func (r *sizeLimitedRepo[E]) getFromMem(from uint64) ([]E, uint64, bool) {
	switch {
	case len(r.lastEvents) == 0:
		return nil, from, false
	case r.lastEvents[0].GetSequence() != from:
		// The reader is positioned somewhere else; the buffer is useless.
		r.lastEvents = nil
		return nil, from, false
	}

	if batch, ok := r.tryToGetFromMem(); ok {
		return batch, 0, true
	}

	tail := r.lastEvents[len(r.lastEvents)-1]
	return nil, tail.GetSequence() + 1, false
}
// setDBResult appends freshly fetched events to the end of the in-memory
// buffer, after whatever is still waiting there.
func (r *sizeLimitedRepo[E]) setDBResult(events []E) {
	if len(events) == 0 {
		// Appending nothing would leave the buffer exactly as it is.
		return
	}
	r.lastEvents = append(r.lastEvents, events...)
}
// forceGetFromMem always produces a batch from the in-memory buffer.
//
// If a complete batch can be cut (see tryToGetFromMem) that batch is
// returned and the rest stays buffered. Otherwise everything currently
// buffered is returned and the buffer is reset to nil; for an empty buffer
// the result is therefore empty as well.
func (r *sizeLimitedRepo[E]) forceGetFromMem() []E {
	if batch, ok := r.tryToGetFromMem(); ok {
		return batch
	}

	// No cut point was reached, so the whole buffer fits in one batch.
	remaining := r.lastEvents
	r.lastEvents = nil
	return remaining
}