Skip to content

Commit

Permalink
pdClient (ticdc): Enable the pdClient forwarding function to make cdc…
Browse files Browse the repository at this point in the history
… more stable during network isolation between the PD leader (#11076)

close #10849
  • Loading branch information
asddongmen authored May 13, 2024
1 parent 985f8af commit 3bfba30
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 4 deletions.
3 changes: 2 additions & 1 deletion cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ func (APIV2HelpersImpl) getPDClient(ctx context.Context,
},
MinConnectTimeout: 3 * time.Second,
}),
))
),
pd.WithForwardingOption(config.EnablePDForwarding))
if err != nil {
return nil, cerror.WrapError(cerror.ErrAPIGetPDClientFailed, errors.Trace(err))
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
))
),
pd.WithForwardingOption(config.EnablePDForwarding))
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ const (
// DisableMemoryLimit is the default max memory percentage for TiCDC server.
// 0 means no memory limit.
DisableMemoryLimit = 0

// EnablePDForwarding is the value of whether to enable PD client forwarding function.
// The PD client will forward the requests throughthe follower
// If there is a network partition problem between TiCDC and PD leader.
EnablePDForwarding = true
)

var (
Expand Down
4 changes: 3 additions & 1 deletion pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func createPDClient(ctx context.Context,
},
MinConnectTimeout: 3 * time.Second,
}),
))
),
pd.WithForwardingOption(config.EnablePDForwarding),
)
}

// Note: we do not use etcd transaction to migrate key
Expand Down
4 changes: 3 additions & 1 deletion pkg/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/etcd"
Expand Down Expand Up @@ -152,7 +153,8 @@ func initUpstream(ctx context.Context, up *Upstream, cfg CaptureTopologyCfg) err
},
MinConnectTimeout: 3 * time.Second,
}),
))
),
pd.WithForwardingOption(config.EnablePDForwarding))
if err != nil {
up.err.Store(err)
return errors.Trace(err)
Expand Down

0 comments on commit 3bfba30

Please sign in to comment.