Skip to content

Commit

Permalink
kvclient: reset the transport on lease change
Browse files Browse the repository at this point in the history
When we receive a NotLeaseHolderError and change the leaseholder based
on updated lease information, we want to run through the entire
transport again before giving up and returning to sendPartialBatch.
On proxy requests, we need to retry the non-leaseholder replicas with
the updated leaseholder information as they may be able to proxy the
request to the leaseholder.

Epic: none

Release note: None
  • Loading branch information
andrewbaptist committed Mar 22, 2024
1 parent 0fe4c02 commit b5caa16
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2849,6 +2849,17 @@ func (ds *DistSender) sendToReplicas(
if updatedLeaseholder {
leaseholderUnavailable = false
routeToLeaseholder = true
// If we changed the leaseholder, reset the transport to try all the
// replicas in order again. After a leaseholder change, requests to
// followers will be marked as potential proxy requests and point to
// the new leaseholder. We need to try all the replicas again before
// giving up.
// NB: We reset and retry here because if we release a SendError to
// the caller, it will call Evict and evict the leaseholder
// information we just learned from this error.
// TODO(baptist): If sendPartialBatch didn't evict valid range
// information we would not need to reset the transport here.
transport.Reset()
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
Expand Down
25 changes: 18 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5683,21 +5683,32 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {

// We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We
// then expect an updated descriptor to be used and return success.
// Initially the routing is (*1, 2,) - no LH
// 1) Send to n1 -> NLHE with LH=2 (updated - reset), transport -> (*2, 1,) - LH=2
// 2) Send to n2 -> not found, transport -> (2, *1,) - LH=2
// 3) Send to n1 -> NLHE with LH=2 (not updated - backoff), transport -> (1, *2,) - LH=2
// 4) Send to n2 -> not found, transport -> (1, 2, *) - LH=2
// Evict/Refresh transport is now (*3) - no LH
// 5) Send to n3 - success
call := 0
var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
br := &kvpb.BatchResponse{}
switch call {
case 0:
case 0, 2:
expRepl := desc1.Replicas().Descriptors()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = kvpb.NewError(&kvpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
})
case 1:
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
1,
&desc1,
"store not leaseholder",
))
case 1, 3:
expRep := desc1.Replicas().Descriptors()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = kvpb.NewError(kvpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
case 4:
expRep := desc2.Replicas().Descriptors()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
Expand Down Expand Up @@ -5745,7 +5756,7 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {

_, err := ds.Send(ctx, ba)
require.NoError(t, err.GoError())
require.Equal(t, call, 3)
require.Equal(t, call, 5)
require.Equal(t, rangeLookups, 2)
}

Expand Down

0 comments on commit b5caa16

Please sign in to comment.