diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 7bf8008b874d..0107e68336b7 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -905,6 +905,7 @@
APPLICATION | distsender.rpc.err.notleaseholdererrtype | Number of NotLeaseHolderErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.err.oprequirestxnerrtype | Number of OpRequiresTxnErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.err.optimisticevalconflictserrtype | Number of OptimisticEvalConflictsErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | distsender.rpc.err.proxyfailederrtype | Number of ProxyFailedErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.err.raftgroupdeletederrtype | Number of RaftGroupDeletedErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.err.rangefeedretryerrtype | Number of RangeFeedRetryErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.err.rangekeymismatcherrtype | Number of RangeKeyMismatchErrType errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas as part of executing possibly range-spanning requests. Failures to reach the target replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified errors as 'roachpb.InternalErrType'.
| Errors | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 741fe909c7c2..cc74f73f393b 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -352,6 +352,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
true,
)
+var ProxyBatchRequest = settings.RegisterBoolSetting(
+ settings.ApplicationLevel,
+ "kv.dist_sender.proxy.enabled",
+ "when true, proxy batch requests that can't be routed directly to the leaseholder",
+ true,
+)
+
// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
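As a quick illustration of how the new setting is exercised (a minimal sketch assuming a test context with ctx in scope, mirroring the TestPartialPartition change further down in this diff):

    st := cluster.MakeTestingClusterSettings()
    // Turn proxying off for this test; the DistSender reads the value through
    // ProxyBatchRequest.Get(&ds.st.SV) before deciding whether to proxy a request.
    kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, false)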
@@ -1974,6 +1981,18 @@ func (ds *DistSender) sendPartialBatch(
attempts++
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.
+
+ // On a proxy request, update our routing information with what the
+ // client sent us if the client had newer information. We have already
+ // validated the client request against our local replica state in
+ // node.go, which rejects requests with stale information. Here we ensure
+ // our RangeCache has the same information as both the client request
+ // and our local replica before attempting the request. If the sync
+ // makes our token invalid, we handle it similarly to a RangeNotFound or
+ // NotLeaseHolderError from a remote server.
+ if ba.ProxyRangeInfo != nil {
+ routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
+ }
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
@@ -2047,6 +2066,11 @@ func (ds *DistSender) sendPartialBatch(
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
pErr = kvpb.NewError(err)
+ // Proxy requests are not retried since we are not the originator.
+ if ba.ProxyRangeInfo != nil {
+ log.VEventf(ctx, 1, "failing proxy request after error %s", err)
+ break
+ }
switch {
case IsSendError(err):
// We've tried all the replicas without success. Either they're all
@@ -2340,7 +2364,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
-// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
+// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
@@ -2385,15 +2409,62 @@ func (ds *DistSender) sendToReplicas(
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
- leaseholder := routing.Leaseholder()
- replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
+ replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
if err != nil {
return nil, err
}
+ // The client has requested that we proxy this request. Only proxy if we can
+ // determine the leaseholder and it agrees with the ProxyRangeInfo from the
+ // client. We don't support proxying a request to a non-leaseholder replica.
+ // If we decide to proxy this request, we reduce our replica list to only the
+ // requested replica. If that request fails, we fail back to the caller so
+ // they can try something else.
+ if ba.ProxyRangeInfo != nil {
+ log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
+ // We don't know who the leaseholder is, and it is likely that the
+ // client had stale information. Return our information to them through
+ // an NLHE and let them retry.
+ if routing.Lease().Empty() {
+ log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
+ br := kvpb.BatchResponse{}
+ br.Error = kvpb.NewError(
+ kvpb.NewNotLeaseHolderError(roachpb.Lease{},
+ 0, /* proposerStoreID */
+ routing.Desc(),
+ "client requested a proxy but we can't figure out the leaseholder"),
+ )
+ return &br, nil
+ }
+ if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
+ ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
+ log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
+ br := kvpb.BatchResponse{}
+ br.Error = kvpb.NewError(
+ kvpb.NewNotLeaseHolderError(
+ *routing.Lease(),
+ 0, /* proposerStoreID */
+ routing.Desc(),
+ fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
+ )
+ return &br, nil
+ }
+
+ // On a proxy request, we only send the request to the leaseholder. If we
+ // are here then the client and server agree on the routing information, so
+ // use the leaseholder as our only replica to send to.
+ idx := replicas.Find(routing.Leaseholder().ReplicaID)
+ // This should never happen. We validated the routing above and the token
+ // is still valid.
+ if idx == -1 {
+ return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
+ }
+ replicas = replicas[idx : idx+1]
+ log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
+ }
// Rearrange the replicas so that they're ordered according to the routing
// policy.
- var leaseholderFirst bool
+ var routeToLeaseholder bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2403,12 +2474,12 @@ func (ds *DistSender) sendToReplicas(
}
idx := -1
- if leaseholder != nil {
- idx = replicas.Find(leaseholder.ReplicaID)
+ if routing.Leaseholder() != nil {
+ idx = replicas.Find(routing.Leaseholder().ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
- leaseholderFirst = true
+ routeToLeaseholder = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
@@ -2546,6 +2617,44 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))
+ // Determine whether we should proxy this request through a follower to
+ // the leaseholder. The primary condition for proxying is that we would
+ // like to send the request to the leaseholder, but the transport is
+ // about to send the request through a follower.
+ requestToSend := ba
+ if !ProxyBatchRequest.Get(&ds.st.SV) {
+ // The setting is disabled, so we don't proxy this request.
+ } else if ba.ProxyRangeInfo != nil {
+ // Clear out the proxy information to prevent the recipient from
+ // sending this request onwards. This is necessary to prevent proxy
+ // chaining. We want the recipient to process the request or fail
+ // immediately. This is an extra safety measure to prevent any type
+ // of routing loop.
+ requestToSend = ba.ShallowCopy()
+ requestToSend.ProxyRangeInfo = nil
+ } else if !routeToLeaseholder {
+ // This request isn't intended for the leaseholder so we don't proxy it.
+ } else if routing.Leaseholder() == nil {
+ // NB: Normally we don't have both routeToLeaseholder and a nil
+ // leaseholder. This could be changed to an assertion.
+ log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
+ } else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
+ // We are sending this request to the leaseholder, so it doesn't
+ // make sense to attempt to proxy it.
+ } else if ds.nodeIDGetter() == ba.Replica.NodeID {
+ // This condition prevents proxying a request if we are the same
+ // node as the final destination. Without this we would pass through
+ // the same DistSender stack again which is pointless.
+ } else {
+ // We passed all the conditions above and want to attempt to proxy
+ // this request. We need to copy it since we are going to modify the
+ // Header; attempts to other replicas may not end up setting the proxy header.
+ requestToSend = ba.ShallowCopy()
+ rangeInfo := routing.RangeInfo()
+ requestToSend.ProxyRangeInfo = &rangeInfo
+ log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
+ }
+
tBegin := timeutil.Now() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
@@ -2554,7 +2663,7 @@ func (ds *DistSender) sendToReplicas(
err = cbErr
transport.SkipReplica()
} else {
- br, err = transport.SendNext(sendCtx, ba)
+ br, err = transport.SendNext(sendCtx, requestToSend)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())
@@ -2570,6 +2679,18 @@ func (ds *DistSender) sendToReplicas(
}
}
}
+ if err == nil {
+ if proxyErr, ok := br.Error.GetDetail().(*kvpb.ProxyFailedError); ok {
+ // The server proxy attempt resulted in a non-BatchRequest error, likely
+ // a communication error. Convert the wrapped error into an error on our
+ // side. Depending on the type of request and what the error is, we may
+ // treat the error as an ambiguous error. Clear out the BatchResponse as
+ // the only information it contained was this error.
+ err = proxyErr.Unwrap()
+ br = nil
+ log.VEventf(ctx, 2, "proxy send error: %s", err)
+ }
+ }
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
@@ -2635,21 +2756,10 @@ func (ds *DistSender) sendToReplicas(
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
- } else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
- // If we get a gRPC error against the leaseholder, we don't want to
- // backoff and keep trying the same leaseholder against the leaseholder.
- // TODO(baptist): This should not be in an else block. Ideally
- // we set leaseholderUnavailable to true even if there is an
- // ambiguous error as it should be set independent of an
- // ambiguous error. TestTransactionUnexpectedlyCommitted test
- // fails otherwise. That test is expecting us to retry against
- // the leaseholder after we received a gRPC error to validate
- // that it now returns a WriteTooOld error. Once the proxy code
- // is in place, this can be moved back out of the if block. In
- // practice the only impact of having this in the else block is
- // that we will retry more times against a leaseholder before
- // moving on to the other replicas. There is not an easy way to
- // modify the test without this being in the else block.
+ }
+ // If we get a gRPC error against the leaseholder, we don't want to
+ // back off and keep trying the request against the same leaseholder.
+ if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
leaseholderUnavailable = true
}
} else {
@@ -2816,6 +2926,18 @@ func (ds *DistSender) sendToReplicas(
// error too aggressively.
if updatedLeaseholder {
leaseholderUnavailable = false
+ routeToLeaseholder = true
+ // If we changed the leaseholder, reset the transport to try all the
+ // replicas in order again. After a leaseholder change, requests to
+ // followers will be marked as potential proxy requests and point to
+ // the new leaseholder. We need to try all the replicas again before
+ // giving up.
+ // NB: We reset and retry here because if we release a SendError to
+ // the caller, it will call Evict and evict the leaseholder
+ // information we just learned from this error.
+ // TODO(baptist): If sendPartialBatch didn't evict valid range
+ // information we would not need to reset the transport here.
+ transport.Reset()
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
@@ -2851,7 +2973,7 @@ func (ds *DistSender) sendToReplicas(
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
- intentionallySentToFollower := first && !leaseholderFirst
+ intentionallySentToFollower := first && !routeToLeaseholder
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
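To make the proxy-attempt decision above easier to follow, the chain of conditions can be read as a single predicate. This is a non-authoritative restatement; the helper name shouldProxy is illustrative and assumes the kvcoord package context (rangecache, kvpb, roachpb imports):

    // shouldProxy restates when sendToReplicas attaches ProxyRangeInfo before
    // handing the request to the transport.
    func shouldProxy(
        ds *DistSender,
        ba *kvpb.BatchRequest,
        routing rangecache.EvictionToken,
        routeToLeaseholder bool,
        target roachpb.ReplicaDescriptor,
    ) bool {
        return ProxyBatchRequest.Get(&ds.st.SV) && // setting enabled
            ba.ProxyRangeInfo == nil && // not already a proxied request (no chaining)
            routeToLeaseholder && // the request is meant for the leaseholder
            routing.Leaseholder() != nil && // and we know who that is
            target.NodeID != routing.Leaseholder().NodeID && // but we're about to use a follower
            ds.nodeIDGetter() != target.NodeID // and that follower is not ourselves
    }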
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index ddf0ed94e0b1..cbfabd0fff64 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
- if test.useProxy {
- skip.WithIssue(t, 93503)
- }
st := cluster.MakeTestingClusterSettings()
+ kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 262ccbc014bd..abe49c01526b 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -5683,21 +5683,32 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
// We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We
// then expect an updated descriptor to be used and return success.
+ // Initially the routing is (*1, 2,) - no LH
+ // 1) Send to n1 -> NLHE with LH=2 (updated - reset), transport -> (*2, 1,) - LH=2
+ // 2) Send to n2 -> not found, transport -> (2, *1,) - LH=2
+ // 3) Send to n1 -> NLHE with LH=2 (not updated - backoff), transport -> (1, *2,) - LH=2
+ // 4) Send to n2 -> not found, transport -> (1, 2, *) - LH=2
+ // Evict/Refresh transport is now (*3) - no LH
+ // 5) Send to n3 - success
call := 0
var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
br := &kvpb.BatchResponse{}
switch call {
- case 0:
+ case 0, 2:
expRepl := desc1.Replicas().Descriptors()[0]
require.Equal(t, expRepl, ba.Replica)
- br.Error = kvpb.NewError(&kvpb.NotLeaseHolderError{
- Lease: &roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
- })
- case 1:
+ br.Error = kvpb.NewError(
+ kvpb.NewNotLeaseHolderError(
+ roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
+ 1,
+ &desc1,
+ "store not leaseholder",
+ ))
+ case 1, 3:
expRep := desc1.Replicas().Descriptors()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = kvpb.NewError(kvpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
- case 2:
+ case 4:
expRep := desc2.Replicas().Descriptors()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
@@ -5745,7 +5756,7 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
_, err := ds.Send(ctx, ba)
require.NoError(t, err.GoError())
- require.Equal(t, call, 3)
+ require.Equal(t, call, 5)
require.Equal(t, rangeLookups, 2)
}
diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto
index e38ee96b1998..9906dd49cec3 100644
--- a/pkg/kv/kvpb/api.proto
+++ b/pkg/kv/kvpb/api.proto
@@ -2941,9 +2941,29 @@ message Header {
// in a mixed-version state after a new value got added), DEFAULT is assumed.
cockroach.rpc.ConnectionClass connection_class = 33;
+ // ProxyRangeInfo, if set, indicates the RangeInfo information a client had
+ // when it sent this request. This information is passed to an intermediate
+ // proxy node which can then forward the request to the final destination.
+ // If this field is set, then the node which receives the BatchRequest should
+ // first try to evaluate the request locally. If local evaluation fails with
+ // a NotLeaseHolderError, it should act on behalf of the original client and
+ // forward the original request, with this field cleared, to the leaseholder
+ // as determined by the ProxyRangeInfo header.
+ //
+ // If the kv.dist_sender.proxy.enabled cluster setting is set to false, the client
+ // will never set this header and proxying will be disabled.
+ //
+ // NB: This field is only used in 24.1+ systems. A mixed-version system with at
+ // least one node not on 24.1 will exhibit the following behavior.
+ // * A client older than 24.1 will never set this field.
+ // * A proxy node older than 24.1 will ignore this field being set and return an
+ // error after local evaluation like it does in older systems.
+ // * A destination node older than 24.1 will not see this field.
+ RangeInfo proxy_range_info = 34;
+
reserved 7, 10, 12, 14, 20;
- // Next ID: 34
+ // Next ID: 35
}
// BoundedStalenessHeader contains configuration values pertaining to bounded
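Condensed, the life of this field as wired up elsewhere in this change looks roughly like the sketch below; it reuses the kvcoord-side names from the dist_sender.go hunks above and is illustrative only:

    // Originator: shallow-copy the batch and attach the client's current view of
    // the range before sending through a follower.
    proxyReq := ba.ShallowCopy()
    ri := routing.RangeInfo()
    proxyReq.ProxyRangeInfo = &ri

    // Proxy node: if local evaluation returns a NotLeaseHolderError that the
    // client's RangeInfo already accounts for, clear the field and forward the
    // request to the leaseholder through the node's own DistSender; any send
    // failure travels back wrapped in a ProxyFailedError.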
diff --git a/pkg/kv/kvpb/errordetailtype_string.go b/pkg/kv/kvpb/errordetailtype_string.go
index 6abcea8fb453..f0d593236b49 100644
--- a/pkg/kv/kvpb/errordetailtype_string.go
+++ b/pkg/kv/kvpb/errordetailtype_string.go
@@ -43,6 +43,7 @@ func _() {
_ = x[MVCCHistoryMutationErrType-44]
_ = x[LockConflictErrType-45]
_ = x[ReplicaUnavailableErrType-46]
+ _ = x[ProxyFailedErrType-47]
_ = x[CommunicationErrType-22]
_ = x[InternalErrType-25]
}
@@ -119,6 +120,8 @@ func (i ErrorDetailType) String() string {
return "LockConflictErrType"
case ReplicaUnavailableErrType:
return "ReplicaUnavailableErrType"
+ case ProxyFailedErrType:
+ return "ProxyFailedErrType"
case CommunicationErrType:
return "CommunicationErrType"
case InternalErrType:
diff --git a/pkg/kv/kvpb/errors.go b/pkg/kv/kvpb/errors.go
index 2bbfc798658d..7b609f938027 100644
--- a/pkg/kv/kvpb/errors.go
+++ b/pkg/kv/kvpb/errors.go
@@ -299,6 +299,7 @@ const (
MVCCHistoryMutationErrType ErrorDetailType = 44
LockConflictErrType ErrorDetailType = 45
ReplicaUnavailableErrType ErrorDetailType = 46
+ ProxyFailedErrType ErrorDetailType = 47
// When adding new error types, don't forget to update NumErrors below.
// CommunicationErrType indicates a gRPC error; this is not an ErrorDetail.
@@ -308,7 +309,7 @@ const (
// detail. The value 25 is chosen because it's reserved in the errors proto.
InternalErrType ErrorDetailType = 25
- NumErrors int = 47
+ NumErrors int = 48
)
// Register the migration of all errors that used to be in the roachpb package
@@ -1711,6 +1712,56 @@ func (e *ReplicaUnavailableError) Type() ErrorDetailType {
var _ ErrorDetailInterface = &ReplicaUnavailableError{}
+// Type is part of the ErrorDetailInterface.
+func (e *ProxyFailedError) Type() ErrorDetailType {
+ return ProxyFailedErrType
+}
+
+// Error is part of the builtin error interface.
+func (e *ProxyFailedError) Error() string {
+ return redact.Sprint(e).StripMarkers()
+}
+
+// Format implements fmt.Formatter.
+func (e *ProxyFailedError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) }
+
+// SafeFormatError is part of the errors.SafeFormatter interface.
+func (e *ProxyFailedError) SafeFormatError(p errors.Printer) (next error) {
+ p.Printf("proxy failed with send error")
+ return nil
+}
+
+// Unwrap implements errors.Wrapper.
+func (e *ProxyFailedError) Unwrap() error {
+ return errors.DecodeError(context.Background(), e.Cause)
+}
+
+// NewProxyFailedError returns a ProxyFailedError wrapping (via
+// errors.Wrapper) the supplied error.
+func NewProxyFailedError(err error) *ProxyFailedError {
+ return &ProxyFailedError{
+ Cause: errors.EncodeError(context.Background(), err),
+ }
+}
+
+var _ ErrorDetailInterface = &ProxyFailedError{}
+var _ errors.SafeFormatter = (*ProxyFailedError)(nil)
+var _ fmt.Formatter = (*ProxyFailedError)(nil)
+var _ errors.Wrapper = (*ProxyFailedError)(nil)
+
+func init() {
+ encode := func(ctx context.Context, err error) (msgPrefix string, safeDetails []string, payload proto.Message) {
+ errors.As(err, &payload) // payload = err.(proto.Message)
+ return "", nil, payload
+ }
+ decode := func(ctx context.Context, cause error, msgPrefix string, safeDetails []string, payload proto.Message) error {
+ return payload.(*ProxyFailedError)
+ }
+ typeName := errors.GetTypeKey((*ProxyFailedError)(nil))
+ errors.RegisterWrapperEncoder(typeName, encode)
+ errors.RegisterWrapperDecoder(typeName, decode)
+}
+
func init() {
errors.RegisterLeafDecoder(errors.GetTypeKey((*MissingRecordError)(nil)), func(_ context.Context, _ string, _ []string, _ proto.Message) error {
return &MissingRecordError{}
@@ -1755,3 +1806,4 @@ var _ errors.SafeFormatter = &RefreshFailedError{}
var _ errors.SafeFormatter = &MVCCHistoryMutationError{}
var _ errors.SafeFormatter = &UnhandledRetryableError{}
var _ errors.SafeFormatter = &ReplicaUnavailableError{}
+var _ errors.SafeFormatter = &ProxyFailedError{}
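Both ends of the new error type are small; a minimal usage sketch, assuming sendErr is whatever error the proxy node's DistSender returned:

    // Proxy node: wrap the send error so it survives the wire inside a BatchResponse.
    br := &kvpb.BatchResponse{}
    br.Error = kvpb.NewError(kvpb.NewProxyFailedError(sendErr))

    // Originator: detect the wrapper on the response and recover the cause.
    if pfe, ok := br.Error.GetDetail().(*kvpb.ProxyFailedError); ok {
        cause := pfe.Unwrap() // e.g. the original gRPC/communication error
        _ = cause
    }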
diff --git a/pkg/kv/kvpb/errors.proto b/pkg/kv/kvpb/errors.proto
index 38843abf5963..5c910ed96d79 100644
--- a/pkg/kv/kvpb/errors.proto
+++ b/pkg/kv/kvpb/errors.proto
@@ -418,6 +418,17 @@ message AmbiguousResultError {
reserved 2;
}
+// A ProxyFailedError is used to transmit, back to the originator, a send error
+// that occurred between a proxy node and the final destination. The
+// originator needs to handle
+// different types of SendErrors differently depending on both the type of the
+// error (specifically whether the error happened after the request started but
+// before we received a response) and the type of request (specifically whether
+// the request included a commit).
+message ProxyFailedError {
+ // The error that caused the proxy failure.
+ optional errorspb.EncodedError cause = 1 [(gogoproto.nullable) = false];
+}
+
message ReplicaUnavailableError {
optional roachpb.RangeDescriptor desc = 2 [(gogoproto.nullable) = false];
diff --git a/pkg/kv/kvpb/errors_test.go b/pkg/kv/kvpb/errors_test.go
index 08878073ccd4..598c6bffca02 100644
--- a/pkg/kv/kvpb/errors_test.go
+++ b/pkg/kv/kvpb/errors_test.go
@@ -15,6 +15,7 @@ import (
"context"
"fmt"
"io"
+ "reflect"
"strings"
"testing"
@@ -421,3 +422,28 @@ func TestDescNotFoundError(t *testing.T) {
require.True(t, errors.HasType(err, &DescNotFoundError{}))
})
}
+
+// TestProxyFailedError validates that ProxyFailedErrors can be cleanly encoded
+// and decoded with an internal error.
+func TestProxyFailedError(t *testing.T) {
+ ctx := context.Background()
+ fooErr := errors.New("foo")
+ err := NewProxyFailedError(fooErr)
+ require.Equal(t, `proxy failed with send error`, err.Error())
+ require.True(t, errors.HasType(err, &ProxyFailedError{}))
+ decodedErr := errors.DecodeError(ctx, errors.EncodeError(ctx, err))
+
+ require.Truef(t, errors.HasType(decodedErr, &ProxyFailedError{}), "wrong error %v %v", decodedErr, reflect.TypeOf(decodedErr))
+ require.True(t, errors.Is(decodedErr, fooErr))
+ require.Equal(t, `proxy failed with send error`, decodedErr.Error())
+
+ var rue *ProxyFailedError
+ require.True(t, errors.As(decodedErr, &rue))
+
+ internalErr := errors.DecodeError(context.Background(), rue.Cause)
+ require.True(t, rue.Cause.IsSet())
+ require.ErrorContains(t, internalErr, "foo")
+ require.True(t, errors.Is(internalErr, fooErr))
+
+ require.Equal(t, `foo`, string(redact.Sprint(internalErr).Redact()))
+}
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 8c008d4b63c4..0a1037e11822 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -1410,9 +1410,34 @@ func (n *Node) batchInternal(
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
}()
+
+ // If a proxy attempt is requested, we copy the request to prevent evaluation
+ // from modifying the request. There are places on the server that can modify
+ // the request, and we can't keep these modifications if we later proxy it.
+ // Note that we only ShallowCopy, so care must be taken with changes to
+ // nested fields. For reference, some of these places are:
+ // SetActiveTimestamp - sets the Header.Timestamp
+ // maybeStripInFlightWrites - can modify internal EndTxn requests
+ // tryBumpBatchTimestamp - can modify the txn.ReadTimestamp
+ // TODO(baptist): Other code copies the BatchRequest, in some cases
+ // unnecessarily, to prevent modifying the passed in request. We should clean
+ // up the contract of the Send method to allow modifying the request or more
+ // strictly enforce that the callee is not allowed to change it.
+ var originalRequest *kvpb.BatchRequest
+ if args.ProxyRangeInfo != nil {
+ originalRequest = args.ShallowCopy()
+ }
var pErr *kvpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
if pErr != nil {
+ if originalRequest != nil {
+ if proxyResponse := n.maybeProxyRequest(ctx, originalRequest, pErr); proxyResponse != nil {
+ // If the proxy request succeeded then return its result instead of
+ // our error. If not, use our original error.
+ return proxyResponse, nil
+ }
+ }
+
br = &kvpb.BatchResponse{}
if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1466,6 +1491,58 @@ func (n *Node) batchInternal(
return br, nil
}
+// maybeProxyRequest is called after the server returns an error and it
+// attempts to proxy the request if it can. We attempt to proxy requests if two
+// primary conditions are met:
+// 1) The ProxyRangeInfo header is set on the request indicating the client
+// would like us to proxy this request if we can't evaluate it.
+// 2) Local evaluation has resulted in a NotLeaseHolderError which matches the
+// ProxyRangeInfo from the client.
+// If these conditions are met, attempt to send the request through our local
+// DistSender stack and use that result instead of our error.
+func (n *Node) maybeProxyRequest(
+ ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
+) *kvpb.BatchResponse {
+ // NB: We don't handle StoreNotFound or RangeNotFound errors. If we want to
+ // support proxy requests through non-replicas we could proxy those errors
+ // as well.
+ var nlhe *kvpb.NotLeaseHolderError
+ if ok := errors.As(pErr.GetDetail(), &nlhe); !ok {
+ log.VEventf(ctx, 2, "non-proxyable errors %v", pErr)
+ return nil
+ }
+ // If the NotLeaseHolderError was returned because we think the client has
+ // stale information, see if our information would update the client's
+ // state. If so, rather than proxying this request, fail back to the
+ // client first.
+ leaseCompatible := nlhe.Lease != nil && ba.ProxyRangeInfo.Lease.Sequence >= nlhe.Lease.Sequence
+ descCompatible := ba.ProxyRangeInfo.Desc.Generation >= nlhe.RangeDesc.Generation
+ if !leaseCompatible || !descCompatible {
+ log.VEventf(
+ ctx,
+ 2,
+ "out-of-date client information on proxy request %v != %v",
+ ba.ProxyRangeInfo,
+ pErr,
+ )
+ return nil
+ }
+
+ log.VEventf(ctx, 2, "proxy request for %v after local error %v", ba, pErr)
+ // TODO(baptist): Correctly set up the span / tracing.
+ br, pErr := n.proxySender.Send(ctx, ba)
+ if pErr == nil {
+ log.VEvent(ctx, 2, "proxy succeeded")
+ return br
+ }
+ // Wrap the error in a ProxyFailedError. It is unwrapped on the client side
+ // and handled there.
+ log.VEventf(ctx, 2, "proxy attempt resulted in error %v", pErr)
+ br = &kvpb.BatchResponse{}
+ br.Error = kvpb.NewError(kvpb.NewProxyFailedError(pErr.GoError()))
+ return br
+}
+
// getLocalityComparison takes gatewayNodeID as input and returns the locality
// comparison result between the gateway node and the current node. This result
// indicates whether the two nodes are located in different regions or zones.
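The staleness check in maybeProxyRequest is the key gate: the node only forwards when the client's routing information is at least as new as what its own replica just reported. A sketch of that comparison in isolation, reusing the nlhe and ba names from the function above (illustrative only):

    // Proxy only when the client already knows about (at least) the lease and
    // descriptor that produced the local NotLeaseHolderError; otherwise return
    // the NLHE so the client can refresh its cache first.
    leaseOK := nlhe.Lease != nil && ba.ProxyRangeInfo.Lease.Sequence >= nlhe.Lease.Sequence
    descOK := ba.ProxyRangeInfo.Desc.Generation >= nlhe.RangeDesc.Generation
    shouldForward := leaseOK && descOK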