Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661) #9708

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,6 +82,7 @@ type captureImpl struct {
liveness model.Liveness
config *config.ServerConfig

pdClient pd.Client
pdEndpoints []string
ownerMu sync.Mutex
owner owner.Owner
Expand Down Expand Up @@ -133,6 +135,7 @@ func NewCapture(pdEndpoints []string,
tableActorSystem *system.System,
sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
pdClient pd.Client,
) Capture {
conf := config.GetGlobalServerConfig()
return &captureImpl{
Expand All @@ -146,12 +149,11 @@ func NewCapture(pdEndpoints []string,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},

useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,
migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
pdClient: pdClient,
}
}

Expand Down Expand Up @@ -217,7 +219,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
c.upstreamManager.Close()
}
c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down
110 changes: 54 additions & 56 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ import (
"github.com/pingcap/tiflow/pkg/util"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -82,13 +78,14 @@ type Server interface {
// TODO: we need to make server more unit testable and add more test cases.
// Especially we need to decouple the HTTPServer out of server.
type server struct {
capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
pdEndpoints []string

capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
pdEndpoints []string
pdClient pd.Client
pdAPIClient *pdutil.PDAPIClient
tableActorSystem *system.System

// If it's true sortEngineManager will be used, otherwise sorterSystem will be used.
Expand Down Expand Up @@ -140,35 +137,21 @@ func New(pdEndpoints []string) (*server, error) {
func (s *server) prepare(ctx context.Context) error {
conf := config.GetGlobalServerConfig()

grpcTLSOption, err := conf.Security.ToGRPCDialOption()
tlsConfig, err := conf.Security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}

tlsConfig, err := conf.Security.ToTLSConfig()
grpcTLSOption, err := conf.Security.ToGRPCDialOption()
if err != nil {
return errors.Trace(err)
}

logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
AutoSyncInterval: 30 * time.Second,
DialOptions: []grpc.DialOption{
log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints))
s.pdClient, err = pd.NewClientWithContext(
ctx, s.pdEndpoints, conf.Security.PDSecurityOption(),
// the default `timeout` is 3s, maybe too small if the pd is busy,
// set to 10s to avoid frequent timeout.
pd.WithCustomTimeoutOption(10*time.Second),
pd.WithGRPCDialOptions(
grpcTLSOption,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand All @@ -180,12 +163,24 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
))
if err != nil {
return errors.Trace(err)
}
s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to create a the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -196,6 +191,15 @@ func (s *server) prepare(ctx context.Context) error {
}
s.etcdClient = cdcEtcdClient

// Collect all endpoints from pd here to make the server more robust.
// Because in some scenarios, the deployer may only provide one pd endpoint,
// this will cause the TiCDC server to fail to restart when some pd node is down.
allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
return errors.Trace(err)
}
s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand All @@ -211,7 +215,7 @@ func (s *server) prepare(ctx context.Context) error {

s.capture = capture.NewCapture(
s.pdEndpoints, cdcEtcdClient, s.grpcService,
s.tableActorSystem, s.sortEngineFactory, s.sorterSystem)
s.tableActorSystem, s.sortEngineFactory, s.sorterSystem, s.pdClient)

return nil
}
Expand Down Expand Up @@ -340,18 +344,7 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er
return nil
}

func (s *server) etcdHealthChecker(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption())
if err != nil {
return errors.Trace(err)
}
pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
defer pc.Close()

func (s *server) upstreamPDHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

Expand All @@ -360,15 +353,15 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
endpoints, err := pc.CollectMemberEndpoints(ctx)
endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
log.Warn("etcd health check: cannot collect all members", zap.Error(err))
continue
}
for _, endpoint := range endpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if err := pc.Healthy(ctx, endpoint); err != nil {
if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil {
log.Warn("etcd health check error",
zap.String("endpoint", endpoint), zap.Error(err))
}
Expand All @@ -389,6 +382,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
func (s *server) run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer s.pdAPIClient.Close()

wg, cctx := errgroup.WithContext(ctx)

Expand All @@ -397,7 +391,7 @@ func (s *server) run(ctx context.Context) (err error) {
})

wg.Go(func() error {
return s.etcdHealthChecker(cctx)
return s.upstreamPDHealthChecker(cctx)
})

wg.Go(func() error {
Expand Down Expand Up @@ -460,6 +454,10 @@ func (s *server) Close() {
}
s.tcpServer = nil
}

if s.pdClient != nil {
s.pdClient.Close()
}
}

func (s *server) stopActorSystems() {
Expand Down
Loading