Skip to content

Commit

Permalink
enhance: add graceful stop timeout to avoid node stop hang under extreme cases (#30320)
Browse files Browse the repository at this point in the history

1. Set the graceful stop timeout of the coordinators and the proxy to 5s.
2. Set the graceful stop timeout of the other worker nodes to 900s; we should
potentially change this to 600s once graceful stop is smooth.
3. Change the order in which datacoord components are stopped.
4. `LivenessCheck` does not perform a graceful shutdown anymore.

issue: #30310
pr: #30317
also see: #30306

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jan 27, 2024
1 parent 650dcc5 commit 77e1237
Show file tree
Hide file tree
Showing 26 changed files with 282 additions and 149 deletions.
8 changes: 4 additions & 4 deletions cmd/components/data_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -57,10 +59,8 @@ func (s *DataCoord) Run() error {

// Stop terminates service
func (s *DataCoord) Stop() error {
if err := s.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().DataCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(s.svr.Stop, timeout)
}

// GetComponentStates returns DataCoord's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -60,10 +62,8 @@ func (d *DataNode) Run() error {

// Stop terminates service
func (d *DataNode) Stop() error {
if err := d.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().DataNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(d.svr.Stop, timeout)
}

// GetComponentStates returns DataNode's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/index_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -58,10 +60,8 @@ func (n *IndexNode) Run() error {

// Stop terminates service
func (n *IndexNode) Stop() error {
if err := n.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().IndexNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(n.svr.Stop, timeout)
}

// GetComponentStates returns IndexNode's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -59,10 +61,8 @@ func (n *Proxy) Run() error {

// Stop terminates service
func (n *Proxy) Stop() error {
if err := n.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().ProxyCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(n.svr.Stop, timeout)
}

// GetComponentStates returns Proxy's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/query_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -60,10 +62,8 @@ func (qs *QueryCoord) Run() error {

// Stop terminates service
func (qs *QueryCoord) Stop() error {
if err := qs.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().QueryCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(qs.svr.Stop, timeout)
}

// GetComponentStates returns QueryCoord's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/query_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -60,10 +62,8 @@ func (q *QueryNode) Run() error {

// Stop terminates service
func (q *QueryNode) Stop() error {
if err := q.svr.Stop(); err != nil {
return err
}
return nil
timeout := paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(q.svr.Stop, timeout)
}

// GetComponentStates returns QueryNode's states
Expand Down
8 changes: 4 additions & 4 deletions cmd/components/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package components

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -26,6 +27,7 @@ import (
rc "github.com/milvus-io/milvus/internal/distributed/rootcoord"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -59,10 +61,8 @@ func (rc *RootCoord) Run() error {

// Stop terminates service
func (rc *RootCoord) Stop() error {
if rc.svr != nil {
return rc.svr.Stop()
}
return nil
timeout := paramtable.Get().RootCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second)
return exitWhenStopTimeout(rc.svr.Stop, timeout)
}

// GetComponentStates returns RootCoord's states
Expand Down
38 changes: 38 additions & 0 deletions cmd/components/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package components

import (
"context"
"os"
"time"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus/pkg/util/conc"
)

var errStopTimeout = errors.New("stop timeout")

// exitWhenStopTimeout stops a component through stopWithTimeout and exits the
// process with status code 1 when the stop does not finish within timeout.
// Every other outcome of stopWithTimeout is returned to the caller unchanged.
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
	stopErr := stopWithTimeout(stop, timeout)
	if timedOut := errors.Is(stopErr, errStopTimeout); timedOut {
		// stop did not return in time; do not wait for it any longer.
		os.Exit(1)
	}
	return stopErr
}

// stopWithTimeout runs stop asynchronously via conc.Go and waits for it for at
// most timeout.
//
// When stop finishes in time, its error is returned wrapped with the message
// "failed to stop component". When the timeout elapses first, errStopTimeout
// is returned; stop itself is not interrupted by this function.
func stopWithTimeout(stop func() error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	future := conc.Go(func() (struct{}, error) {
		return struct{}{}, stop()
	})
	select {
	case <-future.Inner():
	case <-ctx.Done():
		// select chooses randomly when both channels are ready, so a stop that
		// completed right at the deadline could be reported as a timeout.
		// Re-check the future without blocking and prefer the real result.
		select {
		case <-future.Inner():
		default:
			return errStopTimeout
		}
	}
	// NOTE(review): relies on errors.Wrap returning nil for a nil error, as the
	// original code did — confirm against github.com/cockroachdb/errors.
	return errors.Wrap(future.Err(), "failed to stop component")
}
38 changes: 38 additions & 0 deletions cmd/components/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package components

import (
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
)

// TestExitWithTimeout checks that exitWhenStopTimeout hands back the error of
// a stop function that finishes before the timeout. The timeout path calls
// os.Exit and therefore cannot be exercised from inside a test.
func TestExitWithTimeout(t *testing.T) {
	wantErr := errors.New("stop error")
	slowStop := func() error {
		time.Sleep(1 * time.Second)
		return wantErr
	}

	gotErr := exitWhenStopTimeout(slowStop, 5*time.Second)
	assert.ErrorIs(t, gotErr, wantErr)
}

// TestStopWithTimeout covers both outcomes of stopWithTimeout: a stop function
// that never returns within the timeout yields errStopTimeout, and a stop
// function that returns promptly has its error propagated.
func TestStopWithTimeout(t *testing.T) {
	ch := make(chan struct{})
	// Unblock the hanging stop below when the test ends; otherwise the
	// goroutine running it stays parked on ch for the rest of the test binary.
	defer close(ch)

	stop := func() error {
		<-ch
		return nil
	}

	err := stopWithTimeout(stop, 1*time.Second)
	assert.ErrorIs(t, err, errStopTimeout)

	targetErr := errors.New("stop error")
	stop = func() error {
		return targetErr
	}

	err = stopWithTimeout(stop, 1*time.Second)
	assert.ErrorIs(t, err, targetErr)
}
24 changes: 12 additions & 12 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,7 @@ func (s *Server) Register() error {

s.session.LivenessCheck(s.serverLoopCtx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID()))
if err := s.Stop(); err != nil {
logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec()
// manually send signal to starter goroutine
if s.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
}
os.Exit(1)
})
return nil
}
Expand Down Expand Up @@ -1050,15 +1041,23 @@ func (s *Server) Stop() error {
return nil
}
logutil.Logger(s.ctx).Info("server shutdown")
s.cluster.Close()
s.garbageCollector.close()
s.stopServerLoop()
logutil.Logger(s.ctx).Info("datacoord garbage collector stopped")

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
logutil.Logger(s.ctx).Info("datacoord compaction stopped")

s.indexBuilder.Stop()
logutil.Logger(s.ctx).Info("datacoord index builder stopped")

s.cluster.Close()
logutil.Logger(s.ctx).Info("index builder stopped")

s.stopServerLoop()
logutil.Logger(s.ctx).Info("serverloop stopped")

if s.session != nil {
s.session.Stop()
Expand All @@ -1067,6 +1066,7 @@ func (s *Server) Stop() error {
if s.icSession != nil {
s.icSession.Stop()
}
logutil.Logger(s.ctx).Warn("datacoord stop successful")

return nil
}
Expand Down
12 changes: 1 addition & 11 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"os"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -188,16 +187,7 @@ func (node *DataNode) Register() error {
// Start liveness check
node.session.LivenessCheck(node.ctx, func() {
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID))
if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Dec()
// manually send signal to starter goroutine
if node.session.TriggerKill {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
}
os.Exit(1)
})

return nil
Expand Down
10 changes: 7 additions & 3 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,14 @@ func (s *Server) start() error {

// Stop stops the DataCoord server gracefully.
// Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object.
func (s *Server) Stop() error {
func (s *Server) Stop() (err error) {
Params := &paramtable.Get().DataCoordGrpcServerCfg
log.Debug("Datacoord stop", zap.String("Address", Params.GetAddress()))
var err error
logger := log.With(zap.String("address", Params.GetAddress()))
logger.Info("Datacoord stopping")
defer func() {
logger.Info("Datacoord stopped", zap.Error(err))
}()

s.cancel()

if s.etcdCli != nil {
Expand Down
Loading

0 comments on commit 77e1237

Please sign in to comment.