diff --git a/pkg/utils/sleeper_task.go b/pkg/utils/sleeper_task.go index 0a65ea890..02dc970b3 100644 --- a/pkg/utils/sleeper_task.go +++ b/pkg/utils/sleeper_task.go @@ -1,6 +1,7 @@ package utils import ( + "context" "fmt" "time" @@ -13,12 +14,18 @@ type Worker interface { Name() string } +// WorkerCtx is like Worker but includes [context.Context]. +type WorkerCtx interface { + Work(ctx context.Context) + Name() string +} + // SleeperTask represents a task that waits in the background to process some work. type SleeperTask struct { services.StateMachine - worker Worker + worker WorkerCtx chQueue chan struct{} - chStop chan struct{} + chStop services.StopChan chDone chan struct{} chWorkDone chan struct{} } @@ -31,16 +38,27 @@ type SleeperTask struct { // immediately after it is finished. For this reason you should take care to // make sure that Worker is idempotent. // WakeUp does not block. -func NewSleeperTask(worker Worker) *SleeperTask { +func NewSleeperTask(w Worker) *SleeperTask { + return NewSleeperTaskCtx(&worker{w}) +} + +type worker struct { + Worker +} + +func (w *worker) Work(ctx context.Context) { w.Worker.Work() } + +// NewSleeperTaskCtx is like NewSleeperTask but accepts a WorkerCtx with a [context.Context]. +func NewSleeperTaskCtx(w WorkerCtx) *SleeperTask { s := &SleeperTask{ - worker: worker, + worker: w, chQueue: make(chan struct{}, 1), chStop: make(chan struct{}), chDone: make(chan struct{}), chWorkDone: make(chan struct{}, 10), } - _ = s.StartOnce("SleeperTask-"+worker.Name(), func() error { + _ = s.StartOnce("SleeperTask-"+w.Name(), func() error { go s.workerLoop() return nil }) @@ -98,10 +116,13 @@ func (s *SleeperTask) WorkDone() <-chan struct{} { func (s *SleeperTask) workerLoop() { defer close(s.chDone) + ctx, cancel := s.chStop.NewCtx() + defer cancel() + for { select { case <-s.chQueue: - s.worker.Work() + s.worker.Work(ctx) s.workDone() case <-s.chStop: return