Skip to content

Commit

Permalink
Merge branch 'master' into sink-manager-task-fast-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Nov 23, 2023
2 parents 3c57013 + 5fc3d25 commit 7ccbbdb
Show file tree
Hide file tree
Showing 18 changed files with 1,016 additions and 877 deletions.
3 changes: 0 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

Expand Down
12 changes: 0 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,8 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is zerofill
ZerofillFlag
)

// SetZeroFill sets ZerofillFlag
func (b *ColumnFlagType) SetZeroFill() {
(*util.Flag)(b).Add(util.Flag(ZerofillFlag))
}

// IsZerofill shows whether ZerofillFlag is set
func (b *ColumnFlagType) IsZerofill() bool {
return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
Expand Down
4 changes: 4 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (m *mockDDLSink) emitCheckpointTs(ts uint64, tables []*model.TableInfo) {
m.mu.currentTables = tables
}

func (m *mockDDLSink) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
73 changes: 73 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -93,6 +94,14 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionRemovePartitioning: {},
}

type bootstrapState int32

const (
bootstrapStateNone bootstrapState = iota
bootstrapStateRunning
bootstrapStateCompleted
)

// ddlManager holds the pending DDL events of all tables and responsible for
// executing them to downstream.
// It also provides the ability to calculate the barrier of a changefeed.
Expand Down Expand Up @@ -130,6 +139,10 @@ type ddlManager struct {
BDRMode bool
sinkType model.DownstreamType
ddlResolvedTs model.Ts

needBootstrap bool
errCh chan error
bootstrapState int32
}

func newDDLManager(
Expand Down Expand Up @@ -167,6 +180,7 @@ func newDDLManager(
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
}
}

Expand All @@ -184,6 +198,18 @@ func (m *ddlManager) tick(
checkpointTs model.Ts,
tableCheckpoint map[model.TableName]model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
if m.needBootstrap {
ok, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
}
if !ok {
return nil, nil, nil
}
}

m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

Expand Down Expand Up @@ -598,6 +624,53 @@ func (m *ddlManager) cleanCache() {
m.physicalTablesCache = nil
}

func (m *ddlManager) checkAndBootstrap(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateCompleted) {
return true, nil
}

select {
case err := <-m.errCh:
return false, err
default:
}

if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateRunning) {
return false, nil
}
// begin bootstrap
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateRunning))
tables, err := m.allTables(ctx)
if err != nil {
return false, err
}
bootstrapEvents := make([]*model.DDLEvent, 0, len(tables))
for _, table := range tables {
ddlEvent := &model.DDLEvent{
StartTs: m.startTs,
CommitTs: m.startTs,
TableInfo: table,
IsBootstrap: true,
}
bootstrapEvents = append(bootstrapEvents, ddlEvent)
}
// send bootstrap events
go func() {
for _, event := range bootstrapEvents {
err := m.ddlSink.emitBootstrapEvent(ctx, event)
if err != nil {
log.Error("emit bootstrap event failed",
zap.Any("bootstrapEvent", event), zap.Error(err))
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateNone))
m.errCh <- err
return
}
}
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateCompleted))
}()
return false, nil
}

// getRelatedPhysicalTableIDs get all related physical table ids of a ddl event.
// It is a helper function to calculate tableBarrier.
func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
Expand Down
17 changes: 17 additions & 0 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DDLSink interface {
// the DDL event will be sent to another goroutine and execute to downstream
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
Expand Down Expand Up @@ -384,6 +385,22 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
return false, nil
}

// emitBootstrapEvent sent bootstrap event to downstream.
// It is a synchronous operation.
func (s *ddlSinkImpl) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
if !ddl.IsBootstrap {
return nil
}
err := s.sink.WriteDDLEvent(ctx, ddl)
if err != nil {
return errors.Trace(err)
}
// TODO: change this log to debug level after testing complete.
log.Info("emit bootstrap event", zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID), zap.Any("bootstrapEvent", ddl))
return nil
}

func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (err error) {
if checkpointTs == s.lastSyncPoint {
return nil
Expand Down
22 changes: 14 additions & 8 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -60,8 +59,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) {
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -84,12 +81,21 @@ func TestWriteEvents(t *testing.T) {
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
Loading

0 comments on commit 7ccbbdb

Please sign in to comment.