-
Notifications
You must be signed in to change notification settings - Fork 28
/
monitors.go
296 lines (251 loc) · 9.54 KB
/
monitors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
import (
"time"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/pkg/errors"
)
// Monitor is a wrapper for the Realis client which allows us to have functions
// with the same name for Monitoring purposes.
// TODO(rdelvalle): Deprecate monitors and instead add prefix Monitor to
// all functions in this file like it is done in V2.
type Monitor struct {
Client Realis
}
// JobUpdate polls the scheduler every certain amount of time to see if the update has entered a terminal state.
func (m *Monitor) JobUpdate(
updateKey aurora.JobUpdateKey,
interval int,
timeout int) (bool, error) {
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Limit: 1,
UpdateStatuses: TerminalUpdateStates(),
}
updateSummaries, err := m.JobUpdateQuery(
updateQ,
time.Duration(interval)*time.Second,
time.Duration(timeout)*time.Second)
status := updateSummaries[0].State.Status
if err != nil {
return false, err
}
m.Client.RealisConfig().logger.Printf("job update status: %v\n", status)
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
switch status {
case aurora.JobUpdateStatus_ROLLED_FORWARD:
return true, nil
case aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED:
return false, errors.Errorf("bad terminal state for update: %v", status)
default:
return false, errors.Errorf("unexpected update state: %v", status)
}
}
// JobUpdateStatus polls the scheduler every certain amount of time to see if the update has entered a specified state.
func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
desiredStatuses []aurora.JobUpdateStatus,
interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Limit: 1,
UpdateStatuses: desiredStatuses,
}
summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
if err != nil {
return 0, err
}
return summary[0].State.Status, nil
}
// JobUpdateQuery polls the scheduler every certain amount of time to see if the query call returns any results.
func (m *Monitor) JobUpdateQuery(
updateQuery aurora.JobUpdateQuery,
interval time.Duration,
timeout time.Duration) ([]*aurora.JobUpdateSummary, error) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
var cliErr error
var respDetail *aurora.Response
for {
select {
case <-ticker.C:
respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery)
if cliErr != nil {
return nil, cliErr
}
updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries
if len(updateSummaries) >= 1 {
return updateSummaries, nil
}
case <-timer.C:
return nil, newTimedoutError(errors.New("job update monitor timed out"))
}
}
}
// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration.
func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
key.Job = &aurora.JobKey{
Role: key.Job.Role,
Environment: key.Job.Environment,
Name: key.Job.Name,
}
query := aurora.JobUpdateQuery{
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
Key: &key,
}
response, err := m.Client.JobUpdateDetails(query)
if err != nil {
return -1, errors.Wrap(err, "unable to get information about update")
}
// TODO (rdelvalle): check for possible nil values when going down the list of structs
updateDetails := response.Result_.GetJobUpdateDetailsResult_.DetailsList
if len(updateDetails) == 0 {
return -1, errors.Errorf("details for update could not be found")
}
updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
var batchSizes []int32
switch {
case updateStrategy.IsSetVarBatchStrategy():
batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
case updateStrategy.IsSetBatchStrategy():
batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
default:
return -1, errors.Errorf("update is not using a batch update strategy")
}
query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
summary, err := m.JobUpdateQuery(query, interval, timeout)
if err != nil {
return -1, err
}
if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED ||
summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}
updatingInstances := make(map[int32]struct{})
for _, e := range updateDetails[0].InstanceEvents {
// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
updatingInstances[e.GetInstanceId()] = struct{}{}
}
}
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
}
// Instances will monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
}
// ScheduleStatus will monitor a Job until all instances enter a desired status.
// Defaults sets of desired statuses provided by the thrift API include:
// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
func (m *Monitor) ScheduleStatus(
key *aurora.JobKey,
instanceCount int32,
desiredStatuses map[aurora.ScheduleStatus]bool,
interval int,
timeout int) (bool, error) {
ticker := time.NewTicker(time.Second * time.Duration(interval))
defer ticker.Stop()
timer := time.NewTimer(time.Second * time.Duration(timeout))
defer timer.Stop()
wantedStatuses := make([]aurora.ScheduleStatus, 0)
for status := range desiredStatuses {
wantedStatuses = append(wantedStatuses, status)
}
for {
select {
case <-ticker.C:
// Query Aurora for the state of the job key ever interval
instCount, cliErr := m.Client.GetInstanceIds(key, wantedStatuses)
if cliErr != nil {
return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
}
if len(instCount) == int(instanceCount) {
return true, nil
}
case <-timer.C:
// If the timer runs out, return a timeout error to user
return false, newTimedoutError(errors.New("schedule status monitor timed out"))
}
}
}
// HostMaintenance will monitor host status until all hosts match the status provided.
// Returns a map where the value is true if the host
// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
func (m *Monitor) HostMaintenance(
hosts []string,
modes []aurora.MaintenanceMode,
interval, timeout int) (map[string]bool, error) {
// Transform modes to monitor for into a set for easy lookup
desiredMode := make(map[aurora.MaintenanceMode]struct{})
for _, mode := range modes {
desiredMode[mode] = struct{}{}
}
// Turn slice into a host set to eliminate duplicates.
// We also can't use a simple count because multiple modes means
// we can have multiple matches for a single host.
// I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored
remainingHosts := make(map[string]struct{})
for _, host := range hosts {
remainingHosts[host] = struct{}{}
}
hostResult := make(map[string]bool)
ticker := time.NewTicker(time.Second * time.Duration(interval))
defer ticker.Stop()
timer := time.NewTimer(time.Second * time.Duration(timeout))
defer timer.Stop()
for {
select {
case <-ticker.C:
// Client call has multiple retries internally
_, result, err := m.Client.MaintenanceStatus(hosts...)
if err != nil {
// Error is either a payload error or a severe connection error
for host := range remainingHosts {
hostResult[host] = false
}
return hostResult, errors.Wrap(err, "client error in monitor")
}
for _, status := range result.GetStatuses() {
if _, ok := desiredMode[status.GetMode()]; ok {
hostResult[status.GetHost()] = true
delete(remainingHosts, status.GetHost())
if len(remainingHosts) == 0 {
return hostResult, nil
}
}
}
case <-timer.C:
for host := range remainingHosts {
hostResult[host] = false
}
return hostResult, newTimedoutError(errors.New("host maintenance monitor timed out"))
}
}
}