
Merge branch 'master' into column-selector
3AceShowHand committed Oct 27, 2023
2 parents 04a9b67 + 0c29040 commit 423ec01
Showing 105 changed files with 4,761 additions and 1,352 deletions.
9 changes: 6 additions & 3 deletions cdc/api/v2/model.go
@@ -275,6 +275,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
DispatcherRule: "",
PartitionRule: rule.PartitionRule,
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
})
}
@@ -554,6 +555,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Matcher: rule.Matcher,
PartitionRule: rule.PartitionRule,
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
})
}
@@ -915,9 +917,10 @@ type LargeMessageHandleConfig struct {
// This is a duplicate of config.DispatchRule
type DispatchRule struct {
Matcher []string `json:"matcher,omitempty"`
PartitionRule string `json:"partition"`
IndexName string `json:"index"`
TopicRule string `json:"topic"`
PartitionRule string `json:"partition,omitempty"`
IndexName string `json:"index,omitempty"`
Columns []string `json:"columns,omitempty"`
TopicRule string `json:"topic,omitempty"`
}

// ColumnSelector represents a column selector for a table.
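A quick way to see what the new `Columns` field and the added `omitempty` tags change in the v2 API output is to marshal the struct directly. The following is a minimal, self-contained sketch that mirrors the `DispatchRule` definition above; the sample values (the "columns" partition rule and the column names) are made up for illustration and are not taken from this commit.

package main

import (
	"encoding/json"
	"fmt"
)

// DispatchRule mirrors the API struct shown in the diff above.
type DispatchRule struct {
	Matcher       []string `json:"matcher,omitempty"`
	PartitionRule string   `json:"partition,omitempty"`
	IndexName     string   `json:"index,omitempty"`
	Columns       []string `json:"columns,omitempty"`
	TopicRule     string   `json:"topic,omitempty"`
}

func main() {
	// Unset fields are dropped from the JSON now that every tag carries omitempty.
	rule := DispatchRule{
		Matcher:       []string{"test.*"},
		PartitionRule: "columns",
		Columns:       []string{"id", "region"},
	}
	out, _ := json.Marshal(rule)
	fmt.Println(string(out))
	// Output: {"matcher":["test.*"],"partition":"columns","columns":["id","region"]}
}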
4 changes: 2 additions & 2 deletions cdc/capture/capture.go
@@ -517,12 +517,12 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
ctx, cancelOwner := context.WithCancel(ctx)
ownerCtx := cdcContext.NewContext(ctx, newGlobalVars)
g.Go(func() error {
return c.runEtcdWorker(ownerCtx, owner,
return c.runEtcdWorker(ownerCtx, owner.(orchestrator.Reactor),
orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL),
ownerFlushInterval, util.RoleOwner.String())
})
g.Go(func() error {
er := c.runEtcdWorker(ownerCtx, controller,
er := c.runEtcdWorker(ownerCtx, controller.(orchestrator.Reactor),
globalState,
// todo: do not use owner flush interval
ownerFlushInterval, util.RoleController.String())
6 changes: 4 additions & 2 deletions cdc/controller/controller.go
@@ -45,7 +45,6 @@ const versionInconsistentLogRate = 1

// Controller is a manager to schedule changefeeds
type Controller interface {
orchestrator.Reactor
AsyncStop()
GetChangefeedOwnerCaptureInfo(id model.ChangeFeedID) *model.CaptureInfo
GetAllChangeFeedInfo(ctx context.Context) (
@@ -63,7 +62,10 @@
) error
}

var _ Controller = &controllerImpl{}
var (
_ orchestrator.Reactor = &controllerImpl{}
_ Controller = &controllerImpl{}
)

type controllerImpl struct {
changefeeds map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState
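The change above removes `orchestrator.Reactor` from the `Controller` interface and keeps the guarantee through compile-time assertions instead (which is why capture.go now asserts the value to `orchestrator.Reactor` before calling runEtcdWorker). Below is a minimal sketch of that assertion idiom, using stand-in types rather than the real tiflow interfaces:

package main

// Reactor stands in for orchestrator.Reactor; Controller for the trimmed interface.
type Reactor interface{ Tick() error }

type Controller interface{ AsyncStop() }

type controllerImpl struct{}

func (c *controllerImpl) Tick() error { return nil }
func (c *controllerImpl) AsyncStop()  {}

// Compile-time assertions: the build breaks if controllerImpl stops satisfying
// either interface, even though Controller no longer embeds Reactor.
var (
	_ Reactor    = &controllerImpl{}
	_ Controller = &controllerImpl{}
)

func main() {}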
16 changes: 0 additions & 16 deletions cdc/controller/mock/controller_mock.go

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions cdc/kv/client.go
@@ -374,9 +374,10 @@ func newEventFeedSession(
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionlock.NewRegionRangeLock(
totalSpan.StartKey, totalSpan.EndKey, startTs,
id, totalSpan.StartKey, totalSpan.EndKey, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
@@ -389,7 +390,7 @@ func newEventFeedSession(
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: id,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
5 changes: 5 additions & 0 deletions cdc/kv/metrics.go
@@ -145,6 +145,11 @@
[]string{"namespace", "changefeed"})
)

// GetGlobalGrpcMetrics gets the global grpc metrics.
func GetGlobalGrpcMetrics() *grpc_prometheus.ClientMetrics {
return grpcMetrics
}

// InitMetrics registers all metrics in the kv package
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(eventFeedErrorCounter)
24 changes: 12 additions & 12 deletions cdc/kv/regionlock/region_range_lock.go
@@ -140,12 +140,6 @@ func (e *rangeLockEntry) String() string {
len(e.waiters))
}

var currentID uint64 = 0

func allocID() uint64 {
return atomic.AddUint64(&currentID, 1)
}

// RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked
// if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a
// version number, which should come from the Region's Epoch version. The version is used to compare which range is
@@ -166,10 +160,11 @@ type RegionRangeLock struct {

// NewRegionRangeLock creates a new RegionRangeLock.
func NewRegionRangeLock(
id uint64,
startKey, endKey []byte, startTs uint64, changefeedLogInfo string,
) *RegionRangeLock {
return &RegionRangeLock{
id: allocID(),
id: id,
totalSpan: tablepb.Span{StartKey: startKey, EndKey: endKey},
changefeedLogInfo: changefeedLogInfo,
rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs),
@@ -489,9 +484,12 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs(

lastEnd := l.totalSpan.StartKey
l.rangeLock.Ascend(func(item *rangeLockEntry) bool {
action(item.regionID, &item.state)

r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, item.startKey) < 0
if action != nil {
action(item.regionID, &item.state)
}
if spanz.EndCompare(lastEnd, item.startKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey})
}
ckpt := item.state.CheckpointTs.Load()
if ckpt > r.FastestRegion.CheckpointTs {
r.FastestRegion.RegionID = item.regionID
@@ -508,13 +506,15 @@
lastEnd = item.endKey
return true
})
r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0
if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey})
}
return
}

// CollectedLockedRangeAttrs is returned by `RegionRangeLock.CollectLockedRangeAttrs`.
type CollectedLockedRangeAttrs struct {
HoleExists bool
Holes []tablepb.Span
FastestRegion LockedRangeAttrs
SlowestRegion LockedRangeAttrs
}
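The `Holes` change above replaces the single `HoleExists` flag with the concrete uncovered spans. Here is a stripped-down sketch of the same gap-walking logic; it uses `bytes.Compare` in place of `spanz.EndCompare` (so it ignores the empty-end-key-means-infinity convention) and a local span type instead of `tablepb.Span`:

package main

import (
	"bytes"
	"fmt"
)

// span is a simplified stand-in for tablepb.Span.
type span struct{ StartKey, EndKey []byte }

// collectHoles walks locked ranges in key order and records every gap between
// the previous end key and the next start key, plus any trailing gap before
// the total span's end, mirroring the loop in CollectLockedRangeAttrs.
func collectHoles(total span, locked []span) []span {
	var holes []span
	lastEnd := total.StartKey
	for _, r := range locked {
		if bytes.Compare(lastEnd, r.StartKey) < 0 {
			holes = append(holes, span{StartKey: lastEnd, EndKey: r.StartKey})
		}
		lastEnd = r.EndKey
	}
	if bytes.Compare(lastEnd, total.EndKey) < 0 {
		holes = append(holes, span{StartKey: lastEnd, EndKey: total.EndKey})
	}
	return holes
}

func main() {
	total := span{StartKey: []byte("a"), EndKey: []byte("z")}
	locked := []span{
		{StartKey: []byte("a"), EndKey: []byte("e")},
		{StartKey: []byte("h"), EndKey: []byte("m")},
	}
	for _, h := range collectHoles(total, locked) {
		fmt.Printf("[%s,%s)\n", h.StartKey, h.EndKey) // prints [e,h) then [m,z)
	}
}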
8 changes: 4 additions & 4 deletions cdc/kv/regionlock/region_range_lock_test.go
@@ -90,7 +90,7 @@ func TestRegionRangeLock(t *testing.T) {
t.Parallel()

ctx := context.TODO()
l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64)
unlockRange(l, "a", "e", 1, 1, 100)

@@ -107,7 +107,7 @@
func TestRegionRangeLockStale(t *testing.T) {
t.Parallel()

l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
ctx := context.TODO()
mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64)
mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64)
@@ -130,7 +130,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) {
t.Parallel()

ctx := context.TODO()
l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64)

mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f")
@@ -166,7 +166,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64)
wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12)
cancel()
56 changes: 53 additions & 3 deletions cdc/kv/shared_client.go
@@ -16,6 +16,8 @@ package kv
import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
@@ -40,6 +42,7 @@
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
@@ -246,6 +249,7 @@ func (s *SharedClient) Run(ctx context.Context) error {
g.Go(func() error { return s.requestRegionToStore(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
g.Go(func() error { return s.resolveLock(ctx) })
g.Go(func() error { return s.logSlowRegions(ctx) })

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
@@ -406,7 +410,7 @@ func (s *SharedClient) createRegionRequest(sri singleRegionInfo) *cdcpb.ChangeDa

func (s *SharedClient) appendRequest(r *requestedStore, sri singleRegionInfo) {
offset := r.nextStream.Add(1) % uint32(len(r.streams))
log.Debug("event feed will request a region",
log.Info("event feed will request a region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("streamID", r.streams[offset].streamID),
@@ -572,7 +576,7 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
switch eerr := err.(type) {
case *eventError:
innerErr := eerr.err
log.Debug("cdc error",
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID),
@@ -689,12 +693,58 @@ func (s *SharedClient) resolveLock(ctx context.Context) error {
}
}

func (s *SharedClient) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
for subscriptionID, rt := range s.totalSpans.v {
attr := rt.rangeLock.CollectLockedRangeAttrs(nil)
if attr.SlowestRegion.Initialized {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
log.Info("event feed finds a slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.Holes) > 0 {
holes := make([]string, 0, len(attr.Holes))
for _, hole := range attr.Holes {
holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey))
}
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.String("holes", strings.Join(holes, ", ")))
}
}
s.totalSpans.RUnlock()
}
}

func (s *SharedClient) newRequestedTable(
subID SubscriptionID, span tablepb.Span, startTs uint64,
eventCh chan<- MultiplexingEvent,
) *requestedTable {
cfName := s.changefeed.String()
rangeLock := regionlock.NewRegionRangeLock(span.StartKey, span.EndKey, startTs, cfName)
rangeLock := regionlock.NewRegionRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs, cfName)

rt := &requestedTable{
subscriptionID: subID,
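The new `logSlowRegions` goroutine added above follows a common shape: a ticker-driven loop that scans some state on every tick and exits cleanly when the context is cancelled. The sketch below shows that shape in isolation; `checkOnce` stands in for the per-subscription scan over `totalSpans`, the real code ticks every 10 seconds, and the demo durations are shortened only so the example finishes quickly.

package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic wakes up on every tick, runs checkOnce, and returns as soon as
// the context is cancelled -- the same loop structure as logSlowRegions.
func runPeriodic(ctx context.Context, interval time.Duration, checkOnce func()) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
		checkOnce()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	_ = runPeriodic(ctx, 100*time.Millisecond, func() {
		fmt.Println("scan subscriptions for slow or uncovered regions")
	})
}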
2 changes: 1 addition & 1 deletion cdc/kv/shared_client_test.go
@@ -108,7 +108,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {

pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{})
grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

regionCache := tikv.NewRegionCache(pdClient)


