Skip to content

Commit

Permalink
openAPI (ticdc): fix openapi use old addrs to create pdClient (#9713) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 20, 2023
1 parent 2b59186 commit 6a31202
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 36 deletions.
67 changes: 38 additions & 29 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v2

import (
"context"
"fmt"
"net/http"
"sort"
"strings"
Expand All @@ -23,16 +24,17 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -346,26 +348,31 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Any("upstreamInfo", OldUpInfo))

var pdAddrs []string
var credentials *security.Credential
if OldUpInfo != nil {
pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: OldUpInfo.CAPath,
CertPath: OldUpInfo.CertPath,
KeyPath: OldUpInfo.KeyPath,
CertAllowedCN: OldUpInfo.CertAllowedCN,
}
}
if len(updateCfConfig.PDAddrs) != 0 {
pdAddrs = updateCfConfig.PDAddrs
credentials = updateCfConfig.PDConfig.toCredential()
upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}

storage, err := h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
var storage tidbkv.Storage
// if PDAddrs is not empty, use it to create a new kvstore
// Note: upManager is nil in some unit test cases
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
pdAddrs := updateCfConfig.PDAddrs
credentials := updateCfConfig.PDConfig.toCredential()
storage, err = h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
} else { // get the upstream of the changefeed to get the kvstore
up, ok := upManager.Get(oldCfInfo.UpstreamID)
if !ok {
_ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID)))
return
}
storage = up.KVStorage
}

newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs)
if err != nil {
Expand Down Expand Up @@ -588,24 +595,26 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

var pdClient pd.Client
// if PDAddrs is empty, use the default pdClient
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
pdClient = up.PDClient
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
defer pdClient.Close()
}
defer pdClient.Close()

if err := h.helpers.verifyResumeChangefeedConfig(
ctx,
Expand Down
19 changes: 12 additions & 7 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,15 @@ func TestUpdateChangefeed(t *testing.T) {
t.Parallel()
update := testCase{url: "/api/v2/changefeeds/%s", method: "PUT"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
cp := mock_capture.NewMockCapture(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
mockCapture := mock_capture.NewMockCapture(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(mockCapture, helpers)
router := newRouter(apiV2)

statusProvider := &mockStatusProvider{}
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()

mockCapture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
mockCapture.EXPECT().IsReady().Return(true).AnyTimes()
mockCapture.EXPECT().IsOwner().Return(true).AnyTimes()

// case 1 invalid id
invalidID := "#Invalid_"
Expand Down Expand Up @@ -372,7 +373,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(100)), gomock.Any()).
Return(nil, cerrors.ErrUpstreamNotFound).Times(1)
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand All @@ -389,7 +390,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(1)), gomock.Any()).
Return(nil, nil).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand Down Expand Up @@ -426,6 +427,7 @@ func TestUpdateChangefeed(t *testing.T) {
createTiStore(gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
mockCapture.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Expand All @@ -445,6 +447,7 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusBadRequest, w.Code)

// case 7: update transaction failed
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Expand All @@ -468,6 +471,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand All @@ -484,6 +488,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand Down

0 comments on commit 6a31202

Please sign in to comment.