Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: unify the gRPC errors #8910

Merged
merged 4 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
["PD:mcs:ErrNotFoundSchedulingPrimary"]
error = '''
cannot find scheduling address
cannot find scheduling primary
'''

["PD:mcs:ErrSchedulingServer"]
Expand Down
67 changes: 64 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

package errs

import "github.com/pingcap/errors"
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
)

const (
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
Expand All @@ -31,6 +36,62 @@ const (
NotServedErr = "is not served"
)

// gRPC errors
var (
// Canceled indicates the operation was canceled (typically by the caller).
ErrStreamClosed = status.Error(codes.Canceled, "stream is closed")

// Unknown error. An example of where this error may be returned is
// if a Status value received from another address space belongs to
// an error-space that is not known in this address space. Also
// errors raised by APIs that do not return enough error information
// may be converted to this error.
ErrUnknown = func(err error) error {
return status.Error(codes.Unknown, err.Error())
}

// DeadlineExceeded means operation expired before completion.
// For operations that change the state of the system, this error may be
// returned even if the operation has completed successfully. For
// example, a successful response from a server could have been delayed
// long enough for the deadline to expire.
ErrForwardTSOTimeout = status.Error(codes.DeadlineExceeded, "forward tso request timeout")
ErrTSOProxyRecvFromClientTimeout = status.Error(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrSendHeartbeatTimeout = status.Error(codes.DeadlineExceeded, "send heartbeat timeout")

// NotFound means some requested entity (e.g., file or directory) was
// not found.
ErrNotFoundTSOAddr = status.Error(codes.NotFound, "not found tso address")
ErrNotFoundSchedulingAddr = status.Error(codes.NotFound, "not found scheduling address")
ErrNotFoundService = status.Error(codes.NotFound, "not found service")

// ResourceExhausted indicates some resource has been exhausted, perhaps
// a per-user quota, or perhaps the entire file system is out of space.
ErrMaxCountTSOProxyRoutinesExceeded = status.Error(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrGRPCRateLimitExceeded = func(err error) error {
return status.Error(codes.ResourceExhausted, err.Error())
}

// FailedPrecondition indicates operation was rejected because the
// system is not in a state required for the operation's execution.
// For example, directory to be deleted may be non-empty, an rmdir
// operation is applied to a non-directory, etc.
ErrMismatchClusterID = func(clusterID, requestClusterID uint64) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, requestClusterID)
}

// Unavailable indicates the service is currently unavailable.
// This is a most likely a transient condition and may be corrected
// by retrying with a backoff. Note that it is not always safe to retry
// non-idempotent operations.
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Error(codes.Unavailable, "not leader")
ErrNotStarted = status.Error(codes.Unavailable, "server not started")
ErrEtcdNotStarted = status.Error(codes.Unavailable, "server is started, but etcd not started")
ErrFollowerHandlingNotAllowed = status.Error(codes.Unavailable, "not leader and follower handling not allowed")
)

// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
Expand Down Expand Up @@ -484,6 +545,6 @@ var (

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
10 changes: 2 additions & 8 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ meta_storagepb.MetaStorageServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -81,7 +75,7 @@

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader

Check warning on line 78 in pkg/mcs/metastorage/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/metastorage/server/grpc_service.go#L78

Added line #L78 was not covered by tests
}
return nil
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,18 @@

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
)

var (
// errNotLeader is returned when current server is not the leader.
errNotLeader = status.Errorf(codes.Unavailable, "not leader")
)

var _ rmpb.ResourceManagerServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -89,7 +83,7 @@

func (s *Service) checkServing() error {
if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() {
return errNotLeader
return errs.ErrNotLeader

Check warning on line 86 in pkg/mcs/resourcemanager/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/grpc_service.go#L86

Added line #L86 was not covered by tests
}
return nil
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -41,12 +39,6 @@
"github.com/tikv/pd/pkg/versioninfo"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(*Service) (http.Handler, apiutil.APIServiceGroup) {
return dummyRestService{}, apiutil.APIServiceGroup{}
Expand Down Expand Up @@ -107,8 +99,8 @@
return errors.WithStack(err)
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
return errs.ErrSendHeartbeatTimeout
}

Check warning on line 103 in pkg/mcs/scheduling/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/grpc_service.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}

func (s *heartbeatServer) recv() (*schedulingpb.RegionHeartbeatRequest, error) {
Expand Down
21 changes: 6 additions & 15 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,18 @@
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

var _ tsopb.TSOServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
Expand Down Expand Up @@ -102,14 +95,12 @@
start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
return errs.ErrNotStarted
}
header := request.GetHeader()
clusterID := header.GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(
codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
}
keyspaceID := header.GetKeyspaceId()
keyspaceGroupID := header.GetKeyspaceGroupId()
Expand All @@ -119,7 +110,7 @@
keyspaceID, keyspaceGroupID,
count)
if err != nil {
return status.Error(codes.Unknown, err.Error())
return errs.ErrUnknown(err)
}
keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10)
tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -220,10 +211,10 @@

func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) {
if s.IsClosed() || s.keyspaceGroupManager == nil {
return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted
return tsopb.ErrorType_NOT_BOOTSTRAPPED, errs.ErrNotStarted

Check warning on line 214 in pkg/mcs/tso/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/grpc_service.go#L214

Added line #L214 was not covered by tests
}
if header == nil || header.GetClusterId() != keypath.ClusterID() {
return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched
return tsopb.ErrorType_CLUSTER_MISMATCHED, errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())

Check warning on line 217 in pkg/mcs/tso/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/grpc_service.go#L217

Added line #L217 was not covered by tests
}
return tsopb.ErrorType_OK, nil
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
Expand Down Expand Up @@ -279,7 +277,7 @@
// TODO: Check if the sender is from the global TSO allocator
func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted

Check warning on line 280 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L280

Added line #L280 was not covered by tests
}
return nil
}
Expand All @@ -288,11 +286,10 @@
// TODO: Check if the keyspace replica is the primary
func (s *Server) ValidateRequest(header *tsopb.RequestHeader) error {
if s.IsClosed() {
return ErrNotStarted
return errs.ErrNotStarted

Check warning on line 289 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L289

Added line #L289 was not covered by tests
}
if header.GetClusterId() != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
keypath.ClusterID(), header.GetClusterId())
return errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId())

Check warning on line 292 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L292

Added line #L292 was not covered by tests
}
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

"github.com/docker/go-units"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -208,7 +206,7 @@
}
clusterID := request.GetHeader().GetClusterId()
if clusterID != keypath.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", keypath.ClusterID(), clusterID)
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)

Check warning on line 209 in pkg/syncer/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/syncer/server.go#L209

Added line #L209 was not covered by tests
}
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()

Check warning on line 234 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L234

Added line #L234 was not covered by tests
}
var idStr string
if len(id) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@
func (h *confHandler) getSchedulingServerConfig() (*config.Config, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
return nil, errs.ErrNotFoundSchedulingPrimary.FastGenByArgs()

Check warning on line 569 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L569

Added line #L569 was not covered by tests
}
url := fmt.Sprintf("%s/scheduling/api/v1/config", addr)
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
Expand Down
10 changes: 4 additions & 6 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -107,7 +105,7 @@
maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings())
if maxConcurrentTSOProxyStreamings >= 0 {
if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings {
return errors.WithStack(ErrMaxCountTSOProxyRoutinesExceeded)
return errors.WithStack(errs.ErrMaxCountTSOProxyRoutinesExceeded)

Check warning on line 108 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L108

Added line #L108 was not covered by tests
}
}

Expand All @@ -132,7 +130,7 @@
}
if request.GetCount() == 0 {
err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
return status.Error(codes.Unknown, err.Error())
return errs.ErrUnknown(err)

Check warning on line 133 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L133

Added line #L133 was not covered by tests
}
forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, server, request, tsDeadlineCh, lastForwardedHost, cancelForward)
if tsoStreamErr != nil {
Expand All @@ -155,7 +153,7 @@
) {
forwardedHost, ok := s.GetServicePrimaryAddr(stream.Context(), constant.TSOServiceName)
if !ok || len(forwardedHost) == 0 {
return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(ErrNotFoundTSOAddr), nil
return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(errs.ErrNotFoundTSOAddr), nil
}
if forwardStream == nil || lastForwardedHost != forwardedHost {
if cancelForward != nil {
Expand Down Expand Up @@ -458,7 +456,7 @@
}
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, constant.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
return pdpb.Timestamp{}, errs.ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
Expand Down
Loading
Loading