scheduler(ticdc): adjust some logic in scheduler (#9296)
ref #9058
CharlesCheung96 authored Jun 29, 2023
1 parent 161c70b commit 70eab3f
Showing 8 changed files with 66 additions and 64 deletions.
14 changes: 4 additions & 10 deletions cdc/owner/changefeed.go
@@ -47,27 +47,21 @@ import (
"go.uber.org/zap"
)

// newSchedulerFromCtx creates a new scheduler from context.
// newScheduler creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
func newScheduler(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig, redoMetaManager redo.MetaManager,
) (ret scheduler.Scheduler, err error) {
) (scheduler.Scheduler, error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
captureID := ctx.GlobalVars().CaptureInfo.ID
ret, err = scheduler.NewScheduler(
ret, err := scheduler.NewScheduler(
ctx, captureID, changeFeedID, messageServer, messageRouter, ownerRev, epoch, up, cfg, redoMetaManager)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig, redoMetaManager redo.MetaManager,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, up, epoch, cfg, redoMetaManager)
}

type changefeed struct {
id model.ChangeFeedID
// state is read-only during the Tick, should only be updated by patch the etcd.
36 changes: 13 additions & 23 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -93,38 +93,27 @@ func NewCoordinator(
if err != nil {
return nil, errors.Trace(err)
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg, redoMetaManager)
coord.trans = trans
coord.pdClock = up.PDClock
coord.changefeedEpoch = changefeedEpoch
coord.reconciler, err = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
reconciler, err := keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
if err != nil {
return nil, errors.Trace(err)
}
return coord, nil
}

func newCoordinator(
captureID model.CaptureID,
changefeedID model.ChangeFeedID,
ownerRevision int64,
cfg *config.SchedulerConfig,
redoMetaManager redo.MetaManager,
) *coordinator {
revision := schedulepb.OwnerRevision{Revision: ownerRevision}

return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
captureID: captureID,
version: version.ReleaseSemver(),
revision: revision,
changefeedEpoch: changefeedEpoch,
captureID: captureID,
trans: trans,
replicationM: replication.NewReplicationManager(
cfg.MaxTaskConcurrency, changefeedID),
captureM: member.NewCaptureManager(captureID, changefeedID, revision, cfg),
schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
reconciler: reconciler,
changefeedID: changefeedID,
compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}),
pdClock: up.PDClock,
redoMetaManager: redoMetaManager,
}
}, nil
}

// Tick implement the scheduler interface
@@ -225,8 +214,8 @@ func (c *coordinator) DrainCapture(target model.CaptureID) (int, error) {
return count, nil
}

// when draining the capture, tables need to be dispatched to other
// capture except the draining one, so at least should have 2 captures alive.
// when draining the capture, tables need to be dispatched to other capture
// except the draining one, so there should be at least two live captures.
if len(c.captureM.Captures) <= 1 {
log.Warn("schedulerv3: drain capture request ignored, "+
"only one captures alive",
@@ -286,9 +275,10 @@ func (c *coordinator) poll(
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.maybeCollectMetrics()
if c.compat.UpdateCaptureInfo(aliveCaptures) {
spanReplicationEnabled := c.compat.CheckSpanReplicationEnabled()
log.Info("schedulerv3: compat update capture info",
zap.Any("captures", aliveCaptures),
zap.Bool("spanReplicationEnabled", c.compat.CheckSpanReplicationEnabled()))
zap.Bool("spanReplicationEnabled", spanReplicationEnabled))
}

recvMsgs, err := c.recvMsgs(ctx)
26 changes: 25 additions & 1 deletion cdc/scheduler/internal/v3/coordinator_test.go
@@ -32,6 +32,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
)

@@ -201,8 +202,31 @@ func TestCoordinatorTransportCompat(t *testing.T) {
}}, msgs)
}

func newCoordinatorForTest(
captureID model.CaptureID,
changefeedID model.ChangeFeedID,
ownerRevision int64,
cfg *config.SchedulerConfig,
redoMetaManager redo.MetaManager,
) *coordinator {
revision := schedulepb.OwnerRevision{Revision: ownerRevision}

return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
captureID: captureID,
replicationM: replication.NewReplicationManager(
cfg.MaxTaskConcurrency, changefeedID),
captureM: member.NewCaptureManager(captureID, changefeedID, revision, cfg),
schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
changefeedID: changefeedID,
compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}),
redoMetaManager: redoMetaManager,
}
}

func newTestCoordinator(cfg *config.SchedulerConfig) (*coordinator, *transport.MockTrans) {
coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg, redo.NewDisabledMetaManager())
coord := newCoordinatorForTest("a", model.ChangeFeedID{}, 1, cfg, redo.NewDisabledMetaManager())
trans := transport.NewMockTrans()
coord.trans = trans
coord.reconciler = keyspan.NewReconcilerForTests(
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/info_provider_test.go
@@ -35,7 +35,7 @@ func TestInfoProvider(t *testing.T) {
MaxTaskConcurrency: 1,
ChangefeedSettings: config.GetDefaultReplicaConfig().Scheduler,
}
coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg, redo.NewDisabledMetaManager())
coord := newCoordinatorForTest("a", model.ChangeFeedID{}, 1, cfg, redo.NewDisabledMetaManager())
cfg.ChangefeedSettings = config.GetDefaultReplicaConfig().Scheduler
coord.reconciler = keyspan.NewReconcilerForTests(
keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
@@ -66,7 +66,7 @@ func TestInfoProviderIsInitialized(t *testing.T) {
func TestInfoProviderIsInitialized(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
coord := newCoordinatorForTest("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
ChangefeedSettings: config.GetDefaultReplicaConfig().Scheduler,
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -156,7 +156,7 @@ func (r *Manager) HandleCaptureChanges(
var err error
spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool {
table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID)
if err != nil {
if err1 != nil {
err = errors.Trace(err1)
return false
}
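The hunk above hinges on checking err1, the error returned by NewReplicationSet inside the Ascend callback: the outer err is still nil at that point, so testing it instead would silently drop a construction failure. A minimal, self-contained sketch of this shadowed-error pitfall, using hypothetical helper names rather than the real spanStatusMap API:

package main

import (
	"errors"
	"fmt"
)

// build stands in for a constructor that can fail, in the spirit of NewReplicationSet.
func build(id int) (int, error) {
	if id == 2 {
		return 0, errors.New("bad id")
	}
	return id * 10, nil
}

func main() {
	var err error
	for _, id := range []int{1, 2, 3} {
		v, err1 := build(id)
		// Buggy variant: `if err != nil` would check the outer error, which is
		// still nil here, so the failure from build would be lost and the loop
		// would keep going.
		if err1 != nil {
			err = fmt.Errorf("build %d: %w", id, err1)
			break
		}
		fmt.Println("built", v)
	}
	fmt.Println("final err:", err) // final err: build 2: bad id
}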
16 changes: 10 additions & 6 deletions cdc/scheduler/internal/v3/replication/replication_set.go
@@ -813,21 +813,25 @@ func (r *ReplicationSet) handleAddTable(
zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID))
return nil, nil
}
oldState := r.State
r.State = ReplicationSetStateAbsent
err := r.setCapture(captureID, RoleSecondary)
if err != nil {
return nil, errors.Trace(err)
}
log.Info("schedulerv3: replication state transition, add table",
zap.Any("replicationSet", r),
zap.Stringer("old", oldState), zap.Stringer("new", r.State))
oldState := r.State
status := tablepb.TableStatus{
Span: r.Span,
State: tablepb.TableStateAbsent,
Checkpoint: tablepb.Checkpoint{},
}
return r.poll(&status, captureID)
msgs, err := r.poll(&status, captureID)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("schedulerv3: replication state transition, add table",
zap.Any("replicationSet", r),
zap.Stringer("old", oldState), zap.Stringer("new", r.State))
return msgs, nil
}

func (r *ReplicationSet) handleMoveTable(
5 changes: 0 additions & 5 deletions cdc/scheduler/internal/v3/scheduler/scheduler_basic.go
@@ -14,9 +14,6 @@
package scheduler

import (
"math/rand"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -37,14 +34,12 @@ var _ scheduler = &basicScheduler{}
// 3. Capture offline.
type basicScheduler struct {
batchSize int
random *rand.Rand
changefeedID model.ChangeFeedID
}

func newBasicScheduler(batchSize int, changefeed model.ChangeFeedID) *basicScheduler {
return &basicScheduler{
batchSize: batchSize,
random: rand.New(rand.NewSource(time.Now().UnixNano())),
changefeedID: changefeed,
}
}
27 changes: 11 additions & 16 deletions pkg/spanz/btree_map.go
@@ -141,29 +141,24 @@ func (m *BtreeMap[T]) FindHoles(start, end tablepb.Span) ([]tablepb.Span, []tabl
m.cache.coveredSpans = m.cache.coveredSpans[:0]
m.cache.holes = m.cache.holes[:0]

firstSpan := true
var lastSpan tablepb.Span
lastSpan := tablepb.Span{
StartKey: start.StartKey,
EndKey: start.StartKey,
}
m.AscendRange(start, end, func(current tablepb.Span, _ T) bool {
if firstSpan {
ord := bytes.Compare(start.StartKey, current.StartKey)
if ord < 0 {
m.cache.holes = append(m.cache.holes, tablepb.Span{
StartKey: start.StartKey,
EndKey: current.StartKey,
})
} else if ord > 0 {
log.Panic("map is out of order",
zap.String("start", start.String()),
zap.String("current", current.String()))
}
firstSpan = false
} else if !bytes.Equal(lastSpan.EndKey, current.StartKey) {
ord := bytes.Compare(lastSpan.EndKey, current.StartKey)
if ord < 0 {
// Find a hole.
m.cache.holes = append(m.cache.holes, tablepb.Span{
StartKey: lastSpan.EndKey,
EndKey: current.StartKey,
})
} else if ord > 0 {
log.Panic("map is out of order",
zap.String("lastSpan", lastSpan.String()),
zap.String("current", current.String()))
}

lastSpan = current
m.cache.coveredSpans = append(m.cache.coveredSpans, current)
return true
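The loop above relies on a sentinel instead of a first-iteration flag: lastSpan starts as a zero-width span at start.StartKey, so the first covered span is compared against it exactly like every later one. A minimal, self-contained sketch of this gap-finding pattern over sorted byte-keyed spans, using hypothetical types rather than the real BtreeMap/tablepb API (the out-of-order panic and the cache reuse are omitted, and the tail hole after the last span is simplified):

package main

import (
	"bytes"
	"fmt"
)

type span struct{ StartKey, EndKey []byte }

// findHoles returns the gaps inside [start, end) that are not covered by the
// sorted, non-overlapping spans. Seeding lastEnd with start removes the need
// for a first-iteration special case.
func findHoles(start, end []byte, sorted []span) []span {
	var holes []span
	lastEnd := start // zero-width sentinel: nothing has been covered yet
	for _, cur := range sorted {
		if bytes.Compare(lastEnd, cur.StartKey) < 0 {
			// Gap between the end of the previous span and the start of this one.
			holes = append(holes, span{StartKey: lastEnd, EndKey: cur.StartKey})
		}
		lastEnd = cur.EndKey
	}
	if bytes.Compare(lastEnd, end) < 0 {
		// Uncovered tail up to the requested end key.
		holes = append(holes, span{StartKey: lastEnd, EndKey: end})
	}
	return holes
}

func main() {
	covered := []span{
		{StartKey: []byte("a"), EndKey: []byte("c")},
		{StartKey: []byte("f"), EndKey: []byte("k")},
	}
	for _, h := range findHoles([]byte("a"), []byte("z"), covered) {
		fmt.Printf("hole: [%s, %s)\n", h.StartKey, h.EndKey)
	}
	// hole: [c, f)
	// hole: [k, z)
}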
