Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Jan 3, 2025
1 parent 17f3ae3 commit 3297699
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func (s *GrpcServer) GetMinTS(
// GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across
// all keyspace groups.
func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) {
if s.IsClosed() {
return nil, ErrNotStarted
}
addrs := s.keyspaceGroupManager.GetTSOServiceAddrs()
if len(addrs) == 0 {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered")
Expand Down Expand Up @@ -392,6 +395,10 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}

// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return ErrNotStarted
}
if forwardedHost, err := s.getForwardedHost(ctx, stream.Context()); err != nil {
return err
} else if len(forwardedHost) > 0 {
Expand All @@ -412,10 +419,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
}

if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
Expand Down Expand Up @@ -779,6 +783,9 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest)

// IsSnapshotRecovering implements gRPC PDServer.
func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, request *pdpb.IsSnapshotRecoveringRequest) (*pdpb.IsSnapshotRecoveringResponse, error) {
if s.IsClosed() {
return nil, ErrNotStarted
}
// recovering mark is stored in etcd directly, there's no need to forward.
marked, err := s.Server.IsSnapshotRecovering(ctx)
if err != nil {
Expand Down

0 comments on commit 3297699

Please sign in to comment.