Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddlManager (ticdc): add bootstrap sending function #10045

Merged
merged 9 commits into from
Nov 23, 2023
4 changes: 4 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (m *mockDDLSink) emitCheckpointTs(ts uint64, tables []*model.TableInfo) {
m.mu.currentTables = tables
}

func (m *mockDDLSink) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
73 changes: 73 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -93,6 +94,14 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionRemovePartitioning: {},
}

type bootstrapState int32

const (
bootstrapStateNone bootstrapState = iota
bootstrapStateRunning
bootstrapStateCompleted
)

// ddlManager holds the pending DDL events of all tables and responsible for
// executing them to downstream.
// It also provides the ability to calculate the barrier of a changefeed.
Expand Down Expand Up @@ -130,6 +139,10 @@ type ddlManager struct {
BDRMode bool
sinkType model.DownstreamType
ddlResolvedTs model.Ts

needBootstrap bool
errCh chan error
bootstrapState int32
}

func newDDLManager(
Expand Down Expand Up @@ -167,6 +180,7 @@ func newDDLManager(
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
}
}

Expand All @@ -184,6 +198,18 @@ func (m *ddlManager) tick(
checkpointTs model.Ts,
tableCheckpoint map[model.TableName]model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is redundant.

You can move it to the declaration of the needBootstrap variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After next PR merged, it will be remove.

if m.needBootstrap {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
ok, err := m.checkAndBootstrap(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, err
}
if !ok {
return nil, nil, nil
}
}

m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

Expand Down Expand Up @@ -598,6 +624,53 @@ func (m *ddlManager) cleanCache() {
m.physicalTablesCache = nil
}

func (m *ddlManager) checkAndBootstrap(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateCompleted) {
return true, nil
}

select {
case err := <-m.errCh:
return false, err
default:
}

if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateRunning) {
return false, nil
}
// begin bootstrap
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateRunning))
tables, err := m.allTables(ctx)
if err != nil {
return false, err
}
bootstrapEvents := make([]*model.DDLEvent, 0, len(tables))
for _, table := range tables {
ddlEvent := &model.DDLEvent{
StartTs: m.startTs,
CommitTs: m.startTs,
TableInfo: table,
IsBootstrap: true,
}
bootstrapEvents = append(bootstrapEvents, ddlEvent)
}
// send bootstrap events
go func() {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
for _, event := range bootstrapEvents {
err := m.ddlSink.emitBootstrapEvent(ctx, event)
if err != nil {
log.Error("emit bootstrap event failed",
zap.Any("bootstrapEvent", event), zap.Error(err))
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateNone))
m.errCh <- err
return
}
}
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateCompleted))
}()
return false, nil
}

// getRelatedPhysicalTableIDs get all related physical table ids of a ddl event.
// It is a helper function to calculate tableBarrier.
func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
Expand Down
17 changes: 17 additions & 0 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DDLSink interface {
// the DDL event will be sent to another goroutine and execute to downstream
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
Expand Down Expand Up @@ -384,6 +385,22 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
return false, nil
}

// emitBootstrapEvent sent bootstrap event to downstream.
// It is a synchronous operation.
func (s *ddlSinkImpl) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
if !ddl.IsBootstrap {
return nil
}
err := s.sink.WriteDDLEvent(ctx, ddl)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
// TODO: change this log to debug level after testing complete.
log.Info("emit bootstrap event", zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID), zap.Any("bootstrapEvent", ddl))
return nil
}

func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (err error) {
if checkpointTs == s.lastSyncPoint {
return nil
Expand Down
Loading