diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 09c5b01ba71..f6ebe513eae 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" + pd "github.com/tikv/pd/client" "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" @@ -79,6 +80,7 @@ type captureImpl struct { liveness model.Liveness config *config.ServerConfig + pdClient pd.Client pdEndpoints []string ownerMu sync.Mutex owner owner.Owner @@ -124,6 +126,7 @@ func NewCapture(pdEndpoints []string, etcdClient etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper, sortEngineMangerFactory *factory.SortEngineFactory, + pdClient pd.Client, ) Capture { conf := config.GetGlobalServerConfig() return &captureImpl{ @@ -137,8 +140,8 @@ func NewCapture(pdEndpoints []string, newOwner: owner.NewOwner, info: &model.CaptureInfo{}, sortEngineFactory: sortEngineMangerFactory, - - migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf), + migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf), + pdClient: pdClient, } } @@ -204,7 +207,7 @@ func (c *captureImpl) reset(ctx context.Context) error { c.upstreamManager.Close() } c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID()) - _, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security) + _, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient) if err != nil { return errors.Trace(err) } diff --git a/cdc/server/server.go b/cdc/server/server.go index a9475508765..13ce2b95697 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -43,15 +43,11 @@ import ( "github.com/pingcap/tiflow/pkg/util" p2pProto "github.com/pingcap/tiflow/proto/p2p" pd "github.com/tikv/pd/client" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/net/netutil" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" - "google.golang.org/grpc/keepalive" ) const ( @@ -79,11 +75,15 @@ type Server interface { // TODO: we need to make server more unit testable and add more test cases. // Especially we need to decouple the HTTPServer out of server. type server struct { - capture capture.Capture - tcpServer tcpserver.TCPServer - grpcService *p2p.ServerWrapper - statusServer *http.Server - etcdClient etcd.CDCEtcdClient + capture capture.Capture + tcpServer tcpserver.TCPServer + grpcService *p2p.ServerWrapper + statusServer *http.Server + etcdClient etcd.CDCEtcdClient + // pdClient is the default upstream PD client. + // The PD acts as a metadata management service for TiCDC. + pdClient pd.Client + pdAPIClient pdutil.PDAPIClient pdEndpoints []string sortEngineFactory *factory.SortEngineFactory } @@ -126,35 +126,21 @@ func New(pdEndpoints []string) (*server, error) { func (s *server) prepare(ctx context.Context) error { conf := config.GetGlobalServerConfig() - grpcTLSOption, err := conf.Security.ToGRPCDialOption() + tlsConfig, err := conf.Security.ToTLSConfig() if err != nil { return errors.Trace(err) } - - tlsConfig, err := conf.Security.ToTLSConfig() + grpcTLSOption, err := conf.Security.ToGRPCDialOption() if err != nil { return errors.Trace(err) } - - logConfig := logutil.DefaultZapLoggerConfig - logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) - - log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints)) - // we do not pass a `context` to the etcd client, - // to prevent it's cancelled when the server is closing. - // For example, when the non-owner node goes offline, - // it would resign the campaign key which was put by call `campaign`, - // if this is not done due to the passed context cancelled, - // the key will be kept for the lease TTL, which is 10 seconds, - // then cause the new owner cannot be elected immediately after the old owner offline. - // see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98 - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: s.pdEndpoints, - TLS: tlsConfig, - LogConfig: &logConfig, - DialTimeout: 5 * time.Second, - AutoSyncInterval: 30 * time.Second, - DialOptions: []grpc.DialOption{ + log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints)) + s.pdClient, err = pd.NewClientWithContext( + ctx, s.pdEndpoints, conf.Security.PDSecurityOption(), + // the default `timeout` is 3s, maybe too small if the pd is busy, + // set to 10s to avoid frequent timeout. + pd.WithCustomTimeoutOption(10*time.Second), + pd.WithGRPCDialOptions( grpcTLSOption, grpc.WithBlock(), grpc.WithConnectParams(grpc.ConnectParams{ @@ -166,12 +152,24 @@ func (s *server) prepare(ctx context.Context) error { }, MinConnectTimeout: 3 * time.Second, }), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 20 * time.Second, - }), - }, - }) + )) + if err != nil { + return errors.Trace(err) + } + s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security) + if err != nil { + return errors.Trace(err) + } + log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints)) + // we do not pass a `context` to create a the etcd client, + // to prevent it's cancelled when the server is closing. + // For example, when the non-owner node goes offline, + // it would resign the campaign key which was put by call `campaign`, + // if this is not done due to the passed context cancelled, + // the key will be kept for the lease TTL, which is 10 seconds, + // then cause the new owner cannot be elected immediately after the old owner offline. + // see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98 + etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...) if err != nil { return errors.Trace(err) } @@ -182,6 +180,15 @@ func (s *server) prepare(ctx context.Context) error { } s.etcdClient = cdcEtcdClient + // Collect all endpoints from pd here to make the server more robust. + // Because in some scenarios, the deployer may only provide one pd endpoint, + // this will cause the TiCDC server to fail to restart when some pd node is down. + allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx) + if err != nil { + return errors.Trace(err) + } + s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...) + err = s.initDir(ctx) if err != nil { return errors.Trace(err) @@ -193,9 +200,8 @@ func (s *server) prepare(ctx context.Context) error { return errors.Trace(err) } - s.capture = capture.NewCapture( - s.pdEndpoints, cdcEtcdClient, s.grpcService, s.sortEngineFactory) - + s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient, + s.grpcService, s.sortEngineFactory, s.pdClient) return nil } @@ -294,18 +300,7 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er return nil } -func (s *server) etcdHealthChecker(ctx context.Context) error { - conf := config.GetGlobalServerConfig() - grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption()) - if err != nil { - return errors.Trace(err) - } - pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security) - if err != nil { - return errors.Trace(err) - } - defer pc.Close() - +func (s *server) upstreamPDHealthChecker(ctx context.Context) error { ticker := time.NewTicker(time.Second * 3) defer ticker.Stop() @@ -314,7 +309,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - endpoints, err := pc.CollectMemberEndpoints(ctx) + endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx) if err != nil { log.Warn("etcd health check: cannot collect all members", zap.Error(err)) continue @@ -322,7 +317,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { for _, endpoint := range endpoints { start := time.Now() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - if err := pc.Healthy(ctx, endpoint); err != nil { + if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil { log.Warn("etcd health check error", zap.String("endpoint", endpoint), zap.Error(err)) } @@ -343,6 +338,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { func (s *server) run(ctx context.Context) (err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + defer s.pdAPIClient.Close() eg, egCtx := errgroup.WithContext(ctx) @@ -351,7 +347,7 @@ func (s *server) run(ctx context.Context) (err error) { }) eg.Go(func() error { - return s.etcdHealthChecker(egCtx) + return s.upstreamPDHealthChecker(egCtx) }) eg.Go(func() error { @@ -404,6 +400,10 @@ func (s *server) Close() { } s.tcpServer = nil } + + if s.pdClient != nil { + s.pdClient.Close() + } } func (s *server) closeSortEngineFactory() { diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 1e96c1546be..e63f775793f 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -15,6 +15,9 @@ package etcd import ( "context" + "crypto/tls" + "fmt" + "sync" "time" "github.com/benbjohnson/clock" @@ -23,11 +26,20 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/pkg/errs" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientV3 "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" ) // etcd operation names @@ -313,3 +325,248 @@ func isRetryableError(rpcName string) retry.IsRetryable { return true } } + +// The following code is mainly copied from: +// https://github.com/tikv/pd/blob/master/pkg/utils/etcdutil/etcdutil.go +const ( + // defaultEtcdClientTimeout is the default timeout for etcd client. + defaultEtcdClientTimeout = 5 * time.Second + // defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive. + defaultDialKeepAliveTime = 10 * time.Second + // defaultDialKeepAliveTimeout is the time that the client waits for a response for the + // keep-alive probe. If the response is not received in this time, the connection is closed. + defaultDialKeepAliveTimeout = 3 * time.Second + // etcdServerOfflineTimeout is the timeout for an unhealthy etcd endpoint to be offline from healthy checker. + etcdServerOfflineTimeout = 30 * time.Minute + // etcdServerDisconnectedTimeout is the timeout for an unhealthy etcd endpoint to be disconnected from healthy checker. + etcdServerDisconnectedTimeout = 1 * time.Minute + // healthyPath is the path to check etcd health. + healthyPath = "health" +) + +func newClient(tlsConfig *tls.Config, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error) { + if len(endpoints) == 0 { + return nil, errors.New("empty endpoints") + } + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + + lgc := zap.NewProductionConfig() + lgc.Encoding = log.ZapEncodingName + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + TLS: tlsConfig, + LogConfig: &logConfig, + DialTimeout: defaultEtcdClientTimeout, + DialKeepAliveTime: defaultDialKeepAliveTime, + DialKeepAliveTimeout: defaultDialKeepAliveTimeout, + DialOptions: []grpc.DialOption{ + grpcDialOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + }), + }, + }) + if err != nil { + return nil, errors.Trace(err) + } + return client, nil +} + +// CreateRawEtcdClient creates etcd v3 client with detecting endpoints. +// It will check the health of endpoints periodically, and update endpoints if needed. +func CreateRawEtcdClient(tlsConfig *tls.Config, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error) { + client, err := newClient(tlsConfig, grpcDialOption, endpoints...) + if err != nil { + return nil, err + } + + tickerInterval := defaultDialKeepAliveTime + + checker := &healthyChecker{ + tlsConfig: tlsConfig, + grpcDialOption: grpcDialOption, + } + eps := syncUrls(client) + checker.update(eps) + + // Create a goroutine to check the health of etcd endpoints periodically. + go func(client *clientv3.Client) { + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + lastAvailable := time.Now() + for { + select { + case <-client.Ctx().Done(): + log.Info("etcd client is closed, exit health check goroutine") + checker.Range(func(key, value interface{}) bool { + client := value.(*healthyClient) + client.Close() + return true + }) + return + case <-ticker.C: + usedEps := client.Endpoints() + healthyEps := checker.patrol(client.Ctx()) + if len(healthyEps) == 0 { + // when all endpoints are unhealthy, try to reset endpoints to update connect + // rather than delete them to avoid there is no any endpoint in client. + // Note: reset endpoints will trigger subconn closed, and then trigger reconnect. + // otherwise, the subconn will be retrying in grpc layer and use exponential backoff, + // and it cannot recover as soon as possible. + if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { + log.Info("no available endpoint, try to reset endpoints", zap.Strings("lastEndpoints", usedEps)) + client.SetEndpoints([]string{}...) + client.SetEndpoints(usedEps...) + } + } else { + if !util.AreStringSlicesEquivalent(healthyEps, usedEps) { + client.SetEndpoints(healthyEps...) + change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps)) + etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps))) + log.Info("update endpoints", zap.String("numChange", change), + zap.Strings("lastEndpoints", usedEps), zap.Strings("endpoints", client.Endpoints())) + } + lastAvailable = time.Now() + } + } + } + }(client) + + // Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine. + go func(client *clientv3.Client) { + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + for { + select { + case <-client.Ctx().Done(): + log.Info("etcd client is closed, exit update endpoint goroutine") + return + case <-ticker.C: + eps := syncUrls(client) + checker.update(eps) + } + } + }(client) + + return client, nil +} + +type healthyClient struct { + *clientv3.Client + lastHealth time.Time +} + +type healthyChecker struct { + sync.Map // map[string]*healthyClient + tlsConfig *tls.Config + grpcDialOption grpc.DialOption +} + +func (checker *healthyChecker) patrol(ctx context.Context) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145 + var wg sync.WaitGroup + count := 0 + checker.Range(func(key, value interface{}) bool { + count++ + return true + }) + hch := make(chan string, count) + healthyList := make([]string, 0, count) + checker.Range(func(key, value interface{}) bool { + wg.Add(1) + go func(key, value interface{}) { + defer wg.Done() + ep := key.(string) + client := value.(*healthyClient) + if IsHealthy(ctx, client.Client) { + hch <- ep + checker.Store(ep, &healthyClient{ + Client: client.Client, + lastHealth: time.Now(), + }) + return + } + }(key, value) + return true + }) + wg.Wait() + close(hch) + for h := range hch { + healthyList = append(healthyList, h) + } + return healthyList +} + +func (checker *healthyChecker) update(eps []string) { + for _, ep := range eps { + // check if client exists, if not, create one, if exists, check if it's offline or disconnected. + if client, ok := checker.Load(ep); ok { + lastHealthy := client.(*healthyClient).lastHealth + if time.Since(lastHealthy) > etcdServerOfflineTimeout { + log.Info("some etcd server maybe offline", zap.String("endpoint", ep)) + checker.Delete(ep) + } + if time.Since(lastHealthy) > etcdServerDisconnectedTimeout { + // try to reset client endpoint to trigger reconnect + client.(*healthyClient).Client.SetEndpoints([]string{}...) + client.(*healthyClient).Client.SetEndpoints(ep) + } + continue + } + checker.addClient(ep, time.Now()) + } +} + +func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) { + client, err := newClient(checker.tlsConfig, checker.grpcDialOption, ep) + if err != nil { + log.Error("failed to create etcd healthy client", zap.Error(err)) + return + } + checker.Store(ep, &healthyClient{ + Client: client, + lastHealth: lastHealth, + }) +} + +func syncUrls(client *clientv3.Client) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183 + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), + etcdClientTimeoutDuration) + defer cancel() + mresp, err := client.MemberList(ctx) + if err != nil { + log.Error("failed to list members", errs.ZapError(err)) + return []string{} + } + var eps []string + for _, m := range mresp.Members { + if len(m.Name) != 0 && !m.IsLearner { + eps = append(eps, m.ClientURLs...) + } + } + return eps +} + +// IsHealthy checks if the etcd is healthy. +func IsHealthy(ctx context.Context, client *clientv3.Client) bool { + timeout := etcdClientTimeoutDuration + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout) + defer cancel() + _, err := client.Get(ctx, healthyPath) + // permission denied is OK since proposal goes through consensus to get it + // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 + return err == nil || err == rpctypes.ErrPermissionDenied +} diff --git a/pkg/etcd/metrics.go b/pkg/etcd/metrics.go index 45690cadbc4..1f4e55e7f24 100644 --- a/pkg/etcd/metrics.go +++ b/pkg/etcd/metrics.go @@ -23,7 +23,16 @@ var etcdRequestCounter = prometheus.NewCounterVec( Help: "request counter of etcd operation", }, []string{"type"}) +var etcdStateGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "etcd", + Name: "etcd_client", + Help: "Etcd client states.", + }, []string{"type"}) + // InitMetrics registers the etcd request counter. func InitMetrics(registry *prometheus.Registry) { + prometheus.MustRegister(etcdStateGauge) registry.MustRegister(etcdRequestCounter) } diff --git a/pkg/upstream/manager.go b/pkg/upstream/manager.go index de5fb711619..d9c69077f4f 100644 --- a/pkg/upstream/manager.go +++ b/pkg/upstream/manager.go @@ -87,12 +87,16 @@ func NewManager4Test(pdClient pd.Client) *Manager { // AddDefaultUpstream add the default upstream func (m *Manager) AddDefaultUpstream(pdEndpoints []string, conf *security.Credential, + pdClient pd.Client, ) (*Upstream, error) { up := newUpstream(pdEndpoints, conf) + // use the pdClient pass from cdc server as the default upstream + // to reduce the creation times of pdClient to make cdc server more stable + up.isDefaultUpstream = true + up.PDClient = pdClient if err := m.initUpstreamFunc(m.ctx, up, m.gcServiceID); err != nil { return nil, err } - up.isDefaultUpstream = true m.defaultUpstream = up m.ups.Store(up.ID, up) log.Info("default upstream is added", zap.Uint64("id", up.ID)) diff --git a/pkg/upstream/manager_test.go b/pkg/upstream/manager_test.go index 870aa32f580..4bc544d9868 100644 --- a/pkg/upstream/manager_test.go +++ b/pkg/upstream/manager_test.go @@ -106,7 +106,8 @@ func TestAddDefaultUpstream(t *testing.T) { ) error { return errors.New("test") } - _, err := m.AddDefaultUpstream([]string{}, &security.Credential{}) + pdClient := &gc.MockPDClient{} + _, err := m.AddDefaultUpstream([]string{}, &security.Credential{}, pdClient) require.NotNil(t, err) up, err := m.GetDefaultUpstream() require.Nil(t, up) @@ -117,7 +118,7 @@ func TestAddDefaultUpstream(t *testing.T) { up.ID = uint64(2) return nil } - _, err = m.AddDefaultUpstream([]string{}, &security.Credential{}) + _, err = m.AddDefaultUpstream([]string{}, &security.Credential{}, pdClient) require.Nil(t, err) up, err = m.GetDefaultUpstream() require.NotNil(t, up) diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go index fb576ddcf34..0c868791d06 100644 --- a/pkg/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -122,28 +122,30 @@ func initUpstream(ctx context.Context, up *Upstream, gcServiceID string) error { } // init the tikv client tls global config initGlobalConfig(up.SecurityConfig) - - up.PDClient, err = pd.NewClientWithContext( - ctx, up.PdEndpoints, up.SecurityConfig.PDSecurityOption(), - // the default `timeout` is 3s, maybe too small if the pd is busy, - // set to 10s to avoid frequent timeout. - pd.WithCustomTimeoutOption(10*time.Second), - pd.WithGRPCDialOptions( - grpcTLSOption, - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, - Multiplier: 1.1, - Jitter: 0.1, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: 3 * time.Second, - }), - )) - if err != nil { - up.err.Store(err) - return errors.Trace(err) + // default upstream always use the pdClient pass from cdc server + if !up.isDefaultUpstream { + up.PDClient, err = pd.NewClientWithContext( + ctx, up.PdEndpoints, up.SecurityConfig.PDSecurityOption(), + // the default `timeout` is 3s, maybe too small if the pd is busy, + // set to 10s to avoid frequent timeout. + pd.WithCustomTimeoutOption(10*time.Second), + pd.WithGRPCDialOptions( + grpcTLSOption, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + )) + if err != nil { + up.err.Store(err) + return errors.Trace(err) + } } clusterID := up.PDClient.GetClusterID(ctx) if up.ID != 0 && up.ID != clusterID { @@ -240,7 +242,9 @@ func (up *Upstream) Close() { } atomic.StoreInt32(&up.status, closing) - if up.PDClient != nil { + // should never close default upstream's pdClient here + // because it's shared by the cdc server + if up.PDClient != nil && !up.isDefaultUpstream { up.PDClient.Close() } diff --git a/pkg/util/comparison.go b/pkg/util/comparison.go new file mode 100644 index 00000000000..f89f216e474 --- /dev/null +++ b/pkg/util/comparison.go @@ -0,0 +1,33 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "sort" + +// AreStringSlicesEquivalent checks if two string slices are equivalent. +// If the slices are of the same length and contain the same elements (but possibly in different order), the function returns true. +// Note: This function does modify the slices. Please be caution of this if you are using it. +func AreStringSlicesEquivalent(a, b []string) bool { + if len(a) != len(b) { + return false + } + sort.Strings(a) + sort.Strings(b) + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} diff --git a/pkg/util/comparison_test.go b/pkg/util/comparison_test.go new file mode 100644 index 00000000000..49cf8edb655 --- /dev/null +++ b/pkg/util/comparison_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package util + +import ( + "testing" +) + +func TestAreStringSlicesEquivalent(t *testing.T) { + tests := []struct { + name string + a []string + b []string + want bool + }{ + { + name: "equal slices", + a: []string{"foo", "bar", "baz"}, + b: []string{"baz", "foo", "bar"}, + want: true, + }, + { + name: "different lengths", + a: []string{"foo", "bar", "baz"}, + b: []string{"foo", "bar"}, + want: false, + }, + { + name: "different elements", + a: []string{"foo", "bar", "baz"}, + b: []string{"qux", "quux", "corge"}, + want: false, + }, + { + name: "nil elements", + a: []string{}, + b: []string{}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := AreStringSlicesEquivalent(tt.a, tt.b); got != tt.want { + t.Errorf("AreStringSlicesEquivalent() = %v, want %v", got, tt.want) + } + }) + } +} + +// END: j3d8f4b2j2p9 diff --git a/tests/integration_tests/http_proxies/run-proxy.go b/tests/integration_tests/http_proxies/run-proxy.go index e8bc936c826..04f1df907bb 100644 --- a/tests/integration_tests/http_proxies/run-proxy.go +++ b/tests/integration_tests/http_proxies/run-proxy.go @@ -24,8 +24,15 @@ import ( ) func main() { + defer func() { + fmt.Println("proxy stopped") + }() + grpc_proxy.RegisterDefaultFlags() flag.Parse() + + log.Info("starting proxy", zap.Any("flags", flag.Args())) + proxy, err := grpc_proxy.New( grpc_proxy.WithInterceptor(intercept), grpc_proxy.DefaultFlags(), @@ -37,6 +44,7 @@ func main() { if err != nil { log.Fatal("failed to start proxy", zap.Error(err)) } + fmt.Println("proxy started") } func intercept(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { diff --git a/tests/integration_tests/http_proxies/run.sh b/tests/integration_tests/http_proxies/run.sh index 7ef620df91f..3be2ef77dbe 100644 --- a/tests/integration_tests/http_proxies/run.sh +++ b/tests/integration_tests/http_proxies/run.sh @@ -23,9 +23,11 @@ export UP_TIDB_HOST=$lan_addr \ proxy_pid="" proxy_port=$(shuf -i 10081-20081 -n1) function start_proxy() { - echo "dumpling grpc packet to $WORK_DIR/packets.dump..." - GO111MODULE=on WORK_DIR=$WORK_DIR go run $CUR/run-proxy.go --port=$proxy_port >$WORK_DIR/packets.dump & + echo "dumpling grpc packet to $WORK_DIR/test_proxy.log..." + GO111MODULE=on WORK_DIR=$WORK_DIR go run $CUR/run-proxy.go --port=$proxy_port >$WORK_DIR/test_proxy.log & proxy_pid=$! + echo "proxy port: $proxy_port" + echo "proxy pid: $proxy_pid" } function stop_proxy() { @@ -55,21 +57,25 @@ function prepare() { sleep 5 export http_proxy=http://127.0.0.1:$proxy_port export https_proxy=http://127.0.0.1:$proxy_port + echo "try to connect pd cluster via proxy, pd addr: $UP_PD_HOST_1:2379" ensure 10 curl http://$UP_PD_HOST_1:2379/ - echo started proxy at $proxy_pid + echo started proxy pid: $proxy_pid + echo started proxy at port: $proxy_port cd $WORK_DIR start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + echo "query start ts: $start_ts" run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + echo started cdc server successfully SINK_URI="blackhole:///" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" } function check() { - services=($(cat $WORK_DIR/packets.dump | xargs -L1 dirname | sort | uniq)) + services=($(cat $WORK_DIR/test_proxy.log | xargs -L1 dirname | sort | uniq)) service_type_count=${#services[@]} echo "captured services: " echo ${services[@]}