From e74cef6ec3c2bf0242dee68e36d178c784902bbe Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 1 Dec 2023 17:44:54 +0800 Subject: [PATCH] implement diagnosticsService in cdc --- cdc/server/server.go | 21 +++++++++++++-------- go.mod | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/cdc/server/server.go b/cdc/server/server.go index b029e9256d6..e093787a1f0 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -25,7 +25,9 @@ import ( "github.com/dustin/go-humanize" "github.com/gin-gonic/gin" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/log" + "github.com/pingcap/sysutil" "github.com/pingcap/tidb/util/gctuner" "github.com/pingcap/tiflow/cdc" "github.com/pingcap/tiflow/cdc/capture" @@ -76,11 +78,12 @@ type Server interface { // TODO: we need to make server more unit testable and add more test cases. // Especially we need to decouple the HTTPServer out of server. type server struct { - capture capture.Capture - tcpServer tcpserver.TCPServer - grpcService *p2p.ServerWrapper - statusServer *http.Server - etcdClient etcd.CDCEtcdClient + capture capture.Capture + tcpServer tcpserver.TCPServer + grpcService *p2p.ServerWrapper + diagnosticsService *sysutil.DiagnosticsServer + statusServer *http.Server + etcdClient etcd.CDCEtcdClient // pdClient is the default upstream PD client. // The PD acts as a metadata management service for TiCDC. pdClient pd.Client @@ -113,9 +116,10 @@ func New(pdEndpoints []string) (*server, error) { debugConfig := config.GetGlobalServerConfig().Debug s := &server{ - pdEndpoints: pdEndpoints, - grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), - tcpServer: tcpServer, + pdEndpoints: pdEndpoints, + grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), + diagnosticsService: sysutil.NewDiagnosticsServer(conf.LogFile), + tcpServer: tcpServer, } log.Info("CDC server created", @@ -358,6 +362,7 @@ func (s *server) run(ctx context.Context) (err error) { grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) + diagnosticspb.RegisterDiagnosticsServer(grpcServer, s.diagnosticsService) eg.Go(func() error { return grpcServer.Serve(s.tcpServer.GrpcListener()) diff --git a/go.mod b/go.mod index 5967048505b..50598e55246 100644 --- a/go.mod +++ b/go.mod @@ -291,7 +291,7 @@ require ( github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d // indirect github.com/pingcap/fn v1.0.0 // indirect github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect - github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect + github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f // indirect github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect github.com/pkg/errors v0.9.1 // indirect