-
Notifications
You must be signed in to change notification settings - Fork 4
/
throttler.go
161 lines (139 loc) · 4.23 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package alertnotification
import (
"fmt"
"os"
"strconv"
"time"
"github.com/GitbookIO/diskache"
)
// Throttler struct storing disckage directory and Throttling duration
type Throttler struct {
CacheOpt string
ThrottleDuration int
GraceDuration int
}
// ErrorOccurrence store error time and error
type ErrorOccurrence struct {
StartTime time.Time
ErrorType error
}
// NewThrottler constructs new Throttle struct and init diskcache directory
func NewThrottler() Throttler {
t := Throttler{
CacheOpt: fmt.Sprintf("/tmp/cache/%v_throttler_disk_cache", os.Getenv("APP_NAME")),
ThrottleDuration: 5, // default 5mn
GraceDuration: 0, // default 0sc
}
if len(os.Getenv("THROTTLE_DURATION")) != 0 {
duration, err := strconv.Atoi(os.Getenv("THROTTLE_DURATION"))
if err != nil {
return t
}
t.ThrottleDuration = duration
}
if len(os.Getenv("THROTTLE_GRACE_SECONDS")) != 0 {
grace, err := strconv.Atoi(os.Getenv("THROTTLE_GRACE_SECONDS"))
if err != nil {
return t
}
t.GraceDuration = grace
}
if len(os.Getenv("THROTTLE_DISKCACHE_DIR")) != 0 {
t.CacheOpt = os.Getenv("THROTTLE_DISKCACHE_DIR")
}
return t
}
// IsThrottled checks if the error has been throttled. If not, throttle it
func (t *Throttler) IsThrottledOrGraced(ocError error) bool {
dc, err := t.getDiskCache()
if err != nil {
return false
}
cachedThrottleTime, throttled := dc.Get(ocError.Error())
cachedDetectionTime, graced := dc.Get(fmt.Sprintf("%v_detectionTime", ocError.Error()))
throttleIsOver := isOverThrottleDuration(string(cachedThrottleTime), t.ThrottleDuration)
if throttled && !throttleIsOver {
// already throttled and not over throttling duration, do nothing
return true
}
if !graced || isOverGracePlusThrottleDuration(string(cachedDetectionTime), t.GraceDuration, t.ThrottleDuration) {
cachedDetectionTime = t.InitGrace(ocError)
}
if cachedDetectionTime != nil && !isOverGraceDuration(string(cachedDetectionTime), t.GraceDuration) {
// grace duration is not over yet, do nothing
return true
}
// if it has not throttled yet or over throttle duration, throttle it and return false to send notification
// Rethrottler will also renew the timestamp in the throttler cache.
if err = t.ThrottleError(ocError); err != nil {
return false
}
return false
}
func isOverGracePlusThrottleDuration(cachedTime string, graceDurationInSec int, throttleDurationInMin int) bool {
detectionTime, err := time.Parse(time.RFC3339, string(cachedTime))
if err != nil {
return false
}
now := time.Now()
diff := int(now.Sub(detectionTime).Seconds())
overallDurationInSec := graceDurationInSec + throttleDurationInMin*60
return diff >= overallDurationInSec
}
func isOverGraceDuration(cachedTime string, graceDuration int) bool {
detectionTime, err := time.Parse(time.RFC3339, string(cachedTime))
if err != nil {
return false
}
now := time.Now()
diff := int(now.Sub(detectionTime).Seconds())
return diff >= graceDuration
}
func isOverThrottleDuration(cachedTime string, throttleDuration int) bool {
throttledTime, err := time.Parse(time.RFC3339, string(cachedTime))
if err != nil {
return false
}
now := time.Now()
diff := int(now.Sub(throttledTime).Minutes())
return diff >= throttleDuration
}
// ThrottleError throttle the alert within the limited duration
func (t *Throttler) ThrottleError(errObj error) error {
dc, err := t.getDiskCache()
if err != nil {
return err
}
now := time.Now().Format(time.RFC3339)
err = dc.Set(errObj.Error(), []byte(now))
return err
}
// ThrottleError throttle the alert within the limited duration
func (t *Throttler) InitGrace(errObj error) []byte {
dc, err := t.getDiskCache()
if err != nil {
return nil
}
now := time.Now().Format(time.RFC3339)
cachedDetectionTime := []byte(now)
err = dc.Set(fmt.Sprintf("%v_detectionTime", errObj.Error()), cachedDetectionTime)
if err != nil {
return nil
}
return cachedDetectionTime
}
// CleanThrottlingCache clean all the diskcache in throttling cache directory
func (t *Throttler) CleanThrottlingCache() (err error) {
dc, err := t.getDiskCache()
if err != nil {
return err
}
err = dc.Clean()
return err
}
func (t *Throttler) getDiskCache() (*diskache.Diskache, error) {
opts := diskache.Opts{
Directory: t.CacheOpt,
}
return diskache.New(&opts)
}