-
Notifications
You must be signed in to change notification settings - Fork 0
/
retention.go
120 lines (98 loc) · 2.26 KB
/
retention.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package eventx
import (
"context"
"time"
)
// RetentionJob ...
type RetentionJob[E EventConstraint] struct {
options retentionOptions
minSequence uint64
nextSequence uint64
retentionRepo RetentionRepository
runner *Runner[E]
sub *Subscriber[E]
sleepFunc func(ctx context.Context, d time.Duration)
}
// NewRetentionJob ...
func NewRetentionJob[E EventConstraint](
runner *Runner[E],
repo RetentionRepository,
options ...RetentionOption,
) *RetentionJob[E] {
opts := computeRetentionOptions(options...)
return &RetentionJob[E]{
options: opts,
retentionRepo: repo,
runner: runner,
sleepFunc: func(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
},
}
}
func (j *RetentionJob[E]) logError(ctx context.Context, err error) {
if ctx.Err() != nil {
return
}
j.options.errorLogger(err)
}
func (j *RetentionJob[E]) initJob(ctx context.Context) error {
events, err := j.runner.repo.GetLastEvents(ctx, 1)
if err != nil {
return err
}
fromSequence := uint64(1)
if len(events) > 0 {
fromSequence = events[0].GetSequence() + 1
}
minSequence, err := j.retentionRepo.GetMinSequence(ctx)
if err != nil {
return err
}
j.minSequence = 1
if minSequence.Valid {
j.minSequence = uint64(minSequence.Int64)
}
j.nextSequence = fromSequence
j.sub = j.runner.NewSubscriber(fromSequence, j.options.fetchLimit)
return nil
}
// RunJob will stop when the context object is cancelled / deadline exceeded
func (j *RetentionJob[E]) RunJob(ctx context.Context) {
for {
j.runInLoop(ctx)
j.sleepFunc(ctx, j.options.errorRetryDuration)
if ctx.Err() != nil {
return
}
}
}
func (j *RetentionJob[E]) runInLoop(ctx context.Context) {
err := j.initJob(ctx)
if err != nil {
j.logError(ctx, err)
return
}
for {
for {
if j.nextSequence < j.options.maxTotalEvents+j.options.deleteBatchSize+j.minSequence {
break
}
beforeSeq := j.minSequence + j.options.deleteBatchSize
err := j.retentionRepo.DeleteEventsBefore(ctx, beforeSeq)
if err != nil {
j.logError(ctx, err)
return
}
j.minSequence = beforeSeq
}
events, err := j.sub.Fetch(ctx)
if err != nil {
j.logError(ctx, err)
return
}
j.nextSequence = events[len(events)-1].GetSequence() + 1
}
}