Skip to content

Commit

Permalink
redo, processor(ticdc): set flushed resolvedTs when start table (#9281)
Browse files Browse the repository at this point in the history
close #9172
  • Loading branch information
CharlesCheung96 authored Jun 29, 2023
1 parent dcf6f85 commit 7497ea6
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 83 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewManager4Test(
changefeedEpoch uint64,
cfg *config.SchedulerConfig,
) *processor {
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg)
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg, false)
}
return m
}
Expand Down
14 changes: 5 additions & 9 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,13 @@ var _ scheduler.TableExecutor = (*processor)(nil)
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
func (p *processor) AddTableSpan(
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, barrier *schedulepb.Barrier,
ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool,
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

startTs := checkpoint.CheckpointTs
if startTs == 0 {
log.Panic("table start ts must not be 0",
zap.String("captureID", p.captureInfo.ID),
Expand Down Expand Up @@ -144,14 +145,9 @@ func (p *processor) AddTableSpan(
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
if p.redo.r.Enabled() {
var redoResolvedTs model.Ts
if barrier != nil {
redoResolvedTs = barrier.GlobalBarrierTs
} else {
stats := p.sinkManager.r.GetTableStats(span)
redoResolvedTs = stats.BarrierTs
}
p.redo.r.StartTable(span, redoResolvedTs)
// ResolvedTs is store in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redo.r.StartTable(span, checkpoint.ResolvedTs)
}
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
Expand Down
172 changes: 143 additions & 29 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
redoPkg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
Expand All @@ -47,6 +48,7 @@ func newProcessor4Test(
captureInfo *model.CaptureInfo,
liveness *model.Liveness,
cfg *config.SchedulerConfig,
enableRedo bool,
) *processor {
changefeedID := model.ChangeFeedID4Test("processor-test", "processor-test")
up := upstream.NewUpstream4Test(&sinkmanager.MockPD{})
Expand All @@ -61,9 +63,28 @@ func newProcessor4Test(
return nil
}

if !enableRedo {
p.redo.r = redo.NewDisabledDMLManager()
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
require.NoError(t, err)
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
p.redo.changefeedID = changefeedID
p.redo.spawn(ctx)

p.agent = &mockAgent{executor: p, liveness: liveness}
p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine(
t, changefeedID, state.Info)
t, changefeedID, state.Info, p.redo.r)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = changefeedID
p.sinkManager.spawn(ctx)
Expand All @@ -75,11 +96,6 @@ func newProcessor4Test(
// otherwise the sinkManager will not receive the resolvedTs.
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)

p.redo.r = redo.NewDisabledDMLManager()
p.redo.name = "RedoManager"
p.redo.changefeedID = changefeedID
p.redo.spawn(ctx)

p.initialized = true
return nil
}
Expand All @@ -91,7 +107,7 @@ func newProcessor4Test(
}

func initProcessor4Test(
ctx cdcContext.Context, t *testing.T, liveness *model.Liveness,
ctx cdcContext.Context, t *testing.T, liveness *model.Liveness, enableRedo bool,
) (*processor, *orchestrator.ReactorStateTester) {
changefeedInfo := `
{
Expand Down Expand Up @@ -132,7 +148,7 @@ func initProcessor4Test(
etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID)
captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"}
cfg := config.NewDefaultSchedulerConfig()
p := newProcessor4Test(t, changefeed, captureInfo, liveness, cfg)
p := newProcessor4Test(t, changefeed, captureInfo, liveness, cfg, enableRedo)

captureID := ctx.GlobalVars().CaptureInfo.ID
changefeedID := ctx.ChangefeedVars().ID
Expand Down Expand Up @@ -194,7 +210,7 @@ func (a *mockAgent) Close() error {
func TestTableExecutorAddingTableIndirectly(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)

// init tick
err := p.Tick(ctx)
Expand All @@ -213,7 +229,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {

// table-1: `preparing` -> `prepared` -> `replicating`
span := spanz.TableIDToComparableSpan(1)
ok, err := p.AddTableSpan(ctx, span, 20, true, nil)
ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true)
require.NoError(t, err)
require.True(t, ok)
p.sinkManager.r.UpdateBarrierTs(20, nil)
Expand Down Expand Up @@ -252,7 +268,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, ok)
require.Equal(t, tablepb.TableStatePrepared, state)

ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true, nil)
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true)
require.NoError(t, err)
require.True(t, ok)
stats = p.sinkManager.r.GetTableStats(span)
Expand All @@ -261,7 +277,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.Equal(t, model.Ts(20), stats.BarrierTs)

// Start to replicate table-1.
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil)
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false)
require.NoError(t, err)
require.True(t, ok)

Expand All @@ -279,10 +295,108 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.Nil(t, p.agent)
}

func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness, true)

// init tick
err := p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
status.CheckpointTs = 20
return status, true, nil
})
tester.MustApplyPatches()

// no operation
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

// table-1: `preparing` -> `prepared` -> `replicating`
span := spanz.TableIDToComparableSpan(1)
ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true)
require.NoError(t, err)
require.True(t, ok)
p.sinkManager.r.UpdateBarrierTs(20, nil)
stats := p.sinkManager.r.GetTableStats(span)
require.Equal(t, model.Ts(20), stats.CheckpointTs)
require.Equal(t, model.Ts(20), stats.ResolvedTs)
require.Equal(t, model.Ts(20), stats.BarrierTs)
require.Len(t, p.sinkManager.r.GetAllCurrentTableSpans(), 1)
require.Equal(t, 1, p.sinkManager.r.GetAllCurrentTableSpansCount())

done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true)
require.False(t, done)
state, ok := p.sinkManager.r.GetTableState(span)
require.True(t, ok)
require.Equal(t, tablepb.TableStatePreparing, state)

// Push the resolved ts, mock that sorterNode receive first resolved event.
p.sourceManager.r.Add(
span,
[]*model.PolymorphicEvent{{
CRTs: 101,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeResolved,
CRTs: 101,
},
}}...,
)

err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

done = p.IsAddTableSpanFinished(span, true)
require.True(t, done)
state, ok = p.sinkManager.r.GetTableState(span)
require.True(t, ok)
require.Equal(t, tablepb.TableStatePrepared, state)

// ignore duplicate add request
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true)
require.NoError(t, err)
require.True(t, ok)
stats = p.sinkManager.r.GetTableStats(span)
require.Equal(t, model.Ts(20), stats.CheckpointTs)
require.Equal(t, model.Ts(20), stats.ResolvedTs)
require.Equal(t, model.Ts(20), stats.BarrierTs)

p.sinkManager.r.UpdateBarrierTs(50, nil)
stats = p.sinkManager.r.GetTableStats(span)
require.Equal(t, model.Ts(20), stats.ResolvedTs)
require.Equal(t, model.Ts(50), stats.BarrierTs)

// Start to replicate table-1.
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30, ResolvedTs: 60}, false)
require.NoError(t, err)
require.True(t, ok)

stats = p.sinkManager.r.GetTableStats(span)
require.Equal(t, model.Ts(60), stats.ResolvedTs)
require.Equal(t, model.Ts(50), stats.BarrierTs)

err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

// table-1: `prepared` -> `replicating`
state, ok = p.sinkManager.r.GetTableState(span)
require.True(t, ok)
require.Equal(t, tablepb.TableStateReplicating, state)

err = p.Close()
require.Nil(t, err)
require.Nil(t, p.agent)
}

func TestProcessorError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)
// init tick
err := p.Tick(ctx)
require.Nil(t, err)
Expand All @@ -302,7 +416,7 @@ func TestProcessorError(t *testing.T) {
},
})

p, tester = initProcessor4Test(ctx, t, &liveness)
p, tester = initProcessor4Test(ctx, t, &liveness, false)
// init tick
err = p.Tick(ctx)
require.Nil(t, err)
Expand All @@ -321,7 +435,7 @@ func TestProcessorError(t *testing.T) {
func TestProcessorExit(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)
var err error
// init tick
err = p.Tick(ctx)
Expand All @@ -345,7 +459,7 @@ func TestProcessorExit(t *testing.T) {
func TestProcessorClose(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)
// init tick
err := p.Tick(ctx)
require.Nil(t, err)
Expand All @@ -357,10 +471,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil)
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)

Expand All @@ -384,7 +498,7 @@ func TestProcessorClose(t *testing.T) {
require.Nil(t, p.sourceManager.r)
require.Nil(t, p.agent)

p, tester = initProcessor4Test(ctx, t, &liveness)
p, tester = initProcessor4Test(ctx, t, &liveness, false)
// init tick
err = p.Tick(ctx)
require.Nil(t, err)
Expand All @@ -396,10 +510,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)
err = p.Tick(ctx)
Expand Down Expand Up @@ -428,7 +542,7 @@ func TestProcessorClose(t *testing.T) {
func TestPositionDeleted(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)
// init tick
err := p.Tick(ctx)
require.Nil(t, err)
Expand All @@ -441,10 +555,10 @@ func TestPositionDeleted(t *testing.T) {
tester.MustApplyPatches()

// add table
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil)
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 40, false, nil)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 40}, false)
require.Nil(t, err)
require.True(t, done)

Expand All @@ -469,7 +583,7 @@ func TestPositionDeleted(t *testing.T) {
func TestSchemaGC(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)

var err error
// init tick
Expand Down Expand Up @@ -531,7 +645,7 @@ func TestIgnorableError(t *testing.T) {
func TestUpdateBarrierTs(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)
p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
status.CheckpointTs = 5
return status, true, nil
Expand All @@ -550,7 +664,7 @@ func TestUpdateBarrierTs(t *testing.T) {
tester.MustApplyPatches()

span := spanz.TableIDToComparableSpan(1)
done, err := p.AddTableSpan(ctx, span, 5, false, nil)
done, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 5}, false)
require.True(t, done)
require.Nil(t, err)
err = p.Tick(ctx)
Expand Down Expand Up @@ -584,7 +698,7 @@ func TestUpdateBarrierTs(t *testing.T) {
func TestProcessorLiveness(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)

// First tick for creating position.
err := p.Tick(ctx)
Expand Down Expand Up @@ -619,7 +733,7 @@ func TestProcessorDostNotStuckInInit(t *testing.T) {

ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p, tester := initProcessor4Test(ctx, t, &liveness, false)

// First tick for creating position.
err := p.Tick(ctx)
Expand Down
Loading

0 comments on commit 7497ea6

Please sign in to comment.