forked from tovenja/cron
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cron.go
463 lines (411 loc) · 12.6 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
package cron
import (
"container/heap"
"context"
"sync"
"time"
)
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
// entries holds the scheduled entries; the run loop treats index 0 as the
// next entry due.
entries EntryHeap
// chain wraps every job submitted through Schedule.
chain Chain
// stop, timeChange, add, remove and snapshot carry requests from the
// public methods to the run loop while the scheduler is running.
stop chan struct{}
timeChange chan struct{}
add chan *Entry
remove chan EntryID
snapshot chan chan []Entry
// running records whether the run loop is active; it is read and written
// with runningMu held.
running bool
logger Logger
// clock supplies the current time (see now) and the location stored by New.
clock Clock
runningMu sync.Mutex
location *time.Location
// parser converts spec strings into Schedules for AddJob.
parser ScheduleParser
// nextID is the ID assigned to the most recently scheduled entry.
nextID EntryID
// jobWaiter counts jobs started by startJob so that Stop can wait for them.
jobWaiter sync.WaitGroup
// SlowJobTh is the slow-job threshold. 0 disables slow-job logging; a value
// greater than 0 enables it, and a job is logged when its run time exceeds
// the threshold.
SlowJobTh time.Duration
}
// ScheduleParser is an interface for schedule spec parsers that return a Schedule.
type ScheduleParser interface {
// Parse converts spec into a Schedule, or reports why it could not.
Parse(spec string) (Schedule, error)
}
// Job is an interface for submitted cron jobs.
type Job interface {
// Run executes the job. The scheduler calls it from a goroutine of its own
// each time the entry's schedule is activated.
Run()
}
// Schedule describes a job's duty cycle.
type Schedule interface {
// Next returns the next activation time, later than the given time.
// Next is invoked initially, and then each time the job is run.
Next(time.Time) time.Time
// HasNext reports whether the schedule has a further activation. The
// scheduler checks it before every call to Next and does not queue the
// entry when it returns false.
HasNext() bool
}
// EntryID identifies an entry within a Cron instance. IDs are assigned by
// Schedule starting at 1, so 0 never identifies a scheduled entry.
type EntryID int
// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
// ID is the cron-assigned ID of this entry, which may be used to look up a
// snapshot or remove it.
ID EntryID
// Schedule on which this job should be run.
Schedule Schedule
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable.
Next time.Time
// Prev is the last time this job was run, or the zero time if never.
Prev time.Time
// WrappedJob is the thing to run when the Schedule is activated.
WrappedJob Job
// Job is the thing that was submitted to cron.
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
Job Job
// Desc is a description of Job; it is included in the scheduler's log
// output for this entry.
Desc string
}
// Valid reports whether e is a real entry, i.e. one whose ID is non-zero.
func (e Entry) Valid() bool {
	if e.ID == 0 {
		return false
	}
	return true
}
// byTime orders a slice of entries by their Next time, placing entries whose
// Next is the zero time at the end.
type byTime []*Entry

// Len returns the number of entries.
func (s byTime) Len() int {
	return len(s)
}

// Swap exchanges the entries at positions i and j.
func (s byTime) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether entry i sorts before entry j. A zero Next counts as
// later than any other time, and two zero times compare as not-less.
func (s byTime) Less(i, j int) bool {
	left, right := s[i].Next, s[j].Next
	switch {
	case left.IsZero():
		return false
	case right.IsZero():
		return true
	default:
		return left.Before(right)
	}
}
// New returns a new Cron job runner, modified by the given options.
//
// Defaults applied before the options run:
//
//	Chain
//	    Description: Wraps submitted jobs to customize behavior.
//	    Default:     NewChain() called with no arguments.
//
//	Parser
//	    Description: Converts cron spec strings into Schedules.
//	    Default:     standardParser.
//
//	Logger
//	    Default:     DefaultLogger.
//
//	Clock
//	    Default:     DefaultClock.
//
// After all options have been applied, the time zone location is taken from
// the configured clock's Location method.
//
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
	// Fields not listed here (entries, running, runningMu, nextID, ...) start
	// at their zero values.
	c := &Cron{
		chain:      NewChain(),
		add:        make(chan *Entry),
		stop:       make(chan struct{}),
		timeChange: make(chan struct{}),
		snapshot:   make(chan chan []Entry),
		remove:     make(chan EntryID),
		logger:     DefaultLogger,
		clock:      DefaultClock,
		parser:     standardParser,
	}
	for i := range opts {
		opts[i](c)
	}
	// Resolved last so that an option replacing the clock also decides the
	// location.
	c.location = c.clock.Location()
	return c
}
// FuncJob adapts a plain func() so that it can be used as a Job.
type FuncJob func()

// Run invokes the wrapped function.
func (fn FuncJob) Run() {
	fn()
}
// AddFuncAtFixedTime wraps cmd in a FuncJob and registers it through
// AddJobAtFixedTime so that it is executed at execTime. It returns the ID of
// the new entry, or the error reported by AddJobAtFixedTime.
func (c *Cron) AddFuncAtFixedTime(desc string, execTime time.Time, cmd func()) (EntryID, error) {
	job := FuncJob(cmd)
	return c.AddJobAtFixedTime(desc, execTime, job)
}
// AddJobAtFixedTime adds a Job to the Cron to be executed at the fixed time
// execTime. The schedule is built with NewFixedTimeSchedule; if that fails,
// the error is returned together with a zero EntryID and nothing is scheduled.
func (c *Cron) AddJobAtFixedTime(desc string, execTime time.Time, cmd Job) (EntryID, error) {
	fixed, err := NewFixedTimeSchedule(execTime)
	if err != nil {
		return 0, err
	}
	id := c.Schedule(desc, fixed, cmd)
	return id, nil
}
// AddFuncWithDelay adds a func to the Cron to be executed once after the given
// delay. It is AddJobWithLoopDelay with cmd wrapped in a FuncJob and a loop
// count of 1.
func (c *Cron) AddFuncWithDelay(desc string, delay time.Duration, cmd func()) (EntryID, error) {
	const once = 1
	job := FuncJob(cmd)
	return c.AddJobWithLoopDelay(desc, delay, once, job)
}
// AddFuncWithLoopDelay adds a func to the Cron to be executed after the given
// delay each time, repeated loop times. It wraps cmd in a FuncJob and
// delegates to AddJobWithLoopDelay.
//
// loop is the number of times the func is to be executed.
func (c *Cron) AddFuncWithLoopDelay(desc string, delay time.Duration, loop int64, cmd func()) (EntryID, error) {
	job := FuncJob(cmd)
	return c.AddJobWithLoopDelay(desc, delay, loop, job)
}
// AddJobWithDelay adds a Job to the Cron to be executed once after the given
// delay. It is AddJobWithLoopDelay with a loop count of 1.
func (c *Cron) AddJobWithDelay(desc string, delay time.Duration, cmd Job) (EntryID, error) {
	const once = 1
	return c.AddJobWithLoopDelay(desc, delay, once, cmd)
}
// AddJobWithLoopDelay adds a Job to the Cron to be executed after the given
// delay each time, repeated loop times. The schedule is built with
// NewLimitedDelaySchedule; if that fails, the error is returned together with
// a zero EntryID and nothing is scheduled.
//
// loop is the number of times the job is to be executed.
func (c *Cron) AddJobWithLoopDelay(desc string, delay time.Duration, loop int64, cmd Job) (EntryID, error) {
	limited, err := NewLimitedDelaySchedule(delay, loop)
	if err != nil {
		return 0, err
	}
	id := c.Schedule(desc, limited, cmd)
	return id, nil
}
// AddFunc adds a func to the Cron to be run on the schedule described by spec.
// It wraps cmd in a FuncJob and delegates to AddJob, which parses spec with
// this Cron's parser. The returned ID can be used to remove the entry later.
func (c *Cron) AddFunc(desc string, spec string, cmd func()) (EntryID, error) {
	job := FuncJob(cmd)
	return c.AddJob(desc, spec, job)
}
// AddJob adds a Job to the Cron to be run on the schedule described by spec.
// The spec is parsed with this Cron's parser; a parse error is returned
// together with a zero EntryID and nothing is scheduled. On success the
// returned ID can be used to remove the entry later.
func (c *Cron) AddJob(desc string, spec string, cmd Job) (EntryID, error) {
	parsed, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	id := c.Schedule(desc, parsed, cmd)
	return id, nil
}
// Schedule adds a Job to the Cron to be run on the given schedule and returns
// the ID assigned to the new entry. The job is wrapped with the configured
// Chain.
//
// When the scheduler is not running the entry is pushed straight onto the
// entry heap; otherwise it is handed to the run loop over the add channel.
func (c *Cron) Schedule(desc string, schedule Schedule, cmd Job) EntryID {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	c.nextID++
	id := c.nextID
	entry := &Entry{
		ID:         id,
		Desc:       desc,
		Schedule:   schedule,
		Job:        cmd,
		WrappedJob: c.chain.Then(cmd),
	}

	if c.running {
		c.add <- entry
		return id
	}
	heap.Push(&c.entries, entry)
	return id
}
// Entries returns a snapshot of the cron entries. While the scheduler is
// running the snapshot is requested from the run loop over the snapshot
// channel; otherwise it is taken directly.
func (c *Cron) Entries() []Entry {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	if !c.running {
		return c.entrySnapshot()
	}

	// Buffered so the run loop can reply without waiting for the receive.
	reply := make(chan []Entry, 1)
	c.snapshot <- reply
	return <-reply
}
// Location returns the time zone location stored on this Cron by New.
func (c *Cron) Location() *time.Location {
	loc := c.location
	return loc
}
// Entry returns a snapshot of the entry with the given id. If no entry has
// that id, the zero Entry is returned (its ID is 0).
func (c *Cron) Entry(id EntryID) Entry {
	snapshot := c.Entries()
	for i := range snapshot {
		if snapshot[i].ID == id {
			return snapshot[i]
		}
	}
	return Entry{}
}
// Remove an entry from being run in the future. While the scheduler is
// running the request is sent to the run loop over the remove channel;
// otherwise the entry is removed directly.
func (c *Cron) Remove(id EntryID) {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	if !c.running {
		c.removeEntry(id)
		return
	}
	c.remove <- id
}
// Start the cron scheduler in its own goroutine, or no-op if already started.
func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	if !c.running {
		c.running = true
		go c.run()
	}
}
// Run the cron scheduler in the calling goroutine, or no-op if already
// running. The lock is released before the run loop starts, so Run blocks
// inside the loop without holding runningMu.
func (c *Cron) Run() {
	c.runningMu.Lock()
	alreadyRunning := c.running
	c.running = true
	c.runningMu.Unlock()

	if alreadyRunning {
		return
	}
	c.run()
}
// run is the scheduler loop. It is private because callers must go through
// Start or Run, which synchronize access to the 'running' state variable.
//
// It first computes the next activation time of every entry already
// registered, discarding entries whose Schedule reports no next run. It then
// loops: it arms a timer for the entry at the front of the heap and waits for
// either the timer or a request on the add, snapshot, stop, timeChange or
// remove channels. It returns only when a value is received on c.stop.
func (c *Cron) run() {
c.logger.Info("start")
// Figure out the next activation times for each entry.
now := c.now()
sortedEntries := new(EntryHeap)
for len(c.entries) > 0 {
entry := heap.Pop(&c.entries).(*Entry)
// Entries whose schedule has no next run are not pushed back, so they
// are dropped here.
if entry.Schedule.HasNext() {
entry.Next = entry.Schedule.Next(now)
heap.Push(sortedEntries, entry)
c.logger.Info("schedule", "now", now, "entry", entry.ID, ", desc", entry.Desc, "next", entry.Next)
}
}
c.entries = *sortedEntries
var timer *time.Timer
for {
// Determine the next entry to run.
// The entries are maintained with container/heap, so no explicit sort is
// needed; index 0 is used as the next entry due (presumably EntryHeap
// orders by Next - it is defined elsewhere).
//sort.Sort(byTime(c.entries))
var delay time.Duration
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
delay = 1000000 * time.Hour
} else {
delay = c.entries[0].Next.Sub(now)
}
// The timer is created on the first pass and reused afterwards.
if timer == nil {
timer = time.NewTimer(delay)
} else {
c.resetTimer(timer, delay)
}
for {
select {
case <-timer.C:
now = c.now()
c.logger.Info("wake", "now", now)
// Run every entry whose next time is not after now.
for {
e := c.entries.Peek()
if e == nil || e.Next.After(now) || e.Next.IsZero() {
break
}
e = heap.Pop(&c.entries).(*Entry)
c.startJob(e)
e.Prev = e.Next
// Re-queue the entry only if its schedule has another run.
if e.Schedule.HasNext() {
e.Next = e.Schedule.Next(now)
heap.Push(&c.entries, e)
c.logger.Info("run", "now", now, "entry", e.ID, ", desc", e.Desc, "next", e.Next)
}
}
case newEntry := <-c.add:
timer.Stop()
now = c.now()
// A new entry whose schedule has no next run is not queued.
if newEntry.Schedule.HasNext() {
newEntry.Next = newEntry.Schedule.Next(now)
heap.Push(&c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, ", desc", newEntry.Desc, "next", newEntry.Next)
}
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
// A snapshot does not change the entries, so keep waiting on the
// timer that is already armed.
continue
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
case <-c.timeChange:
// Only now is refreshed here; the delay is recomputed from it on the
// next pass of the outer loop.
timer.Stop()
now = c.now()
c.logger.Info("timeChange")
case id := <-c.remove:
timer.Stop()
now = c.now()
removedEntry := c.removeEntry(id)
if removedEntry != nil {
c.logger.Info("removed", "entry", id, ", desc", removedEntry.Desc)
}
}
// Leave the inner loop so the delay is recomputed and the timer re-armed.
break
}
}
}
// resetTimer re-arms timer to fire after delay.
//
// Stop reports false when the timer has already expired or been stopped, and
// it never closes the channel. In that case a value may still be sitting in
// timer.C, so it is drained without blocking before Reset is called, because
// Reset should only be invoked on a stopped or expired timer whose channel
// is empty.
func (c *Cron) resetTimer(timer *time.Timer, delay time.Duration) {
	if stopped := timer.Stop(); !stopped {
		select {
		case <-timer.C:
			// Discarded a stale expiry.
		default:
			// Nothing pending.
		}
	}
	timer.Reset(delay)
}
// startJob runs the given entry's wrapped job in a new goroutine. The job is
// registered with jobWaiter before the goroutine starts and released when it
// finishes. If SlowJobTh is greater than zero and the job took longer than
// that, a "slow job" warning is logged.
func (c *Cron) startJob(e *Entry) {
	c.jobWaiter.Add(1)
	// TODO: should jobs be executed on a goroutine pool instead?
	go func() {
		begin := c.now()
		defer func() {
			c.jobWaiter.Done()
			elapsed := c.now().Sub(begin)
			if c.SlowJobTh <= 0 || elapsed <= c.SlowJobTh {
				return
			}
			c.logger.Warn("slow job", "entry:", e.ID, ",desc:", e.Desc, ",slowJobThreshold:", c.SlowJobTh, ",costTime:", elapsed)
		}()
		e.WrappedJob.Run()
	}()
}
// now returns the current time as reported by the configured clock.
func (c *Cron) now() time.Time {
	current := c.clock.Now()
	return current
}
// Stop stops the cron scheduler if it is running; otherwise it does nothing.
// A context is returned so the caller can wait for running jobs to complete:
// it is cancelled once every job started by the scheduler has finished.
func (c *Cron) Stop() context.Context {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	if c.running {
		// Tell the run loop to exit so no further jobs are started.
		c.stop <- struct{}{}
		c.running = false
	}

	ctx, cancel := context.WithCancel(context.Background())
	waitForJobs := func() {
		// Wait for every in-flight job, then signal completion via ctx.Done().
		c.jobWaiter.Wait()
		cancel()
	}
	go waitForJobs()

	// Callers can watch ctx.Done() to learn when the Cron has really stopped.
	return ctx
}
// TimeChange notifies a running scheduler that the time has changed by
// sending on the timeChange channel. It does nothing when the scheduler is
// not running.
func (c *Cron) TimeChange() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()

	if !c.running {
		return
	}
	c.timeChange <- struct{}{}
}
// entrySnapshot returns a copy of the current cron entry list. Each Entry is
// copied by value, in the order the entries are currently stored.
func (c *Cron) entrySnapshot() []Entry {
	copies := make([]Entry, 0, len(c.entries))
	for _, e := range c.entries {
		copies = append(copies, *e)
	}
	return copies
}
// removeEntry removes the first entry whose ID equals id from the entry heap
// and returns it. It returns nil when no entry has that ID.
func (c *Cron) removeEntry(id EntryID) *Entry {
	for i := 0; i < len(c.entries); i++ {
		if candidate := c.entries[i]; candidate.ID == id {
			heap.Remove(&c.entries, i)
			return candidate
		}
	}
	return nil
}