diff --git a/cdc/server/server.go b/cdc/server/server.go index b7c510c2d5d..bba9d2c9737 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -26,7 +26,9 @@ import ( "github.com/dustin/go-humanize" "github.com/gin-gonic/gin" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/log" + "github.com/pingcap/sysutil" "github.com/pingcap/tidb/util/gctuner" "github.com/pingcap/tiflow/cdc" "github.com/pingcap/tiflow/cdc/capture" @@ -79,15 +81,16 @@ type Server interface { // TODO: we need to make server more unit testable and add more test cases. // Especially we need to decouple the HTTPServer out of server. type server struct { - capture capture.Capture - tcpServer tcpserver.TCPServer - grpcService *p2p.ServerWrapper - statusServer *http.Server - etcdClient etcd.CDCEtcdClient - pdEndpoints []string - pdClient pd.Client - pdAPIClient *pdutil.PDAPIClient - tableActorSystem *system.System + capture capture.Capture + tcpServer tcpserver.TCPServer + grpcService *p2p.ServerWrapper + diagnosticsService *sysutil.DiagnosticsServer + statusServer *http.Server + etcdClient etcd.CDCEtcdClient + pdEndpoints []string + pdClient pd.Client + pdAPIClient *pdutil.PDAPIClient + tableActorSystem *system.System // If it's true sortEngineManager will be used, otherwise sorterSystem will be used. useEventSortEngine bool @@ -122,9 +125,10 @@ func New(pdEndpoints []string) (*server, error) { useEventSortEngine := debugConfig.EnablePullBasedSink && debugConfig.EnableDBSorter s := &server{ - pdEndpoints: pdEndpoints, - grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), - tcpServer: tcpServer, + pdEndpoints: pdEndpoints, + grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), + diagnosticsService: sysutil.NewDiagnosticsServer(conf.LogFile), + tcpServer: tcpServer, useEventSortEngine: useEventSortEngine, } @@ -410,20 +414,22 @@ func (s *server) run(ctx context.Context) (err error) { }) } + grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) + diagnosticspb.RegisterDiagnosticsServer(grpcServer, s.diagnosticsService) + if conf.Debug.EnableNewScheduler { - grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) - - wg.Go(func() error { - return grpcServer.Serve(s.tcpServer.GrpcListener()) - }) - wg.Go(func() error { - <-cctx.Done() - grpcServer.Stop() - return nil - }) } + wg.Go(func() error { + return grpcServer.Serve(s.tcpServer.GrpcListener()) + }) + wg.Go(func() error { + <-cctx.Done() + grpcServer.Stop() + return nil + }) + return wg.Wait() } diff --git a/go.mod b/go.mod index 7a74ebc14b5..24ba89fd8fa 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c + github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb v1.1.0-beta.0.20231124055343-a06279f27e4c github.com/pingcap/tidb-tools v7.0.1-0.20231228094724-d6c7fc83380a+incompatible github.com/pingcap/tidb/parser v0.0.0-20231124055343-a06279f27e4c @@ -233,7 +234,6 @@ require ( github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 // indirect github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect - github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 // indirect github.com/pingcap/tipb v0.0.0-20221123081521-2fb828910813 // indirect github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect github.com/pkg/errors v0.9.1 // indirect