Skip to content

Commit

Permalink
apiutil/middleware: add retry logic for obtaining PD leader in redire…
Browse files Browse the repository at this point in the history
…ctor (tikv#8216)

close tikv#8142

Add retry logic to improve PD HTTP request forwarding success rate during PD leader switch.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored May 27, 2024
1 parent dd7f2a7 commit beb91c1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
52 changes: 44 additions & 8 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -204,20 +206,19 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = append(clientUrls, targetAddr)
// Add a header to the response, it is used to mark whether the request has been forwarded to the micro service.
w.Header().Add(apiutil.XForwardedToMicroServiceHeader, "true")
} else {
leader := h.s.GetMember().GetLeader()
} else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 {
leader := h.waitForLeader(r)
if leader == nil {
http.Error(w, "no leader", http.StatusServiceUnavailable)
return
}
clientUrls = leader.GetClientUrls()
// Prevent more than one redirection among PD/API servers.
if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}

urls := make([]url.URL, 0, len(clientUrls))
Expand All @@ -233,3 +234,38 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
client := h.s.GetHTTPClient()
apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
}

const (
backoffMaxDelay = 3 * time.Second
backoffInterval = 100 * time.Millisecond
)

// If current server does not have a leader, backoff to increase the chance of success.
func (h *redirector) waitForLeader(r *http.Request) (leader *pdpb.Member) {
var (
interval = backoffInterval
maxDelay = backoffMaxDelay
curDelay = time.Duration(0)
)
for {
leader = h.s.GetMember().GetLeader()
if leader != nil {
return
}
select {
case <-time.After(interval):
curDelay += interval
if curDelay >= maxDelay {
return
}
interval *= 2
if curDelay+interval > maxDelay {
interval = maxDelay - curDelay
}
case <-r.Context().Done():
return
case <-h.s.LoopContext().Done():
return
}
}
}
18 changes: 18 additions & 0 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,24 @@ func (suite *redirectorTestSuite) TestRedirect() {
re.Equal(h, header)
}
}
// Test redirect during leader election.
leader = suite.cluster.GetLeaderServer()
re.NotNil(leader)
err := leader.ResignLeader()
re.NoError(err)
for _, svr := range suite.cluster.GetServers() {
url := fmt.Sprintf("%s/pd/api/v1/version", svr.GetServer().GetAddr())
testutil.Eventually(re, func() bool {
resp, err := tests.TestDialClient.Get(url)
re.NoError(err)
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
re.NoError(err)
// Should not meet 503 since the retry logic ensure the request is sent to the new leader eventually.
re.NotEqual(http.StatusServiceUnavailable, resp.StatusCode)
return resp.StatusCode == http.StatusOK
})
}
}

func (suite *redirectorTestSuite) TestAllowFollowerHandle() {
Expand Down

0 comments on commit beb91c1

Please sign in to comment.