From 557360d29cfcdc7e39c05a044775d85c67c84337 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 31 Aug 2023 16:06:43 +0800 Subject: [PATCH 1/6] refine pdClient and etcdClient initialization --- cdc/capture/capture.go | 9 +- cdc/server/server.go | 110 ++++++++-------- pkg/etcd/client.go | 235 +++++++++++++++++++++++++++++++++++ pkg/etcd/metrics.go | 9 ++ pkg/upstream/manager.go | 6 +- pkg/upstream/manager_test.go | 5 +- pkg/upstream/upstream.go | 50 ++++---- pkg/util/comparison.go | 32 +++++ pkg/util/comparison_test.go | 55 ++++++++ 9 files changed, 426 insertions(+), 85 deletions(-) create mode 100644 pkg/util/comparison.go create mode 100644 pkg/util/comparison_test.go diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 8a3c0817272..b1b2fda90e7 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" + pd "github.com/tikv/pd/client" "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" @@ -81,6 +82,7 @@ type captureImpl struct { liveness model.Liveness config *config.ServerConfig + pdClient pd.Client pdEndpoints []string ownerMu sync.Mutex owner owner.Owner @@ -128,6 +130,7 @@ func NewCapture(pdEndpoints []string, etcdClient etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper, sortEngineMangerFactory *factory.SortEngineFactory, + pdClient pd.Client, ) Capture { conf := config.GetGlobalServerConfig() return &captureImpl{ @@ -142,8 +145,8 @@ func NewCapture(pdEndpoints []string, newController: controller.NewController, info: &model.CaptureInfo{}, sortEngineFactory: sortEngineMangerFactory, - - migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf), + migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf), + pdClient: pdClient, } } @@ -227,7 +230,7 @@ func (c *captureImpl) reset(ctx context.Context) error { c.upstreamManager.Close() } c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID()) - _, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security) + _, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient) if err != nil { return errors.Trace(err) } diff --git a/cdc/server/server.go b/cdc/server/server.go index c4523e4bb1b..73778d8dde4 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -42,15 +42,11 @@ import ( "github.com/pingcap/tiflow/pkg/util" p2pProto "github.com/pingcap/tiflow/proto/p2p" pd "github.com/tikv/pd/client" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/net/netutil" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" - "google.golang.org/grpc/keepalive" ) const ( @@ -78,11 +74,15 @@ type Server interface { // TODO: we need to make server more unit testable and add more test cases. // Especially we need to decouple the HTTPServer out of server. type server struct { - capture capture.Capture - tcpServer tcpserver.TCPServer - grpcService *p2p.ServerWrapper - statusServer *http.Server - etcdClient etcd.CDCEtcdClient + capture capture.Capture + tcpServer tcpserver.TCPServer + grpcService *p2p.ServerWrapper + statusServer *http.Server + etcdClient etcd.CDCEtcdClient + // pdClient is the default upstream PD client. + // The PD acts as a metadata management service for TiCDC. 
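+	// It is created once in server.prepare and shared with the capture's default upstream.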
+	pdClient    pd.Client
+	pdAPIClient pdutil.PDAPIClient
 	pdEndpoints       []string
 	sortEngineFactory *factory.SortEngineFactory
 }
@@ -125,35 +125,21 @@ func New(pdEndpoints []string) (*server, error) {
 
 func (s *server) prepare(ctx context.Context) error {
 	conf := config.GetGlobalServerConfig()
-	grpcTLSOption, err := conf.Security.ToGRPCDialOption()
+	tlsConfig, err := conf.Security.ToTLSConfig()
 	if err != nil {
 		return errors.Trace(err)
 	}
-
-	tlsConfig, err := conf.Security.ToTLSConfig()
+	grpcTLSOption, err := conf.Security.ToGRPCDialOption()
 	if err != nil {
 		return errors.Trace(err)
 	}
-
-	logConfig := logutil.DefaultZapLoggerConfig
-	logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)
-
-	log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
-	// we do not pass a `context` to the etcd client,
-	// to prevent it's cancelled when the server is closing.
-	// For example, when the non-owner node goes offline,
-	// it would resign the campaign key which was put by call `campaign`,
-	// if this is not done due to the passed context cancelled,
-	// the key will be kept for the lease TTL, which is 10 seconds,
-	// then cause the new owner cannot be elected immediately after the old owner offline.
-	// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
-	etcdCli, err := clientv3.New(clientv3.Config{
-		Endpoints:        s.pdEndpoints,
-		TLS:              tlsConfig,
-		LogConfig:        &logConfig,
-		DialTimeout:      5 * time.Second,
-		AutoSyncInterval: 30 * time.Second,
-		DialOptions: []grpc.DialOption{
+	log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints))
+	s.pdClient, err = pd.NewClientWithContext(
+		ctx, s.pdEndpoints, conf.Security.PDSecurityOption(),
+		// the default `timeout` is 3s, maybe too small if the pd is busy,
+		// set to 10s to avoid frequent timeout.
+		pd.WithCustomTimeoutOption(10*time.Second),
+		pd.WithGRPCDialOptions(
 			grpcTLSOption,
 			grpc.WithBlock(),
 			grpc.WithConnectParams(grpc.ConnectParams{
@@ -165,12 +151,31 @@ func (s *server) prepare(ctx context.Context) error {
 				},
 				MinConnectTimeout: 3 * time.Second,
 			}),
-			grpc.WithKeepaliveParams(keepalive.ClientParameters{
-				Time:    10 * time.Second,
-				Timeout: 20 * time.Second,
-			}),
-		},
-	})
+		))
+	if err != nil {
+		return errors.Trace(err)
+	}
+	s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	// Collect all endpoints from pd here to make the server more robust.
+	// Because in some scenarios, the deployer may only provide one pd endpoint,
+	// this will cause the TiCDC server to fail to restart when some pd node is down.
+	s.pdEndpoints, err = s.pdAPIClient.CollectMemberEndpoints(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
+	// we do not pass a `context` to create the etcd client,
+	// to prevent it from being cancelled when the server is closing.
+	// For example, when the non-owner node goes offline,
+	// it would resign the campaign key which was put by call `campaign`,
+	// if this is not done due to the passed context cancelled,
+	// the key will be kept for the lease TTL, which is 10 seconds,
+	// then cause the new owner cannot be elected immediately after the old owner offline.
+	// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
+	etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, s.pdEndpoints...)
if err != nil { return errors.Trace(err) } @@ -192,9 +197,8 @@ func (s *server) prepare(ctx context.Context) error { return errors.Trace(err) } - s.capture = capture.NewCapture( - s.pdEndpoints, cdcEtcdClient, s.grpcService, s.sortEngineFactory) - + s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient, + s.grpcService, s.sortEngineFactory, s.pdClient) return nil } @@ -289,18 +293,7 @@ func (s *server) startStatusHTTP(lis net.Listener) error { return nil } -func (s *server) etcdHealthChecker(ctx context.Context) error { - conf := config.GetGlobalServerConfig() - grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption()) - if err != nil { - return errors.Trace(err) - } - pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security) - if err != nil { - return errors.Trace(err) - } - defer pc.Close() - +func (s *server) upstreamPDHealthChecker(ctx context.Context) error { ticker := time.NewTicker(time.Second * 3) defer ticker.Stop() @@ -309,7 +302,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - endpoints, err := pc.CollectMemberEndpoints(ctx) + endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx) if err != nil { log.Warn("etcd health check: cannot collect all members", zap.Error(err)) continue @@ -317,7 +310,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { for _, endpoint := range endpoints { start := time.Now() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - if err := pc.Healthy(ctx, endpoint); err != nil { + if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil { log.Warn("etcd health check error", zap.String("endpoint", endpoint), zap.Error(err)) } @@ -338,6 +331,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { func (s *server) run(ctx context.Context) (err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + defer s.pdAPIClient.Close() eg, egCtx := errgroup.WithContext(ctx) @@ -346,7 +340,7 @@ func (s *server) run(ctx context.Context) (err error) { }) eg.Go(func() error { - return s.etcdHealthChecker(egCtx) + return s.upstreamPDHealthChecker(egCtx) }) eg.Go(func() error { @@ -401,6 +395,10 @@ func (s *server) Close() { } s.tcpServer = nil } + + if s.pdClient != nil { + s.pdClient.Close() + } } func (s *server) closeSortEngineFactory() { diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 1e96c1546be..c8280506b95 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -15,6 +15,9 @@ package etcd import ( "context" + "crypto/tls" + "fmt" + "sync" "time" "github.com/benbjohnson/clock" @@ -23,10 +26,16 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/pkg/errs" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientV3 "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" ) @@ -313,3 +322,229 @@ func isRetryableError(rpcName string) retry.IsRetryable { return true } } + +// The following code is mainly copied from: +// https://github.com/tikv/pd/blob/master/pkg/utils/etcdutil/etcdutil.go +const ( + // defaultEtcdClientTimeout is the default timeout for etcd client. 
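+	// It is used as the dial timeout when etcd clients are created in this file.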
+ defaultEtcdClientTimeout = 5 * time.Second + // defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive. + defaultDialKeepAliveTime = 10 * time.Second + // defaultDialKeepAliveTimeout is the time that the client waits for a response for the + // keep-alive probe. If the response is not received in this time, the connection is closed. + defaultDialKeepAliveTimeout = 3 * time.Second + // etcdServerOfflineTimeout is the timeout for an unhealthy etcd endpoint to be offline from healthy checker. + etcdServerOfflineTimeout = 30 * time.Minute + // etcdServerDisconnectedTimeout is the timeout for an unhealthy etcd endpoint to be disconnected from healthy checker. + etcdServerDisconnectedTimeout = 1 * time.Minute + // healthyPath is the path to check etcd health. + healthyPath = "health" +) + +func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { + if len(endpoints) == 0 { + return nil, errors.New("empty endpoints") + } + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + + lgc := zap.NewProductionConfig() + lgc.Encoding = log.ZapEncodingName + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + TLS: tlsConfig, + LogConfig: &logConfig, + DialTimeout: defaultEtcdClientTimeout, + DialKeepAliveTime: defaultDialKeepAliveTime, + DialKeepAliveTimeout: defaultDialKeepAliveTimeout, + }) + if err != nil { + return nil, errors.Trace(err) + } + return client, err +} + +// CreateRawEtcdClient creates etcd v3 client with detecting endpoints. +// It will check the health of endpoints periodically, and update endpoints if needed. +func CreateRawEtcdClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { + client, err := newClient(tlsConfig, endpoints...) + if err != nil { + return nil, err + } + + tickerInterval := defaultDialKeepAliveTime + + checker := &healthyChecker{ + tlsConfig: tlsConfig, + } + eps := syncUrls(client) + checker.update(eps) + + // Create a goroutine to check the health of etcd endpoints periodically. + go func(client *clientv3.Client) { + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + lastAvailable := time.Now() + for { + select { + case <-client.Ctx().Done(): + log.Info("etcd client is closed, exit health check goroutine") + checker.Range(func(key, value interface{}) bool { + client := value.(*healthyClient) + client.Close() + return true + }) + return + case <-ticker.C: + usedEps := client.Endpoints() + healthyEps := checker.patrol(client.Ctx()) + if len(healthyEps) == 0 { + // when all endpoints are unhealthy, try to reset endpoints to update connect + // rather than delete them to avoid there is no any endpoint in client. + // Note: reset endpoints will trigger subconn closed, and then trigger reconnect. + // otherwise, the subconn will be retrying in grpc layer and use exponential backoff, + // and it cannot recover as soon as possible. + if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { + log.Info("no available endpoint, try to reset endpoints", zap.Strings("lastEndpoints", usedEps)) + client.SetEndpoints([]string{}...) + client.SetEndpoints(usedEps...) + } + } else { + if !util.AreStringSlicesEquivalent(healthyEps, usedEps) { + client.SetEndpoints(healthyEps...) 
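+						// record the endpoint change in the metric and log below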
+ change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps)) + etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps))) + log.Info("update endpoints", zap.String("numChange", change), + zap.Strings("lastEndpoints", usedEps), zap.Strings("endpoints", client.Endpoints())) + } + lastAvailable = time.Now() + } + } + } + }(client) + + // Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine. + go func(client *clientv3.Client) { + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + for { + select { + case <-client.Ctx().Done(): + log.Info("etcd client is closed, exit update endpoint goroutine") + return + case <-ticker.C: + eps := syncUrls(client) + checker.update(eps) + } + } + }(client) + + return client, err +} + +type healthyClient struct { + *clientv3.Client + lastHealth time.Time +} + +type healthyChecker struct { + sync.Map // map[string]*healthyClient + tlsConfig *tls.Config +} + +func (checker *healthyChecker) patrol(ctx context.Context) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145 + var wg sync.WaitGroup + count := 0 + checker.Range(func(key, value interface{}) bool { + count++ + return true + }) + hch := make(chan string, count) + healthyList := make([]string, 0, count) + checker.Range(func(key, value interface{}) bool { + wg.Add(1) + go func(key, value interface{}) { + defer wg.Done() + ep := key.(string) + client := value.(*healthyClient) + if IsHealthy(ctx, client.Client) { + hch <- ep + checker.Store(ep, &healthyClient{ + Client: client.Client, + lastHealth: time.Now(), + }) + return + } + }(key, value) + return true + }) + wg.Wait() + close(hch) + for h := range hch { + healthyList = append(healthyList, h) + } + return healthyList +} + +func (checker *healthyChecker) update(eps []string) { + for _, ep := range eps { + // check if client exists, if not, create one, if exists, check if it's offline or disconnected. + if client, ok := checker.Load(ep); ok { + lastHealthy := client.(*healthyClient).lastHealth + if time.Since(lastHealthy) > etcdServerOfflineTimeout { + log.Info("some etcd server maybe offline", zap.String("endpoint", ep)) + checker.Delete(ep) + } + if time.Since(lastHealthy) > etcdServerDisconnectedTimeout { + // try to reset client endpoint to trigger reconnect + client.(*healthyClient).Client.SetEndpoints([]string{}...) + client.(*healthyClient).Client.SetEndpoints(ep) + } + continue + } + checker.addClient(ep, time.Now()) + } +} + +func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) { + client, err := newClient(checker.tlsConfig, ep) + if err != nil { + log.Error("failed to create etcd healthy client", zap.Error(err)) + return + } + checker.Store(ep, &healthyClient{ + Client: client, + lastHealth: lastHealth, + }) +} + +func syncUrls(client *clientv3.Client) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183 + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), + etcdClientTimeoutDuration) + defer cancel() + mresp, err := client.MemberList(ctx) + if err != nil { + log.Error("failed to list members", errs.ZapError(err)) + return []string{} + } + var eps []string + for _, m := range mresp.Members { + if len(m.Name) != 0 && !m.IsLearner { + eps = append(eps, m.ClientURLs...) + } + } + return eps +} + +// IsHealthy checks if the etcd is healthy. 
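+// A permission-denied response also counts as healthy, since such a probe still goes through consensus.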
+func IsHealthy(ctx context.Context, client *clientv3.Client) bool { + timeout := etcdClientTimeoutDuration + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout) + defer cancel() + _, err := client.Get(ctx, healthyPath) + // permission denied is OK since proposal goes through consensus to get it + // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 + return err == nil || err == rpctypes.ErrPermissionDenied +} diff --git a/pkg/etcd/metrics.go b/pkg/etcd/metrics.go index 45690cadbc4..1f4e55e7f24 100644 --- a/pkg/etcd/metrics.go +++ b/pkg/etcd/metrics.go @@ -23,7 +23,16 @@ var etcdRequestCounter = prometheus.NewCounterVec( Help: "request counter of etcd operation", }, []string{"type"}) +var etcdStateGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "etcd", + Name: "etcd_client", + Help: "Etcd client states.", + }, []string{"type"}) + // InitMetrics registers the etcd request counter. func InitMetrics(registry *prometheus.Registry) { + prometheus.MustRegister(etcdStateGauge) registry.MustRegister(etcdRequestCounter) } diff --git a/pkg/upstream/manager.go b/pkg/upstream/manager.go index de5fb711619..d9c69077f4f 100644 --- a/pkg/upstream/manager.go +++ b/pkg/upstream/manager.go @@ -87,12 +87,16 @@ func NewManager4Test(pdClient pd.Client) *Manager { // AddDefaultUpstream add the default upstream func (m *Manager) AddDefaultUpstream(pdEndpoints []string, conf *security.Credential, + pdClient pd.Client, ) (*Upstream, error) { up := newUpstream(pdEndpoints, conf) + // use the pdClient pass from cdc server as the default upstream + // to reduce the creation times of pdClient to make cdc server more stable + up.isDefaultUpstream = true + up.PDClient = pdClient if err := m.initUpstreamFunc(m.ctx, up, m.gcServiceID); err != nil { return nil, err } - up.isDefaultUpstream = true m.defaultUpstream = up m.ups.Store(up.ID, up) log.Info("default upstream is added", zap.Uint64("id", up.ID)) diff --git a/pkg/upstream/manager_test.go b/pkg/upstream/manager_test.go index 870aa32f580..4bc544d9868 100644 --- a/pkg/upstream/manager_test.go +++ b/pkg/upstream/manager_test.go @@ -106,7 +106,8 @@ func TestAddDefaultUpstream(t *testing.T) { ) error { return errors.New("test") } - _, err := m.AddDefaultUpstream([]string{}, &security.Credential{}) + pdClient := &gc.MockPDClient{} + _, err := m.AddDefaultUpstream([]string{}, &security.Credential{}, pdClient) require.NotNil(t, err) up, err := m.GetDefaultUpstream() require.Nil(t, up) @@ -117,7 +118,7 @@ func TestAddDefaultUpstream(t *testing.T) { up.ID = uint64(2) return nil } - _, err = m.AddDefaultUpstream([]string{}, &security.Credential{}) + _, err = m.AddDefaultUpstream([]string{}, &security.Credential{}, pdClient) require.Nil(t, err) up, err = m.GetDefaultUpstream() require.NotNil(t, up) diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go index fb576ddcf34..0c868791d06 100644 --- a/pkg/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -122,28 +122,30 @@ func initUpstream(ctx context.Context, up *Upstream, gcServiceID string) error { } // init the tikv client tls global config initGlobalConfig(up.SecurityConfig) - - up.PDClient, err = pd.NewClientWithContext( - ctx, up.PdEndpoints, up.SecurityConfig.PDSecurityOption(), - // the default `timeout` is 3s, maybe too small if the pd is busy, - // set to 10s to avoid frequent timeout. 
- pd.WithCustomTimeoutOption(10*time.Second), - pd.WithGRPCDialOptions( - grpcTLSOption, - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, - Multiplier: 1.1, - Jitter: 0.1, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: 3 * time.Second, - }), - )) - if err != nil { - up.err.Store(err) - return errors.Trace(err) + // default upstream always use the pdClient pass from cdc server + if !up.isDefaultUpstream { + up.PDClient, err = pd.NewClientWithContext( + ctx, up.PdEndpoints, up.SecurityConfig.PDSecurityOption(), + // the default `timeout` is 3s, maybe too small if the pd is busy, + // set to 10s to avoid frequent timeout. + pd.WithCustomTimeoutOption(10*time.Second), + pd.WithGRPCDialOptions( + grpcTLSOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + )) + if err != nil { + up.err.Store(err) + return errors.Trace(err) + } } clusterID := up.PDClient.GetClusterID(ctx) if up.ID != 0 && up.ID != clusterID { @@ -240,7 +242,9 @@ func (up *Upstream) Close() { } atomic.StoreInt32(&up.status, closing) - if up.PDClient != nil { + // should never close default upstream's pdClient here + // because it's shared by the cdc server + if up.PDClient != nil && !up.isDefaultUpstream { up.PDClient.Close() } diff --git a/pkg/util/comparison.go b/pkg/util/comparison.go new file mode 100644 index 00000000000..7479a56197c --- /dev/null +++ b/pkg/util/comparison.go @@ -0,0 +1,32 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "sort" + +// AreStringSlicesEquivalent checks if two string slices are equivalent. +// If the slices are of the same length and contain the same elements (but possibly in different order), the function returns true. +func AreStringSlicesEquivalent(a, b []string) bool { + if len(a) != len(b) { + return false + } + sort.Strings(a) + sort.Strings(b) + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} diff --git a/pkg/util/comparison_test.go b/pkg/util/comparison_test.go new file mode 100644 index 00000000000..0d2f014d9da --- /dev/null +++ b/pkg/util/comparison_test.go @@ -0,0 +1,55 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+package util
+
+import (
+	"testing"
+)
+
+func TestAreStringSlicesEquivalent(t *testing.T) {
+	tests := []struct {
+		name string
+		a    []string
+		b    []string
+		want bool
+	}{
+		{
+			name: "equal slices",
+			a:    []string{"foo", "bar", "baz"},
+			b:    []string{"baz", "foo", "bar"},
+			want: true,
+		},
+		{
+			name: "different lengths",
+			a:    []string{"foo", "bar", "baz"},
+			b:    []string{"foo", "bar"},
+			want: false,
+		},
+		{
+			name: "different elements",
+			a:    []string{"foo", "bar", "baz"},
+			b:    []string{"qux", "quux", "corge"},
+			want: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := AreStringSlicesEquivalent(tt.a, tt.b); got != tt.want {
+				t.Errorf("AreStringSlicesEquivalent() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+

From 48bb6e31c38c091594408217e1311fd32ca0b54e Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Tue, 5 Sep 2023 11:12:29 +0800
Subject: [PATCH 2/6] resolve comments

---
 pkg/util/comparison.go      | 1 +
 pkg/util/comparison_test.go | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/pkg/util/comparison.go b/pkg/util/comparison.go
index 7479a56197c..f89f216e474 100644
--- a/pkg/util/comparison.go
+++ b/pkg/util/comparison.go
@@ -17,6 +17,7 @@ import "sort"
 
 // AreStringSlicesEquivalent checks if two string slices are equivalent.
 // If the slices are of the same length and contain the same elements (but possibly in different order), the function returns true.
+// Note: this function sorts both slices in place. Be cautious about this if you are using it.
 func AreStringSlicesEquivalent(a, b []string) bool {
 	if len(a) != len(b) {
 		return false
diff --git a/pkg/util/comparison_test.go b/pkg/util/comparison_test.go
index 0d2f014d9da..49cf8edb655 100644
--- a/pkg/util/comparison_test.go
+++ b/pkg/util/comparison_test.go
@@ -41,6 +41,12 @@ func TestAreStringSlicesEquivalent(t *testing.T) {
 			b:    []string{"qux", "quux", "corge"},
 			want: false,
 		},
+		{
+			name: "empty slices",
+			a:    []string{},
+			b:    []string{},
+			want: true,
+		},
 	}
 
 	for _, tt := range tests {
From 8d4fe577fa1dbf9d0ad14257e257ea3597305d78 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Wed, 6 Sep 2023 13:23:16 +0800
Subject: [PATCH 3/6] resolve comments

---
 pkg/etcd/client.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index c8280506b95..82df923c69a 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -361,7 +361,7 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return client, err
+	return client, nil
 }
 
 // CreateRawEtcdClient creates etcd v3 client with detecting endpoints.
 // It will check the health of endpoints periodically, and update endpoints if needed.
@@ -439,7 +439,7 @@ func CreateRawEtcdClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.
 		}
 	}(client)
 
-	return client, err
+	return client, nil
 }

From 1a79ae38d20bcd6f7e4a8d88bfd059362276df53 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Thu, 7 Sep 2023 16:16:53 +0800
Subject: [PATCH 4/6] fix integration test fails

---
 cdc/server/server.go                              |  3 ++-
 tests/integration_tests/http_proxies/run-proxy.go |  8 ++++++++
 tests/integration_tests/http_proxies/run.sh       | 14 ++++++++++----
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/cdc/server/server.go b/cdc/server/server.go
index 73778d8dde4..56243d823eb 100644
--- a/cdc/server/server.go
+++ b/cdc/server/server.go
@@ -162,10 +162,11 @@ func (s *server) prepare(ctx context.Context) error {
 	// Collect all endpoints from pd here to make the server more robust.
 	// Because in some scenarios, the deployer may only provide one pd endpoint,
 	// this will cause the TiCDC server to fail to restart when some pd node is down.
-	s.pdEndpoints, err = s.pdAPIClient.CollectMemberEndpoints(ctx)
+	allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)
 	log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
 	// we do not pass a `context` to create the etcd client,
 	// to prevent it from being cancelled when the server is closing.
diff --git a/tests/integration_tests/http_proxies/run-proxy.go b/tests/integration_tests/http_proxies/run-proxy.go
index e8bc936c826..04f1df907bb 100644
--- a/tests/integration_tests/http_proxies/run-proxy.go
+++ b/tests/integration_tests/http_proxies/run-proxy.go
@@ -24,8 +24,15 @@ import (
 )
 
 func main() {
+	defer func() {
+		fmt.Println("proxy stopped")
+	}()
+
 	grpc_proxy.RegisterDefaultFlags()
 	flag.Parse()
+
+	log.Info("starting proxy", zap.Any("flags", flag.Args()))
+
 	proxy, err := grpc_proxy.New(
 		grpc_proxy.WithInterceptor(intercept),
 		grpc_proxy.DefaultFlags(),
@@ -37,6 +44,7 @@ func main() {
 	if err != nil {
 		log.Fatal("failed to start proxy", zap.Error(err))
 	}
+	fmt.Println("proxy started")
 }
 
 func intercept(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
diff --git a/tests/integration_tests/http_proxies/run.sh b/tests/integration_tests/http_proxies/run.sh
index 7ef620df91f..c7493f60a8c 100644
--- a/tests/integration_tests/http_proxies/run.sh
+++ b/tests/integration_tests/http_proxies/run.sh
@@ -23,9 +23,11 @@ export UP_TIDB_HOST=$lan_addr \
 proxy_pid=""
 proxy_port=$(shuf -i 10081-20081 -n1)
 function start_proxy() {
-	echo "dumpling grpc packet to $WORK_DIR/packets.dump..."
-	GO111MODULE=on WORK_DIR=$WORK_DIR go run $CUR/run-proxy.go --port=$proxy_port >$WORK_DIR/packets.dump &
+	echo "dumping grpc packets to $WORK_DIR/test_proxy.log..."
+	GO111MODULE=on WORK_DIR=$WORK_DIR go run $CUR/run-proxy.go --port=$proxy_port >$WORK_DIR/test_proxy.log &
 	proxy_pid=$!
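+	# the proxy keeps running in the background; its pid is used for cleanup later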
+ echo "proxy port: $proxy_port" + echo "proxy pid: $proxy_pid" } function stop_proxy() { @@ -55,21 +57,25 @@ function prepare() { sleep 5 export http_proxy=http://127.0.0.1:$proxy_port export https_proxy=http://127.0.0.1:$proxy_port + echo "try to connect pd cluster via proxy, pd addr: $UP_PD_HOST_1:2379" ensure 10 curl http://$UP_PD_HOST_1:2379/ - echo started proxy at $proxy_pid + echo started proxy pid: $proxy_pid + echo started proxy at port: $proxy_port cd $WORK_DIR start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + echo "query start ts: $start_ts" run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + echo started cdc server successfully SINK_URI="blackhole:///" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" } function check() { - services=($(cat $WORK_DIR/packets.dump | xargs -L1 dirname | sort | uniq)) + services=($(cat $WORK_DIR/test_proxy.lo | xargs -L1 dirname | sort | uniq)) service_type_count=${#services[@]} echo "captured services: " echo ${services[@]} From ffd240c66a4226e92b4a1daf6cf946aa576d5b1a Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Fri, 8 Sep 2023 11:34:58 +0800 Subject: [PATCH 5/6] fix integration test fails 3 --- cdc/server/server.go | 2 +- pkg/etcd/client.go | 36 +++++++++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/cdc/server/server.go b/cdc/server/server.go index 56243d823eb..cd575a68d89 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -176,7 +176,7 @@ func (s *server) prepare(ctx context.Context) error { // the key will be kept for the lease TTL, which is 10 seconds, // then cause the new owner cannot be elected immediately after the old owner offline. // see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98 - etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, s.pdEndpoints...) + etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...) 
if err != nil { return errors.Trace(err) } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 82df923c69a..e63f775793f 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -36,7 +36,10 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" ) // etcd operation names @@ -341,7 +344,7 @@ const ( healthyPath = "health" ) -func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { +func newClient(tlsConfig *tls.Config, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error) { if len(endpoints) == 0 { return nil, errors.New("empty endpoints") } @@ -357,6 +360,23 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er DialTimeout: defaultEtcdClientTimeout, DialKeepAliveTime: defaultDialKeepAliveTime, DialKeepAliveTimeout: defaultDialKeepAliveTimeout, + DialOptions: []grpc.DialOption{ + grpcDialOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + }), + }, }) if err != nil { return nil, errors.Trace(err) @@ -366,8 +386,8 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er // CreateRawEtcdClient creates etcd v3 client with detecting endpoints. // It will check the health of endpoints periodically, and update endpoints if needed. -func CreateRawEtcdClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { - client, err := newClient(tlsConfig, endpoints...) +func CreateRawEtcdClient(tlsConfig *tls.Config, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error) { + client, err := newClient(tlsConfig, grpcDialOption, endpoints...) if err != nil { return nil, err } @@ -375,7 +395,8 @@ func CreateRawEtcdClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3. 
 	tickerInterval := defaultDialKeepAliveTime
 
 	checker := &healthyChecker{
-		tlsConfig: tlsConfig,
+		tlsConfig:      tlsConfig,
+		grpcDialOption: grpcDialOption,
 	}
 	eps := syncUrls(client)
 	checker.update(eps)
@@ -448,8 +469,9 @@ type healthyClient struct {
 }
 
 type healthyChecker struct {
-	sync.Map  // map[string]*healthyClient
-	tlsConfig *tls.Config
+	sync.Map       // map[string]*healthyClient
+	tlsConfig      *tls.Config
+	grpcDialOption grpc.DialOption
 }
 
 func (checker *healthyChecker) patrol(ctx context.Context) []string {
@@ -508,7 +530,7 @@ func (checker *healthyChecker) update(eps []string) {
 }
 
 func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
-	client, err := newClient(checker.tlsConfig, ep)
+	client, err := newClient(checker.tlsConfig, checker.grpcDialOption, ep)
 	if err != nil {
 		log.Error("failed to create etcd healthy client", zap.Error(err))
 		return

From 1480ee4e1898bf85ec9c5c5cb8528518bc9651a2 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Fri, 8 Sep 2023 13:21:01 +0800
Subject: [PATCH 6/6] fix integration test fails 4

---
 cdc/server/server.go                        | 17 +++++++++--------
 tests/integration_tests/http_proxies/run.sh |  2 +-
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/cdc/server/server.go b/cdc/server/server.go
index cd575a68d89..4b5898c875e 100644
--- a/cdc/server/server.go
+++ b/cdc/server/server.go
@@ -159,14 +159,6 @@ func (s *server) prepare(ctx context.Context) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-	// Collect all endpoints from pd here to make the server more robust.
-	// Because in some scenarios, the deployer may only provide one pd endpoint,
-	// this will cause the TiCDC server to fail to restart when some pd node is down.
-	allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)
 	log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
 	// we do not pass a `context` to create the etcd client,
@@ -187,6 +179,15 @@ func (s *server) prepare(ctx context.Context) error {
 	}
 	s.etcdClient = cdcEtcdClient
 
+	// Collect all endpoints from pd here to make the server more robust.
+	// Because in some scenarios, the deployer may only provide one pd endpoint,
+	// this will cause the TiCDC server to fail to restart when some pd node is down.
+	allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)
+
 	err = s.initDir(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/tests/integration_tests/http_proxies/run.sh b/tests/integration_tests/http_proxies/run.sh
index c7493f60a8c..3be2ef77dbe 100644
--- a/tests/integration_tests/http_proxies/run.sh
+++ b/tests/integration_tests/http_proxies/run.sh
@@ -75,7 +75,7 @@ function check() {
-	services=($(cat $WORK_DIR/test_proxy.lo | xargs -L1 dirname | sort | uniq))
+	services=($(cat $WORK_DIR/test_proxy.log | xargs -L1 dirname | sort | uniq))
 	service_type_count=${#services[@]}
 	echo "captured services: "
 	echo ${services[@]}
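
A side note on `util.AreStringSlicesEquivalent`, which the endpoint health-check loop uses to compare endpoint lists: as the note added in PATCH 2/6 says, it sorts both arguments in place. Where a caller needs to preserve the original endpoint order, a copy-based variant is one option. The sketch below is illustrative only and not part of this series; the name `areStringSlicesEquivalentCopy` is hypothetical, and it assumes the same `sort` import already used by `pkg/util/comparison.go`.

// areStringSlicesEquivalentCopy compares two string slices as multisets
// without mutating either argument: it sorts private copies instead.
func areStringSlicesEquivalentCopy(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	ac := append([]string(nil), a...) // copy so the caller's ordering survives
	bc := append([]string(nil), b...)
	sort.Strings(ac)
	sort.Strings(bc)
	for i, v := range ac {
		if v != bc[i] {
			return false
		}
	}
	return true
}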