Skip to content

Commit

Permalink
Merge branch 'master' into introduce-runner
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Apr 9, 2024
2 parents aa88f60 + ad3387c commit ef574fe
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 21 deletions.
20 changes: 15 additions & 5 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ROOT_PATH := $(shell pwd)/..
GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

default: static tidy test

test:
CGO_ENABLE=1 go test ./... -race -cover
test: failpoint-enable
CGO_ENABLE=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

basic-test:
CGO_ENABLE=1 go test ./...
basic-test: failpoint-enable
CGO_ENABLE=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

ci-test-job:
CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client
if [ -f covprofile ]; then rm covprofile; fi
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../...

failpoint-enable:
cd $(ROOT_PATH) && $(MAKE) failpoint-enable

failpoint-disable:
cd $(ROOT_PATH) && $(MAKE) failpoint-disable

install-tools:
cd .. && $(MAKE) install-tools
Expand Down
45 changes: 42 additions & 3 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,31 @@ package retry

import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

// withMinLogInterval sets the minimum log interval for retrying.
// Because the retry interval may be not the factor of log interval, so this is the minimum interval.
func withMinLogInterval(interval time.Duration) Option {
return func(bo *Backoffer) {
bo.logInterval = interval
}
}

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
Expand All @@ -36,6 +52,10 @@ type Backoffer struct {
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
// nextLogTime is used to record the next log time.
nextLogTime time.Duration

attempt int
next time.Duration
Expand All @@ -50,10 +70,12 @@ func (bo *Backoffer) Exec(
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err := fn()
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
Expand All @@ -63,6 +85,13 @@ func (bo *Backoffer) Exec(
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if err != nil {
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
}
if after == nil {
after = time.NewTimer(currentInterval)
} else {
Expand Down Expand Up @@ -93,7 +122,7 @@ func (bo *Backoffer) Exec(
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
Expand All @@ -102,7 +131,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
if total > 0 && total < base {
total = base
}
return &Backoffer{
bo := &Backoffer{
base: base,
max: max,
total: total,
Expand All @@ -113,6 +142,10 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
currentTotal: 0,
attempt: 0,
}
for _, opt := range opts {
opt(bo)
}
return bo
}

// SetRetryableChecker sets the retryable checker.
Expand Down Expand Up @@ -152,6 +185,7 @@ func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
bo.nextLogTime = 0
}

// Only used for test.
Expand All @@ -161,3 +195,8 @@ var testBackOffExecuteFlag = false
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}

func getFunctionName(f any) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
return strings.Split(strs[len(strs)-1], "-")[0]
}
95 changes: 95 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package retry

import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestBackoffer(t *testing.T) {
Expand Down Expand Up @@ -107,3 +110,95 @@ func TestBackoffer(t *testing.T) {
func isBackofferReset(bo *Backoffer) bool {
return bo.next == bo.base && bo.currentTotal == 0
}

func TestBackofferWithLog(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100))
err := bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms := lg.Messages()
len1 := len(ms)
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

var errTest = errors.New("test")

func testFn() error {
return errTest
}

// testingWriter is a WriteSyncer that writes the the messages.
type testingWriter struct {
messages []string
}

func newTestingWriter() *testingWriter {
return &testingWriter{}
}

func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
func (w *testingWriter) Sync() error {
return nil
}

type verifyLogger struct {
*zap.Logger
w *testingWriter
}

func (logger *verifyLogger) Message() string {
if logger.w.messages == nil {
return ""
}
return logger.w.messages[len(logger.w.messages)-1]
}

func (logger *verifyLogger) Messages() []string {
if logger.w.messages == nil {
return nil
}
return logger.w.messages
}

func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
// TestingWriter is used to write to memory.
// Used in the verify logger.
writer := newTestingWriter()
lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
return verifyLogger{
Logger: lg,
w: writer,
}
}
14 changes: 7 additions & 7 deletions scripts/ci-subtask.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

# ./ci-subtask.sh <TOTAL_TASK_N> <TASK_INDEX>

ROOT_PATH=../../
ROOT_PATH_COV=$(pwd)/covprofile

if [[ $2 -gt 9 ]]; then
# run tools tests
if [[ $2 -eq 10 ]]; then
cd ./tools && make ci-test-job && cd .. && cat ./covprofile >> covprofile || exit 1
cd ./tools && make ci-test-job && cat covprofile >> $ROOT_PATH_COV || exit 1
exit
fi

Expand All @@ -16,12 +16,12 @@ if [[ $2 -gt 9 ]]; then
integrations_tasks=($(find "$integrations_dir" -mindepth 1 -maxdepth 1 -type d))
for t in "${integrations_tasks[@]}"; do
if [[ "$t" = "$integrations_dir/client" && $2 -eq 11 ]]; then
cd ./client && make ci-test-job && cd .. && cat ./covprofile >> covprofile || exit 1
cd $integrations_dir && make ci-test-job test_name=client && cat ./client/covprofile >> "$ROOT_PATH/covprofile" || exit 1
cd ./client && make ci-test-job && cat covprofile >> $ROOT_PATH_COV && cd .. || exit 1
cd $integrations_dir && make ci-test-job test_name=client && cat ./client/covprofile >> $ROOT_PATH_COV || exit 1
elif [[ "$t" = "$integrations_dir/tso" && $2 -eq 12 ]]; then
cd $integrations_dir && make ci-test-job test_name=tso && cat ./tso/covprofile >> "$ROOT_PATH/covprofile" || exit 1
cd $integrations_dir && make ci-test-job test_name=tso && cat ./tso/covprofile >> $ROOT_PATH_COV || exit 1
elif [[ "$t" = "$integrations_dir/mcs" && $2 -eq 13 ]]; then
cd $integrations_dir && make ci-test-job test_name=mcs && cat ./mcs/covprofile >> "$ROOT_PATH/covprofile" || exit 1
cd $integrations_dir && make ci-test-job test_name=mcs && cat ./mcs/covprofile >> $ROOT_PATH_COV || exit 1
fi
done
else
Expand Down Expand Up @@ -61,5 +61,5 @@ else
[[ $(($min_i + 1)) -eq $2 ]] && res+=($t)
done

CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=./... ${res[@]}
CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=$ROOT_PATH_COV -coverpkg=./... ${res[@]}
fi
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,7 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string
switch action {
case removingAction:
progressName = encodeRemovingProgressKey(storeID)
opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())}
opts = []progress.Option{progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())}
case preparingAction:
progressName = encodePreparingProgressKey(storeID)
}
Expand Down
5 changes: 3 additions & 2 deletions tests/integrations/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ROOT_PATH := ../..
ROOT_PATH := $(shell pwd)/../..
GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
Expand All @@ -35,7 +35,8 @@ test: failpoint-enable
$(MAKE) failpoint-disable

ci-test-job:
CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=$(ROOT_PATH)/...
if [ -f covprofile ]; then rm ./$(value test_name)/covprofile; fi
CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=$(ROOT_PATH)/...

install-tools:
cd $(ROOT_PATH) && $(MAKE) install-tools
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/realcluster/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ROOT_PATH := ../../..
ROOT_PATH := $(shell pwd)/../../..
GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
Expand Down
8 changes: 6 additions & 2 deletions tools/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ROOT_PATH := ..
ROOT_PATH := $(shell pwd)/..
GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
Expand All @@ -33,10 +33,14 @@ tidy:
git diff go.mod go.sum | cat
git diff --quiet go.mod go.sum

ci-test-job: failpoint-enable
test: failpoint-enable
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

ci-test-job:
if [ -f covprofile ]; then rm covprofile; fi
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../...

failpoint-enable:
cd $(ROOT_PATH) && $(MAKE) failpoint-enable

Expand Down

0 comments on commit ef574fe

Please sign in to comment.