From 32976993133f3cfa63a5030147ebc0dd8c48fe3e Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 3 Jan 2025 16:31:13 +0800 Subject: [PATCH] fix Signed-off-by: okJiang <819421878@qq.com> --- server/grpc_service.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 81b55c04e61..6c7766485b4 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -194,6 +194,9 @@ func (s *GrpcServer) GetMinTS( // GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across // all keyspace groups. func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) { + if s.IsClosed() { + return nil, ErrNotStarted + } addrs := s.keyspaceGroupManager.GetTSOServiceAddrs() if len(addrs) == 0 { return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered") @@ -392,6 +395,10 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { return errors.WithStack(err) } + // TSO uses leader lease to determine validity. No need to check leader here. + if s.IsClosed() { + return ErrNotStarted + } if forwardedHost, err := s.getForwardedHost(ctx, stream.Context()); err != nil { return err } else if len(forwardedHost) > 0 { @@ -412,10 +419,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { } start := time.Now() - // TSO uses leader lease to determine validity. No need to check leader here. - if s.IsClosed() { - return status.Errorf(codes.Unknown, "server not started") - } + if request.GetHeader().GetClusterId() != s.clusterID { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) @@ -779,6 +783,9 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) // IsSnapshotRecovering implements gRPC PDServer. func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, request *pdpb.IsSnapshotRecoveringRequest) (*pdpb.IsSnapshotRecoveringResponse, error) { + if s.IsClosed() { + return nil, ErrNotStarted + } // recovering mark is stored in etcd directly, there's no need to forward. marked, err := s.Server.IsSnapshotRecovering(ctx) if err != nil {