Skip to content

Commit

Permalink
feat: stop active tasks when server shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
kanzihuang committed Mar 19, 2024
1 parent 3ccdab4 commit 3f14ff3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 20 deletions.
42 changes: 25 additions & 17 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type processor struct {
// quit channel is closed when the shutdown of the "processor" goroutine starts.
quit chan struct{}

// terminate channel is closed when the shutdown of the "processor" goroutine starts.
terminate chan struct{}

// abort channel communicates to the in-flight worker goroutines to stop.
abort chan struct{}

Expand Down Expand Up @@ -113,6 +116,7 @@ func newProcessor(params processorParams) *processor {
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
quit: make(chan struct{}),
terminate: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
Expand All @@ -139,6 +143,7 @@ func (p *processor) stop() {
func (p *processor) shutdown() {
p.stop()

close(p.terminate)
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })

p.logger.Info("Waiting for all workers to finish...")
Expand Down Expand Up @@ -232,25 +237,28 @@ func (p *processor) exec() {
resCh <- p.perform(ctx, task)
}()

select {
case <-p.abort:
// time is up, push the message back to queue and quit this worker goroutine.
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(lease, msg)
return
case <-lease.Done():
cancel()
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
return
case <-ctx.Done():
p.handleFailedMessage(ctx, lease, msg, ctx.Err())
return
case resErr := <-resCh:
if resErr != nil {
p.handleFailedMessage(ctx, lease, msg, resErr)
for {
select {
case <-p.terminate:
cancel()
case <-lease.Done():
cancel()
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
return
case <-p.abort:
// time is up, push the message back to queue and quit this worker goroutine.
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(lease, msg)
return
case resErr := <-resCh:
switch {
case resErr == nil:
p.handleSucceededMessage(lease, msg)
default:
p.handleFailedMessage(ctx, lease, msg, resErr)
}
return
}
p.handleSucceededMessage(lease, msg)
}
}()
}
Expand Down
51 changes: 48 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestServer(t *testing.T) {

func TestServerRun(t *testing.T) {
if runtime.GOOS == "windows" {
// Sending Interrupt on Windows is not implemented
// because Sending Interrupt on Windows is not implemented
return
}

Expand All @@ -132,7 +132,6 @@ func TestServerRun(t *testing.T) {
done := make(chan struct{})
// Make sure server exits when receiving TERM signal.
go func() {
time.Sleep(2 * time.Second)
p, err := os.FindProcess(os.Getpid())
if err != nil {
t.Error(err)
Expand All @@ -143,7 +142,6 @@ func TestServerRun(t *testing.T) {
t.Error(err)
return
}
done <- struct{}{}
}()

go func() {
Expand All @@ -158,6 +156,53 @@ func TestServerRun(t *testing.T) {
if err := srv.Run(mux); err != nil {
t.Fatal(err)
}
done <- struct{}{}
}

func TestServerShutdown(t *testing.T) {
var err error

// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)

redisConnOpt := getRedisConnOpt(t)
r, ok := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
if !ok {
t.Fatalf("asynq: unsupported RedisConnOpt type %T", r)
}
testutil.FlushDB(t, r)

srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel, ShutdownTimeout: 3 * time.Second})
done := make(chan struct{})
mux := NewServeMux()
mux.HandleFunc("send_email", func(ctx context.Context, task *Task) error {
err := timeutil.Sleep(ctx, 10*time.Second)
time.Sleep(1 * time.Second)
done <- struct{}{}
return err
})
if err := srv.Start(mux); err != nil {
t.Fatal(err)
}

c := NewClient(redisConnOpt)
defer c.Close()
_, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123})))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}

// Make sure active tasks stops when server shutdown.
go func() {
time.Sleep(2 * time.Second)
srv.Shutdown()
}()
select {
case <-time.After(4 * time.Second):
t.Error("active tasks did not stop after server shutdown")
case <-done:
}
}

func TestServerErrServerClosed(t *testing.T) {
Expand Down

0 comments on commit 3f14ff3

Please sign in to comment.