-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
264 lines (226 loc) · 7.23 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package smartpoll
import (
"context"
"errors"
"sync/atomic"
"time"
)
type (
// Task performs an arbitrary operation in the background, returning either
// a fatal error, or a TaskHook, or neither (`return nil, nil`).
// The context provided to Task will be cancelled after this function (not
// the TaskHook) returns, or when the Scheduler.Run context is canceled.
//
// Each configured task runs independently, and only one will run per key,
// at any given time.
Task func(ctx context.Context) (TaskHook, error)
// TaskHook performs an arbitrary operation, synchronised with the main
// loop, within Scheduler.Run. The typical usage of this is to handle
// results (e.g. store them in an in-memory cache), and/or to reschedule
// the task.
// The context provided to TaskHook will be cancelled after this function
// returns, or the Scheduler.Run context is canceled.
//
// Like all "hooks" in this package, TaskHook runs synchronously with the
// main loop. It will always be run, prior to the next Task (of the same
// key), though it may not be called, in the event a fatal error occurs.
TaskHook func(ctx context.Context, internal *Internal) error
taskState struct {
task Task
// timer has three possible states:
//
// 1. nil - task is not scheduled
// 2. readySentinel - task is scheduled and ready to be executed
// 3. time.Timer - task is scheduled and will be ready after the timer fires
timer *time.Timer
// next is an advisory timestamp for when the task will next be ready.
// It is consumed alongside timer, when the value is received.
next time.Time
// running will be 0 if the task is not running, 1 if it is
running atomic.Int32
}
)
var (
// ErrPanicInTask will be returned by Scheduler.Run if a task calls
// runtime.Goexit. Note that it will technically also be returned if a task
// panics, but that will bubble, and cause the process to exit.
ErrPanicInTask = errors.New(`smartpoll: panic in task`)
// identifies tasks which are ready to be executed
readySentinel = new(time.Timer)
// errNoHook is used to skip the second stage of a Task, see taskState.run and Scheduler.Run
errNoHook = errors.New(`smartpoll: no hook`)
)
func (x Task) call(ctx context.Context) (TaskHook, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return x(ctx)
}
func (x TaskHook) call(ctx context.Context, internal *Internal) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return x(ctx, internal)
}
func (x *taskState) run(ctx context.Context, scheduler *Scheduler, internal *Internal) {
var success bool
var clearedRunning bool
clearRunning := func() {
if !clearedRunning {
clearedRunning = true
// mark the task as no longer running
x.running.Store(0)
// notify, for the edge case of re-running the scheduler
select {
case scheduler.taskCompleteCh <- struct{}{}:
default:
}
}
}
defer func() {
if !success {
// for the purposes of treating runtime.Goexit as a fatal error
select {
case <-ctx.Done():
case scheduler.taskLockCh <- ErrPanicInTask:
}
}
clearRunning()
}()
// run the (background) part of the task
hook, err := x.task.call(ctx)
// special case to facilitate skipping hook (and the synchronisation) if it is nil
if err == nil && hook == nil {
err = errNoHook
// this special case does not exit the main loop, therefore it must
// mark as not running prior to returning, lest it deadlock (see below)
clearRunning()
}
// block the main loop / notify it of any error
select {
case <-ctx.Done():
success = true
return
case scheduler.taskLockCh <- err:
}
// if we sent an error, we don't need to unlock (also it'd deadlock)
if err != nil {
success = true
return
}
defer func() {
// BEFORE we continue, while the main loop is blocked, we must allow
// the task to be rerun. If we tried to do this after, there's a small
// chance it'll race, and fail to "wake up" the main loop, which is
// responsible for re-running the task (in the case where another run
// has been scheduled, prior to the task finishing).
clearRunning()
// same deal as the defer above (handle runtime.Goexit as a fatal error)
if !success {
success = true // we only need to send one fatal error
err = ErrPanicInTask
}
// once we are done, unblock the main loop / notify it of error
// WARNING: This can't select on context, or it'll may cause a data
// race (e.g. in the deferred timer stops, in the main loop).
scheduler.taskUnlockCh <- err
}()
// finally, we run the hook (err is handled in the defer above)
err = hook.call(ctx, internal)
success = true
}
func (x *taskState) startIfPossible(ctx context.Context, scheduler *Scheduler, internal *Internal) {
if x.timer != readySentinel || !x.running.CompareAndSwap(0, 1) {
// not scheduled to start, or still running
return
}
// consume our ready tick
x.timer = nil
x.next = time.Time{}
// run the task in the background (it will synchronise using Scheduler.taskLockCh / taskSyncInternalCaseIndex)
go x.run(ctx, scheduler, internal)
}
// setTimer forcibly updates the state of the timer, disabling it if negative,
// setting it to readySentinel if zero, or setting it to a new timer if
// positive.
// It also updates next, the advisory timestamp for when the task will next be
// ready.
func (x *taskState) setTimer(d time.Duration) {
if x.timer != nil && x.timer != readySentinel {
// stop and drain (we consume any ready tick)
stopAndDrainTimer(x.timer)
}
// we update both next and timer, unless both the current and desired state is ready
switch {
case d > 0:
// take now before starting the timer (preserve "at least d" semantics)
x.next = time.Now().Add(d)
x.timer = time.NewTimer(d)
case d < 0:
x.next = time.Time{}
x.timer = nil
case x.timer != readySentinel:
x.next = time.Now()
x.timer = readySentinel
}
}
func (x *taskState) setTimerAt(t time.Time) {
if t == (time.Time{}) {
x.setTimer(-1)
return
}
stopTimer(&x.timer)
x.next = t
x.timer = startTimer(time.Until(t))
}
func (x *taskState) rescheduleSooner(t time.Time) {
if t == (time.Time{}) {
panic(`smartpoll: cannot schedule at sooner of zero time`)
}
if x.timer != nil && x.next != (time.Time{}) && !x.next.After(t) {
// already scheduled sooner / at the same time
return
}
x.next = t
if stopTimer(&x.timer) {
// keep the existing readiness - next was after t
return
}
x.timer = startTimer(time.Until(t))
}
func (x *taskState) stopTimer() (ready bool) {
ready = stopTimer(&x.timer)
if !ready {
x.next = time.Time{}
}
return
}
// stopTimer will ensure the timer is stopped, resolving it to either nil or
// readySentinel, returning a boolean to indicate which.
// The underlying time.Timer will be stopped (if any), and its channel drained.
//
// WARNING: Use taskState.stopTimer, in order to correctly update next.
func stopTimer(p **time.Timer) (ready bool) {
v := *p
switch v {
case nil:
return false
case readySentinel:
return true
}
if !stopAndDrainTimer(v) {
ready = true
}
if ready {
*p = readySentinel
} else {
*p = nil
}
return ready
}
// startTimer initializes the timer, and returns it, using readySentinel for
// negative or zero durations.
func startTimer(d time.Duration) *time.Timer {
if d > 0 {
return time.NewTimer(d)
}
return readySentinel
}